idegree
@idegree
Senior Workaround Developer

Почему возникает RuntimeError в Python корутине на ожидании чтения из очереди?

Имеется веб-приложение на FastAPI, в котором стартует пара фоновых процессов, один из которых падает.
Код №1

import asyncio
import logging

import uvicorn
from fastapi import FastAPI


queue = asyncio.Queue()


async def subscriptions_worker():
    logging.info(f'Subscription worker started')
    index = 1
    while True:
        await asyncio.sleep(1)
        logging.info(f'Subscription #{index}')
        index += 1
        if index > 10:
            await queue.put(f'Message #{index}')


async def queue_worker(q: asyncio.Queue):
    logging.info('Queue worker started')
    while True:
        message = await q.get()
        print(message)


async def background():
    await queue.put("Message #-1")
    await queue.put("Message #0")
    asyncio.create_task(queue_worker(queue))
    asyncio.create_task(subscriptions_worker())


app = FastAPI()
app.add_event_handler('startup', background)

if __name__ == '__main__':
    uvicorn.run(app)


Output кода #1

INFO:     Started server process [20632]
INFO:     Waiting for application startup.
INFO:     Queue worker started
INFO:     Subscription worker started
INFO:     Application startup complete.
ERROR:    Task exception was never retrieved
future: <Task finished coro=<queue_worker() done, defined at /home/shubin/PycharmProjects/dbilling/srv.py:22> exception=RuntimeError('Task <Task pending coro=<queue_worker() running at /home/shubin/PycharmProjects/dbilling/srv.py:25>> got Future <Future pending> attached to a different loop')>
Traceback (most recent call last):
  File "/home/shubin/PycharmProjects/dbilling/srv.py", line 25, in queue_worker
    message = await q.get()
  File "/usr/lib/python3.7/asyncio/queues.py", line 159, in get
    await getter
RuntimeError: Task <Task pending coro=<queue_worker() running at /home/shubin/PycharmProjects/dbilling/srv.py:25>> got Future <Future pending> attached to a different loop
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
Message #-1
Message #0
INFO:     Subscription #1
INFO:     Subscription #2
INFO:     Subscription #3


Причем падает только тогда, когда в очереди заканчиваются сообщения и она начинает ждать, примерно та же самая корутина без веб-сервера работает:
Код №2

import asyncio
# import uvloop


async def worker(queue: asyncio.Queue):
    while True:
        print(await queue.get())


async def producer(queue: asyncio.Queue):
    index = 1
    while True:
        await asyncio.sleep(1)
        await queue.put(f'TEST #{index}')
        index += 1


async def main():
    queue = asyncio.Queue()
    asyncio.create_task(worker(queue))
    asyncio.create_task(producer(queue))


if __name__ == '__main__':
    # uvloop.install()
    loop = asyncio.get_event_loop()
    print(loop)
    loop.create_task(main())
    loop.run_forever()


И тут она не падает. Спокойно себе ждет, когда в очереди появятся сообщения.

Чем может быть обоснованно такое поведение и как с этим бороться?
  • Вопрос задан
  • 485 просмотров
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы