Всем привет. Пытаюсь осваивать очереди на node.js. Взялся за Bull.
Запускаю отдельно воркер . В единицу времени может работать 3 процесса.
const Queue = require('bull');
const tasksQueue = new Queue('my_queue', {redis, defaultJobOptions, settings, limiter});
tasksQueue.process('my_process', 3, `${__dirname}/processor.js`);
tasksQueue.on('active', function (job, result) {
console.log(`Start ${job.data.interval}`);
}
);
tasksQueue.on('completed', function (job, result) {
console.log(`Job ${job.data.interval} completed! Result: ${result}`);
job.remove();
tasksQueue.getJobCounts().then((count) => {
if (!count.waiting && !count.active)
console.log('ЭНТО КОНЕЦ!'');
});
}
);
processor.js выглядит вот так . этих функций будет много (штук 100). Будут работать по заданию параллельно.
module.exports = function (job, done) {
return new Promise((resolve, reject) => {
setTimeout(function () {
resolve(job.data.interval);
}, job.data.interval);
}).then(response => done(null, response));
}
ну и добавляю новые таски -
const tasksQueue = new Queue('my_queue', {redis, defaultJobOptions, settings, limiter});
return Promise.all(
[tasksQueue.add('my_process', {interval: 7001}),
tasksQueue.add('my_process', {interval: 7001}),
tasksQueue.add('my_process', {interval: 7001}),
tasksQueue.add('my_process', {interval: 7001}),
tasksQueue.add('my_process', {interval: 7001})]
).then(() => {
process.exit();
});
лог выглядит следующим образом :
Start 7001
Start 7001
Start 7001
Job 7001 completed! Result: 7001
Job 7001 completed! Result: 7001
Job 7001 completed! Result: 7001
Start 7001
Start 7001
Job 7001 completed! Result: 7001
Job 7001 completed! Result: 7001
'ЭНТО КОНЕЦ!'
'ЭНТО КОНЕЦ!'
т.е я не могу однозначно отловить момент, когда все задачи выполнены. По идее 'ЭНТО КОНЕЦ!' должен выводиться единожды. Отсюда вопросы :
1) Подскажите, есть ли способ решить данную задачу - может есть встроенные инструменты для понимания, когда все отработало (и можно начинать пить пиво), а я изобретаю велосиПЭД ? (.on('drained' тоже не помогло).
2) При просмотре статов в процессе выполнения задач, я вижу, что waiting и active отражают реальное положение вещей, а вот completed все время 0 . Должен ли я вручную вызвать moveToCompleted для выполненной задачи?
В документации пока утонул (((.
Спасибо.