@temagadfazer

Как хранить websocket-ы на сервере на aiohttp?

Всем привет. Пишу чатик с комнатами, внутри которых общий чат. На сервере использую aiohttp и вебсокеты храню в экземпляре приложения. Есть несколько кейсов, при которых приложение в конечном счете выдает:
socket.send() raised exception.
На сколько я понимаю, если состояние сокета изменилось (вышел человек из чата, обновил чат), то другой вебсокет не узнает об этом, но что с этим делать и почему так происходит? Какие вообще лучшие практики работы с вебсокетами? Стоит ли каждый раз закрывать и открывать соединение по вебсокету (на сколько я понял, что да, иначе просто никак)? Код работы с вебсокетом ниже упрощен. Заранее спасибо.
class CompanyWebSocket(web.View):
    async def get(self):
        session = await get_session(self.request)
        self_id = session.get('user')
        login = session.get('login')
        user = User(self.request.app.db, {})
        unread = UnreadMessage(self.request.app.db)
        message = Message(self.request.app.db)
        company = Company(self.request.app.db)
        
        ws = web.WebSocketResponse()
        await ws.prepare(self.request)

        my_companys = await company.get_company_by_user(self_id)
        for c in my_companys:
            self.request.app['websockets'][str(c['_id'])].append(ws)
        company_id = self.request.rel_url.query.get('company_id')

        async for msg in ws:
            if msg.type == WSMsgType.TEXT:
                data = json.loads(msg.data)
                if msg.data == 'close':
                    await ws.close()

                else:
                    if data['company_id']:
                        comp = await company.get_company(data['company_id'])

                        result = await message.save_for_company(
                            from_user=self_id,
                            msg=data['msg'],
                            company_id=data['company_id'],
                        )
                        for u in comp['users']:
                            if u == self_id:
                                continue
                            if not await unread.check_unread(data['company_id'], u):
                                r = await unread.save_for_company(
                                    # from_user=self_id,
                                    to_user=u,
                                    to_company=data['company_id'],
                                    msg_id=result,
                                )
                            else:
                                await unread.add_unread(data['company_id'], u)
                    mess = {
                        'from': login,
                        'msg': data['msg'],
                        'type': 'msg',
                        'from_id': self_id,
                        'to_user': data['to_user'],
                        'company_id': data['company_id'],
                        'chat_name': data.get('chat_name')
                    }
                        # отправляем сообщения всем юзерам входящим в эту компанию
                    for _ws in self.request.app['websockets'][company_id]:
                        try:
                            await _ws.send_json(mess)
                        except exception as e:
                            print(e)

            elif msg.type == WSMsgType.ERROR:
                log.debug('ws connection closed with exception %s' % ws.exception())

        for c in my_companys:
            self.request.app['websockets'][str(c['_id'])].remove(ws)
        return ws


app.py
import asyncio
import aiohttp_jinja2
import jinja2
import hashlib
import collections
import os
from aiohttp_session import session_middleware
from aiohttp_session.cookie_storage import EncryptedCookieStorage
from aiohttp import web

from routes import routes
from middlewares import authorize
from motor import motor_asyncio as ma
from settings import *


basedir = os.path.dirname(os.path.realpath(__file__))
photo_dir = os.path.join(basedir, 'static/photo/')

async def on_shutdown(app):
    for ws in app['websockets']:
        await ws.close(code=1001, mesage='Server shutdown')

middle = [
    session_middleware(EncryptedCookieStorage(hashlib.sha256(bytes(SECRET_KEY, 'utf-8')).digest())),
    authorize
]

app = web.Application(middlewares=middle)

aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader('templates'))

for route in routes:
    app.router.add_route(*route[:3], name=route[3])
app['static_root_url'] = '/static'
app.router.add_static('/static', 'static', name='static')

app.client = ma.AsyncIOMotorClient(MONGO_HOST)
app.db = app.client[MONGO_DB_NAME]

app.on_cleanup.append(on_shutdown)
app['websockets'] = collections.defaultdict(list)
app['online'] = {}
app['photo_dir'] = photo_dir

web.run_app(app)
  • Вопрос задан
  • 330 просмотров
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы