Celery: Ограничение скорости для задач с одинаковыми параметрами

Я ищу способ ограничить вызов функции, но только тогда, когда входные параметры разные, то есть:

@app.task(rate_limit="60/s")
def api_call(user):
   do_the_api_call()

for i in range(0,100):
  api_call("antoine")
  api_call("oscar")

Поэтому я хотел бы, чтобы api_call("antoine") также вызывался 60 раз в секунду и api_call("oscar") 60 раз в секунду.

Любая помощь в том, как я могу это сделать?

--EDIT 27/04/2015 Я попытался вызвать подзадачу с rate_limit внутри задачи, но она тоже не работает: rate_limit всегда применяется ко всем экземплярам подзадач или задач (что логично).

@app.task(rate_limit="60/s")
def sub_api_call(user):
   do_the_api_call()

@app.task
def api_call(user):
  sub_api_call(user)

for i in range(0,100):
  api_call("antoine")
  api_call("oscar")

Лучший!


person Antoine Brunel    schedule 24.04.2015    source источник
comment
Нельзя просто использовать декоратор @app.task(rate_limit=60) в методе?   -  person reptilicus    schedule 24.04.2015
comment
Ну, я так не думаю, поскольку это будет ограничивать как api_call(antoine), так и api_call(Oscar) @30/s, и я хочу, чтобы ограничение применялось к параметру, а не к функции.   -  person Antoine Brunel    schedule 24.04.2015
comment
Исправление, я хочу, чтобы ограничение применялось не только к функции, но и к параметру.   -  person Antoine Brunel    schedule 24.04.2015
comment
На основании чего значение параметра как-то? Можно ли обернуть задачу celery в другой декоратор и каким-то образом добавить rate_limit в фактический вызов задачи?   -  person reptilicus    schedule 24.04.2015
comment
Извините, я не понял, можно поконкретнее?   -  person Antoine Brunel    schedule 25.04.2015
comment
Да, в зависимости от значения параметра. Ставка должна применяться только к идентичным параметрам   -  person Antoine Brunel    schedule 25.04.2015
comment
Может быть, использовать задачи и применить ограничение скорости для подзадачи?   -  person Antoine Brunel    schedule 25.04.2015
comment
Возможный дубликат Выполнение уникальных задач с сельдереем   -  person funky-future    schedule 05.10.2017


Ответы (2)


Я потратил некоторое время на это сегодня и придумал хорошее решение. Все остальные решения этой проблемы имеют одну из следующих проблем:

  • Они требуют, чтобы задачи имели бесконечные повторные попытки, что делает механизм повторных попыток сельдерея бесполезным.
  • Они не дросселируют на основе параметров
  • Он не работает с несколькими рабочими или очередями
  • Они громоздкие и т.

По сути, вы обертываете свою задачу следующим образом:

@app.task(bind=True, max_retries=10)
@throttle_task("2/s", key="domain", jitter=(2, 15))
def scrape_domain(self, domain):
    do_stuff()

И в результате вы уменьшите скорость выполнения задачи до 2 запусков в секунду на каждый параметр домена со случайным дрожанием повторных попыток от 2 до 15 с. Параметр key является необязательным, но соответствует параметру в вашей задаче. Если ключевой параметр не указан, он просто урежет задачу до указанной скорости. Если он предусмотрен, то дроссель будет применяться к диаде (задача, ключ).

Другой способ взглянуть на это без декоратора. Это дает немного больше гибкости, но рассчитывает на то, что вы сделаете повторную попытку самостоятельно. Вместо вышеперечисленного вы можете сделать:

@app.task(bind=True, max_retries=10)
def scrape_domain(self, domain):
    proceed = is_rate_okay(self, "2/s", key=domain)
    if proceed:
        do_stuff()
    else:
        self.request.retries = task.request.retries - 1  # Don't count this as against max_retries.
        return task.retry(countdown=random.uniform(2, 15))

Я думаю, что это идентично первому примеру. Немного длиннее и разветвленнее, но показывает, как это работает, немного яснее. Я надеюсь всегда использовать декоратор сам.

Все это работает, сохраняя подсчет в Redis. Реализация очень проста. Вы создаете ключ в Redis для задачи (и ключевой параметр, если он задан), и срок действия ключа Redis истекает в соответствии с предоставленным расписанием. Если пользователь устанавливает скорость 10/м, вы создаете ключ Redis на 60 секунд и увеличиваете его каждый раз, когда выполняется задача с правильным именем. Если ваш инкремент становится слишком большим, повторите задачу. В противном случае запустите его.

def parse_rate(rate: str) -> Tuple[int, int]:
    """

    Given the request rate string, return a two tuple of:
    <allowed number of requests>, <period of time in seconds>

    (Stolen from Django Rest Framework.)
    """
    num, period = rate.split("/")
    num_requests = int(num)
    if len(period) > 1:
        # It takes the form of a 5d, or 10s, or whatever
        duration_multiplier = int(period[0:-1])
        duration_unit = period[-1]
    else:
        duration_multiplier = 1
        duration_unit = period[-1]
    duration_base = {"s": 1, "m": 60, "h": 3600, "d": 86400}[duration_unit]
    duration = duration_base * duration_multiplier
    return num_requests, duration


def throttle_task(
    rate: str,
    jitter: Tuple[float, float] = (1, 10),
    key: Any = None,
) -> Callable:
    """A decorator for throttling tasks to a given rate.

    :param rate: The maximum rate that you want your task to run. Takes the
    form of '1/m', or '10/2h' or similar.
    :param jitter: A tuple of the range of backoff times you want for throttled
    tasks. If the task is throttled, it will wait a random amount of time
    between these values before being tried again.
    :param key: An argument name whose value should be used as part of the
    throttle key in redis. This allows you to create per-argument throttles by
    simply passing the name of the argument you wish to key on.
    :return: The decorated function
    """

    def decorator_func(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            # Inspect the decorated function's parameters to get the task
            # itself and the value of the parameter referenced by key.
            sig = inspect.signature(func)
            bound_args = sig.bind(*args, **kwargs)
            task = bound_args.arguments["self"]
            key_value = None
            if key:
                try:
                    key_value = bound_args.arguments[key]
                except KeyError:
                    raise KeyError(
                        f"Unknown parameter '{key}' in throttle_task "
                        f"decorator of function {task.name}. "
                        f"`key` parameter must match a parameter "
                        f"name from function signature: '{sig}'"
                    )
            proceed = is_rate_okay(task, rate, key=key_value)
            if not proceed:
                logger.info(
                    "Throttling task %s (%s) via decorator.",
                    task.name,
                    task.request.id,
                )
                # Decrement the number of times the task has retried. If you
                # fail to do this, it gets auto-incremented, and you'll expend
                # retries during the backoff.
                task.request.retries = task.request.retries - 1
                return task.retry(countdown=random.uniform(*jitter))
            else:
                # All set. Run the task.
                return func(*args, **kwargs)

        return wrapper

    return decorator_func


def is_rate_okay(task: Task, rate: str = "1/s", key=None) -> bool:
    """Keep a global throttle for tasks

    Can be used via the `throttle_task` decorator above.

    This implements the timestamp-based algorithm detailed here:

        https://www.figma.com/blog/an-alternative-approach-to-rate-limiting/

    Basically, you keep track of the number of requests and use the key
    expiration as a reset of the counter.

    So you have a rate of 5/m, and your first task comes in. You create a key:

        celery_throttle:task_name = 1
        celery_throttle:task_name.expires = 60

    Another task comes in a few seconds later:

        celery_throttle:task_name = 2
        Do not update the ttl, it now has 58s remaining

    And so forth, until:

        celery_throttle:task_name = 6
        (10s remaining)

    We're over the threshold. Re-queue the task for later. 10s later:

        Key expires b/c no more ttl.

    Another task comes in:

        celery_throttle:task_name = 1
        celery_throttle:task_name.expires = 60

    And so forth.

    :param task: The task that is being checked
    :param rate: How many times the task can be run during the time period.
    Something like, 1/s, 2/h or similar.
    :param key: If given, add this to the key placed in Redis for the item.
    Typically, this will correspond to the value of an argument passed to the
    throttled task.
    :return: Whether the task should be throttled or not.
    """
    key = f"celery_throttle:{task.name}{':' + str(key) if key else ''}"

    r = make_redis_interface("CACHE")

    num_tasks, duration = parse_rate(rate)

    # Check the count in redis
    count = r.get(key)
    if count is None:
        # No key. Set the value to 1 and set the ttl of the key.
        r.set(key, 1)
        r.expire(key, duration)
        return True
    else:
        # Key found. Check it.
        if int(count) <= num_tasks:
            # We're OK, run it.
            r.incr(key, 1)
            return True
        else:
            return False
person mlissner    schedule 11.02.2021
comment
Отличное решение, пара вопросов. 1. Дроссель - это постоянное окно, а не скользящее окно (в любой момент времени должно было быть не более N задач за последний период K). Это не очень точное дросселирование. 2. В этом решении используется декоратор. Если у вас запущено более 1 задачи на другом сервере и в распределенной очереди, как они будут синхронизированы между рабочими процессами и серверами очередей? - person Vikash Singh; 13.02.2021
comment
Да, скользящее окно — это компромисс между точностью и эффективностью. Подробности смотрите в блоге, упомянутом в комментариях. Он будет правильно усреднен, но может отличаться в 2 раза в каждом окне. (2) Да, декоратор создает центральное расположение в вашей базе данных Redis, где проверяется дроссельная заслонка, поэтому вы можете помещать задачи в любую очередь, и пока они имеют одно и то же имя и используют одну и ту же базу данных Redis, вы будете быть хорошим идти. - person mlissner; 14.02.2021
comment
круто, попробую, когда будет время. - person Vikash Singh; 15.02.2021
comment
После использования этого в производстве в течение последних нескольких месяцев я обнаружил, что у него проблема с пинг-понгом. Задачи будут регулироваться, но затем очень скоро будут повторяться. Если бы было 1000 регулируемых задач, все они постоянно повторялись бы. Чтобы исправить это, я написал обновленную версию, включающую планировщик задач, которые необходимо повторить. Он отслеживает невыполненные работы и устанавливает задачи для повторения, когда они должны быть ясны. Много комментариев и примечаний в коде здесь: github. com/freelawproject/courtlistener/blob/ - person mlissner; 26.06.2021

Я не думаю, что этого можно добиться с помощью встроенного в Celery ограничителя задач.

Предполагая, что вы используете какой-то кеш для своего API, лучшим решением может быть создание хэша имени задачи и аргументов и использование этого ключа для дросселя на основе кеша.

Если вы используете Redis, вы можете либо установить блокировку с 60-секундным тайм-аутом, либо использовать инкрементный счетчик для подсчета вызовов в минуту.

Этот пост может дать вам несколько советов по распределенному регулированию задач Celery с помощью Redis:

https://callhub.io/blog/2014/02/03/distributed-rate-limiting-with-redis-and-celery/

person Erve1879    schedule 26.04.2015
comment
Спасибо за ваш ответ и предложение. До сих пор я использовал RabbitMQ с Celery, но тогда я, вероятно, продолжу использовать свой старый добрый код! - person Antoine Brunel; 27.04.2015
comment
ссылка умерла, к сожалению - person jorf.brunning; 22.07.2021