Как правильно обработать товары в очереди?

Добрый день.

Есть проект, в котором есть функционал импорта товаров из yml файла.

Необходимо сделать это в очереди, для работы изначально был выбран RabbitMQ.

Мой опыт работы с очередями до этого сводится к работе с пакетами, которые предоставляют framework-и, так что я сомневаюсь правильно ли я выбрал подход к реализации задачи.

Сам импорт разделён на на три этапа.
1) Загрузка файла (через форму или с url)
2) Сопоставление категорий товаров
3) Запись товаров в базу и загрузка изображений к товарам.

Так же есть функционал обновления товаров.

Предыдущий разработчик использовал пакет для framework-a yii2, чтобы работать с rabbitmq, но этот подход не дал желаемого результата.
Сейчас решили вынести код работы с rabbitmq в отдельное приложение и работать без пакетов для yii2, используя php-amqplib.

На данный момент первый этап импорта (Загрузка файла) работает так:

1) Пользователь отправляет форму, в которое есть url к файлу импорта, id пользователя, настройки, необходимые для импота. Всё это сохраняется в базе данных для ведения истории импортов. При сохранении в базу срабатывает событие, по которому данные импорта передаются в очередь.

2) В очереди загружается файл, сохраняется на диск, отправляются уведомления пользователю и идут запись в лог импорта.

Пригаю участки кода, которые отвечают за выполнение данных действий.

Отправка формы:

if ($form->load(Yii::$app->request->post()) && $form->validate()) {

            $command = new Create\Command(
                userId: $form->user_id,
                companyId: $form->company_id,
                name: $form->name,
                typeFile: $form->import_type,
                typeImport: $form->type,
                path: $form->website_url,
                updateFields: $form->updated_fields,
                updatePeriod: $form->update_period
            );

            try {
                // в handle срабатывает событие, по которому отправляются данные в очередь
                $importDataID = $this->handler->handle($command);
                return $this->redirect(['/trade/yml-import/process', 'id' => $importDataID]);
            } catch (DomainException $e) {
                Yii::$app->session->setFlash('Error: ' . $e->getMessage());
            }

        }


Отправка данных в очередь:

public function handle(ImportAdded $event): void
    {
        $connection = $this->connection;

        $channel = $this->connection->channel();

        AMQPHelper::initNotifications($channel);
        AMQPHelper::registerShutdown($connection, $channel);

            $data = [
                'type' => 'notification',
                'userId' => $event->userId,
                'companyId' => $event->companyId,
                'file' => $event->file,
                'dir' => self::DIRECTORY . $event->userId
            ];

            $message = new AMQPMessage(
                json_encode($data),
                ['content_type' => 'text/plain']
            );

            $channel->basic_publish($message, AMQPHelper::EXCHANGE_NOTIFICATIONS, 'note');
    }

Обработка данных в очереди (это только черновик кода, главное порядок действий):

$channel->basic_consume(AMQPHelper::QUEUE_NOTIFICATIONS, $consumerTag, false, false, false, false, function($message) use ($output) {

            /** @var AMQPMessageHelper $logger */
            $logger = new AMQPMessageHelper($this->connection);

            // отпрака сообщеня для записи в лог импорта
            $logger->sendSuccess(AMQPHelper::QUEUE_CUSTOM_LOGGER, 'Upload file start.', time());
            // change status
            // код для смены статуса импорта

            try {
                $data = json_decode($message->body, true);

                $content = file_get_contents($data['file']);

                $basename = basename($data['file']);

                if(!is_dir($data['dir'])) {
                    mkdir($data['dir']);
                }

                file_put_contents($data['dir'] . '/' . $basename, $content);

                // отпрака сообщеня для записи в лог импорта
                $logger->sendSuccess(AMQPHelper::QUEUE_CUSTOM_LOGGER, 'File is uploaded.', time());

            } catch (RuntimeException $e) {
                // отпрака сообщеня для записи в лог импорта
                $logger->sendError(AMQPHelper::QUEUE_CUSTOM_LOGGER, 'Error: ' . $e->getMessage(), time());
                // change status
                // код для смены статуса импорта
            }

            try {
                // отпрака сообщеня для записи в лог импорта
                $logger->sendSuccess(AMQPHelper::QUEUE_CUSTOM_LOGGER, 'Start parsing file.', time());

                $file = simplexml_load_file($data['dir'] . '/' . $basename);
                
                // обработка файла с данными товаров
                $newFile = YmlReader::reader($file, 1, $data['companyId'], $data['userId']);

                file_put_contents($data['dir'] . '/importCategoryData.txt', serialize($newFile['arrayCategory']));
                file_put_contents($data['dir'] . '/importData.txt', serialize($newFile['offer']));
                
                // отпрака сообщеня для записи в лог импорта
                $logger->sendSuccess(AMQPHelper::QUEUE_CUSTOM_LOGGER, 'Done parsing file.', time());
                // change status
                // код для смены статуса импорта

            } catch (RuntimeException $e) {
                // отпрака сообщеня для записи в лог импорта
                $logger->sendError(AMQPHelper::QUEUE_CUSTOM_LOGGER, 'Error: ' . $e->getMessage(), time());
                // change status
                // код для смены статуса импорта
            }
              /** @var AMQPChannel $channel */
              $channel = $message->delivery_info['channel'];
              $channel->basic_ack($message->delivery_info['delivery_tag']);
        });

По поводу третьего участка кода у меня возникли вопросы:

1) Насколько правильно обрабатывать в одной очереди и загрузку файла и парсинг самого файла? Если произойдёт какая-то ошибка при чтении файла

$newFile = YmlReader::reader($file, 1, $data['companyId'], $data['userId']);


то надо будет вернуть обратно в очередь для повторной попытки, но при этом опять всё пойдёт с начала, то есть с загрузки файла. Или надо делить это на две очереди, очередь загрузки файла и очередь чтения файла?

2) Запись сообщений в лог происходит в отдельной очереди, а для смены статуса импорта желательно отправлять сообщения в отдельную очередь или можно прямо здесь выполнять код для смены статуса?

И вопрос по третьему этапу импорта (Запись товаров в базу и загрузка изображений к товарам).

Количество товаров в файле доходит до нескольких тысяч, в среднем около 4-х тысяч. К каждому товару идёт от одного до десяти изображений. В коде, который достался от предыдущего разработчика в очередь отправляется весь файл целиком и все товары с одного файла обрабатываются в одной очереди, только изображения отправляются для обработки в одельной очереди. Я думаю, что это сильно напрягает саму очередь и нужно отправлять каждый товар в очередь по отдельности. Потому, что если произойдёт сбой, то вся очередь будет потеряна. Если же сделать очередь сохраняемой на диск, то после восстановления работы очереди всё пойдёт с начала, а это будет лишней нагрузкой на очередь. Насколько это будет правильно, отправка каждого товара отдельно в очередь? Или как это должно быть?

Так же есть ещё обновление товаров по расписанию. В этом случае по cron-у запускается код, который берёт все файлы и закидывает их в очередь. Получается, что все товары висят в одной очереди и ждут когда их проверят. Думаю создавать для каждого товара отдельную очередь, и после завершения обновления удалять эту очередь автоматически. Насколько это правильно? Или как будет правильно поступить в данной ситуации?
  • Вопрос задан
  • 65 просмотров
Пригласить эксперта
Ответы на вопрос 2
1) Насколько правильно обрабатывать в одной очереди и загрузку файла и парсинг самого файла?
Если произойдёт какая-то ошибка при чтении файла
Пока я не вижу смысла в дополнительной очереди. При скачивании нужно определять код статуса и если возвращает 20x, то можно приступать к чтению. Если 40x, то это баг клиента. Если 50x или сбой связи, то можно сохранить в таблице с временем последней попытки, чтобы можно было выбирать когда нужно провести следующую попытку.
Если файл скачан без ошибок и он цел, то проблем с чтением файла быть не должно, при условии соблюдения формата файла. А если скачан частично при коде 20x, то повторная обработка ничего не даст.
Скачанный файл можно удалять, если импорт окончен. А при сбое можно заново не скачивать - экономия времени.
Смену статуса импорта сделать сразу по окончанию обработки.

2) Запись сообщений в лог происходит в отдельной очереди, а для смены статуса импорта желательно отправлять сообщения в отдельную очередь или можно прямо здесь выполнять код для смены статуса?
не понял почему нужна очередь для лога.

только изображения отправляются для обработки в одельной очереди. Я думаю, что это сильно напрягает саму очередь
сами изображения отправляются или что-то другое? Если первое, то это нелогично и это действительно нагружает очередь. Зачем все товары слать в очередь? Разве в очереди импорта нельзя считывать всё с файла и записывать сразу в СУБД? Это ведь никого задерживать не будет.

Получается, что все товары висят в одной очереди и ждут когда их проверят.
Нет, очередь нужна для обработки без ожидания. А для хранения используется СУБД с пометкой последней проверки и, возможно, временем, когда товар надо снова обновить. Какой-то периодический процесс может опрашивать таблицу: у кого протухли товары, отзовись!
Ответ написан
@oldzas
Вы читаете файл построчно или целиком? Смысл использования rabbit если вы читаете построчно и каждую позицию отправляете в rabbit - но тут надо решать еще одну задачу - проверку дублей - вы отправляли такой товар в очередь или еще нет (привет redis).
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы