Airflow Dynamic Task Mapping
External
- https://airflow.apache.org/docs/apache-airflow/stable/concepts/dynamic-task-mapping.html
- https://airflow.apache.org/docs/apache-airflow/stable/howto/dynamic-dag-generation.html
- https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#dynamic-dags
- https://www.astronomer.io/guides/dynamic-tasks/
Internal
Overview
Many workflows are static, in that all the component task instances and their relationships are statically declared in the workflow definition code:
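from airflow.decorators import dag, task

# noinspection PyPep8Naming
@task
def A():
    ...

@task
def B():
    ...

@task
def C():
    ...

@dag(...)
def some_dag():
    # The three tasks and their order are fixed in the definition code.
    A() >> B() >> C()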
However, since the workflow definition is Python code, tasks can also be declared programmatically, for example in a loop:
@dag(...)
def some_dag():
    a = A()
    c = C()
    for i in [1, 2, 3]:
        a >> B() >> c
In the resulting DAG, A fans out to three separate B tasks, each of which feeds into C.
This is not truly dynamic, in the sense of the DAG structure changing at runtime. It is equivalent to declaring the same task individually, multiple times:
@dag(...)
def some_dag():
    a = A()
    c = C()
    a >> B() >> c
    a >> B() >> c
    a >> B() >> c
However, Airflow 2.3 and newer support truly dynamic DAGs, whose structure is determined at runtime by data generated by upstream tasks and therefore cannot be known when the DAG is written. For that, @task-decorated functions expose the expand() method, which can be used as described below.
Creating Multiple Tasks at Schedule Time with expand()
Questions
- Can dependencies be declared on dynamic tasks?