Задать вопрос
@ItIt

Почему не выполняются методы из очереди bull nest js?

Может быть дело в не правильном подключении?
Вот логи очереди:
OK
1710330289.313322 [0 127.0.0.1:60844] "eval" "--[[\n Adds a job to the queue by doing the following:\n - Increases the job counter if needed.\n - Creates a new job key with the job data.\n\n - if delayed:\n - computes timestamp.\n - adds to delayed zset.\n - Emits a global event 'delayed' if the job is delayed.\n - if not delayed\n - Adds the jobId to the wait/paused list in one of three ways:\n - LIFO\n - FIFO\n - prioritized.\n - Adds the job to the \"added\" list so that workers gets notified.\n\n Input:\n KEYS[1] 'wait',\n KEYS[2] 'paused'\n KEYS[3] 'meta-paused'\n KEYS[4] 'id'\n KEYS[5] 'delayed'\n KEYS[6] 'priority'\n\n ARGV[1] key prefix,\n ARGV[2] custom id (will not generate one automatically)\n ARGV[3] name\n ARGV[4] data (json stringified job data)\n ARGV[5] opts (json stringified job opts)\n ARGV[6] timestamp\n ARGV[7] delay\n ARGV[8] delayedTimestamp\n ARGV[9] priority\n ARGV[10] LIFO\n ARGV[11] token\n]]\nlocal jobId\nlocal jobIdKey\nlocal rcall = redis.call\n\nlocal jobCounter = rcall(\"INCR\", KEYS[4])\n\nif ARGV[2] == \"\" then\n jobId = jobCounter\n jobIdKey = ARGV[1] .. jobId\nelse\n jobId = ARGV[2]\n jobIdKey = ARGV[1] .. jobId\n if rcall(\"EXISTS\", jobIdKey) == 1 then\n return jobId .. \"\" -- convert to string\n end\nend\n\n-- Store the job.\nrcall(\"HMSET\", jobIdKey, \"name\", ARGV[3], \"data\", ARGV[4], \"opts\", ARGV[5], \"timestamp\", ARGV[6], \"delay\", ARGV[7], \"priority\", ARGV[9])\n\n-- Check if job is delayed\nlocal delayedTimestamp = tonumber(ARGV[8])\nif(delayedTimestamp ~= 0) then\n local timestamp = delayedTimestamp * 0x1000 + bit.band(jobCounter, 0xfff)\n rcall(\"ZADD\", KEYS[5], timestamp, jobId)\n rcall(\"PUBLISH\", KEYS[5], delayedTimestamp)\nelse\n local target\n\n -- Whe check for the meta-paused key to decide if we are paused or not\n -- (since an empty list and !EXISTS are not really the same)\n local paused\n if rcall(\"EXISTS\", KEYS[3]) ~= 1 then\n target = KEYS[1]\n paused = false\n else\n target = KEYS[2]\n paused = true\n end\n\n -- Standard or priority add\n local priority = tonumber(ARGV[9])\n if priority == 0 then\n -- LIFO or FIFO\n rcall(ARGV[10], target, jobId)\n else\n -- Priority add\n rcall(\"ZADD\", KEYS[6], priority, jobId)\n local count = rcall(\"ZCOUNT\", KEYS[6], 0, priority)\n\n local len = rcall(\"LLEN\", target)\n local id = rcall(\"LINDEX\", target, len - (count-1))\n if id then\n rcall(\"LINSERT\", target, \"BEFORE\", id, jobId)\n else\n rcall(\"RPUSH\", target, jobId)\n end\n\n end\n\n -- Emit waiting event (wait..ing@token)\n rcall(\"PUBLISH\", KEYS[1] .. \"ing@\" .. ARGV[11], jobId)\nend\n\nreturn jobId .. \"\" -- convert to string\n" "6" "bull:parsers:wait" "bull:parsers:paused" "bull:parsers:meta-paused" "bull:parsers:id" "bull:parsers:delayed" "bull:parsers:priority" "bull:parsers:" "" "__default__" "{\"project_id\":46,\"group_id\":null}" "{\"attempts\":1,\"delay\":0,\"timestamp\":1710330289308}" "1710330289308" "0" "0" "0" "LPUSH" "46b0491a-5484-43a2-9c7d-9b1be6c59d19"
1710330289.315169 [0 lua] "INCR" "bull:parsers:id"
1710330289.316294 [0 lua] "HMSET" "bull:parsers:41" "name" "__default__" "data" "{\"project_id\":46,\"group_id\":null}" "opts" "{\"attempts\":1,\"delay\":0,\"timestamp\":1710330289308}" "timestamp" "1710330289308" "delay" "0" "priority" "0"
1710330289.316377 [0 lua] "EXISTS" "bull:parsers:meta-paused"
1710330289.316402 [0 lua] "LPUSH" "bull:parsers:wait" "41"
1710330289.316437 [0 lua] "PUBLISH" "bull:parsers:waiting@46b0491a-5484-43a2-9c7d-9b1be6c59d19" "41"
Вот сам код:
есть процессор:
import {
  OnQueueActive,
  OnQueueCompleted,
  Process,
  Processor,
} from '@nestjs/bull';
import { Job } from 'bull';
import { Worker } from 'worker_threads';
import { PrismaService } from '@prisma/prisma.service';
import { LoggerService } from '../logger.service';

Processor('parsers');
export class ParserQueueProcess {
  constructor(
    private readonly prisma: PrismaService,
    private readonly loggerService: LoggerService,
  ) {}
  @Process()
  async runPageParser(job: Job) {
    console.log(job);
  }

  @OnQueueActive()
  onActive(job: Job) {
    console.log(
      `Processing job ${job.id} of type ${job.name} with data ${job.data}`,
    );
  }

  @OnQueueCompleted()
  onCompleted(job: Job) {
    console.log(`Job with ${job.id} completed...`);
  }
}

есть модуль в котором подключен процессор:
import { Module } from '@nestjs/common';
import { PageParserService } from './page-parser.service';
import { PageParserController } from './page-parser.controller';
// import { ParsersQueueModule } from '../parsers-queue/parsers-queue.module';
import { BullModule } from '@nestjs/bull';
import { ParserQueueProcess } from './parser-queue.process';

@Module({
  imports: [
    BullModule.registerQueue({
      name: 'parsers',
    }),
  ],
  controllers: [PageParserController],
  providers: [PageParserService, ParserQueueProcess],
  exports: [PageParserService],
})
export class PageParserModule {}

И есть сервис:
import { Injectable } from '@nestjs/common';
import { PrismaService } from '@prisma/prisma.service';
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';
import { PageParserRun } from './interfaces';

@Injectable()
export class PageParserService {
  constructor(
    private readonly prisma: PrismaService,
    @InjectQueue('parsers') private readonly queue: Queue,
  ) {}
  async runParser(project_id: number, group_id: number | null) {
    const pageParserRun: PageParserRun = {
      project_id: project_id,
      group_id: group_id,
    };
    console.log('Добавлено в очередь');
    try {
      await this.queue.add(pageParserRun);
      console.log(this.queue.client.status);
      return this.queue.client.status;
      // await this.getData(project_id, null);
    } catch (e) {
      console.error(e);
    }
  }
  async getData(project_id: number, group_id: number | null) {
    return await this.prisma.pageParserData.findMany({
      where: {
        project_id: project_id,
      },
    });
  }
}
  • Вопрос задан
  • 198 просмотров
Подписаться 1 Простой 3 комментария
Решения вопроса 1
@ItIt Автор вопроса
Синтаксическая ошибка
Ответ написан
Комментировать
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы