<?php
namespace alekciy\Yii\crawler;
use
alekciy\Yii\crawler\Model\Page
, alekciy\Yii\crawler\Model\Task
;
/**
* Загруженной считается страница у которой date_load != null.
*/
class WebCrawlerModule extends \CModule
{
...
/**
* Заполнить очередь на закачку адресами страниц. Вернет количество страниц поставленных в очередь.
*/
public function fillQueue()
{
$sem_key = ftok(__FILE__, 'f');
$sem = sem_get($sem_key, 1);
sem_acquire($sem);
// Если очередь не пустая, то нет смыла заполнять её, т.к. это может привести к дублированную данных в ней
$query_size = $this->_redis->lSize('page_load_query');
if ( !empty($query_size) ) {
sem_release($sem);
return 0;
}
$sql = '
SELECT
p.id
, p.url
, p.user_agent
FROM
' . Page::model()->tableName() . ' AS p
INNER JOIN
' . Task::model()->tableName() . ' AS t ON t.id = p.id_task
WHERE
p.date_next_try < NOW()
AND
p.date_load IS NULL
AND
p.load_count < t.max_try
';
$db_result = \Yii::app()->db->createCommand($sql)->query();
while( false !== ($row = $db_result->read()) )
{
$this->_redis->lPush('page_load_query', $row);
}
sem_release($sem);
return $db_result->rowCount;
}
/**
* Мультипотоком загружает страницы до тех пор, пока в очереди есть ссылки.
* Гарантирует RPS < 5 на один домен.
*/
public function pageLoad($total_download_session = 100)
{
// Через семофоры гарантируем, что запустились не более чем в 15 экземплярах
$sem_key = ftok(__FILE__, 'p');
$sem = sem_get($sem_key, 15);
sem_acquire($sem);
$total_page_load = 0;
$query_size = $this->_redis->lSize('page_load_query');
while ( !empty($query_size) )
{
// [1- Накапливаем ссылки для мультизагрузки (не более $total_download_session штук)
$url_list = array();
$domain_list = array();
for ($i = 0; $i < $total_download_session; ++$i)
{
$query_elm = $this->_redis->rPop('page_load_query');
if ( empty($query_elm['url']) ) {
break;
}
$url_list[$query_elm['url']] = array(
CURLOPT_USERAGENT => $query_elm['user_agent'],
);
// [3- Гарантируем, что хотя бы в этом процессе в рамках одного вызова mCurl (сейчас это 3-5 сек) не шлем больше 5 запросов на один домен
$domain = \Url::parse($query_elm['url'], PHP_URL_HOST);
if ( !array_key_exists($domain, $domain_list) ) {
$domain_list[$domain] = 0;
}
++$domain_list[$domain];
if ($domain_list[$domain] > 5) {
break;
}
// -3]
}
// -1]
// [2- Загружаем страницы и сохраняем результат
$download_page = \Yii::app()->mcurl->loadPageList($url_list);
foreach ($download_page as $page)
{
Page::model()->tryPageLoad($page);
++$total_page_load;
}
// -2]
$query_size = $this->_redis->lSize('page_load_query');
}
sem_release($sem);
return $total_page_load;
}
...
}
self::$_driver->wait(20)->until(\Facebook\WebDriver\WebDriverExpectedCondition::invisibilityOfElementLocated(
WebDriverBy::xpath('//*[text()="Загрузка..."]')
));
<?php
/**
* Статусы кластера:
* start - не работает, но в процессе запуска (через cron)
* run - был запущен
* stop - был остановлен
*/
class Wd extends CComponent
{
public
$start_port_num = 0
;
protected
$_proxy_list = array()
;
private
$_proxy_cursor = 0 // индекс прокси которая считается текущей
;
/**
* Инициализация соединения с WebDrivers на основании информации о прокси. Возращает объект-соединение,
* эмулирующее браузер. Если соединение установить не удалось, вернет false. Таймаут ожидания
* установки соединения 10 секунд.
*
* @param arr $proxy Информация о прокси аккаунте (содержит порт webdriver-а).
* @return (RemoteWebDriver|false)
*/
protected function _getDriver($proxy)
{
include_once(Yii::app()->basePath . '/lib/php-webdriver/lib/__init__.php');
// [1- Инициализация окружения
$webdriver_host = '127.0.1.1';
$capabilities = array(
WebDriverCapabilityType::BROWSER_NAME => 'phantomjs',
'phantomjs.page.settings.userAgent' => 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:25.0) Gecko/20100101 Firefox/25.0',
);
try {
$driver = RemoteWebDriver::create("{$webdriver_host}:{$proxy['wd_port']}", $capabilities, 10000);
} catch (Exception $e) {
return false;
}
// Задаем размер экрана по умолчанию
$window = new WebDriverDimension(1024, 768);
$driver->manage()->window()->setSize($window);
// -1]
return $driver;
}
/**
* По идентификатору драйвера вернет running если связанный с ним браузер запущен,
* либо stopped если он остановлен. Данные получаются на основании обращения к прослущиваемому сокету,
* поэтому отражают реальную ситуацию.
*/
public function getWebDriverStatus($id)
{
if ( !array_key_exists($id, $this->_proxy_list) ) {
throw new \Exception("Incorrect webdriver id #$id\n");
}
$proxy = $this->_proxy_list[$id];
$webdriver_host = '127.0.1.1';
$errno = 0;
$errstr = '';
$socket = @fsockopen($webdriver_host, $proxy['wd_port'], $errno, $errstr, 5);
return is_resource($socket)
? 'running'
: 'stopped';
}
/**
* Запускает группу webdriver (phamtomjs). Каждый из них ассоциирован с определенным
* прокси. Висят как демоны и слушают каждый свой локальный порт.
*/
public function startWebDriverCluster($ttl = 3600)
{
$webdriver_host = '127.0.1.1';
foreach ($this->_proxy_list as $key => $proxy)
{
// Не запускаем неактивные webdrive
if ( !$proxy['is_active'] ) {
continue;
}
// Если порт занят, то не стартуем такой webdrive (т.к. скорее всего он был запущен в прошлый вызов команды)
$errno = 0;
$errstr = '';
$socket = @fsockopen($webdriver_host, $proxy['wd_port'], $errno, $errstr, 5);
if ( is_resource($socket) ) {
continue;
}
// Если все впорядке, стартуем webdrive
$cookie_dir = Yii::app()->basePath . '/runtime/cookie/';
if ( !file_exists($cookie_dir) ) {
mkdir($cookie_dir);
chmod($cookie_dir, 0775);
}
$cookie_file = $cookie_dir . $key . '.txt';
// Костыль - часть загрузок фейлится, возможно из-за кук, поэтому тупо грохаем файл с куками
// хотя это и противоречит первоначальной задумке
if ( file_exists($cookie_file) ) {
unlink($cookie_file);
}
$cmd = "phantomjs --load-images=false --proxy={$proxy['proxy_ip']}:{$proxy['proxy_port']} --proxy-auth={$proxy['proxy_user']}:{$proxy['proxy_pass']} --ignore-ssl-errors=true --cookies-file={$cookie_file} --webdriver=127.0.1.1:{$proxy['wd_port']} > /dev/null 2>&1 &";
exec($cmd);
// Даем время phantomjs-у запуститься
$is_run = false;
for ($i = 0; $i < 20; ++$i)
{
usleep(500000);
$socket = @fsockopen($webdriver_host, $proxy['wd_port'], $errno, $errstr, 1);
if ( is_resource($socket) ) {
$is_run = true;
break;
}
}
// Отмечаем как работающий
$cache_key = 'proxy.' . $key;
$proxy_info = Yii::app()->redis->get($cache_key);
// Если данных по прокси в кэше нет, то считаем прокси доступным
if ( empty($proxy_info) )
{
$proxy_info = $proxy;
$proxy_info['is_active'] = $is_run;
$proxy_info['wd_port'] = $key + $this->start_port_num; // не используем системные порты
$proxy_info['last_req_time'] = 0; // время последнего запроса
$proxy_info['req_count'] = 0; // счетчик удачных попыток
$proxy_info['status'] = $is_run ? 'running' : 'stopped';
} else {
$proxy_info['status'] = $is_run ? 'running' : 'stopped';
}
Yii::app()->redis->set($cache_key, $proxy_info);
}
$cache_key = 'cluster.status';
Yii::app()->redis->set($cache_key, 'run', $ttl);
}
/**
* +
* Останавливает все демонты webdrive кластера если вызван без параметров.
* Остановит только конкретный драйвер по его $id.
*
* @param int $id Идентификатор прокси которую нужно выгрузить.
*/
public function stopWebDriverCluster($id = null)
{
$webdriver_host = '127.0.1.1';
foreach ($this->_proxy_list as $key => $proxy)
{
if (!is_null($id)
&& $id != $key
) {
continue;
}
$errno = 0;
$errstr = '';
$socket = @fsockopen($webdriver_host, $proxy['wd_port'], $errno, $errstr, 5);
if ( is_resource($socket) )
{
$cmd = 'kill -15 `ps ax -o pid,args | grep -v grep | grep phantom | grep \'' . $key . '.txt\' | awk \'{print $1}\'`';
exec($cmd);
}
}
if ( is_null($id) ) {
$cache_key = 'cluster.status';
Yii::app()->redis->delete($cache_key);
}
}
/**
* Вернет webdrive который можно использовать для запросов. При заданном $id прокси
* вернет связанный с ней webdrive (проверка на факт активности прокси не выполняется).
* В случае ошибок (свободных webdrive нет, заданный webdrive деактивирован/выключен)
* вернет null.
*/
public function getDriver($id = null)
{
$proxy = is_null($id)
? $this->_getActiveProxyInfo($id)
: $this->getProxyInfoById($id);
// Убеждаемся, что заданный драйвер запущен и работает, если нужно, пытается его запустить
$status = $this->getWebDriverStatus($id);
if ('stopped' == $status) {
$is_run = $this->activateProxy($id);
if ( !$is_run ) {
return null;
}
}
if ( !empty($proxy) )
{
$driver = $this->_getDriver($proxy);
if ( !empty($driver) )
{
return array($id => $driver);
}
}
return null;
}
...
}