@Exxyss

Проблема с реализацией функционала, для отслеживания изменения статуса пользователя в базе данных. Как корректно реализовать эту задачу?

Приветствую всех!
Для начала, хотелось бы внедрить вас в курс дела. У нас есть действующий проект, который мы реализуем с определенным количеством людей. Основная цель заключается в создании Telegram бота, написанном на фреймворке Aiogram, а также административную панель, написанную с помощью фреймворка Django. Также, есть API структуры, написанные на FastAPI (Django & FastAPI находятся в одном проекте. Aiogram в другом, отдельно от них).

Реализовал достаточно простой процесс регистрации пользователей на стороне Telegram бота. Разумеется, CRUD операции для пользователей осуществляются посредством FastAPI эндпоинтов. Так-же, добавил Django-Signals для автоматической установки стандартного статуса (CANDIDATE) для только что зарегистрировавшихся пользователей.

Моя текущая задача и конечно же проблема, с которой я не могу разобраться, заключается в том, чтобы уведомить внешнего пользователя об изменении его статуса и выполнить какие-либо операции с помощью Telegram бота.
Как выглядит процесс в целом и какова его последовательность? Я предполагаю следующее:

1. Администратор изменяет статус пользователя в административной панели Django;

2. На изменения реагируют Django-Signals. Сигнал на основе изменений вызывает функцию publish_status_change():

@receiver(pre_save, sender=UserStatus)
def set_specific_status(sender, instance, **kwargs):
    if instance.pk:
        previous_status = UserStatus.objects.get(pk=instance.pk)
        current_status = instance.get_status_display()

        if previous_status != current_status:
            logging.info(f'Статус пользователя изменен с "{previous_status}" на "{current_status}"')
            return set_specific_operation(sender, instance, **kwargs)


@receiver(post_save, sender=UserStatus)
def set_specific_operation(sender, instance, **kwargs):
    if instance.pk:
        status = UserStatus.UserListStatus

        user_id = instance.user.tg_id
        current_status = instance.get_status_display()
        logging.info(f'Data: {user_id}:{current_status}')

        if instance.status == status.ONBOARD:
            logging.info(f'Сравниваемые данные: {instance.status}:{status.ONBOARD}')
            return publish_status_change(user_id, current_status)


3. Собственно функция publish_status_change() представляет собой функцию для публикации данных (или сообщений) в канал/каналы Redis. Проще говоря - Redis Pub/Sub:

def publish_status_change(user_id, status):
    redis = Redis.from_url(os.getenv('REDIS_URL'))

    message_data = {
        'user_id': user_id,
        'status': status
    }

    redis.publish('user_status', json.dumps(message_data))


4. Данные успешно опубликованы в канал 'user_status'. Что теперь? Теперь необходимо "подписаться" на этот канал, чтобы мы могли прослушивать сообщения. Собственно, перейдем в проект телеграмм, в котором мы определили сервис для подключения к каналу:
class PubSubService:
    def __init__(self, redis: Redis):
        self.redis = redis
        self.logger = logging.getLogger()

    async def subscribe_status_channel(self):
        """Метод, осуществляющий подключение к каналу Redis"""
        pubsub = self.redis.pubsub()
        await pubsub.subscribe('user_status')
        return pubsub

    async def listen_status_channel(self):
        """Метод, позволяющий прослушивать поступающие сообщения канала user_status"""
        self.logger.info('Инициализация и подключение к каналу - "user_status" ...')
        pubsub = await self.subscribe_status_channel()

        try:
            self.logger.info('Ожидание сообщений канала "user_status" ...')
            async for message in pubsub.listen():
                if message['type'] == 'message':
                    user_data = message['data'].decode('utf-8')
                    self.logger.info(f'Поступили следующие данные: {user_data}')

                    await self.handle_status(user_data)

        except Exception as e:
            self.logger.info(f'Ошибка при прослушивании канала: {e}')
        finally:
            await pubsub.close()

    async def handle_status(self, user_data, message: Message = None):
        """Метод, в котором происходит обработка полученного значения статуса и вызывается необходимый обработчик"""
        load_data = json.loads(user_data)
        
        if load_data:
            status = load_data['status']
            if status == "На онбординге":
                await show_onboarding_message(message)


5. Далее, нам необходимо инициализировать сервис.
1). Сперва создадим некую зависимость:
async def create_pubsub_service():
    from services.pubsub_service import PubSubService
    storage = await setup_storage()
    redis = await storage.redis
    logging.info(f'Сервис PubSub успешно подключен')
    return PubSubService(redis)

2) Инициализируем в main.py:
async def channel():
    pubsub_service = await create_pubsub_service()
    await pubsub_service.listen_status_channel()


async def main():
    token = settings.BOT_TOKEN
    bot = Bot(token=token)
    await bot.delete_webhook()

    storage = await setup_storage()
    dp = Dispatcher(storage=storage)

    channel_task = asyncio.create_task(channel()) # PubSubService task

    for r in routers:
        dp.include_router(r)

    setup_dialogs(dp)

    try:
        await dp.start_polling(bot)
    finally:
        channel_task.cancel()
        await channel_task


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger()
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.error("Exit!")


В ЧЕМ проблема?
Никак не удается вызвать необходимые мне обработчики Aiogram в контексте PubSubService. Так как каждый из обработчиков требует передачи таких объектов как message, callback, bot и т.п., чтобы традиционно взаимодействовать с внешним пользователем / отвечать ему / отправлять что-то. Зачем мне вызывать обработчики? Для автоматического ответа пользователю с помощью бота, основываясь на значении его статуса. Например, если у пользователя статус изменен на "На онбординге", мы должны вызвать обработчик aiogram, в котором определена отправка текста и кнопок пользователю.

Например, в коде PubSubService, который я предоставил выше, имеется вызов обработчика show_onboarding_message(message). Если вам интересно, да и в целом, если это полезно, вот код этого обработчика:
@router.message(Command("onboarding"))
async def show_onboarding_message(message: Message):
    """
    Handle the /onboarding command to show onboarding information.

    Parameters:
    message (Message): The message instance containing the command.

    Behavior:
    Sends a message with onboarding information and a keyboard for further actions.

    Raises:
    Exception: If an issue occurs while sending the message.
    """

    header_text = "*Получить доступ к группе:*\n\n"
    message_text = f"{header_text}"

    await message.answer(
        text=message_text,
        reply_markup=onboarding_keyboard.as_markup(),
        parse_mode=ParseMode.MARKDOWN_V2,
    )

При попытке вызвать этот обработчик, возникнет следующая ошибка:
"INFO:root:Ошибка при прослушивании канала: 'NoneType' object has no attribute 'answer'".

Скажите мне пожалуйста. В ЧЕМ здесь проблема? И как исправить и реализовать нужный мне функционал? Буду очень признателен!
  • Вопрос задан
  • 189 просмотров
Решения вопроса 1
@alekssamos
Программист любитель
Коротко: bot.sendMessage(tg_id, "Статус изменен")
И не вызывать никаких хендлеров.
Ответ написан
Комментировать
Пригласить эксперта
Ответы на вопрос 1
@Everything_is_bad
Потому что ты руками вызываешь show_onboarding_message, хотя это хендер aiogram, не предназначен он для ручного вызова, конечно же в нем message будет None. Что тебе мешает написать "нормальную" функцию, которая умеет отправлять сообщения, ясно что message у нее не будет, но сообщения может и экземпляр aiogram.Bot отправлять
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы