Задать вопрос
valerium
@valerium
Изобретая велосипед

Как убедиться, что все дочерние процессы завершили работу?

Задача состоит в том, чтобы по SSH с нескольких серверов получить данные, немного обработать и записать в файл. Решение я решил распараллелить, потому что в однопоточном варианте большая часть времени уходит на ожидание ответа от сервера.

Для этого я создал очередь multiprocessing.Queue, создал список из объектов multiprocessing.Process, по очереди все эти процессы запустил и каждому передал очередь в качестве аргумента. Таким образом, каждый процесс может в эту очередь писать, а я из головного процесса могу оттуда читать.

Вопрос в том, как мне узнать, что все дочерние процессы отработали? В интернетах я нашёл рецеп вызвать у каждого по очереди .join(), но это заблокирует основной процесс, а я хочу в основном процессе вести запись в файл.

Так же я пытался проверять у процессов .is_alive(), но процесс считается живым до тех пор, пока я на нём не вызову .join().

Сейчас я использую костыль, который проверяет, есть ли что в очереди, и если не находит ничего, то ждёт 20 секунд, ещё раз проверяет и в случае повторной «пустоты» завершает работу. Но это, очевидно, костыль.

Как сделать это правильно?
  • Вопрос задан
  • 1046 просмотров
Подписаться 5 Оценить Комментировать
Решения вопроса 1
Удобнее всего писать в еще одном процессе. Его задачей было бы ожидать данные из очереди по get() и писать все полученное в файл. Очевидно, тут есть проблема, что процесс никогда не завершится, но она легко решается: в главном процессе вы ждете завершения всех процессов-воркеров по join(), после чего посылаете в очередь какое-нибудь "интересное" значение (я бы послал None), после чего ждете завершения процесса-писателя опять же по join(). При виде "интересного" значения пишущий процесс завершается. Ну и главный процесс тоже завершается, соответственно. Как будто бы выглядит не очень красиво, НО это нормально: нужен какой-то механизм, который бы сказал, что "в очередь больше не придут", что мы и делаем в главном процессе. Можно расширить очередь, кидать исключения, но, думаю, None вполне достаточно. Кстати, для этого можно использовать Pipe() в случае с процессами, но я бы точно не стал так делать, потому что зачем? :)

Меня, кстати, недавно что-то подобное тревожило, искал инфу и понял, что эта задача очень похожа на шаблон producer-consumer, только у нас производящий процесс еще и обрабатывает данные, а получающий процесс только пишет. В C#, кстати, есть специальная коллекция, которая может "закрыться". А вот что пишет по этому поводу Java:

A BlockingQueue does not intrinsically support any kind of "close" or "shutdown" operation to indicate that no more items will be added. The needs and usage of such features tend to be implementation-dependent. For example, a common tactic is for producers to insert special end-of-stream or poison objects, that are interpreted accordingly when taken by consumers.

Special end-of-stream or poison objects, КАРЛ! Это я как бы оправдываюсь, что нормальное решение предлагаю. :)))

Писать в главном процессе неудобно. В таком случае мы не можем вызвать join() у воркеров, поэтому нужен иной способ убедиться, что задачи закончены. Кажется, что для этого можно использовать методы task_done() и join() у очереди. Можно было бы затолкать адреса серверов в очередь (пусть будет q_in), в воркере делать q_in.get(), обрабатывать данные и класть в другую очередь (назовем ее q_out), после чего вызывать q_in.task_done(). НО у нас снова блокирующий метод q_in.join(), который ждет завершения всех задач. Т.е. такая возможность тут не катит. Да даже если бы он не блокировал, то все равно пришлось бы делать sleep() в цикле, что совсем некрасиво. Тут правильно вызывать блокирующий get() в пишущем процессе и завершаться по получению какого-то сигнала. Этим сигналом будет либо отправка "интересного" значения (и в случае записи в главном процессе это сделать некому, х̶о̶т̶я̶ ̶м̶о̶ж̶н̶о̶ ̶с̶д̶е̶л̶а̶т̶ь̶ ̶о̶т̶д̶е̶л̶ь̶н̶ы̶й̶ ̶п̶р̶о̶ц̶е̶с̶с̶ ̶д̶л̶я̶ ̶э̶т̶о̶г̶о̶,̶ ̶г̶о̶в̶о̶р̶и̶т̶ ̶и̶з̶в̶р̶а̶щ̶е̶н̶е̶ц̶), как я уже сказал выше, либо же можно ввести еще одну сущность, назовем ее "глобальный счетчик". Т.е. он должен уменьшаться после того, как в очередь складывается результат. А главный процесс может проверять, не равен ли этот счетчик нулю после того, как сработал get() на очереди. И если равен, то выйти из бесконечного цикла, заджойниться к воркеркам, и завершиться. Но ведь это менее красиво, чем отдельный пишущий процесс: придется создать глобальную переменную и если с тредами можно было бы обойтись простым локом, то в случае с процессами там какой-то геморрой 100%, я никогда так не делал, ведь глобальные переменные - зло. В общем, с какой стороны не подойти, нужно писать в отдельном процессе, иначе можно изобрести что-нибудь.

Хочу еще пару вещей тут заметить.

Вам точно нужно писать по мере поступления данных? Быть может, это не нужно. Ведь там постоянные открытия/закрытия файла будут, это тоже некий оверхед, нужно ли это по-настоящему? Кроме того, нужны ли именно процессы? Там сложная обработка данных, сколько она времени занимает относительно ввода/вывода? Если некритично мало, то лучше использовать треды, в таком случае с ними все может оказаться быстрее. Кроме того, с тредами появляется возможность использовать глобальные объекты, которые я все-таки использую, хотя они и зло. Можно, например, вместо Queue использовать list/set/dict. В cpython они является threadsafe, но лучше на всякий случай использовать локи в таком случае, они вносят совсем небольшой оверхед, но при этом 100% защитят от интересных проблем (я бы сделал класс LockedIterator в таком случае, чтобы было универсально для всего). Главный плюс в том, что они значительно быстрее, чем Queue, даже с локами (по моим тестам, хотя, думаю, можно это и нагуглить). А ведь вам, по сути, прелести Queue и не нужны, если юзать треды. То есть вы уверены, что процессы + плавно писать в файл - это быстрее/удобнее/лучше, чем просто сделать треды без Queue, дождаться завершения, все записать в файл? Хотя тут тоже могут быть сложности с оперативой, если очень много писать нужно.

Кроме того, а почему не, например, ansible? Он умеет опрашивать хосты и принимать в себя плагины на питоне. Кроме того, там есть асинхронные задачи, я их не использовал, но, по-моему, они справятся.
Ответ написан
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы