Есть trigger_task, который запускает DAG в определенное время, если подошел тайм то он возвращает start_tasks - группа тасков, которая выполняется последовательно, иначе таск stop_tasks который останавливает выполнение всего дага. Проблема в том что PythonBranchOperator неправильно ветвит таски если использовать TaskGroup а не один таск.
Выстраивается так:
trigger_task >> (все таски)
Должно быть так:
trigger_task >> stop_tasks или start_tasks(в зависимости от вывода trigger_task, в starts_tasks все таски последовательно из группы)
Ниже код
trigger_task = BranchPythonOperator(
task_id='task_trigger',
python_callable=task_trigger,
dag=dag
)
stop_tasks = PythonOperator(
task_id='stop_tasks',
python_callable=stop_func,
dag=dag
)
with TaskGroup('start_tasks', dag=dag) as start_tasks:
get_1c_saldo_contractor = PythonOperator(
task_id='get_1c_saldo_contractor',
python_callable=get_1c_saldo_contractor,
dag=dag
)
sql_sensor_dm_partner_balance = SqlSensor(
task_id='sensor_task_dm_partner_balance',
poke_interval=60,
conn_id='airflow_db',
sql=sql_query.format('DM_PartnerBalance', dt),
on_failure_callback=custom_failure_function,
on_success_callback=custom_success_function,
dag=dag)
saldo_comparison_task = PythonOperator(
task_id='saldo_comparison',
python_callable=saldo_comparison,
dag=dag
)
trigger_task >> [stop_tasks, start_tasks]
def task_trigger():
if datetime.now().strftime('%Y-%m-%d %H:%M') == '2022-08-18 17:03':
return 'start_tasks'
else:
return 'stop_tasks'
def stop_func():
return 0