Имеется код:
import requests
import json
import time
import yt_dlp
import re
import os
from urllib.parse import urlparse, parse_qs
def is_youtube_url(url):
"""
Проверяет, является ли URL ссылкой на YouTube видео.
Поддерживает различные форматы ссылок YouTube.
"""
patterns = [
r'^(https?://)?(www\.)?(youtube\.com|youtu\.be)/.+$',
r'^(https?://)?(www\.)?youtube\.com/watch\?v=[\w-]+(&\S*)?$',
r'^(https?://)?(www\.)?youtu\.be/[\w-]+$',
r'^(https?://)?(www\.)?youtube\.com/shorts/[\w-]+$'
]
return any(re.match(pattern, url) for pattern in patterns)
def extract_video_id(url):
"""
Извлекает ID видео из URL YouTube.
"""
parsed_url = urlparse(url)
if parsed_url.hostname in ('youtu.be', 'www.youtu.be'):
return parsed_url.path[1:]
if parsed_url.hostname in ('youtube.com', 'www.youtube.com'):
if parsed_url.path == '/watch':
return parse_qs(parsed_url.query)['v'][0]
if parsed_url.path.startswith(('/embed/', '/v/', '/shorts/')):
return parsed_url.path.split('/')[2]
return None
def get_video_info(url):
"""
Получает информацию о видео, используя yt-dlp.
"""
ydl_opts = {'quiet': True, 'no_warnings': True, 'no_color': True}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
try:
info = ydl.extract_info(url, download=False)
title = info['title']
formats = info['formats']
available_formats = ['Audio']
for f in formats:
if f.get('height') in [144, 240, 360, 480, 720, 1080, 1440, 2160]:
available_formats.append(f'{f["height"]}p')
return title, list(dict.fromkeys(available_formats))
except Exception as e:
print(f"Error extracting video info: {e}")
return None, None
def download_video(url, format, output_path):
"""
Скачивает видео или аудио, используя yt-dlp.
"""
ydl_opts = {
'format': 'bestaudio/best' if format == 'Audio' else f'bestvideo[height<={format[:-1]}]+bestaudio/best',
'outtmpl': output_path,
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
'preferredquality': '192',
}] if format == 'Audio' else [],
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
try:
ydl.download([url])
return True
except Exception as e:
print(f"Error downloading video: {e}")
return False
def read_msg(offset):
data = {"offset": offset}
resp = requests.get(base_url + '/getUpdates', data=data)
dataframe = resp.json()
for i in dataframe["result"]:
try:
message_text = i["message"]["text"]
print(f"Received message: {message_text}")
if is_youtube_url(message_text):
video_id = extract_video_id(message_text)
if video_id:
youtube_url = f"https://www.youtube.com/watch?v={video_id}"
title, available_formats = get_video_info(youtube_url)
if title and available_formats:
user_file = f"{i['message']['from']['username']}.txt"
with open(user_file, 'w') as f:
f.write(youtube_url)
keyboard = [[{"text": format}] for format in available_formats]
send_download(i["message"]["from"]["id"], keyboard)
else:
send_message(i["message"]["from"]["id"], 'Unable to get video information')
else:
send_message(i["message"]["from"]["id"], 'Invalid YouTube URL')
elif message_text in formats:
user_file = f"{i['message']['from']['username']}.txt"
if os.path.exists(user_file):
with open(user_file, 'r') as f:
youtube_url = f.read().strip()
username = i["message"]["from"]["username"] + '/'
os.makedirs(username, exist_ok=True)
title, _ = get_video_info(youtube_url)
safe_title = re.sub(r'[^\w\-_\. ]', '_', title)
output_path = os.path.join(username, safe_title)
if message_text == 'Audio':
output_file = f"{output_path}.mp3"
else:
output_file = f"{output_path}.mp4"
if download_video(youtube_url, message_text, output_file):
size = round(os.path.getsize(output_file) / 1024 / 1024, 2)
if size > 50:
send_message(i["message"]["from"]["id"], f'Done! {size}MB')
else:
if message_text == 'Audio':
send_audio(i["message"]["from"]["id"], output_file)
else:
send_video(i["message"]["from"]["id"], output_file)
else:
send_message(i["message"]["from"]["id"], 'Failed to download video')
else:
send_message(i["message"]["from"]["id"], 'Please send a YouTube URL first')
else:
send_message(i["message"]["from"]["id"], 'Please send a valid YouTube URL')
except Exception as e:
print(f"Error processing message: {e}")
if dataframe["result"]:
return dataframe["result"][-1]["update_id"] + 1
return offset
def send_download(user, keyboard):
base_url_download = base_url + '/sendMessage'
headers = {"Content-Type": "application/json"}
data = {
"chat_id": user,
"text": "Select format",
"reply_markup": {
"keyboard": keyboard,
"resize_keyboard": True,
"one_time_keyboard": True,
}
}
requests.post(base_url_download, data=json.dumps(data), headers=headers)
def send_message(user, message):
base_url_message = base_url + '/sendMessage'
headers = {"Content-Type": "application/json"}
data = {
"chat_id": user,
"text": message,
}
requests.post(base_url_message, data=json.dumps(data), headers=headers)
def send_audio(user, audio):
base_url_audio = base_url + '/sendAudio'
with open(audio, 'rb') as audio_file:
files = {"audio": audio_file}
data = {"chat_id": user}
requests.post(base_url_audio, data=data, files=files)
def send_video(user, video):
base_url_video = base_url + '/sendVideo'
with open(video, 'rb') as video_file:
files = {"video": video_file}
data = {"chat_id": user}
requests.post(base_url_video, data=data, files=files)
base_url = 'https://api.telegram.org/botxxxx'
offset = 0
formats = ['Audio', '144p', '240p', '360p', '480p', '720p', '1080p', '1440p', '2160p']
while True:
try:
offset = read_msg(offset)
except Exception as e:
print(f"Error in main loop: {e}")
time.sleep(1)
Отправляю ссылку на видео, и выводит ошибки, то ссылка не правильная, то не может информацию о видео получить, то еще что-то. В чем заключается ошибка?