Всем привет. Пишу чатик с комнатами, внутри которых общий чат. На сервере использую aiohttp и вебсокеты храню в экземпляре приложения. Есть несколько кейсов, при которых приложение в конечном счете выдает:
socket.send() raised exception.
На сколько я понимаю, если состояние сокета изменилось (вышел человек из чата, обновил чат), то другой вебсокет не узнает об этом, но что с этим делать и почему так происходит? Какие вообще лучшие практики работы с вебсокетами? Стоит ли каждый раз закрывать и открывать соединение по вебсокету (на сколько я понял, что да, иначе просто никак)? Код работы с вебсокетом ниже упрощен. Заранее спасибо.
class CompanyWebSocket(web.View):
async def get(self):
session = await get_session(self.request)
self_id = session.get('user')
login = session.get('login')
user = User(self.request.app.db, {})
unread = UnreadMessage(self.request.app.db)
message = Message(self.request.app.db)
company = Company(self.request.app.db)
ws = web.WebSocketResponse()
await ws.prepare(self.request)
my_companys = await company.get_company_by_user(self_id)
for c in my_companys:
self.request.app['websockets'][str(c['_id'])].append(ws)
company_id = self.request.rel_url.query.get('company_id')
async for msg in ws:
if msg.type == WSMsgType.TEXT:
data = json.loads(msg.data)
if msg.data == 'close':
await ws.close()
else:
if data['company_id']:
comp = await company.get_company(data['company_id'])
result = await message.save_for_company(
from_user=self_id,
msg=data['msg'],
company_id=data['company_id'],
)
for u in comp['users']:
if u == self_id:
continue
if not await unread.check_unread(data['company_id'], u):
r = await unread.save_for_company(
# from_user=self_id,
to_user=u,
to_company=data['company_id'],
msg_id=result,
)
else:
await unread.add_unread(data['company_id'], u)
mess = {
'from': login,
'msg': data['msg'],
'type': 'msg',
'from_id': self_id,
'to_user': data['to_user'],
'company_id': data['company_id'],
'chat_name': data.get('chat_name')
}
# отправляем сообщения всем юзерам входящим в эту компанию
for _ws in self.request.app['websockets'][company_id]:
try:
await _ws.send_json(mess)
except exception as e:
print(e)
elif msg.type == WSMsgType.ERROR:
log.debug('ws connection closed with exception %s' % ws.exception())
for c in my_companys:
self.request.app['websockets'][str(c['_id'])].remove(ws)
return ws
app.py
import asyncio
import aiohttp_jinja2
import jinja2
import hashlib
import collections
import os
from aiohttp_session import session_middleware
from aiohttp_session.cookie_storage import EncryptedCookieStorage
from aiohttp import web
from routes import routes
from middlewares import authorize
from motor import motor_asyncio as ma
from settings import *
basedir = os.path.dirname(os.path.realpath(__file__))
photo_dir = os.path.join(basedir, 'static/photo/')
async def on_shutdown(app):
for ws in app['websockets']:
await ws.close(code=1001, mesage='Server shutdown')
middle = [
session_middleware(EncryptedCookieStorage(hashlib.sha256(bytes(SECRET_KEY, 'utf-8')).digest())),
authorize
]
app = web.Application(middlewares=middle)
aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader('templates'))
for route in routes:
app.router.add_route(*route[:3], name=route[3])
app['static_root_url'] = '/static'
app.router.add_static('/static', 'static', name='static')
app.client = ma.AsyncIOMotorClient(MONGO_HOST)
app.db = app.client[MONGO_DB_NAME]
app.on_cleanup.append(on_shutdown)
app['websockets'] = collections.defaultdict(list)
app['online'] = {}
app['photo_dir'] = photo_dir
web.run_app(app)