Доброго времени. Проблема: не могу наладить нормальное функционирование асинхронного и мультипроцессорного кода на Python. Библиотеки: asyncio, multiprocessing, websockets. Программа состоит из клиента и сервера, которые общаются между собой через веб-сокеты. Проблема начинается тогда, когда я посылаю серверу какую нибудь команду, которая обрабатывается через multiprocessing. То есть получается, что внутри асинхронного кода запускаются процессы. Как наладить их работу, потому что вылетают ошибки:
RuntimeWarning: coroutine 'Start.start' was never awaited
AttributeError: 'coroutine' object has no attribute 'get_me'
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
'Stop.stop' was never awaited
Минимальный код Клиента:
uri = "ws://localhost:5587"
def connection_stream(data):
send_request_body = send_request_handler(data)
stream = asyncio.get_event_loop().run_until_complete(connection(send_request_body))
responce = receive_request_handler(stream)
return responce
async def connection(data, uri):
async with websockets.connect(uri) as websocket:
await websocket.send(data)
return await websocket.recv()
Минимальный код Сервера:
async def connection(self, websocket, path):
received_data = await websocket.recv()
responce = receive_request_handler(received_data)
res = send_request_handler(responce)
await websocket.send(res)
server = websockets.serve(connection, 'localhost', 5587)
asyncio.get_event_loop().run_until_complete(server)
asyncio.get_event_loop().run_forever()
Место где появляется ошибка:
def receive_request_handler(data):
if received_data['command'] == 'CHANNEL-SUBSCRIPTION':
return channel_subscription(received_data['data'])
Параллелизм:
def channel_subscription(data):
users = []
processes = []
res = [data, data, data, data]
for i in res:
users.append(pyrogram.Client(i[0], i[1], i[2]))
for u in users:
processes.append(multiprocessing.Process(target=func, args=(u, args,)))
for p in processes:
p.start()
for p in processes:
p.join()