import asyncio
import re
import aio_pika
from aio_pika.abc import AbstractRobustConnection
from aio_pika.pool import Pool
# Queue that sender_x publishes to and manager consumes from.
_QUEUE_IN_NAME = "queue_in"
# Queue that manager publishes results to and poller consumes from.
_QUEUE_OUT_NAME = "queue_out"
async def _async_range(count: int):
    """Asynchronously yield the integers ``0 .. count - 1``.

    After each value is consumed the generator awaits ``asyncio.sleep(0)``,
    giving other scheduled tasks a chance to run before the next value.

    Args:
        count: Number of integers to produce; passed straight to ``range``.

    Yields:
        Successive integers starting at 0.
    """
    for i in range(count):
        yield i
        # A zero-length sleep is a cooperative yield point for the event loop.
        await asyncio.sleep(0)
async def main() -> None:
    """Run a RabbitMQ round-trip demo over pooled aio_pika channels.

    ``sender_x`` publishes ten numbered messages to ``queue_in``; ``manager``
    consumes them, squares the first integer found in each body and publishes
    the result to ``queue_out``; ``poller`` consumes ``queue_out`` and prints
    what it receives.

    Nothing in this function bounds the consumers' ``queue.iterator()`` loops,
    so the coroutine is expected to return only when a consumer raises or the
    run is cancelled (e.g. Ctrl+C under ``asyncio.run``).
    """
    loop = asyncio.get_running_loop()
    # Compiled once: the same pattern is applied to every consumed message.
    number_pattern = re.compile(r"\d+")

    async def get_connection() -> AbstractRobustConnection:
        """Connect to the local broker via ``aio_pika.connect_robust``."""
        return await aio_pika.connect_robust("amqp://guest:guest@localhost/")

    connection_pool: Pool = Pool(get_connection, max_size=1, loop=loop)

    async def get_channel() -> aio_pika.Channel:
        """Open a new channel on a connection taken from the connection pool."""
        async with connection_pool.acquire() as connection:
            return await connection.channel()

    channel_pool: Pool = Pool(get_channel, max_size=3, loop=loop)

    async def manager() -> None:
        """Consume ``queue_in``, square each message's number, publish to ``queue_out``."""
        async with channel_pool.acquire() as channel:  # type: aio_pika.Channel
            await channel.set_qos(10)
            queue = await channel.declare_queue(
                _QUEUE_IN_NAME, durable=False, auto_delete=False,
            )
            async with queue.iterator() as queue_iter:
                message: aio_pika.IncomingMessage
                async for message in queue_iter:
                    text = message.body.decode("utf-8")
                    match = number_pattern.search(text)
                    if match is None:
                        # Without digits there is nothing to compute; reject the
                        # message instead of crashing the consumer on None.group().
                        print("MANAGER: got message = %s , no number found, rejected" % text)
                        await message.reject(requeue=False)
                        continue
                    number = int(match.group())
                    calc_result = number * number
                    print("MANAGER: got message = %s , calc = %d and send to %s" % (
                        text, calc_result, _QUEUE_OUT_NAME))
                    # Separate name so the consuming channel is never shadowed.
                    async with channel_pool.acquire() as out_channel:  # type: aio_pika.Channel
                        await out_channel.default_exchange.publish(
                            aio_pika.Message(body=("orig = %s, calc = %d" % (text, calc_result)).encode()),
                            _QUEUE_OUT_NAME,
                        )
                    # Acknowledge only after the result has been published.
                    await message.ack()

    async def poller() -> None:
        """Consume ``queue_out`` and print every message received."""
        async with channel_pool.acquire() as channel:  # type: aio_pika.Channel
            await channel.set_qos(10)
            queue = await channel.declare_queue(
                _QUEUE_OUT_NAME, durable=False, auto_delete=False,
            )
            async with queue.iterator() as queue_iter:
                message: aio_pika.IncomingMessage
                async for message in queue_iter:
                    print("POLLER: got message = %s from %s" % (message.body.decode("utf-8"), _QUEUE_OUT_NAME))
                    await message.ack()

    async def sender_x() -> None:
        """Publish ten ``Message id=N`` bodies to ``queue_in``, 0.1 s apart."""
        async for i in _async_range(10):
            async with channel_pool.acquire() as channel:  # type: aio_pika.Channel
                message = "Message id=%d" % i
                print("SENDER: send message = %s to %s" % (message, _QUEUE_IN_NAME))
                await channel.default_exchange.publish(
                    aio_pika.Message(body=message.encode()),
                    _QUEUE_IN_NAME,
                )
            # Sleep after releasing the channel so it is free for other tasks.
            await asyncio.sleep(0.1)

    async with connection_pool, channel_pool:
        # Declare both queues before anything is published: a message sent via
        # the default exchange to a queue that does not exist yet is not routed,
        # and the sender could otherwise run ahead of the manager's declaration.
        async with channel_pool.acquire() as setup_channel:  # type: aio_pika.Channel
            for queue_name in (_QUEUE_IN_NAME, _QUEUE_OUT_NAME):
                await setup_channel.declare_queue(
                    queue_name, durable=False, auto_delete=False,
                )

        manager_task = loop.create_task(manager())
        poller_task = loop.create_task(poller())
        try:
            await sender_x()
            # gather() surfaces a failure in either consumer right away; awaiting
            # the tasks one after another would hide a poller error for as long
            # as the manager keeps running.
            await asyncio.gather(manager_task, poller_task)
        finally:
            # Stop the consumers before the pools (and their channels) close.
            manager_task.cancel()
            poller_task.cancel()
            await asyncio.gather(manager_task, poller_task, return_exceptions=True)
if __name__ == "__main__":
    # Script entry point: run the demo coroutine to completion.
    asyncio.run(main())
Методом научного тыка я смог понять, какой алгоритм шифрования используется, и то, что в этой строке есть id пользователя, почта, время выдачи и время истечения токена.
Как описано тут: https://jwt.io/introduction, я пробовал передавать заголовок
`Authorization: Bearer token`
В ответ пишет: «недопустимые символы».