import logging # стандартная библиотека для логирования
import parser_functions # библиотека этого парсера
from telethon import TelegramClient, events, sync, connection # pip3 install telethon
from telethon.tl.functions.channels import JoinChannelRequest
from config import api_id, api_hash # получение айди и хэша нашего приложения из файла config.py
from loguru import logger
import asyncio
from telethon.errors.rpcerrorlist import FloodWaitError
# настройка логгера
logging.basicConfig(
level=logging.INFO,
filename='parser_log.log',
filemode='w',
format="%(asctime)s %(levelname)s %(message)s"
)
url = ["XXXX"]
flag = True
async def main():
async with TelegramClient('new', api_id, api_hash) as client:
for channel in url:
try:
logger.info(f"Аккаунт был подключен!")
await client(JoinChannelRequest(channel))
client.on_message(parser_functions.on_message) # регистрация обработчика сообщений
await client.run_until_disconnected()
except FloodWaitError as fwe:
print(f'Waiting for {fwe}')
await asyncio.sleep(delay=fwe.seconds)
if __name__ == "__main__":
asyncio.run(main())
def on_message(event):
try:
channel_id = event.message.chat.id
directory_name = str(event.message.id)
os.makedirs(f"{channel_id}/{directory_name}", exist_ok=True)
await parser_functions.get_message_content(client, event.message, url, channel_id, directory_name)
except Exception as passing:
logger.error(passing)
output = subprocess.check_output(["ping", "www.youtube.com"])
print(output.decode())
output = subprocess.run(["ping", "www.youtube.com"], capture_output=True, text=True)
print(output.stdout)
import io
from telethon.tl.custom import File
from typing import List, Tuple, Union
from mypy_extensions import TypedDict
from telethon import TelegramClient, events
from telethon.tl.types import InputMessagesFilterPhotos
# Создаем клиента Telegram
client = TelegramClient('session_name', api_id, api_hash)
# Создаем типизированный словарь для сообщений
class MessageDict(TypedDict):
id: int
message: str
media: Union[None, List[Tuple[str, bytes]]]
# Обработчик событий
@client.on(events.NewMessage(incoming=True, pattern='/start'))
async def handler(event):
# Получаем сообщение
message = event.message
# Получаем медиа-файлы
media = []
if message.media:
for file in message.media:
if isinstance(file, File):
# Конвертируем telethon.tl.custom.file в hints.FileLike
file_bytes = io.BytesIO(await file.download_media(bytes))
media.append((file.mime_type, file_bytes))
# Создаем типизированный словарь для сообщения
message_dict: MessageDict = {
'id': message.id,
'message': message.message,
'media': media if media else None
}
# Отправляем сообщение в другой канал
await client.send_message('other_channel', message_dict)
# Запускаем клиента Telegram
client.start()
client.run_until_disconnected()
messages.getConversations
для получения списка бесед, а затем перебрать каждую беседу и отправить сообщение в ней.conversations = await bot.api.messages.getConversations(filter='all')
for conversation in conversations['items']:
peer_id = conversation['peer']['id']
try:
await bot.api.messages.send(
peer_id=peer_id,
message='Новый пост!',
random_id=0,
attachment=f'wall-{event.group_id}_{event.object.id}'
)
except Exception as ex:
print(f'error: {ex}')
messages.getConversations
, и затем перебираем каждую беседу и отправляем сообщение в ней, используя метод messages.send
. В peer_id
мы передаем идентификатор беседы, а в attachment
мы передаем идентификатор записи на стене, которую мы хотим прикрепить к сообщению.