yxtiblin
@yxtiblin

Как правильно организовать ооп telethon в python?

Есть у меня скрипт на телетоне в котором нужно было подключить и работать сразу на 2 аккаунтах в телеграме. Я тогда не стал заморачиваться и просто копипастнул функцию и с помощью asyncio запустил их асинхронно
#from dataclasses import asdict
from copyreg import pickle
from telethon import TelegramClient
import logging, eel
import telethon
from telethon.tl.functions.channels import JoinChannelRequest
from telethon.tl.functions.messages import ImportChatInviteRequest
import asyncio
import pickle as p

logging.basicConfig(format='[%(levelname) 5s/%(asctime)s] %(name)s: %(message)s', level=logging.WARNING)

# client 1
try:
    api_id1 = 
    api_hash1 = ''
    client1 = TelegramClient('client1', api_id1, api_hash1)
except Exception:
    print("Первый клиент не найден")

# client 2
try:
    api_id2 = 
    api_hash2 = ''
    client2 = TelegramClient('client2', api_id2, api_hash2)
except Exception:
    print("Второй клиент не найден")


def save(skipList, data):
    output = open(data, 'wb')
    p.dump(skipList,output,2)
    output.close

def load(skipList, data):
    try:
        input = open(data, 'rb')
        skipList = p.load(input)
        input.close()
    except Exception as err:
        print("skip list пуст")
    return skipList
    
def check(skipList, tgLink, i):
    return any(tgLink[i] in skipLink for skipLink in skipList)


# скрипт для 1 клиента
async def telegram1():
    # бесконечный цикл заходящий в чаты
    await client1.start()
    i=0
    skipList = []

    skipList = load(skipList, 'data1.pkl')
    while True:
        print("CLIENT 1 --------------------------------------------------------")
        
        if i >= len(tgLink):
            print("Ссылки закончились.")
            break

        if check(skipList,tgLink,i) == True:
            print(f"ссылка {tgLink[i]} уже пройдена")
            i += 1
            continue
        else:
            print(tgLink[i])
        
        try:
            try:
                await client1(ImportChatInviteRequest(tgLink[i]))
            except telethon.errors.rpcerrorlist.InviteHashExpiredError:
                await client1(JoinChannelRequest(tgLink[i]))
        except telethon.errors.rpcerrorlist.FloodWaitError as err:
            print("Ошибка из-за защиты от флуда telethon, придется подождать.")
            print(err)
            await asyncio.sleep(1)
        except Exception as err:
            print("Если что исправить")
            print(err)
            skipList.append(tgLink[i])
            i += 1
            save(skipList, 'data1.pkl')
            await asyncio.sleep(1)
        else:
            skipList.append(tgLink[i])
            i += 1
            save(skipList, 'data1.pkl')
    await client1.run_until_disconnected()

# скрипт для 2 клиента
async def telegram2():
    # бесконечный цикл заходящий в чаты
    await client2.start()
    i=0
    skipList = []

    skipList = load(skipList, 'data2.pkl')
    while True:
        print("CLIENT 2 --------------------------------------------------------")
        
        if i >= len(tgLink):
            print("Ссылки закончились.")
            break

        if check(skipList,tgLink,i) == True:
            print(f"ссылка {tgLink[i]} уже пройдена")
            i += 1
            continue
        else:
            print(tgLink[i])
        
        try:
            try:
                await client2(ImportChatInviteRequest(tgLink[i]))
            except telethon.errors.rpcerrorlist.InviteHashExpiredError:
                await client2(JoinChannelRequest(tgLink[i]))
        except telethon.errors.rpcerrorlist.FloodWaitError as err:
            print("Ошибка из-за защиты от флуда telethon, придется подождать.")
            print(err)
            await asyncio.sleep(1)
        except Exception as err:
            print("Если что исправить")
            print(err)
            skipList.append(tgLink[i])
            i += 1
            save(skipList, 'data2.pkl')
            await asyncio.sleep(1)
        else:
            skipList.append(tgLink[i])
            i += 1
            save(skipList, 'data2.pkl')
    await client2.run_until_disconnected()

# выполнять пока не выполниться до конца
async def main(secClientBool):
    if secClientBool == True:
        await asyncio.gather(
            telegram1(),
            telegram2(),
        )
    else:
        await asyncio.gather(
            telegram1(),
        )


# функция вызывающаяся из js main.html, начинает работу программы
@eel.expose
def start(links, secClientBool):
    global tgLink
    tgLink = links.split(',') # создание массива и присвоение ему линков через запятую
    print(tgLink)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(secClientBool))
    

# инициализация и запуск GUI
eel.init("web")
eel.start("main.html", size=(490,300))

Но что если мне нужно работать сразу на 5 аккаунтах одновременно? копипастить функцию 5 раз подряд не вариант. Нужно делать с помощью класса. Тут у меня и возникли недопонимания, например как запустить класс сразу 5 раз асинхронно? и вообще можно ли так сделать
  • Вопрос задан
  • 377 просмотров
Решения вопроса 1
phaggi
@phaggi Куратор тега Python
лужу, паяю, ЭВМы починяю
Как я это вижу:
Запускается не класс, «запускается» (вызывается) метод у экземпляров класса. При создании экземпляров нового класса туда надо передать соответствующие значения для создания в них экземпляров класса TelegramClient, либо уже готовые экземпляры класса TelegramClient.
В классе должен быть метод, примерно соответствующий вашей функции.

А в main, вместо запуска отдельных функций по-очереди, в цикле перебирать экземпляры вашего класса (по словарю или списку) и у каждого дергать соответствующий метод.

Я думаю, как-то так. Конечно, по дороге могут возникнуть нюансы, но а как по-другому?
Ответ написан
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы