@stayHARD

Многопоточная обработка форм на страницах Python3. Как?

Добрый вечер, Тостер.
Уже не первый день не могу сделать обработку форма на Python3 с многопоточностью.(параллельно, мб не правильно употребять термин - многопоточность)
Мой код достаточно простой и выглядит вот так:
Черпает настройки поиска из базы данных(postgresql), а также ссылки которые нужно обработать, затем создает N-ое кол-во потоков и запускает одну и ту же функцию с разными аргументами.
Вот только когда я имею базу данных с 100 000 ссылок, скрипт, работающий в 20 потоков начинает тупить после 2 400 ссылок и выдавать вот такую ошибку:
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
Task exception was never retrieved
future: <Task finished coro=<main() done, defined at async.py:173> exception=BrokenProcessPool('A process in the process pool was terminated abruptly while the future was running or pending.',)>

173ая строчка представляет собой:
yield from loop.run_in_executor(p, operation, item)

Собственно виновник действия:
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor
from grab import Grab
import random
import psycopg2

# Open connection to the database
connection = psycopg2.connect(database="<....>",
                              user="<....>",
                              password="<....>",
                              host="127.0.0.1",
                              port="5432")

# Create a new cursor for it
c = connection.cursor()

# Select settings from database
c.execute("SELECT * FROM <....> WHERE id=1;")
data = c.fetchall()

# Get time starting script
start_time = time.time()

def operation(link):
    # init grab framework
    g = Grab()
    # try to find some elements on the page
    try:
        # open link
        g.go(link)
        # some link processing
        <....>
    except:
        pass


@asyncio.coroutine
def main(item):
    yield from loop.run_in_executor(p, operation, item)

# Create async loop, declare number of threads
loop = asyncio.get_event_loop()
p = ProcessPoolExecutor(data[0][13])  # =20

# Init tasks list - empty
tasks = []

# Select all urls which need to process
c.execute ("SELECT url FROM <....> ORDER BY id;")

# Forming tasks
for item in c.fetchall():
    tasks.append(main(item[0]))

# Close main connection to the database
connection.close()
# Run async tasks
loop.run_until_complete(asyncio.wait(tasks))
# Close loop
loop.close()
# Get script finish time
print("--- %s seconds ---" % (time.time() - start_time))

Что здесь можно сделать лучше, что-бы он мог идти дальше после 2ух тысяч ссылок?
Операции на странице достаточно простые, парсинг HTML + заполнение некоторызх форм на сайте, если это представляется возможным. Операции выполняются очень быстро, но из-за большого кол-ва ссылок занимают очень много времени, хотелось бы сделать это многопоточно(опять же не силен в терминах). Посоветуйте что-нибудь.
Заранее благодарен.
  • Вопрос задан
  • 1325 просмотров
Пригласить эксперта
Ответы на вопрос 2
Olej
@Olej
инженер, программист, преподаватель
Вы создаёте не параллельные потоки для обработки, а параллельные процессы.
А число максимально допустимых процессов у вас может быть ограничено системой, что-то типа:
$ ulimit -a
core file size          (blocks, -c) 0
data seg size           (kbytes, -d) unlimited
scheduling priority             (-e) 0
file size               (blocks, -f) unlimited
pending signals                 (-i) 32089
max locked memory       (kbytes, -l) 64
max memory size         (kbytes, -m) unlimited
open files                      (-n) 1024
pipe size            (512 bytes, -p) 8
POSIX message queues     (bytes, -q) 819200
real-time priority              (-r) 0
stack size              (kbytes, -s) 8192
cpu time               (seconds, -t) unlimited
max user processes              (-u) 32089
virtual memory          (kbytes, -v) unlimited
file locks                      (-x) unlimited

Но и параллельными потоками не особенно увлекайтесь, потому как Python не умеет использовать истинно потоки, и многопоточность в Python не увеличивает суммарную производительность.
Ответ написан
angru
@angru
Можно попробовать использовать ThreadPoolExecutor вместо ProcessPoolExecutor. Код будет выполняться на одном ядре, но все еще асинхронно.
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы