Добрый вечер.
Имеется проект - импорт товаров из файла yml.
Используются:
1) Phpamqplib
2) Rabbitmq
3) Docker
4) Supervisor.
5) MySql
Файл yml читается в массив, в цикле каждый товар ставится в очередь rabbitmq
public function handle(Command $command): void
{
$connection = $this->connection;
$channel = $connection->channel();
// Код методов ниже в вопросе.
AMQPHelper::initProducts($channel);
AMQPHelper::registerShutDown($connection, $channel);
foreach ($command->importData as $product) {
$data = [
'type' => 'products',
'importData' => $product,
/**********************/
];
$message = new AMQPMessage(
json_encode($data),
['content_type' => 'text/plain']
);
$channel->basic_publish($message, AMQPHelper::EXCHANGE_PRODUCTS, 'products');
}
}
Обработка в consumer-e
public function execute(InputInterface $input, OutputInterface $output)
{
$connection = $this->connection;
$channel = $connection->channel();
// Код методов ниже в вопросе.
AMQPHelper::initProducts($channel);
AMQPHelper::registerShutDown($connection, $channel);
$consumeTag = 'consume_' . getmypid();
$channel->basic_consume(AMQPHelper::QUEUE_PRODUCTS, $consumeTag, false, false, false, false, function($message) use ($output) {
$data = json_decode($message->body, true);
$product = $data['importData'];
$logger = new AMQPMessageHelper($this->connection);
try {
$command = new Create\Command(
Id::next(),
$data['userId'],
$product['company_id']
/*********************/
);
$this->handler->handle($command);
} catch (Exception $e) {
$logger->sendError(AMQPHelper::QUEUE_CUSTOM_LOGGER, $e->getMessage(), $data['importId']);
}
/** @var AMQPChannel $channel */
$channel = $message->delivery_info['channel'];
$channel->basic_ack($message->delivery_info['delivery_tag']);
});
while($channel->is_open()) {
$channel->wait();
}
return 0;
}
Статические методы используемые выше в коде
public static function initProducts(AMQPChannel $channel): void
{
$channel->queue_declare(self::QUEUE_PRODUCTS, false, true, false, false);
$channel->exchange_declare(self::EXCHANGE_PRODUCTS, 'fanout');
$channel->queue_bind(self::QUEUE_PRODUCTS, self::EXCHANGE_PRODUCTS, 'products');
$channel->basic_qos((int)null, 1, (bool)null);
}
public static function registerShutDown(AMQPStreamConnection $connection, AMQPChannel $channel): void
{
register_shutdown_function(function(AMQPChannel $channel, AMQPStreamConnection $connection){
$channel->close();
$connection->close();
}, $channel, $connection);
}
Проблема в том, что каждый последующий импорт увеличивается по времени выполненения.
На локальном:
1-й импорт 9 секунд
2-й импорт 11 секунд
3-й импотр 15 секунд
..................................
10-й импорт 51 секунда
На vps:
1-й импорт 28 секунд
2-й импорт 50 секунд
3-й импорт 1 минута 14 секунд
.................................................
10-й импорт 3 минуты 11 секунд
Для тестирования использовался один и тот же файл, количество товаров не менялось.
Если делать restart контейнера rabbitmq или supervisor, то время выполнения будет минимальным, но с каждым новым импортом будет снова увеличиваться.
Как можно решить данную проблему? Что именно не так в моём коде?
p.s
Docker-compose
php-fpm:
image: ${REGISTRY}/import:php-fpm-${IMAGE_TAG}
restart: always
depends_on:
- import-mysql
import-php-cli:
image: ${REGISTRY}/import:php-cli-${IMAGE_TAG}
depends_on:
- import-mysql
- import-rabbitmq
supervisor:
image: ${REGISTRY}/import:supervisor-${IMAGE_TAG}
restart: always
depends_on:
- import-rabbitmq
import-rabbitmq:
image: ${REGISTRY}/import:rabbitmq-${IMAGE_TAG}
hostname: import-prod
environment:
RABBITMQ_ERLANG_COOKIE: SQASLDFJQESLKDFJT
RABBITMQ_DEFAULT_USER: ${RABBITMQ_DEFAULT_USER}
RABBITMQ_DEFAULT_PASS: ${RABBITMQ_DEFAULT_PASS}
RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS: -rabbit disk_free_limit 2147483648
ports:
- "5672:5672"
- "15672:15672"
volumes:
- import-rabbitmq:/var/lib/rabbitmq
p.s.s
Config supervisor
Создал три похожие команды, итого на одну очередь 15 обработчиков.
[program:amqp1]
command=/usr/local/bin/php ImportAMQP/bin/app amqp:products:consume
process_name=%(program_name)s_%(process_num)02d
stdout_logfile=/app/logs/supervisor.log
stderr_logfile=/app/logs/supervisor_err.log
numprocs=5
autostart=true
autorestart=true
user=root
stopasgroup=true
killasgroup=true
startsecs=5
startretries=10