Rett-oo
@Rett-oo

Как создавать DAG динамически с динамическим интервалом срабатывания?

Требуется сделать динамическое создание DAG (в зависимости от количества пользователей) и для каждого DAG`а указывать свой интервал срабатывания. Логику я вижу примерно такую:
0. Пользователь через ТГ бота указывает какой интервал рассылки себе поставить, 1 раз в час, 3 раза в час, 2 раза в день и т.д. Далее эти данные обновляются в БД.
1. Создаётся отдельный обслуживающий DAG который срабатывает условно 1 раз в час. Он будет делать запрос в БД получать оттуда пользователей и данные частоты получения рассылки.
2. После запроса, если есть DAGи чье время отличается, у тех параметр schedule_interval обновляется.
3. Обслуживающий даг запускает динамически сгенерированные DAG в зависимости от их интервалов.

Вижу это примерно так. Если есть замечания, или поправки - скажите, я учту. Любой помощи буду рад, спасибо.

ps. основной dag сам по себе простой:
@dag(
    catchup=False,
    default_args=default_args,
    tags=['telegram', 'competitors', 'prices'],
    schedule_interval='0 7,14 * * *'
)
def competitors_prices():

    @task.external_python(python='/usr/local/airflow/venv_main/bin/python')
    def collect_data():
        # execute query in database and save data

    @task.external_python(python='/usr/local/airflow/venv_main/bin/python')
    def send_photo():
        async def send_(ids):
            for id_ in ids:
                 # sending data to user
        asyncio.run(send_(TG_ALLOWED_USERS_ID))

    collect_data() >> send_photo()
  • Вопрос задан
  • 55 просмотров
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы
23 нояб. 2024, в 01:31
1000 руб./за проект
23 нояб. 2024, в 00:16
2000 руб./за проект