@Mikeshadow
Очередной телеграм ботодел)

Как правильно реализовать очередь?

Я отправляю запросы по api, у которого ограничение на количество запросов в минуту. Я хочу вместо прямого вызова api добавлять действие в очередь, блокируя выполнение функции пока действие в очереди не выполнится, и соответственно разгружать эту очередь через определенный интервал.
Я могу попробовать реализовать это через условный collections.deque и while True цикл, но не знаю как возвращать результат в функцию. Есть ли более удобное решение, или как правильно реализовать это?
  • Вопрос задан
  • 61 просмотр
Решения вопроса 1
Vindicar
@Vindicar
RTFM!
Я вижу в тегах aiohttp, так что предполагаю, что код у тебя асинхронный.
Мне тут как-то доводилось отвечать на подобный вопрос, может, и тебе пригодится...
Идея простая - ты держишь долгоиграющую задачу, которая мониторит очередь запросов, и выбирает запросы из очереди один за другим. При этом каждый элемент очереди содержит future, в которое будет помещен результат работы корутины, и которое получает код, обратившийся к ресурсу.
Я попробовал оформить это в виде декоратора, который автоматически троттлит обращения к функции. Возможно, код неоптимален, и его придётся допилить.
Код

import typing
import asyncio
import functools


def throttled(delay: float, measure: typing.Literal['end_to_start', 'start_to_start'] = 'start_to_start'):
    def decorator(actual_func: typing.Coroutine) -> typing.Coroutine:
        queue = None
        task = None
        
        async def _single_query(future, args, kwargs):
            try:
                result = await actual_func(*args, **kwargs)  # тут делаем асинхронное обращение к сервису
            except BaseException as err:
                future.set_exception(err)  # была ошибка - теперь await future выкинет исключение
            else:
                future.set_result(result)  # полуен результат - await future вернёт его
        
        async def _work_loop():
            nonlocal queue
            nonlocal task
            while True:
                try:
                    # ждем, пока не придёт запрос, или пока не закончится таймаут
                    future, args, kwargs = await asyncio.wait_for(queue.get(), delay)
                except asyncio.TimeoutError:  # новые запросы долго не приходят, сворачиваем работу, чтобы не тратить ресурсы
                    queue = None
                    task = None
                    return
                single_task = _single_query(future, args, kwargs)
                if measure == 'start_to_start':
                    asyncio.create_task(single_task)
                else:
                    await single_task
                queue.task_done()  # каждому успешному get() соответствует task_done()
                await asyncio.sleep(delay)
        
        @functools.wraps(actual_func)
        async def query(*args, **kwargs):
            nonlocal queue  # обращение к переменной выше уровнем, но не глобальной
            nonlocal task
            future = asyncio.Future()  # Future просигналит, когда наш запрос будет обслужен
            if task is None:  # либо это первый запрос, либо запросы долго не приходили, и мы свернули работу
                queue = asyncio.Queue()
                task = asyncio.create_task(_work_loop())
            await queue.put((future, args, kwargs))
            return await future
        
        return query
    
    return decorator



Пример использования:
# delay - минимальный интервал между запросами в секундах
# measure - как мерять интервалы между запросами: начало-начало или конец-начало
@throttled(delay=5.0, measure='start_to_start')
async def my_coroutine(*args, **kwargs) -> ReturnValue:
    ...

При этом если задекорировать несколько функций, каждая из них будет иметь свою очередь задач.
Минус - задача мониторинга будет висеть некоторое время после последнего запроса. А именно, пока таймаут между запросами не истечет.
Ответ написан
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы