Airflow Dynamic Task Mapping: Difference between revisions

From NovaOrdis Knowledge Base
Jump to navigation Jump to search
Line 121: Line 121:
Note that a "reduce" task is NOT required. The mapped tasks will still be executed even if they have no downstream tasks.
Note that a "reduce" task is NOT required. The mapped tasks will still be executed even if they have no downstream tasks.
=Task Output-generated Mapping=
=Task Output-generated Mapping=
The examples presented so far, even if they use <code>expand()</code>, are not semantically different from [[#Tasks_in_a_Loop|using a <code>for</code> loop in the DAG code]], because the number of iterations and the values for each iteration is known when the DAG is declared. Real dynamic mapping happens when <code>expand()</code> is invoked on a value generated at run time by a dependency task.
The examples presented so far, even if they use <code>expand()</code>, are not semantically different from [[#Tasks_in_a_Loop|using a <code>for</code> loop in the DAG code]], because the number of iterations and the values for each iteration is known when the DAG is declared. Real dynamic mapping happens when <code>expand()</code> is invoked on a value generated at run time by a dependency task:
 
<syntaxhighlight lang='py'>
# generate a sequence at runtime
@task
def A():
    return [randrange(i) for i in [5, 10, 15]]
 
# act on each element of the sequence
@task
def B(i):
    print(f">>> B({i})")
    return i + 1
 
 
# reduce the results
@task
def C(iterable):
    print(f">>> C({iterable})")
    result = sum(iterable)
    print(f">>> result: {result}")
 
@dag(...)
def some_dag():
    a = A()
    # this implicitly declares a dependency of all B tasks on A:
    lazy_results = B.expand(i=a)
    C(lazy_results)  # this implicitly declares a dependency on all B tasks
</syntaxhighlight>


=Constant Parameters=
=Constant Parameters=

Revision as of 23:12, 18 July 2022

External

Internal

Overview

Many workflows are static, in that all the component task instances and their relationships are statically declared in the workflow definition code:

# noinspection PyPep8Naming
@task
def A():
    ...

@task
def B():
    ...

@task
def C():
    ...

@dag(...)
def some_dag():
    A() >> B() >> C()

However, since the workflow definition code is Python, it allows for a certain degree of flexibility when declaring tasks in a loop:

@dag(...)
def some_dag():
  a = A()
  c = C()
  for i in [1, 2, 3]:
      a >> B() >> c

The resulting DAG looks similar to:

Parallel 1.png

This is not really dynamic, in the sense that the structure of the DAG changes at runtime, but equivalent with declaring the same task, individually, multiple times:

@dag(...)
def some_dag():
  a = A()
  c = C()
  a >> B() >> c
  a >> B() >> c
  a >> B() >> c

However, Airflow 2.3 and newer allow declaring truly dynamic DAGs, where the structure of the DAG is determined at runtime by data dynamically generated by previous tasks, which could not be known in advance when the DAG structure is programmed. A workflow can create a number of tasks at runtime, based upon current workflow instance state, rather than the DAG author having to know in advance how many tasks would be needed. The tasks must be created in the DAG definition itself. Tasks cannot create dynamic tasks.

To dynamically create tasks, @task-annotated functions expose the function expand(), which can be used as described below.

Creating Multiple Tasks at Schedule Time with expand()

expand() creates multiple instances of the task definition it was invoked on:

# noinspection PyPep8Naming
@task
def A():
    ...

@task
def B(i):
  print(f">>> B({i})"
  return i + 1

@dag(...)
def some_dag():
  a = A()
  lazy_results = B.expand(i=[1, 2, 3])

Right before an expand()-mapped task is executed, the scheduler will create multiple copies of the task, one for each input value. Creating the tasks will schedule them for execution in parallel, even there are no dependencies of any other tasks:

AirflowParallel2.png

Only keyword arguments are allowed to be passed to expand().

The result of the call is a lazy initialized sequence:

_LazyXComAccess(dag_id='some_dag', run_id='manual__2022-07-18T22:20:22.638279+00:00', task_id='B')

You can use normal sequence syntax on this object (e.g. lazy_results[0]), or iterate through it normally with a for loop. list(lazy_results) will give you a “real” list, but please be aware of the potential performance implications if the list is large.

However, the dynamic task mapping mechanism further allows for collection of the results and processing by a dependent tasks, if the dependent task is declared as accepting an iterable as a constructor argument.

expand() Argument

The expand() argument can be only a list, a dictionary or one of the values stored as an XCom result of a task. If an upstream task returns an unmappable type, the mapped task will fail at run-time with an UnmappableXComTypePushed exception.

If the input is empty (zero length), no new tasks will be created and the mapped task will be marked as skipped.

Limits of Mapped Tasks

TO PROCESS: https://airflow.apache.org/docs/apache-airflow/stable/concepts/dynamic-task-mapping.html#placing-limits-on-mapped-tasks

Collecting Outputs of Mapped Tasks

As mentioned above, the result of expand() invocation is a lazily sequence. If a task is declared as accepting an iterable as argument, and then invoked with the result of expand() execution, the task will automatically become a dependent of all previously mapped tasks, and will be executed when its dependencies complete, allowing it to aggregate the partial results. This is an implementation of the map/reduce pattern.

# noinspection PyPep8Naming
@task
def C(iterable):
  print(f">>> C({iterable})")
  result = sum(iterable)
  print(f">>> result: {result}")

@dag(...)
def some_dag():
  A()
  lazy_results = B.expand(i=[1, 2, 3])
  C(lazy_results) # this implicitly declares a dependency on all dynamically mapped B tasks

The graphical representation of the DAG is similar to:

AirflowParallel3.png

Note that a "reduce" task is NOT required. The mapped tasks will still be executed even if they have no downstream tasks.

Task Output-generated Mapping

The examples presented so far, even if they use expand(), are not semantically different from using a for loop in the DAG code, because the number of iterations and the values for each iteration is known when the DAG is declared. Real dynamic mapping happens when expand() is invoked on a value generated at run time by a dependency task:

# generate a sequence at runtime
@task
def A():
    return [randrange(i) for i in [5, 10, 15]]

# act on each element of the sequence
@task
def B(i):
    print(f">>> B({i})")
    return i + 1


# reduce the results
@task
def C(iterable):
    print(f">>> C({iterable})")
    result = sum(iterable)
    print(f">>> result: {result}")

@dag(...)
def some_dag():
    a = A()
    # this implicitly declares a dependency of all B tasks on A:
    lazy_results = B.expand(i=a)
    C(lazy_results)  # this implicitly declares a dependency on all B tasks

Constant Parameters

TO PROCESS: https://airflow.apache.org/docs/apache-airflow/stable/concepts/dynamic-task-mapping.html#constant-parameters

partial()

Repeated Mapping

The result of one mapped task can also be used as input to the next mapped task.

TO PROCESS: https://airflow.apache.org/docs/apache-airflow/stable/concepts/dynamic-task-mapping.html#repeated-mapping

Mapping Over Multiple Parameters

TO PROCESS: https://airflow.apache.org/docs/apache-airflow/stable/concepts/dynamic-task-mapping.html#mapping-over-multiple-parameters

Mapping with non-TaskFlow Operators

TO PROCESS: https://airflow.apache.org/docs/apache-airflow/stable/concepts/dynamic-task-mapping.html#mapping-with-non-taskflow-operators

How Do Templated Fields and Mapped Arguments Interact?

TO PROCESS: https://airflow.apache.org/docs/apache-airflow/stable/concepts/dynamic-task-mapping.html#how-do-templated-fields-and-mapped-arguments-interact

Questions

  • Can dependencies be declared on dynamic tasks?