Требуется сделать динамическое создание DAG (в зависимости от количества пользователей) и для каждого DAG`а указывать свой интервал срабатывания. Логику я вижу примерно такую:
0. Пользователь через ТГ бота указывает какой интервал рассылки себе поставить, 1 раз в час, 3 раза в час, 2 раза в день и т.д. Далее эти данные обновляются в БД.
1. Создаётся отдельный обслуживающий DAG который срабатывает условно 1 раз в час. Он будет делать запрос в БД получать оттуда пользователей и данные частоты получения рассылки.
2. После запроса, если есть DAGи чье время отличается, у тех параметр schedule_interval обновляется.
3. Обслуживающий даг запускает динамически сгенерированные DAG в зависимости от их интервалов.
Вижу это примерно так. Если есть замечания, или поправки - скажите, я учту. Любой помощи буду рад, спасибо.
ps. основной dag сам по себе простой:
@dag(
catchup=False,
default_args=default_args,
tags=['telegram', 'competitors', 'prices'],
schedule_interval='0 7,14 * * *'
)
def competitors_prices():
@task.external_python(python='/usr/local/airflow/venv_main/bin/python')
def collect_data():
# execute query in database and save data
@task.external_python(python='/usr/local/airflow/venv_main/bin/python')
def send_photo():
async def send_(ids):
for id_ in ids:
# sending data to user
asyncio.run(send_(TG_ALLOWED_USERS_ID))
collect_data() >> send_photo()