@marselabdullin

Как совместно использовать TaskGroup и PythonBranchOperator?

Есть trigger_task, который запускает DAG в определенное время, если подошел тайм то он возвращает start_tasks - группа тасков, которая выполняется последовательно, иначе таск stop_tasks который останавливает выполнение всего дага. Проблема в том что PythonBranchOperator неправильно ветвит таски если использовать TaskGroup а не один таск.

Выстраивается так:
trigger_task >> (все таски)
Должно быть так:
trigger_task >> stop_tasks или start_tasks(в зависимости от вывода trigger_task, в starts_tasks все таски последовательно из группы)

Ниже код

trigger_task = BranchPythonOperator(
    task_id='task_trigger',
    python_callable=task_trigger,
    dag=dag
    )

    stop_tasks = PythonOperator(
    task_id='stop_tasks',
    python_callable=stop_func,
    dag=dag
    )

with TaskGroup('start_tasks', dag=dag) as start_tasks:
        get_1c_saldo_contractor = PythonOperator(
            task_id='get_1c_saldo_contractor',
            python_callable=get_1c_saldo_contractor,
             dag=dag
             )

        sql_sensor_dm_partner_balance = SqlSensor(
            task_id='sensor_task_dm_partner_balance',
            poke_interval=60,
            conn_id='airflow_db',
            sql=sql_query.format('DM_PartnerBalance', dt),
            on_failure_callback=custom_failure_function,
            on_success_callback=custom_success_function,
            dag=dag)

        saldo_comparison_task = PythonOperator(
        task_id='saldo_comparison',
        python_callable=saldo_comparison,
        dag=dag
        )

trigger_task >> [stop_tasks, start_tasks]

def task_trigger():
    if datetime.now().strftime('%Y-%m-%d %H:%M') == '2022-08-18 17:03':
        return 'start_tasks'
    else:
        return 'stop_tasks'


def stop_func():
    return 0
  • Вопрос задан
  • 83 просмотра
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы