Ограничение скорости - важная функция в большинстве серверных приложений. Эта возможность позволяет владельцам продуктов реализовывать такие функции, как:
а) Лицензирование на основе транзакций в секунду
б) Функции контроля перегрузки
c) Регулирование пользователей для справедливого использования ресурсов приложения.
г) Защита приложения от атак типа «отказ в обслуживании».
Необходимо, чтобы ограничение скорости было неотъемлемой частью вашего приложения. В Интернете существует множество реализаций ограничения скорости с открытым исходным кодом.
В этом посте мы обсудим еще одну реализацию ограничения скорости в качестве демонстрации, которую вы можете использовать, чтобы поиграть, изменив некоторые параметры конфигурации.
В этом примере модуль ограничения скорости является мультитенантным, и вы можете предоставить несколько идентификаторов экземпляров ограничения скорости и определить конкретные политики для каждого из них.
Детализация часов выражается в секундах, а наименьшая детализация времени в реализации - 1 секунда. По сути, это означает, что скорость ограничена количеством транзакций в секунду (TPS), а не транзакциями в миллисекундах или любым другим меньшим значением часов.
В большинстве случаев это разумно, поскольку реализации на стороне сервера всегда ограничивают количество транзакций в секунду.
Реализация также заботится о скачкообразном трафике. Например, если ограничение скорости составляет 100 транзакций в секунду, а 100 запросов поступают в первые 10 миллисекунд, то они будут регулироваться на границах в 1 секунду.
Следовательно, критерии оценки для ограничения скорости задаются с точностью до миллисекунд, но обеспечение соблюдения находится на вторых границах.
У нас задействованы следующие исходные файлы:
package com.demo; import java.util.concurrent.TimeUnit; /** * * @author aayush * This is a simple test for a rate limiter. * We can build the RateLimitExecutor object and set the following * attributes: * a) instance_id — which acts as a key to uniquely identify the rate * limiting policy * b) threshold — the Transactions per second to be controlled * (throttle limit) * Based on this information, provision the rate limiter in a * container class — Rate Limit Manager * This class then pumps loads through iterations, and sleep time to * control traffic ingestion. */ public class RateLimiterTest implements RateLimitListener { // Number of iterations to be executed (simulate load generation) private static long iterations = 1000; // Rate Limit to be sent in Transactions per Second. // Inspect the TPS value in the prints and play around with this value. Increase this value to avoid throttling private static long threshold_to_be_enforced = 100; // Sleep time between pumping traffic in milliseconds. Set to zero for uncontrolled traffic ingestion. // Give a sleep time of 100 ms for example to pump traffic every 100 milliseconds and control the TPS private static long sleepTime = 0; public static void main (String… args) throws InterruptedException { new RateLimiterTest().test(); } private void test() throws InterruptedException { // Setup and provision the rate limit // User Defined Rate Limit Instance Id. Eg: HTTP Interface String instance_id = “HTTP Interface”; // Create an instance of Rate Limit Executor RateLimitExecutor rateLimiter = new RateLimitExecutor(); // Set the thresholds / second. rateLimiter.build(TimeUnit.SECONDS, threshold_to_be_enforced); // Associate instance ID rateLimiter.setInstance_id(instance_id); // Provision Rate Limit RateLimitManager._instance.provisionRateLimit(rateLimiter, instance_id, new RateLimitThrottleListener()); // Start the test for (int i=0;i<iterations;i++) { RateLimitManager._instance.pegTraffic(instance_id); Thread.sleep(sleepTime); } try { // Wait for graceful termination of worker thread from the pool RateLimitManager._instance.getThreadPool().awaitTermination(2, TimeUnit.SECONDS); RateLimitManager._instance.deProvisionRateLimit(“HTTP Interface”); } catch (InterruptedException e) { System.exit(0); } System.out.println(“\n Test Ended \n”); System.exit(1); } public void rateLimitThresholdBreached() { System.out.println(“Rate Limit has been breached for: “); } public void rateLimitThresholdNormal() { System.out.println(“Rate Limit is under control for: “); } }
Класс Singleton диспетчера ограничений скорости:
package com.demo; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * * @author aayush * This is a thread safe static singleton class which maintains all * the rate limiting policies in the form of a concurrent hash map. * The key of the map is the instance id while the value is the Rate * Limit Executor object. * Based on the key supplied the appropriate Rate Limiter is invoked. */ public final class RateLimitManager { // Static singleton class which is thread safe from multiple instantiation race conditions in traiditonal sigletons public static final RateLimitManager _instance = new RateLimitManager(); // Container for keeping track of all provisioned rate limits. private ConcurrentHashMap <String, RateLimitExecutor> rateLimitMap = new ConcurrentHashMap<String, RateLimitExecutor>(); // Thread pool initialization private ExecutorService threadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); // To provision a new Rate Limit Policy and executor. Callback interface to receive notifications public void provisionRateLimit(RateLimitExecutor builder, String instance_id, RateLimitListener listener) { builder.setListener(listener); rateLimitMap.put(instance_id, builder); } // To remove a rate limit policy public void deProvisionRateLimit (String instance_id) { try { rateLimitMap.remove(instance_id); } catch (Exception e) { e.printStackTrace(); } } // API Call to peg traffic and evaluate the rate limit provisioned public void pegTraffic(String instance_id) { rateLimitMap.get(instance_id).evalute(); } public ExecutorService getThreadPool() { return threadPool; } }
Затем у нас есть объект Rate Limit Executor, который оценивает и применяет ограничение скорости в приложении:
package com.demo; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; /** * * @author aayush * This class calculates the TPS and applies the throttling policy * It uses granular write locking for thread safety. */ public class RateLimitExecutor { // Defaults to seconds private TimeUnit timeUnit; // Current transactions counted private long transactions = 0L; // Transactions per second allowed private long threshold; // Calculated Transactions per second private long tps; // Throttle key private String instance_id; // Timestamp for evaluation private double timeStamp; // Thread Safety aids private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); private WriteLock wLock = rwLock.writeLock(); // Callback handle private RateLimitListener listener; public RateLimitExecutor() { this.timeStamp = System.currentTimeMillis(); } public void setInstanceID(String instance_id) { this.instance_id = instance_id; } public void evalute() { System.out.println(“Starting Rate Limit evaluation\n” + “Threshold set is: “+threshold); ++transactions; wLock.lock(); // Take the current timestamp long currentTime = System.currentTimeMillis(); // Get the delta time elapsed double deltaTime = (currentTime — timeStamp); System.out.println(“Delta time elapsed: “+deltaTime); // Calculate transactions per second tps = (long) (transactions/deltaTime * 1000L); // Don’t print TPS on the very first hit as its misleading if(transactions != 1) System.out.println(“TPS is — “+ tps); // What is higher, TPS threshold or transactions per second? Exclude the very first transaction to avoid false positives if(tps >=threshold && transactions !=1) { System.out.println(“Rate limit has been breached, Transaction Number: “+transactions+ “ in delta time (milliseconds): “+ deltaTime + “ Threshold: “ +threshold); RateLimitManager._instance.getThreadPool().execute(new WorkerThread(listener)); } // Leave write lock wLock.unlock(); } public void build (TimeUnit time, long threshold) { this.timeUnit = time; this.threshold = threshold; } public void setListener (RateLimitListener listener) { this.listener = listener; } public TimeUnit getTimeUnit() { return timeUnit; } public Long getTransactions() { return transactions; } public long getThreshold() { return threshold; } public String getInstance_id() { return instance_id; } public void setInstance_id(String instance_id) { this.instance_id = instance_id; } }
У нас есть рабочий поток, который информирует приложение в случае нарушения ограничения скорости:
package com.demo; /** * * @author aayush * Worker thread to notify the application whenever the rate limit is * breached. * This thread is submitted to a thread pool asynchronously. */ public class WorkerThread implements Runnable { private RateLimitListener listener; public WorkerThread(RateLimitListener listener ) { this.listener = listener; } @Override public void run() { this.listener.rateLimitThresholdBreached(); } }
Наконец, имитация приложения, получающего обратный вызов, путем реализации интерфейса слушателя:
package com.demo; /** * * @author aayush * Interface to be implemented by the application to receive * notifications whenever the rate limit is breached. * Future behavior can be added to this interface. * */ public interface RateLimitListener { public void rateLimitThresholdBreached(); }
а также..
package com.demo; /** * @author aayush * Listener interface’s implementation to receive the threshold * breach notifications. Application may take appropriate action * after receiving this callback */ public class RateLimitThrottleListener implements RateLimitListener { public RateLimitThrottleListener() { } public void rateLimitThresholdBreached() { System.out.println(“Received threshold breach callback notification”); } }
Мы можем свободно расширить эту реализацию дополнительными функциями, такими как действия, ограничивающие скорость выполнения - отключение / включение ограничений, увеличение / уменьшение ограничений и т. Д.
Возможности улучшения безграничны и зависят от характера приложения.
Если вам понравилась эта статья, не стесняйтесь подключиться к LinkedIn