Задать вопрос
@sovok2

Почему не работает apscheduler?

делаю телеграм бота на aiogram 3. После регистрации пользователя мне необходимо реализовать отложенную отправку сообщений пользователю через 3 дня. Но почему то scheduler не отрабатывает, ошибок никаких нет, задача добавляется, но в нужное время по триггеру ничего не происходит

Инициализация шедулера:
# Внешние библиотеки
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler

# Инициализация редис, куда будет сохраняться рассылки
jobstores = {"default": RedisJobStore(host="localhost", port=6379, db=3)}


Модуль шедулера:
# Стандартные библиотеки
import asyncio
from datetime import datetime
import json

# Внешние библиотеки
from aiogram.fsm.storage.base import StorageKey
from aiogram import F, Router
from aiogram.fsm.context import FSMContext
from aiogram.types import FSInputFile

router = Router() 
message_queue = asyncio.Queue()

# Функция-работник для отправки сообщений из очереди
async def message_sender_worker(redis_middleware):
    while True:
        user_id = await message_queue.get()
        try:
            # Создаем ключ и контекст состояния для пользователя
            key = StorageKey(bot_id=bot.id, chat_id=user_id, user_id=user_id)
            state = FSMContext(storage=dp.storage, key=key)

            # Получаем язык пользователя из Redis
            user_lang = await redis_middleware.get(f"user_lang:{user_id}")
            user_lang = user_lang if user_lang else "ru"  # По умолчанию русский
            
            # Отправляем сообщение, передавая актуальный язык пользователя
            await send_scheduled_message(user_id, state=state, user_lang=user_lang)
        except Exception as e:
            logger.error(f"Ошибка при отправке сообщения пользователю {user_id}: {e}")
        finally:
            message_queue.task_done()
        
        # Задержка для соблюдения лимитов Telegram
        await asyncio.sleep(0.05)


# Функция для отправки запланированного сообщения пользователю
async def send_scheduled_message(user_id, state: FSMContext, user_lang):
    print("Функция отправки через 3 дня запущена")

    # Получаем данные пользователя из базы данных
    role = await db.get_role_user(user_id)
    role = role[0]

    country = await db.get_country_user(user_id)
    country = country[0]

    await state.update_data(country_delay = country)
    await state.update_data(role_delay = role)

    if not role and country:
            logger.warning(f"Не найдены данные для пользователя {user_id}")
            return

    # Отправляем первоначальное сообщение
    await bot.send_message(chat_id=user_id, text=post_msg.after_3_days[user_lang])
    logger.info(f"Отправлено сообщение через 3 дня - {user_id}")
    await asyncio.sleep(2)

    # Обрабатываем страну, если нужно
    country_processed = await wrap_half_words_in_percent(country)
    if role in ["Роль1", "Роль2", "Роль3", "Роль4",]:
        await asyncio.sleep(1)
        await bot.send_message(
            chat_id=user_id,
            text=post_msg.get_photo_form["region"][user_lang],
            reply_markup = await reply_regions(country_processed)
        )
        await asyncio.sleep(3)
        # Устанавливаем состояние пользователя
        # state = FSMContext(storage=dp.storage, user=user_id, chat=user_id)
        await state.set_state(GetPhotoRegion.polling_station)
    else:
        await asyncio.sleep(1)
        # doc_path = "send_docs/Права избирателя.docx"
        doc_path = service_msg.agitator_patch_doc[user_lang]
        doc_file = FSInputFile(path=doc_path)
        await bot.send_document(chat_id=user_id, document=doc_file)
        await asyncio.sleep(1)
        await bot.send_message(chat_id=user_id, text=post_msg.agitator_form["send_link"][user_lang])
        
        # Устанавливаем состояние пользователя
        # state = FSMContext(storage=dp.storage, user=user_id, chat=user_id)
        await state.set_state(AgitatorForm.waiting_for_links)
        await asyncio.sleep(2)


# Функция для планирования добавления пользователя в очередь сообщений
def schedule_message(user_id, send_time):
    scheduler.add_job(
        add_user_to_queue,
        'date',
        run_date=send_time,
        args=[user_id]
    )

# Функция для добавления пользователя в очередь сообщений
async def add_user_to_queue(user_id):
    await message_queue.put(user_id)


Кусок кода с регистрацией. Тут я добавляю задачу в планировщик:
@router.callback_query(
    UserStates.FINISH_AUTH, F.data.in_(["confirm_yes", "confirm_no"])
)
async def finish_auth(callback_query: CallbackQuery, state: FSMContext, user_lang):
    if callback_query.data == "confirm_yes":
        # Добавляем данные в базу данных
        data = await state.get_data()
        tg_id = str(callback_query.from_user.id)
        await add_user_db(
            data["name"], data["phone"], tg_id,
            data["email"], data["country"], data["locality"], data["role"]
        )

        # Планируем отправку сообщения через 3 дня
        send_time = datetime.now() + timedelta(days=3)
        #send_time = datetime.now() + timedelta(seconds=10)
        schedule_message(tg_id, send_time)
        logger.info("Запланировано отправление сообщения через 3 дня")

        # Отправляем подтверждение пользователю
        await callback_query.message.answer(
            reg_msg.finish_msg[user_lang], reply_markup=await keyboard.start_quiz(user_lang)
        )
        logger.info(f"Пользователь с TG_ID - {tg_id} зарегистрировался")
        await state.clear()
    else:
        await callback_query.message.answer(reg_msg.name[user_lang])
        await state.set_state(UserStates.WAITING_FOR_NAME)
    await callback_query.answer()
  • Вопрос задан
  • 83 просмотра
Подписаться 1 Простой 2 комментария
Решения вопроса 1
@sovok2 Автор вопроса
У меня используются очереди в питоне:
message_queue = asyncio.Queue()
Чтобы они работали как отдельная таска, необходимо было запустить воркер в главном файле
Я продебажил и посмотрел, что scheduler срабатывает, он кладет в очередь нужный user_id, но дальше очередь не работала, потому что я просто не запускал message_sender_worker

В главном файле нужно было прописать и передать туда мидлвари:
asyncio.create_task(message_sender_worker(redis_middleware))


Вопрос решен, всем спасибо
Ответ написан
Комментировать
Пригласить эксперта
Ответы на вопрос 1
Vindicar
@Vindicar
RTFM!
Так как ты не показал инициализацию scheduler, то спрошу: scheduler.start() не забыл вовремя вызвать?
Также есть совет увеличить детализацию логов для скедулера.
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы