Как задать консьюмеру IBMMQ количество вычитываемых сообщений?
Собственно, сабж: как настроить консьюмер IBMMQ так, чтобы он забирал из очереди не все сообщения - без ограничений - которые ему хочет выдать сервер (менеджер), а как-то дозированно: по 1, 2, N и только после обработки уже вычитанных вычитывал следующие?
Я знаю, как это устроено в RabbitMQ - там можно задавать режим ACK - автоматически подтверждать принятое сообщение (по приему) или вручную (по номеру/индексу), ну и кол-во сообещний, которое может одновременно обрабатывать 1 консьюмер.
Как-то так:
_consumer.Model.BasicQos(0, (ushort) _settings.MessageCount, true);
_consumer.Model.BasicConsume(_queueName, _settings.AutoConfirm, consumer: _consumer, consumerTag: this.Instance.Id);
- prefetchCount - кол-во сообщений (_settings.MessageCount)
- autoAck - автоподтвержение _settings.AutoConfirm
Как эти же параметры задать при работе с IBMMQ?
Клиент на .net 6.0, библиотеки IBM.XMS (amqmxmsstd.dll)
как настроить консьюмер IBMMQ так, чтобы он забирал из очереди не все сообщения - без ограничений - которые ему хочет выдать сервер (менеджер), а как-то дозированно: по 1, 2, N и только после обработки уже вычитанных вычитывал следующие?
Зачем?
Используй СУБД, считывая записи с LIMIT, OFFSET
sheich,
тогда консьюмером можно читать по одному сообщению и писать во временный буфер (СУБД, k/v).
Когда сообщение получено и закинуто в буфер - ACK. И по достижению нужного размера (проверяя буфер) что-то делать или просто отправлять в другую очередь.
Everything_is_not_so_bad, я именно про это и спрашиваю: как сказать консьюмеру ibmmq, чтобы он читал только по 1-му сообщению и не читал следующее, пока я не скажу ACK предыдущему?
я именно про это и спрашиваю: как сказать консьюмеру ibmmq, чтобы он читал только по 1-му сообщению и не читал следующее, пока я не скажу ACK предыдущему?
Задача консьюмера - бежать, выполняя задача одну за другой, пока имеются. Они не могут "остановиться", подумать о чем-то или что-то другое.
Чтобы было в нужном темпе, нужно задействовать некоторый буфер, который будет использоваться для накопления задач. Допустим, надо делать батч по 5 задач. И приходит 23 - первый консьюмер кладет их всех в буфер. В то же время другой обработчик (независимый процесс от консьюмеров ibmmq) будет смотреть на буфер в цикле и брать только до 5 задач. Буфер будет увеличиваться и уменьшаться от нуля до того кол-ва задач, которое обработчик, смотрящий на буфер, способен выгребать.
В общем, надеюсь, что уловил идею.
Everything_is_not_so_bad, этот подход понятен и я что-то похожее уже реализовал (через семафоры, которые создают узкое место в процессе обработки - ожидание). Недостаток в том, что если мой сервис упал и успел перед этим набрать к себе в буфер (память) N сообщений из очереди, то все.. в очереди (на сервере) их больше нет, они безвозвратно пропали. Хранить их прямо в БД (чтобы не пропадали при падении) - слишком медленно. Мы как раз хотим избежать лишних обращений к БД.