Добрый день.
Есть проект, в котором есть функционал импорта товаров из yml файла.
Необходимо сделать это в очереди, для работы изначально был выбран RabbitMQ.
Мой опыт работы с очередями до этого сводится к работе с пакетами, которые предоставляют framework-и, так что я сомневаюсь правильно ли я выбрал подход к реализации задачи.
Сам импорт разделён на на три этапа.
1) Загрузка файла (через форму или с url)
2) Сопоставление категорий товаров
3) Запись товаров в базу и загрузка изображений к товарам.
Так же есть функционал обновления товаров.
Предыдущий разработчик использовал пакет для framework-a yii2, чтобы работать с rabbitmq, но этот подход не дал желаемого результата.
Сейчас решили вынести код работы с rabbitmq в отдельное приложение и работать без пакетов для yii2, используя php-amqplib.
На данный момент первый этап импорта (Загрузка файла) работает так:
1) Пользователь отправляет форму, в которое есть url к файлу импорта, id пользователя, настройки, необходимые для импота. Всё это сохраняется в базе данных для ведения истории импортов. При сохранении в базу срабатывает событие, по которому данные импорта передаются в очередь.
2) В очереди загружается файл, сохраняется на диск, отправляются уведомления пользователю и идут запись в лог импорта.
Пригаю участки кода, которые отвечают за выполнение данных действий.
Отправка формы:
if ($form->load(Yii::$app->request->post()) && $form->validate()) {
$command = new Create\Command(
userId: $form->user_id,
companyId: $form->company_id,
name: $form->name,
typeFile: $form->import_type,
typeImport: $form->type,
path: $form->website_url,
updateFields: $form->updated_fields,
updatePeriod: $form->update_period
);
try {
// в handle срабатывает событие, по которому отправляются данные в очередь
$importDataID = $this->handler->handle($command);
return $this->redirect(['/trade/yml-import/process', 'id' => $importDataID]);
} catch (DomainException $e) {
Yii::$app->session->setFlash('Error: ' . $e->getMessage());
}
}
Отправка данных в очередь:
public function handle(ImportAdded $event): void
{
$connection = $this->connection;
$channel = $this->connection->channel();
AMQPHelper::initNotifications($channel);
AMQPHelper::registerShutdown($connection, $channel);
$data = [
'type' => 'notification',
'userId' => $event->userId,
'companyId' => $event->companyId,
'file' => $event->file,
'dir' => self::DIRECTORY . $event->userId
];
$message = new AMQPMessage(
json_encode($data),
['content_type' => 'text/plain']
);
$channel->basic_publish($message, AMQPHelper::EXCHANGE_NOTIFICATIONS, 'note');
}
Обработка данных в очереди (это только черновик кода, главное порядок действий):
$channel->basic_consume(AMQPHelper::QUEUE_NOTIFICATIONS, $consumerTag, false, false, false, false, function($message) use ($output) {
/** @var AMQPMessageHelper $logger */
$logger = new AMQPMessageHelper($this->connection);
// отпрака сообщеня для записи в лог импорта
$logger->sendSuccess(AMQPHelper::QUEUE_CUSTOM_LOGGER, 'Upload file start.', time());
// change status
// код для смены статуса импорта
try {
$data = json_decode($message->body, true);
$content = file_get_contents($data['file']);
$basename = basename($data['file']);
if(!is_dir($data['dir'])) {
mkdir($data['dir']);
}
file_put_contents($data['dir'] . '/' . $basename, $content);
// отпрака сообщеня для записи в лог импорта
$logger->sendSuccess(AMQPHelper::QUEUE_CUSTOM_LOGGER, 'File is uploaded.', time());
} catch (RuntimeException $e) {
// отпрака сообщеня для записи в лог импорта
$logger->sendError(AMQPHelper::QUEUE_CUSTOM_LOGGER, 'Error: ' . $e->getMessage(), time());
// change status
// код для смены статуса импорта
}
try {
// отпрака сообщеня для записи в лог импорта
$logger->sendSuccess(AMQPHelper::QUEUE_CUSTOM_LOGGER, 'Start parsing file.', time());
$file = simplexml_load_file($data['dir'] . '/' . $basename);
// обработка файла с данными товаров
$newFile = YmlReader::reader($file, 1, $data['companyId'], $data['userId']);
file_put_contents($data['dir'] . '/importCategoryData.txt', serialize($newFile['arrayCategory']));
file_put_contents($data['dir'] . '/importData.txt', serialize($newFile['offer']));
// отпрака сообщеня для записи в лог импорта
$logger->sendSuccess(AMQPHelper::QUEUE_CUSTOM_LOGGER, 'Done parsing file.', time());
// change status
// код для смены статуса импорта
} catch (RuntimeException $e) {
// отпрака сообщеня для записи в лог импорта
$logger->sendError(AMQPHelper::QUEUE_CUSTOM_LOGGER, 'Error: ' . $e->getMessage(), time());
// change status
// код для смены статуса импорта
}
/** @var AMQPChannel $channel */
$channel = $message->delivery_info['channel'];
$channel->basic_ack($message->delivery_info['delivery_tag']);
});
По поводу третьего участка кода у меня возникли вопросы:
1) Насколько правильно обрабатывать в одной очереди и загрузку файла и парсинг самого файла? Если произойдёт какая-то ошибка при чтении файла
$newFile = YmlReader::reader($file, 1, $data['companyId'], $data['userId']);
то надо будет вернуть обратно в очередь для повторной попытки, но при этом опять всё пойдёт с начала, то есть с загрузки файла. Или надо делить это на две очереди, очередь загрузки файла и очередь чтения файла?
2) Запись сообщений в лог происходит в отдельной очереди, а для смены статуса импорта желательно отправлять сообщения в отдельную очередь или можно прямо здесь выполнять код для смены статуса?
И вопрос по третьему этапу импорта (Запись товаров в базу и загрузка изображений к товарам).
Количество товаров в файле доходит до нескольких тысяч, в среднем около 4-х тысяч. К каждому товару идёт от одного до десяти изображений. В коде, который достался от предыдущего разработчика в очередь отправляется весь файл целиком и все товары с одного файла обрабатываются в одной очереди, только изображения отправляются для обработки в одельной очереди. Я думаю, что это сильно напрягает саму очередь и нужно отправлять каждый товар в очередь по отдельности. Потому, что если произойдёт сбой, то вся очередь будет потеряна. Если же сделать очередь сохраняемой на диск, то после восстановления работы очереди всё пойдёт с начала, а это будет лишней нагрузкой на очередь. Насколько это будет правильно, отправка каждого товара отдельно в очередь? Или как это должно быть?
Так же есть ещё обновление товаров по расписанию. В этом случае по cron-у запускается код, который берёт все файлы и закидывает их в очередь. Получается, что все товары висят в одной очереди и ждут когда их проверят. Думаю создавать для каждого товара отдельную очередь, и после завершения обновления удалять эту очередь автоматически. Насколько это правильно? Или как будет правильно поступить в данной ситуации?