import asyncio
import re
import aiohttp
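# Scan a set of IPv4 ranges for live HTTP services: expand the ranges listed in
# ip.txt, send a HEAD request to every address on port 8000, and write the
# addresses that answered (with their status codes) to white_list.txt.
# ip.txt is assumed to hold one range per line, e.g. "192.168.0.1-192.168.0.254",
# which is the format the regex in get_ipaddresses_range() expects.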
def load_ip_list(filename):
    """Return the IP range definitions read from *filename*, one per line."""
    with open(filename, 'r') as fp:
        ip_list = fp.read().rstrip().split('\n')
    return ip_list
def save_file(filename, responses):
    """Write each reachable host as "<url> <status>" to *filename*."""
    with open(filename, 'w') as fp:
        for response in responses:
            if response:  # None means the host never answered
                url, status = response
                save_line = "{:28} {}\n".format(url, status)
                fp.write(save_line)
def get_ipaddresses_range(ip_list):
    """Expand range lines such as "192.168.0.1-192.168.0.254" into a flat
    list of individual IP addresses."""
    ip_addresses_range = []
    for ip_addr in ip_list:
        # Capture the network prefix plus the first and last host numbers.
        pattern = r'([\d\.]+\.)(\d+)-[\d\.]+\.(\d+)$'
        match = re.search(pattern, ip_addr)
        if not match:
            continue  # skip malformed lines instead of crashing
        network, host_min, host_max = match.groups()
        ip_range = [
            network + str(host)
            for host in range(int(host_min), int(host_max) + 1)
        ]
        ip_addresses_range.extend(ip_range)
    return ip_addresses_range
def create_urls_list(ip_addr_range):
    """Build the probe URL for every address (plain HTTP on port 8000)."""
    url_scheme = 'http'
    port = '8000'
    url_pattern = "{}://{}:{}"
    urls = [
        url_pattern.format(url_scheme, ip, port)
        for ip in ip_addr_range
    ]
    return urls
async def get_response_status(url, session):
    """Send a HEAD request and return (url, status), or None if unreachable."""
    try:
        async with session.head(url, timeout=aiohttp.ClientTimeout(total=2)) as response:
            return url, response.status
    except (asyncio.TimeoutError, aiohttp.ClientError, OSError):
        # Timeouts, refused connections and other network errors all mean
        # the host is not serving on this port.
        return None
async def check_ip(urls_list, session):
    """Probe every URL concurrently and return the gathered results."""
    tasks = [
        asyncio.ensure_future(get_response_status(url, session))
        for url in urls_list
    ]
    responses = await asyncio.gather(*tasks)
    return responses
async def run(urls_list):
    """Run all checks inside a single shared client session."""
    # aiohttp picks up the running event loop itself; passing loop= is deprecated.
    async with aiohttp.ClientSession() as session:
        results = await check_ip(urls_list, session)
        return results
if __name__ == '__main__':
    ip_list = load_ip_list('ip.txt')
    ip_range = get_ipaddresses_range(ip_list)
    urls = create_urls_list(ip_range)
    # asyncio.run() creates the event loop, runs the coroutine and closes the loop.
    results = asyncio.run(run(urls))
    save_file('white_list.txt', results)