Задать вопрос
Similization
@Similization
В прострации

Как переслать сообщение из одной очереди в другую?

Написал 3 простые программы (с асинхронным программированием пока что есть определенный затык), суть которых заключается в том, чтобы я через poller отправлял сообщение в manager, а из него оно отправлялось (предварительно как-то обработавшись) в sender. Мне нужно, чтобы программа дожидалась получения сообщения, после чего переотправляла его, однако при запуске всего этого добра manager сразу отправляет (не дожидаясь сообщения от poller) в sender MSG.body, чего происходить не должно (в идеале).

poller.py
import asyncio
import os
import sys

from aio_pika import Message, connect

queue_name = "poller_manager"


async def main() -> None:
    connection = await connect("amqp://guest:guest@localhost/")

    async with connection:
        channel = await connection.channel()
        queue = await channel.declare_queue(name=queue_name)
        text = input()
        await channel.default_exchange.publish(
            Message(str.encode(text)),
            routing_key=queue.name,
        )

        print(" [x] Sent 'Hello World!'")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

manager.py
import asyncio
import os
import sys

from aio_pika import connect, Message
from aio_pika.abc import AbstractIncomingMessage

queue_from_name = "poller_manager"
queue_to_name = "manager_sender"

MSG: Message = Message(body=b"B")


async def on_message(message: AbstractIncomingMessage) -> None:
    print("Message body is: %r" % message.body)
    MSG = message


async def main() -> None:
    connection = await connect("amqp://guest:guest@localhost/")
    async with connection:
        channel = await connection.channel()
        queue_from = await channel.declare_queue(name=queue_from_name)
        queue_to = await channel.declare_queue(name=queue_to_name)
        await queue_from.consume(on_message, no_ack=True):
        await channel.default_exchange.publish(
            Message(MSG.body),
            routing_key=queue_to.name,
        )
        await asyncio.Future()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

sender.py
import asyncio
import os
import sys

from aio_pika import connect

async def on_message(message: AbstractIncomingMessage) -> None:
    print("Message body is: %r" % message.body)
    MSG = message

queue_name = "manager_sender"


async def main() -> None:
    connection = await connect("amqp://guest:guest@localhost/")
    async with connection:
        channel = await connection.channel()
        queue = await channel.declare_queue(name=queue_name)
        await queue.consume(callback=on_message, no_ack=True)
        await asyncio.Future()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)


P.S.
в документации aio-pika есть пример
sender.py и receiver.py, sender отправляет, receiver соответсвенно слушает.
sender -> queue -> receiver

Я хочу сделать абсолютно тоже самое, только с 2 очередями
poller отправляет в очередь, manager слушает, после того, как что то примет - отправляет в другую очередь, из которой слушает sender
poller -> queue_1 -> manager -> queue_2 -> sender
  • Вопрос задан
  • 344 просмотра
Подписаться 2 Простой 3 комментария
Пригласить эксперта
Ответы на вопрос 1
alfss
@alfss
https://career.habr.com/alfss
Переносите в on message
await channel.default_exchange.publish(
            Message(MSG.body),
            routing_key=queue_to.name,
        )
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы