coderisimo
@coderisimo
Уважайте тех ,кто вам помогает , ёклмн! )

Как лучше организовать работу с очередями на Bull, как отследить окончание обработки текущей очереди?

Всем привет. Пытаюсь осваивать очереди на node.js. Взялся за Bull.
Запускаю отдельно воркер . В единицу времени может работать 3 процесса.

const Queue = require('bull');
const tasksQueue = new Queue('my_queue', {redis, defaultJobOptions, settings, limiter});
tasksQueue.process('my_process', 3, `${__dirname}/processor.js`);
tasksQueue.on('active', function (job, result) {
        console.log(`Start ${job.data.interval}`);
    }
);

tasksQueue.on('completed', function (job, result) {
        console.log(`Job ${job.data.interval} completed! Result: ${result}`);
        job.remove();
        tasksQueue.getJobCounts().then((count) => {
            if (!count.waiting && !count.active)
                console.log('ЭНТО КОНЕЦ!'');
        });
    }
);

processor.js выглядит вот так . этих функций будет много (штук 100). Будут работать по заданию параллельно.
module.exports =  function (job, done) {
    return new Promise((resolve, reject) => {
        setTimeout(function () {
            resolve(job.data.interval);
        }, job.data.interval);
    }).then(response => done(null, response));
}


ну и добавляю новые таски -
const tasksQueue = new Queue('my_queue', {redis, defaultJobOptions, settings, limiter});
return Promise.all(
    [tasksQueue.add('my_process', {interval: 7001}),
        tasksQueue.add('my_process', {interval: 7001}),
        tasksQueue.add('my_process', {interval: 7001}),
        tasksQueue.add('my_process', {interval: 7001}),
        tasksQueue.add('my_process', {interval: 7001})]
).then(() => {
    process.exit();
});

лог выглядит следующим образом :
Start 7001
Start 7001
Start 7001
Job 7001 completed! Result: 7001
Job 7001 completed! Result: 7001
Job 7001 completed! Result: 7001
Start 7001
Start 7001
Job 7001 completed! Result: 7001
Job 7001 completed! Result: 7001
'ЭНТО КОНЕЦ!'
'ЭНТО КОНЕЦ!'


т.е я не могу однозначно отловить момент, когда все задачи выполнены. По идее 'ЭНТО КОНЕЦ!' должен выводиться единожды. Отсюда вопросы :
1) Подскажите, есть ли способ решить данную задачу - может есть встроенные инструменты для понимания, когда все отработало (и можно начинать пить пиво), а я изобретаю велосиПЭД ? (.on('drained' тоже не помогло).
2) При просмотре статов в процессе выполнения задач, я вижу, что waiting и active отражают реальное положение вещей, а вот completed все время 0 . Должен ли я вручную вызвать moveToCompleted для выполненной задачи?

В документации пока утонул (((.
Спасибо.
  • Вопрос задан
  • 50 просмотров
Решения вопроса 1
coderisimo
@coderisimo Автор вопроса
Уважайте тех ,кто вам помогает , ёклмн! )
1) вот это работает :
let resultsReturned = false;

    let finishJobs = function (tasksQueue) {
        tasksQueue.getJobCounts().then((count) => {
            if (!count.waiting && !count.active) {
                if (!resultsReturned) {
                    resultsReturned = true;
                    results.forEach(i => console.table(i));
                    setTimeout(() => {
                        resultsReturned = false;
                        results = [];
                    }, 500);
                }
            }
        });
    };




  tasksQueue.on('completed', function (job, result) {
            results.push(result);
            job.remove();
            finishJobs(tasksQueue);
        }
    );
Ответ написан
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы