@TZiNVX6yf

Python aiohttp timeout, норма?

Дано:
Пишу высокопроизводительный скраппер по сайтам.
Задача скраппера подтягивать, headers, js скрипты и тд.
Использую прокси, перед запуском проверяю их актив и что они отвечают в указанный timeout.
Запускаю в нескольких потоках gather, +- 600, на 1 процессе.
OS ubuntu 22.04

Догадки:
Из-за того что gather, как я понял, это не полноценные потоки, интерпретатор забивает ими общий стек , отсюда timeout может увеличиваться при особенно при больших скоростях.
При этом метрики позволяют работать на таких скоростях, ЦПУ держится в районе 50%.
Пробывал разбивать на процессы, так же через gather с помощью concurrent.futures, улучшений не выявленно.
Самое интересное что при print(e) ничего не принтиться)
Только через print(type(e)) 'class timeout' получаю. Но это так я понимаю из-за особенности asyncio.
Так же смотрел по колл-ву сокетов и ограничению одновременных соединений на системе, увеличивал колл-во коннектов до предела. Эффекта 0.
Timeout не зависит от сайта, это всегда рандом примерно 10% улетает в молоко даже с рекурсивным методом.

Не надеюсь получить конечно, точечный ответ, я просто даже не знаю как это дебажить) Подскажите.
Спасибо!

import aiohttp
from aiohttp_socks import ProxyConnector
import ssl
import asyncio
from fake_useragent import UserAgent
from asyncio.exceptions import TimeoutError
from aiohttp import ClientTimeout
import random
from typing import Any

ssl_context = ssl.create_default_context()
ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2
ua = UserAgent()

async def parse(*args: Any, **kwargs: Any) -> dict:
    pass

async def fetch_url(url, proxy) -> dict:
    try:
        headers = {
                'User-Agent': ua.random
            }
        connector = ProxyConnector.from_url(proxy, ssl_context=ssl_context)
        timeout = ClientTimeout(total=30)
        async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
            async with session.get(url, headers=headers) as response:
                response_text = await response.text()
                return await parse(response_text)
    except TimeoutError:
        raise
    except Exception:
        pass

async def scrap_worker(queue, proxies_list) -> dict:
    single_scrap_dict = {}
    while not queue.empty():
        url = await queue.get()
        selected_proxy = random.choice(proxies_list)
        try:
            scrap_result = await fetch_url(url, selected_proxy)
        except TimeoutError:
            try:
               scrap_result = await fetch_url(url, selected_proxy)
            except BaseException:
                pass

        single_scrap_dict.update(scrap_result)
        
    return single_scrap_dict

async def main(urls, proxies_list, num_workers):
    queue = asyncio.Queue()
    for url in urls:
        queue.put_nowait(url)
    tasks = [scrap_worker(queue, proxies_list) for _ in range(num_workers)]
    await asyncio.gather(*tasks)
    await queue.join()

urls = [
    "https://example.com",
    "https://example.org",
]

proxies_list = []

result = asyncio.run(main(urls, proxies_list, num_workers=500))
  • Вопрос задан
  • 96 просмотров
Решения вопроса 1
fenrir1121
@fenrir1121
Начни с документации
gather как я понял, это не полноценные потоки
gather вообще не создаёт потоки, он оборачивает корутины в задачи и дожидается их выполнения. Async это кооперативная многозадачность - запросы к сайтам происходят не дожидаясь ответа, но выполняется это все в одном потоке.

Вероятнее всего в функции parse, код которой не и приложен, есть вызовы или cpu bound операции, которые блокируют цикл событий.
Ответ написан
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы