Как хранить и обмениваться текущим смещением строк в файле при его построчном чтении несколькими процессами?
Приветствую.
Есть приложение на Laravel. Есть мастер-комманда, которая запускает в фоне N процессов (артисан команды) , которые обрабатывают файлы. Не вдаваясь в подробности, есть условный файл с 10000 строк.
Задача запустить 100 конкурирующих процессов, которые будут читать и обрабатывать файл построчно.
Сейчас сделано так, что на каждый процесс выделяется по 100 строк и он их обрабатывает, по завершению запускается новый процесс со 100 строками и т. д. Это решение не нравится тем, что если какой-то процесс работает медленно, остальные не могу ему "помочь", даже если они уже свободные.
В целом, реализация ясна, но как лучше организовать хранение текущего смещения, что бы избежать пропусков и повторных обработок строк? Т. е. что бы каждая строка была гарантировано обработана и только один раз.
P. S. Использование mysql не рассматриваю в принципе.
Сейчас сделано так, что на каждый процесс выделяется по 100 строк и он их обрабатывает, по завершению запускается новый процесс со 100 строками и т. д. Это решение не нравится тем, что если какой-то процесс работает медленно, остальные не могу ему "помочь", даже если они уже свободные.
А почему размер пачки такой маленький? У тебя 100 строк обрабатываются по часу?
rPman, Так накладные расходы по времени на запуск процесса будут велики. Уменьшать не вариант, увеличивать тоже, ввиду причин описаных выше, один процесс может тормозить завершение задачи в целом.
Так накладные расходы по времени на запуск процесса будут велики.
важнейший вопрос, эти расходы зависят от указанных при старте приложения строк? можно ли модифицировать код так чтобы в работающий процесс добавлять новые строки?
rPman, нет, эти расходы на старте одинаковые, независимо от кол-ва передаваемых строк. Именно из-за этого и хотелось бы выдергивать строки, ничего не перезапуская.
Для организации конкурентной обработки строк файла без использования MySQL или другой базы данных вы можете воспользоваться механизмом межпроцессового взаимодействия, таким как блокировки и семафоры. Вам потребуется использовать некоторые инструменты, доступные в Laravel, такие как Redis или файловые блокировки.
Вот общий план действий:
1. Разделите ваш файл на N частей (где N - количество процессов, например, 100), и определите смещение каждой части в файле.
2. Создайте N отдельных процессов, каждый из которых будет обрабатывать свою часть файла.
3. Используйте механизм блокировок, чтобы гарантировать, что каждый процесс будет обрабатывать только свои строки. Это может быть достигнуто с использованием Redis или файловых блокировок.
4. В каждом процессе считайте свою часть файла с учетом смещения и обрабатывайте строки.
5. По мере того как каждый процесс завершает обработку своей части, он может освободить блокировку или сообщить главному процессу о завершении.
Пример использования Redis для синхронизации процессов в Laravel:
use Illuminate\Support\Facades\Redis;
// В главном процессе
$numberOfProcesses = 100;
$fileSize = 10000;
$chunkSize = ceil($fileSize / $numberOfProcesses);
for ($i = 0; $i < $numberOfProcesses; $i++) {
$offset = $i * $chunkSize;
$chunkKey = "process:$i";
Redis::set($chunkKey, $offset);
// Запустить процесс с $offset и $chunkSize для чтения и обработки своей части файла.
}
// В каждом процессе
$processId = getProcessId(); // Здесь уникальный идентификатор процесса
$chunkKey = "process:$processId";
while (true) {
$offset = Redis::get($chunkKey);
if ($offset === false) {
break; // Процесс завершает работу, так как нет больше данных для обработки.
}
// Читайте и обрабатывайте часть файла с учетом смещения $offset и размера чанка.
// После обработки, можно освободить блокировку
Redis::del($chunkKey);
}
// Главный процесс может дождаться завершения всех процессов перед продолжением выполнения.
Этот пример демонстрирует, как можно использовать Redis для синхронизации процессов и обработки файла по частям без использования базы данных MySQL. Вы можете адаптировать этот код под ваши потребности и добавить обработку ошибок и другие функции, которые вам нужны.
Я бы на твоём месте использовал атомарный инкремент редиса $currentRow = $redis->incr('current_row');
Даже если все 100 потоков в одну миллисекунду выполнят этот код, то redis гарантированно отдаст каждому потоку своё уникальное значение без коллизий
Ипатьев, Передам юзерам, чтобы не загружали свои ужасные файлы. Отказ от использования реляционных баз не принципиальность, а пройденый этап. База данных вполне используется там, где она уместна.
Вы либо слишком переусложняете, либо я задачу вашу не понимаю.
1. Есть 100 воркеров. Они крутятся через supervisor, ожидают задач
2. Что угодно, хоть консольная команда, хоть в контроллере, хоть суперворкер - читаем файл по 10 строк и отправляем в очередь. MyJob::dispatch($tenLinesFromFile);
Итого, файл разделился на 1000 кусков, освободившийся воркер обрабатывает следующую часть.
Накладные расходы? 10мс? При времени обработки в 15 мин кажется не там вы оптимизировать пытаетесь.