@Andrey_Dolg

Как оптимизовать скорость скрапера?

Есть скрапер работает через aiohttp. Обработка данных и отправка идёт на соседнем потоке
Примерная схема
5ee78cf7509dc032254870.jpeg

Сидит он со следующими вводными 230 прокси серверов и 2 vCPU по 3.3 Ghz.
Нагрузка на прокси сейчас регулируется по параметру средней скорости ответа. Быстрее ответ больше запросов через этот прокси.
Предел скорости парсера это обработка со скоростью 400 запросов в секунду(это всё head в основном).
Я пробовал увеличивать число открытых дескрипторов в несколько раз но эффекта это не дало. Так же пробовал запуск через Pypy но положительный эффект если и был то был минимальным. После всего сделанного видится единственный способ разгрузка нагрузки по разным инстансам.

Основной фрагмент кода
def loop(ids_chunks):
        for i in ids_chunks:
        
            # Start Parsing
            tic = time.perf_counter()
            results, proxy_analize = self.loop.run_until_complete(self.requests_new_items(i, self.proxy_list, self.weight_dict))
            toc = time.perf_counter()
        
            print(f"request take {toc - tic:0.4f} seconds,"
                  f" full  amount of requests={ids_amount * len(available)}"
                  f", ids_chunks amount of requests{len(i) * len(available)}")
        
            print("analize")
        
            self.analize(results, self.extend_id, self.proxy_list) # take less then second
            del results

        self.proxy_balancer(proxy_analize, self.weight_dict)

    async def requests_new_items(self, ids, param_proxy_list, weight_dict):     
    
        connector = aiohttp.TCPConnector(limit=self.tcp_speed,
                                         force_close=True)  # self.tcp_speed)  # One time connection limit
        client_timeout = aiohttp.ClientTimeout(connect=9, total=12) # зависит от размера входящего чанка
    
        len_of_requests = int(str(len(ids) * len(available))) # amount of requests
    
        proxy_optimize = random.choices([i for i in weight_dict.keys() if i in param_proxy_list],
                                        [v for k, v in weight_dict.items() if k in param_proxy_list],
                                        k=len_of_proxy) # weights choices of proxy for each request
        async with aiohttp.ClientSession(connector=connector, timeout=client_timeout) as session:

            responce_list = await asyncio.gather(*[self.fetch_one(session, proxy_optimize[current_index], id_of_url,
                                                                  current_index // len(ids)) for
                                                   current_index, id_of_url in enumerate(ids * len(available))])
        del proxy_optimize
    
        new_item_or_except = [i for i in responce_list if i[0]]
    
        proxy_analize = [(i[1], i[2]) for i in responce_list if not i[0]] # list for calculate average responce time
    
        return new_item_or_except, proxy_analize


    async def fetch_one(self, session, proxy, id_of_url, available_id):
        try:
            await_time = time.perf_counter() # for calculate average responce time
            result = await session.head(f"{areas_of_lock[available_id]}/{id_of_url}",
                                        proxy=f"http://{proxy}",
                                        headers=headers_const)
            if not result.status == 404:
                return (id_of_url, result.status, area_id)
            return (None, proxy, time.perf_counter() - await_time)
        except Exception as err:
            print(err)
            return (id_of_url, proxy)
    
    
    def proxy_balancer(self, proxy_responce, weight_dict):
        weight_dict_average = dict() # dict with full responce time for each proxy
        for i in proxy_responce:
            if not weight_dict_average.get(i[0]):
                weight_dict_average[i[0]] = i[1]
                weight_dict_average[i[0] + '_count'] = 1
            else:
                weight_dict_average[i[0]] += i[1]
                weight_dict_average[i[0] + '_count'] += 1
    
    
        weight_dict_average_values = dict() #  1/(avarage value)
        for i in weight_dict.keys():
            if weight_dict_average.get(i):
                weight_dict_average_values[i] = 1 / (weight_dict_average[i] / weight_dict_average[i + '_count'])
    
        full_avarage_time = sum(weight_dict_average_values.values())
        
        # calculate weights for cohice
        for i in weight_dict.keys():
            if weight_dict_average_values.get(i):
                weight_dict[i] = weight_dict_average_values[i] / full_avarage_time
  • Вопрос задан
  • 177 просмотров
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы