Задать вопрос
JastaFly
@JastaFly

PHP и RabbitMQ ошибка 504 Gateway Time-out в консьюмере?

Есть вот такой вот класс консьюмер, ожидающий сообщение от реббита:
<?php

namespace App\Infrastructure;

use App\Application\Interfaces\BankServiceInterface;
use App\Application\Interfaces\ConsumerInterface;
use App\Application\Interfaces\MailAgentInterface;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class Consumer implements ConsumerInterface
{
    private AMQPStreamConnection $connection;
    private AMQPChannel $channel;
    private BankServiceInterface $service;
    private MailAgentInterface $mailAgent;

    public function __construct(AMQPStreamConnection $connection, BankServiceInterface $service, MailAgentInterface $mailAgent)
    {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->service = $service;
        $this->mailAgent = $mailAgent;
    }

    public function runFromQueue(string $queueName): void
    {
    	$this->channel->queue_declare(
            $queueName,
            false,
            true,
            false,
            false
        );

        $this->channel->basic_qos(null, 1, null);
        $this->channel->basic_consume(
            $queueName,
            '',
            false,
            false,
            false,
            false,
            $this->onConsume()
        );

        while(count($this->channel->callbacks)) {
            $this->channel->wait();
	    }

	    $this->closeConnection();
    }

    public function closeConnection(): void
    {
        $this->channel->close();
        $this->connection->close();
    }

    private function onConsume()
    {
        return function ($request) {
            echo '<pre>';
            var_dump($request->getBody());
            echo '</pre>';
        };
    }
}

И при инициализации данного класса его работа каждый раз завершается ошибкой 504 Gateway Time-out, но если из метода onConsume(), удалить вызов метода getBody():
private function onConsume()
    {
        return function ($request) {
            echo '<pre>';
            var_dump($request);
            echo '</pre>';
        };

То всё норм работает, вижу вывод объекта так нужное боди.
Аналогично получаю ошибку 504 Gateway Time-out, если вместо геттра обращаться к свойству на прямую:
private function onConsume()
    {
        return function ($request) {
            echo '<pre>';
            var_dump($request->body);
            echo '</pre>';
        };

Как будто бы цикл в методе runFromQueue(), зацикливается и крутиться пока сервер его не отрубит по таймауту. Тогда я попробовал заменить цикл на метод consume() в методе runFromQueue():
public function runFromQueue(string $queueName): void
    {
    	$this->channel->queue_declare(
            $queueName,
            false,
            true,
            false,
            false
        );

        $this->channel->basic_qos(null, 1, null);
        $this->channel->basic_consume(
            $queueName,
            '',
            false,
            false,
            false,
            false,
            $this->onConsume()
        );

        try {
            $this->channel->consume();
        } catch (\Throwable $exception) {
            echo $exception->getMessage();
        }

	    $this->closeConnection();
    }

Но эффект был аналогичен, всё работает с выводом всего объекта $request, но стоит вызвать метод getBody() или обратиться к свойству body, получаю ошибку 504 Gateway Time-out.
Тогда я придумал костыль, который бы останавливал цикл после чтения тела сообщения. Для этого добавил классу новое свойство:
private $isMessageRead = true;
И завязал работу цикла в методе runFromQueue() на него:
public function runFromQueue(string $queueName): void
    {
    	$this->channel->queue_declare(
            $queueName,
            false,
            true,
            false,
            false
        );

        $this->channel->basic_qos(null, 1, null);
        $this->channel->basic_consume(
            $queueName,
            '',
            false,
            false,
            false,
            false,
            $this->onConsume()
        );

        while($this->isMessageRead) {
            $this->channel->wait();
	    }

	    $this->closeConnection();
    }

И соответственно менял значение этого свойства в методе onConsume():
private function onConsume()
    {
        return function ($request) {
            echo '<pre>';
            var_dump($request->getBody());
            echo '</pre>';

            $this->isMessageRead = false;
        };
    }

И всё прекрасно работает, тело сообщения выводится. Пожалуйста подскажите пожалуйста в чём проблема и можно ли как-то заставить всё работь без моего костыля?!?
  • Вопрос задан
  • 99 просмотров
Подписаться 2 Средний 2 комментария
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы