Buchachalo
@Buchachalo

Как правильно организовать подключение к множеству серверов по websocket?

Ребята, помогите разобраться или в мысль толкните. Вопрос скорее теоретический.
Требуется подлечится к множеству серверов и принять данные. Форматы у всех разные, я написал "драйвера" (скорее асинхронные функции где определил как нужно подключатся и какой формат данных) к серверам что бы унифицировать подключение.

data = {'srv1': ['dvs1','dvs2','dvs3','dvs4','dvs5','dvs6',],
'srv2': ['dvs1','dvs2','dvs3','dvs4','dvs5','dvs6',],
'srv3': ['dvs1','dvs2','dvs3','dvs4','dvs5','dvs6',]}

drv = {'srv1': foo_srv1, 
'srv2': foo_srv2, 
'srv3': foo_srv3}


async def main():
    futures = [asyncio.create_task(drv.get(server)(dvs)) for server, dvs in data.items() if server in drv]
    await asyncio.gather(*futures, return_exceptions=True)

if __name__ == '__main__':
    asyncio.run(main())


И пример двух чуть разных по реализации функций:
async def foo_srv1(dvs: list):
    url = f'*******************' + dvs
    async with websockets.connect(url) as ws:
        async for message in ws:
            data = json.loads(message)
            print('foo_srv1')

Все достаточно стандартно и вроде как хорошо работает.
Но вот реализация во второй функции отличается:
async def foo_srv2(dvs: list):
    url = '********************'
    async with websockets.connect(url) as ws:
        args = []
        for i in range(0, len(dvs), 10):
            slice_dvs = dvs[i:i + 10]
            args.append(','.join(slice_dvs))
        futures = [ws.send(f'{{"op": "subscribe", "args": [{str(arg)}]}}') for arg in args]
        await asyncio.gather(*futures, return_exceptions=True)
        async for message in ws:
            data = json.loads(message)
            print('foo_srv2')

Так как один из серверов принимает в запросе ограниченно количество девайсов, приходится список разбивать по 10 девайсов и генерировать не один ws.send, а сразу допустим 20.
Ради эксперимента вывел в консоль когда отрабатывает тот или иной сокет. И закономерно получил что вторая функция куда как чаще принимает и отрабатывает поступившие данные.
Условно в loop попал всего один таск с foo_srv1 и 20 тасков с foo_srv2.

Но условное количество поступающей информации что с первого сервера что со второго сервера примерно равны. Как я понимаю это та самая конкуренция!
Вопрос в следующем, как правильно организовать подключение к нескольким серверам? При разной реализации подписки на сокет?
В первом случаи я подписался на прием данных в одном "запросе" сразу на 100 устройств.
Во втором случаи мне пришлось разбить 100 устройств по 10 устройств в запросе.
Может быть я напрасно использую таски во второй функции и как то запустить все "последовательно"?
Или разбить все по процессорам? Но тогда я сильно ограничен процессором, хотя утилизация что сетевого канала что процессора крайне маленькая.
Ну или я занялся оверинженеренгом и все куда как проще? :)

Понимаю что некий сумбур в вопросе, задайте уточняющие и я попробую разложить.
  • Вопрос задан
  • 150 просмотров
Решения вопроса 1
Buchachalo
@Buchachalo Автор вопроса
"Ну вот и всё! Неплохая получилась история. Интересная, весёлая, порой, немного грустная, а главное-поучительная! Она научила быть смелыми и не бояться вызовов, которые готовит нам жизнь. Помогала нам добиваться поставленных целей, несмотря ни на что. Но самое главное что у это истории счастливый конец!"


Как обычно в воспаленном мозгу инженера рождаются странные мысли об оверинженеренге!!! Нужно придумать какого менеджера сокетов, или ФАБРИКУ сокетов! Или поднимать для каждого драйвера свой докер-образ. И все это было сделано, и все это работало, но оставалось чувство что делается все не правильно. Все как обычно проще) Нужно было 2 бокала белого рислинга что бы наступило прозрение...

async def foo_srv2(dvs: list):
    url = '********************'
    
    async def create_ws(url: str, dvs: list):
        async with websockets.connect(url) as ws:
            await ws.send(f'{{"op": "subscribe", "args": {dvs}}}')
            async for message in ws:
                message = json.loads(message)
                print(message)

    futures = list()
    for i in range(0, len(dvs), 10):
        slice_dvs = dvs[i:i + 10]
        futures.append(asyncio.create_task(create_ws(slice_dvs)))
    await asyncio.gather(*futures, return_exceptions=True)

Все просто и все прекрасно полетело. Не гарантирую что это единственно правильный подход, но все отлично поднялось и бегает. Отмасштабировал до прослушивания 44 серверов, прилетают пакеты каждые 10 миллисекунд в среднем, количество пакетов не считал, но там не мало. Все отлично работает и не жужжит)
Всем спасибо за внимание!
Ответ написан
Комментировать
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы