def get_dataframe_from_db_v2(self, tname: Base, schema: str = None, pk: list or str = "*", wheres_db: list[dict] = None) -> pd.DataFrame:
    """Load selected columns of ``tname``'s table from the database into a DataFrame.

    Args:
        tname: Declarative model whose ``__table__`` is queried.
        schema: Schema used for the column-name lookup when ``pk == "*"`` and
            the model's ``__table_args__`` does not declare a ``"schema"`` key.
        pk: Column name(s) to select. ``"*"`` selects every column reported by
            ``take_columns_name``. A single column name is treated as ``[pk]``.
        wheres_db: Optional list of dicts. Each key and its value are
            concatenated verbatim into one raw SQL condition, for example
            ``{"date >= ": "'2023-01-01'"}``. All conditions are AND-ed.
            The text is NOT escaped, so only trusted values may be passed.
            ``None`` or an empty list means no filter.

    Returns:
        The result of ``self.table.convert_database_table`` for a DataFrame of
        the fetched rows, with columns labelled by the ``pk`` names.

    Raises:
        KeyError: if a name in ``pk`` is not a column of ``tname.__table__``.
    """
    if pk == "*":
        table_args = getattr(tname, "__table_args__", None) or {}
        # __table_args__ may be a tuple whose trailing item is the keyword dict.
        # Read it locally rather than overwriting the attribute on the shared
        # model class.
        if isinstance(table_args, tuple):
            table_args = table_args[-1] if table_args and isinstance(table_args[-1], dict) else {}
        pk = self.take_columns_name(tname, table_args.get("schema", schema))
    elif isinstance(pk, str):
        # A bare string would otherwise be matched by substring and used as a
        # sequence of characters for the column labels.
        pk = [pk]

    columns_by_name = {column.name: column for column in tname.__table__.c}
    # Select in pk order so the labels given to the DataFrame below line up
    # with the values in each fetched row.
    query = select(*[columns_by_name[name] for name in pk])
    if wheres_db:
        conditions = [key + value for condition in wheres_db for key, value in condition.items()]
        query = query.where(text(" AND ".join(conditions)))
    rows = self.Session.execute(query).all()
    return self.table.convert_database_table(tname, pd.DataFrame(rows, columns=pk))
def take_columns_name(self, tname: Base, schema: str = None) -> list:
    """Return the column names the database reports for ``tname``'s table.

    The names come from SQLAlchemy's runtime inspector bound to
    ``self.engine``, so they describe the live table rather than the model class.

    Args:
        tname: Declarative model. Only its ``__tablename__`` is used.
        schema: Passed straight through to the inspector's ``get_columns``.

    Returns:
        List of column-name strings, in the order ``get_columns`` yields them.
    """
    from sqlalchemy import inspect

    reflected = inspect(self.engine).get_columns(tname.__tablename__, schema=schema)
    names = []
    for info in reflected:
        names.append(info["name"])
    return names
def compare_tables(self, tname: Base, schema: str = None, pk: list or str = "*", wheres=None, wheres_db=None) -> None:
    """Reconcile ``tname``'s database table with the API data for the same model.

    Rows are matched on the ``pk`` columns:

    * API rows whose key is missing from the database are inserted.
    * Database rows whose key is missing from the API data are deleted. The
      models listed in the condition below are excluded and never pruned.

    After any insert or delete, the comparison is re-run once against the
    modified table and this call returns. When both key sets already match,
    ``update_table`` pushes the remaining value differences.

    Args:
        tname: Declarative model to reconcile.
        schema: Database schema of the table.
        pk: Key column name(s). ``"*"`` uses every column from ``take_columns_name``.
        wheres: Passed through to ``self.__dataframe__`` when loading API data.
        wheres_db: Raw SQL conditions passed to ``get_dataframe_from_db_v2``.
    """
    logging.info(f"Compare {tname}")
    if pk == "*":
        pk = self.take_columns_name(tname, schema)
    # Dataframe from DataBase (only the pk columns are loaded)
    dataframe_db = self.get_dataframe_from_db_v2(tname, schema, pk, wheres_db).sort_values(pk).reset_index(drop=True)
    # Dataframe from APIs
    dataframe = self.__dataframe__(tname, wheres).sort_values(pk).reset_index(drop=True)
    # Keys present in the API data but absent from the database -> insert.
    in_db_not_existed = dataframe_db.merge(dataframe, how='right', on=pk, indicator=True) \
        .query("_merge == 'right_only'") \
        .drop('_merge', axis=1)  # noqa
    # Keys present in the database but absent from the API data -> delete.
    in_db_existed = dataframe_db.merge(dataframe[pk], how='left', indicator=True) \
        .query("_merge == 'left_only'") \
        .drop('_merge', axis=1)[dataframe_db.columns]  # noqa

    modified = False
    if not in_db_not_existed.empty:
        self.insert_into_table(tname, schema, in_db_not_existed)  # noqa
        modified = True
    # Rows of these models are kept even when the API no longer returns them.
    if not in_db_existed.empty and tname not in (OzonStock, OzonPosting, WbStock, WbPosting, WbIncomes, WbRealization, SelfIncomesItems, SelfIncomes):
        self.delete_from_table_v2(tname, schema, in_db_existed, pk="*")
        modified = True
    if modified:
        # Re-verify against the modified table. The recursive pass performs
        # the value update, so this frame must not act on its now-stale
        # snapshots.
        # NOTE(review): if inserted/deleted rows do not show up as expected in
        # the re-fetch, this recursion never terminates. That can happen when
        # they fall outside wheres_db, or when dtypes differ so the merge never
        # matches. Confirm before relying on it.
        self.compare_tables(tname=tname, schema=schema, pk=pk, wheres=wheres, wheres_db=wheres_db)
        return
    if in_db_existed.empty and in_db_not_existed.empty:
        self.update_table(dataframe, self.get_dataframe_from_db_v2(tname, schema, wheres_db=wheres_db, pk="*"), tname, schema, pk)  # noqa
def insert_into_table(self, tname: Base, schema, in_db_not_existed: pd.DataFrame):  # noqa
    """Append every row of ``in_db_not_existed`` to ``tname``'s table.

    The write runs inside ``self.engine.begin()``, so it is committed on
    success and rolled back if ``to_sql`` raises. Rows are sent as multi-row
    INSERT statements, and the DataFrame index is not written.

    Args:
        tname: Declarative model. Its ``__tablename__`` names the target table.
        schema: Schema of the target table.
        in_db_not_existed: Rows to append. Columns must match the table's columns.
    """
    with self.engine.begin() as conn:
        target = tname.__tablename__
        in_db_not_existed.to_sql(
            name=target,
            schema=schema,
            con=conn,
            index=False,
            if_exists="append",
            method="multi",
        )
    print(f"[INSERTED] {len(in_db_not_existed)} rows was inserted into {target}")  # noqa
def delete_from_table_v2(self, tname: Base, schema: str, in_db_existed: pd.DataFrame, pk="*"):
    """Delete every table row that matches a row of ``in_db_existed`` on all its columns.

    Each DataFrame row becomes ``(col1='v1' AND col2='v2' ...)``. The row
    conditions are OR-ed together into a single DELETE on ``tname.__table__``.
    Missing values (None/NaN/NaT) are matched with ``IS NULL``.

    Args:
        tname: Declarative model whose table rows are deleted.
        schema: Not used by this method; the target comes from ``tname.__table__``.
        in_db_existed: Rows to delete. Its column labels must be table column names.
        pk: Not used; every column of ``in_db_existed`` takes part in the match.
    """
    if in_db_existed.empty:
        # Joining zero conditions would yield an empty WHERE clause.
        return

    def sql_condition(column, value):
        # A NULL can only be matched with IS NULL, never with col='...'.
        if pd.api.types.is_scalar(value) and pd.isna(value):
            return f"{column} IS NULL"
        # Double embedded quotes so the value stays a single SQL string literal.
        escaped = str(value).replace("'", "''")
        return f"{column}='{escaped}'"

    column_names = in_db_existed.columns.tolist()
    # zip pairs each value with its own column. Looking a value up by its
    # index in the row would pick the first column holding an equal value.
    row_conditions = [
        " AND ".join(sql_condition(name, value) for name, value in zip(column_names, row))
        for row in in_db_existed.values
    ]
    with self.Session as session:
        query = delete(tname.__table__).where(text(" OR ".join(f"({condition})" for condition in row_conditions)))
        session.execute(query)
        session.commit()
    print(f"[DELETED] {len(in_db_existed)} rows was deleted")  # noqa
def update_table(self, dataframe: pd.DataFrame, dataframe_db: pd.DataFrame, tname: Base, schema: str = None, pk: list or str = "*"):
    """Write value differences between the API frame and the database frame.

    Both frames are sorted by ``pk`` and paired row by row, so they are
    expected to contain the same key values. ``compare_tables`` only calls this
    once both key sets match. For each paired row with differing values, one
    UPDATE is issued on ``tname.__table__``. It sets the changed columns and
    matches the old row's ``pk`` values.

    Args:
        dataframe: New values (API side).
        dataframe_db: Current values read from the database.
        tname: Declarative model whose table is updated.
        schema: Not used by this method; the target comes from ``tname.__table__``.
        pk: Key column name(s) used for sorting and for the WHERE clause.
            ``"*"`` uses every column of ``dataframe_db``.
    """
    if pk == "*":
        pk = list(dataframe_db.columns)
    elif isinstance(pk, str):
        # Otherwise a string would be iterated character by character below.
        pk = [pk]
    columns = list(dataframe_db.columns)
    # If both frames hold the same columns in a different order, align them by
    # name. Otherwise equals() and the positional zip below would compare
    # unrelated columns.
    if set(dataframe.columns) == set(columns):
        dataframe = dataframe[columns]
    dataframe = dataframe.sort_values(pk).reset_index(drop=True)
    dataframe_db = dataframe_db.sort_values(pk).reset_index(drop=True)
    if dataframe.equals(dataframe_db):
        return

    def is_missing(value):
        # None, NaN, NaT and pd.NA all mean "no value". NaN != NaN is always
        # True, and comparing pd.NA raises TypeError in a boolean context, so
        # all of them are normalised to None before comparing.
        return pd.api.types.is_scalar(value) and bool(pd.isna(value))

    def sql_condition(column, value):
        if is_missing(value):
            return f"{column} IS NULL"
        # Double embedded quotes so the value stays a single SQL string literal.
        escaped = str(value).replace("'", "''")
        return f"{column}='{escaped}'"

    # Position of each key column inside a dataframe_db row, looked up by name.
    key_positions = [columns.index(key) for key in pk]
    with self.Session as session:
        for row_old, row_new in zip(dataframe_db.itertuples(index=False, name=None), dataframe.itertuples(index=False, name=None)):
            where = " AND ".join(sql_condition(key, row_old[position]) for key, position in zip(pk, key_positions))
            changes = {}
            for column, old_value, new_value in zip(columns, row_old, row_new):
                old_value = None if is_missing(old_value) else old_value
                new_value = None if is_missing(new_value) else new_value
                if old_value != new_value:
                    print(str(old_value) + " >>> " + str(new_value))
                    changes[column] = new_value
            if changes:
                # One UPDATE per row carrying all of its changed columns.
                query = update(tname.__table__).where(text(where)).values(changes)
                print(query)
                session.execute(query)
                session.commit()