Bull queue: при сбое задания, как остановить обработку оставшихся заданий в очереди?

Я использую очередь bull для обработки некоторых заданий. В текущем сценарии каждое задание - это своего рода операция. Таким образом, всякий раз, когда операция (задание) из списка операций в очереди терпит неудачу, очередь должна прекратить обработку оставшихся заданий (операций).

Что я пробовал до сих пор?

Поэтому я попытался приостановить очередь, когда не удалось выполнить конкретное задание. Затем очередь возобновляется, когда она истощается. Теперь, когда она возобновляется, очередь не начинается с неудачного задания, а переходит к следующему заданию.

var Queue = require('bull');

let redisOptions = {
  redis: { port: 6379, host: '127.0.0.1' },
  limiter: { max: 1, duration: 1000 }
}
var myQueue = new Queue('Linear-Queue', redisOptions);

myQueue.process('Type-1', function (job, done) {
  setTimeout(() => {
    done(job.data.error);
  }, job.data.time);
});

let options = {
  attempts: 3,
  removeOnComplete: false, // removes job from queue on success
  removeOnFail: false // removes job from queue on failure
}

setTimeout(() => {
  myQueue.add('Type-1', { time: 10000, description: "Type-1 One", error: false }, options);
}, 1 * 1000);

setTimeout(() => {
  myQueue.add('Type-1', { time: 5000, description: "Type-1 two", error: true }, options);
}, 2 * 1000);

setTimeout(() => {
  myQueue.add('Type-1', { time: 3000, description: "Type-1 three", error: false }, options);
}, 3 * 1000);


myQueue.on('completed', function (job, result) {
  console.log("Completed: " + job.data.description);
});

myQueue.on('failed', async function (job, error) {
  console.log("Failed: " + job.data.description);
  try {
    await myQueue.pause();
  } catch (error) {
    console.log(error);
  }
});

myQueue.on('drained', async function () {
  try {
    await myQueue.resume();
  } catch (error) {
    console.log(error);
  }
});

Текущий выход:

Текущий результат

Ожидаемый результат: если Type-1 two успешно завершится с третьей попытки.

Completed: Type-1 One
Failed: Type-1 two
Failed: Type-1 two
Completed: Type-1 two
Completed: Type-1 three

Ожидаемый результат: если Type-1 two потерпел неудачу и при третьей попытке.

Completed: Type-1 One
Failed: Type-1 two
Failed: Type-1 two
Failed: Type-1 two

Все, что я хочу, это то, что очередь должна прекратить обработку новых заданий, пока текущее задание не будет выполнено без сбоев. В случае сбоя невыполненное задание должно выполняться какое-то x количество раз. При x+1 попытке он должен очистить (удалить все задания) очередь. Итак, как добиться этого линейного поведения в очереди.


person Dheemanth Bhat    schedule 07.05.2020    source источник


Ответы (1)


В bull Невозможно повторить одно и то же задание сразу после его сбоя, прежде чем взять следующее задание в очереди.

Решение:

  1. Создайте новое задание и установите для него значение, меньшее, чем у текущего типа задания.
  2. Освободить невыполненное задание (resolve() или done())
  3. Это новое задание будет немедленно принято bull для обработки.

Пример кода: в приведенном ниже коде Job-3 завершится ошибкой и создаст новое задание и так далее, пока цель задания не будет достигнута в какой-то момент времени.

var Queue = require('bull');

let redisOptions = {
  redis: { port: 6379, host: '127.0.0.1' }
}
var myQueue = new Queue('Linear-Queue', redisOptions);

myQueue.process('Type-1', function (job, done) {
  console.log(`Processing Job-${job.id} Attempt: ${job.attemptsMade}`);
  downloadFile(job, async function (error) {
    if (error) {
      await repeatSameJob(job, done);
    } else {
      done();
    }
  });
});

async function repeatSameJob(job, done) {
  let newJob = await myQueue.add('Type-1', job.data, { ...{ priority: 1 }, ...job.opts });
  console.log(`Job-${job.id} failed. Creating new Job-${newJob.id} with highest priority for same data.`);
  done(true);
}

function downloadFile(job, done) {
  setTimeout(async () => {
    done(job.data.error)
  }, job.data.time);
}

myQueue.on('completed', function (job, result) {
  console.log("Completed: Job-" + job.id);
});

myQueue.on('failed', async function (job, error) {
  console.log("Failed: Job-" + job.id);
});

let options = {
  removeOnComplete: true, // removes job from queue on success
  removeOnFail: true // removes job from queue on failure
}

for (let i = 1; i <= 5; i++) {
  let error = false;
  if (i == 3) { error = true; }

  setTimeout(i => {
    let jobData = {
      time: i * 2000,
      error: error,
      description: `Job-${i}`
    }
    myQueue.add('Type-1', jobData, options);
  }, i * 2000, i);
}

Выход:

Вывод

person Dheemanth Bhat    schedule 11.05.2020