def get_data(url, data, data_counter):
r = requests.get(url)
soup = BS(r.text, 'lxml')
...
...
...
scraped_data = {
'title': title,
'name': name,
'description': description,
'image': image,
'rating': rating,
'category': category,
'link': link,
'rss': extra_data['rss'],
'email': extra_data['email'],
'latest_date': latest_date
}
for a in range(len(articles)):
podcast_data['listener_{}'.format(a+1)] = articles[a]
data.append(scraped_data)
data_counter.value += 1
print('DONE №{}: {}'.format(data_counter.value, url))
if __name__ == "__main__":
manager = Manager()
data = manager.list()
data_counter = manager.Value('i', 0)
with Pool(999) as pool:
for url in urls:
pool.apply_async(get_data, (url, data, data_counter))
pool.close()
pool.join()
result = []
for d in data:
result.append(d)
create_csv(result)
print(len(result))
Traceback (most recent call last):
File "scrape_mp.py", line 46, in get_email
match = re.search(r'[\w\.-]+@[\w\.-]+', r.text)
File "/home/kshnkvn/.local/lib/python3.6/site-packages/requests/models.py", line 861, in text
content = str(self.content, encoding, errors='replace')
MemoryError
soup.decompose()
мог-бы помочь избавиться от утечки памяти? Сейчас перезапускать скрипт точно не буду, но в следующий раз попробую, может действительно беда в том, что не сразу из памяти удаляется объект soup
. Хотя если это так в 100% случаев, то у меня сразу после запуска скрипта память закончилась-бы. ValueError: too many file descriptors in select()
при ~350-400 потоках. Этого недостаточно. ValueError: too many file descriptors in select()
при ~350-400 потоках.
Поставь семафор на 100 одновременных соединений, тебе хватит
ValueError
. Но при 300 соединениях все значительно медленнее.О каких потоках в asyncio ты говоришь?
но человек считает что все эти решения оверхед
который считает себя умнее остальных
Мы же тебе помочь хотим, а ты сопротивляешься
на что расходуется ОЗУ
можно-ли как-то это предотвратить
можно-ли как-то это предотвратить именно для этого кода
Можно. Написать нормально
Увы, говнокод на то и говнокод
Причём, количество процессов бессмысленно указывать больше, чем ядер у тебя в компуктере.
в условный html_pages = []
Почему? Если я запущу на данный момент 10 процессов, то они отработают гораздо медленнее, чем если запущу 100 процессов.
asyncio
буду выгребать все 4 сайта и отдавать их через redis
выполняться в multiprocessing
?asyncio
что может замедлить работу. Т.е. мне в любом случае нужно 100% грузить как минимум 2 страницы, если я буду грузить всегда 4 страницы, то это может быть быстрее, чем если создавать дополнительные асинхронные выполнения? будут плодиться лишние выполнения asyncio
response = await r.text()
match = re.search(r'[\w\.-]+@[\w\.-]+', response)
response = await r.text()
match = re.search(r'[\w\.-]+@[\w\.-]+', response)
pub = await aioredis.create_redis('redis://localhost')
оставить в __main__
и сам pub
передавать в качестве аргумента? Ок.multiprocessing
работает вроде ничего не зациклилось. Может поищу что-то среднее между паблик и приват прокси, тогда уберу while
По поводу "впихнуть пару строк" — можно попробовать оставить в первом скрипте, который качает данные. Я почти уверен, что одна регулярка почти не помешает IO операциям
import asyncio
from concurrent.futures import ProcessPoolExecutor
import aiohttp
from loguru import logger as loguru
from lxml.html import fromstring
pool = ProcessPoolExecutor()
parser_sem = asyncio.Semaphore(pool._max_workers)
loguru.info(f"CPU workers: {pool._max_workers}")
host = "https://ru.wikipedia.org"
start_from = f"{host}/wiki/Заглавная_страница"
q_d = asyncio.Queue()
q_p = asyncio.Queue()
sem = asyncio.Semaphore(100)
downloaded_urls = set()
class O:
downloaded = 0
parsed = 0
downloading = 0
down_pending = 0
waiting_for_download_q = 0
o = O()
async def log_printer(queue_d, queue_p):
while True:
loguru.debug(
f"[PRINTER] to Download: {queue_d.qsize()}, to Parse: {queue_p.qsize()}"
f" downloaded: {o.downloaded}, parsed: {o.parsed}"
f" pending: {o.down_pending}, downloading: {o.downloading}"
f" waiting Q: {o.waiting_for_download_q}"
f" tasks: {len(asyncio.Task.all_tasks())}"
)
await asyncio.sleep(0.33)
def lxml_parse(html):
try:
tree = fromstring(html)
urls = tree.xpath("//a/@href")
try:
title = tree.find(".//title").text
except AttributeError:
title = "<UNKNOWN>"
new_urls = []
for url in urls:
if url.startswith("/") and not url.startswith("//"):
new_urls.append(f"{host}{url}")
elif url.startswith("http"):
new_urls.append(url)
return new_urls, title
except Exception as e:
loguru.error(f"Parse error: {e}")
return [], "<ERROR>"
async def parse(html):
loop = asyncio.get_event_loop()
urls, title = await loop.run_in_executor(pool, lxml_parse, html)
o.parsed += 1
return urls, title
async def start_parse_task(content, queue_d):
async with parser_sem:
urls, title = await parse(content)
# loguru.debug(f"[PARSER]: Parse done {title}")
o.waiting_for_download_q += 1
for url in urls:
if url not in downloaded_urls:
await queue_d.put(url)
o.waiting_for_download_q -= 1
# loguru.debug(f"[PARSER]: Add {len(urls)} to download queue")
async def parser(queue_d, queue_p):
while True:
content = await queue_p.get()
asyncio.create_task(start_parse_task(content, queue_d))
async def downloader(queue_d, queue_p, session):
while True:
url = await queue_d.get()
if url in downloaded_urls:
continue
o.down_pending += 1
async with sem:
o.down_pending -= 1
o.downloading += 1
try:
async with session.get(url) as resp:
o.downloading -= 1
downloaded_urls.add(url)
# loguru.debug(f"[DOWNLOADER]: got response for {url}")
try:
text = await resp.text()
await queue_p.put(text)
except UnicodeDecodeError:
pass
o.downloaded += 1
except Exception as e:
loguru.error(f"Download error: {e}")
async def main():
await q_d.put(start_from)
async with aiohttp.ClientSession() as session:
ds = []
for i in range(100):
ds.append(asyncio.create_task(downloader(q_d, q_p, session)))
p = asyncio.create_task(parser(q_d, q_p))
printer = asyncio.create_task(log_printer(q_d, q_p))
await asyncio.gather(*ds, p, printer)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Единственный, кто в этой теме пытается помочь - это longclaps