Приветствую всех!
Для начала, хотелось бы внедрить вас в курс дела. У нас есть действующий проект, который мы реализуем с определенным количеством людей. Основная цель заключается в создании Telegram бота, написанном на фреймворке Aiogram, а также административную панель, написанную с помощью фреймворка Django. Также, есть API структуры, написанные на FastAPI (Django & FastAPI находятся в одном проекте. Aiogram в другом, отдельно от них).
Реализовал достаточно простой процесс регистрации пользователей на стороне Telegram бота. Разумеется, CRUD операции для пользователей осуществляются посредством FastAPI эндпоинтов. Так-же, добавил Django-Signals для автоматической установки стандартного статуса (CANDIDATE) для только что зарегистрировавшихся пользователей.
Моя текущая задача и конечно же проблема, с которой я не могу разобраться, заключается в том, чтобы уведомить внешнего пользователя об изменении его статуса и выполнить какие-либо операции с помощью Telegram бота.
Как выглядит процесс в целом и какова его последовательность? Я предполагаю следующее:
1. Администратор изменяет статус пользователя в административной панели Django;
2. На изменения реагируют Django-Signals. Сигнал на основе изменений вызывает функцию publish_status_change():
@receiver(pre_save, sender=UserStatus)
def set_specific_status(sender, instance, **kwargs):
if instance.pk:
previous_status = UserStatus.objects.get(pk=instance.pk)
current_status = instance.get_status_display()
if previous_status != current_status:
logging.info(f'Статус пользователя изменен с "{previous_status}" на "{current_status}"')
return set_specific_operation(sender, instance, **kwargs)
@receiver(post_save, sender=UserStatus)
def set_specific_operation(sender, instance, **kwargs):
if instance.pk:
status = UserStatus.UserListStatus
user_id = instance.user.tg_id
current_status = instance.get_status_display()
logging.info(f'Data: {user_id}:{current_status}')
if instance.status == status.ONBOARD:
logging.info(f'Сравниваемые данные: {instance.status}:{status.ONBOARD}')
return publish_status_change(user_id, current_status)
3. Собственно функция publish_status_change() представляет собой функцию для публикации данных (или сообщений) в канал/каналы Redis. Проще говоря - Redis Pub/Sub:
def publish_status_change(user_id, status):
redis = Redis.from_url(os.getenv('REDIS_URL'))
message_data = {
'user_id': user_id,
'status': status
}
redis.publish('user_status', json.dumps(message_data))
4. Данные успешно опубликованы в канал 'user_status'. Что теперь? Теперь необходимо "подписаться" на этот канал, чтобы мы могли прослушивать сообщения. Собственно, перейдем в проект телеграмм, в котором мы определили сервис для подключения к каналу:
class PubSubService:
def __init__(self, redis: Redis):
self.redis = redis
self.logger = logging.getLogger()
async def subscribe_status_channel(self):
"""Метод, осуществляющий подключение к каналу Redis"""
pubsub = self.redis.pubsub()
await pubsub.subscribe('user_status')
return pubsub
async def listen_status_channel(self):
"""Метод, позволяющий прослушивать поступающие сообщения канала user_status"""
self.logger.info('Инициализация и подключение к каналу - "user_status" ...')
pubsub = await self.subscribe_status_channel()
try:
self.logger.info('Ожидание сообщений канала "user_status" ...')
async for message in pubsub.listen():
if message['type'] == 'message':
user_data = message['data'].decode('utf-8')
self.logger.info(f'Поступили следующие данные: {user_data}')
await self.handle_status(user_data)
except Exception as e:
self.logger.info(f'Ошибка при прослушивании канала: {e}')
finally:
await pubsub.close()
async def handle_status(self, user_data, message: Message = None):
"""Метод, в котором происходит обработка полученного значения статуса и вызывается необходимый обработчик"""
load_data = json.loads(user_data)
if load_data:
status = load_data['status']
if status == "На онбординге":
await show_onboarding_message(message)
5. Далее, нам необходимо инициализировать сервис.
1). Сперва создадим некую зависимость:
async def create_pubsub_service():
from services.pubsub_service import PubSubService
storage = await setup_storage()
redis = await storage.redis
logging.info(f'Сервис PubSub успешно подключен')
return PubSubService(redis)
2) Инициализируем в main.py:
async def channel():
pubsub_service = await create_pubsub_service()
await pubsub_service.listen_status_channel()
async def main():
token = settings.BOT_TOKEN
bot = Bot(token=token)
await bot.delete_webhook()
storage = await setup_storage()
dp = Dispatcher(storage=storage)
channel_task = asyncio.create_task(channel()) # PubSubService task
for r in routers:
dp.include_router(r)
setup_dialogs(dp)
try:
await dp.start_polling(bot)
finally:
channel_task.cancel()
await channel_task
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.error("Exit!")
В ЧЕМ проблема?
Никак не удается вызвать необходимые мне обработчики Aiogram в контексте PubSubService. Так как каждый из обработчиков требует передачи таких объектов как message, callback, bot и т.п., чтобы традиционно взаимодействовать с внешним пользователем / отвечать ему / отправлять что-то. Зачем мне вызывать обработчики? Для автоматического ответа пользователю с помощью бота, основываясь на значении его статуса. Например, если у пользователя статус изменен на "На онбординге", мы должны вызвать обработчик aiogram, в котором определена отправка текста и кнопок пользователю.
Например, в коде PubSubService, который я предоставил выше, имеется вызов обработчика show_onboarding_message(message). Если вам интересно, да и в целом, если это полезно, вот код этого обработчика:
@router.message(Command("onboarding"))
async def show_onboarding_message(message: Message):
"""
Handle the /onboarding command to show onboarding information.
Parameters:
message (Message): The message instance containing the command.
Behavior:
Sends a message with onboarding information and a keyboard for further actions.
Raises:
Exception: If an issue occurs while sending the message.
"""
header_text = "*Получить доступ к группе:*\n\n"
message_text = f"{header_text}"
await message.answer(
text=message_text,
reply_markup=onboarding_keyboard.as_markup(),
parse_mode=ParseMode.MARKDOWN_V2,
)
При попытке вызвать этот обработчик, возникнет следующая ошибка:
"INFO:root:Ошибка при прослушивании канала: 'NoneType' object has no attribute 'answer'".
Скажите мне пожалуйста. В ЧЕМ здесь проблема? И как исправить и реализовать нужный мне функционал? Буду очень признателен!