Условия
Более сложным механизмом коммуникации между потоками является механизм условий. Условия представляются в виде экземпляров класса threading.Condition и, подобно только что рассмотренным событиям, оповещают потоки об изменении некоторого состояния. Конструктор класса threading.Condition принимает необязательный параметр, задающий замок класса threading.Lock или threading.RLock. По умолчанию создается новый экземпляр замка класса threading.RLock. Методы объекта-условия описаны ниже:
- acquire(...)Запрашивает замок. Фактически вызывается одноименный метод принадлежащего объекту-условию объекта-замка.
- release()Снимает замок.
- wait([timeout])Переводит поток в режим ожидания. Этот метод может быть вызван только в том случае, если вызывающий его поток получил замок. Метод снимает замок и блокирует поток до появления объявлений, то есть вызовов методов notify() и notifyAll() другими потоками. Необязательный аргумент timeout задает таймаут ожидания в секундах. При выходе из ожидания поток снова запрашивает замок и возвращается из метода wait().
- notify()Выводит из режима ожидания один из потоков, ожидающих данные условия. Метод можно вызвать, только овладев замком, ассоциированным с условием. Документация предупреждает, что в будущих реализациях модуля из целей оптимизации этот метод будет прерывать ожидание сразу нескольких потоков. Сам по себе метод notify() не приводит к продолжению выполнения ожидавших условия потоков, так как этому препятствует занятый замок. Потоки получают управление только после снятия замка потоком, вызвавшим метод notify().
- notifyAll()Этот метод аналогичен методу notify(), но прерывает ожидание всех ждущих выполнения условия потоков.
В следующем примере условия используются для оповещения потоков о прибытии новой порции данных (организуется связь производитель - потребитель, producer - consumer):
import threading
cv = threading.Condition()
class Item: """Класс-контейнер для элементов, которые будут потребляться в потоках""" def __init__(self): self._items = [] def is_available(self): return len(self._items) > 0 def get(self): return self._items.pop() def make(self, i): self._items.append(i)
item = Item()
def consume(): """Потребление очередного элемента (с ожиданием его появления)""" cv.acquire() while not item.is_available(): cv.wait() it = item.get() cv.release() return it
def consumer(): while True: print consume()
def produce(i): """Занесение нового элемента в контейнер и оповещение потоков""" cv.acquire() item.make(i) cv.notify() cv.release()
p1 = threading.Thread(target=consumer, name="t1") p1.setDaemon(True) p2 = threading.Thread(target=consumer, name="t2") p2.setDaemon(True) p1.start() p2.start() produce("ITEM1") produce("ITEM2") produce("ITEM3") produce("ITEM4") p1.join() p2.join()
В этом примере условие cv отражает наличие необработанных элементов в контейнере item. Функция produce() "производит" элементы, а consume(), работающая внутри потоков, "потребляет". Стоит отметить, что в приведенном виде программа никогда не закончится, так как имеет бесконечный цикл в потоках, а в главном потоке - ожидание завершения этих потоков. Еще одна особенность - признак демона, установленный с помощью метода setDaemon() объекта-потока до его старта.