Airflow Dynamic Task Mapping

Overview

Many workflows are static, in that all the component task instances and their relationships are statically declared in the workflow definition code:

# noinspection PyPep8Naming
@task
def A():
    ...

@task
def B():
    ...

@task
def C():
    ...

@dag(...)
def some_dag():
    A() >> B() >> C()

However, since the workflow definition code is Python, there is a certain degree of flexibility in how tasks are declared; for example, they can be declared in a loop:

@dag(...)
def some_dag():
    a = A()
    c = C()
    for _ in [1, 2, 3]:
        a >> B() >> c

The resulting DAG looks similar to:

[Parallel 1.png: A fans out into three parallel B tasks, which all converge into C]

This is not truly dynamic, in the sense that the structure of the DAG does not change at runtime; it is equivalent to declaring the same task, individually, multiple times:

@dag(...)
def some_dag():
    a = A()
    c = C()
    a >> B() >> c
    a >> B() >> c
    a >> B() >> c

However, Airflow 2.3 and newer allow declaring truly dynamic DAGs, where the structure of the DAG is determined at runtime by data generated by previous tasks, data that could not be known when the DAG was written. A workflow can create a number of tasks at runtime, based on the current state of the workflow instance, rather than the DAG author having to know in advance how many tasks will be needed. The mapped tasks must still be declared in the DAG definition itself; a task cannot create other dynamic tasks from within its own code.

To dynamically create tasks, @task-decorated functions expose the expand() method, which can be used as described below.

Creating Multiple Tasks at Schedule Time with expand()

expand() creates multiple instances of the task definition it was invoked on:

# noinspection PyPep8Naming
@task
def A():
    ...

@task
def B(i):
    print(f">>> B({i})")
    return i + 1

@task
def C(iterable):
    print(f">>> C({iterable})")
    result = sum(iterable)
    print(f">>> result: {result}")

@dag(...)
def some_dag():
    a = A()
    lazy_results = B.expand(i=[1, 2, 3])
    C(lazy_results)  # this implicitly declares a dependency on all dynamically mapped B tasks

Right before an expand()-mapped task is executed, the scheduler creates multiple copies of the task, one for each input value. Creating the tasks schedules them for execution in parallel, even if they have no dependencies on any other tasks. However, dynamic task mapping also allows the results to be collected and processed by a dependent task, provided that the dependent task is declared as accepting an iterable argument.
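In the example above, the mapped values are a literal list known when the DAG file is parsed. To get the truly dynamic behavior described in Overview, expand() can also be given the output of an upstream task, in which case the number of mapped B instances is only determined at runtime, after that task has run. A minimal sketch, reusing B and C as defined above; make_list is an illustrative name, not part of the examples above:

@task
def make_list():
    # in a real workflow this list would come from an external source,
    # so the number of mapped tasks is not known when the DAG file is parsed
    return [1, 2, 3, 4]

@dag(...)
def some_dag():
    lazy_results = B.expand(i=make_list())  # one B instance per element returned by make_list
    C(lazy_results)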

Collecting Outputs of Mapped Tasks

The outputs of all mapped task instances can be collected and processed by a downstream task, by passing the lazy result of expand() as an argument to a task that accepts an iterable:

# noinspection PyPep8Naming
@task
def C(iterable):
    print(f">>> C({iterable})")
    result = sum(iterable)
    print(f">>> result: {result}")

@dag(...)
def some_dag():
    lazy_results = B.expand(i=[1, 2, 3])
    C(lazy_results)  # this implicitly declares a dependency on all dynamically mapped B tasks
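One detail worth noting, stated here as an assumption based on the Airflow documentation for recent versions: the iterable received by C is not a plain list but a lazy sequence that fetches the mapped tasks' XCom results on demand. Iteration, as done by sum() above, works directly; if full list semantics are needed, the sequence can be materialized explicitly:

@task
def C(iterable):
    values = list(iterable)  # materialize the lazy sequence, if list semantics are needed
    print(f">>> C({values})")
    print(f">>> result: {sum(values)}")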

Questions

  • Can dependencies be declared on dynamic tasks?