Есть скрипт на сервере работает 24/7 но есть редкая ситуация когда он буквально зависает потребление памяти в норме и процесс в списке процессов не отмечается как зомби. В логах если они не врут он останавливается где-то в цепочке опроса урлов aiohttp(в try) в асинхроной функции. Не ошибок не сбоев нет что печально эта ошибка даже таймауты самой либы обходит без сбоев. Думал решить это пока немного кривым способом отмониторить нагрузку на CPU на сервере и если она более 5-10 минут меньше 10% то перезапускать скрипт. Так как сейчас event loop параллиться по процессам то возможно можно как-то таймаут исполнения на processing pool map повесить но так как полного доверия логам нет то хотел всё же на основной поток мониторинг использования процесса прикрутить. Собственно вопрос как такое обычно делается и что стоит учесть?
# ############################# All time running loop inside 1 thread child
# init 2 process pool
pool = multiprocessing.Pool(processes=2)
while True:
# create list of proxy with size == chunk size
len_of_proxy = self.n * len(areas_of_lock)
proxy_optimize = random.choices([i for i in self.weight_dict.keys() if i in self.proxy_list],
[v for k, v in self.weight_dict.items() if k in self.proxy_list],
k=len_of_proxy)
# Do chunks
# 1 chunk to 1 process
ids_chunks = [(ids[i:i + self.n], proxy_optimize, self.tcp_speed) for i in range(0, len(ids), self.n)]
results = []
proxy_analize = []
del proxy_optimize
# Send chanks to our multiprocessing pool
unpuck_results = pool.map(get_results, ids_chunks)
[(results.extend(i[0]), proxy_analize.extend(i[1])) for i in unpuck_results]
self.analize(results, self.extend_id, self.proxy_list)
self.proxy_balancer(proxy_analize, self.weight_dict)
# ######################################
# ######################################## multiprocessing and aiohttp part request sequence of urls
def get_results(ids):
"""
Process fuction create new loop and send requests in separate process
:param ids:
:return:
"""
proxy_list = ids[1]
#weight_dict = ids[2]
tcp_speed = ids[2]
loop = asyncio.new_event_loop()
results, proxy_analize = loop.run_until_complete(
requests_new_items(ids[0], proxy_list,tcp_speed))
return results, proxy_analize
async def requests_new_items(ids,param_proxy_list, tcp_speed):
"""
Generate tasks
:param ids:
:param param_proxy_list:
:param tcp_speed:
:return:
"""
connector = aiohttp.TCPConnector(limit=tcp_speed, force_close=True, ssl=False) # One time connection limit
client_timeout = aiohttp.ClientTimeout(connect=13, total=14)
urls_sub_sets_optimizer = []
# generate subset of urls usual repeat subset indexes like [0,0,0,0,0,0,0,1,1,1,1,1,1,1,2,2,2,2,2]
[urls_sub_sets_optimizer.extend([i]*len(ids)) for i in range(len(urls_sub_sets)) ]
async with aiohttp.ClientSession(connector=connector, timeout=client_timeout) as session:
responce_list = await asyncio.gather(*[fetch_one(session, param_proxy_list[current_index], id_of_url,
urls_sub_sets_optimizer[current_index]) for
current_index, id_of_url in enumerate(ids * len(urls_sub_sets))])
new_item_or_except = [i for i in responce_list if i[0]]
proxy_analize = [(i[1],i[2]) for i in responce_list if not i[0]]
return new_item_or_except, proxy_analize #
async def fetch_one(session, proxy, id_of_url,sub_url):
"""
Request to server
:param session:
:param proxy:
:param id_of_url:
:param sub_url:
:return:
"""
try:
await_time = time.perf_counter()
result = await session.head(''.join((url_part_1,url_part_2,url_part_3,url_part_4)),
proxy= ''.join(('http://',proxy)),
headers=headers_const)
if not result.status==status_number:
return (id_of_url,result.status,sub_url)
return (None,proxy, time.perf_counter()-await_time)
except Exception as err:
print(err)
return (id_of_url,proxy)
# ############################################