Привет. Наигравшись с RabbitMQ и Swoole по отдельности, решил их объединить. Суть задачи:
1) Php приложение отдает сообщение RabbitMQ воркеру
2) RabbitMQ воркер принимает сообщение и транслирует его всем существующим сокет соединениям
Для начала создал Swoole сокет сервер который слушает соединения (swoole_sever.php). Запускаю его в одном окне терминала.
$server = new \swoole_websocket_server("0.0.0.0", 2345, SWOOLE_BASE);
$server->on('open', function(\Swoole\Websocket\Server $server, $req)
{
echo "connection open: {$req->fd}\n";
});
$server->on('message', function($server, \Swoole\Websocket\Frame $frame)
{
echo "received message: {$frame->data}\n";
$server->push($frame->fd, json_encode(["hello", "world"]));
});
$server->on('close', function($server, $fd)
{
echo "connection close: {$fd}\n";
});
$server->start();
Далее, в другом окне терминала запускается RabbitMQ воркер, который ждет сообщения (worker.php)
$connection = new AMQPStreamConnection('0.0.0.0', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function($msg){
echo " [x] Received ", $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done", "\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
// Здесь я пытаюсь подключиться к сокету и отправить сообщение
$cli = new \swoole_http_client('0.0.0.0', 2345);
$cli->on('message', function ($_cli, $frame) {
var_dump($frame);
});
$cli->upgrade('/', function($cli)
{
$cli->push('This is the message to send to Swoole server');
$cli->close();
});
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
Создаю новую задачу в которой будет отправляться сообщение RabbitMQ воркеру (task.php)
$connection = new AMQPStreamConnection('0.0.0.0', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data,
array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
$channel->basic_publish($msg, '', 'task_queue');
echo " [x] Sent ", $data, "\n";
$channel->close();
$connection->close();
И наконец запускаю задачу
php new_task.php
В итоге задача успешно доходит до воркера, тот успешно отрабатывает функцию обратного вызова
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
А вот в Swoole сокет сервер ничего не приходит. Даже в терминале не видно сообщения "connection open". Однако, если после этого остановить воркер в терминале, то в окне где работает сокет сервер срабатывает функция
$server->on('close', function($server, $fd)
Как такое вообще возможно? Уже 2ой день пытаюсь разобраться :(