@alx1987

Что не так c моим мега php «демоном»?

Мучился, мучился и замучился.
Вот такой вот код, немного урезал, чтобы поменьше букав было.

По крону (в дальнейшем, сейчас вручную) запускается данная вещь, которая запускает еще несколько, ссылаясь на себя же любимую (на файл test.php), но с параметром type=worker и номером задания pid=X

public function start() {
		$i = 0;
		$this->run_proc = exec("ps ax | grep 'test.php' | wc -l");
		if ( $this->run_proc > 2 ) die( 'Tasks daemon already running' );

		while ( ! $this->stop ) {
			
			if ( $this->run_proc <= $this->max_proc ) {
				$i++;
				
				exec("php-cgi -f test.php type=worker pid={$i} > /dev/null 2>&1 &");
				
				$this->run_proc += 1;
			} else {
				sleep(10);
			}
		}
	}


А это воркеры, которые запускает функция выше:
Тут у меня они берут из БД задания, подходящие по параметрам, предварительно блокируя всю таблицу на запись. Затем как взяли, снимают блокировку. И передают их дальше Менеджеру задач (другому классу). Если задач нет, то засыпают на XX сек.

public function worker($pid) {
		while ( ! $this->stop ) {
			// Ставим блокировку таблицы от чтения и записи
			$this->db->query( "LOCK TABLES " . PREFIX . "_tasks WRITE" );
				
			$row = $this->db->query( "SELECT `id` FROM " . PREFIX . "_tasks WHERE `lock` = '0' LIMIT 1" );
			if ( $row['id'] ) {
				// Ставим задаче статус заблокированной
				$this->db->query( "UPDATE " . PREFIX . "_tasks SET `lock` = '1', `lock_pid` = '{$pid}' WHERE `id` = '{$row['id']}'" );
			}

			// Снимаем блокировку таблицы
			$this->db->query( "UNLOCK TABLES" );
			
			if ( $row['id'] ) {
				
				// Передаем задачу менеджеру задач
				$task = new Tasks;
				$task->run( $row['id'] );
				
				sleep(5);
			
			} else {
				sleep(30);
			}
		}
	}


В менеджере задач выполняется этот тестовый код:
Тут заполняется лог файл какими то записями и в БД у данной задачи меняются параметры.

class Test extends Tasks {
	
	public function start($id) {
		$row = $this->db->query( "SELECT * FROM " . PREFIX . "_tasks WHERE `id` = '{$id}'" );
		
		if ( $row['id'] ) {
			
			$log_file = ROOT_DIR . '/tmp/' . $row['id'] . '.log';
			$log = [];
			
			for ( $i = 0; $i <= 10; $i++ ) {
				$log[] = date( "Y-m-d H:i:s" ) . ' - ' . $row['lock_pid'] . "\n";
				sleep( rand(1,5) );
			}
			
			sleep( rand(1,5) );
			file_put_contents( $log_file, $log, FILE_APPEND );
			
			// Снимаем блокировку и устанавливаем время до которого задача будет заблокирована
			$this->db->query( "UPDATE " . PREFIX . "_tasks  SET `lock` = '0', `lock_next` = '" . date( "Y-m-d H:i:s", strtotime( '+' . rand(1,3) . ' minutes' ) ) . "'  WHERE `id` = '{$row['id']}'" );
		}
	}
	
}


Работать работает, но как-то странно, то повиснет, то запись не изменит в последней функции. И никак не могу понять, что не так. Файлы еще создает с одной и той же временной меткой, т.е. воркеры вроде в разные моменты времени их должны создавать, т.к. независимо друг от друга работают (как мне кажется ))), но время очень часто одно и тоже..
Воркеры пробовал запускать через курл, не помогло.

А суть в том, что мне нужно, чтобы как можно быстрее обрабатывалась очередь задач (в несколько потоков) и это все работало без моего участия.

Надеюсь на помощь, спасибо.
  • Вопрос задан
  • 287 просмотров
Пригласить эксперта
Ответы на вопрос 4
taliban
@taliban
php программист
rabbitmq и запускайте по крону лишь один файлик с обработкой но часто, не будет проблем, сэкономите время и силы, код будет очень понятный
Ответ написан
DmitriyEntelis
@DmitriyEntelis
Думаю за деньги
Может быть это не совсем то о чем Вы спрашивали, но зачем делать lock table?
Можно же стартовать транзакцию, выбирать в ней свободный таск for update и устанавливать ему статус
Ответ написан
index0h
@index0h
PHP, Golang. https://github.com/index0h
TL;DR
* Стартуйте воркеры через supervisor
* Для очередей используйте сервер очередей, например rabbitmq
* Используйте PSR-3 совместимый логгер

1. Запускалка воркеров по крону... посмотрите supervisor, не создавайте костыльных велосипедов.
2. Лок таблицы - жирно, очень жирно)) Лок записи - еще куда ни шло. Вы даже малую нагрузку так выдержать не сможете.
3. Воркер - такая штука, что спокойнейшим образом может отвалиться, по этому лучше ее рестратовать сразу, см. supervisor
4. В случае проблемных тасок рискуете потерять не отработавшие.
5. Вы не проверяете аргументы методов - вот это печально, что будет, если вызвать
$task->start("'; DROP TABLE " . PREFIX . '_tasks');

6. Для логгирования очень рекомендую использовать что-то на базе PSR-3, Monolog например, не плодите костылей.
7. В воркере вы локаете таску, в таскере разлокиваете, зачем таскеру это делать? Если уж так, то лок И анлок - задача воркера.
8. Вы когда выгребаете новые таски, смотрите на lock=0, при завершении ставите lock=0, похоже на хрень. Получается одна и та же таска постоянно будет выполняться.
Ответ написан
@alx1987 Автор вопроса
Вот обновил код до такого, но нет:

1. Стартуем воркеры
public function start() {
	exec("php-cgi -f test.php type=worker pid=1 > /dev/null 2>&1 &");
	exec("php-cgi -f test.php type=worker pid=2 > /dev/null 2>&1 &");
	exec("php-cgi -f test.php type=worker pid=3 > /dev/null 2>&1 &");
}


2. Код воркера
public function worker($pid) {
	while ( ! $this->stop ) {
		
		$this->db->query( "UPDATE " . PREFIX . "_tasks SET `lock` = '1', `lock_pid` = '{$pid}' WHERE `lock` = '0' AND `lock_next` <= '" . date( "Y-m-d H:i:s" ) . "' LIMIT 1" );
		
		if ( $this->db->get_affected_rows() ) {
			
			$row = $this->db->super_query( "SELECT `id` FROM " . PREFIX . "_tasks WHERE `lock` = '1' AND `lock_pid` = '{$pid}' LIMIT 1" );
			if ( $row['id'] ) {
				
				// Передаем задачу менеджеру
				$task = new Tasks;
				$task->run( $row['id'] );
			
			}
			
		} else {
			sleep( 20 );
		}

	}
}


3. Ну и выполнение задачи
class Test extends Tasks {
	
	public function start($id) {
		$row = $this->db->super_query( "SELECT * FROM " . PREFIX . "_tasks WHERE `id` = '{$id}'" );
		
		if ( $row['id'] ) {
			
			$log_file = ROOT_DIR . '/tmp/' . $row['id'] . '.log';
			$log = [];
			
			for ( $i = 0; $i <= 10; $i++ ) {
				$log[] = date( "Y-m-d H:i:s" ) . ' - ' . $row['lock_pid'] . "\n";
				sleep( rand(1,5) );
			}
			
			sleep( rand(1,5) );
			file_put_contents( $log_file, $log, FILE_APPEND );
			
			// Снимаем блокировку и устанавливаем время до которого задача будет заблокирована
			$this->db->query( "UPDATE " . PREFIX . "_tasks 
				               SET `lock` = '0', `lock_pid` = '0', `lock_next` = '" . date( "Y-m-d H:i:s", strtotime( '+2 minutes' ) ) . "' 
							   WHERE `id` = '{$row['id']}'" );
		}
	}
	
}


При выполнении задачи, он почему-то успешно делает логирование в файл, но не обновляет метку времени в БД. sleep'ы я добавил специально, чтобы затунять выполнение.
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы