@wintreist

Почему не работает async apscheduler?

Пытаюсь создать планировщика:
sheduler = AsyncIOScheduler(timezone='Europe/Moscow')

async def thread_maintaining_communication():
    print('тут')

async def main():
    sheduler.add_job(thread_maintaining_communication,"interval", seconds=20)
    sheduler.start()
    #await bot.infinity_polling(skip_pending=True)
    while True:
        print('Сплю 10 сек')
        await asyncio.sleep(10)

asyncio.run(main())

Но по какой-то неведомой мне причине он не работает. Вот что в консоли:
INFO:apscheduler.scheduler:Adding job tentatively -- it will be properly scheduled when the scheduler starts
INFO:apscheduler.scheduler:Added job "thread_maintaining_communication" to job store "default"
Сплю 10 секINFO:apscheduler.scheduler:Scheduler started

Сплю 10 сек
Сплю 10 сек
Сплю 10 сек
  • Вопрос задан
  • 708 просмотров
Решения вопроса 1
@wintreist Автор вопроса
Мне дали ответ, в чем была проблема, и вот код который правильно работает
sheduler.add_job(thread_maintaining_communication,"interval", seconds=20)
bot.add_custom_filter(asyncio_filters.StateFilter(bot))
sheduler.start()
loop = asyncio.get_event_loop() 
loop.run_until_complete(bot.polling(skip_pending=True))
Ответ написан
Комментировать
Пригласить эксперта
Ответы на вопрос 2
leahch
@leahch
3D специалист. Dолго, Dорого, Dерьмово.
Подозреваю, что нужно asyncio.get_event_loop().run_forever()
Ответ написан
@Jack444
почему бы не снести шедулер и просто не сделать так?

from asyncio import create_task, run, sleep

async def thread_maintaining_communication(seconds: int):
    while True: await sleep(seconds) ; print('тут')

async def main():
    create_task(thread_maintaining_communication(20))
    while True:
        print('Сплю 10 сек')
        await sleep(10)

run(main())
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы