@Alex_888

Как ограничить количество подключений прокси в Pyrogram?

в Telethon есть такое:
connection_retries=int(),
request_retries=int(),
retry_delay=int(),
auto_reconnect=True,
timeout=int(),

и если прокси невалид, то подключение ограничивается выставленным мной количеством попыток.

в Pyrogram документации не нашел подобного. Он до талого пытается подключиться! Как его ограничить можно?
  • Вопрос задан
  • 112 просмотров
Решения вопроса 1
Mike_Ro
@Mike_Ro Куратор тега Python
Python, JS, WordPress, SEO, Bots, Adversting
в Pyrogram документации не нашел подобного

Все верно, Pyrogram не настолько гибкий по прокси, в сравнение с Telethon. Используйте внешний менеджер подключений/проксей и не будите зависеть от конкретной библиотеки, например:
import asyncio
from pyrogram import Client
from pyrogram.errors import PyrogramError

class ProxyManager:
    def __init__(self, app, max_retries=5, retry_delay=1, proxy=None):
        self.app = app
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.proxy = proxy
        self.client = None

    async def connect(self):
        for attempt in range(1, self.max_retries + 1):
            try:
                print(f"Connection attempt #{attempt}")
                self.client = Client(**self.app, proxy=self.proxy)
                await self.client.start()
                print("The connection was established successfully.")

            except PyrogramError as e:
                print(f"Connection error: {e}")
                if attempt == self.max_retries:
                    print("The maximum number of connection attempts has been reached, stop!")
                    break

                await asyncio.sleep(self.retry_delay)

    async def disconnect(self):
        if self.client:
            await self.client.stop()
            print("Connection is closed.")

app = {
    'api_id': 'YOUR_API_ID',
    'api_hash': 'YOUR_API_HASH',
    'session_name': 'your_session_name'
}

proxy = {
    'scheme': 'http',  # or 'socks5'
    'hostname': 'your.proxy.hostname',
    'port': 1080,
    'username': 'user',
    'password': 'password'
}

# test run
async def main():
    manager = ProxyManager(app, max_retries=3, retry_delay=2, proxy=proxy)
    await manager.connect()
    await manager.disconnect()

asyncio.run(main())
Ответ написан
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы