Может ли сельдерей совместно запускать сопрограммы как задачи с состоянием/возобновляемостью?

В настоящее время я изучаю Celery для использования в бэкэнде для обработки видео. По сути моя проблема заключается в следующем:

  1. У меня есть внешний веб-сервер, который одновременно обрабатывает большое количество видеопотоков (порядка тысяч).
  2. Каждый поток должен обрабатываться независимо и параллельно.
  3. Stream processing can be divided into two types of operations:
    1. Frame-by-frame operations (computations that do not need information about the preceding or following frame(s))
    2. Операции на уровне потока (вычисления, которые работают с подмножеством упорядоченных смежных кадров)

Учитывая пункт 3, мне нужно поддерживать и обновлять упорядоченную структуру фреймов на протяжении всего процесса и передавать вычисления на подразделах этой структуры воркерам Celery. Изначально я думал организовать вещи следующим образом:

[frontend server]  -stream-> [celery worker 1 (greenlet)] --> [celery worker 2 (prefork)]

Идея состоит в том, что celery worker 1 выполняет длительные задачи, которые в основном связаны с вводом-выводом. По сути, эти задачи будут только выполнять следующие действия:

  1. Чтение кадра с внешнего сервера
  2. Декодировать кадр из его представления base64
  3. Поставьте его в очередь в вышеупомянутой упорядоченной структуре данных (объект collections.deque в его текущем виде).

Любые операции, связанные с ЦП (например, анализ изображений), передаются в celery worker 2.

Моя проблема заключается в следующем:

Я хотел бы выполнить сопрограмму как задачу, чтобы у меня были длительные задачи, из которых я мог бы yield не блокировать операции celery worker 1. Другими словами, я хотел бы иметь возможность сделать что-то вроде:

def coroutine(func):
    @wraps(func)
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        cr.next()
        return cr
    return start

@coroutine
def my_taks():
    stream = deque()  # collections.deque
    source = MyAsynchronousInputThingy()  # something i'll make myself, probably using select

    while source.open:
        if source.has_data:
            stream.append(Frame(source.readline()))  # read data, build frame and enqueue to persistent structure
        yield  # cooperatively interrupt so that other tasks can execute

Есть ли способ заставить задачу, основанную на сопрограмме, выполняться бесконечно, в идеале выдавая результаты по мере их yieldобработки?


person Louis Thibault    schedule 25.10.2014    source источник


Ответы (1)


Основная идея Eventlet заключается в том, что вы хотите написать синхронный код, как и в случае с потоками, socket.recv() должен блокировать текущий поток до следующего оператора. Этот стиль очень легко читать, поддерживать и рассуждать во время отладки. Чтобы сделать вещи эффективными и масштабируемыми, за кулисами Eventlet делает волшебство, заменяя, казалось бы, блокирующий код зелеными потоками и механизмами epoll/kqueue/etc для пробуждения этих зеленых потоков в нужное время.

Поэтому все, что вам нужно, это выполнить eventlet.monkey_patch() как можно скорее (например, вторую строку в модуле) и убедиться, что вы используете чистые операции сокета Python в MyInputThingy. Забудьте об асинхронности, просто напишите обычный блокирующий код, как в случае с потоками.

Eventlet снова делает синхронный код хорошим.

person temoto    schedule 02.11.2014