С использованием с точки зрения Android
Ограничение скорости
Реальные пользователи жестоки, нетерпеливы и коварны. В системах с высокой степенью параллелизма могут возникнуть ситуации, когда ваш сервер может быть засыпан фиктивными запросами, поэтому вы можете захотеть это контролировать.
Некоторые из реальных вариантов использования могут быть следующими:
1. Управление квотами API. В качестве поставщика вы можете ограничить скорость, с которой запросы API отправляются на ваш сервер, в зависимости от тарифных планов, выбранных пользователем. Это может быть на стороне клиента или службы.
2. Безопасность — для защиты от DDOS-атак.
3. Контроль затрат. Это не обязательно касается стороны службы или даже стороны клиента. Если какой-то компонент генерирует событие с очень высокой скоростью, это может помочь в его контроле. Это может помочь контролировать телеметрию, отправляемую со стороны клиента.
Варианты, которые у нас есть при ограничении скорости
В зависимости от типа запроса/события, с которым мы имеем дело, может произойти следующее:
- Мы можем отказаться от дополнительных запросов
- Мы можем заставить запросы ждать, пока система не снизит их до предопределенной скорости.
Общие алгоритмы ограничения скорости
- Алгоритм корзины токенов
- Алгоритм дырявого ведра
- Фиксированное окно
Мы не будем вдаваться во внутренние детали этих алгоритмов, так как это выходит за рамки данной статьи.
Мы возьмем алгоритм фиксированного окна в качестве опорной точки. Запишем высокоуровневые требования к реализации.
Фиксированное окно — Окна разбиты на части, и у каждого окна есть счетчик. Каждый запрос увеличивает счетчик на единицу. Как только счетчик достигает порогового значения, новые запросы отбрасываются до тех пор, пока не начнется новое временное окно.
Требования
- Должна иметь возможность принимать требуемые (TPS) транзакции в секунду или скорость.
- Должны отбрасывать транзакции, если они превышают установленную нами скорость.
- Должен работать в параллельных ситуациях.
Расширенные функции (в следующих статьях)
- Должна иметь возможность сглаживать всплески запросов. Например, если мы определили TPS как
5
, и все пять запросов приходят в один и тот же момент, он должен иметь возможность выстраивать их в линию с фиксированными интервалами, т.е. выполнять каждый из них с интервалом в 200 мс. Для этого требуется внутренняя схема синхронизации. - Если у нас есть TPS
5
, и в одном из 1-секундных слотов мы используем только три жетона в течение следующей секунды, мы должны быть в состоянии предоставить 5 + 2 = 7 жетонов в качестве бонуса. Но из расчета 1/7 (142,28 мс) на токен. Бонус не должен переноситься на следующий слот.
Давайте сначала определим контракт для нашего RateLimiter
:
/** * Rate limiter helps in limiting the rate of execution of a piece of code. The rate is defined in terms of * TPS(Transactions per second). Rate of 5 would suggest, 5 transactions/second. Transaction could be a DB call, API call, * or a simple function call. * <p> * Every {@link RateLimiter} implementation should implement either {@link RateLimiter#throttle(Code)} or, {@link RateLimiter#enter()}. * They can also choose to implement all. * <p> * {@link Code} represents a piece of code that needs to be rate limited. It could be a function call, if the code to be rate limited * spreads across multiple functions, we need to use entry() and exit() contract. */ public interface RateLimiter { /** * Rate limits the code passed inside as an argument. * * @param code representation of the piece of code that needs to be rate limited. * @return true if executed, false otherwise. */ boolean throttle(Code code); /** * When the piece of code that needs to be rate limited cannot be represented as a contiguous * code, then entry() should be used before we start executing the code. This brings the code inside the rate * limiting boundaries. * * @return true if the code will execute and false if rate limited. * <p */ boolean enter(); /** * Interface to represent a contiguous piece of code that needs to be rate limited. */ interface Code { /** * Calling this function should execute the code that is delegated to this interface. */ void invoke(); } }
Наш RateLimiter
имеет два набора API: throttle(code)
и enter()
. Оба обслуживают одну и ту же функциональность, но двумя способами:
boolean throttle(code)
— можно использовать для передачи блока кода, если у нас есть непрерывный код.boolean enter()
— обычно может использоваться перед API, БД или любым вызовом, который мы хотим регулировать. Если код, следующий за этим, выполнится, он вернетtrue
иfalse
, если скорость ограничена. Вы можете поставить эти запросы в очередь или отклонить их.
Вы никогда не увидите реализацию
throttle(code)
в продакшене, потому что она неоптимальна. Пожалуйста, дайте мне знать, почему в комментариях. Большинство ограничителей скорости используют API, которые выглядят какenter()
.
Основная функциональность
Чтобы построить ядро нашего ограничителя скорости, нам нужно убедиться, что между любыми двумя секундами мы не должны разрешать более N
транзакций. Как мы это сделаем?
Рассмотрим момент, когда мы совершаем первую транзакцию t0
. Итак,
до (t0 + 1)s
нам разрешено совершать только N
транзакций. И как это будет гарантировать? Во время следующей транзакции мы проверим, current time ≤ (t0+1)
. Если нет, то значит мы вошли в другую секунду, и нам разрешено совершать N
транзакций. Давайте посмотрим на небольшой участок кода, демонстрирующий это:
long now = System.nanoTime(); if (now <= mNextSecondBoundary) { // If we are within the time limit of current second if (mCounter < N) { // If token available mLastExecutionNanos = now; mCounter++; // Allocate token invoke(code); // Invoke the code passed the throttle method. } }
Итак, как мы определяем mNextSecondBoundary
? Это будет сделано, когда мы совершим первую транзакцию, как уже говорилось, мы добавим одну секунду к моменту, когда первая транзакция будет выполнена.
if (mLastExecutionNanos == 0L) { mCounter++; // Allocate the very first token here. mLastExecutionNanos = System.nanoTime(); mNextSecondBoundary = mLastExecutionNanos + NANO_PER_SEC; // (10^9) }
Теперь, что нам делать, если мы выполним код и увидим, что мы вошли в другую секунду? Мы улучшим предыдущий код, сбросив время последнего выполнения, количество доступных токенов и повторив тот же процесс, снова вызвав throttle()
. Наш метод уже умеет обрабатывать новую секунду.
@Override public boolean throttle(Code code) { if (mTPS <= 0) { // We do not want anything to pass. return false; } synchronized (mLock) { if (mLastExecutionNanos == 0L) { mCounter++; mLastExecutionNanos = System.nanoTime(); mNextSecondBoundary = mLastExecutionNanos + NANO_PER_SEC; invoke(code); return true; } else { long now = System.nanoTime(); if (now <= mNextSecondBoundary) { if (mCounter < mTPS) { mLastExecutionNanos = now; mCounter++; invoke(code); return true; } else { return false; } } else { // Reset the counter as we in a different second now. mCounter = 0; mLastExecutionNanos = 0L; mNextSecondBoundary = 0L; return throttle(code); } } } }
В этой реализации мы можем передать блок кода, который нужно регулировать, но с этим кодом есть проблема. Это будет работать, но работать будет плохо. Не рекомендуется, но почему? Пожалуйста, дайте мне знать в комментариях.
Теперь пришло время создать второй API, используя те же строительные блоки и enter()
. Мы будем использовать ту же логику, но не будем выполнять блок кода внутри метода. Скорее, это будет следовать после вызова enter()
, поскольку мы делаем управление состоянием. Реализация метода заключается в следующем:
@Override public boolean enter() { if (mTPS == 0L) { return false; } synchronized (mBoundaryLock) { if (mLastExecutionNanos == 0L) { mLastExecutionNanos = System.nanoTime(); mCounter++; mNextSecondBoundary = mLastExecutionNanos + NANO_PER_SEC; return true; } else { long now = System.nanoTime(); if (now <= mNextSecondBoundary) { if (mCounter < mTPS) { mLastExecutionNanos = now; mCounter++; return true; } else return false; } else { // Reset the counter as we in a different second now. mCounter = 0; mLastExecutionNanos = 0L; mNextSecondBoundary = 0L; return enter(); } } } }
Теперь наш простой ограничитель скорости готов к использованию. Вы можете ознакомиться с полным кодом здесь.
Полученные результаты
Мы попробуем создать код драйвера, создающий шесть потоков. Каждый поток пытается считать от 0 до 100 с задержкой 50 мс (может быть установлено любое число). Мы запустим наш ограничитель скорости следующим образом:
public static void main(String[] args) { RateLimiter limiter = new SimpleTokenBucketRateLimiter(1); Thread[] group = new Thread[6]; Runnable r = () -> { for (int i = 0; i < 100; i++) { try { Thread.sleep(50); } catch (InterruptedException e) { throw new RuntimeException(e); } if (limiter.enter()) { System.out.println("Values:- " + Thread.currentThread().getName() + ": " + i); } } }; for (int i = 0; i < 6; i++) { group[i] = new Thread(r); group[i].start(); } }
Наши API не поддерживают сглаживание транзакций, вместо этого они заставляют их ждать, пока не будет назначен следующий токен, а не отбрасывать запросы. После их отклонения он возвращает false, поэтому мы можем поставить их в очередь, если мы действительно этого хотим.
if (limiter.enter()) { System.out.println("Values:- " + Thread.currentThread().getName() + ": " + i); } else { // queue the work again }
Когда мы пытаемся установить TPS на 2
, мы видим следующий вывод:
Оно работает!
Использование с точки зрения Android
- Рассмотрим случай, когда вы пишете код для захвата подписи пользователя. Когда они перетаскивают указатель, вы фиксируете тысячи точек. Все они могут не потребоваться для гладкой подписи, поэтому вы берете образец с ограничением скорости.
- Некоторые события вызываются с высокой частотой. Вы можете контролировать это.
- У нас есть ленивый слушатель
MessageQueue
. Когда мы слушаем его в основном потоке, он вызывается случайным образом. Иногда он вызывается несколько раз в секунду. Если мы хотим создать систему сердцебиения, которая сообщает нам, когда основной поток простаивает, мы можем использовать это для получения событий в секунду. Если мы не получаем событие в течение секунды, мы можем считать, что основной поток занят. - Для управления квотами API вашего фреймворка/библиотеки вы можете контролировать вызовы API в зависимости от плана оплаты, который выбрал пользователь.
Вот и все.
В следующих статьях мы построим более сложный ограничитель скорости.