Есть задача: обрабатывать N сообщений параллельно одним консьюмером. Запускать N процессов не вариант (падает БД из-за большого количества соединений). При этом каждый процесс после завершения обработки очередного сообщения должен посылать basic_ack брокеру. Как это реализовать?
База данных: MongoDB (использую MongoEngine).
Устанавливаю соединение с очередью так:
def create_connect_rabbitmq():
return pika.BlockingConnection(pika.ConnectionParameters(
host=app_config['RABBITMQ_HOST'],
port=app_config['RABBITMQ_PORT'],
virtual_host=app_config['RABBITMQ_VIRTUAL_HOST'],
credentials=pika.PlainCredentials(
username=app_config['RABBITMQ_USERNAME'],
password=app_config['RABBITMQ_PASSWORD']
)
))
def create_queues(channel):
channel.queue_declare(
queue=app_config['RABBITMQ_QUEUE']['PROFILE_PARSER'],
durable=True
)
connection = create_connect_rabbitmq()
channel = connection.channel()
channel.basic_qos(prefetch_count=app_config['RABBITMQ_PREFETCH_COUNT'])
database = create_connect_database()
create_queues(channel)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.basic_consume(
consumer_callback=profile_handler,
queue=app_config['RABBITMQ_QUEUE']['PROFILE_PARSER'],
no_ack=False
)
channel.start_consuming()