@itsoftoff

Почему aiohttp выкидывает timeout?

Суть кода в том, что он парсит Р2Р рынок известных криптобирж. Короче, условно первые 20 запросов идут нормально, а дальше ловит таймаут (убрать не получится, залагает просто). При обработке таймаута просто не срабатывает return, условно весь код до него работает - а после нет. И дальше все запросы ловят timeoutexception и капец, уже даже функцию сброса прокси написал, чего только не делал. Получается так, что срабатывает первый раз и дальше все запросы ловят timeout.

Код
import requests
from aiohttp import ClientSession
from ujson import loads, dumps
from aiofiles import open
from os import getcwd
from random import choice, sample
from aiohttp.client_exceptions import ClientProxyConnectionError, ServerConnectionError, ClientHttpProxyError
from asyncio.exceptions import TimeoutError
from parsel import Selector
from time import time
from asyncio import sleep
from random import randint


class Crawler:
    
    __slots__ = ('client', 'route', 'proxies', 'proxy_count', 'log', 'base_url', 'disable_useragents')
    
    async def __aenter__(self):
        self.client: ClientSession = ClientSession(self.base_url)
        async with open(f'{getcwd()}/tools/proxylist.txt', 'r') as proxylist:
            async with open(f'{getcwd()}/tools/bad_proxy.txt', 'r+') as bad_proxylist:
                bad_proxyies = {}
                for i in await bad_proxylist.readlines():
                    if i.strip():
                        pt = i.split(' | ')
                        if len(pt) != 2: continue
                        p, t = pt
                        t = float(t)
                        if time()-t>120:
                            continue
                        else:
                            bad_proxyies[p] = t
                async with open(f'{getcwd()}/tools/useragents.txt', 'r') as useragents:
                    proxylist = [i.strip() for i in await proxylist.readlines()]
                    self.route = 0
                    if not hasattr(self, 'disable_useragents'):
                        self.disable_useragents = False
                        useragents = [i.strip() for i in await useragents.readlines()]
                        self.proxies = [
                            [proxy, choice(useragents)]
                            for proxy in proxylist if proxy and proxy not in bad_proxyies
                        ]
                        self.proxy_count = len(self.proxies)
                        self.proxies = sample(self.proxies, k=self.proxy_count)
                    else:
                        self.proxies = [i for i in proxylist if i and i not in bad_proxyies]
                        self.proxy_count = len(self.proxies)
                        self.proxies = sample(self.proxies, k=self.proxy_count)
                    await bad_proxylist.truncate(0)
                    txt = []
                    for p, t in bad_proxyies.items():
                        txt.append(f'{p} | {t}\n')
                    await bad_proxylist.seek(0)
                    await bad_proxylist.writelines(txt)
        return self
    
    async def __aexit__(self, *args, **kwargs):
        await self.client.close()
    
    async def request(
        self,
        method: str,
        url: str,
        return_dict: bool,
        return_selector: bool = False,
        **kwargs
    ):
        proxy = self.proxy_route()
        if not proxy:
            await sleep(30)
            exit()
        if not self.disable_useragents:
            if headers := kwargs.get('headers'):
                headers['user-agent'] = proxy[1]
            else:
                kwargs['headers'] = {'user-agent': proxy[1]}
            if proxy:
                proxy = proxy[0]
        try:
            async with self.client.request(method, url, proxy=proxy,  **kwargs) as response:
                # data = kwargs.get('json')
                # if data['page'] > 1:
                #     data = data
                if response.status == 200:
                    if return_selector:
                        data = Selector(text=await response.text())
                    elif return_dict:
                        data = loads(await response.text())
                else:
                    self.log.warning(f'status={response.status}, endpoint={response.url}')
                    if return_selector:
                        data = Selector(text='')
                    elif return_dict:
                        data = {}
                return data
        except ClientProxyConnectionError:
            self.log.error(f'ClientProxyConnectionError! -> {proxy}')
            async with open(f'{getcwd()}/tools/bad_proxy.txt', 'a') as f:
                await f.writelines([f'{proxy} | {time()}\n'])
            self.proxies.pop(self.route-1)
            self.proxy_count -= 1
            self.route -= 1
            self.client  = ClientSession(self.base_url)
            return await self.request(method, url, return_dict, **kwargs)
        except OSError as err:
            self.log.error(f'OSError! -> {err}')
            async with open(f'{getcwd()}/tools/bad_proxy.txt', 'a') as f:
                await f.writelines([f'{proxy} | {time()}\n'])
            self.proxies.pop(self.route-1)
            self.proxy_count -= 1
            self.route -= 1
            self.client = ClientSession(self.base_url)
            return await self.request(method, url, return_dict, **kwargs)
        except ServerConnectionError:
            self.log.error(f'ServerConnectionError! -> {proxy}')
            async with open(f'{getcwd()}/tools/bad_proxy.txt', 'a') as f:
                await f.writelines([f'{proxy} | {time()}\n'])
            self.proxies.pop(self.route-1)
            self.proxy_count -= 1
            self.route -= 1
            self.client  = ClientSession(self.base_url)
            return await self.request(method, url, return_dict, **kwargs)
        except ClientHttpProxyError:
            self.log.error(f'ClientHttpProxyError! -> {proxy}')
            async with open(f'{getcwd()}/tools/bad_proxy.txt', 'a') as f:
                await f.writelines([f'{proxy} | {time()}\n'])
            self.proxies.pop(self.route-1)
            self.proxy_count -= 1
            self.route -= 1
            self.client = ClientSession(self.base_url)
            return await self.request(method, url, return_dict, **kwargs)
        except RuntimeError:
            self.log.error(f'RuntimeError! -> {proxy}')
            async with open(f'{getcwd()}/tools/bad_proxy.txt', 'a') as f:
                await f.writelines([f'{proxy} | {time()}\n'])
            self.proxies.pop(self.route-1)
            self.proxy_count -= 1
            self.route -= 1
            self.client = ClientSession(self.base_url)
            return await self.request(method, url, return_dict, **kwargs)
        except TimeoutError:
            self.log.error(f'Timeout error! -> {proxy}')
            await self.reset_proxies()
            data = await self.request(method, url, return_dict, **kwargs)
            if data is None:  # Если метод вернул None
                return None
            return data
        except Exception as ex:
            self.log.error(f"Exception - {ex}")

    async def get(
        self, 
        url: str,
        return_dict: bool = True,
        return_selector: bool = False,
        **kwargs
    ):
        return await self.request('GET', url, return_dict, return_selector, **kwargs)
       

    def proxy_route(self) -> tuple:
        if not self.proxy_count:
            return
        if self.route == self.proxy_count:
            self.route = 0
        proxy = self.proxies[self.route]
        self.route += 1
        return proxy
    
    async def reset_proxies(self):
        async with open(f'{getcwd()}/tools/proxylist.txt', 'r') as proxylist:
            async with open(f'{getcwd()}/tools/bad_proxy.txt', 'r+') as bad_proxylist:
                bad_proxyies = {}
                for i in await bad_proxylist.readlines():
                    if i.strip():
                        pt = i.split(' | ')
                        if len(pt) != 2: continue
                        p, t = pt
                        t = float(t)
                        if time()-t>120:
                            continue
                        else:
                            bad_proxyies[p] = t
                async with open(f'{getcwd()}/tools/useragents.txt', 'r') as useragents:
                    proxylist = [i.strip() for i in await proxylist.readlines()]
                    self.route = 0
                    if not hasattr(self, 'disable_useragents'):
                        self.disable_useragents = False
                        useragents = [i.strip() for i in await useragents.readlines()]
                        self.proxies = [
                            [proxy, choice(useragents)]
                            for proxy in proxylist if proxy and proxy not in bad_proxyies
                        ]
                        self.proxy_count = len(self.proxies)
                        self.proxies = sample(self.proxies, k=self.proxy_count)
                    else:
                        self.proxies = [i for i in proxylist if i and i not in bad_proxyies]
                        self.proxy_count = len(self.proxies)
                        self.proxies = sample(self.proxies, k=self.proxy_count)
                    await bad_proxylist.truncate(0)
                    txt = []
                    for p, t in bad_proxyies.items():
                        txt.append(f'{p} | {t}\n')
                    await bad_proxylist.seek(0)
                    await bad_proxylist.writelines(txt)
  • Вопрос задан
  • 145 просмотров
Решения вопроса 1
@itsoftoff Автор вопроса
Короче, нашел решение, мне помогло частично, но вам думаю поможет

https://github.com/aio-libs/aiohttp/issues/3203
Ответ написан
Комментировать
Пригласить эксперта
Ответы на вопрос 1
@Everything_is_bad
Не нужна тут рекурсия, достаточно бесконечного while (а лучше конечный, пусть его внешний софт перезапускает). Еще и sleep у разных except, разный сделать. И код полностью отрефакторить, такое читать больно.
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы