@ArtemMik
Python

Как парсить телеграмм канал в онлайне?

У меня есть код, который сохраняет историю сообщений каналов телеграмм, но мне надо сделать, чтобы он мог сохранять новые сообщения в режиме реального времени

Код парсинга канала:
import logging  # стандартная библиотека для логирования
import parser_functions  # библиотека этого парсера
from telethon import TelegramClient, events, sync, connection  # pip3 install telethon
from telethon.tl.functions.channels import JoinChannelRequest
from config import api_id, api_hash  # получение айди и хэша нашего приложения из файла config.py
from loguru import logger
import asyncio
from telethon.errors.rpcerrorlist import FloodWaitError


# настройка логгера
logging.basicConfig(
    level=logging.INFO,
    filename='parser_log.log',
    filemode='w',
    format="%(asctime)s %(levelname)s %(message)s"
)


url = ["XXXX"]
flag = True



async def main():
    async with TelegramClient('new', api_id, api_hash) as client:
        for channel in url:
            try:
                logger.info(f"Аккаунт был подключен!")
                await client(JoinChannelRequest(channel))
                err = await parser_functions.parse(client, channel)  # обработка сообщений
                logger.info(f"Сообщения были спарсины!")
            except FloodWaitError as fwe:
                print(f'Waiting for {fwe}')
                await asyncio.sleep(delay=fwe.seconds)
        await client.run_until_disconnected()




if __name__ == "__main__":
    asyncio.run(main())


Функция забора ID канала и обработка сообщений и медиа файлов:

from telethon.tl.types import MessageEntityTextUrl
from glob import glob
from dateutil.relativedelta import relativedelta  # pip3 install python-dateutil
import datetime
import os
import asyncio


async def get_channel_id(client, link):  # получение ID канала
    m = await client.get_messages(link, limit=1)
    channel_id = m[0].peer_id.channel_id
    return str(channel_id)


def clearify_text(msg):  # очищение текста от символов гиперссылки
    text = msg.message
    text_splitted = text.split()
    text_listed = [word for word in text_splitted if word != ' ']
    return " ".join(text_listed)


async def get_message_content(client, msg, url, channel_name, directory_name):  # получение содержимого сообщения
    msg_date = str(msg.date)  # дата отправки сообщения
    msg_url = url + '/' + str(msg.id)  # каст ссылки на сообщение
    file = open(f"{channel_name}/{directory_name}/{directory_name}_meta.txt", 'a+')  # запись метаданных сообщения
    file.write(msg_url)
    file.write('\n' + msg_date)
    file.close()
    if msg.message:  # если сообщение содержит текст, запись этого текста в текстовый файл в папке сообщения
        text = clearify_text(msg=msg)
        file = open(f"{channel_name}/{directory_name}/{directory_name}.txt", "w")
        file.write(text)
        file.close()
    if msg.media:  # если сообщение содержит медиа (фото, видео, документы, файлы), загрузка медиа в папку сообщения
        await client.download_media(message=msg, file=f"{channel_name}/{directory_name}")
    if msg.entities:  # запись гиперссылок из текста сообщения в файл сообщения
        urls = [ent.url for ent in msg.entities if isinstance(ent, MessageEntityTextUrl)]
        file = open(f"{channel_name}/{directory_name}/{directory_name}.txt", mode='a+')
        for u in urls:
            file.write('\n' + u)
        file.close()


async def find_last_parsed_date(path):  # определение даты, с которой начинать парсинг
    paths = glob(f"{path}/*/*meta.txt", recursive=True)  # поиск существующих метаданных по уже собранным сообщениям
    oldest = datetime.datetime.strptime("1970-01-01 00:00:00+00:00", "%Y-%m-%d %H:%M:%S%z")
    temp = oldest
    for p in paths:  # поиск даты отправки последнего сообщения
        with open(p, 'r') as file:
            date = datetime.datetime.strptime(file.readlines()[-1], "%Y-%m-%d %H:%M:%S%z")
            if date > oldest:
                oldest = date
    if temp == oldest:
        oldest = datetime.datetime.now() - relativedelta(months=3)  # если сообщений нет, офсет устанавливается на
                                                                    # три месяца от текущей даты
    return oldest


async def parse(client, url):  # сбор сообщений из канала
    err = []  # переменная возможной ошибки
    channel_id = await get_channel_id(client, url)  # получение ID канала
    os.makedirs(channel_id, exist_ok=True)  # создание папки канала в текущей директории
    oldest = await find_last_parsed_date(channel_id)  # получение даты, с которой начинать парсинг
    async for message in client.iter_messages(url, reverse=True, offset_date=oldest):  # итератор по сообщениям (урл - ссылка
                                                                                 # на канал, реверс - итерация от старых
                                                                                 # к новым, офсет - дата с которой
                                                                                 # начинать парсинг
        try:
            directory_name = str(message.id)  # получение ID сообщения
            os.makedirs(f"{channel_id}/{directory_name}", exist_ok=True)  # создание папки сообщения
            await get_message_content(client, message, url, channel_id, directory_name)  # обработка сообщения

        except Exception as passing:  # обработка ошибок
            err.append(passing)
            continue
    return err  # возврат возможных ошибок


Чтобы записалась история сообщений в канале, надо запустить скрипт заново, а мне нужно чтобы это можно было сделать в онлайне (без перезапуск скрипта), долго искал информацию, но не нашёл ничего годного для моего кода, прошу, пожалуйста, помочь
  • Вопрос задан
  • 308 просмотров
Пригласить эксперта
Ответы на вопрос 1
Lapita12
@Lapita12
Тесты, тесты?
https://github.com/mmat16/telegram_channel_parser

import logging  # стандартная библиотека для логирования
import parser_functions  # библиотека этого парсера
from telethon import TelegramClient, events, sync, connection  # pip3 install telethon
from telethon.tl.functions.channels import JoinChannelRequest
from config import api_id, api_hash  # получение айди и хэша нашего приложения из файла config.py
from loguru import logger
import asyncio
from telethon.errors.rpcerrorlist import FloodWaitError


# настройка логгера
logging.basicConfig(
    level=logging.INFO,
    filename='parser_log.log',
    filemode='w',
    format="%(asctime)s %(levelname)s %(message)s"
)


url = ["XXXX"]
flag = True



async def main():
    async with TelegramClient('new', api_id, api_hash) as client:
        for channel in url:
            try:
                logger.info(f"Аккаунт был подключен!")
                await client(JoinChannelRequest(channel))
                client.on_message(parser_functions.on_message)  # регистрация обработчика сообщений
                await client.run_until_disconnected()
            except FloodWaitError as fwe:
                print(f'Waiting for {fwe}')
                await asyncio.sleep(delay=fwe.seconds)




if __name__ == "__main__":
    asyncio.run(main())




def on_message(event):
    try:
        channel_id = event.message.chat.id
        directory_name = str(event.message.id)
        os.makedirs(f"{channel_id}/{directory_name}", exist_ok=True)
        await parser_functions.get_message_content(client, event.message, url, channel_id, directory_name)
    except Exception as passing:
        logger.error(passing)
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы