qmax
@qmax
программер

Как организовать итеративно-рекурсивную паралельную обработку?

Имеется:
  • огромный массив данных (который читается итеративно)
  • желание обрабатывать каждое данное в отдельном подпроцессе
  • обработка одного данного может ркурсивно породить новые задачи (не огромное количество), и их тоже хочется распараллелить


Очевидно, что нужна какая-то комбинация multiprocessing.Pool и multiprocessing.Queue.

Для Pool можно использовать map-функции с параметром chunksize соответствующему количеству подпроцессов, тогда у итератора будет запрашиваться количество задач примерно соответствующих количеству свободных обработчиков:
pool = Pool(num_workers)
for r in pool.map(data_handler, data_input_iter, num_workers): pass

Но непонятно, как тут порождать задачи рекурсивно.

Для Queue можно установить её размер по количеству подпроцессов, тогда заполнение будет блокироваться примерно пока очередной обработчик не освободится.
queue = Queue(num_workers)
for datum in data_input_iter:
  queue.put(datum, block=True)

Но это будет блокировать заполние очереди из подпроцесса обработчика.
  • Вопрос задан
  • 267 просмотров
Пригласить эксперта
Ответы на вопрос 1
qmax
@qmax Автор вопроса
программер
В общем, получилась вот такая петрушка.
Насколько жизнеспособна - пока непонятно.

class Worker():
    def __init__(self, consumer, queue):
        self.consumer = consumer
        self.queue = queue

    def __call__(self, inp):
        if inp is None:
            return
        res = self.consumer(inp)
        if res is not None:
            for r in res:
                self.queue.put(r)


def multiprocess(producer, consumer, num_workers):
    pool = mp.Pool(num_workers)
    queue = mp.Manager().Queue()
    worker = Worker(consumer, queue)

    for _ in pool.imap_unordered(worker, producer, num_workers):
        while not queue.empty():
            pool.apply(worker, (queue.get(),))

    pool.close()
    pool.join()
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы