@motya_py
Слап

Как обрабатывать свои события?

Всем привет!
Мне нужно обрабатывать свои события. Например если наступило 22:57 вызвать какую-то функцию, но делать условием нельзя, т.к. это мне нужно реализовать в aiogram
Код

async def main():
    bot = Bot(token=TOKEN, )
    dp = Dispatcher()
    db = DataBase()
    
    dp.update.middleware(TechnicalBreak())
    
    dp.callback_query.register(reg.start_reg, F.data == "register")
    
    dp.message.register(basic.tech, Command(commands='tech'))
    dp.message.register(basic.start, CommandStart())
    dp.message.register(basic.start, F.text == 'Ещё раз')
    dp.message.register(basic.start, AS.get_eshe)
    dp.message.register(basic.faq, F.text=='F.A.Q.')
    
    dp.message.register(basic.week_kb_select, F.text.count('\n') == 1)
   
    dp.message.register(basic.get_login, AS.get_login)
    dp.message.register(basic.get_password, AS.get_password)

    dp.message.register(basic.get_user, Command(commands='all'))        
    
    dp.message.register(basic.text, F.text)
    
    
    try:
        await bot.delete_webhook(drop_pending_updates=True)
        await dp.start_polling(bot)
    except:
        await bot.session.close()
        


if __name__ == '__main__':
    shutil.rmtree('tempfiles/')
    os.mkdir(path='tempfiles/')
    asyncio.run(main())


Как можно заметить, мне нужно очищать папку tempfiles. Реализовал это при запуске бота, но такой подход не очень эффективен, т.к. очистку папки нужно делать приблизительно в 4-5 утра. Можно было бы реализовать этот перезапуск бота на стороне хостинга, но как по мне тоже не очень отключать бота, если юзер проходит регистрацию, т.к. ему придется заново проходить все этапы.
Пробовал ставить условие и оно проверяется 1 раз при вызове main(), ну это логично. Писал возможные циклы которые вызывают постоянно main(), но это еще одна нагрузка на бота.
  • Вопрос задан
  • 57 просмотров
Пригласить эксперта
Ответы на вопрос 2
Vindicar
@Vindicar
RTFM!
aioschedule. Ну или тупо вечный цикл с asyncio.sleep(2*60*60) внутри, запущенный в отдельной корутине через create_task().
Ответ написан
Комментировать
Я бы в первую очередь прибегнул к такому подходу, однако не могу утверждать, что call_later работает так как предполагается с такими временными рамками. Обычно использовал с временными рамками до 2 минут
Пример кода
import asyncio
import concurrent.futures
from datetime import datetime, timedelta
import os
import shutil


def delete_func():
    # Удаление временных файлов и создание каталога заново
    shutil.rmtree('tempfiles/')
    print('Folder deleted')
    os.mkdir(path='tempfiles/')
    print('Folder created')


def repeat_delete():
    _loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        # Выполнение функции delete_func в отдельном потоке
        _loop.run_in_executor(executor, delete_func)
    # Вычисляем время до 4 утра следующего дня
    now = datetime.now()
    tomorrow_4am = datetime(now.year, now.month, now.day) + timedelta(days=1, hours=4)
    delta = tomorrow_4am - now

    # Выполняем повторный запуск repeat_delete через вычисленное время
    _loop.call_later(delta.total_seconds(), repeat_delete)


async def main():
    ...


if __name__ == '__main__':
    # Создание нового цикла событий asyncio
    loop = asyncio.new_event_loop()
    # Вызов repeat_delete через 1 секунду
    loop.call_later(1, repeat_delete)
    # Запуск основной функции
    loop.run_until_complete(main())
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы