У тебя при абсолютно любой ошибке отправки пользователь молча помечается как неактивный, даже если это какой-то несвязанный твой косяк.
async def serve_one():
conn = await aiomysql.connect() # нужно использовать асинхронные библиотеки!
...
await asyncio.sleep(3) # не time.sleep()! asyncio.sleep() позволяет выполняться "соседним" задачам
ip = await aiohttp.get('http://localhost/') # см. выше про библиотеки
...
# ну и так далее
async def serve_all():
for i in iter(int, 1):
await serve_one()
serve_task = None
async def monitor(): # а вот эта корутина будет висеть и тихонько мониторить признак остановки.
global serve_task # вместо global'ов можно отрефакторить на класс, суть не меняется.
while not stop_requested: # спим, пока нас не попросят остановиться.
await asyncio.sleep(0.5)
serve_task.cancel() # сигнализируем, что нужно прервать главную задачу.
# это спровоцирует исключение asyncio.CancelledError в текущем await-вызове внутри задачи.
async def main():
global serve_task
serve_task = asyncio.create_task(serve_all()) # запускаем serve_all() "параллельно" себе
monitor = asyncio.create_task(monitor()) # запускаем monitor() "параллельно" себе
try:
await serve_task # ждём завершения serve_all()
except asyncio.CancelledError:
pass # задача была прервана, но мы этого и ожидали
if __name__ == '__main__':
asyncio.run(main())
import typing
import asyncio
import functools
def throttled(delay: float, measure: typing.Literal['end_to_start', 'start_to_start'] = 'start_to_start'):
def decorator(actual_func: typing.Coroutine) -> typing.Coroutine:
queue = None
async def _single_query(future, args, kwargs):
try:
result = await actual_func(*args, **kwargs) # тут делаем асинхронное обращение к сервису
except Exception as err:
future.set_exception(err) # была ошибка - теперь await future выкинет исключение
else:
future.set_result(result) # полуен результат - await future вернёт его
async def _work_loop():
nonlocal queue
while True:
try:
# ждем, пока не придёт запрос, или пока не закончится таймаут
future, args, kwargs = await asyncio.wait_for(queue.get(), delay)
except asyncio.TimeoutError: # новые запросы долго не приходят, сворачиваем работу, чтобы не тратить ресурсы
queue = None
return
task = _single_query(future, args, kwargs)
if measure == 'start_to_start':
asyncio.create_task(task)
else:
await task
queue.task_done() # каждому успешному get() соответствует task_done()
await asyncio.sleep(delay)
@functools.wraps(actual_func)
async def query(*args, **kwargs):
nonlocal queue # обращение к переменной выше уровнем, но не глобальной
future = asyncio.Future() # Future просигналит, когда наш запрос будет обслужен
if queue is None: # либо это первый запрос, либо запросы долго не приходили, и мы свернули работу
queue = asyncio.Queue()
asyncio.create_task(_work_loop())
await queue.put((future, args, kwargs))
return await future
return query
return decorator
@throttled(delay=1.0/MESSAGE_REPLY_RATE, measure='start_to_start')
async def replier(message_cls: Message_struct):
...
Функции send()/recv() фактически оперируют с этим буфером, а дальнейшая работа с железом уже выполняется ОС и драйверами.
Как следствие, для приёма и фактической отправки данных активное участие программы не требуется - сказал "прими до 4 КБ данных" или "отправь вот эти 8 с половиной байт", и дальше занимайся своими делами. На этом асинхронное ПО и построено.
Хотя для упрощения логики, конечно, можно дождаться завершения операции (синхронный режим).