Задать вопрос

PHP и AMQP, как?

Необходимо организовать постоянный прием сообщений из RabbitMQ очереди в php.


Если использовать php extension (из pear) www.php.net/manual/en/book.amqp.php вместо клиента на php github.com/videlalvaro/php-amqplib, то скорость приема сообщений из очереди минимум в 2 раза быстрее.



В примерах на офф. сайте RabbitMQ в Python вызывается, за этим методом запрятан цикл приема данных, который в нужное время вызывает callback:
channel.start_consuming()


В php extension-е такого почему-то не предусмотрено. Есть AMQPQueue::consume() — возвращает заданное кол-во сообщений и AMQPQueue::get() — возвращает 1 сообщение и кол-во в очереди.

Но непонятно, как лучше в этом случае организовать цикл приема.


Первое, что приходит в голову, сделать как-то так:
while(true) {
    $message = $q->get(0); // Запрашиваем очередное сообщенеи из очереди
    if($message['count'] === -1) {
        // Если значение count равно -1, ждем дальше (сообщений в очереди еще нет)
        usleep(50000);
    } else {
        // Вызываем функцию обработчик сообщения
        if(rcv_msg($message)) {
            // Сообщаем, что сообщение принято, если обработчик вернул TRUE
            $q->ack($message['delivery_tag']);
        } else {
            break;
        }
    }
}
Если снижать задержку (usleep(...)) или совсем исключить, то нагрузка на sys cpu (активность php и сервиса rabbitmq) становится конечно стремная.

Если повышать — то конечно будет падать скорость.

Можно программно регуллировать длительность зарержки.


В php-amqplib (тот, что написан на php) в цикле приема задействован объект канала (такого в extension-е вобще нет).
// Loop as long as the channel has callbacks registered
while(count($ch->callbacks)) {
    $ch->wait(); // Ожидание сообщения происходит в недрах этого метода, цикл не выполняется пока их нет.
}
Такой код совсем не напрягает cpu.


Итак вопрос такой — как бы сделать чтобы и скорость не потерять и ресурсы не нагружать в моменты простоя?
  • Вопрос задан
  • 8020 просмотров
Подписаться 11 Оценить Комментировать
Пригласить эксперта
Ответы на вопрос 2
kibizoidus
@kibizoidus
Вы немного не разобрались в документации.

AMQPQueue::consume() - блокирующая функция, т.е. она будет держать поток выполнения до тех пор, пока не вернется сообщение.

Blocking function that will retrieve the next message from the queue as it becomes available and will pass it off to the callback.

AMQPQueue::get() - неблокирующая, т.е. если задания нет - сразу же вернет FALSE.

Retrieve the next available message from the queue. If no messages are present in the queue, this function will return FALSE immediately.

Т.е. все, что вам нужно сделать, выглядит примерно следующим образом:

while(true) {
    $q->consume("rcv_msg");
}


That's it. Заблокировав выполнение и дожидаясь RabbitMQ ничего не будет выполняться, с другой стороны сообщение моментально влетит в функцию выполнения, как только кролик пнет его в нужном направлении.
Ответ написан
Комментировать
Aco
@Aco
Заклинатель кода
Была схожая задача, решилась через redis и его операцию blpop + таймауты на коннекты
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы