Rett-oo
@Rett-oo

Почему airflow/astronomer преобразует List в ndarray?

Подскажите пожалуйста, почему astronomer или airflow преобразует колонку DataFrame с данными типа List[Dict] в ndarray?

У меня есть следующий dag:
@dag(
    schedule='@hourly',
    catchup=False,
    default_args=default_args,
    tags=['market', 'items']
)
def market_items():

    @task.external_python(python='/usr/local/airflow/venv_main/bin/python')
    def get_market_items():
        from src.transform import MarketTables
        data = MarketTables().items()
        print([{key: (type(key), type(value))} for i in data['items'].to_dict(orient='records') for key, value in i.items()])
        # >>>  [{'id': (<class 'str'>, <class 'int'>)}, {'uuid': (<class 'str'>, <class 'str'>)}, {'card_id': (<class 'str'>, <class 'int'>)}, {'article': (<class 'str'>, <class 'str'>)}, {'category_id': (<class 'str'>, <class 'int'>)}, {'category_name': (<class 'str'>, <class 'str'>)}, {'brand': (<class 'str'>, <class 'str'>)}, {'name': (<class 'str'>, <class 'str'>)}, {'created_at': (<class 'str'>, <class 'str'>)}, {'updated_at': (<class 'str'>, <class 'str'>)}, {'photos': (<class 'str'>, <class 'list'>)}, {'video': (<class 'str'>, <class 'NoneType'>)}, {'description': (<class 'str'>, <class 'str'>)}, {'length': (<class 'str'>, <class 'int'>)}, {'width': (<class 'str'>, <class 'int'>)}, {'height': (<class 'str'>, <class 'int'>)}, {'link': (<class 'str'>, <class 'str'>)}
        # look at  {'photos': (<class 'str'>, <class 'list'>)}
        return data

    @task.external_python(python='/usr/local/airflow/venv_main/bin/python')
    def update_market_items(data):
        from src.databases.DatabaseWorker import DatabaseWorker
        from src.databases.company.market_schema import MarketItems

        db = DatabaseWorker(host='host.docker.internal', port=5431)
        print([{key: (type(key), type(value))} for i in data['items'].to_dict(orient='records') for key, value in i.items()])

        # >>> {'id': (<class 'str'>, <class 'int'>)}, {'uuid': (<class 'str'>, <class 'str'>)}, {'card_id': (<class 'str'>, <class 'int'>)}, {'article': (<class 'str'>, <class 'str'>)}, {'category_id': (<class 'str'>, <class 'int'>)}, {'category_name': (<class 'str'>, <class 'str'>)}, {'brand': (<class 'str'>, <class 'str'>)}, {'name': (<class 'str'>, <class 'str'>)}, {'created_at': (<class 'str'>, <class 'str'>)}, {'updated_at': (<class 'str'>, <class 'str'>)}, {'photos': (<class 'str'>, <class 'numpy.ndarray'>)}, {'video': (<class 'str'>, <class 'NoneType'>)}, {'description': (<class 'str'>, <class 'str'>)}, {'length': (<class 'str'>, <class 'int'>)}, {'width': (<class 'str'>, <class 'int'>)}, {'height': (<class 'str'>, <class 'int'>)}, {'link': (<class 'str'>, <class 'str'>)}
        # look at {'photos': (<class 'str'>, <class 'numpy.ndarray'>)}
        with db.session() as session:
            session.upsert(
                MarketItems,
                data['items'],
                on_conflict='do_update',
                deletable=True
            )
    _get_market_items = get_market_items()
    _get_market_items >> update_market_items(_get_market_items) >> [
        update_market_items_characteristics(_get_market_items),
        update_market_items_sizes(_get_market_items),
        update_market_items_tags(_get_market_items)
    ]

market_items()


При выполнении этого дага почему-то при передаче DataFrame между задачами List[Dict] конвертируется в ndarray, хотя на самом деле тип данных в столбце photos — List[Dict]. При этом, когда я выполняю тот же код без использования airflow/astronomer, тип данных не преобразуется в ndarray, а остается List[Dict], который потом нормально вставляется в PostgreSQL с помощью SqlAlchemy. Почему это происходит и как этого можно избежать?
  • Вопрос задан
  • 127 просмотров
Пригласить эксперта
Ответы на вопрос 1
ZhenyaMak
@ZhenyaMak
Писать качественный софт? Пфф, это не для меня.
Потому что так написали разработчики
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы