Rett-oo
@Rett-oo

Почему airflow/astronomer преобразует List в ndarray?

Подскажите пожалуйста, почему astronomer или airflow преобразует колонку DataFrame с данными типа List[Dict] в ndarray?

У меня есть следующий dag:
@dag(
    schedule='@hourly',
    catchup=False,
    default_args=default_args,
    tags=['market', 'items']
)
def market_items():

    @task.external_python(python='/usr/local/airflow/venv_main/bin/python')
    def get_market_items():
        from src.transform import MarketTables
        data = MarketTables().items()
        print([{key: (type(key), type(value))} for i in data['items'].to_dict(orient='records') for key, value in i.items()])
        # >>>  [{'id': (<class 'str'>, <class 'int'>)}, {'uuid': (<class 'str'>, <class 'str'>)}, {'card_id': (<class 'str'>, <class 'int'>)}, {'article': (<class 'str'>, <class 'str'>)}, {'category_id': (<class 'str'>, <class 'int'>)}, {'category_name': (<class 'str'>, <class 'str'>)}, {'brand': (<class 'str'>, <class 'str'>)}, {'name': (<class 'str'>, <class 'str'>)}, {'created_at': (<class 'str'>, <class 'str'>)}, {'updated_at': (<class 'str'>, <class 'str'>)}, {'photos': (<class 'str'>, <class 'list'>)}, {'video': (<class 'str'>, <class 'NoneType'>)}, {'description': (<class 'str'>, <class 'str'>)}, {'length': (<class 'str'>, <class 'int'>)}, {'width': (<class 'str'>, <class 'int'>)}, {'height': (<class 'str'>, <class 'int'>)}, {'link': (<class 'str'>, <class 'str'>)}
        # look at  {'photos': (<class 'str'>, <class 'list'>)}
        return data

    @task.external_python(python='/usr/local/airflow/venv_main/bin/python')
    def update_market_items(data):
        from src.databases.DatabaseWorker import DatabaseWorker
        from src.databases.company.market_schema import MarketItems

        db = DatabaseWorker(host='host.docker.internal', port=5431)
        print([{key: (type(key), type(value))} for i in data['items'].to_dict(orient='records') for key, value in i.items()])

        # >>> {'id': (<class 'str'>, <class 'int'>)}, {'uuid': (<class 'str'>, <class 'str'>)}, {'card_id': (<class 'str'>, <class 'int'>)}, {'article': (<class 'str'>, <class 'str'>)}, {'category_id': (<class 'str'>, <class 'int'>)}, {'category_name': (<class 'str'>, <class 'str'>)}, {'brand': (<class 'str'>, <class 'str'>)}, {'name': (<class 'str'>, <class 'str'>)}, {'created_at': (<class 'str'>, <class 'str'>)}, {'updated_at': (<class 'str'>, <class 'str'>)}, {'photos': (<class 'str'>, <class 'numpy.ndarray'>)}, {'video': (<class 'str'>, <class 'NoneType'>)}, {'description': (<class 'str'>, <class 'str'>)}, {'length': (<class 'str'>, <class 'int'>)}, {'width': (<class 'str'>, <class 'int'>)}, {'height': (<class 'str'>, <class 'int'>)}, {'link': (<class 'str'>, <class 'str'>)}
        # look at {'photos': (<class 'str'>, <class 'numpy.ndarray'>)}
        with db.session() as session:
            session.upsert(
                MarketItems,
                data['items'],
                on_conflict='do_update',
                deletable=True
            )
    _get_market_items = get_market_items()
    _get_market_items >> update_market_items(_get_market_items) >> [
        update_market_items_characteristics(_get_market_items),
        update_market_items_sizes(_get_market_items),
        update_market_items_tags(_get_market_items)
    ]

market_items()


При выполнении этого дага почему-то при передаче DataFrame между задачами List[Dict] конвертируется в ndarray, хотя на самом деле тип данных в столбце photos — List[Dict]. При этом, когда я выполняю тот же код без использования airflow/astronomer, тип данных не преобразуется в ndarray, а остается List[Dict], который потом нормально вставляется в PostgreSQL с помощью SqlAlchemy. Почему это происходит и как этого можно избежать?
  • Вопрос задан
  • 130 просмотров
Пригласить эксперта
Ответы на вопрос 1
ZhenyaMak
@ZhenyaMak
Писать качественный софт? Пфф, это не для меня.
Потому что так написали разработчики
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы