@Andrey_Dolg

Как мониторить работу скрипта?

Есть скрипт на сервере работает 24/7 но есть редкая ситуация когда он буквально зависает потребление памяти в норме и процесс в списке процессов не отмечается как зомби. В логах если они не врут он останавливается где-то в цепочке опроса урлов aiohttp(в try) в асинхроной функции. Не ошибок не сбоев нет что печально эта ошибка даже таймауты самой либы обходит без сбоев. Думал решить это пока немного кривым способом отмониторить нагрузку на CPU на сервере и если она более 5-10 минут меньше 10% то перезапускать скрипт. Так как сейчас event loop параллиться по процессам то возможно можно как-то таймаут исполнения на processing pool map повесить но так как полного доверия логам нет то хотел всё же на основной поток мониторинг использования процесса прикрутить. Собственно вопрос как такое обычно делается и что стоит учесть?
# #############################   All time running  loop inside 1 thread child
# init 2 process pool
pool = multiprocessing.Pool(processes=2)
while True:


    # create list of proxy with size == chunk size
    len_of_proxy = self.n * len(areas_of_lock)
    proxy_optimize = random.choices([i for i in self.weight_dict.keys() if i in self.proxy_list],
                                    [v for k, v in self.weight_dict.items() if k in self.proxy_list],
                                    k=len_of_proxy)

    # Do chunks
    # 1 chunk to 1 process
    ids_chunks = [(ids[i:i + self.n], proxy_optimize, self.tcp_speed) for i in range(0, len(ids), self.n)]

    results = []
    proxy_analize = []

    del proxy_optimize
    
    # Send chanks to our multiprocessing pool
    unpuck_results = pool.map(get_results, ids_chunks)

    [(results.extend(i[0]), proxy_analize.extend(i[1])) for i in unpuck_results]


    self.analize(results, self.extend_id, self.proxy_list)
    self.proxy_balancer(proxy_analize, self.weight_dict)

# ######################################


# ######################################## multiprocessing and aiohttp part request sequence of urls

def get_results(ids):
    """
    Process fuction create new loop and send requests in separate process
    :param ids: 
    :return: 
    """
    proxy_list = ids[1]
    #weight_dict = ids[2]
    tcp_speed = ids[2]
    loop = asyncio.new_event_loop()
    results, proxy_analize =  loop.run_until_complete(
        requests_new_items(ids[0], proxy_list,tcp_speed))
    return results, proxy_analize


async def requests_new_items(ids,param_proxy_list, tcp_speed):
    """
    Generate tasks
    :param ids:
    :param param_proxy_list:
    :param tcp_speed:
    :return:
    """
    connector = aiohttp.TCPConnector(limit=tcp_speed, force_close=True, ssl=False) # One time connection limit
    client_timeout = aiohttp.ClientTimeout(connect=13, total=14)
    urls_sub_sets_optimizer = []

    # generate subset of urls usual repeat subset indexes like [0,0,0,0,0,0,0,1,1,1,1,1,1,1,2,2,2,2,2]
    [urls_sub_sets_optimizer.extend([i]*len(ids)) for i in range(len(urls_sub_sets)) ]
    async with aiohttp.ClientSession(connector=connector, timeout=client_timeout) as session:
        responce_list = await asyncio.gather(*[fetch_one(session, param_proxy_list[current_index], id_of_url,
                                                     urls_sub_sets_optimizer[current_index]) for
                                               current_index, id_of_url in enumerate(ids * len(urls_sub_sets))])

    new_item_or_except = [i for i in responce_list if i[0]]
    proxy_analize = [(i[1],i[2]) for i in responce_list if not i[0]]
    return  new_item_or_except, proxy_analize   #

async def fetch_one(session, proxy, id_of_url,sub_url):
    """
     Request to server
    :param session:
    :param proxy:
    :param id_of_url:
    :param sub_url:
    :return:
    """
    try:

        await_time = time.perf_counter()
        result = await session.head(''.join((url_part_1,url_part_2,url_part_3,url_part_4)),
                                    proxy= ''.join(('http://',proxy)),
                                    headers=headers_const)

        if not result.status==status_number:
            return (id_of_url,result.status,sub_url)

        return (None,proxy, time.perf_counter()-await_time)
    except Exception as err:
        print(err)
        return (id_of_url,proxy)
# ############################################
  • Вопрос задан
  • 243 просмотра
Решения вопроса 1
@bkosun
Используйте инструмент, который позволяет контролировать процессы, например Supervisor

autorestart=true — перезапуск воркера, если тот по какой-то причине упал;
stopsignal=KILL — сигнал остановки (убийства) процесса. Если не определяется, то используется команда по умолчанию — TERM;


https://ruhighload.com/%D0%97%D0%B0%D0%BF%D1%83%D1...
Ответ написан
Комментировать
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы