Задать вопрос
@AnKus

Как запараллелить выполнение ф-ций в асинхронном парсере?

Здравствуйте. Были точно уже подобные вопросы, но они остались без ответов.
Есть код парсера. Вопрос, как выполнить параллельно функции парсинга и записи в файл csv. Так как в этом коде судя по всему отрабатывает парсинг, а потом запись. Предположим, что запускается скрипт на целый день, распарсил огромный обьем информации, пропала электроэнергия, все пропало. Работало бы параллельно 2 ф-ции, то какая-то часть информации сохранилась бы. Вот собственно код:
import csv
import re
import aiohttp
import asyncio
from bs4 import BeautifulSoup
from datas import *
import requests
from tqdm import tqdm
import time

post_url = 'https://5140.org/companies/request'
companys = []
start_time = time.time()


async def get_pages_data():
    with open('all_links_1.txt', 'r') as f:
        lines = [line.strip() for line in f.readlines()]

        session = aiohttp.ClientSession()
        for line in tqdm(lines):
            try:
                async with session.get(line, headers=headers, cookies=cookies) as response:
                    result = await response.text()
            except AttributeError:
                continue

            soup = BeautifulSoup(result, 'lxml')
            try:
                name = soup.find('div', {'class': 'pull-md-left'}).find_next('h1').text
            except AttributeError:
                name = ''
            try:
                edrpou = soup.find('div', {'class': 'row'}).find_all_next('div', {'class': 'col'})[1].text.strip()
            except (AttributeError, IndexError):
                edrpou = ''
            try:
                idn = str(soup.find('span', {'itemprop': 'item'}).get('itemid')).replace('5140', '')
            except (AttributeError, IndexError):
                print('')
            x = ''.join(re.findall('[0-9]+', re.sub(r'^(.{7}). *$', '\g<1>', idn)))
            phone = requests.post(post_url, cookies=cookies, headers=headers, data=
            {'data_type': 'phone', 'item_id': {x}}).text.replace('+38', '')

            if phone.isdigit() is True:
                company = {
                    'name': name,
                    'phone': phone,
                    'edrpou': edrpou,
                }

                companys.append(company)

        await session.close()
        return companys


async def csv_writer():
    with open('companys.csv', 'w', newline='') as file:
        writer = csv.writer(file, delimiter=',')
        writer.writerow(['Name', 'Phone', 'Edrpou'])
        for company in tqdm(companys):
            writer.writerow([company['name'], company['phone'], company['edrpou']])


async def gather():
    await get_pages_data()
    await csv_writer()


def main():
    asyncio.run(gather())

    finish_time = time.time() - start_time
    print(f'Завершено ! Время выполнения: {finish_time}')


if __name__ == "__main__":
    main()
  • Вопрос задан
  • 153 просмотра
Подписаться 1 Простой 1 комментарий
Пригласить эксперта
Ответы на вопрос 1
@deliro
1. Создать очередь (asyncio.Queue)
2. Парсер пишет в очередь, писарь csv читает очередь и пишет в неё.
3. Парсер и писарь должны запускаться одновременно, то есть, типа такого

q = asyncio.Queue()  # с maxsize можно поиграться, в данном случае отставание писаря от парсера может быть не более чем на один элемент
await asyncio.gather(get_pages_data(q), csv_writer(q))

4. Парсеров, кажется, надо сделать больше одного. То есть, появляется второй asyncio.Queue (назовём его work_queue), куда падают lines из файла, эту очередь слушают N воркеров (скажем, 5 штук), получают элемент, работают с ним, затем пишут в result_queue, который слушает писарь csv и записывает результат в файлик

Псевдокод будет выглядеть так:

async def file_reader(work_q, n_parsers):
    with open('all_links_1.txt', 'r') as f:
        lines = [line.strip() for line in f.readlines()]
    for line in lines:
        await work_q.put(line)
    for _ in range(n_parsers):  # говорим парсерам, что работы больше нет
        await work_q.put(None)

async def parser(work_q, results_q):
    while True:
        line = await work_q.get()
        if line is None:
            return
        result = ... магия с походом в http ...
        await results_q.put(result)

async def writer(results_q):
    with open('companys.csv', 'w', newline='') as file:  # возможно, открывать файл имеет смысл при каждом получении элемента и закрывать после записи, так файл всегда будет "целым", но процесс записи будет дольше
        writer = csv.writer(file, delimiter=',')
        while True:
            result = await results_q.get()
            if result is None:
                return
            writer.writerow([result['name'], result['phone'], result['edrpou']])


async def main():
    work_queue = asyncio.Queue()
    results_queue = asyncio.Queue(10)  # парсер не должен ждать, пока писарь запишет в файл (хард может быть занят), поэтому небольшой буфер
    n_parsers = 5
    tasks = []
    parsers = []
    reader_task = asyncio.create_task(file_reader(work_queue, n_parsers))
    tasks.append(tasks)
    for _ in range(n_parsers):
        parser_task = asyncio.create_task(parser(work_queue, results_queue))
        tasks.append(parser_task)
        parsers.append(parser_task)
    tasks.append(asyncio.create_task(writer(results_queue)))
    await asyncio.gather(*parsers)  # ждём все парсеры
    await results_queue.put(None)  # говорим писарю, что больше ничего не будет
    await asyncio.gather(*tasks)  # дожидаемся все остальные таски (вернее, будет только одна — writer)


Однако нужно понимать, что порядок результатов при таком подходе не будет гарантированным или хоть сколько-то стабильным. Поэтому, если порядок важен, стоит писать в файл какую-то промежуточную структуру (пусть тот же csv, но с доп столбцом link) и под конец всех работ вычитывать её, сортировать и складывать уже в нужном порядке
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы