Ограничение скорости - важная функция в большинстве серверных приложений. Эта возможность позволяет владельцам продуктов реализовывать такие функции, как:

а) Лицензирование на основе транзакций в секунду

б) Функции контроля перегрузки

c) Регулирование пользователей для справедливого использования ресурсов приложения.

г) Защита приложения от атак типа «отказ в обслуживании».

Необходимо, чтобы ограничение скорости было неотъемлемой частью вашего приложения. В Интернете существует множество реализаций ограничения скорости с открытым исходным кодом.

В этом посте мы обсудим еще одну реализацию ограничения скорости в качестве демонстрации, которую вы можете использовать, чтобы поиграть, изменив некоторые параметры конфигурации.

В этом примере модуль ограничения скорости является мультитенантным, и вы можете предоставить несколько идентификаторов экземпляров ограничения скорости и определить конкретные политики для каждого из них.

Детализация часов выражается в секундах, а наименьшая детализация времени в реализации - 1 секунда. По сути, это означает, что скорость ограничена количеством транзакций в секунду (TPS), а не транзакциями в миллисекундах или любым другим меньшим значением часов.

В большинстве случаев это разумно, поскольку реализации на стороне сервера всегда ограничивают количество транзакций в секунду.

Реализация также заботится о скачкообразном трафике. Например, если ограничение скорости составляет 100 транзакций в секунду, а 100 запросов поступают в первые 10 миллисекунд, то они будут регулироваться на границах в 1 секунду.

Следовательно, критерии оценки для ограничения скорости задаются с точностью до миллисекунд, но обеспечение соблюдения находится на вторых границах.

У нас задействованы следующие исходные файлы:

package com.demo;
import java.util.concurrent.TimeUnit;
/**
*
* @author aayush
* This is a simple test for a rate limiter.
* We can build the RateLimitExecutor object and set the following
* attributes:
* a) instance_id — which acts as a key to uniquely identify the rate
* limiting policy
* b) threshold — the Transactions per second to be controlled
* (throttle limit)
* Based on this information, provision the rate limiter in a
* container class — Rate Limit Manager
* This class then pumps loads through iterations, and sleep time to
* control traffic ingestion.
*/
public class RateLimiterTest implements RateLimitListener
{
// Number of iterations to be executed (simulate load generation)
private static long iterations = 1000;
// Rate Limit to be sent in Transactions per Second.
// Inspect the TPS value in the prints and play around with this value. Increase this value to avoid throttling
private static long threshold_to_be_enforced = 100;
// Sleep time between pumping traffic in milliseconds. Set to zero for uncontrolled traffic ingestion.
// Give a sleep time of 100 ms for example to pump traffic every 100 milliseconds and control the TPS
private static long sleepTime = 0;
public static void main (String… args) throws InterruptedException
{
new RateLimiterTest().test();
}
private void test() throws InterruptedException
{
// Setup and provision the rate limit
// User Defined Rate Limit Instance Id. Eg: HTTP Interface
String instance_id = “HTTP Interface”;
// Create an instance of Rate Limit Executor
RateLimitExecutor rateLimiter = new RateLimitExecutor();
// Set the thresholds / second.
rateLimiter.build(TimeUnit.SECONDS, threshold_to_be_enforced);
// Associate instance ID
rateLimiter.setInstance_id(instance_id);
// Provision Rate Limit
RateLimitManager._instance.provisionRateLimit(rateLimiter, instance_id, new RateLimitThrottleListener());
// Start the test
for (int i=0;i<iterations;i++)
{
RateLimitManager._instance.pegTraffic(instance_id);
Thread.sleep(sleepTime);
}
try {
// Wait for graceful termination of worker thread from the pool
RateLimitManager._instance.getThreadPool().awaitTermination(2, TimeUnit.SECONDS);
RateLimitManager._instance.deProvisionRateLimit(“HTTP Interface”);
} catch (InterruptedException e)
{
System.exit(0);
}
System.out.println(“\n Test Ended \n”);
System.exit(1);
}
public void rateLimitThresholdBreached()
{
System.out.println(“Rate Limit has been breached for: “);
}
public void rateLimitThresholdNormal()
{
System.out.println(“Rate Limit is under control for: “);
}
}

Класс Singleton диспетчера ограничений скорости:

package com.demo;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
*
* @author aayush
* This is a thread safe static singleton class which maintains all
* the rate limiting policies in the form of a concurrent hash map.
* The key of the map is the instance id while the value is the Rate
* Limit Executor object.
* Based on the key supplied the appropriate Rate Limiter is invoked.
*/
public final class RateLimitManager
{
// Static singleton class which is thread safe from multiple instantiation race conditions in traiditonal sigletons
public static final RateLimitManager _instance = new RateLimitManager();
// Container for keeping track of all provisioned rate limits.
private ConcurrentHashMap <String, RateLimitExecutor> rateLimitMap = new ConcurrentHashMap<String, RateLimitExecutor>();
// Thread pool initialization
private ExecutorService threadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
// To provision a new Rate Limit Policy and executor. Callback interface to receive notifications
public void provisionRateLimit(RateLimitExecutor builder, String instance_id, RateLimitListener listener)
{
builder.setListener(listener);
rateLimitMap.put(instance_id, builder);
}
// To remove a rate limit policy
public void deProvisionRateLimit (String instance_id)
{
try
{
rateLimitMap.remove(instance_id);
}
catch (Exception e)
{
e.printStackTrace();
}
}
// API Call to peg traffic and evaluate the rate limit provisioned
public void pegTraffic(String instance_id)
{
rateLimitMap.get(instance_id).evalute();
}
public ExecutorService getThreadPool()
{
return threadPool;
}
}

Затем у нас есть объект Rate Limit Executor, который оценивает и применяет ограничение скорости в приложении:

package com.demo;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
/**
*
* @author aayush
* This class calculates the TPS and applies the throttling policy
* It uses granular write locking for thread safety.
*/
public class RateLimitExecutor
{
// Defaults to seconds
private TimeUnit timeUnit;
// Current transactions counted
private long transactions = 0L;
// Transactions per second allowed
private long threshold;
// Calculated Transactions per second
private long tps;
// Throttle key
private String instance_id;
// Timestamp for evaluation
private double timeStamp;
// Thread Safety aids
private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
private WriteLock wLock = rwLock.writeLock();
// Callback handle
private RateLimitListener listener;
public RateLimitExecutor()
{
this.timeStamp = System.currentTimeMillis();
}
public void setInstanceID(String instance_id)
{
this.instance_id = instance_id;
}
public void evalute()
{
System.out.println(“Starting Rate Limit evaluation\n” + “Threshold set is: “+threshold);
++transactions;
wLock.lock();
// Take the current timestamp
long currentTime = System.currentTimeMillis();
// Get the delta time elapsed
double deltaTime = (currentTime — timeStamp);
System.out.println(“Delta time elapsed: “+deltaTime);
// Calculate transactions per second
tps = (long) (transactions/deltaTime * 1000L);
// Don’t print TPS on the very first hit as its misleading
if(transactions != 1)
System.out.println(“TPS is — “+ tps);
// What is higher, TPS threshold or transactions per second? Exclude the very first transaction to avoid false positives
if(tps >=threshold && transactions !=1)
{
System.out.println(“Rate limit has been breached, Transaction Number: “+transactions+
“ in delta time (milliseconds): “+ deltaTime +
“ Threshold: “ +threshold);
RateLimitManager._instance.getThreadPool().execute(new WorkerThread(listener));
}
// Leave write lock
wLock.unlock();
}
public void build (TimeUnit time, long threshold)
{
this.timeUnit = time;
this.threshold = threshold;
}
public void setListener (RateLimitListener listener)
{
this.listener = listener;
}
public TimeUnit getTimeUnit()
{
return timeUnit;
}
public Long getTransactions()
{
return transactions;
}
public long getThreshold()
{
return threshold;
}
public String getInstance_id()
{
return instance_id;
}
public void setInstance_id(String instance_id)
{
this.instance_id = instance_id;
}
}

У нас есть рабочий поток, который информирует приложение в случае нарушения ограничения скорости:

package com.demo;
/**
*
* @author aayush
* Worker thread to notify the application whenever the rate limit is
* breached.
* This thread is submitted to a thread pool asynchronously.
*/
public class WorkerThread implements Runnable
{
private RateLimitListener listener;
public WorkerThread(RateLimitListener listener )
{
this.listener = listener;
}
@Override
public void run()
{
this.listener.rateLimitThresholdBreached();
}
}

Наконец, имитация приложения, получающего обратный вызов, путем реализации интерфейса слушателя:

package com.demo;
/**
*
* @author aayush
* Interface to be implemented by the application to receive
* notifications whenever the rate limit is breached.
* Future behavior can be added to this interface.
*
*/
public interface RateLimitListener
{
public void rateLimitThresholdBreached();
}

а также..

package com.demo;
/**
* @author aayush
* Listener interface’s implementation to receive the threshold
* breach notifications. Application may take appropriate action
* after receiving this callback
*/
public class RateLimitThrottleListener implements RateLimitListener
{
public RateLimitThrottleListener()
{
}
public void rateLimitThresholdBreached()
{
System.out.println(“Received threshold breach callback notification”);
}
}

Мы можем свободно расширить эту реализацию дополнительными функциями, такими как действия, ограничивающие скорость выполнения - отключение / включение ограничений, увеличение / уменьшение ограничений и т. Д.

Возможности улучшения безграничны и зависят от характера приложения.

Если вам понравилась эта статья, не стесняйтесь подключиться к LinkedIn