import asyncio
from concurrent.futures import ProcessPoolExecutor
import aiohttp
from loguru import logger as loguru
from lxml.html import fromstring
pool = ProcessPoolExecutor()
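# limit concurrent parse jobs to the process pool's worker count (lxml parsing is CPU-bound)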
parser_sem = asyncio.Semaphore(pool._max_workers)
loguru.info(f"CPU workers: {pool._max_workers}")
host = "https://ru.wikipedia.org"
start_from = f"{host}/wiki/Заглавная_страница"
q_d = asyncio.Queue()
q_p = asyncio.Queue()
sem = asyncio.Semaphore(100)
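# URLs that have already been fetched (in-memory dedup set)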
downloaded_urls = set()
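# plain namespace holding the crawler's running counters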
class O:
downloaded = 0
parsed = 0
downloading = 0
down_pending = 0
waiting_for_download_q = 0
o = O()
async def log_printer(queue_d, queue_p):
while True:
loguru.debug(
f"[PRINTER] to Download: {queue_d.qsize()}, to Parse: {queue_p.qsize()}"
f" downloaded: {o.downloaded}, parsed: {o.parsed}"
f" pending: {o.down_pending}, downloading: {o.downloading}"
f" waiting Q: {o.waiting_for_download_q}"
f" tasks: {len(asyncio.Task.all_tasks())}"
)
await asyncio.sleep(0.33)
def lxml_parse(html):
try:
tree = fromstring(html)
urls = tree.xpath("//a/@href")
try:
title = tree.find(".//title").text
except AttributeError:
title = "<UNKNOWN>"
new_urls = []
for url in urls:
if url.startswith("/") and not url.startswith("//"):
new_urls.append(f"{host}{url}")
elif url.startswith("http"):
new_urls.append(url)
return new_urls, title
except Exception as e:
loguru.error(f"Parse error: {e}")
return [], "<ERROR>"
async def parse(html):
    loop = asyncio.get_running_loop()
urls, title = await loop.run_in_executor(pool, lxml_parse, html)
o.parsed += 1
return urls, title
async def start_parse_task(content, queue_d):
async with parser_sem:
urls, title = await parse(content)
# loguru.debug(f"[PARSER]: Parse done {title}")
o.waiting_for_download_q += 1
for url in urls:
if url not in downloaded_urls:
await queue_d.put(url)
o.waiting_for_download_q -= 1
# loguru.debug(f"[PARSER]: Add {len(urls)} to download queue")
async def parser(queue_d, queue_p):
while True:
content = await queue_p.get()
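        # each queued page gets its own task; parser_sem throttles how many parse at once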
asyncio.create_task(start_parse_task(content, queue_d))
async def downloader(queue_d, queue_p, session):
while True:
        url = await queue_d.get()
        if url in downloaded_urls:
            continue
        # mark the URL as seen right away so concurrent downloaders don't fetch it twice
        downloaded_urls.add(url)
        o.down_pending += 1
        async with sem:
            o.down_pending -= 1
            o.downloading += 1
            try:
                async with session.get(url) as resp:
                    # loguru.debug(f"[DOWNLOADER]: got response for {url}")
                    try:
                        text = await resp.text()
                        await queue_p.put(text)
                    except UnicodeDecodeError:
                        pass
                    o.downloaded += 1
            except Exception as e:
                loguru.error(f"Download error: {e}")
            finally:
                o.downloading -= 1
async def main():
await q_d.put(start_from)
async with aiohttp.ClientSession() as session:
ds = []
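        # spawn 100 downloader coroutines that all share one ClientSession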
for i in range(100):
ds.append(asyncio.create_task(downloader(q_d, q_p, session)))
p = asyncio.create_task(parser(q_d, q_p))
printer = asyncio.create_task(log_printer(q_d, q_p))
await asyncio.gather(*ds, p, printer)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
extra asyncio executions will keep multiplying
into a hypothetical html_pages = []
Why? If I start 10 processes right now, they finish much more slowly than if I start 100 processes.
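The 10-vs-100 difference is expected: downloading is I/O-bound, so while one coroutine waits on the network the others keep working, and throughput grows roughly with the number of concurrent requests until the sem limit, the remote server, or the CPU-bound parsing becomes the bottleneck. Below is a minimal, hypothetical sketch for measuring this directly; CONCURRENCY_LEVELS, PAGES_PER_RUN, fetch_n_pages and the fetch-the-same-page loop are benchmark assumptions of mine, not part of the crawler above.

import asyncio
import time

import aiohttp

# Hypothetical benchmark parameters -- not part of the crawler above.
CONCURRENCY_LEVELS = [10, 100]  # numbers of downloader coroutines to compare
PAGES_PER_RUN = 100             # how many fetches to time per run
START_URL = "https://ru.wikipedia.org/wiki/Заглавная_страница"

async def fetch_n_pages(n_workers, n_pages):
    """Fetch START_URL n_pages times using n_workers coroutines.

    Link extraction is deliberately skipped: the only question here is how
    throughput changes with the number of concurrent downloads.
    """
    queue = asyncio.Queue()
    for _ in range(n_pages):
        queue.put_nowait(START_URL)
    done = 0

    async def worker(session):
        nonlocal done
        while True:
            url = await queue.get()
            try:
                async with session.get(url) as resp:
                    await resp.read()
                done += 1
            except aiohttp.ClientError:
                pass  # count only successful fetches
            finally:
                queue.task_done()

    async with aiohttp.ClientSession() as session:
        workers = [asyncio.create_task(worker(session)) for _ in range(n_workers)]
        started = time.perf_counter()
        await queue.join()  # returns once every queued URL has been processed
        elapsed = time.perf_counter() - started
        for w in workers:
            w.cancel()
        await asyncio.gather(*workers, return_exceptions=True)
    return done, elapsed

async def main():
    for n in CONCURRENCY_LEVELS:
        done, elapsed = await fetch_n_pages(n, PAGES_PER_RUN)
        print(f"{n:>4} workers: {done} pages in {elapsed:.1f}s "
              f"({done / elapsed:.1f} pages/s)")

if __name__ == "__main__":
    asyncio.run(main())

With 10 workers at most 10 requests overlap; with 100 (the crawler's sem limit) ten times as many network waits are hidden, which is why the larger run finishes the same batch much faster, until the remote server or the CPU-bound parsing becomes the real bottleneck.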