Obukhoff
@Obukhoff
Программист С++

Как оптимизировать реализацию long poll, для доставки сообщений клиентам из redis pub/sub на асинхронном python?

Есть пара тысяч устройств, на которые получают от сервера мониторинга уведомления о событиях и командах.
На устройствах чистый c++ и websocketpp, а та же большая проблема с сильно неустойчивой связью по gprs. При необходимости отправить сообщение с сервера получаем ошибку соединения и ждем пока клиент наконец-то прочухается и переподключится.

Решено переделать соединение на long polling. Поскольку основной движок сервера django хотелось бы остаться в рамках python.

Смотрю на aiohttp + aioredis. Сейчас для каждого устройства в redis отдельный канал (но можно и переделать на 1 на всех).
В итоге после ковыряния в мануалах асинхронных библиотек родился такой уродец. Я в сильных подозрениях, что он нармально работать не будет.

  1. на каждого клиента отдельное соединение к редис - уже зашквар.
  2. по истечении таймаута соединения клиент просто рвет сам соединение, вместо того, что бы сервер отвечал ему http 204.
  3. Поскольку соединение рвется, то коннект к редису остается подписанным на канал. Во время переподключения в канал может прилететь сообщение, которое клиент заново подключившись уже не получит. (сейчас на websocket, при подключении клиента сервер лезет в большую базу за неотправленными сообщениями, а уже затем садится на pubsub редиса).


Подскажите, как правильно реализовать подобный паттерн поведения сервера на асинхронном питоне?

import asyncio
import aioredis
from aiohttp import web

redis_url = 'redis://127.0.0.1:6379'


async def handle_poll(request):
    name = request.match_info.get('name', 'unknown')
    sub = await aioredis.create_redis(redis_url, encoding='utf-8')
    res = await sub.subscribe('channel:' + name)
    ch1 = res[0]
    msg = None

    if await ch1.wait_message():
        msg = await ch1.get()
    else:
        await sub.unsubscribe('channel:' + name)
        return web.Response(status=204)

    await sub.unsubscribe('channel:' + name)
    return web.Response(text=msg.decode('utf-8'))


def main():
    app = web.Application()
    app.router.add_get('/poll/{name}', handle_poll)
    web.run_app(app)


if __name__ == '__main__':
    try:
        main()
    except ValueError as e:
        print(e)


Понятно, что пример чисто условный т.к. нехватает авторизации, логирования и много чего ещё ;)
  • Вопрос задан
  • 690 просмотров
Пригласить эксперта
Ответы на вопрос 1
Obukhoff
@Obukhoff Автор вопроса
Программист С++
Ну поскольку ответа похоже не дождусь, отвечу сам себе.
Вот тут неплохой пример - https://gist.github.com/rcarmo/3f0772f2cbe0612b699...
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы