При использовании хранилища Redis в apscheduler использую ContextSchedulerDecorator для некоторых объектов.
Apscheduler нужен для запуска функций в телеграм боте (aiogram).
Класс bot регистрирую в ContextSchedulerDecorator в функции start вот таким образом:
scheduler = ContextSchedulerDecorator(AsyncIOScheduler(timezone="Europe/Moscow", jobstores=jobstores))
scheduler.ctx.add_instance(instance=bot, declared_class=Bot)
Apscheduler проброшен с помощью middleware. Соответственно получаю apscheduler в аргументы функции
Приложу полный код для большего понимания:
from aiogram import Bot, Dispatcher
import asyncio
import logging
import contextlib
from core.settings import settings
from aiogram.filters import CommandStart
from core.middlewares.apschedulermiddleware import SchedulerMiddleware
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from aiogram.fsm.storage.redis import RedisStorage
from apscheduler.jobstores.redis import RedisJobStore
from apscheduler_di import ContextSchedulerDecorator
async def start_message(message: Message, bot: Bot, state: FSMContext, apscheduler: AsyncIOScheduler):
datetime_next = datetime.now() + timedelta(seconds=30)
apscheduler.add_job(func=func_2, trigger='date', run_date=datetime_next,
kwargs={'call': message, 'bot': bot, 'state': state, 'apscheduler': apscheduler})
async def start():
bot = Bot(token=settings.bots.bot_token, parse_mode='HTML')
storage = RedisStorage.from_url('redis://localhost:6379/0')
dp = Dispatcher(storage=storage)
jobstores = {
'default': RedisJobStore(jobs_key='dispatched_trips_jobs',
run_times_key='dispatched_trips_running',
host='localhost',
db=5,
port=6379)
}
scheduler = ContextSchedulerDecorator(AsyncIOScheduler(timezone="Europe/Moscow", jobstores=jobstores))
scheduler.ctx.add_instance(instance=bot, declared_class=Bot)
scheduler.start()
dp.update.middleware.register(SchedulerMiddleware(scheduler))
dp.message.register(start_message, CommandStart())
try:
await dp.start_polling(bot, allowed_updates=dp.resolve_used_update_types())
except Exception as ex:
logging.error(f"{ex}", exc_info=True)
finally:
await bot.session.close()
if __name__ == "__main__":
with contextlib.suppress(KeyboardInterrupt, SystemExit):
asyncio.run(start())
С этим проблем нет, все работает корректно, пока при добавлении таски в шедулер только объект класса бот. Как только в шедулер надо передать message или fsmcontext сразу получаю ошибку:
2023-01-14 12:26:01,521 - [ERROR] - aiogram.event - (dispatcher.py)._process_update(299) - Cause exception while process update id=482614320 by bot id=1695706849
TypeError: cannot pickle 'weakref' object
Traceback (most recent call last):
File "C:\Users\Administrator\AppData\Local\Programs\Python\Python310\lib\site-packages\aiogram\dispatcher\dispatcher.py", line 293, in _process_update
response = await self.feed_update(bot, update, **kwargs)
File "C:\Users\Administrator\AppData\Local\Programs\Python\Python310\lib\site-packages\aiogram\dispatcher\dispatcher.py", line 142, in feed_update
response = await self.update.wrap_outer_middleware(
File "C:\Users\Administrator\AppData\Local\Programs\Python\Python310\lib\site-packages\aiogram\dispatcher\middlewares\error.py", line 25, in __call__
return await handler(event, data)
File "C:\Users\Administrator\AppData\Local\Programs\Python\Python310\lib\site-packages\aiogram\dispatcher\middlewares\user_context.py", line 23, in __call__
return await handler(event, data)
File "C:\Users\Administrator\AppData\Local\Programs\Python\Python310\lib\site-packages\aiogram\fsm\middleware.py", line 34, in __call__
return await handler(event, data)
File "C:\Users\Administrator\AppData\Local\Programs\Python\Python310\lib\site-packages\aiogram\dispatcher\event\telegram.py", line 111, in trigger
return await wrapped_inner(event, kwargs)
File "D:\PythonProject\project_test\core\middlewares\dbmiddleware.py", line 21, in __call__
return await handler(event, data)
File "D:\PythonProject\project_test\core\middlewares\apschedulermiddleware.py", line 19, in __call__
return await handler(event, data)
File "C:\Users\Administrator\AppData\Local\Programs\Python\Python310\lib\site-packages\aiogram\dispatcher\event\handler.py", line 42, in call
return await wrapped()
File "C:\Users\Administrator\AppData\Local\Programs\Python\Python310\lib\site-packages\aiogram\dispatcher\dispatcher.py", line 260, in _listen_update
return await self.propagate_event(update_type=update_type, event=event, **kwargs)
File "C:\Users\Administrator\AppData\Local\Programs\Python\Python310\lib\site-packages\aiogram\dispatcher\router.py", line 115, in propagate_event
return await observer.wrap_outer_middleware(_wrapped, event=event, data=kwargs)
File "C:\Users\Administrator\AppData\Local\Programs\Python\Python310\lib\site-packages\aiogram\dispatcher\router.py", line 111, in _wrapped
return await self._propagate_event(
File "C:\Users\Administrator\AppData\Local\Programs\Python\Python310\lib\site-packages\aiogram\dispatcher\router.py", line 124, in _propagate_event
response = await observer.trigger(event, **kwargs)
File "C:\Users\Administrator\AppData\Local\Programs\Python\Python310\lib\site-packages\aiogram\dispatcher\event\telegram.py", line 111, in trigger
return await wrapped_inner(event, kwargs)
File "C:\Users\Administrator\AppData\Local\Programs\Python\Python310\lib\site-packages\aiogram\dispatcher\event\handler.py", line 42, in call
return await wrapped()
File "D:\PythonProject\project_test\core\handlers\subs_channel.py", line 79, in get_start
await func_1(message=message, bot=bot, state=state, apscheduler=apscheduler)
File "D:\PythonProject\project_test\core\handlers\func_rel.py", line 25, in func_1
apscheduler.add_job(func=func_2, trigger='date', run_date=datetime_next,
File "C:\Users\Administrator\AppData\Local\Programs\Python\Python310\lib\site-packages\apscheduler_di\decorator.py", line 163, in add_job
self._scheduler._real_add_job(job, jobstore, replace_existing)
File "C:\Users\Administrator\AppData\Local\Programs\Python\Python310\lib\site-packages\apscheduler\schedulers\base.py", line 871, in _real_add_job
store.add_job(job)
File "C:\Users\Administrator\AppData\Local\Programs\Python\Python310\lib\site-packages\apscheduler\jobstores\redis.py", line 81, in add_job
pipe.hset(self.jobs_key, job.id, pickle.dumps(job.__getstate__(),
TypeError: cannot pickle 'weakref' object
Как можно зарегистрировать объект message и FSMContext в ContextSchedulerDecorator?
Чтобы создать объект FSMContext нужно указать StorageKey, который включает chat_id, user_id, которые становятся известными только когда пользователь начал писать боту.