С использованием с точки зрения Android

Ограничение скорости

Реальные пользователи жестоки, нетерпеливы и коварны. В системах с высокой степенью параллелизма могут возникнуть ситуации, когда ваш сервер может быть засыпан фиктивными запросами, поэтому вы можете захотеть это контролировать.

Некоторые из реальных вариантов использования могут быть следующими:

1. Управление квотами API. В качестве поставщика вы можете ограничить скорость, с которой запросы API отправляются на ваш сервер, в зависимости от тарифных планов, выбранных пользователем. Это может быть на стороне клиента или службы.

2. Безопасность — для защиты от DDOS-атак.

3. Контроль затрат. Это не обязательно касается стороны службы или даже стороны клиента. Если какой-то компонент генерирует событие с очень высокой скоростью, это может помочь в его контроле. Это может помочь контролировать телеметрию, отправляемую со стороны клиента.

Варианты, которые у нас есть при ограничении скорости

В зависимости от типа запроса/события, с которым мы имеем дело, может произойти следующее:

  1. Мы можем отказаться от дополнительных запросов
  2. Мы можем заставить запросы ждать, пока система не снизит их до предопределенной скорости.

Общие алгоритмы ограничения скорости

  1. Алгоритм корзины токенов
  2. Алгоритм дырявого ведра
  3. Фиксированное окно

Мы не будем вдаваться во внутренние детали этих алгоритмов, так как это выходит за рамки данной статьи.

Мы возьмем алгоритм фиксированного окна в качестве опорной точки. Запишем высокоуровневые требования к реализации.

Фиксированное окно  —  Окна разбиты на части, и у каждого окна есть счетчик. Каждый запрос увеличивает счетчик на единицу. Как только счетчик достигает порогового значения, новые запросы отбрасываются до тех пор, пока не начнется новое временное окно.

Требования

  1. Должна иметь возможность принимать требуемые (TPS) транзакции в секунду или скорость.
  2. Должны отбрасывать транзакции, если они превышают установленную нами скорость.
  3. Должен работать в параллельных ситуациях.

Расширенные функции (в следующих статьях)

  1. Должна иметь возможность сглаживать всплески запросов. Например, если мы определили TPS как 5, и все пять запросов приходят в один и тот же момент, он должен иметь возможность выстраивать их в линию с фиксированными интервалами, т.е. выполнять каждый из них с интервалом в 200 мс. Для этого требуется внутренняя схема синхронизации.
  2. Если у нас есть TPS 5, и в одном из 1-секундных слотов мы используем только три жетона в течение следующей секунды, мы должны быть в состоянии предоставить 5 + 2 = 7 жетонов в качестве бонуса. Но из расчета 1/7 (142,28 мс) на токен. Бонус не должен переноситься на следующий слот.

Давайте сначала определим контракт для нашего RateLimiter:

/**
 * Rate limiter helps in limiting the rate of execution of a piece of code. The rate is defined in terms of
 * TPS(Transactions per second). Rate of 5 would suggest, 5 transactions/second. Transaction could be a DB call, API call,
 * or a simple function call.
 * <p>
 * Every {@link RateLimiter} implementation should implement either {@link RateLimiter#throttle(Code)} or, {@link RateLimiter#enter()}.
 * They can also choose to implement all.
 * <p>
 * {@link Code} represents a piece of code that needs to be rate limited. It could be a function call, if the code to be rate limited
 * spreads across multiple functions, we need to use entry() and exit() contract.
 */
public interface RateLimiter {

/**
     * Rate limits the code passed inside as an argument.
     *
     * @param code representation of the piece of code that needs to be rate limited.
     * @return true if executed, false otherwise.
     */
    boolean throttle(Code code);
    /**
     * When the piece of code that needs to be rate limited cannot be represented as a contiguous
     * code, then entry() should be used before we start executing the code. This brings the code inside the rate
     * limiting boundaries.
     *
     * @return true if the code will execute and false if rate limited.
     * <p
     */
    boolean enter();
    /**
     * Interface to represent a contiguous piece of code that needs to be rate limited.
     */
    interface Code {
        /**
         * Calling this function should execute the code that is delegated to this interface.
         */
        void invoke();
    }
}

Наш RateLimiter имеет два набора API: throttle(code) и enter(). Оба обслуживают одну и ту же функциональность, но двумя способами:

  1. boolean throttle(code) — можно использовать для передачи блока кода, если у нас есть непрерывный код.
  2. boolean enter() — обычно может использоваться перед API, БД или любым вызовом, который мы хотим регулировать. Если код, следующий за этим, выполнится, он вернет true и false, если скорость ограничена. Вы можете поставить эти запросы в очередь или отклонить их.

Вы никогда не увидите реализацию throttle(code) в продакшене, потому что она неоптимальна. Пожалуйста, дайте мне знать, почему в комментариях. Большинство ограничителей скорости используют API, которые выглядят как enter().

Основная функциональность

Чтобы построить ядро ​​нашего ограничителя скорости, нам нужно убедиться, что между любыми двумя секундами мы не должны разрешать более N транзакций. Как мы это сделаем?

Рассмотрим момент, когда мы совершаем первую транзакцию t0. Итак,
до (t0 + 1)s нам разрешено совершать только N транзакций. И как это будет гарантировать? Во время следующей транзакции мы проверим,
current time ≤ (t0+1). Если нет, то значит мы вошли в другую секунду, и нам разрешено совершать N транзакций. Давайте посмотрим на небольшой участок кода, демонстрирующий это:

long now = System.nanoTime();
if (now <= mNextSecondBoundary) { // If we are within the time limit of current second
    if (mCounter < N) { // If token available
        mLastExecutionNanos = now;
        mCounter++; // Allocate token
        invoke(code); // Invoke the code passed the throttle method.
    }
}

Итак, как мы определяем mNextSecondBoundary? Это будет сделано, когда мы совершим первую транзакцию, как уже говорилось, мы добавим одну секунду к моменту, когда первая транзакция будет выполнена.

if (mLastExecutionNanos == 0L) {
    mCounter++; // Allocate the very first token here.
    mLastExecutionNanos = System.nanoTime();
    mNextSecondBoundary = mLastExecutionNanos + NANO_PER_SEC;  // (10^9)
}

Теперь, что нам делать, если мы выполним код и увидим, что мы вошли в другую секунду? Мы улучшим предыдущий код, сбросив время последнего выполнения, количество доступных токенов и повторив тот же процесс, снова вызвав throttle(). Наш метод уже умеет обрабатывать новую секунду.

@Override
public boolean throttle(Code code) {
    if (mTPS <= 0) {
        // We do not want anything to pass.
        return false;
    }

synchronized (mLock) {
        if (mLastExecutionNanos == 0L) {
            mCounter++;
            mLastExecutionNanos = System.nanoTime();
            mNextSecondBoundary = mLastExecutionNanos + NANO_PER_SEC;
            invoke(code);
            return true;
        } else {
            long now = System.nanoTime();
            if (now <= mNextSecondBoundary) {
                if (mCounter < mTPS) {
                    mLastExecutionNanos = now;
                    mCounter++;
                    invoke(code);
                    return true;
                } else {
                    return false;
                }
            } else {
                // Reset the counter as we in a different second now.
                mCounter = 0;
                mLastExecutionNanos = 0L;
                mNextSecondBoundary = 0L;
                return throttle(code);
            }
        }
    }
}

В этой реализации мы можем передать блок кода, который нужно регулировать, но с этим кодом есть проблема. Это будет работать, но работать будет плохо. Не рекомендуется, но почему? Пожалуйста, дайте мне знать в комментариях.

Теперь пришло время создать второй API, используя те же строительные блоки и enter(). Мы будем использовать ту же логику, но не будем выполнять блок кода внутри метода. Скорее, это будет следовать после вызова enter(), поскольку мы делаем управление состоянием. Реализация метода заключается в следующем:

@Override
public boolean enter() {
    if (mTPS == 0L) {
        return false;
    }

synchronized (mBoundaryLock) {
        if (mLastExecutionNanos == 0L) {
            mLastExecutionNanos = System.nanoTime();
            mCounter++;
            mNextSecondBoundary = mLastExecutionNanos + NANO_PER_SEC;
            return true;
        } else {
            long now = System.nanoTime();
            if (now <= mNextSecondBoundary) {
                if (mCounter < mTPS) {
                    mLastExecutionNanos = now;
                    mCounter++;
                    return true;
                } else return false;
            } else {
                // Reset the counter as we in a different second now.
                mCounter = 0;
                mLastExecutionNanos = 0L;
                mNextSecondBoundary = 0L;
                return enter();
            }
        }
    }
}

Теперь наш простой ограничитель скорости готов к использованию. Вы можете ознакомиться с полным кодом здесь.

Полученные результаты

Мы попробуем создать код драйвера, создающий шесть потоков. Каждый поток пытается считать от 0 до 100 с задержкой 50 мс (может быть установлено любое число). Мы запустим наш ограничитель скорости следующим образом:

public static void main(String[] args) {
    RateLimiter limiter = new SimpleTokenBucketRateLimiter(1);
    Thread[] group = new Thread[6];
    Runnable r = () -> {
        for (int i = 0; i < 100; i++) {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            if (limiter.enter()) {
                System.out.println("Values:- " + Thread.currentThread().getName() + ": " + i);
            }
        }
    };


for (int i = 0; i < 6; i++) {
        group[i] = new Thread(r);
        group[i].start();
    }
}

Наши API не поддерживают сглаживание транзакций, вместо этого они заставляют их ждать, пока не будет назначен следующий токен, а не отбрасывать запросы. После их отклонения он возвращает false, поэтому мы можем поставить их в очередь, если мы действительно этого хотим.

if (limiter.enter()) {
                System.out.println("Values:- " + Thread.currentThread().getName() + ": " + i);
} else { // queue the work again }

Когда мы пытаемся установить TPS на 2, мы видим следующий вывод:

Оно работает!

Использование с точки зрения Android

  1. Рассмотрим случай, когда вы пишете код для захвата подписи пользователя. Когда они перетаскивают указатель, вы фиксируете тысячи точек. Все они могут не потребоваться для гладкой подписи, поэтому вы берете образец с ограничением скорости.
  2. Некоторые события вызываются с высокой частотой. Вы можете контролировать это.
  3. У нас есть ленивый слушатель MessageQueue. Когда мы слушаем его в основном потоке, он вызывается случайным образом. Иногда он вызывается несколько раз в секунду. Если мы хотим создать систему сердцебиения, которая сообщает нам, когда основной поток простаивает, мы можем использовать это для получения событий в секунду. Если мы не получаем событие в течение секунды, мы можем считать, что основной поток занят.
  4. Для управления квотами API вашего фреймворка/библиотеки вы можете контролировать вызовы API в зависимости от плана оплаты, который выбрал пользователь.

Вот и все.

В следующих статьях мы построим более сложный ограничитель скорости.