@estry

Как запустить асинхронную функцию по расписанию python?

У меня в коде есть асинхронная функция. Это одна функция которая запускается при старте скрипта. Мне нужно ее запускать с интервалом.
Для этого я использую apscheduler. Пример точки входа:
if __name__ == '__main__':
    # asyncio.run(main())
    scheduler = AsyncIOScheduler()
    scheduler.add_job(main, 'interval', seconds=3)
    scheduler.start()
    try:
        asyncio.get_event_loop().run_forever()
    except (KeyboardInterrupt, SystemExit):
        pass

Однако я вижу ошибку:
DeprecationWarning: There is no current event loop
asyncio.get_event_loop().run_forever()

Подскажите как ее исправить?
  • Вопрос задан
  • 879 просмотров
Решения вопроса 1
if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    scheduler = AsyncIOScheduler(event_loop=loop)
    scheduler.add_job(main, 'interval', seconds=3)
    scheduler.start()
    try:
        loop.run_forever()
    except (KeyboardInterrupt, SystemExit):
        pass

Описание проблемы. (Как я это понял)
Пример из github репозитория библиотеки ошибочен по двум причинам
вызывается функция start(), которая сама выполняет get_event_loop()
После этого в текущем потоке создается цикл событий.
Далее в примере эти строки
try:
    asyncio.get_event_loop().run_forever()
except (KeyboardInterrupt, SystemExit):
    pass
в результате мы получаем второй запущенный цикл событий из-за чего мы получаем DeprecationWarning: There is no current event loop
Ответ написан
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы