Имеется веб-приложение на FastAPI, в котором стартует пара фоновых процессов, один из которых падает.
Код №1
import asyncio
import logging
import uvicorn
from fastapi import FastAPI
queue = asyncio.Queue()
async def subscriptions_worker():
logging.info(f'Subscription worker started')
index = 1
while True:
await asyncio.sleep(1)
logging.info(f'Subscription #{index}')
index += 1
if index > 10:
await queue.put(f'Message #{index}')
async def queue_worker(q: asyncio.Queue):
logging.info('Queue worker started')
while True:
message = await q.get()
print(message)
async def background():
await queue.put("Message #-1")
await queue.put("Message #0")
asyncio.create_task(queue_worker(queue))
asyncio.create_task(subscriptions_worker())
app = FastAPI()
app.add_event_handler('startup', background)
if __name__ == '__main__':
uvicorn.run(app)
Output кода #1
INFO: Started server process [20632]
INFO: Waiting for application startup.
INFO: Queue worker started
INFO: Subscription worker started
INFO: Application startup complete.
ERROR: Task exception was never retrieved
future: <Task finished coro=<queue_worker() done, defined at /home/shubin/PycharmProjects/dbilling/srv.py:22> exception=RuntimeError('Task <Task pending coro=<queue_worker() running at /home/shubin/PycharmProjects/dbilling/srv.py:25>> got Future <Future pending> attached to a different loop')>
Traceback (most recent call last):
File "/home/shubin/PycharmProjects/dbilling/srv.py", line 25, in queue_worker
message = await q.get()
File "/usr/lib/python3.7/asyncio/queues.py", line 159, in get
await getter
RuntimeError: Task <Task pending coro=<queue_worker() running at /home/shubin/PycharmProjects/dbilling/srv.py:25>> got Future <Future pending> attached to a different loop
INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
Message #-1
Message #0
INFO: Subscription #1
INFO: Subscription #2
INFO: Subscription #3
Причем падает только тогда, когда в очереди заканчиваются сообщения и она начинает ждать, примерно та же самая корутина без веб-сервера работает:
Код №2
import asyncio
# import uvloop
async def worker(queue: asyncio.Queue):
while True:
print(await queue.get())
async def producer(queue: asyncio.Queue):
index = 1
while True:
await asyncio.sleep(1)
await queue.put(f'TEST #{index}')
index += 1
async def main():
queue = asyncio.Queue()
asyncio.create_task(worker(queue))
asyncio.create_task(producer(queue))
if __name__ == '__main__':
# uvloop.install()
loop = asyncio.get_event_loop()
print(loop)
loop.create_task(main())
loop.run_forever()
И тут она не падает. Спокойно себе ждет, когда в очереди появятся сообщения.
Чем может быть обоснованно такое поведение и как с этим бороться?