делаю телеграм бота на aiogram 3. После регистрации пользователя мне необходимо реализовать отложенную отправку сообщений пользователю через 3 дня. Но почему то scheduler не отрабатывает, ошибок никаких нет, задача добавляется, но в нужное время по триггеру ничего не происходит
Инициализация шедулера:
# Внешние библиотеки
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler
# Инициализация редис, куда будет сохраняться рассылки
jobstores = {"default": RedisJobStore(host="localhost", port=6379, db=3)}
Модуль шедулера:
# Стандартные библиотеки
import asyncio
from datetime import datetime
import json
# Внешние библиотеки
from aiogram.fsm.storage.base import StorageKey
from aiogram import F, Router
from aiogram.fsm.context import FSMContext
from aiogram.types import FSInputFile
router = Router()
message_queue = asyncio.Queue()
# Функция-работник для отправки сообщений из очереди
async def message_sender_worker(redis_middleware):
while True:
user_id = await message_queue.get()
try:
# Создаем ключ и контекст состояния для пользователя
key = StorageKey(bot_id=bot.id, chat_id=user_id, user_id=user_id)
state = FSMContext(storage=dp.storage, key=key)
# Получаем язык пользователя из Redis
user_lang = await redis_middleware.get(f"user_lang:{user_id}")
user_lang = user_lang if user_lang else "ru" # По умолчанию русский
# Отправляем сообщение, передавая актуальный язык пользователя
await send_scheduled_message(user_id, state=state, user_lang=user_lang)
except Exception as e:
logger.error(f"Ошибка при отправке сообщения пользователю {user_id}: {e}")
finally:
message_queue.task_done()
# Задержка для соблюдения лимитов Telegram
await asyncio.sleep(0.05)
# Функция для отправки запланированного сообщения пользователю
async def send_scheduled_message(user_id, state: FSMContext, user_lang):
print("Функция отправки через 3 дня запущена")
# Получаем данные пользователя из базы данных
role = await db.get_role_user(user_id)
role = role[0]
country = await db.get_country_user(user_id)
country = country[0]
await state.update_data(country_delay = country)
await state.update_data(role_delay = role)
if not role and country:
logger.warning(f"Не найдены данные для пользователя {user_id}")
return
# Отправляем первоначальное сообщение
await bot.send_message(chat_id=user_id, text=post_msg.after_3_days[user_lang])
logger.info(f"Отправлено сообщение через 3 дня - {user_id}")
await asyncio.sleep(2)
# Обрабатываем страну, если нужно
country_processed = await wrap_half_words_in_percent(country)
if role in ["Роль1", "Роль2", "Роль3", "Роль4",]:
await asyncio.sleep(1)
await bot.send_message(
chat_id=user_id,
text=post_msg.get_photo_form["region"][user_lang],
reply_markup = await reply_regions(country_processed)
)
await asyncio.sleep(3)
# Устанавливаем состояние пользователя
# state = FSMContext(storage=dp.storage, user=user_id, chat=user_id)
await state.set_state(GetPhotoRegion.polling_station)
else:
await asyncio.sleep(1)
# doc_path = "send_docs/Права избирателя.docx"
doc_path = service_msg.agitator_patch_doc[user_lang]
doc_file = FSInputFile(path=doc_path)
await bot.send_document(chat_id=user_id, document=doc_file)
await asyncio.sleep(1)
await bot.send_message(chat_id=user_id, text=post_msg.agitator_form["send_link"][user_lang])
# Устанавливаем состояние пользователя
# state = FSMContext(storage=dp.storage, user=user_id, chat=user_id)
await state.set_state(AgitatorForm.waiting_for_links)
await asyncio.sleep(2)
# Функция для планирования добавления пользователя в очередь сообщений
def schedule_message(user_id, send_time):
scheduler.add_job(
add_user_to_queue,
'date',
run_date=send_time,
args=[user_id]
)
# Функция для добавления пользователя в очередь сообщений
async def add_user_to_queue(user_id):
await message_queue.put(user_id)
Кусок кода с регистрацией. Тут я добавляю задачу в планировщик:
@router.callback_query(
UserStates.FINISH_AUTH, F.data.in_(["confirm_yes", "confirm_no"])
)
async def finish_auth(callback_query: CallbackQuery, state: FSMContext, user_lang):
if callback_query.data == "confirm_yes":
# Добавляем данные в базу данных
data = await state.get_data()
tg_id = str(callback_query.from_user.id)
await add_user_db(
data["name"], data["phone"], tg_id,
data["email"], data["country"], data["locality"], data["role"]
)
# Планируем отправку сообщения через 3 дня
send_time = datetime.now() + timedelta(days=3)
#send_time = datetime.now() + timedelta(seconds=10)
schedule_message(tg_id, send_time)
logger.info("Запланировано отправление сообщения через 3 дня")
# Отправляем подтверждение пользователю
await callback_query.message.answer(
reg_msg.finish_msg[user_lang], reply_markup=await keyboard.start_quiz(user_lang)
)
logger.info(f"Пользователь с TG_ID - {tg_id} зарегистрировался")
await state.clear()
else:
await callback_query.message.answer(reg_msg.name[user_lang])
await state.set_state(UserStates.WAITING_FOR_NAME)
await callback_query.answer()