Используйте отдельный процессор NodeJS Bull Queue

Я пытаюсь запустить process в другом processor файле itsef, как указано в bull документации, я добавил файл, как показано ниже.

// -------- Queue.js ----------

formatQueue.process(__dirname + "/processors/format-worker.js");


// On Cmplete Handler

formatQueue.on('completed', function(job, result){
    console.log(result, "Format-Complete-job"); // result is undefined
    if(job.data.is_well_format){
        existenceQueue.add(job.data, { attempts: 3, backoff: 1000 });
    }else QueueModel.lastStep(job.data)
});

// -------- Queue.js ends ---------

//format-worker.js
Validator = require("../../validators");
module.exports = (job) => {
    Validator.Format.validate(job.data, (data) => {
        job.data = data;
        return Promise.resolve(data);
    });
}

Теперь, когда работа завершена, которую я использовал раньше, я получал данные о работе с обновленными параметрами. Сейчас я не получаю обновленные данные о вакансиях. и второй параметр, который есть в документации, т.е. result это undefined. Теперь, как мне получить обновленные данные о вакансиях в этом случае.

job и processors работают нормально, я запускаю process, как показано ниже.

formatQueue.process(function(job, done){
    Validator.Format.validate(job.data, (data) => {
        job.data = data;
        done();
    });
});

В этом случае обновляются сами данные о задании, и это тоже работает.

У меня есть несколько очередей, выполняющих разные задания. После успешного завершения предыдущей работы мне нужна только другая работа, чтобы начать работу. У меня есть еще один вопрос, и я упомянул там вариант использования. проверьте это здесь.


person rakcode    schedule 13.05.2020    source источник
comment
Недавно я столкнулся с подобной проблемой. Вы пытались добавить done к функции внешнего процессора и запустить ее вместо того, чтобы возвращать Promise?   -  person Technoh    schedule 02.10.2020


Ответы (1)


Позвольте мне помочь вам и дать несколько советов. Прежде всего, в вашем коде есть ошибки. Функция процесса не возвращает обещание, созданное вами в обратном вызове проверки. Я не знаю, что возвращает Validator.Format.validate, поэтому на всякий случай напишу это так:

module.exports = job => {
  return new Promise(resolve => {
    Validator.Format.validate(job.data, data => {
      resolve(data);
    });
  });
};

Во-вторых, в целом более надежно добавить следующее задание внутри самого обработчика процесса, а не в обратном вызове завершенного события, причина в том, что, сделав это, вы получите гораздо более надежное решение в случае, если добавление следующего задания не удастся. по какой-то причине задание завершится ошибкой, и вы сможете повторить попытку или выяснить, почему оно не выполнено, и так далее.

module.exports = (job) => {
  return new Promise(resolve => {
    Validator.Format.validate(job.data, (data) => {
      if(data.is_well_format){
        resolve(existenceQueue.add(data, { attempts: 3, backoff: 1000 }));
      }else {
        resolve(QueueModel.lastStep(data))
      }
    });
  }
});
person Manuel Astudillo    schedule 16.07.2021