И снова я, и снова тут.
Данный вопрос я уже создавал и мне на него ответили(Спасибо).
https://toster.ru/q/540718
Но данное решение не подходит для чата потому что создает новое соединения для каждого класса, и в итоге получается целая лавина сообщений с экспоненциальным ростом. Я думаю проблему можно решить при помощи отдельного хандлера для redis который будет работать отдельно и итерироваться по списку. Но проблема в том что GIL не позволит мне не даст одновременно обращаться к этому списку. Как мне синхронизировать потоки так что бы при обращении к переменой из вебсокета приостонавливался. Хотя тут тоже вытекают свои недостатки, я же могу потерять сообщения за время приостановки хандлера редиса. Прошу помощи.
# Тут держу список соединений каждое соединение тоже представляет
# список из строки с названием треда и экземпляр класса
connections = []
# Только единожды инициализирую редис
r = redis.StrictRedis()
p = r.pubsub()
# и подписываю на паттерн сообщений
p.psubscribe("thread:"*)
class EchoWebSocket(tornado.websocket.WebSocketHandler):
async def open(self):
print("WebSocket opened")
def check_origin(self, origin):
return True
async def on_message(self, data):
json_data = json.loads(data)
if json_data['type'] == "SEND_MESSAGE":
token = json_data['token']
message = json_data['message']
thread = json_data['thread']
await self.post(message, thread, token)
if json_data['type'] == "SUBSCRIBE_THREAD":
thread = "thread:" + str(json_data['id'])
connections.append([thread, self])
async def post(self, message, thread, token):
http_client = httpclient.AsyncHTTPClient()
url = "http://127.0.0.1:8000/api/v0/thread/add/"
headers = {'Authorization': 'JWT ' + token, "Content-Type":"application/json"}
context = {'thread':thread, 'message':message}
body = json.dumps(context)
request = await http_client.fetch(request=url, method="POST", headers=headers, body=body)
http_client.close()
def on_close(self):
# Пока не проработал отписку от треда
self.connection.close()
print("WebSocket closed")
async def reader():
while True:
for evt in p.listen():
for channel, connection in connections:
if channel == evt['channel'].decode("UTF-8"):
await connection.write_message(evt["data"])
application = tornado.web.Application([
(r"/", EchoWebSocket),
])
application.autoreload = True
application.listen(8888, '127.0.0.1')
tornado.autoreload.start()
try:
loop = tornado.ioloop.IOLoop.current()
loop.start()
except KeyboardInterrupt:
tornado.ioloop.IOLoop.current().stop()