Я только что написал очередь задач на Python, задачей которой является ограничение количества задач, выполняемых одновременно. Это немного отличается от Queue.Queue
, потому что вместо того, чтобы ограничивать количество элементов в очереди, оно ограничивает количество, которое может быть извлечено за один раз. Он по-прежнему использует неограниченный Queue.Queue
для выполнения своей работы, но полагается на Semaphore
для ограничения количества потоков:
from Queue import Queue
from threading import BoundedSemaphore, Lock, Thread
class TaskQueue(object):
"""
Queues tasks to be run in separate threads and limits the number
concurrently running tasks.
"""
def __init__(self, limit):
"""Initializes a new instance of a TaskQueue."""
self.__semaphore = BoundedSemaphore(limit)
self.__queue = Queue()
self.__cancelled = False
self.__lock = Lock()
def enqueue(self, callback):
"""Indicates that the given callback should be ran."""
self.__queue.put(callback)
def start(self):
"""Tells the task queue to start running the queued tasks."""
thread = Thread(target=self.__process_items)
thread.start()
def stop(self):
self.__cancel()
# prevent blocking on a semaphore.acquire
self.__semaphore.release()
# prevent blocking on a Queue.get
self.__queue.put(lambda: None)
def __cancel(self):
print 'canceling'
with self.__lock:
self.__cancelled = True
def __process_items(self):
while True:
# see if the queue has been stopped before blocking on acquire
if self.__is_canceled():
break
self.__semaphore.acquire()
# see if the queue has been stopped before blocking on get
if self.__is_canceled():
break
callback = self.__queue.get()
# see if the queue has been stopped before running the task
if self.__is_canceled():
break
def runTask():
try:
callback()
finally:
self.__semaphore.release()
thread = Thread(target=runTask)
thread.start()
self.__queue.task_done()
def __is_canceled(self):
with self.__lock:
return self.__cancelled
Интерпретатор Python работает вечно, если я явно не остановлю очередь задач. Это намного сложнее, чем я думал. Если вы посмотрите на метод stop
, вы увидите, что я установил для очереди флаг canceled
, release
семафор и put
недействующий обратный вызов. Последние две части необходимы, потому что код может блокировать Semaphore
или Queue
. Я в основном должен заставить их пройти, чтобы у петли был шанс вырваться.
Этот код работает. Этот класс полезен при запуске службы, которая пытается выполнять тысячи задач параллельно. Чтобы обеспечить бесперебойную работу машины и предотвратить крики ОС о слишком большом количестве активных потоков, этот код будет ограничивать количество потоков, работающих в любой момент времени.
Я уже писал аналогичный фрагмент кода на C#. Что делало этот код особенно коротким, так это то, что в .NET есть что-то, называемое CancellationToken
, которое использует почти каждый класс потоковой передачи. Каждый раз, когда выполняется блокирующая операция, эта операция принимает необязательный токен. Если родительская задача когда-либо будет отменена, любые дочерние задачи, блокирующие этот токен, также будут немедленно отменены. Это кажется гораздо более чистым способом выхода, чем «подделка», освобождая семафоры или помещая значения в очередь.
Мне было интересно, есть ли эквивалентный способ сделать это в Python? Я определенно хочу использовать потоки вместо чего-то вроде асинхронных событий. Мне интересно, есть ли способ добиться того же, используя два Queue.Queue
, где один имеет максимальный размер, а другой нет, но я все еще не уверен, как справиться с отменой.