@zetsuro

Как работать с psycopg2 и multiprocessing?

Структура приложения:

main.py
import multiprocessing
from postgresql import POSTGRESQL


class Core:

    def __init__(self):
        self.postgres = POSTGRESQL()

    def complete_level_3(self, sub_task_id):
        
        some_information = self.postgres.some_insert_func(sub_task_id)
        
        some_information_two = self.postgres.some_update_func(some_information)

    def run_level_two(self, task_id):

        sub_task_id_list = self.postgres.get_subtasks_id(task_id)

        for sub_task_id in sub_task_id_list:

            process = multiprocessing.Process(target=self.complete_level_3, args=(sub_task_id,))
            process.start()

    def run_level_one(self, task_id_list):

        for task_id in task_id_list:

            process = multiprocessing.Process(target=self.run_level_two, args=(task_id,))
            process.start()


Очень обрезанная, но суть думаю понятна.

Запускается основной процесс, который генерирует n-oe кол-во подпроцессов, которые в свою очередь генерируют еще один уровень подпроцессов.

И все эти процессы взаимодействуют с базой данных Postgresql с помощью драйвера psycopg2.

Подключение к базе данных реализовано через pool connection.
autocommit = True.
Каждый запрос вынесен в отдельную функцию. Для каждой функции выдается свой объект подключения и курсора. После выполнения функции курсор закрывается и объект подключения возвращается в пулл.
Сам класс который взаимодействует с бд реализован по методу Singleton.

Суть проблемы:
При небольших нагрузках вся эта схема работает корректно и без ошибок.
Но чем больше параллельных процессов я создаю, тем больше ошибок возникает в часто используемых функциях.
Например: "создание сессии", "обновление истории" и т.д.

Ошибки, не совсем стандартные. Исходя из логов, можно предположить, что во время execute запросы которые летят в одно время, бывают, перемешиваются во время полета, и меняются плавающими аргументами. - И это гребаный бред, и такого в принципе быть не может, с точки зрения моей логики. Но факты и логи говорят об обратном. Когда я смотрю в файл логов я вижу что мне в инсерт прилетает аргумент который там в принципе никак оказаться не мог, потому что вначале функции стоит куча проверок, и в целом этот аргумент был в другой части кода. (слишком до или после этой функции)

Добавление локов и autocommit = True немножко поправило ситуацию, но не до конца.

Например:
Летит два запроса:
sql1 = "SELECT text, score FROM text_table WHERE id = %s" % (row_id,)
sql2 = "SELECT id FROM language WHERE type = %s" % (type_str,)


И, допустим, вместо row_id может прилететь type_str, хотя они даже по типам различаются.
И вот такого рода ошибки периодически всплывают.

Я в недоумении. Помогите пожалуйста.
  • Вопрос задан
  • 1030 просмотров
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы