Как организовать очередь (фоновые задачи) чтобы в любой момент времени, если есть совпадение по определенным параметрам, выполнялась только одна задача?
К примеру, у меня есть очередь `cars`, и какие-то фоновые задачи под эти сущности:
- cars 1: job data A
- cars 1: job data B
- cars 2: job data C
- cars 3: job data D
Не проблема если cars 1 и cars 2 будут обрабатываться паралельно (даже должны), но надо чтобы 'cars 1: job data A' и 'cars 1: job data B' выполнялись последовательно.
То есть, cars 1: job data X, должна быть добавлена в очередь, но начать выполняться после завершения cars 1: job data W (предыдущей cars 1)
При этом надо учесть что воркеров может быть несколько и могут быть изолированы (ничего не знать друг о друге).
Код на TS и использованием bullmq:
import { Worker, Job, Queue } from 'bullmq';
import { setTimeout } from 'node:timers/promises';
const queueName = 'cars';
const queue = new Queue(queueName);
await queue.obliterate({ force: true });
await queue.add('car 1', 'job data A');
await queue.add('car 1', 'job data B');
await queue.add('car 2', 'job data C');
await queue.add('car 3', 'job data D');
createWorker(1);
createWorker(2);
function createWorker(n: number) {
const worker = new Worker(
queueName,
async (job: Job) => {
console.log(
`< started process for job ${job.id} ${job.data} in worker ${n}`,
);
await setTimeout(2000); // Emulate long processing
console.log(
`> ended process for job ${job.id} ${job.data} in worker ${n}`,
);
},
{
connection: { url: 'redis://localhost:6379' },
concurrency: 1,
},
);
worker.on('ready', () => {
console.log(`worker ${n} ready`);
});
worker.on('active', job => {
console.log(`job ${job.id} ${job.data} active in worker ${n}`);
});
worker.on('completed', job => {
console.log(`job with id ${job.id} ${job.data} has been completed`);
});
worker.on('drained', () => {
console.log(`worker ${n} drained`);
});
}
Что должно произойти:
1. началась обработка A
2. car 1 B добавлена в очередь и отложена, т.к. задача car 1 (А) в обработке
3. нет задач для car 2 С, началась обработка
4. car 3 D добавлена в очередь и отложена, т.к. достигнут лимит обработчиков
5. закончилась обработка car 1 (А), началась обработка car 1 B
6. закончилась обработка car 2 С, началась обработка car 3 D
Что я пробовал:
- использовать параметр `jobId`, если такой ид уже существует, то bullmq игнорирует эту задачу (не должен по моей задаче)