В настоящее время я изучаю Celery для использования в бэкэнде для обработки видео. По сути моя проблема заключается в следующем:
- У меня есть внешний веб-сервер, который одновременно обрабатывает большое количество видеопотоков (порядка тысяч).
- Каждый поток должен обрабатываться независимо и параллельно.
- Stream processing can be divided into two types of operations:
- Frame-by-frame operations (computations that do not need information about the preceding or following frame(s))
- Операции на уровне потока (вычисления, которые работают с подмножеством упорядоченных смежных кадров)
Учитывая пункт 3, мне нужно поддерживать и обновлять упорядоченную структуру фреймов на протяжении всего процесса и передавать вычисления на подразделах этой структуры воркерам Celery. Изначально я думал организовать вещи следующим образом:
[frontend server] -stream-> [celery worker 1 (greenlet)] --> [celery worker 2 (prefork)]
Идея состоит в том, что celery worker 1
выполняет длительные задачи, которые в основном связаны с вводом-выводом. По сути, эти задачи будут только выполнять следующие действия:
- Чтение кадра с внешнего сервера
- Декодировать кадр из его представления base64
- Поставьте его в очередь в вышеупомянутой упорядоченной структуре данных (объект
collections.deque
в его текущем виде).
Любые операции, связанные с ЦП (например, анализ изображений), передаются в celery worker 2
.
Моя проблема заключается в следующем:
Я хотел бы выполнить сопрограмму как задачу, чтобы у меня были длительные задачи, из которых я мог бы yield
не блокировать операции celery worker 1
. Другими словами, я хотел бы иметь возможность сделать что-то вроде:
def coroutine(func):
@wraps(func)
def start(*args, **kwargs):
cr = func(*args, **kwargs)
cr.next()
return cr
return start
@coroutine
def my_taks():
stream = deque() # collections.deque
source = MyAsynchronousInputThingy() # something i'll make myself, probably using select
while source.open:
if source.has_data:
stream.append(Frame(source.readline())) # read data, build frame and enqueue to persistent structure
yield # cooperatively interrupt so that other tasks can execute
Есть ли способ заставить задачу, основанную на сопрограмме, выполняться бесконечно, в идеале выдавая результаты по мере их yield
обработки?