Микросервисы для обработки заданий с использованием Bull

Я хочу обрабатывать запланированные задания с помощью node.js bull. В основном у меня есть два процессора, которые обрабатывают 2 типа заданий. Есть один конфигуратор, который настраивает задания, которые будут добавлены в бычью очередь с помощью cron.

Планировщик будет в одном микросервисе, а каждый процессор будет отдельным микросервисом. Так что у меня будет 3 микросервиса.

У меня вопрос: правильный ли паттерн я использую с быком?

index.js

const Queue = require('bull');

const fetchQueue = new Queue('MyScheduler');
fetchQueue.add("fetcher", {name: "earthQuakeAlert"}, {repeat: {cron: '1-59/2 * * * *'}, removeOnComplete: true});
fetchQueue.add("fetcher", {name: "weatherAlert"}, {repeat: {cron: '3-59/3 * * * *'}, removeOnComplete: true});

processor-configurator.js

const Queue=require('bull');

const scheduler = new Queue("MyScheduler");
scheduler.process("processor", __dirname + "/alert-processor");

fetcher-configurator.js

const Queue=require('bull');

const scheduler = new Queue("MyScheduler");
scheduler.process("fetcher", __dirname+"/fetcher");

fetcher.js

const Queue = require('bull');
const moment = require('moment');

module.exports = function (job) {
    const scheduler = new Queue('MyScheduler');
    console.log("Insider processor ", job.data, moment().format("YYYY-MM-DD hh:mm:ss"));
    scheduler.add('processor', {'name': 'Email needs to be sent'}, {removeOnComplete: true});
    return Promise.resolve()
};

alert-processor.js

const Queue = require('bull');
const moment = require('moment');

module.exports = function (job) {
    const scheduler = new Queue('MyScheduler');
    console.log("Insider processor ", job.data, moment().format("YYYY-MM-DD hh:mm:ss"));
    scheduler.add('processor', {'name': 'Email needs to be sent'}, {removeOnComplete: true});
    return Promise.resolve()
};

Будет три микросервиса -

  1. узел index.js
  2. узел fetcher-configurator.js
  3. узел процессор-configurator.js

Я вижу непоследовательное поведение быка. Иногда я получаю сообщение об ошибке Отсутствует обработчик процесса для типа задания


person Ganesh Kumar    schedule 22.09.2019    source источник


Ответы (2)


Цитирую себя в надежде, что это будет полезно для кого-то другого

Это потому, что оба воркера используют одну и ту же очередь. Рабочий пытается получить следующее задание из очереди, получает задание неправильного типа (например, «сборщик» вместо «процессор») и терпит неудачу, потому что он знает, как обрабатывать «процессор» и не знает, что делать с «сборщиком». Bull не позволяет брать из очереди только совместимые задания, оба воркера должны иметь возможность обрабатывать все типы заданий. Самым простым решением было бы использовать две разные очереди, одну для процессоров, а другую для сборщиков. Затем вы можете удалить имена из заданий и процессоров, это больше не понадобится, так как имя определяется очередью.

https://github.com/OptimalBits/bull/issues/1481

person Stanislav Mamontov    schedule 23.09.2019

Бык:

истечение срока-queue.js

import Queue from 'bull';
import { ExpirationCompletePublisher } from '../events/publishers/expiration-complete-publisher';
import { natsWrapper } from '../nats-wrapper';
interface Payload {
  orderId: string;
}

const expirationQueue = new Queue<Payload>('order:expiration', {
  redis: {
    host: process.env.REDIS_HOST, 
  },
});

expirationQueue.process(async (job) => {
  console.log('Expiries order id', job.data.orderId);
  new ExpirationCompletePublisher(natsWrapper.client).publish({
    orderId: job.data.orderId,
  });
});

export { expirationQueue };

PromotionEndQueue.js

import Queue from 'bull';
import { PromotionEndedPublisher } from '../events/publishers/promotion-ended-publisher';
import { natsWrapper } from '../nats-wrapper';
interface Payload {
  promotionId: string;
}

const promotionEndQueue = new Queue<Payload>('promotions:end', {
  redis: {
    host: process.env.REDIS_HOST, // look at expiration-depl.yaml
  },
});

promotionEndQueue.process(async (job) => {
  console.log('Expiries promotion id', job.data.promotionId);
  new PromotionEndedPublisher(natsWrapper.client).publish({
    promotionId: job.data.promotionId,
  });
});

export { promotionEndQueue };

заказ-созданный-listener.js

import { Listener, OrderCreatedEvent, Subjects } from '@your-lib/common';
import { queueGroupName } from './queue-group-name';
import { Message } from 'node-nats-streaming';
import { expirationQueue } from '../../queues/expiration-queue';
export class OrderCreatedListener extends Listener<OrderCreatedEvent> {
  subject: Subjects.OrderCreated = Subjects.OrderCreated;
  queueGroupName = queueGroupName;

  async onMessage(data: OrderCreatedEvent['data'], msg: Message) {
    // delay = expiredTime - currentTime
    const delay = new Date(data.expiresAt).getTime() - new Date().getTime();
    // console.log("delay", delay)
    await expirationQueue.add(
      {
        orderId: data.id,
      },
      {
        delay,
      }
    );

    msg.ack();
  }
}

продвижение-начало-listener.js

import {
  Listener,
  PromotionStartedEvent,
  Subjects,
} from '@your-lib/common';
import { queueGroupName } from './queue-group-name';
import { Message } from 'node-nats-streaming';
import { promotionEndQueue } from '../../queues/promotions-end-queue';
export class PromotionStartedListener extends Listener<PromotionStartedEvent> {
  subject: Subjects.PromotionStarted = Subjects.PromotionStarted;
  queueGroupName = queueGroupName;

  async onMessage(data: PromotionStartedEvent['data'], msg: Message) {
    // delay = expiredTime - currentTime
    const delay = new Date(data.endTime).getTime() - new Date().getTime();

    // console.log("delay", delay)
    await promotionEndQueue.add(
      {
        promotionId: data.id,
      },
      {
        delay,
      }
    );

    msg.ack();
  }
}
person Lord    schedule 08.07.2021