Как организовать многопоточную обработку массива в Perl?

Здравствуйте!
Уже не знаю где искать информацию. Решил изучить многопоточноть. До этого опыта ни на одном языке - не было.

Основная задача:
Брать массив элементов, скажем там 1000 элементов. Обрабатывать в определенное кол-во потоков.

Более подробно:
Есть файл со списком доменов. Я его открываю и преобразую каждую строку в элемент массива.
Есть функция-обработчик, которая заходит на данный домен и проверяет код ответа. Если ответ = 200, выводит его в консоль.
Не знаю как это всё раскидать по потокам, что бы скажем я поставил 10 потоков, и скрипт обрабатывал одновременно 10 доменов. Так же если один из доменов проверился раньше других - не ждать пока все домены проверятся, а сразу в данный поток отправлять новый домен. Вообщем нужно как-то отслеживать завершенные потоки и добавлять постоянно новые, что бы всегда работало ровно 10 потоков.

Если у кого ни будь есть пример, или скажем можете на коленке написать - буду признателен!
  • Вопрос задан
  • 1091 просмотр
Пригласить эксперта
Ответы на вопрос 3
@krypt3r
Можете попробовать взять за основу вот такой код (с использованием Thread::Queue)
#!/usr/bin/perl

use strict;
use warnings;
use threads;
use Thread::Queue;
use Data::Dumper;

my @myarray = (1 .. 1000);
#print Dumper (\@myarray);die;

my $count = shift || 10;
print "Number of threads: $count\n";

my $q = Thread::Queue->new;
my @threads;
for (0 .. $count - 1)
{
    push @threads, async {
        while (defined (my $f = $q->dequeue))
        {
            some_process ($f);
        }
    };
}

for (@myarray)
{
    $q->enqueue ($_);
}

# Tell workers they are no longer needed.
$q->enqueue (undef) for @threads;

# Wait for workers to end
$_->join for @threads;

print "Complete\n";

1;

sub some_process
{
    my $element = shift;
    my $tid = threads->self->tid;
    #my $count = threads->list (threads::running);
    #print "Running threads: $count\n";
    print "Thread $tid started\n";
    open my $F, '>>', $tid . '.txt';
    print $F 'TID: ', $tid, ', element: ', $element, "\n";
    close $F;
    print "Thread $tid stopped\n";
}
Ответ написан
Комментировать
@Rozello
Создавать и убивать потоки, это плохая идея, нужно создавать потоки обрабатывающие массив в цикле, пока он не опустеет.
То есть массив используется как очередь.
Но потоки это беда, имхо.

#!/usr/bin/env perl

use strict;
use warnings;

use threads;
use threads::shared;

# Создаём расшареную переменную
my @numbers:shared = (1..100);

# Задаём количество потоков
my $threads = shift || 10;

# Создаём потоки и кладём их обекты в массив
my @threads;
for (1..$threads) {
    push @threads, threads->new(
        sub {
            while (@numbers) {
                # Достаём данные из массива
                # Предварительно заблокировава его для остальных потоков
                my $number;
                {
                    # Блокировка работает только в этом скоупе
                    lock(@numbers);
                    $number = shift(@numbers);
                }
                
                print 'Result: '.($number*10)."\n";
            }
        }
    );
}

# Запускаем потоки привязав их к основному процессу
$_->join for (@threads);
Ответ написан
Комментировать
Где-то видел пример в основе AnyEvent.

Проще не морочиться с многопоточностью, а использовать какой-нить брокер сообщений.
Будет большой задел на масштабируемость.
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы