import os
from models.ws_handler import WsHandler
from consumer import Consumer
import tornado.ioloop
if __name__ == '__main__':
consumer = Consumer()
consumer.start_start_consuming_thread()
application = tornado.web.Application(handlers=[
(r'/ws', WsHandler)
], default_host="0.0.0.0")
port = int(os.environ["PORT"])
application.listen(port=port)
tornado.ioloop.IOLoop.instance().start()
consumer.py
class Consumer:
def __init__(self):
self.connector = Connector()
def start_start_consuming_thread(self):
threading.Thread(target=self.start_consuming).start()
def start_consuming(self):
queue = 'qwerty'
self.connector.connect()
self.connector.setup_queue(queue)
self.connector.channel.basic_consume(queue=queue, auto_ack=True, on_message_callback=self.callback)
self.connector.channel.start_consuming()
def stop(self):
self.connector.disconnect()
def callback(self, ch, method, properties, body):
body_arr = json.loads(body)
client_id = body_arr.get("client_id")
logging.info(f" [x] Received {str(client_id)}")
for client in CLIENTS:
if client.client_id == client_id:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
future = asyncio.ensure_future(client.write_message(body))
loop.run_until_complete(future)
break
В логах ничего необычного, просто перестает слушать и все.
INFO:root:User connects.
ws_server | INFO:pika.adapters.utils.connection_workflow:Pika version 1.2.0 connecting to ('172.30.0.2', 5672)
ws_server | INFO:pika.adapters.utils.io_services_utils:Socket connected: <socket.socket fd=12, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('172.30.0.6', 36408), raddr=('172.30.0.2', 5672)>
ws_server | INFO:pika.adapters.utils.connection_workflow:Streaming transport linked up: (<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fa435a66160>, _StreamingProtocolShim: <SelectConnection PROTOCOL transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fa435a66160> params=<ConnectionParameters host=rabbitmq port=5672 virtual_host=/ ssl=False>>).
ws_server | INFO:pika.adapters.utils.connection_workflow:AMQPConnector - reporting success: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fa435a66160> params=<ConnectionParameters host=rabbitmq port=5672 virtual_host=/ ssl=False>>
ws_server | INFO:pika.adapters.utils.connection_workflow:AMQPConnectionWorkflow - reporting success: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fa435a66160> params=<ConnectionParameters host=rabbitmq port=5672 virtual_host=/ ssl=False>>
ws_server | INFO:pika.adapters.blocking_connection:Connection workflow succeeded: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7fa435a66160> params=<ConnectionParameters host=rabbitmq port=5672 virtual_host=/ ssl=False>>
ws_server | INFO:pika.adapters.blocking_connection:Created channel=1
ws_server | INFO:tornado.access:101 GET /ws (172.30.0.7) 0.81ms