Дано:
Пишу высокопроизводительный скраппер по сайтам.
Задача скраппера подтягивать, headers, js скрипты и тд.
Использую прокси, перед запуском проверяю их актив и что они отвечают в указанный timeout.
Запускаю в нескольких потоках gather, +- 600, на 1 процессе.
OS ubuntu 22.04
Догадки:
Из-за того что gather, как я понял, это не полноценные потоки, интерпретатор забивает ими общий стек , отсюда timeout может увеличиваться при особенно при больших скоростях.
При этом метрики позволяют работать на таких скоростях, ЦПУ держится в районе 50%.
Пробывал разбивать на процессы, так же через gather с помощью concurrent.futures, улучшений не выявленно.
Самое интересное что при print(e) ничего не принтиться)
Только через print(type(e)) 'class timeout' получаю. Но это так я понимаю из-за особенности asyncio.
Так же смотрел по колл-ву сокетов и ограничению одновременных соединений на системе, увеличивал колл-во коннектов до предела. Эффекта 0.
Timeout не зависит от сайта, это всегда рандом примерно 10% улетает в молоко даже с рекурсивным методом.
Не надеюсь получить конечно, точечный ответ, я просто даже не знаю как это дебажить) Подскажите.
Спасибо!
import aiohttp
from aiohttp_socks import ProxyConnector
import ssl
import asyncio
from fake_useragent import UserAgent
from asyncio.exceptions import TimeoutError
from aiohttp import ClientTimeout
import random
from typing import Any
ssl_context = ssl.create_default_context()
ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2
ua = UserAgent()
async def parse(*args: Any, **kwargs: Any) -> dict:
pass
async def fetch_url(url, proxy) -> dict:
try:
headers = {
'User-Agent': ua.random
}
connector = ProxyConnector.from_url(proxy, ssl_context=ssl_context)
timeout = ClientTimeout(total=30)
async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
async with session.get(url, headers=headers) as response:
response_text = await response.text()
return await parse(response_text)
except TimeoutError:
raise
except Exception:
pass
async def scrap_worker(queue, proxies_list) -> dict:
single_scrap_dict = {}
while not queue.empty():
url = await queue.get()
selected_proxy = random.choice(proxies_list)
try:
scrap_result = await fetch_url(url, selected_proxy)
except TimeoutError:
try:
scrap_result = await fetch_url(url, selected_proxy)
except BaseException:
pass
single_scrap_dict.update(scrap_result)
return single_scrap_dict
async def main(urls, proxies_list, num_workers):
queue = asyncio.Queue()
for url in urls:
queue.put_nowait(url)
tasks = [scrap_worker(queue, proxies_list) for _ in range(num_workers)]
await asyncio.gather(*tasks)
await queue.join()
urls = [
"https://example.com",
"https://example.org",
]
proxies_list = []
result = asyncio.run(main(urls, proxies_list, num_workers=500))