Redis стал важным компонентом в области распределенных приложений микросервисов благодаря своей простоте, производительности и чистому набору функций. Эта статья посвящена созданию системы планирования задач с использованием некоторых функций Redis. Требование выбрано, как показано ниже, для определения границ и способа объяснения.

Требование:

  • Система должна быть масштабируемой и распределенной.
  • Несколько рабочих процессов
  • Задачи могут быть в реальном времени или отложенными
  • Потребление ресурса должно быть минимальным при простое.

Дизайн

Система состоит из четырех компонентов

  • API задач: производитель задач
  • REDIS: распределенная структура данных с поддержкой SORTED SET и FIFO LIST.
  • Планировщик заданий: читайте задания и помещайте их в СПИСОК в точное время
  • Рабочий: Потребитель задачи

Шаги:

  1. API задач вычисляет оценку и добавляет элементы задачи в ZSET с оценкой
  2. Планировщик заданий продолжает проверять первый элемент набора.
  3. Предмет будет помещен в рабочую очередь, если счет соответствует текущему времени.
  4. Рабочий задачи остается в очереди блокировки и выполняется, как только элемент добавляется в очередь.

API задач

API задач - это обработчик POST restify, который принимает задачу и отправляет ее в Redis на основе свойства когда. Каждый элемент, отправляемый в ZSET, будет представлять собой массив в кодировке JSON с тремя элементами (идентификатор, имя рабочей очереди и сведения о задаче). В ZSET каждый член связан со счетом, который сортирует набор в порядке возрастания. Свойство когда преобразуется в Эпоху и назначается в качестве оценки.

async SaveTask(req, res, next) {
        let queue = req.params.queue;
        let id = uuidV4();
        let time = req.body.when;
        let item = [id, queue, req.body.task];
        let score = 0;
        let status = 'PENDING';
        if (time) {
            score = new Date(time).getTime()
            console.log(`Push task ${id} to process later on ${time} `);
            await redisHandler.AddToSortedSet(JSON.stringify(item), score);
            status = 'SCHEDULED';
} else {
            console.log(`Push task ${id} to execute in relatime`);
            await redisHandler.AddItemToQueue(queue, JSON.stringify(item));
            status = 'QUEUED';
        }
res.send({ item: item, score: score, status: status });
}

Сегмент кода ioredis

async AddToSortedSet(item, value) {
        return new Promise((resolve, reject) => {
            this.redisClient.zadd(`ZSET:SCHEDULED:ITEMS`, value, item, (err, result) => {
                if (err) {
                    reject(err);
                } else {
                    resolve(result);
                }
            })
        })
    }

Планировщик заданий

Планировщик задач получает блокировку, а затем обрабатывает критическую секцию. Он считывает первый элемент из ZSET и сравнивает «когда» с текущим временем. Выбранная задача удаляется из ZSET и помещается в соответствующую очередь для обработки.

async ProcessEvents() {
  while (true) {
    try {
      await lock.acquire(`LOCK:ZSET:ITEMS`);
      try {
        let value = await redisHandler.GetTopFromSortedSet();
        if (value && value.length > 0) {
          let item = JSON.parse(value[0]);
          let time = value[1];
          if (parseFloat(time) <= Date.now()) {
            console.log(`Scheduled item ${item[0]} ready to process`);
            await redisHandler.DeleteItemFromSet(value[0]);
            await redisHandler.AddItemToQueue(item[1], value[0]);
          }
        }else {
          //No items to work process wait 1s before check again
          await lock.release();
          await new Promise(r => setTimeout(r, 1000));
          continue;
        }
      } catch (ex) {
        console.log(`Error on event processing ${ex.message}`);
      }
      await lock.release();
    } catch (ex) {
      // error in acquiring or releasing the lock
    }
    await new Promise(r => setTimeout(r, 100));
  }
}

Сегмент кода ioredis

async GetTopFromSortedSet() {
        return new Promise((resolve, reject) => {
this.redisClient.zrange(`ZSET:SCHEDULED:ITEMS`, 0, 0, "WITHSCORES", (err, result) => {
                if (err) {
                    reject(err);
                } else {
                    resolve(result);
                }
            })
        })
    }
async DeleteItemFromSet(item) {
        return new Promise((resolve, reject) => {
this.redisClient.zrem(`ZSET:SCHEDULED:ITEMS`, item, (err, result) => {
                if (err) {
                    reject(err);
                } else {
                    resolve(result);
                }
            })
        })
    }
async AddItemToQueue(key, item) {
        return new Promise((resolve, reject) => {
this.redisClient.rpush(`QUEUE:${key}`, item, (err, result) => {
                if (err) {
                    reject(err);
                } else {
                    resolve(result);
                }
            })
        })
    }

Процессор задач

Процессор задач имитирует поведение FIFO, используя метод блокировки левого щелчка (blpop). Метод освобождает процесс, как только элемент становится доступным в списке. Он требует специального клиента Redis для блокировки операций, так как он также блокирует другие операции Redis.

async ProcessTasks(queue) {
while (true) {
let taskArray = await redisHandler.GetItemFromQueueBlocking(queue);
            if (taskArray && taskArray.length > 1) {
                let task = JSON.parse(taskArray[1]);
                if (task && Array.isArray(task) && task.length > 2) {
                    let item = task[2];
                    console.log(`Task ${task[0]} ready to process - action: ${item.action} params: ${item.time}`);
                    await new Promise(r => setTimeout(r, item.time));
                }
            }
        }
}

Сегмент кода ioredis

async GetItemFromQueueBlocking(key) {
        return new Promise((resolve, reject) => {
this.redisClient.blpop(`QUEUE:${key}`, 5000, (err, result) => {
                if (err) {
                    reject(err);
                } else {
                    resolve(result);
                }
            })
        })
    }

Обсуждение

Redis - отличный инструмент для реализации различных распределенных шаблонов в области микросервисов. Для реализации этих шаблонов доступно множество структур данных. Некоторые из них - это распределенные блокировки, публикация и подписка, атомарный счетчик для информационных панелей, системы автозаполнения с использованием поиска. Источник можно найти на git.