Задать вопрос

Как реализовать асинхронность + многопоточность одновременно?

Мне надо запарсить огромное кол-во страниц (около 15000) с 35 статьями на каждой на каждую рубрику. Разумеется, если делать это через requests синхронно и в одном потоке, то на это уйдет очень много времени (несколько часов?), а мне информация нужна прямо сейчас. Хочу использовать aiohttp и multithreading. Хочу реализовать примерно так: каждый поток будет параллельно с другими считывать тысячи страниц определенной рубрики, которых около 15 штук. В каждом потоке будут асинхронно делаться запросы к aiohttp. Проблема в том, что executor не дает передать в себя await функцию, а если ее вызывать прямо в submit, то никакой многопоточности не будет, потоки будут выполняться последовательно.
Вопрос: как это реализовать? Текущий код прилагаю ниже:

# Other imports
import bs4
import aiohttp, asyncio
from urllib import parse
import certifi, ssl
from concurrent.futures import ThreadPoolExecutor

# Project imports
from utils import to_dict, fetch


items_list = []
SSL_CERT = ssl.create_default_context(cafile=certifi.where())

async def serialize_topic(topic: str, topics: list[str]) -> None:
    print(f'Starting to serialize {topics.index(topic)+1}. {topic}')

    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
        topic_html = await fetch(func=session.get, url=topic, params={'limit' : 35})

    topic_soup = bs4.BeautifulSoup(topic_html, 'html.parser')

    # Getting the last page number to handle pagination
    pagination = list(topic_soup('ul', {'class' : 'pagination'}).children)
    max_page_url = pagination[-1].a.get('href')
    max_page_params = parse.urlparse(max_page_url).query # Parsing a link to the last page in order to get its number
    dict_from_query = parse.parse_qs(max_page_params)
    max_page = int(dict_from_query['page'][0])

    # Going through all pages
    for i in range(max_page):
        async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
            page_html = await fetch(func=session.get, url=topic, params={'page' : i, 'limit' : 100})

        page_soup = bs4.BeautifulSoup(page_html, 'html.parser')
        items = page_soup.find_all(class_='article')

        # Going through items on one page
        for item in items:

            # Getting item caption with its data
            # ... здесь просто получаю описание статьи и название 
            ...
            
        print(f'Serialized page number {i+1}')

    print(f'Serialized {topics.index(topic)+1}. {topic}')


async def main():
    # Getting all topics
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=SSL_CERT)) as session:
        html = await fetch(func=session.get, url='')

    soup = bs4.BeautifulSoup(html, 'html.parser')
    topics = soup.find('div', {'id' : 'content'}).find_all(class_='topic')
    topics = [item.a.get('href') for item in topics]

    with ThreadPoolExecutor(max_workers=15) as executor:
        for topic in topics:
            executor.submit(await serialize_topic(), topic, topics)

    print('\n**************************\n')
    print(f'Total amount of articles: {len(items_list)}')


if name == '__main__':
    asyncio.run(main())
  • Вопрос задан
  • 282 просмотра
Подписаться 4 Простой 11 комментариев
Пригласить эксперта
Ответы на вопрос 1
@Everything_is_bad
не нужен тут ThreadPoolExecutor, читай про create_task. Ну и сейчас рекомендуют TaskGroup, а не gather.
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы