В боте `aiogram` использую `celery` для создания отложенных задач.
Вот код хэндлера, где я вызываю таск:
@admin_router.message(CreateEvent.time)
async def cmd_create_event_4(message: Message, state: FSMContext):
    """Finish event creation: store the time, schedule reminders, save the event.

    Reads the previously collected ``date`` (``Y/M/D``) and ``name`` from the
    FSM data together with the ``time`` (``H:M``) taken from this message,
    clears the FSM state, and then either reports that the moment has already
    passed or schedules two ``start_notificating`` tasks and stores the event.

    Raises:
        ValueError: if the date/time text cannot be parsed into integers or
            does not form a valid ``datetime`` (not handled here).
    """
    await state.update_data(time=message.text)
    form = await state.get_data()
    await state.clear()

    date_text = form['date']
    time_text = form['time']
    year, month, day = map(int, date_text.split('/'))
    hour, minute = map(int, time_text.split(':'))
    stored_datetime = f'{date_text} {time_text}'

    current_moment = dt.datetime.now()
    event_moment = dt.datetime(year, month, day, hour, minute)
    seconds = (event_moment - current_moment).total_seconds()

    # Guard clause: an event in the past is rejected without scheduling anything.
    if seconds < 0:
        await message.answer(Admin.event_datetime_passed)
        return

    events_table = EventTable()

    # One reminder per (days, label) pair; presumably process_data returns the
    # event moment shifted back by that many days -- TODO confirm.
    reminders = ((1, Admin.one_day), (3, Admin.three_days))
    for days_before, left_label in reminders:
        reminder_eta = await process_data(year, month, day, hour, minute, days_before)
        # NOTE(review): `Bot` below is the class itself, not an instance. Task
        # kwargs are serialized before being queued, and the reported
        # "kombu.exceptions.EncodeError: Object of type type is not JSON
        # serializable" points at this argument. A live bot cannot travel
        # through the broker; the task has to obtain its bot on the worker
        # side. This needs a coordinated change with `start_notificating`.
        start_notificating.apply_async(
            eta=reminder_eta,
            # Number of seconds left until the event itself.
            expires=seconds,
            kwargs={
                'bot': Bot,
                'event_name': form['name'],
                'datetime': f'{date_text} в {time_text}',
                'left': left_label,
            },
        )

    await events_table.add_event(form['name'], stored_datetime, int(seconds))
    await message.answer(Admin.event_created_successfully)
Вот код таска `celery`:
async def notificate(text: str, bot: Bot):
    """Send ``text`` to every user that has not received it yet.

    The recipient list is taken from ``Receivers`` (after ``fill()``); the
    first element of each row is used as the Telegram chat id. For each user:

    * on successful delivery the user is marked as having received the
      message, and a stored ``'blocked'`` status is reset to ``'member'``;
    * on ``TelegramForbiddenError`` the user's status is set to ``'blocked'``.

    Args:
        text: Message text to broadcast.
        bot: Bot instance used for sending.

    Raises:
        TelegramRetryAfter: if Telegram rate-limits the retry as well.
    """
    receivers = Receivers()
    await receivers.fill()
    users_table = Users()
    uids = [user[0] for user in await receivers.not_received()]
    for uid in uids:
        try:
            await _send_with_retry(bot, uid, text)
        except TelegramForbiddenError:
            # The user blocked the bot after the recipient list was built;
            # record it and carry on with the remaining users.
            await users_table.set_status(uid, 'blocked')
        else:
            user = await users_table.get_user(uid)
            # user[1] holds the stored status; a delivered message means the
            # user is reachable again.
            if user[1] == 'blocked':
                await users_table.set_status(uid, 'member')
            # Mark this user as having received the message.
            await receivers.set_success(uid)


async def _send_with_retry(bot: Bot, uid, text: str) -> None:
    """Send ``text`` to ``uid``, waiting out one flood-limit response.

    When Telegram answers with ``TelegramRetryAfter`` (too many requests),
    sleep for the requested interval and try once more. The sleep must be
    awaited, otherwise no pause happens at all. Any error from the second
    attempt propagates to the caller.
    """
    try:
        await bot.send_message(uid, text)
    except TelegramRetryAfter as ex:
        await asyncio.sleep(ex.retry_after)
        await bot.send_message(uid, text)
@celery.task
def start_notificating(bot: Bot, event_name: str, datetime: str, left: str):
    """Celery task: build the reminder text and broadcast it.

    Both steps are coroutines, so each one is driven to completion with
    ``asyncio.run`` from this synchronous task body.

    Args:
        bot: Bot used for sending. NOTE(review): task arguments are serialized
            by the caller before queuing (the reported kombu ``EncodeError``
            shows JSON is in use), so a live bot object presumably cannot be
            delivered through this parameter -- TODO confirm how the worker
            should obtain its bot.
        event_name: Name of the event being announced.
        datetime: Human-readable date and time of the event.
        left: Text describing how much time is left before the event.
    """
    text_coroutine = Admin.notification_text(event_name, datetime, left)
    message_text = asyncio.run(text_coroutine)
    broadcast = notificate(message_text, bot)
    asyncio.run(broadcast)
Тем не менее, получаю такую ошибку:
raise TypeError(f'Object of type {o.__class__.__name__} '
kombu.exceptions.EncodeError: Object of type type is not JSON serializable
В чем проблема? Никогда раньше `celery` не пользовался.