Как раздавать уникальные записи таблицы во многопоточном парсере?

Приблизительная задача - спарсить 10 000 000 страниц html сайта в таблицу mysql innodb.
Пусть есть 1000 прокси и мощный сервер.
Есть таблица urls с полями:

id, url, parsed (0, 1), datetime_added, datetime_start_parsed (NULL по умолчанию), datetime_end_parsed(NULL по умолчанию)

В таблице уже есть ссылки на страницы (url), которые надо спарсить

parsed - 0 - страница еще не спаршена, 1 - страница спаршена.
datetime_added - дата и время добавления урл в таблицу, уже заполнено
datetime_start_parsed - дата и время начала парсинга страницы
datetime_end_parsed - дата и время успешного парсинга страницы

Планируется парсинг запускать с помощью скрипта php многопоточно (кроном или супервизором), каждому запуску выдавать уникальный прокси и урл.
Вопрос - как выдать уникальный урл?

Т. е. в скрипт php передается номер прокси ($proxy_id), через который будет идти парсинг (от 1 до 1000). Дальше как?
Если делать так для $proxy_id = 5 делаем offset 4:
Select * from urls where parsed = '0' and datetime_start_parsed IS NULL order by id asc limit 4, 1

Получаем уникальный id записи (например 25), затем помечаем, что начали парсить страницу
Update urls set datetime_start_parsed = текущее время where id = 25

Затем скачиваем html страницы, сохраняем в другую таблицу, и отмечаем, что урл успешно спаршен

Update urls set datetime_end_parsed = текущее время where id = 25
Update urls set parsed = '1' where id = 25


Вопрос - как сделать так, чтобы каждый урл парсился только один раз? Ведь может же быть, что первый запросом запросится одна и та же запись в двух разных процессах php? Как сделать, чтобы такого не было? Подозреваю, как-то через транзакции? Как? Никогда с ними не работал, подскажите. Спасибо.
  • Вопрос задан
  • 125 просмотров
Пригласить эксперта
Ответы на вопрос 4
ipatiev
@ipatiev
Потомок старинного рода Ипатьевых-Колотитьевых
Никакие транзакции тут не нужны. транзакции вообще не про это. Почему-то новички упорно путают транзакции с блокировками. Транзакция - это про целостность данных. А чтобы два процесса не считали одну строку - это блокировка.
Но явные блокировки тут тоже не нужны.

Сначала забираем запись на себя
update urls set proxy=proxy_id where done=null and proxy=null limit 1
потом уже её селектим, работаем, и в конце отпускаем
select * from urls where proxy=proxy_id
...
Update urls set proxy=null, done=1 where proxy=proxy_id
Ответ написан
@Akina
Сетевой и системный админ, SQL-программист.
parsed - 0 - страница еще не спаршена, 1 - страница спаршена.

Не так.

NULL - не парсилась.
0 - парсинг выполнен.
>0 - взято на парсинг соединением (функция CONNECTION_ID()) номер N.

Соответственно попробовать зарезервировать запись на парсинг:
UPDATE urls 
SET parsed = CONNECTION_ID()
WHERE parsed IS NULL
ORDER BY datetime_added LIMIT 1;

Получить зарезервированную запись и начать её парсинг:
SELECT *
FROM urls
WHERE parsed = CONNECTION_ID();

Если вернётся более одной записи - в системе большие проблемы, надо звать администратора задачи. Если пустой набор - значит, запись перехватили, пробуем резервировать заново (тоже, кстати, повод позвать админа задачи - так не должно быть). Иначе - парсим полученную запись.

По окончании парсинга соответственно
UPDATE urls 
SET parsed = 0
WHERE parsed = CONNECTION_ID();

Ну и периодически выполняется event procedure, который находит записи, формально помеченные как обрабатываемые, но, судя по времени, обработчик подвис. Такие записи возвращаются на обработку
CREATE EVENT clear_parsing_flag
ON SCHEDULE EVERY 1 MINUTE
DO
UPDATE urls
SET parsed = NULL
WHERE parsed > 0
    -- считаем, что 5 минут более чем достаточно
  AND datetime_start_parsed < CURRENT_TIMESTAMP - INTERVAL 5 MINUTE;


Само собой никаких пулов соединений, никаких открыть-закрыть - все операции выполняются в рамках одного persistent connection. Автовосстановление соединения при обрыве также запрещено.

Если соединение развалилось, неважно по какой причине, бросаем обрабатываемую запись (шедулер вернёт её в необработанные), соединяемся заново и начинаем с самого начала, с резервирования.

Ну и предусмотреть случай, когда записей на парсинг просто нет. Например, если 5 резервирований подряд не смогли получить запись на обработку, то, чтобы не ставить сервер раком, вводим между попытками резервирования задержку, например, на 5 секунд... ну и вываливаем баннер, что, походу, парсить нечего.
Ответ написан
Комментировать
iMedved2009
@iMedved2009
Не люблю людей
Select потом Update - работать не будет. Потому что никто не помешает другому потоку выбрать ровно тоже запись пока делается update первым потоком. Для избежания такого есть блокировки.
Ответ написан
Комментировать
mayton2019
@mayton2019
Bigdata Engineer
Для мультипоточных систем самый лучший дизайн concurrency - это уменьшение concurrency. Есть разные способы уменьшения этого. Простое правило - это хеширование. Вычисляем хеш от урла. И берем остаток от деления на количество workers. Число в результате - сообщит нам номер воркера который будет эти линки обрабатывать. Другие воркеры будут чужие линки игнорировать.
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы