Я вижу в тегах aiohttp, так что предполагаю, что код у тебя асинхронный.
Мне тут как-то
доводилось отвечать на подобный вопрос, может, и тебе пригодится...
Идея простая - ты держишь долгоиграющую задачу, которая мониторит очередь запросов, и выбирает запросы из очереди один за другим. При этом каждый элемент очереди содержит future, в которое будет помещен результат работы корутины, и которое получает код, обратившийся к ресурсу.
Я попробовал оформить это в виде декоратора, который автоматически троттлит обращения к функции. Возможно, код неоптимален, и его придётся допилить.
Код
import typing
import asyncio
import functools
def throttled(delay: float, measure: typing.Literal['end_to_start', 'start_to_start'] = 'start_to_start'):
def decorator(actual_func: typing.Coroutine) -> typing.Coroutine:
queue = None
task = None
async def _single_query(future, args, kwargs):
try:
result = await actual_func(*args, **kwargs) # тут делаем асинхронное обращение к сервису
except BaseException as err:
future.set_exception(err) # была ошибка - теперь await future выкинет исключение
else:
future.set_result(result) # полуен результат - await future вернёт его
async def _work_loop():
nonlocal queue
nonlocal task
while True:
try:
# ждем, пока не придёт запрос, или пока не закончится таймаут
future, args, kwargs = await asyncio.wait_for(queue.get(), delay)
except asyncio.TimeoutError: # новые запросы долго не приходят, сворачиваем работу, чтобы не тратить ресурсы
queue = None
task = None
return
single_task = _single_query(future, args, kwargs)
if measure == 'start_to_start':
asyncio.create_task(single_task)
else:
await single_task
queue.task_done() # каждому успешному get() соответствует task_done()
await asyncio.sleep(delay)
@functools.wraps(actual_func)
async def query(*args, **kwargs):
nonlocal queue # обращение к переменной выше уровнем, но не глобальной
nonlocal task
future = asyncio.Future() # Future просигналит, когда наш запрос будет обслужен
if task is None: # либо это первый запрос, либо запросы долго не приходили, и мы свернули работу
queue = asyncio.Queue()
task = asyncio.create_task(_work_loop())
await queue.put((future, args, kwargs))
return await future
return query
return decorator
Пример использования:
# delay - минимальный интервал между запросами в секундах
# measure - как мерять интервалы между запросами: начало-начало или конец-начало
@throttled(delay=5.0, measure='start_to_start')
async def my_coroutine(*args, **kwargs) -> ReturnValue:
...
При этом если задекорировать несколько функций, каждая из них будет иметь свою очередь задач.
Минус - задача мониторинга будет висеть некоторое время после последнего запроса. А именно, пока таймаут между запросами не истечет.