Может быть дело в не правильном подключении?
Вот логи очереди:
OK
1710330289.313322 [0 127.0.0.1:60844] "eval" "--[[\n Adds a job to the queue by doing the following:\n - Increases the job counter if needed.\n - Creates a new job key with the job data.\n\n - if delayed:\n - computes timestamp.\n - adds to delayed zset.\n - Emits a global event 'delayed' if the job is delayed.\n - if not delayed\n - Adds the jobId to the wait/paused list in one of three ways:\n - LIFO\n - FIFO\n - prioritized.\n - Adds the job to the \"added\" list so that workers gets notified.\n\n Input:\n KEYS[1] 'wait',\n KEYS[2] 'paused'\n KEYS[3] 'meta-paused'\n KEYS[4] 'id'\n KEYS[5] 'delayed'\n KEYS[6] 'priority'\n\n ARGV[1] key prefix,\n ARGV[2] custom id (will not generate one automatically)\n ARGV[3] name\n ARGV[4] data (json stringified job data)\n ARGV[5] opts (json stringified job opts)\n ARGV[6] timestamp\n ARGV[7] delay\n ARGV[8] delayedTimestamp\n ARGV[9] priority\n ARGV[10] LIFO\n ARGV[11] token\n]]\nlocal jobId\nlocal jobIdKey\nlocal rcall = redis.call\n\nlocal jobCounter = rcall(\"INCR\", KEYS[4])\n\nif ARGV[2] == \"\" then\n jobId = jobCounter\n jobIdKey = ARGV[1] .. jobId\nelse\n jobId = ARGV[2]\n jobIdKey = ARGV[1] .. jobId\n if rcall(\"EXISTS\", jobIdKey) == 1 then\n return jobId .. \"\" -- convert to string\n end\nend\n\n-- Store the job.\nrcall(\"HMSET\", jobIdKey, \"name\", ARGV[3], \"data\", ARGV[4], \"opts\", ARGV[5], \"timestamp\", ARGV[6], \"delay\", ARGV[7], \"priority\", ARGV[9])\n\n-- Check if job is delayed\nlocal delayedTimestamp = tonumber(ARGV[8])\nif(delayedTimestamp ~= 0) then\n local timestamp = delayedTimestamp * 0x1000 + bit.band(jobCounter, 0xfff)\n rcall(\"ZADD\", KEYS[5], timestamp, jobId)\n rcall(\"PUBLISH\", KEYS[5], delayedTimestamp)\nelse\n local target\n\n -- Whe check for the meta-paused key to decide if we are paused or not\n -- (since an empty list and !EXISTS are not really the same)\n local paused\n if rcall(\"EXISTS\", KEYS[3]) ~= 1 then\n target = KEYS[1]\n paused = false\n else\n target = KEYS[2]\n paused = true\n end\n\n -- Standard or priority add\n local priority = tonumber(ARGV[9])\n if priority == 0 then\n -- LIFO or FIFO\n rcall(ARGV[10], target, jobId)\n else\n -- Priority add\n rcall(\"ZADD\", KEYS[6], priority, jobId)\n local count = rcall(\"ZCOUNT\", KEYS[6], 0, priority)\n\n local len = rcall(\"LLEN\", target)\n local id = rcall(\"LINDEX\", target, len - (count-1))\n if id then\n rcall(\"LINSERT\", target, \"BEFORE\", id, jobId)\n else\n rcall(\"RPUSH\", target, jobId)\n end\n\n end\n\n -- Emit waiting event (wait..ing@token)\n rcall(\"PUBLISH\", KEYS[1] .. \"ing@\" .. ARGV[11], jobId)\nend\n\nreturn jobId .. \"\" -- convert to string\n" "6" "bull:parsers:wait" "bull:parsers:paused" "bull:parsers:meta-paused" "bull:parsers:id" "bull:parsers:delayed" "bull:parsers:priority" "bull:parsers:" "" "__default__" "{\"project_id\":46,\"group_id\":null}" "{\"attempts\":1,\"delay\":0,\"timestamp\":1710330289308}" "1710330289308" "0" "0" "0" "LPUSH" "46b0491a-5484-43a2-9c7d-9b1be6c59d19"
1710330289.315169 [0 lua] "INCR" "bull:parsers:id"
1710330289.316294 [0 lua] "HMSET" "bull:parsers:41" "name" "__default__" "data" "{\"project_id\":46,\"group_id\":null}" "opts" "{\"attempts\":1,\"delay\":0,\"timestamp\":1710330289308}" "timestamp" "1710330289308" "delay" "0" "priority" "0"
1710330289.316377 [0 lua] "EXISTS" "bull:parsers:meta-paused"
1710330289.316402 [0 lua] "LPUSH" "bull:parsers:wait" "41"
1710330289.316437 [0 lua] "PUBLISH" "bull:parsers:waiting@46b0491a-5484-43a2-9c7d-9b1be6c59d19" "41"
Вот сам код:
есть процессор:
import {
OnQueueActive,
OnQueueCompleted,
Process,
Processor,
} from '@nestjs/bull';
import { Job } from 'bull';
import { Worker } from 'worker_threads';
import { PrismaService } from '@prisma/prisma.service';
import { LoggerService } from '../logger.service';
Processor('parsers');
export class ParserQueueProcess {
constructor(
private readonly prisma: PrismaService,
private readonly loggerService: LoggerService,
) {}
@Process()
async runPageParser(job: Job) {
console.log(job);
}
@OnQueueActive()
onActive(job: Job) {
console.log(
`Processing job ${job.id} of type ${job.name} with data ${job.data}`,
);
}
@OnQueueCompleted()
onCompleted(job: Job) {
console.log(`Job with ${job.id} completed...`);
}
}
есть модуль в котором подключен процессор:
import { Module } from '@nestjs/common';
import { PageParserService } from './page-parser.service';
import { PageParserController } from './page-parser.controller';
// import { ParsersQueueModule } from '../parsers-queue/parsers-queue.module';
import { BullModule } from '@nestjs/bull';
import { ParserQueueProcess } from './parser-queue.process';
@Module({
imports: [
BullModule.registerQueue({
name: 'parsers',
}),
],
controllers: [PageParserController],
providers: [PageParserService, ParserQueueProcess],
exports: [PageParserService],
})
export class PageParserModule {}
И есть сервис:
import { Injectable } from '@nestjs/common';
import { PrismaService } from '@prisma/prisma.service';
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';
import { PageParserRun } from './interfaces';
@Injectable()
export class PageParserService {
constructor(
private readonly prisma: PrismaService,
@InjectQueue('parsers') private readonly queue: Queue,
) {}
async runParser(project_id: number, group_id: number | null) {
const pageParserRun: PageParserRun = {
project_id: project_id,
group_id: group_id,
};
console.log('Добавлено в очередь');
try {
await this.queue.add(pageParserRun);
console.log(this.queue.client.status);
return this.queue.client.status;
// await this.getData(project_id, null);
} catch (e) {
console.error(e);
}
}
async getData(project_id: number, group_id: number | null) {
return await this.prisma.pageParserData.findMany({
where: {
project_id: project_id,
},
});
}
}