Имеется aiohttp сервер , который запускает некую cpu bound задачу в отдельном процессе. Упрощенный пример ниже:
import asyncio
import time
from concurrent.futures.process import ProcessPoolExecutor
from aiohttp import web
async def task_for_executor(app):
loop = asyncio.get_event_loop()
await loop.run_in_executor(
app.process_pool,
time.sleep, 2
)
print("ok")
async def input_queue_listener(app):
while True:
await app.input_queue.get()
app.input_queue.task_done()
loop = asyncio.get_event_loop()
loop.create_task(task_for_executor(app))
async def index(request):
await request.app.input_queue.put(True)
return web.json_response(data={"status": "ok"}, status=201)
async def startup(app):
input_queue = asyncio.Queue()
app.process_pool = ProcessPoolExecutor(3)
app.input_queue = input_queue
app.listen_task = asyncio.create_task(
input_queue_listener(app)
)
async def shutdown(app):
app.listen_task.cancel()
app.process_pool.shutdown()
if __name__ == '__main__':
app = web.Application()
app.on_startup.append(startup)
app.on_shutdown.append(shutdown)
app.add_routes([web.get('/', index),])
web.run_app(app)
Проблема:
Когда мы дергаем ручку запускается задача в executor'e. Все вроде ок. Можем еще несколько раз дернуть ручку-запустить задачу. Если мы завершим работу сервера через
Ctrl+C
во время обработки задач (например послать штук 10 запросов) - все заверишься нормально. Но когда мы завершаем работу сервера ПОСЛЕ выполнения всех задач (т.е. в момент когда ничего не выполняется) - то вылетают ошибки:
======== Running on http://0.0.0.0:8080 ========
(Press CTRL+C to quit)
ok
^CProcess SpawnProcess-1:
Process SpawnProcess-3:
Process SpawnProcess-2:
Traceback (most recent call last):
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
call_item = call_queue.get(block=True)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/queues.py", line 96, in get
with self._rlock:
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/synchronize.py", line 95, in __enter__
return self._semlock.__enter__()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
call_item = call_queue.get(block=True)
KeyboardInterrupt
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/queues.py", line 96, in get
with self._rlock:
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/synchronize.py", line 95, in __enter__
return self._semlock.__enter__()
KeyboardInterrupt
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
call_item = call_queue.get(block=True)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/queues.py", line 97, in get
res = self._recv_bytes()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/connection.py", line 216, in recv_bytes
buf = self._recv_bytes(maxlength)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/connection.py", line 414, in _recv_bytes
buf = self._recv(4)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/connection.py", line 379, in _recv
chunk = read(handle, remaining)
KeyboardInterrupt