Задать вопрос
@Ninzalo

Как запустить выполнение асинхронной функции с определенной частотой выполнения?

Здравствуйте,
у меня есть код, который получает сообщение из мессенжера.
Допустим, что функция replier() отвечает на полученное сообщение. API, к которой я обращаюсь для ответа на сообщение имеет ограничение по частоте вызовов в секунду.
import asyncio
from datetime import datetime
import pickle

from lib.bot.converters import dataclass_from_dict
from lib.bot.struct import Message_struct
from config import LOCAL_IP, LOCAL_PORT, MESSAGE_REPLY_RATE


async def handle_echo(reader, writer) -> None:
    tasks = []
    while True:
        data = await reader.read()
        if not data:
            break
        message = pickle.loads(data)
        message_cls = dataclass_from_dict(
            struct=Message_struct, dictionary=message
        )
        task = asyncio.create_task(replier(message_cls=message_cls))
        tasks.append(task)

    await asyncio.gather(*tasks)
    addr = writer.get_extra_info("peername")
    print(f"Received {message_cls!r} from {addr!r}")
    writer.close()


async def replier(message_cls: Message_struct):
    # process the received data here
    print(message_cls.text, datetime.now())


async def main() -> None:
    server = await asyncio.start_server(handle_echo, LOCAL_IP, LOCAL_PORT)

    addrs = ", ".join(str(sock.getsockname()) for sock in server.sockets)
    print(f"Serving on {addrs}")

    async with server:
        await server.serve_forever()


asyncio.run(main())


Все, что я нашел в интернете - добавить asyncio.sleep(1.0 / MESSAGE_REPLY_RATE) в конце функции replier(). Но это не работает. Если мне поступает несколько сообщений из мессенжера, то функция replier() вызывается одновременно
Как добавить частоту ответа на сообщения?
  • Вопрос задан
  • 121 просмотр
Подписаться 1 Простой Комментировать
Решения вопроса 1
Vindicar
@Vindicar
RTFM!
Храни в asyncio.Queue очередь запросов к API. Отдельная задача пусть выбирает запросы из очереди, отправляет, получает ответ и оповещает о результате. Например, так.
import asyncio
import typing

class ThrottledResource:
    def __init__(self, delay: float):
        self._delay = delay
        self._queue = asyncio.Queue()
        self._task = None
    
    def start(self):
        self._task = asyncio.create_task(self._work_loop)
    
    def stop(self):
        self._task.cancel()
        self._task = None

    # этот метод вызывается клиентским кодом, получает параметры и возвращает отклик спустя время.
    async def query(self, params):
        future = asyncio.Future()  # Future просигналит, когда наш запрос будет обслужен
        await self._queue.put((future, params))
        result = await future  # корутина спит, пока запрос не обслужат
        return result

    async def _work_loop(self):
        while True:
            future, params = await. self._queue.get()  # ждем, пока не придёт запрос
            try:
                result = await call_api(params)  # тут делаем асинхронное обращение к сервису
            except Exception as err:
                future.set_exception(err)  # была ошибка - теперь await future выкинет исключение
            else:
                future.set_result(result)  # полуен результат - await future вернёт его
            self._queue.task_done()  # каждому успешному get() соответствует task_done()
            asyncio.sleep(self._delay)  # можно учесть, сколько времени делался запрос. Но стоит ли?

Код примерный, но идею передаёт. Использоваться будет как-то так
api = ThrottledResource(delay=1.0)
api.start()
...
result = await api.query(params)  # await подождёт, пока не дойдёт очередь до нашего запроса


Нужно добавить обработку ошибок, корректное завершение работы при наличии задач в очереди, и так далее.
Технически вместо класса можно было реализовать это всё в виде декоратора над replier(), но это уже на вкус и цвет.
Ответ написан
Пригласить эксперта
Ответы на вопрос 1
@rPman
await asyncio.sleep(1.0 / MESSAGE_REPLY_RATE)
нужно добавлять в тело цикла, после tasks.append(task), добавив await

тебе ведь нужно не создавать новые task чаще чем указанная частота, вот и создавай их с интервалом.
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы