Приветствую.
Делаю парсер страниц на python. Нужно постоянно обходить большое ко-во страниц. Например, 1М в день.
Для этого раньше использовал многопоточность TheadPool. Но когда возросло кол-во требуемых одновременных подключений до 80-100 скрипт начинал падать по памяти.
Почитал, что пишут надо переходить на asyncio.
Накидал простенький скрипт, где беру прокси из файлика, беру url и обхожу их. Для ограничения одновременности испольщую semaphore.
Но столкнулся с тем, что если у меня для прокси доступно, например 50 потоков, то при выборке 200 урлов, первые 50 отработаются правильно, а остальные уйдут в ошибку. Такое ощущение, что семафор не работает или я как то не так использую это.
Хотелось бы понять как нужно делать. Или пример кода подобного.
Мой код для примера:
#!/usr/bin/python3.6
# modified fetch function with semaphore
import random
import asyncio
from aiohttp import ClientSession
from random import shuffle
from bs4 import BeautifulSoup
count = 0
with open('proxy2.txt') as f:
proxy = f.readlines()
proxy = [x.strip() for x in proxy]
shuffle(proxy)
def geturls(num):
urls = []
f = open('sitemap.xml', 'r')
soup = BeautifulSoup(f, "lxml")
arr = soup.findAll('loc')
i = 0
for url in arr:
if i < num:
urls.append(url.contents[0])
i = i + 1
else:
break
return urls
async def fetch(sem,url, session):
global proxy
global count
try:
async with session.get(url,proxy="http://"+random.choice(proxy)) as response:
count = count + 1
body = await response.read()
print(str(count) + " " + url)
return body
except Exception as e:
print(e)
async def bound_fetch(sem, url, session):
# Getter function with semaphore.
async with sem:
return await fetch(sem, url, session)
async def run(r):
urls=geturls(r)
tasks = []
#одноврменно можно до 50 коннектов
sem = asyncio.Semaphore(50)
async with ClientSession() as session:
for url in urls:
task = asyncio.ensure_future(bound_fetch(sem, url, session))
tasks.append(task)
responses = asyncio.gather(*tasks)
await responses
#200 - выбираем 200 урлов из sitemap для теста
number = 200
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(run(number))
loop.run_until_complete(future)