Моделирование токенов отмены в Python Threading

Я только что написал очередь задач на Python, задачей которой является ограничение количества задач, выполняемых одновременно. Это немного отличается от Queue.Queue, потому что вместо того, чтобы ограничивать количество элементов в очереди, оно ограничивает количество, которое может быть извлечено за один раз. Он по-прежнему использует неограниченный Queue.Queue для выполнения своей работы, но полагается на Semaphore для ограничения количества потоков:

from Queue import Queue
from threading import BoundedSemaphore, Lock, Thread


class TaskQueue(object):
    """
    Queues tasks to be run in separate threads and limits the number
    concurrently running tasks.

    """

    def __init__(self, limit):
        """Initializes a new instance of a TaskQueue."""
        self.__semaphore = BoundedSemaphore(limit)
        self.__queue = Queue()
        self.__cancelled = False
        self.__lock = Lock()

    def enqueue(self, callback):
        """Indicates that the given callback should be ran."""
        self.__queue.put(callback)

    def start(self):
        """Tells the task queue to start running the queued tasks."""
        thread = Thread(target=self.__process_items)
        thread.start()

    def stop(self):
        self.__cancel()
        # prevent blocking on a semaphore.acquire
        self.__semaphore.release()
        # prevent blocking on a Queue.get
        self.__queue.put(lambda: None)

    def __cancel(self):
        print 'canceling'
        with self.__lock:
            self.__cancelled = True

    def __process_items(self):
        while True:
            # see if the queue has been stopped before blocking on acquire
            if self.__is_canceled():
                break

            self.__semaphore.acquire()

            # see if the queue has been stopped before blocking on get
            if self.__is_canceled():
                break

            callback = self.__queue.get()

            # see if the queue has been stopped before running the task
            if self.__is_canceled():
                break

            def runTask():
                try:
                    callback()
                finally:
                    self.__semaphore.release()

            thread = Thread(target=runTask)
            thread.start()
            self.__queue.task_done()

    def __is_canceled(self):
        with self.__lock:
            return self.__cancelled

Интерпретатор Python работает вечно, если я явно не остановлю очередь задач. Это намного сложнее, чем я думал. Если вы посмотрите на метод stop, вы увидите, что я установил для очереди флаг canceled, release семафор и put недействующий обратный вызов. Последние две части необходимы, потому что код может блокировать Semaphore или Queue. Я в основном должен заставить их пройти, чтобы у петли был шанс вырваться.

Этот код работает. Этот класс полезен при запуске службы, которая пытается выполнять тысячи задач параллельно. Чтобы обеспечить бесперебойную работу машины и предотвратить крики ОС о слишком большом количестве активных потоков, этот код будет ограничивать количество потоков, работающих в любой момент времени.

Я уже писал аналогичный фрагмент кода на C#. Что делало этот код особенно коротким, так это то, что в .NET есть что-то, называемое CancellationToken, которое использует почти каждый класс потоковой передачи. Каждый раз, когда выполняется блокирующая операция, эта операция принимает необязательный токен. Если родительская задача когда-либо будет отменена, любые дочерние задачи, блокирующие этот токен, также будут немедленно отменены. Это кажется гораздо более чистым способом выхода, чем «подделка», освобождая семафоры или помещая значения в очередь.

Мне было интересно, есть ли эквивалентный способ сделать это в Python? Я определенно хочу использовать потоки вместо чего-то вроде асинхронных событий. Мне интересно, есть ли способ добиться того же, используя два Queue.Queue, где один имеет максимальный размер, а другой нет, но я все еще не уверен, как справиться с отменой.


person Travis Parks    schedule 16.09.2012    source источник
comment
Двойное подчеркивание не является питоническим способом указания конфиденциальности. Одиночное подчеркивание есть. Двойное подчеркивание приводит к искажению имени. См. здесь: docs.python.org/3/tutorial /   -  person pillmuncher    schedule 17.02.2014


Ответы (3)


Кажется, вы создаете новый поток для каждой задачи из очереди. Это само по себе расточительно, а также приводит к проблеме ограничения количества потоков.

Вместо этого распространенный подход заключается в создании фиксированного числа рабочих потоков и предоставлении им возможности свободно извлекать задачи из очереди. Чтобы отменить очередь, вы можете очистить ее и оставить рабочие процессы в ожидании будущей работы.

person Janne Karila    schedule 16.09.2012
comment
Проблема с сохранением живых потоков заключается в том, что это поддерживает процесс Python навсегда. Это затрудняет написание модульных тестов. - person Travis Parks; 17.09.2012
comment
Вы можете использовать потоки демона, они не поддерживают процесс. - person Janne Karila; 17.09.2012

Я думаю, что ваш код можно упростить, используя отравление и Thread.join():

from Queue import Queue
from threading import Thread

poison = object()

class TaskQueue(object):

    def __init__(self, limit):
        def process_items():
            while True:
                callback = self._queue.get()
                if callback is poison:
                    break
                try:
                    callback()
                except:
                    pass
                finally:
                    self._queue.task_done()
        self._workers = [Thread(target=process_items) for _ in range(limit)]
        self._queue = Queue()

    def enqueue(self, callback):
        self._queue.put(callback)

    def start(self):
        for worker in self._workers:
            worker.start()

    def stop(self):
        for worker in self._workers:
            self._queue.put(poison)
        while self._workers:
            self._workers.pop().join()

Непроверенный.

Я удалил комментарии, для краткости.

Кроме того, в этой версии process_items() действительно приватный.

Кстати: Весь смысл модуля Queue в том, чтобы освободить вас от ужасных блокировок и событий.

person pillmuncher    schedule 17.09.2012

Я последовал совету Janne Karila и создал пул потоков. Это устранило необходимость в семафоре. Проблема в том, что если вы когда-нибудь ожидаете, что очередь исчезнет, ​​вам придется остановить выполнение рабочих потоков (просто вариант того, что я делал раньше). Новый код довольно похож:

class TaskQueue(object):
    """
    Queues tasks to be run in separate threads and limits the number
    concurrently running tasks.

    """

    def __init__(self, limit):
        """Initializes a new instance of a TaskQueue."""
        self.__workers = []
        for _ in range(limit):
            worker = Thread(target=self.__process_items)
            self.__workers.append(worker)
        self.__queue = Queue()
        self.__cancelled = False
        self.__lock = Lock()
        self.__event = Event()

    def enqueue(self, callback):
        """Indicates that the given callback should be ran."""
        self.__queue.put(callback)

    def start(self):
        """Tells the task queue to start running the queued tasks."""
        for worker in self.__workers:
            worker.start()

    def stop(self):
        """
        Stops the queue from processing anymore tasks. Any actively running
        tasks will run to completion.

        """
        self.__cancel()
        # prevent blocking on a Queue.get
        for _ in range(len(self.__workers)):
            self.__queue.put(lambda: None)
            self.__event.wait()

    def __cancel(self):
        with self.__lock:
            self.__queue.queue.clear()
            self.__cancelled = True

    def __process_items(self):
        while True:
            callback = self.__queue.get()

            # see if the queue has been stopped before running the task
            if self.__is_canceled():
                break

            try:
                callback()
            except:
                pass
            finally:
                self.__queue.task_done()
        self.__event.set()

    def __is_canceled(self):
        with self.__lock:
            return self.__cancelled

Если вы посмотрите внимательно, мне пришлось провести некоторые расчеты, чтобы убить рабочих. В основном я жду Event столько раз, сколько рабочих. Я clear устанавливаю базовую очередь, чтобы предотвратить отмену воркеров любым другим способом. Я также жду после закачки каждого фиктивного значения в очередь, поэтому только один рабочий может отменить за раз.

Я провел несколько тестов на этом, и, похоже, он работает. Было бы неплохо устранить необходимость в фиктивных значениях.

person Travis Parks    schedule 17.09.2012
comment
Вы не должны использовать двойные подчеркивания так, как вы это делаете. Они для искажения имен, а не для соблюдения конфиденциальности. Для этого вы должны использовать одиночные символы подчеркивания. Это сигнал другим программистам: не трогайте, если не знаете, что делаете! Однако это не помешает им соприкасаться, но двойное подчеркивание тоже не помешает, поскольку к ним можно получить доступ по их искаженным именам. - person pillmuncher; 17.09.2012