Задать вопрос
@ChickenGrinder

Как организовать паралельную и последовательную обработку задач по условиям?

Как организовать очередь (фоновые задачи) чтобы в любой момент времени, если есть совпадение по определенным параметрам, выполнялась только одна задача?

К примеру, у меня есть очередь `cars`, и какие-то фоновые задачи под эти сущности:
- cars 1: job data A
- cars 1: job data B
- cars 2: job data C
- cars 3: job data D

Не проблема если cars 1 и cars 2 будут обрабатываться паралельно (даже должны), но надо чтобы 'cars 1: job data A' и 'cars 1: job data B' выполнялись последовательно.
То есть, cars 1: job data X, должна быть добавлена в очередь, но начать выполняться после завершения cars 1: job data W (предыдущей cars 1)

При этом надо учесть что воркеров может быть несколько и могут быть изолированы (ничего не знать друг о друге).

Код на TS и использованием bullmq:
import { Worker, Job, Queue } from 'bullmq';
import { setTimeout } from 'node:timers/promises';

const queueName = 'cars';
const queue = new Queue(queueName);

await queue.obliterate({ force: true });
await queue.add('car 1', 'job data A');
await queue.add('car 1', 'job data B');
await queue.add('car 2', 'job data C');
await queue.add('car 3', 'job data D');

createWorker(1);
createWorker(2);

function createWorker(n: number) {
  const worker = new Worker(
    queueName,
    async (job: Job) => {
      console.log(
        `< started process for job ${job.id} ${job.data} in worker ${n}`,
      );
      await setTimeout(2000); // Emulate long processing
      console.log(
        `> ended process for job ${job.id} ${job.data} in worker ${n}`,
      );
    },
    {
      connection: { url: 'redis://localhost:6379' },
      concurrency: 1,
    },
  );

  worker.on('ready', () => {
    console.log(`worker ${n} ready`);
  });
  worker.on('active', job => {
    console.log(`job ${job.id} ${job.data}  active in worker ${n}`);
  });
  worker.on('completed', job => {
    console.log(`job with id ${job.id} ${job.data} has been completed`);
  });
  worker.on('drained', () => {
    console.log(`worker ${n} drained`);
  });
}


Что должно произойти:
1. началась обработка A
2. car 1 B добавлена в очередь и отложена, т.к. задача car 1 (А) в обработке
3. нет задач для car 2 С, началась обработка
4. car 3 D добавлена в очередь и отложена, т.к. достигнут лимит обработчиков
5. закончилась обработка car 1 (А), началась обработка car 1 B
6. закончилась обработка car 2 С, началась обработка car 3 D

Что я пробовал:
- использовать параметр `jobId`, если такой ид уже существует, то bullmq игнорирует эту задачу (не должен по моей задаче)
  • Вопрос задан
  • 119 просмотров
Подписаться 1 Средний 4 комментария
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы