Построил приложение-микросервис на TypeScript в целях изучения Node.js. Это обработчик сообщений очереди по AMQP.
Пробую разобраться почему сообщения продолжают извлекаться о обрабатываться, несмотря на await. Я ожидаю, что перед тем как извлекать следующее сообщение, сначала будет обработано текущее. Вместо этого вижу, что сообщения продолжают извлекаться. Что-то не так в функции Consume?
Кодexport type OnMessageCallback = (msg: unknown) => Promise<void>;
///...
async Consume(queue: string, callback: OnMessageCallback): Promise<unknown> {
logger.log('info', 'DeclareQueue: ', queue);
const queueOk = await this.channel.assertQueue(queue, { durable: false });
if (!queueOk) {
return Promise.reject();
}
return this.channel
.consume(
queue,
async (msg: lib.ConsumeMessage) => {
if (msg === null) {
return;
}
const body = msg.content.toString();
logger.log('info', ` [x] Received ${body}`);
try {
await callback(body);
} catch (error) {
logger.log('error', error);
return this.channel.nack(msg);
}
return this.channel.ack(msg);
},
{ noAck: false },
)
.then(() => {
logger.log('info', ' [*] Waiting for messages. To exit press CTRL+C');
});
}
///...
// в главной функции
await mq.Consume('foo', async (body: string) => {
// const obj = JSON.parse(body);
// const event = obj as FooMessage;
logger.log('info', 'Processing body:', body);
await sleep(1000);
logger.log('info', 'End processing body:', body);
return Promise.resolve();
});
Код проекта
Добавлено позже:
Обновил код Consume, хотя проблема все та же.
async Consume(queue: string, callback: OnMessageCallback): Promise<unknown> {
logger.log('info', 'DeclareQueue: ', queue);
const queueOk = await this.channel.assertQueue(queue, { durable: false });
if (!queueOk) {
return Promise.reject();
}
logger.log('info', ' [*] Waiting for messages. To exit press CTRL+C');
return this.channel.consume(
queue,
async (msg: lib.ConsumeMessage) => {
if (msg === null) {
return;
}
const body = msg.content.toString();
logger.log('info', ` [x] Received ${body}`);
try {
await callback(body);
} catch (error) {
logger.log('error', error);
return this.channel.nack(msg);
}
return this.channel.ack(msg);
},
{ noAck: false },
);
}