import typing
import asyncio
import functools
def throttled(delay: float, measure: typing.Literal['end_to_start', 'start_to_start'] = 'start_to_start'):
def decorator(actual_func: typing.Coroutine) -> typing.Coroutine:
queue = None
async def _single_query(future, args, kwargs):
try:
result = await actual_func(*args, **kwargs) # тут делаем асинхронное обращение к сервису
except Exception as err:
future.set_exception(err) # была ошибка - теперь await future выкинет исключение
else:
future.set_result(result) # полуен результат - await future вернёт его
async def _work_loop():
nonlocal queue
while True:
try:
# ждем, пока не придёт запрос, или пока не закончится таймаут
future, args, kwargs = await asyncio.wait_for(queue.get(), delay)
except asyncio.TimeoutError: # новые запросы долго не приходят, сворачиваем работу, чтобы не тратить ресурсы
queue = None
return
task = _single_query(future, args, kwargs)
if measure == 'start_to_start':
asyncio.create_task(task)
else:
await task
queue.task_done() # каждому успешному get() соответствует task_done()
await asyncio.sleep(delay)
@functools.wraps(actual_func)
async def query(*args, **kwargs):
nonlocal queue # обращение к переменной выше уровнем, но не глобальной
future = asyncio.Future() # Future просигналит, когда наш запрос будет обслужен
if queue is None: # либо это первый запрос, либо запросы долго не приходили, и мы свернули работу
queue = asyncio.Queue()
asyncio.create_task(_work_loop())
await queue.put((future, args, kwargs))
return await future
return query
return decorator
@throttled(delay=1.0/MESSAGE_REPLY_RATE, measure='start_to_start')
async def replier(message_cls: Message_struct):
...
#
async def _single_response(self, params, future):
try:
result = await replier(params) # тут делаем асинхронное обращение к сервису
except Exception as err:
future.set_exception(err) # была ошибка - теперь await future выкинет исключение
else:
future.set_result(result) # полуен результат - await future вернёт его
async def _work_loop(self):
while True:
future, params = await self._queue.get() # ждем, пока не придёт запрос
asyincio.create_task(self._single_response(params, future))
self._queue.task_done() # каждому успешному get() соответствует task_done()
await asyncio.sleep(self._delay)