Задать вопрос
coderisimo
@coderisimo

Как корректно работать с одним объектом при использовании Pool (multiprocessing)?

Подскажите нубу - почему иногда при работе в нескольких потоках берутся одинаковые значения, хотя я специально проверяю их, чтобы избежать дублирования.
Т.е мне нужно иметь лист, который могут модифицировать все процессы. Перед выполнением задачи процесс лезет в список, берет оттуда элемент. Когда задача завершается - элемент возвращается в список и будет доступен другим процессам.

Ниже синтетический упрощенный пример. Но он отражает в какой-то мере суть.

import random 
from multiprocessing import Pool

my_list = [1,2,3,4,5,6,7,8,9]
used = []

def test(i):
  indx =  random.randint(0,len(my_list)-1)
  while my_list[indx] in used: #ищу элемент, который ранее не использовался
    indx = random.randint(0,len(my_list)-1)
  used.append(my_list[indx]) #добавляю элемент в список используемых, чтобы избежать повторного использования
  print(my_list[indx]) #вывожу уникальный элемент на печать
  
with Pool(4) as p:
  p.map(test, [1,2,3,4,5,6,7])


например , я получаю :
9
7
6
6
3
1
4

две 6 подряд ((((( Многопоточность, черт бы ее побрал )))))
Спасибо!
  • Вопрос задан
  • 950 просмотров
Подписаться 3 Средний Комментировать
Решения вопроса 1
kshnkvn
@kshnkvn
yay ✌️ t.me/kshnkvn
Потому что вы используете не потоки, а процессы. У каждого процесса своё собственное окружение, соответственно переменные из process_1 никак не пересекаются с переменными из process_2. Что-бы шарить данные между процессами нужно использовать Manager.

Вот пример простенького ротатора:
from multiprocessing import Pool, Manager


def rotator(data_list):
    data = data_list.pop(0)
    data_list.append(data)
    return data


def print_data(data_list):
    data = rotator(data_list)
    print(data)


if __name__ == "__main__":
    manager = Manager()
    data_list = manager.list()

    for x in range(5):
        data_list.append(x)
    
    with Pool(4) as pool:
        for _ in range(10):
            pool.apply_async(print_data, [data_list])
        pool.close()
        pool.join()

Принцип простой как палка: есть список элементов, при обращении к методу первый доступный элемент удаляется из списка и добавляется в конец, что-бы следующий процесс не мог взять тот-же элемент и так по кругу.
Вот вывод:
0
1
2
3
4
0
1
2
3
4


Только учтите, что если процессы, как и потоки, стартуют в определенном порядке еще не означает, что в таком-же порядке они и завершат работу. Т.е. вполне нормально что процесс, который запустился 4м отработал на N-мс быстрее и завершился первым, в итоге вывод может быть таким:
4
2
0
1
Ответ написан
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы