2ord
@2ord

Почему сообщения из очереди обрабатываются, не ожидая окончания их выполнения, несмотря на await?

Построил приложение-микросервис на TypeScript в целях изучения Node.js. Это обработчик сообщений очереди по AMQP.
Пробую разобраться почему сообщения продолжают извлекаться о обрабатываться, несмотря на await. Я ожидаю, что перед тем как извлекать следующее сообщение, сначала будет обработано текущее. Вместо этого вижу, что сообщения продолжают извлекаться. Что-то не так в функции Consume?
Код
export type OnMessageCallback = (msg: unknown) => Promise<void>;

///...

async Consume(queue: string, callback: OnMessageCallback): Promise<unknown> {
    logger.log('info', 'DeclareQueue: ', queue);
    const queueOk = await this.channel.assertQueue(queue, { durable: false });
    if (!queueOk) {
        return Promise.reject();
    }

    return this.channel
        .consume(
            queue,
            async (msg: lib.ConsumeMessage) => {
                if (msg === null) {
                    return;
                }

                const body = msg.content.toString();
                logger.log('info', ` [x] Received ${body}`);

                try {
                    await callback(body);
                } catch (error) {
                    logger.log('error', error);
                    return this.channel.nack(msg);
                }
                return this.channel.ack(msg);
            },
            { noAck: false },
        )
        .then(() => {
            logger.log('info', ' [*] Waiting for messages. To exit press CTRL+C');
        });
}

///...

// в главной функции
await mq.Consume('foo', async (body: string) => {
    // const obj = JSON.parse(body);
    // const event = obj as FooMessage;

    logger.log('info', 'Processing body:', body);
    await sleep(1000);
    logger.log('info', 'End processing body:', body);

    return Promise.resolve();
});
Код проекта


Добавлено позже:
Обновил код Consume, хотя проблема все та же.
async Consume(queue: string, callback: OnMessageCallback): Promise<unknown> {
		logger.log('info', 'DeclareQueue: ', queue);
		const queueOk = await this.channel.assertQueue(queue, { durable: false });
		if (!queueOk) {
			return Promise.reject();
		}

		logger.log('info', ' [*] Waiting for messages. To exit press CTRL+C');
		return this.channel.consume(
			queue,
			async (msg: lib.ConsumeMessage) => {
				if (msg === null) {
					return;
				}

				const body = msg.content.toString();
				logger.log('info', ` [x] Received ${body}`);

				try {
					await callback(body);
				} catch (error) {
					logger.log('error', error);
					return this.channel.nack(msg);
				}
				return this.channel.ack(msg);
			},
			{ noAck: false },
		);
	}
  • Вопрос задан
  • 185 просмотров
Решения вопроса 1
2ord
@2ord Автор вопроса
После создания канала необходимо указать
await this.channel.prefetch(1); // или другое кол-во, которое хотим обрабатывать одновременно.
Ответ написан
Комментировать
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы