import os
import os.path
import re
import sys
from glob import glob
from time import sleep
from urllib.request import urlretrieve
import urllib.error
# Pinned third-party requirements, kept as a space-separated string so the
# module-level name keeps its original type.
packages_to_install = "pyquery==2.0.0 tqdm==4.66.1"
# Kept for backward compatibility only: the installer below passes an argument
# list instead, because this shell string breaks when the interpreter path
# contains spaces (e.g. "C:\Program Files\...").
cmd_for_install = sys.executable + " -m pip install "


def _pip_install(*extra_args):
    """Run ``pip install`` for the pinned packages and return pip's exit code.

    ``extra_args`` are inserted before the package list (e.g. ``"--user"``).
    """
    import subprocess

    return subprocess.call(
        [
            sys.executable,
            "-m",
            "pip",
            "install",
            *extra_args,
            *packages_to_install.split(),
        ]
    )


# Try to import the third-party packages; on failure install them and retry,
# up to three attempts in total.
for _attempt in range(3):
    try:
        from pyquery import PyQuery as pq  # type: ignore
        from tqdm import trange, tqdm
        from tqdm.contrib.concurrent import thread_map

        break
    except ImportError:
        # A plain install may fail without write access to site-packages,
        # so fall back to a per-user install.
        if _pip_install() != 0:
            _pip_install("--user")
else:
    # Execution deliberately continues (helpers that need only the standard
    # library stay usable), but the cause of later NameErrors is made explicit.
    print(
        "Could not import or install the required packages: "
        + packages_to_install,
        file=sys.stderr,
    )

# Attachment URLs collected by export_to_txt() and consumed by main().
links_to_files: list = []
class ArchiveNotFound(Exception):
    """Signals that no directory containing ``index-messages.html`` was found."""
class patterns:
    """Namespace holding the regular expressions used across this script."""

    # A run of two or more CR/LF characters.
    RN = re.compile(r"[\r\n]{2,}")

    # A time-of-day suffix of the form " в H:M:S" (one or two digits each).
    TIME = re.compile(r" в [\d]{1,2}:[\d]{1,2}:[\d]{1,2}")

    # The "message deleted" placeholder, with an optional trailing newline.
    DELMSG = re.compile("Сообщение удалено[\n]?")

    # The "(ред.)" marker.
    EDITEDMSG = re.compile(r"\(ред\.\)")

    # Any character that is not a word character, whitespace or a hyphen.
    SLUG = re.compile(r"[^\w\s-]")

    # NOTE: despite the name, this matches runs of NON-digit characters, and
    # it is a plain pattern string rather than a compiled pattern.
    ONLY_DIGITS = r"[\D]+"
def get_user_aaction(prompt: str) -> str:
    """Ask the user for input until the answer contains at least one digit.

    Args:
        prompt: Text shown by ``input()`` on every attempt.

    Returns:
        The digits of the first answer that contains any, with every
        non-digit character removed.
    """
    action = ""
    # The loop must run while the answer is still empty; the previous
    # condition (``len(action) > 0``) was inverted, so the user was never
    # prompted and "" was always returned.
    while not action:
        # Removing every non-digit also removes whitespace, so no strip()
        # is needed afterwards.
        action = re.sub(patterns.ONLY_DIGITS, "", input(prompt))
    return action
def isInt(value):
    """Tell whether ``int(value)`` succeeds.

    Args:
        value: Any object; typically a string such as a directory name.

    Returns:
        True when ``int(value)`` succeeds, False when it raises ``ValueError``
        (malformed text) or ``TypeError`` (unsupported type such as ``None``).
    """
    try:
        int(value)
    except (TypeError, ValueError):
        # TypeError covers inputs like None, which previously escaped as an
        # exception instead of yielding False.
        return False
    return True
def norm(value: str) -> str:
    """Clean one message's text and terminate it with a blank line.

    Runs of CR/LF characters are collapsed into a single newline first; then
    the time-of-day suffix, the "(ред.)" marker and the "message deleted"
    placeholder are removed, in that order. The result is stripped of
    surrounding whitespace and followed by two newlines.
    """
    cleaned = patterns.RN.sub("\n", value)
    for junk in (patterns.TIME, patterns.EDITEDMSG, patterns.DELMSG):
        cleaned = junk.sub("", cleaned)
    return f"{cleaned.strip()}\n\n"
def slugify(value: str) -> str:
    """Turn arbitrary text into a lowercase, hyphen-separated slug.

    The text is lowercased, every character that is not a word character,
    whitespace or a hyphen is dropped, each run of hyphens/whitespace becomes
    a single hyphen, and leading/trailing hyphens and underscores are removed.
    """
    lowered = value.lower()
    kept = patterns.SLUG.sub("", lowered)
    hyphenated = re.sub(r"[-\s]+", "-", kept)
    return hyphenated.strip("-_")
def get_dir_with_messages() -> str:
    """Locate the directory that holds ``index-messages.html``.

    The starting point is the current working directory, overridden by the
    last command-line argument that names an existing directory. If that
    directory has a ``messages`` sub-directory, the sub-directory is used.

    Returns:
        The path of the directory containing ``index-messages.html``.

    Raises:
        ArchiveNotFound: If the chosen directory has no ``index-messages.html``.
    """
    base = os.getcwd()
    directory_args = [arg for arg in sys.argv if os.path.isdir(arg)]
    if directory_args:
        # When several arguments are directories, the last one wins.
        base = directory_args[-1]

    nested = os.path.join(base, "messages")
    if os.path.isdir(nested):
        base = nested

    if os.path.isfile(os.path.join(base, "index-messages.html")):
        return base
    raise ArchiveNotFound("")
def export_to_txt(files_with_messages: list, results_folder: str):
    """Write all pages of one chat into a single UTF-8 text file.

    The chat title is read from the last ``div.ui_crumb`` element of the last
    file in ``files_with_messages``; its slug becomes the output file name.
    Every page is read as CP1251, its ``div.item`` elements are written in
    reverse document order after being passed through ``norm()``, and the
    ``href`` of every ``a.attachment__link`` element is appended to the
    module-level ``links_to_files`` list.

    Args:
        files_with_messages: Non-empty list of page paths, in output order.
        results_folder: Existing directory that receives the ``.txt`` file.
    """
    title_page = files_with_messages[-1]
    with open(title_page, "r", encoding="CP1251") as fp:
        d = pq(fp.read())
    chat_title = d("div.ui_crumb:last").text().strip()

    # A title consisting only of characters that slugify() drops would give
    # a file literally named ".txt"; use the chat folder name instead.
    # NOTE(review): chats sharing the same title still map to the same file.
    base_name = slugify(chat_title) or os.path.basename(
        os.path.dirname(os.path.abspath(title_page))
    )
    txtfile = os.path.join(results_folder, base_name + ".txt")

    with open(txtfile, "w", encoding="UTF-8") as txtfp:
        # Start the file with a byte order mark.
        txtfp.write("\ufeff")
        for page in tqdm(files_with_messages, leave=None, delay=1.5, unit="pg"):
            with open(page, "r", encoding="CP1251") as fp:
                d = pq(fp.read())
            messages: list = list(d("div.item").items())
            messages.reverse()
            txtfp.writelines(norm(m.text()) for m in messages)
            for attachment_link in d("a.attachment__link").items():
                href = attachment_link.attr("href")
                # An anchor without href yields None, which would later break
                # both "\n".join(links_to_files) and the download loop.
                if href:
                    links_to_files.append(href)
def process_chat(path_to_chat: str, results_folder: str = "."):
d = None
files_with_messages: list = glob(
os.path.join(path_to_chat, "messages*.html"), recursive=False
)
if not files_with_messages:
return
files_with_messages.sort(
key=lambda x: int(os.path.basename(x).split("s")[-1][:-5]),
reverse=True,
)
export_to_txt(files_with_messages, results_folder)
def main():
    """Export every chat of the archive to text and download the attachments.

    Steps:
      1. Locate the archive directory; print a hint and exit with status 1
         when it cannot be found.
      2. Treat every sub-directory with an integer name as a chat and export
         the chats concurrently into ``<archive>/mre``.
      3. Write the collected attachment URLs to ``links_to_files.txt`` in the
         current working directory.
      4. Download each attachment into ``<archive>/mre/attachments``.
    """
    try:
        p = get_dir_with_messages()
    except ArchiveNotFound:
        print(
            "\n".join(
                (
                    "Неверно указан путь к папке с архивом.",
                    "Измените рабочую директорию",
                    "или передайте нужный путь в аргументе командной строки"
                    " при вызове этой утилиты.",
                )
            )
        )
        sys.exit(1)

    # The directory test must apply to each candidate entry; it previously
    # tested the archive root ``p``, which is always a directory, so plain
    # files with numeric names were counted as chats.
    chats = [
        c
        for c in glob(p + os.path.sep + "*", recursive=False)
        if os.path.isdir(c) and isInt(os.path.basename(c))
    ]
    print("I'm starting to work. %d chats found." % (len(chats),))

    results_folder = os.path.join(p, "mre")
    os.makedirs(results_folder, exist_ok=True)

    thread_map(
        lambda x: process_chat(x, results_folder),
        chats,
        leave=None,
        miniters=1,
        unit="chat",
    )

    with open("links_to_files.txt", "w", encoding="UTF-8") as fp:
        fp.write("\n".join(links_to_files))

    print("Скачиваю вложения...")
    attachments_dir = os.path.join(results_folder, "attachments")
    os.makedirs(attachments_dir, exist_ok=True)

    for url in tqdm(links_to_files):
        # File name = last path segment of the URL without its query string.
        filename = os.path.join(attachments_dir, url.split("/")[-1].split("?")[0])
        try:
            urlretrieve(url, filename)
        except (OSError, ValueError):
            # OSError covers HTTPError/URLError and local file errors;
            # ValueError is raised for malformed or scheme-less URLs. One bad
            # link must not abort the remaining downloads.
            print(f"Ошибка при скачивании {filename}")
        # Pause between requests (presumably to avoid hammering the server).
        sleep(1)

    print(" ")
    print("Завершено! Текстовые файлы находятся по пути:\r\n", results_folder)


if __name__ == "__main__":
    main()