Redis стал важным компонентом в области распределенных приложений микросервисов благодаря своей простоте, производительности и чистому набору функций. Эта статья посвящена созданию системы планирования задач с использованием некоторых функций Redis. Требование выбрано, как показано ниже, для определения границ и способа объяснения.
Требование:
- Система должна быть масштабируемой и распределенной.
- Несколько рабочих процессов
- Задачи могут быть в реальном времени или отложенными
- Потребление ресурса должно быть минимальным при простое.
Дизайн
Система состоит из четырех компонентов
- API задач: производитель задач
- REDIS: распределенная структура данных с поддержкой SORTED SET и FIFO LIST.
- Планировщик заданий: читайте задания и помещайте их в СПИСОК в точное время
- Рабочий: Потребитель задачи
Шаги:
- API задач вычисляет оценку и добавляет элементы задачи в ZSET с оценкой
- Планировщик заданий продолжает проверять первый элемент набора.
- Предмет будет помещен в рабочую очередь, если счет соответствует текущему времени.
- Рабочий задачи остается в очереди блокировки и выполняется, как только элемент добавляется в очередь.
API задач
API задач - это обработчик POST restify, который принимает задачу и отправляет ее в Redis на основе свойства когда. Каждый элемент, отправляемый в ZSET, будет представлять собой массив в кодировке JSON с тремя элементами (идентификатор, имя рабочей очереди и сведения о задаче). В ZSET каждый член связан со счетом, который сортирует набор в порядке возрастания. Свойство когда преобразуется в Эпоху и назначается в качестве оценки.
async SaveTask(req, res, next) { let queue = req.params.queue; let id = uuidV4(); let time = req.body.when; let item = [id, queue, req.body.task]; let score = 0; let status = 'PENDING'; if (time) { score = new Date(time).getTime() console.log(`Push task ${id} to process later on ${time} `); await redisHandler.AddToSortedSet(JSON.stringify(item), score); status = 'SCHEDULED'; } else { console.log(`Push task ${id} to execute in relatime`); await redisHandler.AddItemToQueue(queue, JSON.stringify(item)); status = 'QUEUED'; } res.send({ item: item, score: score, status: status }); }
Сегмент кода ioredis
async AddToSortedSet(item, value) { return new Promise((resolve, reject) => { this.redisClient.zadd(`ZSET:SCHEDULED:ITEMS`, value, item, (err, result) => { if (err) { reject(err); } else { resolve(result); } }) }) }
Планировщик заданий
Планировщик задач получает блокировку, а затем обрабатывает критическую секцию. Он считывает первый элемент из ZSET и сравнивает «когда» с текущим временем. Выбранная задача удаляется из ZSET и помещается в соответствующую очередь для обработки.
async ProcessEvents() { while (true) { try { await lock.acquire(`LOCK:ZSET:ITEMS`); try { let value = await redisHandler.GetTopFromSortedSet(); if (value && value.length > 0) { let item = JSON.parse(value[0]); let time = value[1]; if (parseFloat(time) <= Date.now()) { console.log(`Scheduled item ${item[0]} ready to process`); await redisHandler.DeleteItemFromSet(value[0]); await redisHandler.AddItemToQueue(item[1], value[0]); } }else { //No items to work process wait 1s before check again await lock.release(); await new Promise(r => setTimeout(r, 1000)); continue; } } catch (ex) { console.log(`Error on event processing ${ex.message}`); } await lock.release(); } catch (ex) { // error in acquiring or releasing the lock } await new Promise(r => setTimeout(r, 100)); } }
Сегмент кода ioredis
async GetTopFromSortedSet() { return new Promise((resolve, reject) => { this.redisClient.zrange(`ZSET:SCHEDULED:ITEMS`, 0, 0, "WITHSCORES", (err, result) => { if (err) { reject(err); } else { resolve(result); } }) }) } async DeleteItemFromSet(item) { return new Promise((resolve, reject) => { this.redisClient.zrem(`ZSET:SCHEDULED:ITEMS`, item, (err, result) => { if (err) { reject(err); } else { resolve(result); } }) }) } async AddItemToQueue(key, item) { return new Promise((resolve, reject) => { this.redisClient.rpush(`QUEUE:${key}`, item, (err, result) => { if (err) { reject(err); } else { resolve(result); } }) }) }
Процессор задач
Процессор задач имитирует поведение FIFO, используя метод блокировки левого щелчка (blpop). Метод освобождает процесс, как только элемент становится доступным в списке. Он требует специального клиента Redis для блокировки операций, так как он также блокирует другие операции Redis.
async ProcessTasks(queue) { while (true) { let taskArray = await redisHandler.GetItemFromQueueBlocking(queue); if (taskArray && taskArray.length > 1) { let task = JSON.parse(taskArray[1]); if (task && Array.isArray(task) && task.length > 2) { let item = task[2]; console.log(`Task ${task[0]} ready to process - action: ${item.action} params: ${item.time}`); await new Promise(r => setTimeout(r, item.time)); } } } }
Сегмент кода ioredis
async GetItemFromQueueBlocking(key) { return new Promise((resolve, reject) => { this.redisClient.blpop(`QUEUE:${key}`, 5000, (err, result) => { if (err) { reject(err); } else { resolve(result); } }) }) }
Обсуждение
Redis - отличный инструмент для реализации различных распределенных шаблонов в области микросервисов. Для реализации этих шаблонов доступно множество структур данных. Некоторые из них - это распределенные блокировки, публикация и подписка, атомарный счетчик для информационных панелей, системы автозаполнения с использованием поиска. Источник можно найти на git.