Airflowで動的ワークフローを作成する適切な方法 質問する

Airflowで動的ワークフローを作成する適切な方法 質問する

問題

Airflow で、タスク A が完了するまでタスク B.* の数が不明になるようなワークフローを作成する方法はありますか? サブ DAG を確認しましたが、DAG の作成時に決定する必要がある静的なタスク セットでのみ機能するようです。

DAGA トリガーは機能しますか? 機能する場合は、例を挙げていただけますか。

タスク A が完了するまで、タスク C を計算するために必要なタスク B の数を知ることができないという問題があります。各タスク B.* の計算には数時間かかり、結合することはできません。

              |---> Task B.1 --|
              |---> Task B.2 --|
 Task A ------|---> Task B.3 --|-----> Task C
              |       ....     |
              |---> Task B.N --|

アイデア #1

このソリューションは、ブロッキング ExternalTask​​Sensor を作成する必要があり、すべてのタスク B.* が完了するまでに 2 ~ 24 時間かかるため、気に入りません。したがって、これは実行可能なソリューションではないと思います。もっと簡単な方法があるはずです。それとも、Airflow はこれ向けに設計されていないのでしょうか。

Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C

Dag 2 (Dynamically created DAG though python_callable in TriggerDagrunOperator)
               |-- Task B.1 --|
               |-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
               |     ....     |
               |-- Task B.N --|

編集1:

今のところこの質問に対する良い答えはまだない解決策を探している何人かの人から連絡がありました。

ベストアンサー1

サブダグなしで同様のリクエストを実行した方法を次に示します。

まず、必要な値を返すメソッドを作成します

def values_function():
     return values

次に、ジョブを動的に生成するメソッドを作成します。

def group(number, **kwargs):
        #load the values if needed in the command you plan to execute
        dyn_value = "{{ task_instance.xcom_pull(task_ids='push_func') }}"
        return BashOperator(
                task_id='JOB_NAME_{}'.format(number),
                bash_command='script.sh {} {}'.format(dyn_value, number),
                dag=dag)

そして、それらを組み合わせます:

push_func = PythonOperator(
        task_id='push_func',
        provide_context=True,
        python_callable=values_function,
        dag=dag)

complete = DummyOperator(
        task_id='All_jobs_completed',
        dag=dag)

for i in values_function():
        push_func >> group(i) >> complete

おすすめ記事