Задать вопрос
@Zecka

При отправки медиагруппы (несколько фото + текст) бот предложки отправляет каждое фото(видео) отдельно. А мне нужно, чтобы вместе) как исправить то?

вот код:
import logging
import time
import uuid
import asyncio
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, InputMediaPhoto
from telegram.ext import Application, MessageHandler, filters, CallbackQueryHandler, CommandHandler, ContextTypes
from cryptography.fernet import Fernet

# Загрузка зашифрованного токена и ключа из файла
with open('config.txt', 'r') as file:
key = file.readline().strip()
encrypted_token = file.readline().strip()

# Инициализация Fernet и расшифровка токена
cipher_suite = Fernet(key.encode())
BOT_TOKEN = cipher_suite.decrypt(encrypted_token.encode()).decode()

# Идентификаторы
ADMIN_ID = ADMIN_ID
CHANNEL_ID = 'CHANNEL_ID'

# Логирование
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)

# Инициализация бота
application = Application.builder().token(BOT_TOKEN).build()

# Словарь для хранения сообщений и соответствий идентификаторов
user_messages = {}
user_message_times = {}
banned_users = set()
short_to_long_ids = {}

def encode_id(id):
short_id = str(uuid.uuid4())[:8] # Генерация короткого уникального идентификатора
short_to_long_ids[short_id] = id
return short_id

def decode_id(short_id):
return short_to_long_ids.get(short_id, None)

# Обработчик команды /start
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
await update.message.reply_text("Хай! Кидайте свои мемы, новости, обзоры и всяческие приколмбасы, без ПorNo, иначе BAN.")

# Обработчик сообщений от пользователей
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
try:
logger.info("Received a message")

if update.message:
user_id = update.message.from_user.id

# Проверка на забаненного пользователя
if user_id in banned_users:
await update.message.reply_text("Вы заблокированы и не можете отправлять сообщения.")
return

# Ограничение частоты сообщений (защита от DDoS)
current_time = time.time()
if user_id in user_message_times:
if current_time - user_message_times[user_id] < 0.1: # 0.1 секунд между сообщениями
await update.message.reply_text("Вы отправляете сообщения слишком часто. Подождите немного.")
return
user_message_times[user_id] = current_time

username = update.message.from_user.username or update.message.from_user.full_name or "NoUsername"
message = update.message

logger.info(f"Message from user {user_id} ({username})")

# Генерация уникального идентификатора для каждого сообщения
message_id = str(uuid.uuid4())

# Сохранение сообщения
if user_id not in user_messages:
user_messages[user_id] = {}
user_messages[user_id][message_id] = message

# Уведомление пользователя о получении сообщения
await update.message.reply_text("Ваше сообщение отправлено Кумяшу.")

# Пересылка сообщения админу
caption = message.caption or message.text or ""
forward_message = f"{caption}\n\nОт пользователя: {username}"

encoded_user_id = encode_id(user_id)
encoded_message_id = encode_id(message_id)

if message.photo:
logger.info(f"Photo message from user {user_id} ({username})")
photos = message.photo
media = [InputMediaPhoto(photo.file_id, caption=(forward_message if i == 0 else None)) for i, photo in enumerate(photos)]
await context.bot.send_media_group(chat_id=ADMIN_ID, media=media, reply_markup=get_admin_keyboard(encoded_user_id, encoded_message_id))
elif message.video:
logger.info(f"Video message from user {user_id} ({username})")
await context.bot.send_video(chat_id=ADMIN_ID, video=message.video.file_id, caption=forward_message, reply_markup=get_admin_keyboard(encoded_user_id, encoded_message_id))
elif message.document:
logger.info(f"Document message from user {user_id} ({username})")
await context.bot.send_document(chat_id=ADMIN_ID, document=message.document.file_id, caption=forward_message, reply_markup=get_admin_keyboard(encoded_user_id, encoded_message_id))
else:
await context.bot.send_message(chat_id=ADMIN_ID, text=forward_message, reply_markup=get_admin_keyboard(encoded_user_id, encoded_message_id))
else:
logger.info("Update does not contain a message")
except Exception as e:
logger.error(f"An error occurred: {e}")
if update.message:
await update.message.reply_text("Произошла ошибка. Пожалуйста, попробуйте позже.")

# Кнопки для администратора
def get_admin_keyboard(user_id, message_id):
keyboard = [
[InlineKeyboardButton("Скип", callback_data=f"skip_{user_id}_{message_id}"),
InlineKeyboardButton("Бан", callback_data=f"ban_{user_id}"),
InlineKeyboardButton("Запостить", callback_data=f"post_{user_id}_{message_id}")]
]
return InlineKeyboardMarkup(keyboard)

# Обработчик нажатий кнопок
async def button_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
try:
logger.info("Received a callback query")

query = update.callback_query
await query.answer()
action, encoded_user_id, encoded_message_id = query.data.split('_')
user_id = decode_id(encoded_user_id)
message_id = decode_id(encoded_message_id)

if action == "skip":
logger.info(f"Skipping message {message_id} from user {user_id}")
# Удалить сообщение из памяти
if user_id in user_messages and message_id in user_messages[user_id]:
del user_messages[user_id][message_id]
await query.message.delete()
elif action == "ban":
logger.info(f"Banning user {user_id}")
# Заблокировать пользователя
banned_users.add(user_id)
await query.message.delete()
elif action == "post":
logger.info(f"Posting message {message_id} from user {user_id}")
# Запостить сообщение в канал
message = user_messages.get(user_id, {}).get(message_id)
if message:
username = message.from_user.username or message.from_user.full_name or "NoUsername"
forward_message = (message.caption or message.text or "") + f"\n\nОт пользователя: {username}"
if message.photo:
logger.info(f"Sending photo to channel with caption: {forward_message}")
photos = message.photo
media = [InputMediaPhoto(photo.file_id, caption=(forward_message if i == 0 else None)) for i, photo in enumerate(photos)]
await context.bot.send_media_group(chat_id=CHANNEL_ID, media=media)
elif message.video:
logger.info(f"Sending video to channel with caption: {forward_message}")
await context.bot.send_video(chat_id=CHANNEL_ID, video=message.video.file_id, caption=forward_message)
elif message.document:
logger.info(f"Sending document to channel with caption: {forward_message}")
await context.bot.send_document(chat_id=CHANNEL_ID, document=message.document.file_id, caption=forward_message)
else:
logger.info(f"Sending text message to channel: {forward_message}")
await context.bot.send_message(chat_id=CHANNEL_ID, text=forward_message)
if user_id in user_messages and message_id in user_messages[user_id]:
del user_messages[user_id][message_id]
await query.message.delete()
except Exception as e:
logger.error(f"An error occurred in callback query: {e}")
await query.message.reply_text("Произошла ошибка. Пожалуйста, попробуйте позже.")

# Настройка обработчиков
application.add_handler(CommandHandler("start", start))
application.add_handler(MessageHandler(filters.ALL, handle_message))
application.add_handler(CallbackQueryHandler(button_callback))

# Запуск бота
if __name__ == '__main__':
logger.info("Starting bot")
asyncio.run(application.run_polling())
  • Вопрос задан
  • 89 просмотров
Подписаться 1 Средний Комментировать
Пригласить эксперта
Ответы на вопрос 2
@Prizrak256
А где медиагруппа, собственно?
У тебя только send_photo есть. Для отправки нескольких фотографий можно использовать bot.send_media_group и передавать в аргумент media массив фотографий, например [InputMediaPhoto(image_url, caption) for image_url in photo_list]
Ответ написан
Это известная тема. Чтобы отправить медиагруппу, нужно контролировать её начало и конец. Но бот так же ловит каждое сообщение отдельным обновлением, у них разные id. Из общего - mediagrouop_id. Поэтому и отправляются они не группой, а каждое после получения обновления.

Чтобы собрать группу, нужна либо машина состояний, либо функция для парсинга последних сообщений и проверки у них того же значения mediagroup_id. Но в Telegram Bot API по-моему нет возможности смотреть историю, только работать с обновлениями. Так что задача усложняется работой на лету.

Как ещё одна альтернатива — использовать какой-нибудь Pyrogram на MTProto, в нём есть метод copy_mediagroup. Но конкретно эта библиотека давно не обновляется, будут проблемы с типизацией. Аналог — Telethon, но я ей не пользуюсь, не знаю есть ли там нужные инструменты.
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы