Мучился, мучился и замучился.
Вот такой вот код, немного урезал, чтобы поменьше букав было.
По крону (в дальнейшем, сейчас вручную) запускается данная вещь, которая запускает еще несколько, ссылаясь на себя же любимую (на файл test.php), но с параметром type=worker и номером задания pid=X
public function start() {
$i = 0;
$this->run_proc = exec("ps ax | grep 'test.php' | wc -l");
if ( $this->run_proc > 2 ) die( 'Tasks daemon already running' );
while ( ! $this->stop ) {
if ( $this->run_proc <= $this->max_proc ) {
$i++;
exec("php-cgi -f test.php type=worker pid={$i} > /dev/null 2>&1 &");
$this->run_proc += 1;
} else {
sleep(10);
}
}
}
А это воркеры, которые запускает функция выше:
Тут у меня они берут из БД задания, подходящие по параметрам, предварительно блокируя всю таблицу на запись. Затем как взяли, снимают блокировку. И передают их дальше Менеджеру задач (другому классу). Если задач нет, то засыпают на XX сек.
public function worker($pid) {
while ( ! $this->stop ) {
// Ставим блокировку таблицы от чтения и записи
$this->db->query( "LOCK TABLES " . PREFIX . "_tasks WRITE" );
$row = $this->db->query( "SELECT `id` FROM " . PREFIX . "_tasks WHERE `lock` = '0' LIMIT 1" );
if ( $row['id'] ) {
// Ставим задаче статус заблокированной
$this->db->query( "UPDATE " . PREFIX . "_tasks SET `lock` = '1', `lock_pid` = '{$pid}' WHERE `id` = '{$row['id']}'" );
}
// Снимаем блокировку таблицы
$this->db->query( "UNLOCK TABLES" );
if ( $row['id'] ) {
// Передаем задачу менеджеру задач
$task = new Tasks;
$task->run( $row['id'] );
sleep(5);
} else {
sleep(30);
}
}
}
В менеджере задач выполняется этот тестовый код:
Тут заполняется лог файл какими то записями и в БД у данной задачи меняются параметры.
class Test extends Tasks {
public function start($id) {
$row = $this->db->query( "SELECT * FROM " . PREFIX . "_tasks WHERE `id` = '{$id}'" );
if ( $row['id'] ) {
$log_file = ROOT_DIR . '/tmp/' . $row['id'] . '.log';
$log = [];
for ( $i = 0; $i <= 10; $i++ ) {
$log[] = date( "Y-m-d H:i:s" ) . ' - ' . $row['lock_pid'] . "\n";
sleep( rand(1,5) );
}
sleep( rand(1,5) );
file_put_contents( $log_file, $log, FILE_APPEND );
// Снимаем блокировку и устанавливаем время до которого задача будет заблокирована
$this->db->query( "UPDATE " . PREFIX . "_tasks SET `lock` = '0', `lock_next` = '" . date( "Y-m-d H:i:s", strtotime( '+' . rand(1,3) . ' minutes' ) ) . "' WHERE `id` = '{$row['id']}'" );
}
}
}
Работать работает, но как-то странно, то повиснет, то запись не изменит в последней функции. И никак не могу понять, что не так. Файлы еще создает с одной и той же временной меткой, т.е. воркеры вроде в разные моменты времени их должны создавать, т.к. независимо друг от друга работают (как мне кажется ))), но время очень часто одно и тоже..
Воркеры пробовал запускать через курл, не помогло.
А суть в том, что мне нужно, чтобы как можно быстрее обрабатывалась очередь задач (в несколько потоков) и это все работало без моего участия.
Надеюсь на помощь, спасибо.