Django Celerybeat PeriodicTask работает намного лучше, чем ожидалось

Я борюсь с Django, Celery, djcelery и PeriodicTasks.

Я создал задачу, чтобы получить отчет для AdSense, чтобы создать отчет о статистике в реальном времени. Вот моя задача:

import datetime
import httplib2
import logging

from apiclient.discovery import build
from celery.task import PeriodicTask
from django.contrib.auth.models import User
from oauth2client.django_orm import Storage

from .models import Credential, Revenue


logger = logging.getLogger(__name__)


class GetReportTask(PeriodicTask):
    run_every = datetime.timedelta(minutes=2)

    def run(self, *args, **kwargs):
        scraper = Scraper()
        scraper.get_report()


class Scraper(object):
    TODAY = datetime.date.today()
    YESTERDAY = TODAY - datetime.timedelta(days=1)

    def get_report(self, start_date=YESTERDAY, end_date=TODAY):
        logger.info('Scraping Adsense report from {0} to {1}.'.format(
            start_date, end_date))
        user = User.objects.get(pk=1)
        storage = Storage(Credential, 'id', user, 'credential')
        credential = storage.get()
        if not credential is None and credential.invalid is False:
            http = httplib2.Http()
            http = credential.authorize(http)
            service = build('adsense', 'v1.2', http=http)
            reports = service.reports()
            report = reports.generate(
                startDate=start_date.strftime('%Y-%m-%d'),
                endDate=end_date.strftime('%Y-%m-%d'),
                dimension='DATE',
                metric='EARNINGS',
            )
            data = report.execute()
            for row in data['rows']:
                date = row[0]
                revenue = row[1]

                try:
                    record = Revenue.objects.get(date=date)
                except Revenue.DoesNotExist:
                    record = Revenue()
                record.date = date
                record.revenue = revenue
                record.save()
        else:
            logger.error('Invalid Adsense Credentials')

Я использую Celery и RabbitMQ. Вот мои настройки:

# Celery/RabbitMQ
BROKER_HOST = "localhost"
BROKER_PORT = 5672
BROKER_USER = "myuser"
BROKER_PASSWORD = "****"
BROKER_VHOST = "myvhost"
CELERYD_CONCURRENCY = 1
CELERYD_NODES = "w1"
CELERY_RESULT_BACKEND = "amqp"
CELERY_TIMEZONE = 'America/Denver'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'

import djcelery
djcelery.setup_loader()

На первый взгляд кажется, что все работает, но после включения регистратора и наблюдения за его работой я обнаружил, что он запускает задачу по крайней мере четыре раза подряд, а иногда и больше. Также кажется, что он работает каждую минуту, а не каждые две минуты. Я пытался изменить run_every, чтобы использовать crontab, но получаю те же результаты.

Я начинаю celerybeat с помощью супервизора. Вот команда, которую я использую:

python manage.py celeryd -B -E -c 1

Любые идеи относительно того, почему он не работает должным образом?

О, и еще одна вещь: после смены дня он продолжает использовать диапазон дат, с которым он работал в первый раз. Таким образом, с течением времени он продолжает получать статистику за день, когда задача начала выполняться - если я не запустил задачу вручную в какой-то момент, она изменится на дату, когда я последний раз запускал ее вручную. Может кто-нибудь сказать мне, почему это происходит?


person Dustin    schedule 10.04.2013    source источник
comment
Вы абсолютно уверены, что не запускаете несколько битов одновременно?   -  person asksol    schedule 11.04.2013


Ответы (1)


Рассмотрите возможность создания отдельной очереди с одним рабочим процессом и фиксированной скоростью для задач этого типа и просто добавьте задачи в эту новую очередь вместо того, чтобы запускать их непосредственно из celerybeat. Я надеюсь, что это поможет вам понять, что не так с вашим кодом, проблема в celerybeat или ваши задачи выполняются дольше, чем ожидалось.

@task(queue='create_report', rate_limit='0.5/m')
def create_report():
    scraper = Scraper()
    scraper.get_report()

class GetReportTask(PeriodicTask):
    run_every = datetime.timedelta(minutes=2)

    def run(self, *args, **kwargs):
        create_report.delay()

в настройках.py

   CELERY_ROUTES = {
     'myapp.tasks.create_report': {'queue': 'create_report'},
   }

запустите дополнительного работника сельдерея, который будет обрабатывать задачи в вашей очереди

рабочий сельдерея -c 1 -Q create_report -n create_report.local

Проблема 2. Ваши переменные YESTERDAY и TODAY установлены на уровне класса, поэтому внутри одного потока они устанавливаются только один раз.

person singer    schedule 10.04.2013
comment
Извините, я не совсем уверен, как сделать то, что вы описали. Можете ли вы дать мне более подробную информацию? - person Dustin; 10.04.2013
comment
В задаче 2 СЕГОДНЯ и ВЧЕРА были глобальными переменными, и я переместил их в класс, надеясь, что это поможет. Теперь я полностью удалил переменные. Завтра узнаю, сработало ли это. - person Dustin; 10.04.2013
comment
Я попробовал код, который вы разместили. Это решило проблему. Теперь он запускает задачу AdSense каждые 2 минуты. На самом деле у меня есть 13 периодических задач в 8 разных приложениях. Должен ли я делать то же самое с каждым из них? Все они работают слишком часто, но AdSense был худшим. - person Dustin; 10.04.2013
comment
Из моего опыта: когда сомневаюсь иметь другую очередь или нет, я всегда выбираю отдельную очередь. В вашем случае, я думаю, было бы довольно сложно поддерживать 13+ очередей, поэтому сделайте несколько - одну для небольших задач и несколько очередей для больших задач, одну вещь, которую вам лучше не делать, это запускать задачи напрямую из celerybeat, просто перенаправлять их на другие очереди. Отметьте мой ответ как решение, пожалуйста. - person singer; 10.04.2013
comment
Так что похоже, что реальным решением было ограничение скорости. Если я удалю ограничение скорости, оно постоянно добавляет задачу. Так работает ли класс PeriodicTask? - person Dustin; 10.04.2013
comment
Ваш код выглядит правильно, проверьте рекурсивные вызовы где-нибудь в вашем приложении. Добавьте образец PeriodicTask, который просто регистрирует свое состояние, чтобы понять, все ли в порядке с планировщиком. - person singer; 10.04.2013
comment
Странная мысль случилась. После того, как я удалил ограничение скорости для своей задачи AdSense, хотя моя очередь AdSense добавляла задачи без остановок, все мои другие задачи начали работать нормально. Затем я вернул свой код к тому, что было, и теперь все работает как надо. Так расстраивает! Я понятия не имею, как и почему это вдруг исправлено. - person Dustin; 10.04.2013
comment
Это хорошая идея, чтобы иметь ограничение скорости для задач, которые делают какие-то вызовы RPC. Если вы столкнулись со странным поведением в своем коде - вы просто видите лишние задачи в этой очереди и у вас есть время искать решение, почему они туда попадают, а не ломают все подряд. - person singer; 10.04.2013