Правильный способ асинхронного выполнения задачи в Django через Celery

У меня есть приложение Django, которое имитирует Instagram, в котором пользователи загружают фотографии или мемы, а затем их поклонники получают уведомление об указанной фотографии.

В настоящее время, чтобы отправлять уведомления, как только пользователь загружает фотографию, я перебираю всех фанатов, которые есть у загрузчика, добавляю уведомления в список, а затем bulk_create объекты. Как в:

    fans = UserFan.objects.filter(star=user).values_list('fan',flat=True)
    fan_list = []
    for fan in fans:
        fan_list.append(PhotoObjectSubscription(viewer_id=fan, which_photo=photo, updated_at=time, seen=False, type_of_object='1'))
    PhotoObjectSubscription.objects.bulk_create(fan_list)

Простые вещи. Обратите внимание, что у меня также установлено supervisord на виртуальной машине моего приложения, где я запускаю некоторые элементарные задачи через celeryredis в качестве брокера сообщений).

Теперь я хочу выполнить задачу bulk_create выше как celery task; асинхронно. Мой код bulk_create живет в том же представлении, которое используется для обработки загрузки фотографий, поэтому я считаю, что выполнение этого асинхронно ускорит этот процесс для пользователя.

Я новичок в задачах celery, поэтому может ли кто-нибудь указать на наглядном примере, как я могу превратить вышеупомянутую задачу bulk_create в задачу celery? Я провел исследование, и вот что я думаю, что мне нужно сделать:

1) Добавьте delay() в конце оператора bulk_create:

PhotoObjectSubscription.objects.bulk_create(fan_list).delay()

2) В tasks.py добавьте новую задачу для обработки вышеперечисленного:

@task
def bulk_create_notifications():
    PhotoObjectSubscription.objects.bulk_create(fan_list)

3) В CELERYBEAT_SCHEDULE в settings.py ничего добавлять не нужно, так как задача не периодическая.

Я, наверное, не совсем прав, так что, пожалуйста, помогите.


person Hassan Baig    schedule 30.05.2016    source источник


Ответы (1)


Вам нужно передать fans (возможно, вам придется преобразовать его в list, если это на самом деле ValuesListQuerySet) и все остальное, что нужно задаче (например, photo.id) в качестве аргумента задачи:

@task
def bulk_create_notifications(fans, photo_id):
    fan_list = []
    for fan in fans:
        fan_list.append(PhotoObjectSubscription(viewer_id=fan, which_photo_id=photo_id, updated_at=time, seen=False, type_of_object='1'))
    PhotoObjectSubscription.objects.bulk_create(fan_list)

Затем вы можете асинхронно запустить задачу через:

# call delay on the task and pass it the same params you would pass to the fnc itself
bulk_create_notifications.delay(fans)  

Поскольку аргументы должны храниться и передаваться вашей очередью задач (redis), вы можете передавать только аргументы, сериализуемые сериализатором, который вы установили в своих настройках (возможно, JSON). Это означает, что вы должны придерживаться простых типов s.a. строки, целые числа, и вы не можете передавать экземпляры модели или их списки в качестве параметров.

Конечно, вы можете начать еще выше и просто передать user.id и выполнить всю работу с базой данных в задаче.

person schwobaseggl    schedule 30.05.2016
comment
Нет. Это я неправильно прочитал ваш код. Я уже изменил это в ответе. Вы не можете передавать экземпляры модели! Это может работать в вашей среде разработки, когда у вас есть настройка CELERY_ALWAYS_EAGER на True, но не когда параметры фактически передаются через очередь. - person schwobaseggl; 31.05.2016
comment
@HassanBaig На самом деле fans в вашем коде — это ValuesListQuerySet, которое вы должны преобразовать в list, прежде чем передать его задаче. - person schwobaseggl; 31.05.2016
comment
Понятно. Я собираюсь попробовать это и вернуться к вам в немного! - person Hassan Baig; 31.05.2016
comment
Вопрос: в views.py, где у меня есть bulk_create_notifications.delay(fans), я должен также добавить from myapp/links import bulk_create_notifications вверху файла. Должен ли я относиться к этому как к любому другому методу? - person Hassan Baig; 31.05.2016
comment
@HassanBaig Да, как и любой другой. Обратите внимание, что вы по-прежнему можете вызывать обычный метод. Имя должно быть известно переводчику. Мне нравится хранить свои задачи в модуле уровня приложения с именем tasks.py. - person schwobaseggl; 31.05.2016
comment
@schwobaseggl, не могли бы вы добавить примечание, что этот код работает только с автоматической фиксацией? с транзакциями может генерировать ошибки (для django ‹ 1.9 нужно использовать django-transaction-hooks — интегрировано в django с 1.9) - person Jerzyk; 05.06.2016
comment
@Jerzyk: не могли бы вы также объяснить мне, что вы имеете в виду? Я думаю, вы имеете в виду, что это работает только в моем конкретном сценарии, поскольку я использую bulk_create (который автоматически фиксирует изменения в базе данных). Это не будет работать с другими транзакциями (было бы здорово, если бы вы могли привести пример таких транзакций - еще лучше, просто отредактируйте ответ или добавьте свой собственный ниже). - person Hassan Baig; 05.06.2016
comment
@HassanBaig, если вы используете транзакции (не автофиксацию), вы могли столкнуться с тем, что задача celery вызывается изнутри транзакции, и задача celery может начать выполняться до того, как транзакция завершится, в таком случае вы имеют несогласованность данных, потому что задача сельдерея может читать старые данные, или задача сельдерея будет быстрой и обновит запись перед фиксацией транзакции, а затем каждое изменение, сделанное задачей, будет перезаписано фиксацией, вы должны вызвать задачу сельдерея после фиксации транзакции, django 1.9 представил способ сделать это, для более ранних версий следует использовать django-transaction-hooks - person Jerzyk; 05.06.2016