Server Side Events PHP и ZeroMQ?

Привет!
Хочу сделать Server Side Events с помощью ZeroMQ. Так как основная документация на английском, а с ним у меня плохо, приходится трудно. На гуглил такую реализацию.

Stream.php
<?php
$context = new ZMQContext();

$sock = $context->getSocket(ZMQ::SOCKET_SUB);
$sock->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, "");
$sock->connect("tcp://127.0.0.1:4446");

set_time_limit(0);
ini_set('memory_limit', '100M');

header("Content-Type: text/event-stream");
header("Cache-Control: no-cache");

while (true) {
    $msg = $sock->recv();
    $event = json_decode($msg, true);
    if (isset($event['type'])) {
        echo "event: {$event['type']}\n";
    }
    $data = json_encode($event['data']);
    echo "data: $data\n\n";
    ob_flush();
    flush();
}


Router.php
<?php
$context = new ZMQContext();

$pull = $context->getSocket(ZMQ::SOCKET_PULL);
$pull->bind("tcp://*:4445");

$pub = $context->getSocket(ZMQ::SOCKET_PUB);
$pub->bind("tcp://*:4446");

while (true) {
    $msg = $pull->recv();
    echo "publishing received message $msg\n";
    $pub->send($msg);
}


и Send.php
<?php
if(isset($_POST['msg']) && !empty($_POST['msg'])){
    $reqData = $_POST['msg'];
}else{
    $reqData = 'Lorem Ipsum We dont want to see this';
}

$context = new ZMQContext();

$sock = $context->getSocket(ZMQ::SOCKET_PUSH);
$sock->connect("tcp://127.0.0.1:4445");

$msg = json_encode(array('type' => 'debug', 'data' => array('foo', 'bar', 'baz')));
$sock->send($msg);

$msg = json_encode(array('data' => array($reqData,'foo', 'bar', 'baz')));
$sock->send($msg);

//throw new Exception('error');
// OR
// Fatal error
// Uncaught exception ZMQSocketException with message Failed to disconnect the ZMQ socket: Invalid argument
//$sock->disconnect(null);


Клиент
$( document ).ready(function() {
		  var stream = new EventSource('/stream.php');

		  stream.addEventListener('debug', function (event) {
			  var data = JSON.parse(event.data);
			  console.log([event.type, data]);
		  });

		  stream.addEventListener('message', function (event) {
			  var data = JSON.parse(event.data);
              $('#response').prepend(data[0]+'<br/>');
			  console.log([event.type, data]);
		  });

		  $("#click").click(function(){
			  setData($('#text').val());
		  });
	  });

function setData(data){
    if(data == '' || data == ' ' || data == undefined){
        var reqData = 'test';
    }else{
        var reqData = data;
    }
	$.ajax({
		url:"/send.php",
		type:"POST",
		data:{msg:reqData},
		cahce:false,
		async:true,
		success:function(result){
			//getData();
		}
	});
}


Но к сожалению не совсем понял как принимаются и отправляются данные.
Вернее по коду то понятно все, но на деле не совсем.

если при отправке не вызвать $sock->disconnect(null); или throw new Exception('error');
то данные не отправляются. Если вызвать Exception то данные приходят. Почему так происходит не могу понять...

И как можно получить данные если подписываться только на определенные данные: $sock->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, "user_id_1");
На сколько я понял нужно вторым параметром передавать MODE_SNDMORE
$sock->send('user_id_1',ZMQ::MODE_SNDMORE);
$sock->send($msg);

Но сообщение также не доходит..
  • Вопрос задан
  • 538 просмотров
Решения вопроса 1
aizhar777
@aizhar777 Автор вопроса
Решил сам!
Нужно добавлять $persistent_id то есть
$sock = $context->getSocket(ZMQ::SOCKET_PUSH,  $persistent_id);

Тогда отправлятся нормально!
Ответ написан
Комментировать
Пригласить эксперта
Ответы на вопрос 1
t-alexashka
@t-alexashka
Сразу пишу legacy код
Это Ваша первая попытка понять SSE? Возможно есть модули попроще чем ZMQ?

Вот тут очень похоже на ваш вопрос: stackoverflow.com/questions/7469396/how-to-impleme...
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы