Задать вопрос
Vindicar
@Vindicar
RTFM!

Как запустить отдельный скрипт в субпроцессах, получив интерфейс типа concurrent.futures.Executor?

Решаемая задача: вынести в дочерние процессы CPU-bound нагрузку, чтобы не тормозить ей основной асинхронный процесс.

Как я понимаю, обычные механизмы мультпроцессности питона создают клон запускающего процесса, и уже в этом клоне находят и запускают указанную целевую функцию. Но в моем случае основной запускающий скрипт 1) тяжелый и 2) импортирует код, который нужно распараллелить, неявным образом. Не факт, что с ним всё заведётся, как надо.

Хотелось бы запустить в качестве дочернего процесса отдельный легковесный скрипт. Но тогда я автоматически оказываюсь ограничен интерфейсом subprocess.Popen или asyncio.subprocess.Process, теряя все милые фишки вроде Executor.submit() и Executor.shutdown().
Колхозить решение на pickle и сокетах (или, того хуже, на пайпах) не очень-то хочется, равно как и тащить в системе MQ-брокера только ради этой задачи.

Что можете посоветовать?
  • Вопрос задан
  • 106 просмотров
Подписаться 2 Сложный 3 комментария
Решения вопроса 1
Vindicar
@Vindicar Автор вопроса
RTFM!
Плохая новость - готового решения сформулированной в вопросе задачи, похоже, нет. Нужно использовать брокеры или что-то подобное.

Хорошая новость - я снова попался на проблему XY. Всё нормально, если спрятать тяжёлые импорты в теле основной программы вот так:
# ------------------- main.py -------------------
async def main():
    import urllib3  # как бы тяжелый модуль, нужный только главному скрипту
    import importlib
    ...  # получаем исходные данные
    # динамически подтаскиваем нужный модуль
    mod = importlib.import_module('modules.submodule')  
    m = mod.Module(x=-1)  # используем класс из него
    # этот метод под капотом использует мультипроцессинг
    results = await m.run_tasks(some_data)
    ...  # что-то делаем с результатами

if __name__ == '__main__':
    import asyncio
    asyncio.run(main())

# ------------------- modules/submodule.py -------------------
import asyncio
import concurrent.futures
import time
import sys
import os

class Module:
        def __init__(self, x):
        self.x = x

    def __repr__(self) -> str:
        return f'{self.__class__.__name__}(x={self.x!r}) {hex(id(self)).upper()} @ {os.getpid()}'

    async def run_tasks(self, data):
        self.x = len(items)
        print(f'Using instance {self!r}')
        loop = asyncio.get_running_loop()
        pool = concurrent.futures.ProcessPoolExecutor(4)  # где будем исполнять таск
        # готовим таски к исполнению
        futures = [loop.run_in_executor(pool, self.worker_func, item) for item in data]
        # ждём их завершения
        return await asyncio.gather(*futures, return_exceptions=True)

    def worker_func(self, item):
        # инстансы разные, разумеется, но их состояние, похоже, клонируется в новые процессы...
        print(f'Using instance {self!r}. urllib3', 'is' if 'urllib3' in sys.modules else 'is not', 'present', flush=True)
        time.sleep(0.1)  # имитируем напряжённую работу
        return repr(item)
Ответ написан
Комментировать
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы