Написал 3 простые программы (с асинхронным программированием пока что есть определенный затык), суть которых заключается в том, чтобы я через poller отправлял сообщение в manager, а из него оно отправлялось (предварительно как-то обработавшись) в sender. Мне нужно, чтобы программа дожидалась получения сообщения, после чего переотправляла его, однако при запуске всего этого добра manager сразу отправляет (не дожидаясь сообщения от poller) в sender MSG.body, чего происходить не должно (в идеале).
poller.py
import asyncio
import os
import sys
from aio_pika import Message, connect
queue_name = "poller_manager"
async def main() -> None:
connection = await connect("amqp://guest:guest@localhost/")
async with connection:
channel = await connection.channel()
queue = await channel.declare_queue(name=queue_name)
text = input()
await channel.default_exchange.publish(
Message(str.encode(text)),
routing_key=queue.name,
)
print(" [x] Sent 'Hello World!'")
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)
manager.py
import asyncio
import os
import sys
from aio_pika import connect, Message
from aio_pika.abc import AbstractIncomingMessage
queue_from_name = "poller_manager"
queue_to_name = "manager_sender"
MSG: Message = Message(body=b"B")
async def on_message(message: AbstractIncomingMessage) -> None:
print("Message body is: %r" % message.body)
MSG = message
async def main() -> None:
connection = await connect("amqp://guest:guest@localhost/")
async with connection:
channel = await connection.channel()
queue_from = await channel.declare_queue(name=queue_from_name)
queue_to = await channel.declare_queue(name=queue_to_name)
await queue_from.consume(on_message, no_ack=True):
await channel.default_exchange.publish(
Message(MSG.body),
routing_key=queue_to.name,
)
await asyncio.Future()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)
sender.py
import asyncio
import os
import sys
from aio_pika import connect
async def on_message(message: AbstractIncomingMessage) -> None:
print("Message body is: %r" % message.body)
MSG = message
queue_name = "manager_sender"
async def main() -> None:
connection = await connect("amqp://guest:guest@localhost/")
async with connection:
channel = await connection.channel()
queue = await channel.declare_queue(name=queue_name)
await queue.consume(callback=on_message, no_ack=True)
await asyncio.Future()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)
P.S.
в документации aio-pika есть
пример
sender.py и receiver.py, sender отправляет, receiver соответсвенно слушает.
sender -> queue -> receiver
Я хочу сделать абсолютно тоже самое, только с 2 очередями
poller отправляет в очередь, manager слушает, после того, как что то примет - отправляет в другую очередь, из которой слушает sender
poller -> queue_1 -> manager -> queue_2 -> sender