Доброго дня!
Есть машина на debian 12, 24gb cpu и 24 ram.
Задача - написать парсер на python собирающий простую инфу с большого перечня сайтов. Использую asyncio и playwright.
Логика - единовременно открывается большое количество сайтов, прогружается нужная инфа, записали, сайт закрыли.
Количество единовременно открываемых сайтов контролируется семафором.
Проблема - спустя некоторое время работы (2-4 часа) скрипт падает и все сайты закрывает с ошибкой Target page, context or browser has been closed.
Что делал:
- Логировал по-разному, ресурсы сервера полностью в норме, в пике(момент единовременного запуска всех сайтов) загрузка памяти и цпу не доходит до 80%, в среднем меньше 40%
- Проверял базу доменов - абсолютно валидна
- Пробовал разные браузеры playwright с разными аргументами, отключая/включая разные штуки влияющие на загрузку/открытие сайта
- Изолировал от основного кода кусок, в котором генерится ошибка, сделал более простой пример. Тут интересный момент. Если я полностью продублирую скрипт и установлю мощность(количество единовременно обрабатываемых сайтов), например, 50, то запустив оба скрипта одновременно я получу суммарную мощность в 100 и полное отсутствие ошибок. Если я делаю все в рамках одного скрипта - ловлю Target page, context or browser has been closed.
- Из теста выше сделал вывод и придумал костыль: создал в одном скрипте две асинхронные задачи выполняющие, по сути, функционал отдельного скрипта - как итог получаю ту-же ошибку. Ниже прикладываю код. Спасибо!
import asyncio
import time
from playwright.async_api import async_playwright, Browser
import logging
import gc
import psutil
import re
#inp_power = int(input('PPPower: '))
#inp_domains = int(input('Domains: '))
#inp_log_path = input('Path to debug_test.log: ')
pause = input('COPY - Press enter')
inp_power = 60 #мощность, при которой при единовременном запуске скрипт не падает
inp_domains = 130000 #допустим, мне нужно отработать 390к доменов
inp_log_path = '/root/python/counters/'
log_name = 'debug_test.log'
file_log = logging.FileHandler(f"{inp_log_path}{log_name}")
console_out = logging.StreamHandler()
logging.basicConfig(handlers=(file_log, console_out), level=logging.DEBUG)
logging.info(f'START CONFIG: power: {inp_power}, domains: {inp_domains}, log path: {inp_log_path}{log_name}')
#MAIN_LIMIT = 2
CONCURRENT_LIMIT = inp_power
async def ram_info_log():
memory_info = psutil.virtual_memory()
total_memory = memory_info.total
used_memory = memory_info.used
logging.info(f'RAM: used/total gb: {used_memory / (1024 ** 3):.2f}/{total_memory / (1024 ** 3):.2f}')
#----------------------------------------------------ONE--------------------------------------------------#
async def process_domain_one(browser: Browser, domain: str, id, proc_id):
#logging.info(browser)
#logging.info(f'id: {id}, timeout 30s')
await asyncio.sleep(15)
try:
start_time = time.time()
page = await browser.new_page()
#logging.info(f'Browser id: {id}, page: {page}')
await page.goto(f"http://{domain}")
#logging.info(f'Browser id: {id}, page: {page}')
title = await page.title()
end_time = time.time()
execution_time = end_time - start_time
a = 'Википедия\xa0— свободная энциклопедия'
#logging.info(f"Proc_id: {proc_id}, Browser id: {id}, Domain: {domain}, Title: {title}, time: {execution_time:.2f}")
if title == a:
logging.info(f"Proc_id: {proc_id}, Browser id: {id}, Domain: {domain}, time: {execution_time:.2f}, status: OK")
else:
logging.info(f"Proc_id: {proc_id}, Browser id: {id}, Domain: {domain}, time: {execution_time:.2f}, status: ERROR {title}")
await page.close()
#await ram_info_log()
gc.collect()
except Exception as e:
#await ram_info_log()
logging.error(f"Proc_id: {proc_id}, Browser id: {id}, Error processing {domain}: {e}")
async def bounded_process_one(sem, browser: Browser, domain: str, id, proc_id):
async with sem:
await process_domain_one(browser, domain, id, proc_id)
async def main_one(domains, proc_id):
sem = asyncio.Semaphore(CONCURRENT_LIMIT)
async with async_playwright() as p:
browser = await p.chromium.launch(args=["--disable-gpu", "--enable-unsafe-swiftshader"])
await asyncio.gather(*(bounded_process_one(sem, browser, domain, i, proc_id) for i, domain in enumerate(domains, start=1)))
await browser.close()
async def start_one():
proc_id = '1'
start_time = time.time()
domains = ["ru.wikipedia.org"] *inp_domains
await main_one(domains, proc_id)
end_time = time.time()
execution_time = end_time - start_time
logging.info(f"Proc_id: {proc_id}. Время выполнения сектора: {execution_time:.2f} секунд")
#---------------------------------------------------------------------------------------------------------#
#----------------------------------------------------TWO--------------------------------------------------#
async def process_domain_two(browser: Browser, domain: str, id, proc_id):
#logging.info(browser)
#logging.info(f'id: {id}, timeout 30s')
await asyncio.sleep(15)
try:
start_time = time.time()
page = await browser.new_page()
#logging.info(f'Browser id: {id}, page: {page}')
await page.goto(f"http://{domain}")
#logging.info(f'Browser id: {id}, page: {page}')
title = await page.title()
end_time = time.time()
execution_time = end_time - start_time
a = 'Википедия\xa0— свободная энциклопедия'
#logging.info(f"Proc_id: {proc_id}, Browser id: {id}, Domain: {domain}, Title: {title}, time: {execution_time:.2f}")
if title == a:
logging.info(f"Proc_id: {proc_id}, Browser id: {id}, Domain: {domain}, time: {execution_time:.2f}, status: OK")
else:
logging.info(f"Proc_id: {proc_id}, Browser id: {id}, Domain: {domain}, time: {execution_time:.2f}, status: ERROR {title}")
await page.close()
#await ram_info_log()
gc.collect()
except Exception as e:
#await ram_info_log()
logging.error(f"Proc_id: {proc_id}, Browser id: {id}, Error processing {domain}: {e}")
async def bounded_process_two(sem, browser: Browser, domain: str, id, proc_id):
async with sem:
await process_domain_two(browser, domain, id, proc_id)
async def main_two(domains, proc_id):
sem = asyncio.Semaphore(CONCURRENT_LIMIT)
async with async_playwright() as p:
browser = await p.chromium.launch(args=["--disable-gpu", "--enable-unsafe-swiftshader"])
await asyncio.gather(*(bounded_process_two(sem, browser, domain, i, proc_id) for i, domain in enumerate(domains, start=1)))
await browser.close()
async def start_two():
proc_id = '2'
start_time = time.time()
domains = ["ru.wikipedia.org"] *inp_domains
await main_two(domains, proc_id)
end_time = time.time()
execution_time = end_time - start_time
logging.info(f"Proc_id: {proc_id}. Время выполнения сектора: {execution_time:.2f} секунд")
#---------------------------------------------------------------------------------------------------------#
#----------------------------------------------------THREE--------------------------------------------------#
async def process_domain_three(browser: Browser, domain: str, id, proc_id):
#logging.info(browser)
#logging.info(f'id: {id}, timeout 30s')
await asyncio.sleep(15)
try:
start_time = time.time()
page = await browser.new_page()
#logging.info(f'Browser id: {id}, page: {page}')
await page.goto(f"http://{domain}")
#logging.info(f'Browser id: {id}, page: {page}')
title = await page.title()
end_time = time.time()
execution_time = end_time - start_time
a = 'Википедия\xa0— свободная энциклопедия'
#logging.info(f"Proc_id: {proc_id}, Browser id: {id}, Domain: {domain}, Title: {title}, time: {execution_time:.2f}")
if title == a:
logging.info(f"Proc_id: {proc_id}, Browser id: {id}, Domain: {domain}, time: {execution_time:.2f}, status: OK")
else:
logging.info(f"Proc_id: {proc_id}, Browser id: {id}, Domain: {domain}, time: {execution_time:.2f}, status: ERROR {title}")
await page.close()
#await ram_info_log()
gc.collect()
except Exception as e:
#await ram_info_log()
logging.error(f"Proc_id: {proc_id}, Browser id: {id}, Error processing {domain}: {e}")
async def bounded_process_three(sem, browser: Browser, domain: str, id, proc_id):
async with sem:
await process_domain_three(browser, domain, id, proc_id)
async def main_three(domains, proc_id):
sem = asyncio.Semaphore(CONCURRENT_LIMIT)
async with async_playwright() as p:
browser = await p.chromium.launch(args=["--disable-gpu", "--enable-unsafe-swiftshader"])
await asyncio.gather(*(bounded_process_three(sem, browser, domain, i, proc_id) for i, domain in enumerate(domains, start=1)))
await browser.close()
async def start_three():
proc_id = '3'
start_time = time.time()
domains = ["ru.wikipedia.org"] *inp_domains
await main_three(domains, proc_id)
end_time = time.time()
execution_time = end_time - start_time
logging.info(f"Proc_id: {proc_id}. Время выполнения сектора: {execution_time:.2f} секунд")
#---------------------------------------------------------------------------------------------------------#
async def main_main():
start_time = time.time()
await asyncio.gather(start_one(), start_two(), start_three())
#await asyncio.gather(start_one())
end_time = time.time()
execution_time = end_time - start_time
logging.info(f"Общее время выполнения скрипта: {execution_time:.2f} секунд")
if __name__ == "__main__":
asyncio.run(main_main())