Подскажите пожалуйста, почему astronomer или airflow преобразует колонку DataFrame с данными типа List[Dict] в ndarray?
У меня есть следующий dag:
@dag(
schedule='@hourly',
catchup=False,
default_args=default_args,
tags=['market', 'items']
)
def market_items():
@task.external_python(python='/usr/local/airflow/venv_main/bin/python')
def get_market_items():
from src.transform import MarketTables
data = MarketTables().items()
print([{key: (type(key), type(value))} for i in data['items'].to_dict(orient='records') for key, value in i.items()])
# >>> [{'id': (<class 'str'>, <class 'int'>)}, {'uuid': (<class 'str'>, <class 'str'>)}, {'card_id': (<class 'str'>, <class 'int'>)}, {'article': (<class 'str'>, <class 'str'>)}, {'category_id': (<class 'str'>, <class 'int'>)}, {'category_name': (<class 'str'>, <class 'str'>)}, {'brand': (<class 'str'>, <class 'str'>)}, {'name': (<class 'str'>, <class 'str'>)}, {'created_at': (<class 'str'>, <class 'str'>)}, {'updated_at': (<class 'str'>, <class 'str'>)}, {'photos': (<class 'str'>, <class 'list'>)}, {'video': (<class 'str'>, <class 'NoneType'>)}, {'description': (<class 'str'>, <class 'str'>)}, {'length': (<class 'str'>, <class 'int'>)}, {'width': (<class 'str'>, <class 'int'>)}, {'height': (<class 'str'>, <class 'int'>)}, {'link': (<class 'str'>, <class 'str'>)}
# look at {'photos': (<class 'str'>, <class 'list'>)}
return data
@task.external_python(python='/usr/local/airflow/venv_main/bin/python')
def update_market_items(data):
from src.databases.DatabaseWorker import DatabaseWorker
from src.databases.company.market_schema import MarketItems
db = DatabaseWorker(host='host.docker.internal', port=5431)
print([{key: (type(key), type(value))} for i in data['items'].to_dict(orient='records') for key, value in i.items()])
# >>> {'id': (<class 'str'>, <class 'int'>)}, {'uuid': (<class 'str'>, <class 'str'>)}, {'card_id': (<class 'str'>, <class 'int'>)}, {'article': (<class 'str'>, <class 'str'>)}, {'category_id': (<class 'str'>, <class 'int'>)}, {'category_name': (<class 'str'>, <class 'str'>)}, {'brand': (<class 'str'>, <class 'str'>)}, {'name': (<class 'str'>, <class 'str'>)}, {'created_at': (<class 'str'>, <class 'str'>)}, {'updated_at': (<class 'str'>, <class 'str'>)}, {'photos': (<class 'str'>, <class 'numpy.ndarray'>)}, {'video': (<class 'str'>, <class 'NoneType'>)}, {'description': (<class 'str'>, <class 'str'>)}, {'length': (<class 'str'>, <class 'int'>)}, {'width': (<class 'str'>, <class 'int'>)}, {'height': (<class 'str'>, <class 'int'>)}, {'link': (<class 'str'>, <class 'str'>)}
# look at {'photos': (<class 'str'>, <class 'numpy.ndarray'>)}
with db.session() as session:
session.upsert(
MarketItems,
data['items'],
on_conflict='do_update',
deletable=True
)
_get_market_items = get_market_items()
_get_market_items >> update_market_items(_get_market_items) >> [
update_market_items_characteristics(_get_market_items),
update_market_items_sizes(_get_market_items),
update_market_items_tags(_get_market_items)
]
market_items()
При выполнении этого дага почему-то при передаче DataFrame между задачами List[Dict] конвертируется в ndarray, хотя на самом деле тип данных в столбце photos — List[Dict]. При этом, когда я выполняю тот же код без использования airflow/astronomer, тип данных не преобразуется в ndarray, а остается List[Dict], который потом нормально вставляется в PostgreSQL с помощью SqlAlchemy.
Почему это происходит и как этого можно избежать?