Как избежать увеличения времени при выполнении задачи?

Добрый вечер.

Имеется проект - импорт товаров из файла yml.
Используются:
1) Phpamqplib
2) Rabbitmq
3) Docker
4) Supervisor.
5) MySql

Файл yml читается в массив, в цикле каждый товар ставится в очередь rabbitmq

public function handle(Command $command): void
    {
        $connection = $this->connection;

        $channel = $connection->channel();
        // Код методов ниже в вопросе.
        AMQPHelper::initProducts($channel);
        AMQPHelper::registerShutDown($connection, $channel);

        foreach ($command->importData as $product) {
            $data = [
                'type' => 'products',
                'importData' => $product,
                /**********************/
            ];

            $message = new AMQPMessage(
                json_encode($data),
                ['content_type' => 'text/plain']
            );

            $channel->basic_publish($message, AMQPHelper::EXCHANGE_PRODUCTS, 'products');
        }
    }


Обработка в consumer-e

public function execute(InputInterface $input, OutputInterface $output)
    {
        $connection = $this->connection;

        $channel = $connection->channel();
        // Код методов ниже в вопросе.
        AMQPHelper::initProducts($channel);
        AMQPHelper::registerShutDown($connection, $channel);

        $consumeTag = 'consume_' . getmypid();

        $channel->basic_consume(AMQPHelper::QUEUE_PRODUCTS, $consumeTag, false, false, false, false, function($message) use ($output) {

            $data = json_decode($message->body, true);

            $product = $data['importData'];

            $logger = new AMQPMessageHelper($this->connection);

            try {

                $command = new Create\Command(
                    Id::next(),
                    $data['userId'],
                    $product['company_id']
                    /*********************/
                );

                $this->handler->handle($command);

            } catch (Exception $e) {
                $logger->sendError(AMQPHelper::QUEUE_CUSTOM_LOGGER, $e->getMessage(), $data['importId']);
            }

            /** @var AMQPChannel $channel */
            $channel = $message->delivery_info['channel'];
            $channel->basic_ack($message->delivery_info['delivery_tag']);
        });

         while($channel->is_open()) {
            $channel->wait();
        }
        return 0;
    }


Статические методы используемые выше в коде

public static function initProducts(AMQPChannel $channel): void
    {
        $channel->queue_declare(self::QUEUE_PRODUCTS, false, true, false, false);
        $channel->exchange_declare(self::EXCHANGE_PRODUCTS, 'fanout');
        $channel->queue_bind(self::QUEUE_PRODUCTS, self::EXCHANGE_PRODUCTS, 'products');

        $channel->basic_qos((int)null, 1, (bool)null);
    }

    public static function registerShutDown(AMQPStreamConnection $connection, AMQPChannel $channel): void
    {
        register_shutdown_function(function(AMQPChannel $channel, AMQPStreamConnection $connection){
            $channel->close();
            $connection->close();
        }, $channel, $connection);
    }


Проблема в том, что каждый последующий импорт увеличивается по времени выполненения.
На локальном:
1-й импорт 9 секунд
2-й импорт 11 секунд
3-й импотр 15 секунд
..................................
10-й импорт 51 секунда

На vps:
1-й импорт 28 секунд
2-й импорт 50 секунд
3-й импорт 1 минута 14 секунд
.................................................
10-й импорт 3 минуты 11 секунд

Для тестирования использовался один и тот же файл, количество товаров не менялось.

Если делать restart контейнера rabbitmq или supervisor, то время выполнения будет минимальным, но с каждым новым импортом будет снова увеличиваться.

Как можно решить данную проблему? Что именно не так в моём коде?

p.s
Docker-compose
php-fpm:
    image: ${REGISTRY}/import:php-fpm-${IMAGE_TAG}
    restart: always
    depends_on:
      - import-mysql

  import-php-cli:
    image: ${REGISTRY}/import:php-cli-${IMAGE_TAG}
    depends_on:
      - import-mysql
      - import-rabbitmq

  supervisor:
    image: ${REGISTRY}/import:supervisor-${IMAGE_TAG}
    restart: always
    depends_on:
      - import-rabbitmq

  import-rabbitmq:
    image: ${REGISTRY}/import:rabbitmq-${IMAGE_TAG}
    hostname: import-prod
    environment:
      RABBITMQ_ERLANG_COOKIE: SQASLDFJQESLKDFJT
      RABBITMQ_DEFAULT_USER: ${RABBITMQ_DEFAULT_USER}
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_DEFAULT_PASS}
      RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS: -rabbit disk_free_limit 2147483648
    ports:
    - "5672:5672"
    - "15672:15672"
    volumes:
      - import-rabbitmq:/var/lib/rabbitmq


p.s.s
Config supervisor
Создал три похожие команды, итого на одну очередь 15 обработчиков.
[program:amqp1]
command=/usr/local/bin/php ImportAMQP/bin/app amqp:products:consume
process_name=%(program_name)s_%(process_num)02d
stdout_logfile=/app/logs/supervisor.log
stderr_logfile=/app/logs/supervisor_err.log
numprocs=5
autostart=true
autorestart=true
user=root
stopasgroup=true
killasgroup=true
startsecs=5
startretries=10
  • Вопрос задан
  • 605 просмотров
Решения вопроса 1
@dzhebrak
А у вас, случаем, не doctrine используется (судя по $this->repository->add($trades); $this->flusher->flush();)?

Если да, то в ней применяется паттерн Identity Map https://www.doctrine-project.org/projects/doctrine... , т.е. хранятся все сущности, которые вы загружали из бд или добавляли через метод persist(). Это приводит к тому, что doctrine нужно перебирать эти сущности при вызове flush(), а так как их количество постоянно увеличивается, то и затрачиваемое время будет также увеличиваться. Признаком этой проблемы будет постепенное увеличение потребляемой памяти в consumer-e (можете проверить с помощью memory_get_usage()). Решением будет вызов в consumer-e EntityManager#clear() после EntityManager#flush(). Это очистит identity map.

Еще одна возможная проблема (тоже при использовании doctrine) - логирование sql запросов. Признаки те же - увеличение потребляемой памяти в consumer-e. Решение - отключить логирование sql запросов (или переписать логирование так, как вам нужно):

$em->getConnection()->getConfiguration()->setSQLLogger(null);

или так (в более новых версиях doctrine):

$em->getConnection()->getConfiguration()->setMiddlewares([new \Doctrine\DBAL\Logging\Middleware(new \Psr\Log\NullLogger())]);
Ответ написан
Пригласить эксперта
Ответы на вопрос 1
@iljaGolubev
Вот этот кусок подозрительный:
while($channel->is_open()) {
            $channel->wait();
        }


Может так сделать?
while(count($channel->callbacks)) {
    $channel->wait();
}
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы