@ineveraskdfrths

Как заставить aio-pika не ждать выполнение процесса?

Я взял пример реализации RPC из документации aio-pika, все бы ничего, но он не применим к долгим задачам, функция передающая задачу на выполнение блокирующая и ждет пока выполнится переданная задача, поэтому программа не может отправить задачу из очереди следующему воркеру, а отправляет только по завершению предыдущей.

Как сделать так чтобы функция отправки задачи не блокировала свой луп?

Код воркера:
async def receive_task_from_queue():
    connection = await connect_robust(
        "amqp://guest:guest@127.0.0.1/",
        client_properties={"connection_name": "callee"},
    )
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=1)
    rpc = await RPC.create(channel)

    await rpc.register("main", main, auto_delete=True)
    return connection

Код продюсера:
async def run_rabbitmq_queue():
    connection = await connect_robust(
        "amqp://guest:guest@127.0.0.1/",
        client_properties={"connection_name": "caller"},
    )
    async with connection:
        channel = await connection.channel()
        rpc = await RPC.create(channel)
        await rpc.proxy.main()
  • Вопрос задан
  • 1248 просмотров
Решения вопроса 1
@ineveraskdfrths Автор вопроса
Поцаны, не занимайтесь фигней короче, используйте в этом случае воркеров
Код воркера
async def worker(*, user_id):
    print('start')
    await asyncio.sleep(10)
    print(user_id)


async def main():
    connection = await connect_robust("amqp://guest:guest@127.0.0.1/")

    # Creating channel
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=1)

    master = Master(channel)
    await master.create_worker("my_task_name", worker, auto_delete=True)

    return connection

Код продюсера
async def run_rabbitmq_queue(user_id):
    connection = await connect_robust("amqp://guest:guest@127.0.0.1/")

    async with connection:
        # Creating channel
        channel = await connection.channel()

        master = Master(channel)

        # Creates tasks by proxy object

        await master.proxy.my_task_name(user_id=user_id)

        # Or using create_task method
        await master.create_task(
            "my_task_name", kwargs=dict(user_id=user_id)
        )
        print('!')
Ответ написан
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы