@TZiNVX6yf

Python aiohttp timeout, норма?

Дано:
Пишу высокопроизводительный скраппер по сайтам.
Задача скраппера подтягивать, headers, js скрипты и тд.
Использую прокси, перед запуском проверяю их актив и что они отвечают в указанный timeout.
Запускаю в нескольких потоках gather, +- 600, на 1 процессе.
OS ubuntu 22.04

Догадки:
Из-за того что gather, как я понял, это не полноценные потоки, интерпретатор забивает ими общий стек , отсюда timeout может увеличиваться при особенно при больших скоростях.
При этом метрики позволяют работать на таких скоростях, ЦПУ держится в районе 50%.
Пробывал разбивать на процессы, так же через gather с помощью concurrent.futures, улучшений не выявленно.
Самое интересное что при print(e) ничего не принтиться)
Только через print(type(e)) 'class timeout' получаю. Но это так я понимаю из-за особенности asyncio.
Так же смотрел по колл-ву сокетов и ограничению одновременных соединений на системе, увеличивал колл-во коннектов до предела. Эффекта 0.
Timeout не зависит от сайта, это всегда рандом примерно 10% улетает в молоко даже с рекурсивным методом.

Не надеюсь получить конечно, точечный ответ, я просто даже не знаю как это дебажить) Подскажите.
Спасибо!

import aiohttp
from aiohttp_socks import ProxyConnector
import ssl
import asyncio
from fake_useragent import UserAgent
from asyncio.exceptions import TimeoutError
from aiohttp import ClientTimeout
import random
from typing import Any

ssl_context = ssl.create_default_context()
ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2
ua = UserAgent()

async def parse(*args: Any, **kwargs: Any) -> dict:
    pass

async def fetch_url(url, proxy) -> dict:
    try:
        headers = {
                'User-Agent': ua.random
            }
        connector = ProxyConnector.from_url(proxy, ssl_context=ssl_context)
        timeout = ClientTimeout(total=30)
        async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
            async with session.get(url, headers=headers) as response:
                response_text = await response.text()
                return await parse(response_text)
    except TimeoutError:
        raise
    except Exception:
        pass

async def scrap_worker(queue, proxies_list) -> dict:
    single_scrap_dict = {}
    while not queue.empty():
        url = await queue.get()
        selected_proxy = random.choice(proxies_list)
        try:
            scrap_result = await fetch_url(url, selected_proxy)
        except TimeoutError:
            try:
               scrap_result = await fetch_url(url, selected_proxy)
            except BaseException:
                pass

        single_scrap_dict.update(scrap_result)
        
    return single_scrap_dict

async def main(urls, proxies_list, num_workers):
    queue = asyncio.Queue()
    for url in urls:
        queue.put_nowait(url)
    tasks = [scrap_worker(queue, proxies_list) for _ in range(num_workers)]
    await asyncio.gather(*tasks)
    await queue.join()

urls = [
    "https://example.com",
    "https://example.org",
]

proxies_list = []

result = asyncio.run(main(urls, proxies_list, num_workers=500))
  • Вопрос задан
  • 93 просмотра
Решения вопроса 1
fenrir1121
@fenrir1121
Начни с документации
gather как я понял, это не полноценные потоки
gather вообще не создаёт потоки, он оборачивает корутины в задачи и дожидается их выполнения. Async это кооперативная многозадачность - запросы к сайтам происходят не дожидаясь ответа, но выполняется это все в одном потоке.

Вероятнее всего в функции parse, код которой не и приложен, есть вызовы или cpu bound операции, которые блокируют цикл событий.
Ответ написан
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы
SpectrumData Екатеринбург
от 150 000 до 200 000 ₽
Гринатом Москва
от 150 000 ₽
Greenway Global Новосибирск
от 150 000 ₽
16 июн. 2024, в 20:43
90909 руб./за проект
16 июн. 2024, в 19:56
30000 руб./за проект
16 июн. 2024, в 18:39
150000 руб./за проект