Для начала я поясню, чего я хочу добиться с помощью кода, который я написал.
Логика приложения должна быть развязана от системы передачи сообщений.
Над системой передачи сообщений должна быть абстрактная обертка, позволяющая избавиться от дублирующегося кода.
В одном процессе должны спокойно уживаться две разных системы передачи информации, например rabbitmq и zeromq.
И вот что пока получилось
class BaseZmqNode():
__metaclass__ = ABCMeta
def __init__(self, host, port, bind, hwm):
self.node = self.create_node()
self.node.host = host
self.port = port
self.context = zmq.Context().instance()
self.socket = self.create_socket()
if bind:
self.socket.bind(self.build_address(host, port))
else:
self.socket.connect(self.build_address(host, port))
self.set_hwm(hwm)
def set_hwm(self, hwm):
self.socket.set_hwm(hwm)
def send_multipart(self, message):
self.socket.send_multipart(message)
def send_json(self, json):
self.socket.send_json(json)
@abstractmethod
def create_node(self):
return BaseMessagingNode()
def close(self):
self.socket.close()
@staticmethod
def build_address(host, port):
strings = [host, ':', str(port)]
return ''.join(strings)
@abstractmethod
def create_socket(self):
pass
Это базовый класс для всех узлов типа pub, sub etc.
class BaseZmqReceiver(BaseZmqNode):
__metaclass__ = ABCMeta
def __init__(self, host, port, hwm, bind, on_receive_callback):
super(BaseZmqReceiver, self).__init__(host=host, port=port, bind=bind, hwm=hwm)
self.node.on_message_callback = on_receive_callback
self.stream = ZMQStream(self.socket)
self.stream.on_recv(self.on_message_received)
self.runner = ZmqLoopRunner(self.on_close)
self.runner.start()
def on_message_received(self, message):
return self.node.on_message_callback(message)
def create_node(self):
return ReceivingNode(None, None)
def on_close(self):
self.stream.close()
self.socket.close()
def close(self):
# super(BaseZmqReceiver, self).close()
self.runner.stop()
Это базовый класс для всех узлов, которые используют zmq ioloop.
class ZmqLoopRunner(Thread):
def __init__(self, callback):
super(ZmqLoopRunner, self).__init__()
self.loop = IOLoop.current()
self.callback = callback
def run(self):
self.loop.start()
self.callback()
def stop(self):
self.loop.stop()
Loop помещен в отдельный поток, чтобы он не блокировал приложение.
class ZmqTest(AbstractMessagingTest):
def setUp(self):
super(ZmqTest, self).setUp()
self.multipart_messages = self.create_multipart_messages(10)
def tearDown(self):
super(ZmqTest, self).tearDown()
def test_request_reply(self):
try:
requester = ZmqReq(host='tcp://localhost', port=6000)
self.request = 'Hello'
self.reply = 'World!'
replier = ZmqRep(host='tcp://*', port=6000, request_processor=self.on_request_received)
self.assertEqual(self.reply, requester.execute(request=self.request))
except Exception as ex:
print(ex)
finally:
replier.close()
requester.close()
def test_address_creation(self):
full_address = "tcp://localhost:5559"
self.assertEqual(full_address, ZmqSubscriber.build_address("tcp://localhost", 5559))
self.assertEqual('tcp://*:6000', ZmqPublisher.build_address("tcp://*", 6000))
def test_publisher_subscriber(self):
print('Testing pub sub')
publisher = ZmqPublisher('tcp://*', 6000)
subscriber = ZmqSubscriber('tcp://localhost', 6000, self.handle_message)
self.send_messages(publisher, wait=False)
sleep(0.5)
self.assertSequenceEqual(self.test_messages, self.received_messages)
publisher.close()
subscriber.close()
def handle_message(self, message):
self.base_handle_message(message[0])
def test_send_json(self):
print('Testing send json')
publisher = ZmqPublisher('tcp://*', 6000)
subscriber = ZmqSubscriber('tcp://localhost', 6000, self.handle_json_message)
md = {'1' : 1}
publisher.send_json(md)
publisher.close()
subscriber.close()
def create_multipart_messages(self, size):
messages = []
for i in range(size):
messages.append(['Multipart test message', str(i)])
return messages
def send_multipart_messages(self, sender):
for message in self.multipart_messages:
sender.send_multipart(message)
def test_multipart_messages(self):
print('Testing multipart')
publisher = ZmqPublisher('tcp://*', 6000)
subscriber = ZmqSubscriber('tcp://localhost', 6000, self.base_handle_message)
self.send_multipart_messages(publisher)
sleep(0.5)
self.assertSequenceEqual(self.multipart_messages, self.received_messages)
publisher.close()
subscriber.close()
def test_push_pull_multipart(self):
print('Testing pushpull multipart')
ventilator = ZmqPush('tcp://*', 6000)
worker = ZmqPull('tcp://localhost', 6000, self.base_handle_message)
self.send_multipart_messages(ventilator)
sleep(0.5)
self.assertSequenceEqual(self.multipart_messages, self.received_messages)
ventilator.close()
worker.close()
def handle_json_message(self, json):
print(str(json))
def test_push_pull(self):
print('Testing push pull')
ventilator = ZmqPush('tcp://*', 6000)
worker = ZmqPull('tcp://localhost', 6000, self.handle_message)
self.send_messages(ventilator, wait=False)
sleep(0.5)
self.assertSequenceEqual(self.test_messages, self.received_messages)
ventilator.close()
worker.close()
def on_request_received(self, message):
if message[0] == self.request:
return self.reply
else:
return 'ERROR'
А вот тесты. В зависимости от того куда я помещу код закрытия stream и socket они могут проходить, но при этом иногда выкидывать исключение. Например такое.
File "/opt/leos/code/messaging_system/zeromq/ZmqLoopRunner.py", line 12, in run
self.loop.start()
File "/Library/Python/2.7/site-packages/zmq/eventloop/ioloop.py", line 160, in start
super(ZMQIOLoop, self).start()
File "/Library/Python/2.7/site-packages/tornado/ioloop.py", line 646, in start
event_pairs = self._impl.poll(poll_timeout)
File "/Library/Python/2.7/site-packages/zmq/eventloop/ioloop.py", line 132, in poll
z_events = self._poller.poll(1000*timeout)
File "/Library/Python/2.7/site-packages/zmq/sugar/poll.py", line 110, in poll
return zmq_poll(self.sockets, timeout=timeout)
File "_poll.pyx", line 125, in zmq.backend.cython._poll.zmq_poll (zmq/backend/cython/_poll.c:1705)
IndexError: list index out of range
Или какой-то тест может не пройти. Я в растерянности, так как не понимаю, что я делаю не так.