@ARTIsshoque

Как в Python3 использовать вложенный список в multiprocessing?

К примеру, у меня есть словарь:
D = {'x': [1, 2, 3], 'y': [4, 5, 6], 'z': [7, 8, 9]}

и я хочу обработать его, используя несколько процессов. Для упрощения примера, пусть нужно просто умножить каждый элемент списков на 10 (хотя в реальности будет использоваться более сложная функция, для которой мультипроцессинг целесообразен). Набросок кода такой:
import multiprocessing

PROCESSES = 4

class Worker(multiprocessing.Process):

    def __init__(self, work_queue):
        super().__init__()
        self.work_queue = work_queue

    def run(self):
        while True:
            try:
                key, index = self.work_queue.get()
                self.process(key, index)
            finally:
                self.work_queue.task_done()

    def process(self, key, index):
        D[key][index] = D[key][index] * 10
        # Здесь написана полная ерунда, но я просто хочу показать, что мне нужно получить


def main():
    work_queue = multiprocessing.JoinableQueue()
    for i in range(PROCESSES):
        worker = Worker(work_queue)
        worker.daemon = True
        worker.start()
    for key, value in D.items():
        for i in range(len(value)):
            work_queue.put((key, i))
    work_queue.join()

main()


Проблема в том, что я не представляю, как можно передать очереди словарь со вложенными списками, так чтобы отдельные процессы могли сохранять результаты своих вычислений в этот словарь. Если использую Manager().dict(), то простые значения (строки, числа) сохраняются, а списки - нет.
Прошу помочь разобраться с этим вопросом.
  • Вопрос задан
  • 332 просмотра
Решения вопроса 1
@deliro
Не надо так низкоуровнево писать (обычно).

from concurrent.futures import ProcessPoolExecutor


D = {'x': [1, 2, 3], 'y': [4, 5, 6], 'z': [7, 8, 9]}


def process(arg):
    key, values = arg
    return key, [v * 10 for v in values]


if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        result = executor.map(process, D.items())

    print(dict(result))


Чтобы убедиться, что оно действительно работает и не блокирует друг друга:
from concurrent.futures import ProcessPoolExecutor
from time import sleep


D = {'x': [1, 2, 3], 'y': [4, 5, 6], 'z': [7, 8, 9]}


def process(arg):
    key, values = arg
    print("executing", key, values)
    sleep(1)
    return key, [v * 10 for v in values]


if __name__ == "__main__":
    with ProcessPoolExecutor(2) as executor:
        result = executor.map(process, D.items())

    print(dict(result))
Ответ написан
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы