@jtores

Некорректная работа кода на питоне, загрузчик видео. Что делать?

Имеется код:
import requests
import json
import time
import yt_dlp
import re
import os
from urllib.parse import urlparse, parse_qs

def is_youtube_url(url):
    """
    Проверяет, является ли URL ссылкой на YouTube видео.
    Поддерживает различные форматы ссылок YouTube.
    """
    patterns = [
        r'^(https?://)?(www\.)?(youtube\.com|youtu\.be)/.+$',
        r'^(https?://)?(www\.)?youtube\.com/watch\?v=[\w-]+(&\S*)?$',
        r'^(https?://)?(www\.)?youtu\.be/[\w-]+$',
        r'^(https?://)?(www\.)?youtube\.com/shorts/[\w-]+$'
    ]
    return any(re.match(pattern, url) for pattern in patterns)

def extract_video_id(url):
    """
    Извлекает ID видео из URL YouTube.
    """
    parsed_url = urlparse(url)
    if parsed_url.hostname in ('youtu.be', 'www.youtu.be'):
        return parsed_url.path[1:]
    if parsed_url.hostname in ('youtube.com', 'www.youtube.com'):
        if parsed_url.path == '/watch':
            return parse_qs(parsed_url.query)['v'][0]
        if parsed_url.path.startswith(('/embed/', '/v/', '/shorts/')):
            return parsed_url.path.split('/')[2]
    return None

def get_video_info(url):
    """
    Получает информацию о видео, используя yt-dlp.
    """
    ydl_opts = {'quiet': True, 'no_warnings': True, 'no_color': True}
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        try:
            info = ydl.extract_info(url, download=False)
            title = info['title']
            formats = info['formats']
            available_formats = ['Audio']
            for f in formats:
                if f.get('height') in [144, 240, 360, 480, 720, 1080, 1440, 2160]:
                    available_formats.append(f'{f["height"]}p')
            return title, list(dict.fromkeys(available_formats))
        except Exception as e:
            print(f"Error extracting video info: {e}")
            return None, None

def download_video(url, format, output_path):
    """
    Скачивает видео или аудио, используя yt-dlp.
    """
    ydl_opts = {
        'format': 'bestaudio/best' if format == 'Audio' else f'bestvideo[height<={format[:-1]}]+bestaudio/best',
        'outtmpl': output_path,
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }] if format == 'Audio' else [],
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        try:
            ydl.download([url])
            return True
        except Exception as e:
            print(f"Error downloading video: {e}")
            return False

def read_msg(offset):
    data = {"offset": offset}
    resp = requests.get(base_url + '/getUpdates', data=data)
    dataframe = resp.json()

    for i in dataframe["result"]:
        try:
            message_text = i["message"]["text"]
            print(f"Received message: {message_text}")
            
            if is_youtube_url(message_text):
                video_id = extract_video_id(message_text)
                if video_id:
                    youtube_url = f"https://www.youtube.com/watch?v={video_id}"
                    title, available_formats = get_video_info(youtube_url)
                    if title and available_formats:
                        user_file = f"{i['message']['from']['username']}.txt"
                        with open(user_file, 'w') as f:
                            f.write(youtube_url)
                        
                        keyboard = [[{"text": format}] for format in available_formats]
                        send_download(i["message"]["from"]["id"], keyboard)
                    else:
                        send_message(i["message"]["from"]["id"], 'Unable to get video information')
                else:
                    send_message(i["message"]["from"]["id"], 'Invalid YouTube URL')
            elif message_text in formats:
                user_file = f"{i['message']['from']['username']}.txt"
                if os.path.exists(user_file):
                    with open(user_file, 'r') as f:
                        youtube_url = f.read().strip()
                    
                    username = i["message"]["from"]["username"] + '/'
                    os.makedirs(username, exist_ok=True)
                    
                    title, _ = get_video_info(youtube_url)
                    safe_title = re.sub(r'[^\w\-_\. ]', '_', title)
                    output_path = os.path.join(username, safe_title)
                    
                    if message_text == 'Audio':
                        output_file = f"{output_path}.mp3"
                    else:
                        output_file = f"{output_path}.mp4"
                    
                    if download_video(youtube_url, message_text, output_file):
                        size = round(os.path.getsize(output_file) / 1024 / 1024, 2)
                        if size > 50:
                            send_message(i["message"]["from"]["id"], f'Done! {size}MB')
                        else:
                            if message_text == 'Audio':
                                send_audio(i["message"]["from"]["id"], output_file)
                            else:
                                send_video(i["message"]["from"]["id"], output_file)
                    else:
                        send_message(i["message"]["from"]["id"], 'Failed to download video')
                else:
                    send_message(i["message"]["from"]["id"], 'Please send a YouTube URL first')
            else:
                send_message(i["message"]["from"]["id"], 'Please send a valid YouTube URL')
        except Exception as e:
            print(f"Error processing message: {e}")

    if dataframe["result"]:
        return dataframe["result"][-1]["update_id"] + 1
    return offset

def send_download(user, keyboard):
    base_url_download = base_url + '/sendMessage'
    headers = {"Content-Type": "application/json"}
    data = {
        "chat_id": user,
        "text": "Select format",
        "reply_markup": {
            "keyboard": keyboard,
            "resize_keyboard": True,
            "one_time_keyboard": True,
        }
    }
    requests.post(base_url_download, data=json.dumps(data), headers=headers)

def send_message(user, message):
    base_url_message = base_url + '/sendMessage'
    headers = {"Content-Type": "application/json"}
    data = {
        "chat_id": user,
        "text": message,
    }
    requests.post(base_url_message, data=json.dumps(data), headers=headers)

def send_audio(user, audio):
    base_url_audio = base_url + '/sendAudio'
    with open(audio, 'rb') as audio_file:
        files = {"audio": audio_file}
        data = {"chat_id": user}
        requests.post(base_url_audio, data=data, files=files)

def send_video(user, video):
    base_url_video = base_url + '/sendVideo'
    with open(video, 'rb') as video_file:
        files = {"video": video_file}
        data = {"chat_id": user}
        requests.post(base_url_video, data=data, files=files)

base_url = 'https://api.telegram.org/botxxxx'
offset = 0
formats = ['Audio', '144p', '240p', '360p', '480p', '720p', '1080p', '1440p', '2160p']

while True:
    try:
        offset = read_msg(offset)
    except Exception as e:
        print(f"Error in main loop: {e}")
    time.sleep(1)


Отправляю ссылку на видео, и выводит ошибки, то ссылка не правильная, то не может информацию о видео получить, то еще что-то. В чем заключается ошибка?
  • Вопрос задан
  • 155 просмотров
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы