@Mr_Sinister

Как реализовать многопоточную обработку сообщений RabbitMQ (Pika)?

Есть задача: обрабатывать N сообщений параллельно одним консьюмером. Запускать N процессов не вариант (падает БД из-за большого количества соединений). При этом каждый процесс после завершения обработки очередного сообщения должен посылать basic_ack брокеру. Как это реализовать?
База данных: MongoDB (использую MongoEngine).
Устанавливаю соединение с очередью так:
def create_connect_rabbitmq():
    return pika.BlockingConnection(pika.ConnectionParameters(
        host=app_config['RABBITMQ_HOST'],
        port=app_config['RABBITMQ_PORT'],
        virtual_host=app_config['RABBITMQ_VIRTUAL_HOST'],
        credentials=pika.PlainCredentials(
            username=app_config['RABBITMQ_USERNAME'],
            password=app_config['RABBITMQ_PASSWORD']
        )
    ))

def create_queues(channel):
    channel.queue_declare(
        queue=app_config['RABBITMQ_QUEUE']['PROFILE_PARSER'],
        durable=True
    )

connection = create_connect_rabbitmq()
channel = connection.channel()
channel.basic_qos(prefetch_count=app_config['RABBITMQ_PREFETCH_COUNT'])

database = create_connect_database()

create_queues(channel)
print(' [*] Waiting for messages. To exit press CTRL+C')

channel.basic_consume(
    consumer_callback=profile_handler,
    queue=app_config['RABBITMQ_QUEUE']['PROFILE_PARSER'],
    no_ack=False
)
channel.start_consuming()
  • Вопрос задан
  • 1070 просмотров
Пригласить эксперта
Ответы на вопрос 1
leahch
@leahch
3D специалист. Dолго, Dорого, Dерьмово.
Как я понял, заканчиваются соединения к монго. Подозреваю, что нужно испльзовать connection pool. Для этого нужно установить разумные ограничения на число коннектов, как здесь описано - api.mongodb.com/python/current/faq.html#how-does-c...
Но я могу и ошибаться.
И да, запускаем не форком, а тредами.
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы