У меня есть бот, в которого я скидываю или пересылаю пост с фотками и текстом, суть бота, что он каждую фотку закидывает в telegraph и скидывает пользователю ссылку на фото
Код функции:
user_data = {}
media_groups = {}
# Переменная для хранения фото из пересланных сообщений
user_data = {}
# Функция загрузки на Telegraph
# Функция загрузки изображения на Telegraph
async def upload_to_telegraph(photo_path):
try:
# Загружаем файл и выводим ответ для отладки
response = upload_file(photo_path)
print(f"Ответ от Telegraph: {response}")
# Проверяем, что это список и он содержит хотя бы одну строку
if isinstance(response, list) and len(response) > 0:
return f"https://telegra.ph{response[0]}"
else:
print(f"Unexpected response format: {response}")
return None
except Exception as e:
print(f"Error uploading to Telegraph: {e}")
return None
# Функция получения ссылки на изображение
async def get_image_link(photo_file_id):
photo_path = f"photo_{photo_file_id}.jpg"
# Скачиваем файл
await bot.download_file_by_id(file_id=photo_file_id, destination=photo_path)
# Загружаем на Telegraph и удаляем файл
image_link = await upload_to_telegraph(photo_path)
os.remove(photo_path)
return image_link
# Функция формирования текста с ссылками
def get_message_text(links, text):
text = text.replace('\n', ' ') if text else ""
while text.count(" ") > 0:
text = text.replace(" ", " ")
for link in links:
text += f"\n{link}"
text = f"{text}"
return text
# Обработка сообщений с фото
@dp.message_handler(content_types=types.ContentType.PHOTO)
async def handle_photo(message: types.Message):
cur.execute('SELECT expiry_date FROM users WHERE user_id = ?', (message.from_user.id,))
rows = cur.fetchall()
if not rows or rows[0][0] is None:
await bot.send_message(message.from_user.id, 'У вас нету подписки, купите её нажав на кнопку ❌',
reply_markup=buy2)
else:
expiry_date_str = rows[0][0]
expiry_date = datetime.datetime.strptime(expiry_date_str, '%Y-%m-%d').date()
current_date = datetime.datetime.now().date()
if current_date > expiry_date:
await bot.send_message(message.from_user.id, 'Ваша подписка истекла ❌')
else:
await message.reply("Подождите, пока бот загрузит фото!")
await process_photo_message(message)
# Проверка завершения группы медиа
# Проверка завершения группы медиа
import logging
# Инициализация словаря для хранения медиа-групп
media_groups = {}
# Проверка завершения группы медиа
async def check_complete_media_group(media_group_id, caption, message: types.Message):
key_for_media_group = media_group_id
# Логируем проверку группы
logging.info(f"Проверка группы: {key_for_media_group}")
# Инициализация ключа, если его нет в media_groups
if key_for_media_group not in media_groups:
logging.warning(f"Группа {key_for_media_group} не была инициализирована, создаем новую.")
media_groups[key_for_media_group] = []
# Получаем ссылку на изображение и добавляем её с подписью
image_link = await get_image_link(message.photo[-1].file_id)
if image_link is None:
await message.answer('Ошибка при загрузке изображения на Telegraph. Попробуйте еще раз позже.')
if key_for_media_group in media_groups:
del media_groups[key_for_media_group]
return
# Добавляем ссылку и подпись в группу
media_groups[key_for_media_group].append((image_link, caption))
logging.info(f"Добавлена ссылка {image_link} с подписью: {caption}")
await asyncio.sleep(20) # Ожидание завершения всех изображений в группе
if key_for_media_group in media_groups:
links = [data[0] for data in media_groups[key_for_media_group]]
message_texts = [data[1] for data in media_groups[key_for_media_group]]
original_text = None
for message_text in message_texts:
original_text = message_text or original_text
del media_groups[key_for_media_group]
text = get_message_text(links, original_text)
await message.reply(text, parse_mode="HTML")
# Обработка медиа сообщений
async def process_photo_message(message: types.Message):
text = message.text or message.caption
cur.execute('SELECT expiry_date FROM users WHERE user_id = ?', (message.from_user.id,))
rows = cur.fetchall()
for row in rows:
expiry_date_str = row[0]
if expiry_date_str is None:
await bot.send_message(message.from_user.id, 'У вас нету подписки, купите её нажав на кнопку ❌',
reply_markup=buy2)
return
# Проверка на наличие media_group_id
if message.media_group_id:
media_group_id = message.media_group_id
logging.info(f"Обработка группы media_group_id: {media_group_id}")
# Инициализация группы для media_group_id, если ее еще нет
if media_group_id not in media_groups:
logging.info(f"Инициализация группы для media_group_id: {media_group_id}")
media_groups[media_group_id] = []
asyncio.create_task(check_complete_media_group(media_group_id, text, message))
else:
# Если нет группы, загружаем одно изображение
image_link = await get_image_link(message.photo[-1].file_id)
text = get_message_text([image_link], text)
await message.reply(text, parse_mode="HTML")
Ошибка:
Error uploading to Telegraph: 'str' object has no attribute 'get'