Как организовать поток данных в продукте в PostgreSQL?

Имеется проблема в проекте, а именно плохая структура кода для работы с БД. В проекте 4 основных этапа: получение данных из внешних источников, обработка средствами python, загрузка данных в БД, выгрузка и работы с данными в аналитических инструментах.
Во всей этой цепочке для меня оказалось большой сложностью организовать поток данных, а именно то, как данных будут подгружаться в БД. Я пробовал множество вариаций создания функций для crud операций, они работали, но нарушали Open-closed принцип.
Задача для меня стоит следующая: написать код для работы с БД который не придется изменять при добавлении новых таблиц или, например, изменении типа данных в столбце. И вопросов в связи с этим очень много. Писать отдельные методы insert, update или создать метод upsert? Как для определенных таблиц при вставке данных удалять данные, которые не пришли по API, а для других нет? Насколько разумно читать excel с помощью file_fdw или лучше делать это питоном? и т.д.

Думаю, что ответы придут во время чтения нужной литературы/статей/видео, про которые и хочу узнать. Так что, вопрос заключается в просьбе написать литературу или указать ссылку которые прольют свет для меня на работу с БД с помощью ORM SqlAlchemy, да и в целом, любой хоть как то касающаяся информации

ps. дополнение к моему комментарию. пример функций, которые меня не устраивали.
spoiler

def get_dataframe_from_db_v2(self, tname: Base, schema: str = None, pk: list or str = "*", wheres_db: list[dict] = None) -> pd.DataFrame:
        """Doc."""
        if pk == "*":
            if type(tname.__table_args__) == tuple:
                tname.__table_args__ = tname.__table_args__[1]
            pk = self.take_columns_name(tname, tname.__table_args__["schema"])

        if wheres_db is None:
            quiry = select(*[c for c in tname.__table__.c if c.name in pk])
        else:
            quiry = select(*[c for c in tname.__table__.c if c.name in pk]).where(and_(text(" AND ".join(["".join(list(*i.items())) for i in wheres_db]))))
        return self.table.convert_database_table(tname, pd.DataFrame(self.Session.execute(quiry).all(), columns=pk))

def take_columns_name(self, tname: Base, schema: str = None) -> list:
        """Doc."""
        from sqlalchemy import inspect
        inspector = inspect(self.engine)
        return [column["name"] for column in inspector.get_columns(tname.__tablename__, schema=schema)]

def compare_tables(self, tname: Base, schema: str = None, pk: list or str = "*", wheres=None, wheres_db=None) -> None: 
        """Doc."""
        logging.info(f"Compare {tname}")
        if pk == "*":
            pk = self.take_columns_name(tname, schema)

        # Dataframe from DataBase
        dataframe_db = self.get_dataframe_from_db_v2(tname, schema, pk, wheres_db).sort_values(pk).reset_index(drop=True)

        # Dataframe from APIs
        dataframe = self.__dataframe__(tname, wheres).sort_values(pk).reset_index(drop=True)

        in_db_not_existed = dataframe_db.merge(dataframe, how='right', on=pk, indicator=True) \
                                        .query("_merge == 'right_only'") \
                                        .drop('_merge', axis=1)  # noqa

        in_db_existed = dataframe_db.merge(dataframe[pk], how='left', indicator=True) \
                                    .query("_merge == 'left_only'") \
                                    .drop('_merge', axis=1)[dataframe_db.columns]  # noqa

        if not in_db_not_existed.empty:
            self.insert_into_table(tname, schema, in_db_not_existed)  # noqa
            self.compare_tables(tname, schema, pk, wheres, wheres_db)
        if not in_db_existed.empty and tname not in (OzonStock, OzonPosting, WbStock, WbPosting, WbIncomes, WbRealization, SelfIncomesItems, SelfIncomes):
            self.delete_from_table_v2(tname, schema, in_db_existed, pk="*")
            self.compare_tables(tname=tname, schema=schema, pk=pk, wheres=wheres, wheres_db=wheres_db)
        if in_db_existed.empty and in_db_not_existed.empty:
            self.update_table(dataframe, self.get_dataframe_from_db_v2(tname, schema, wheres_db=wheres_db, pk="*"), tname, schema, pk)  # noqa

def insert_into_table(self, tname: Base, schema, in_db_not_existed: pd.DataFrame):  # noqa
        """Doc."""
        with self.engine.begin() as connection:
            in_db_not_existed.to_sql(name=tname.__tablename__, schema=schema, con=connection, index=False, if_exists="append", method="multi")  # noqa

        print(f"[INSERTED] {len(in_db_not_existed)} rows was inserted into {tname.__tablename__}") # noqa

def delete_from_table_v2(self, tname: Base, schema: str, in_db_existed: pd.DataFrame, pk="*"):
        """Doc."""
        keys_list = list(map(lambda x: list(map(lambda y: f"{in_db_existed.columns.values.tolist()[list(x).index(y)]}=" + "'" +str(y) + "'",x)), in_db_existed.values))

        with self.Session as session:
            query = (delete(tname.__table__).where(text(" OR ".join(" AND ".join(i) for i in keys_list))))
            session.execute(query)
            session.commit()
        print(f"[DELETED] {len(in_db_existed)} rows was deleted") # noqa

def update_table(self, dataframe: pd.DataFrame, dataframe_db: pd.DataFrame, tname: Base, schema: str = None, pk: list or str = "*"):
        """Doc."""
        dataframe = dataframe.sort_values(pk).reset_index(drop=True)
        dataframe_db = dataframe_db.sort_values(pk).reset_index(drop=True)

        if not dataframe.equals(dataframe_db):
            for row_old, row_new in zip(dataframe_db.itertuples(index=False), dataframe.itertuples(index=False)):
                where = [f"{i[1]}='{row_old[i[0]]}'" for i in enumerate(pk)]
                row_old = list(map(lambda x: None if type(x) == pd._libs.tslibs.nattype.NaTType else x, row_old))
                row_new = list(map(lambda x: None if type(x) == pd._libs.tslibs.nattype.NaTType else x, row_new))
                for old_value, new_value, column in zip(row_old, row_new, enumerate(dataframe_db.columns)):
                    if old_value != new_value:
                        print(str(old_value) + "  >>>  " + str(new_value))
                        with self.Session as session:
                            query = (update(tname.__table__).where(text(" AND ".join(where))).values({column[1]: new_value}, ))
                            print(query)
                            session.execute(query)
                            session.commit()

  • Вопрос задан
  • 299 просмотров
Пригласить эксперта
Ответы на вопрос 1
dimonchik2013
@dimonchik2013
non progredi est regredi
надо делить вопрос на несколько
с
Насколько разумно читать excel с помощью file_fdw или лучше делать это питоном

ответ очевиден, и, если Вы его не знаете, то с БД еще в самом начале: бывают задачи, когда нужно обработать данные с внешними источниками данных - когда-то (да и сейчас) это, к примеру, таблицы в других БД, но - могут быть и файлы, конвертируемые в таблицы - вот для такого file_fdw предназначен

чтобы понять что оно такое - нужно какое-то время с ним поработать - рано ли поздно вы наткнетесь когда неверно добавленный символ в строку отправляет к *беням всю идею парсинга csv файла и заставляет написать кучу кода:
а) вычленение херяшей все строки,
б) перенос ее в место (поле , табл, файл) с ошибками,
в) лог - уведомление о том что такое случилось.

проще говорят - это для "по быстрому аналитику сделать выборку", на постоянной основе существовать не должно, но(!) что ни говори, а кода будет чуть меньше ))

по применению Open-closed принципа, честно, ничего не понял в контексте описываемых задач - конечно, вертится Маршаллинг/Анмаршаллинг и Сериализация/Десериализция, но...
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы