Ranc58
@Ranc58
Backend python developer

Как правильно использовать ProcessPoolExecutor с aiohttp?

Имеется aiohttp сервер , который запускает некую cpu bound задачу в отдельном процессе. Упрощенный пример ниже:

import asyncio
import time
from concurrent.futures.process import ProcessPoolExecutor


from aiohttp import web


async def task_for_executor(app):
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(
        app.process_pool,
        time.sleep, 2
    )
    print("ok")


async def input_queue_listener(app):
    while True:
        await app.input_queue.get()
        app.input_queue.task_done()
        loop = asyncio.get_event_loop()
        loop.create_task(task_for_executor(app))


async def index(request):
    await request.app.input_queue.put(True)
    return web.json_response(data={"status": "ok"}, status=201)


async def startup(app):
    input_queue = asyncio.Queue()
    app.process_pool = ProcessPoolExecutor(3)
    app.input_queue = input_queue
    app.listen_task = asyncio.create_task(
        input_queue_listener(app)
    )


async def shutdown(app):
    app.listen_task.cancel()
    app.process_pool.shutdown()


if __name__ == '__main__':
    app = web.Application()
    app.on_startup.append(startup)
    app.on_shutdown.append(shutdown)
    app.add_routes([web.get('/', index),])
    web.run_app(app)


Проблема:
Когда мы дергаем ручку запускается задача в executor'e. Все вроде ок. Можем еще несколько раз дернуть ручку-запустить задачу. Если мы завершим работу сервера через Ctrl+C во время обработки задач (например послать штук 10 запросов) - все заверишься нормально. Но когда мы завершаем работу сервера ПОСЛЕ выполнения всех задач (т.е. в момент когда ничего не выполняется) - то вылетают ошибки:
======== Running on http://0.0.0.0:8080 ========
(Press CTRL+C to quit)
ok
^CProcess SpawnProcess-1:
Process SpawnProcess-3:
Process SpawnProcess-2:
Traceback (most recent call last):
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
    call_item = call_queue.get(block=True)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/queues.py", line 96, in get
    with self._rlock:
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
    call_item = call_queue.get(block=True)
KeyboardInterrupt
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/queues.py", line 96, in get
    with self._rlock:
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
    call_item = call_queue.get(block=True)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/queues.py", line 97, in get
    res = self._recv_bytes()
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/connection.py", line 414, in _recv_bytes
    buf = self._recv(4)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt
  • Вопрос задан
  • 1224 просмотра
Решения вопроса 1
Ranc58
@Ranc58 Автор вопроса
Backend python developer
Вопрос решился советом с другого ресурса. Продублирую сюда:


Проблема не в том, что sigint не обрабатывается, а в том, что он поднимается в каждый форк и там все взрывается, поэтому в трейсе нарисована куча KeyboardInterrupt вместо нуля.

А если sigint в форках игнорировать, то похоже что все ништяк. По идее в кейсе с aiohttp то же самое должно прокатить.


Необходимо указать initializer при запуске пула:

app.process_pool = ProcessPoolExecutor(3, initializer=register_signal_handler)


И код самой функции:

def register_signal_handler():
    signal.signal(signal.SIGINT, lambda _, __: None)
Ответ написан
Комментировать
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы