Задать вопрос

Python многопоточность Requests, что блокирует обработку?

Всем привет.
На питоне программирую недавно, в какой-то степени ещё разбираюсь.
Объясните, почему так происходит

Задача: Есть файл c урлами. Хочу обработать их (получить содержимое, или просто проверить статус/доступность).
Но все это ещё реализовал через потоки.
Так вот если домены сайтов существуют, то обработка происходит быстро, если встречается в файле домен, который недоступен/не существует, то обработка становится медленнее в несколько раз, такое ощущение, что потоки блокируются и как только ошибочный домен обрабатывается, опять, быстро, но если много ошибочных доменов, то время выполнения сильно увеличивается.

from threading import Thread
import threading
import subprocess
import requests
import time
import os

theardCount = 25

domain_file = "domains.txt"
domain_temp = "temp/"

def CheckRequest(host, step=0):

    fr_success = domain_temp+"/req-good-"+str(step)+".txt"
    fr_errors  = domain_temp+"/req-error-"+str(step)+".txt" 

    url = "http://"+host
    
    try:
        s = requests.Session()
        r = s.get(url)
        f = open(fr_success, "a+")
        f.write(host+'\n')
        f.close() 
    except Exception:
        f = open(fr_errors, "a+")
        f.write(host+'\n')
        f.close()   

    return 0

class GetDomainThread(Thread):
    def __init__(self, step):
        self.step = step
        self.body = None
        super(GetDomainThread, self).__init__()
    def run(self):
        step = self.step+1
        fdom  = open(domain_file, "r") 
        i=1
        f=step
        for line in fdom.readlines():
            if i==f:
                strLine = line.split("\n")
                domain = strLine[0]
                CheckRequest(domain, step)
                   
                f=f+theardCount
            i=i+1


# MAIN 

def main():

    start = time.time()

    threads = [GetDomainThread(i) for i in range(theardCount)]
	
    for thread in threads:
        thread.start()
	
    for thread in threads:
        thread.join()
    
           
    end = time.time()

    print end-start

if __name__ == '__main__':
    main()
  • Вопрос задан
  • 8634 просмотра
Подписаться 5 Оценить Комментировать
Решения вопроса 1
bzzzzzz
@bzzzzzz
Блокирует обработку в вашем случае не GIL, а то, как вы распределяете задачи между потоками: вы перед началом работы распределяете все урлы поровну между потоками и, поэтому, складывается такая ситуация, когда часть потоков простаивает и ждет когда один из них проверит все оставшиеся ему сайты. В своей программе вы должны использовать очереди для распределения задач и, конечно же, сократить количество I/O операций.

Код будет приблизительно таким (смотрите мои комментарии для того, чтобы понять что и почему):

# coding=utf-8
import requests
import time
import os
from threading import Thread, current_thread
from Queue import Queue


theard_count = 25


domain_file = "domains.txt"
domain_temp = "temp"


def check_url(host):
    url = 'http://' + host

    try:
        requests.get(url, timeout=5)
    except Exception:
        return False
    else:
        return True


def run(queue, result_queue):
    # Цикл продолжается пока очередь задач не станет пустой
    while not queue.empty():
        # получаем первую задачу из очереди
        host = queue.get_nowait()
        print '{} checking in thread {}'.format(host, current_thread())
        # проверяем URL
        status = check_url(host)
        # сохраняем результат для дальнейшей обработки
        result_queue.put_nowait((status, host))
        # сообщаем о выполнении полученной задачи
        queue.task_done()
        print '{} finished in thread {}. Result={}'.format(host, current_thread(), status)

    print '{} closing'.format(current_thread())


# MAIN
def main():
    start_time = time.time()

    # Для получения задач и выдачи результата используем очереди
    queue = Queue()
    result_queue = Queue()

    fr_success = os.path.join(domain_temp, "req-good.txt")
    fr_errors  = os.path.join(domain_temp, "req-error.txt")

    # Сначала загружаем все URL из файла в очередь задач
    with open(domain_file) as f:
        for line in f:
            queue.put(line.strip())

    # Затем запускаем необходимое количество потоков
    for i in range(theard_count):
        thread = Thread(target=run, args=(queue, result_queue))
        thread.daemon = True
        thread.start()

    # И ждем, когда задачи будут выполнены    
    queue.join()

    # После чего пишем результаты в файлы
    with open(fr_success, 'w') as fs, open(fr_errors, 'w') as fe:
        while not result_queue.empty():
            status, host = result_queue.get_nowait()

            if status:
                f = fs
            else:
                f = fe

            f.write(host)
            f.write('\n')

    print time.time() - start_time

if __name__ == '__main__':
    main()


500 сайтов, из которых 150 не работают, он парсит за 35 секунд.
Ответ написан
Пригласить эксперта
Ответы на вопрос 2
svfat
@svfat
☺Нужен VPS? Два месяца бесплатно. Смотри профиль☺
Так и есть - GIL блокирует поток.

А вообще, у вас код не очень эффективно написан, слишком много ненужных I\O операций, каждый раз файлы открывать - много времени теряется, как домен берете из файла для обработки, это вообще жесть. Попробуйте переписать так, что бы в тредах вообще файловых операций не было, работайте только с памятью.
Ответ написан
@SlivTime
Если у вас версия питона >= 3.3, можете попробовать aiohttp и забыть про мороку с тредами в питоне.

import asyncio
import aiohttp

@asyncio.coroutine
def fetch_status(session, url):
    status = None
    try:
        response = yield from session.get(url)
        response.close()
        status = response.status
    except Exception as e:
        status = e.__str__()
    return status


def run():
    session = aiohttp.ClientSession()
    with open('domains.txt', mode='r') as f:
        for url in f:
            url = url.strip()
            status = yield from fetch_status(session, url)
            print(url, ": ", status, sep='')
    session.close()

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(run())
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы