Как наладить работу асинхронного и параллельного кода на Python?

Доброго времени. Проблема: не могу наладить нормальное функционирование асинхронного и мультипроцессорного кода на Python. Библиотеки: asyncio, multiprocessing, websockets. Программа состоит из клиента и сервера, которые общаются между собой через веб-сокеты. Проблема начинается тогда, когда я посылаю серверу какую нибудь команду, которая обрабатывается через multiprocessing. То есть получается, что внутри асинхронного кода запускаются процессы. Как наладить их работу, потому что вылетают ошибки:

RuntimeWarning: coroutine 'Start.start' was never awaited
AttributeError: 'coroutine' object has no attribute 'get_me'
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
'Stop.stop' was never awaited

Минимальный код Клиента:
uri = "ws://localhost:5587"


def connection_stream(data):
        send_request_body = send_request_handler(data)
        stream = asyncio.get_event_loop().run_until_complete(connection(send_request_body))
        responce = receive_request_handler(stream)
        return responce

async def connection(data, uri):
        async with websockets.connect(uri) as websocket:
        await websocket.send(data)
        return await websocket.recv()


Минимальный код Сервера:
async def connection(self, websocket, path):
        received_data = await websocket.recv()
        responce = receive_request_handler(received_data)   
        res = send_request_handler(responce)
        await websocket.send(res)

server = websockets.serve(connection, 'localhost', 5587)
asyncio.get_event_loop().run_until_complete(server)
asyncio.get_event_loop().run_forever()


Место где появляется ошибка:
def receive_request_handler(data):
        if received_data['command'] == 'CHANNEL-SUBSCRIPTION':
            return channel_subscription(received_data['data'])


Параллелизм:
def channel_subscription(data):
        users = []
        processes = []

        res = [data, data, data, data]

	for i in res:
		users.append(pyrogram.Client(i[0], i[1], i[2]))

		for u in users:
			processes.append(multiprocessing.Process(target=func, args=(u, args,)))

		for p in processes:
			p.start()

		for p in processes:
			p.join()
  • Вопрос задан
  • 281 просмотр
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы