Как раздавать уникальные записи таблицы во многопоточном парсере?

Приблизительная задача - спарсить 10 000 000 страниц html сайта в таблицу mysql innodb.
Пусть есть 1000 прокси и мощный сервер.
Есть таблица urls с полями:

id, url, parsed (0, 1), datetime_added, datetime_start_parsed (NULL по умолчанию), datetime_end_parsed(NULL по умолчанию)

В таблице уже есть ссылки на страницы (url), которые надо спарсить

parsed - 0 - страница еще не спаршена, 1 - страница спаршена.
datetime_added - дата и время добавления урл в таблицу, уже заполнено
datetime_start_parsed - дата и время начала парсинга страницы
datetime_end_parsed - дата и время успешного парсинга страницы

Планируется парсинг запускать с помощью скрипта php многопоточно (кроном или супервизором), каждому запуску выдавать уникальный прокси и урл.
Вопрос - как выдать уникальный урл?

Т. е. в скрипт php передается номер прокси ($proxy_id), через который будет идти парсинг (от 1 до 1000). Дальше как?
Если делать так для $proxy_id = 5 делаем offset 4:
Select * from urls where parsed = '0' and datetime_start_parsed IS NULL order by id asc limit 4, 1

Получаем уникальный id записи (например 25), затем помечаем, что начали парсить страницу
Update urls set datetime_start_parsed = текущее время where id = 25

Затем скачиваем html страницы, сохраняем в другую таблицу, и отмечаем, что урл успешно спаршен

Update urls set datetime_end_parsed = текущее время where id = 25
Update urls set parsed = '1' where id = 25


Вопрос - как сделать так, чтобы каждый урл парсился только один раз? Ведь может же быть, что первый запросом запросится одна и та же запись в двух разных процессах php? Как сделать, чтобы такого не было? Подозреваю, как-то через транзакции? Как? Никогда с ними не работал, подскажите. Спасибо.
  • Вопрос задан
  • 124 просмотра
Пригласить эксперта
Ответы на вопрос 4
ipatiev
@ipatiev
Потомок старинного рода Ипатьевых-Колотитьевых
Никакие транзакции тут не нужны. транзакции вообще не про это. Почему-то новички упорно путают транзакции с блокировками. Транзакция - это про целостность данных. А чтобы два процесса не считали одну строку - это блокировка.
Но явные блокировки тут тоже не нужны.

Сначала забираем запись на себя
update urls set proxy=proxy_id where done=null and proxy=null limit 1
потом уже её селектим, работаем, и в конце отпускаем
select * from urls where proxy=proxy_id
...
Update urls set proxy=null, done=1 where proxy=proxy_id
Ответ написан
@Akina
Сетевой и системный админ, SQL-программист.
parsed - 0 - страница еще не спаршена, 1 - страница спаршена.

Не так.

NULL - не парсилась.
0 - парсинг выполнен.
>0 - взято на парсинг соединением (функция CONNECTION_ID()) номер N.

Соответственно попробовать зарезервировать запись на парсинг:
UPDATE urls 
SET parsed = CONNECTION_ID()
WHERE parsed IS NULL
ORDER BY datetime_added LIMIT 1;

Получить зарезервированную запись и начать её парсинг:
SELECT *
FROM urls
WHERE parsed = CONNECTION_ID();

Если вернётся более одной записи - в системе большие проблемы, надо звать администратора задачи. Если пустой набор - значит, запись перехватили, пробуем резервировать заново (тоже, кстати, повод позвать админа задачи - так не должно быть). Иначе - парсим полученную запись.

По окончании парсинга соответственно
UPDATE urls 
SET parsed = 0
WHERE parsed = CONNECTION_ID();

Ну и периодически выполняется event procedure, который находит записи, формально помеченные как обрабатываемые, но, судя по времени, обработчик подвис. Такие записи возвращаются на обработку
CREATE EVENT clear_parsing_flag
ON SCHEDULE EVERY 1 MINUTE
DO
UPDATE urls
SET parsed = NULL
WHERE parsed > 0
    -- считаем, что 5 минут более чем достаточно
  AND datetime_start_parsed < CURRENT_TIMESTAMP - INTERVAL 5 MINUTE;


Само собой никаких пулов соединений, никаких открыть-закрыть - все операции выполняются в рамках одного persistent connection. Автовосстановление соединения при обрыве также запрещено.

Если соединение развалилось, неважно по какой причине, бросаем обрабатываемую запись (шедулер вернёт её в необработанные), соединяемся заново и начинаем с самого начала, с резервирования.

Ну и предусмотреть случай, когда записей на парсинг просто нет. Например, если 5 резервирований подряд не смогли получить запись на обработку, то, чтобы не ставить сервер раком, вводим между попытками резервирования задержку, например, на 5 секунд... ну и вываливаем баннер, что, походу, парсить нечего.
Ответ написан
Комментировать
iMedved2009
@iMedved2009
Не люблю людей
Select потом Update - работать не будет. Потому что никто не помешает другому потоку выбрать ровно тоже запись пока делается update первым потоком. Для избежания такого есть блокировки.
Ответ написан
Комментировать
mayton2019
@mayton2019
Bigdata Engineer
Для мультипоточных систем самый лучший дизайн concurrency - это уменьшение concurrency. Есть разные способы уменьшения этого. Простое правило - это хеширование. Вычисляем хеш от урла. И берем остаток от деления на количество workers. Число в результате - сообщит нам номер воркера который будет эти линки обрабатывать. Другие воркеры будут чужие линки игнорировать.
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы