Xaip
@Xaip

Простой чат на tornado и websocket?

И снова я, и снова тут.
Данный вопрос я уже создавал и мне на него ответили(Спасибо). https://toster.ru/q/540718
Но данное решение не подходит для чата потому что создает новое соединения для каждого класса, и в итоге получается целая лавина сообщений с экспоненциальным ростом. Я думаю проблему можно решить при помощи отдельного хандлера для redis который будет работать отдельно и итерироваться по списку. Но проблема в том что GIL не позволит мне не даст одновременно обращаться к этому списку. Как мне синхронизировать потоки так что бы при обращении к переменой из вебсокета приостонавливался. Хотя тут тоже вытекают свои недостатки, я же могу потерять сообщения за время приостановки хандлера редиса. Прошу помощи.
# Тут держу список соединений каждое соединение тоже представляет
# список из строки с названием треда и экземпляр класса 
connections = []
# Только единожды инициализирую редис
r = redis.StrictRedis()
p = r.pubsub()
# и подписываю на паттерн сообщений
p.psubscribe("thread:"*)



class EchoWebSocket(tornado.websocket.WebSocketHandler):

    async def open(self):
        print("WebSocket opened")

    def check_origin(self, origin):
        return True

    async def on_message(self, data):
        json_data = json.loads(data)
        if json_data['type'] == "SEND_MESSAGE":
            token = json_data['token']
            message = json_data['message']
            thread = json_data['thread']
            await self.post(message, thread, token)
        if json_data['type'] == "SUBSCRIBE_THREAD":
            thread = "thread:" + str(json_data['id'])
            connections.append([thread, self])


    async def post(self, message, thread, token):
        http_client = httpclient.AsyncHTTPClient()
        url = "http://127.0.0.1:8000/api/v0/thread/add/"
        headers = {'Authorization': 'JWT ' + token, "Content-Type":"application/json"}
        context = {'thread':thread, 'message':message}
        body = json.dumps(context)
        request = await http_client.fetch(request=url, method="POST", headers=headers, body=body)
        http_client.close()

    def on_close(self):
        # Пока не проработал отписку от треда
        self.connection.close()
        print("WebSocket closed")

async def reader():
    while True:
        for evt in p.listen():
            for channel, connection in connections:
                if channel == evt['channel'].decode("UTF-8"):
                    await connection.write_message(evt["data"])



application = tornado.web.Application([
    (r"/", EchoWebSocket),
])
application.autoreload = True
application.listen(8888, '127.0.0.1')
tornado.autoreload.start()
try:
    loop = tornado.ioloop.IOLoop.current()
    loop.start()
except KeyboardInterrupt:
    tornado.ioloop.IOLoop.current().stop()
  • Вопрос задан
  • 1296 просмотров
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы