Как правильно использовать pyzmq ioloop c multithreading?

Для начала я поясню, чего я хочу добиться с помощью кода, который я написал.

Логика приложения должна быть развязана от системы передачи сообщений.
Над системой передачи сообщений должна быть абстрактная обертка, позволяющая избавиться от дублирующегося кода.
В одном процессе должны спокойно уживаться две разных системы передачи информации, например rabbitmq и zeromq.

И вот что пока получилось

class BaseZmqNode():
    __metaclass__ = ABCMeta

    def __init__(self, host, port,  bind, hwm):
        self.node = self.create_node()
        self.node.host = host
        self.port = port
        self.context = zmq.Context().instance()
        self.socket = self.create_socket()
        if bind:
            self.socket.bind(self.build_address(host, port))
        else:
            self.socket.connect(self.build_address(host, port))
        self.set_hwm(hwm)

    def set_hwm(self, hwm):
        self.socket.set_hwm(hwm)

    def send_multipart(self, message):
        self.socket.send_multipart(message)

    def send_json(self, json):
        self.socket.send_json(json)
    @abstractmethod
    def create_node(self):
        return BaseMessagingNode()

    def close(self):
        self.socket.close()

    @staticmethod
    def build_address(host, port):
        strings = [host, ':', str(port)]
        return ''.join(strings)

    @abstractmethod
    def create_socket(self):
        pass


Это базовый класс для всех узлов типа pub, sub etc.

class BaseZmqReceiver(BaseZmqNode):

    __metaclass__ = ABCMeta

    def __init__(self, host, port, hwm, bind, on_receive_callback):
        super(BaseZmqReceiver, self).__init__(host=host, port=port, bind=bind, hwm=hwm)
        self.node.on_message_callback = on_receive_callback
        self.stream = ZMQStream(self.socket)
        self.stream.on_recv(self.on_message_received)
        self.runner = ZmqLoopRunner(self.on_close)
        self.runner.start()

    def on_message_received(self, message):
        return self.node.on_message_callback(message)

    def create_node(self):
        return ReceivingNode(None, None)

    def on_close(self):

        self.stream.close()
        self.socket.close()

    def close(self):
        # super(BaseZmqReceiver, self).close()

        self.runner.stop()

Это базовый класс для всех узлов, которые используют zmq ioloop.
class ZmqLoopRunner(Thread):

    def __init__(self, callback):
        super(ZmqLoopRunner, self).__init__()
        self.loop = IOLoop.current()
        self.callback = callback

    def run(self):
        self.loop.start()
        self.callback()

    def stop(self):
        self.loop.stop()


Loop помещен в отдельный поток, чтобы он не блокировал приложение.

class ZmqTest(AbstractMessagingTest):

    def setUp(self):
        super(ZmqTest, self).setUp()
        self.multipart_messages = self.create_multipart_messages(10)

    def tearDown(self):
        super(ZmqTest, self).tearDown()

    def test_request_reply(self):
        try:
            requester = ZmqReq(host='tcp://localhost', port=6000)
            self.request = 'Hello'
            self.reply = 'World!'
            replier = ZmqRep(host='tcp://*', port=6000, request_processor=self.on_request_received)
            self.assertEqual(self.reply, requester.execute(request=self.request))
        except Exception as ex:
            print(ex)
        finally:
            replier.close()
            requester.close()

    def test_address_creation(self):
        full_address = "tcp://localhost:5559"
        self.assertEqual(full_address, ZmqSubscriber.build_address("tcp://localhost", 5559))
        self.assertEqual('tcp://*:6000', ZmqPublisher.build_address("tcp://*", 6000))

    def test_publisher_subscriber(self):

        print('Testing pub sub')
        publisher = ZmqPublisher('tcp://*', 6000)
        subscriber = ZmqSubscriber('tcp://localhost', 6000, self.handle_message)
        self.send_messages(publisher, wait=False)
        sleep(0.5)
        self.assertSequenceEqual(self.test_messages, self.received_messages)
        publisher.close()
        subscriber.close()

    def handle_message(self, message):
        self.base_handle_message(message[0])

    def test_send_json(self):

        print('Testing send json')
        publisher = ZmqPublisher('tcp://*', 6000)
        subscriber = ZmqSubscriber('tcp://localhost', 6000, self.handle_json_message)
        md = {'1' : 1}
        publisher.send_json(md)
        publisher.close()
        subscriber.close()

    def create_multipart_messages(self, size):
        messages = []
        for i in range(size):
            messages.append(['Multipart test message', str(i)])
        return messages

    def send_multipart_messages(self, sender):
        for message in self.multipart_messages:
            sender.send_multipart(message)


    def test_multipart_messages(self):

        print('Testing multipart')
        publisher = ZmqPublisher('tcp://*', 6000)
        subscriber = ZmqSubscriber('tcp://localhost', 6000, self.base_handle_message)
        self.send_multipart_messages(publisher)
        sleep(0.5)
        self.assertSequenceEqual(self.multipart_messages, self.received_messages)
        publisher.close()
        subscriber.close()

    def test_push_pull_multipart(self):

        print('Testing pushpull multipart')
        ventilator = ZmqPush('tcp://*', 6000)
        worker = ZmqPull('tcp://localhost', 6000, self.base_handle_message)
        self.send_multipart_messages(ventilator)
        sleep(0.5)
        self.assertSequenceEqual(self.multipart_messages, self.received_messages)
        ventilator.close()
        worker.close()


    def handle_json_message(self, json):
        print(str(json))

    def test_push_pull(self):

        print('Testing push pull')
        ventilator = ZmqPush('tcp://*', 6000)
        worker = ZmqPull('tcp://localhost', 6000, self.handle_message)
        self.send_messages(ventilator, wait=False)
        sleep(0.5)
        self.assertSequenceEqual(self.test_messages, self.received_messages)
        ventilator.close()
        worker.close()

    def on_request_received(self, message):
        if message[0] == self.request:
            return self.reply
        else:
            return 'ERROR'


А вот тесты. В зависимости от того куда я помещу код закрытия stream и socket они могут проходить, но при этом иногда выкидывать исключение. Например такое.

File "/opt/leos/code/messaging_system/zeromq/ZmqLoopRunner.py", line 12, in run
    self.loop.start()
  File "/Library/Python/2.7/site-packages/zmq/eventloop/ioloop.py", line 160, in start
    super(ZMQIOLoop, self).start()
  File "/Library/Python/2.7/site-packages/tornado/ioloop.py", line 646, in start

    event_pairs = self._impl.poll(poll_timeout)
  File "/Library/Python/2.7/site-packages/zmq/eventloop/ioloop.py", line 132, in poll
    z_events = self._poller.poll(1000*timeout)

  File "/Library/Python/2.7/site-packages/zmq/sugar/poll.py", line 110, in poll
    return zmq_poll(self.sockets, timeout=timeout)
  File "_poll.pyx", line 125, in zmq.backend.cython._poll.zmq_poll (zmq/backend/cython/_poll.c:1705)
IndexError: list index out of range


Или какой-то тест может не пройти. Я в растерянности, так как не понимаю, что я делаю не так.
  • Вопрос задан
  • 2940 просмотров
Пригласить эксперта
Ответы на вопрос 2
Disassociative
@Disassociative
Посмотрел по диагонали, возможно на закрытие в твой on_request_received падает пустой список, который ZmqRep пытается на последок обработать. Посмотри это и сделай проверку на пустой список, если это так.
Если дело не в этом, тебе стоит опубликовать код с проблемой куда-нибудь на github, например. Что бы народ не мучился с копипастой.
Ответ написан
Комментировать
@snowpiercer
Я думаю, надо получше разобраться с документацией по ZMQStream.
Просто я в коде вижу, что ты ожидаешь, будто поток (stream) присылает тебе какие-то сообщения, а может оказаться так, что он присылает тебе кусок данных сразу, как только тот появляется в сокете, т.е. шлёт тебе буквально поток байтов, из которого ты сам должен вычленять осмысленные структуры - сообщения.
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы