Airflow Concepts: Difference between revisions

From NovaOrdis Knowledge Base
Jump to navigation Jump to search
 
(136 intermediate revisions by the same user not shown)
Line 7: Line 7:
=DAG=
=DAG=
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html}}
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html}}
{{External|https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#dags}}
{{Internal|Graph_Concepts#Directed_Acyclic_Graph_.28DAG.29|Graph Concepts | Directed Acyclic Graph}}
{{Internal|Graph_Concepts#Directed_Acyclic_Graph_.28DAG.29|Graph Concepts | Directed Acyclic Graph}}


The edges can be [https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#edge-labels labeled] in the UI.
The edges can be [https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#edge-labels labeled] in the UI.
==SubDAG==
==DAG Name==
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#concepts-subdags}}
When the DAG is declared with the [[Airflow_Programming_Model#With_DAG.28.29_Constructor|<code>DAG()</code> constructor]], the [[Airflow_Programming_Model#Constructor_Example|name is the first argument of the constructor]]. When the DAG is declared with the [[Airflow_Programming_Model#With_.40dag_Decorator|<code>@dag</code> decorator]], the name of the DAG is [[Airflow_Programming_Model#Decorator_Example|the name of the function]].
A DAG is made of [[#Task|tasks]] among which there are relations of [[#Dependency|dependency]]. The DAG is not concerned about what happens inside the tasks, it is only concerned about how to run them: order, retries, timeouts. etc.
 
==Declaring a DAG==
==Declaring a DAG==
* Via a context manager.
DAGs are declared in Airflow Python script files, which are just configuration file specifying the DAG's structure as code. The actual tasks defined in it will run in a different context from the context of the script. Different tasks run on different workers at different points in time, which means that the script cannot be used to communicate between tasks. People sometimes think of the DAG definition file as a place where they can do some actual data processing. That is not the case at all. The script’s purpose is to define a DAG object. It needs to evaluate quickly (seconds, not minutes) since the scheduler will execute it periodically to reflect the changes if any.
* With the <code>DAG()</code> constructor.
 
* With the <code>@dag</code> decorator. <font color=darkkhaki>TO PARSE: https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#the-dag-decorator</font>.
<font color=darkkhaki>The DAG definition is executed at task scheduling time, which allows for [[Airflow_Dynamic_Task_Mapping#Overview|dynamic task mapping]].</font>
===With <tt>@dag</tt> Decorator===
{{Internal|Airflow_Programming_Model#With_.40dag_Decorator|Declare a DAG with <code>@dag</code> decorator}}
===With <tt>DAG()</tt> Constructor===
{{Internal|Airflow_Programming_Model#With_DAG.28.29_Constructor|Declare a DAG with a <code>DAG()</code> constructor}}
===Via ContextManager===
<span id='DAG_Decorator_.40dag'></span>The DAG can be declared <font color=darkkhaki>via a context manager</font>.
 
==DAG Configuration Parameters==
Programming model usage example: {{Internal|Airflow_Programming_Model#With_.40dag_Decorator|DAG Configuration Parameters Programming Model}}
====<tt>schedule_interval</tt>====
Optional parameter, if not specified, <font color=darkkhaki>the default is <code>(* * * * * )</code> which means the DAG will be scheduled every minute.</font>
 
To prevent the DAG from being automatically scheduled at all, which is appropriate for manual triggering, use:
<syntaxhighlight lang='py'>
@dag(
    schedule_interval=None,
    [...]
)
</syntaxhighlight>
 
====<tt>start_date</tt>====
Required parameter. Example:
<syntaxhighlight lang='py'>
from datetime import datetime
@dag(
    start_date=datetime(2022, 7, 13, 0),
    [...]
)
</syntaxhighlight>
====<tt>catchup</tt>====
====<tt>max_active_runs</tt>====
====<tt>dagrun_timeout</tt>====
====<tt>default_args</tt>====
 
==DAG Run==
==DAG Run==
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#concepts-dag-run}}
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#concepts-dag-run}}
A DAG instantiates in a DAG Run at runtime.
{{External|https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#dag-runs}}
{{External|https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html}}
A DAG instantiates in a DAG Run at runtime. A DAG Run has an associated [[#Execution_Context|execution context]].
===Data Interval===
<font color=darkkhaki>TO PROCESS: https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html#data-interval</font>
 
===Logical Date===
<font color=darkkhaki>TO PROCESS: https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html#data-interval</font>
 
===External Triggers===
{{External|https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html#external-triggers}}


==Control Flow==
==Control Flow==
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#control-flow}}
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#control-flow}}
==<span id='Dynamic_DAG'></span>Dynamic DAG or Dynamic Task Mapping==
==<span id='Dynamic_DAG'></span>Dynamic DAG or Dynamic Task Mapping==
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#dynamic-dags}}
The DAGs can be purely declarative, or they can be declared in Python code, by adding tasks dynamically. For more details, see:
The DAGs can be purely declarative, or they can be declared in Python code, by adding tasks dynamically.
{{Internal|Airflow Dynamic Task Mapping|Dynamic Task Mapping}}
==DAG File Processing==
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/dagfile-processing.html}}
==DAG Serialization==
{{External|https://airflow.apache.org/docs/apache-airflow/stable/dag-serialization.html}}
==DAG Scope==
{{External|https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#scope}}
==DAG Configuration==
===Default Arguments===
{{External|https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#default-arguments}}


=Task=
=Task=
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/tasks.html}}
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/tasks.html}}
{{External|https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#tasks}}
A Task is the basic unit of execution in Airflow. Every task must be assigned to a [[#DAG|DAG]] to run. Tasks have dependencies on each other. There could be upstream dependencies (if B depends on A, A → B, then A is an upstream dependency of B). To be [[#scheduled|scheduled]], a task have all its dependencies met.
A Task is the basic unit of execution in Airflow. Every task must be assigned to a [[#DAG|DAG]] to run. Tasks have dependencies on each other. There could be upstream dependencies (if B depends on A, A → B, then A is an upstream dependency of B). To be [[#scheduled|scheduled]], a task have all its dependencies met.
==Task Relationships==
==Task Relationships==
Line 48: Line 103:
===Operator===
===Operator===
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/operators.html}}
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/operators.html}}
An Operator is a predefined task template.  
{{External|https://airflow.apache.org/docs/apache-airflow/stable/operators-and-hooks-ref.html}}
{{External|https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#operators}}
{{External|https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/index.html}}
An Operator is a predefined task template. Popular Airflow operators are:
* [[Airflow EmptyOperator#Overview|EmptyOperator]]
* [[Airflow BashOperator#Overview|BashOperator]]
* [[Airflow PythonOperator#Overview|PythonOperator]]
To differentiate these task types from the [[#.40task_Task_Decorator.2C_TaskFlow-decorated_Task|TaskFlow-style, @task-annotated tasks]], documentation refers to these as "classic style operators".
 
<font color=darkkhaki>Document the operator's <code>execute()</code> method.</font>
 
For usage examples, see: {{Internal|Airflow_Programming_Model#Overview|Airflow Programming Model}}
====Sensor====
====Sensor====
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/sensors.html}}
{{Internal|Airflow Sensor#Overview|Airflow Sensors}}
A Sensor is a subclass of [[#Operator|Operator]] that wait for an external event to happen.
====Deferrable Operators and Triggers====
{{Internal|Airflow Deferrable Operators|Airflow Deferrable Operators}}


===<span id='Bjy4L'></span>TaskFlow-decorated Task===
===<span id='Task_Decorator_.40task'></span><span id='TaskFlow-decorated_Task'></span><span id='TaskFlow'></span><span id='Bjy4L'></span><tt>@task</tt> Task Decorator, TaskFlow-decorated Task===
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/taskflow.html}}
Decorated with <code>@task</code>. A custom Python function packaged up as a Task. For more details see: {{Internal|Airflow_TaskFlow#Overview|TaskFlow}}
Decorated with <code>@task</code>. A custom Python function packaged up as a Task.


==Task Assignment to DAG==
==Task Assignment to DAG==
Line 61: Line 127:
==Task Instance==
==Task Instance==
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/tasks.html#task-instances}}
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/tasks.html#task-instances}}
The same way a [[#DAG|DAG]] is instantiated at runtime into a [[#DAG_Run|DAG Run]], the tasks under a DAG are instantiated into Task Instances.
{{External|https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#task-instances}}
The same way a [[#DAG|DAG]] is instantiated at runtime into a [[#DAG_Run|DAG Run]], the tasks under a DAG are instantiated into '''task instances'''.
 
The task instance is passed as the first argument of the [[Airflow_TaskFlow#Programming_Model|methods annotated with <code>@task</code>]], which provides the task behavior.
===Task States===
===Task States===
====<tt>none</tt>====
====<tt>none</tt>====
Line 80: Line 149:
The task had an error during execution and failed to run.
The task had an error during execution and failed to run.
====<tt>skipped</tt>====
====<tt>skipped</tt>====
The task was skipped due to branching, LatestOnly or similar.
The task was skipped due to branching, LatestOnly or mapping with <code>[[Airflow_Dynamic_Task_Mapping#expand.28.29_Argument|expand()]]</code>.
 
====<tt>upstream_failed</tt>====
====<tt>upstream_failed</tt>====
An upstream task failed and the Trigger Rule says we needed it.
An upstream task failed and the Trigger Rule says we needed it.
Line 93: Line 163:
====<tt>removed</tt>====
====<tt>removed</tt>====
The task has vanished from the DAG since the run started.
The task has vanished from the DAG since the run started.
==Task Lifecycle==
==Task Lifecycle==
{{External|https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#task-lifecycle}}
The normal lifecycle of a task instance is [[#none|none]] → [[#scheduled|scheduled]] → [[#queued|queued]] → [[#running|running]] → [[#success|success]].
==Task Configuration and Data Exchange==
===<span id='Variable'></span>Variables===
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html}}
{{External|https://airflow.apache.org/docs/apache-airflow/stable/howto/variable.html}}
{{External|https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#variables}}
Variables are an Airflow runtime configuration concept. Variables are maintained in a general key/value store, which is global and shared by the entire Airflow instance, and which can be queried from the tasks. While the variables can be created via API, they can also be created and updated via UI, gy going to Admin → Variables. Internally, the variables are stored in the variable table. They can be encrypted.
Programming model:
<syntaxhighlight lang='py'>
from airflow.models import Variable
# Normal call style
some_variable = Variable.get("some_variable")
# Auto-deserializes a JSON value
some_other_variable = Variable.get("some_other_variable", deserialize_json=True)
# Returns the value of default_var (None) if the variable is not set
some_variable_2 = Variable.get("some_variable_2", default_var=None)
</syntaxhighlight>
The variables can be used from templates.


The normal lifecycle of a task instance is [[#none|none]] → [[#scheduled|scheduled]] → [[#queued|queued]] → [[#running|running]] → [[#success|success]].
Variables are '''global''' and should be only used for overall configuration that covers the entire installation. To pass data to and from tasks, [[Airflow XComs#Overview|XComs]] should be used instead.
 
===<span id='Param'></span>Params===
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/params.html}}
 
Params are used to provide runtime configuration to tasks. When a DAG is started manually, its Params can be modified before the DAG run starts.
====DAG-level Params====
====Task-level Params====
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/params.html#task-level-params}}


==Passing Data between Tasks==
===XComs===
Tasks pass data among each other using:
Tasks pass data among each other using:
* [[#XComs|XComs]], when the amount of metadata to be exchanged is small.
* XComs, when the amount of metadata to be exchanged is small.
* Uploading and downloading large files from a storage service.
* Uploading and downloading large files from a storage service.
More details: {{Internal|Airflow XComs#Overview|XComs}}
===Execution Context===
{{External|https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#accessing-current-context}}
<font color=darkkhaki>What is the execution context?</font>
An execution context is associated with a [[#DAG_Run|DAG run]]. The current execution context can be retrieved from a task, during the task execution. <font color=darkkhaki>The context is not accessible during <code>pre_execute</code> or <code>post_execute</code>.</font>.
====Execution Context Programming Model====
<syntaxhighlight lang='py'>
@task
def task_a():
  context = get_current_context()
  print(context['dag'])
  print(context['task'])
</syntaxhighlight>
====Execution Context as Data Sharing Medium====
The context DOES NOT seem to be a valid method to exchange data between tasks. Even if a key/value pair can be placed in the context and retrieved locally by a task, the value does not seem to propagate to a subsequent task, even if the subsequent task declares a explicit dependency on the first task.
<syntaxhighlight lang='py'>
@task
def task_a():
  context = get_current_context()
  key = 'test_key'
  context[key] = 'test_value'
  print(f">>> task_a, context[{key}] = {context.get(key)}")
@task
def task_b():
  context = get_current_context()
  key = 'test_key'
  print(f">>> task_b, context[{key}] = {context.get(key)}")
task_a() >> task_b()
</syntaxhighlight>
task_a will display:
<syntaxhighlight lang='text'>
>>> task_a, context[test_key] = test_value
</syntaxhighlight>
task_b will display:
<syntaxhighlight lang='text'>
>>> task_b, context[test_key] = None
</syntaxhighlight>
====Execution Context Keys====
Airflow populates the context with keys like "dag", "task", and the associated values:
{| class="wikitable" style="text-align: left;"
! Key
! Value
! Note
|-
| conf || <code>airflow.configuration.AirflowConfigParser</code> instance ||
|-
| dag || <code>DAG</code> instance ||
|-
| dag_run || DAG Run instance ||
|-
| data_interval_start || 2022-07-16T04:12:58.698129+00:00  ||
|-
| data_interval_end  || 2022-07-16T04:12:58.698129+00:00 ||
|-
| ds || 2022-07-16 ||
|-
| ds_nodash || 20220716 ||
|-
| <span id='execution_date'></span>execution_date || 2022-07-16T04:12:58.698129+00:00 || Deprecated. The execution date (logical date), same as [[#logical_date|dag_run.logical_date]].
|-
| inlets ||  ||
|-
| <span id='logical_date'></span>logical_date || 2022-07-16T04:12:58.698129+00:00 || See [[#Logical_Date|DAG Run Logical Date]].
|-
| macros ||  ||
|-
| next_ds || 2022-07-16 ||
|-
| next_ds_nodash || 20220716 ||
|-
| next_execution_date || 2022-07-16T04:12:58.698129+00:00 ||
|-
| outlets ||  ||
|-
|params  ||  ||
|-
| prev_data_interval_start_success ||2022-07-16T03:38:00.597026+00:00  ||
|-
| prev_data_interval_end_success || 2022-07-16T03:38:00.597026+00:00 ||
|-
|prev_ds  ||2022-07-16  ||
|-
| prev_ds_nodash ||20220716  ||
|-
| prev_execution_date || 2022-07-16T04:12:58.698129+00:00 ||
|-
| prev_execution_date_success ||  ||
|-
| prev_start_date_success || 2022-07-16T03:38:01.381156+00:00 ||
|-
| run_id || manual__2022-07-16T04:12:58.698129+00:00 ||
|-
| task ||  ||
|-
| <span id='task_instance'></span>task_instance || TaskInstance instance ||
|-
| task_instance_key_str || task_experiment_2__task_d__20220716 ||
|-
| test_mode || False ||
|-
| ti  ||  TaskInstance instance || See [[#task_instace|task_instance]].
|-
| tomorrow_ds || 2022-07-17 ||
|-
| tomorrow_ds_nodash || 20220717 ||
|-
| ts || 2022-07-16T04:12:58.698129+00:00 ||
|-
|ts_nodash  ||20220716T041258 ||
|-
| ts_nodash_with_tz || 20220716T041258.698129+0000 ||
|-
| var || {'json': None, 'value': None} ||
|-
| conn || None ||
|-
| yesterday_ds || 2022-07-15 ||
|-
|yesterday_ds_nodash || 20220715 ||
|-
|templates_dict  || None ||
|-
|}
==TaskGroup==
==TaskGroup==
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#concepts-taskgroups}}
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#concepts-taskgroups}}
Line 113: Line 347:
==Per-Task Executor Configuration==
==Per-Task Executor Configuration==
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/tasks.html#executor-configuration}}
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/tasks.html#executor-configuration}}
 
==Task Logging==
=XComs=
See: {{Internal|Airflow_Logging_and_Monitoring#Logging_for_Tasks|Airflow Logging and Monitoring &#124; Logging for Tasks}}
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/taskflow.html}}
=Branching=
"Cross-communications".
{{External|https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#branching}}


=Workload=
=Workload=
Line 124: Line 358:
=Executor=
=Executor=
{{External|https://airflow.apache.org/docs/apache-airflow/stable/executor/index.html}}
{{External|https://airflow.apache.org/docs/apache-airflow/stable/executor/index.html}}
Executors are the mechanism by which [[#Task_Instance|task instances]] get run. All executors have a common API and they are "pluggable", meaning they can be swapped based on operational needs.
There is no need to run a separate executor process (though you can). For [[#Local_Executors|local executors]], the executor’s logic runs inside the [[#Scheduler|scheduler]] process. If a scheduler is running, then the executor is running.
==Executor Types==
===Local Executors===
Local executors run tasks locally inside the [[#Scheduler|scheduler]] process.
* <span id='Debug_Executor'></span>[[Airflow Debug Executor#Overview|Debug Executor]]
* <span id='Local_Executor'></span>[[Airflow Local Executor#Overview|Local Executor]]
* <span id='Sequential_Executor'></span>[[Airflow Sequential Executor#Overview|Sequential Executor]]. Airflow comes configured with the Sequential Executor by default.
===Remote Executors===
Remote executors run tasks remotely, usually via a [[#Pool|pool]] of [[#Worker|workers]].
* <span id='Celery_Executor'></span>[[Airflow Celery Executor#Overview|Celery Executor]]
* <span id='CeleryKubernetes_Executor'></span>[[Airflow CeleryKubernetes Executor#Overview|CeleryKubernetes Executor]]
* <span id='Dask_Executor'></span>[[Airflow Dask Executor#Overview|Dask Executor]]
* <span id='Kuberentes_Executor'></span>[[Airflow Kuberentes Executor#Overview|Kuberentes Executor]]
* <span id='LocalKubernetes_Executor'></span>[[Airflow LocalKubernetes Executor#Overview|LocalKubernetes Executor]]
=Worker=
=Worker=
Worker slot.
=Metadata Database=
=Metadata Database=
=Connections & Hooks=
=Connections & Hooks=
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/connections.html}}
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/connections.html}}
==Connection==
{{External|https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html}}
==Hook==
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/connections.html#hooks}}
{{External|https://airflow.apache.org/docs/apache-airflow/stable/python-api-ref.html#pythonapi-hooks}}
=Pool=
=Pool=
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/pools.html}}
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/pools.html}}
=<span id='Macro'></span>Macros=
=<span id='Macro'></span>Macros=
{{External|https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html#templates-ref}}
{{External|https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html#templates-ref}}
=Timetables=
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/timetable.html}}
=Priority Weights=
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/priority-weight.html}}
=Cluster Policies=
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/cluster-policies.html}}
=Plugins=
{{External|https://airflow.apache.org/docs/apache-airflow/stable/plugins.html}}
=Security=
{{Internal|Airflow Security#Overview|Airflow Security}}
=Logging and Monitoring=
{{Internal|Airflow Logging and Monitoring#Overview|Logging and Monitoring}}
=Integration=
{{External|https://airflow.apache.org/docs/apache-airflow/stable/integration.html}}
=Python Module Management=
{{External|https://airflow.apache.org/docs/apache-airflow/stable/modules_management.html}}
=Trigger Rule=
{{External|https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#trigger-rules}}
=Jinja Support=
{{External|https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#jinja-templating}}
=Airflow Programming Model=
{{Internal|Airflow_Programming_Model#Overview|Airflow Programming Model}}

Latest revision as of 23:05, 18 July 2022

External

Internal

Workflow

DAG

https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html
https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#dags
Graph Concepts | Directed Acyclic Graph

The edges can be labeled in the UI.

DAG Name

When the DAG is declared with the DAG() constructor, the name is the first argument of the constructor. When the DAG is declared with the @dag decorator, the name of the DAG is the name of the function.

Declaring a DAG

DAGs are declared in Airflow Python script files, which are just configuration file specifying the DAG's structure as code. The actual tasks defined in it will run in a different context from the context of the script. Different tasks run on different workers at different points in time, which means that the script cannot be used to communicate between tasks. People sometimes think of the DAG definition file as a place where they can do some actual data processing. That is not the case at all. The script’s purpose is to define a DAG object. It needs to evaluate quickly (seconds, not minutes) since the scheduler will execute it periodically to reflect the changes if any.

The DAG definition is executed at task scheduling time, which allows for dynamic task mapping.

With @dag Decorator

Declare a DAG with @dag decorator

With DAG() Constructor

Declare a DAG with a DAG() constructor

Via ContextManager

The DAG can be declared via a context manager.

DAG Configuration Parameters

Programming model usage example:

DAG Configuration Parameters Programming Model

schedule_interval

Optional parameter, if not specified, the default is (* * * * * ) which means the DAG will be scheduled every minute.

To prevent the DAG from being automatically scheduled at all, which is appropriate for manual triggering, use:

@dag(
    schedule_interval=None,
    [...]
)

start_date

Required parameter. Example:

from datetime import datetime
@dag(
    start_date=datetime(2022, 7, 13, 0),
    [...]
)

catchup

max_active_runs

dagrun_timeout

default_args

DAG Run

https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#concepts-dag-run
https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#dag-runs
https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html

A DAG instantiates in a DAG Run at runtime. A DAG Run has an associated execution context.

Data Interval

TO PROCESS: https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html#data-interval

Logical Date

TO PROCESS: https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html#data-interval

External Triggers

https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html#external-triggers

Control Flow

https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#control-flow

Dynamic DAG or Dynamic Task Mapping

The DAGs can be purely declarative, or they can be declared in Python code, by adding tasks dynamically. For more details, see:

Dynamic Task Mapping

DAG File Processing

https://airflow.apache.org/docs/apache-airflow/stable/concepts/dagfile-processing.html

DAG Serialization

https://airflow.apache.org/docs/apache-airflow/stable/dag-serialization.html

DAG Scope

https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#scope

DAG Configuration

Default Arguments

https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#default-arguments

Task

https://airflow.apache.org/docs/apache-airflow/stable/concepts/tasks.html
https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#tasks

A Task is the basic unit of execution in Airflow. Every task must be assigned to a DAG to run. Tasks have dependencies on each other. There could be upstream dependencies (if B depends on A, A → B, then A is an upstream dependency of B). To be scheduled, a task have all its dependencies met.

Task Relationships

https://airflow.apache.org/docs/apache-airflow/stable/concepts/tasks.html#relationships

The task relationships, are a key part in using Tasks.

There are two types of relationships: dependency and

Upstream and Downstream Dependency

If a task B has a dependency on task A (A → B), it is said that A is upstream of B and B is downstream of A. The dependencies are the directed edges of the directed acyclic graph.

The upstream term has a very strict semantics: an upstream task is the task that is directly preceding the other task. This concept does not describe the tasks that are higher in the task hierarchy (they are not a direct parent of the task). Same constrains apply to a downstream task, which need to be a direct child of the other task.

Previous and Next

There may also be instances of the same task, but for different data intervals - from other runs of the same DAG. These are previous and next.

Task Types

Airflow has three types of tasks: Operator, Sensor, which is a subclass of Operator, and TaskFlow-decorated Task. All these are subclasses of Airflow's BaseOperator. Operators and Sensor are templates, and when one is called in a DAG, it is made into a Task.

Operator

https://airflow.apache.org/docs/apache-airflow/stable/concepts/operators.html
https://airflow.apache.org/docs/apache-airflow/stable/operators-and-hooks-ref.html
https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#operators
https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/index.html

An Operator is a predefined task template. Popular Airflow operators are:

To differentiate these task types from the TaskFlow-style, @task-annotated tasks, documentation refers to these as "classic style operators".

Document the operator's execute() method.

For usage examples, see:

Airflow Programming Model

Sensor

Airflow Sensors

Deferrable Operators and Triggers

Airflow Deferrable Operators

@task Task Decorator, TaskFlow-decorated Task

Decorated with @task. A custom Python function packaged up as a Task. For more details see:

TaskFlow

Task Assignment to DAG

https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#dag-assignment

Task Instance

https://airflow.apache.org/docs/apache-airflow/stable/concepts/tasks.html#task-instances
https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#task-instances

The same way a DAG is instantiated at runtime into a DAG Run, the tasks under a DAG are instantiated into task instances.

The task instance is passed as the first argument of the methods annotated with @task, which provides the task behavior.

Task States

none

The task has not yet been queued for execution because its dependencies are not yet met.

scheduled

The task dependencies have been met, and the scheduled has determined that the task should run.

queued

The task has been assigned to an executor and it is awaiting a worker.

running

The task is running on a worker or on a local/synchronous executor.

success

The task finished running without errors.

shutdown

The task was externally requested to shut down when it was running.

restarting

The task was externally requested to restart when it was running.

failed

The task had an error during execution and failed to run.

skipped

The task was skipped due to branching, LatestOnly or mapping with expand().

upstream_failed

An upstream task failed and the Trigger Rule says we needed it.

up_for_retry

The task failed, but has retry attempts left and will be rescheduled.

up_for_reschedule

The task is a sensor that is in reschedule mode.

sensing

The task is a Smart Sensor.

deferred

The task has been deferred to a trigger.

removed

The task has vanished from the DAG since the run started.

Task Lifecycle

https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#task-lifecycle

The normal lifecycle of a task instance is nonescheduledqueuedrunningsuccess.

Task Configuration and Data Exchange

Variables

https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
https://airflow.apache.org/docs/apache-airflow/stable/howto/variable.html
https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#variables

Variables are an Airflow runtime configuration concept. Variables are maintained in a general key/value store, which is global and shared by the entire Airflow instance, and which can be queried from the tasks. While the variables can be created via API, they can also be created and updated via UI, gy going to Admin → Variables. Internally, the variables are stored in the variable table. They can be encrypted.

Programming model:

from airflow.models import Variable

# Normal call style
some_variable = Variable.get("some_variable")

# Auto-deserializes a JSON value
some_other_variable = Variable.get("some_other_variable", deserialize_json=True)

# Returns the value of default_var (None) if the variable is not set
some_variable_2 = Variable.get("some_variable_2", default_var=None)

The variables can be used from templates.

Variables are global and should be only used for overall configuration that covers the entire installation. To pass data to and from tasks, XComs should be used instead.

Params

https://airflow.apache.org/docs/apache-airflow/stable/concepts/params.html

Params are used to provide runtime configuration to tasks. When a DAG is started manually, its Params can be modified before the DAG run starts.

DAG-level Params

Task-level Params

https://airflow.apache.org/docs/apache-airflow/stable/concepts/params.html#task-level-params

XComs

Tasks pass data among each other using:

  • XComs, when the amount of metadata to be exchanged is small.
  • Uploading and downloading large files from a storage service.

More details:

XComs

Execution Context

https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#accessing-current-context

What is the execution context?

An execution context is associated with a DAG run. The current execution context can be retrieved from a task, during the task execution. The context is not accessible during pre_execute or post_execute..

Execution Context Programming Model

@task
def task_a():
  context = get_current_context()
  print(context['dag'])
  print(context['task'])

Execution Context as Data Sharing Medium

The context DOES NOT seem to be a valid method to exchange data between tasks. Even if a key/value pair can be placed in the context and retrieved locally by a task, the value does not seem to propagate to a subsequent task, even if the subsequent task declares a explicit dependency on the first task.

@task
def task_a():
  context = get_current_context()
  key = 'test_key'
  context[key] = 'test_value'
  print(f">>> task_a, context[{key}] = {context.get(key)}")

@task
def task_b():
  context = get_current_context()
  key = 'test_key'
  print(f">>> task_b, context[{key}] = {context.get(key)}")

task_a() >> task_b()

task_a will display:

>>> task_a, context[test_key] = test_value

task_b will display:

>>> task_b, context[test_key] = None

Execution Context Keys

Airflow populates the context with keys like "dag", "task", and the associated values:

Key Value Note
conf airflow.configuration.AirflowConfigParser instance
dag DAG instance
dag_run DAG Run instance
data_interval_start 2022-07-16T04:12:58.698129+00:00
data_interval_end 2022-07-16T04:12:58.698129+00:00
ds 2022-07-16
ds_nodash 20220716
execution_date 2022-07-16T04:12:58.698129+00:00 Deprecated. The execution date (logical date), same as dag_run.logical_date.
inlets
logical_date 2022-07-16T04:12:58.698129+00:00 See DAG Run Logical Date.
macros
next_ds 2022-07-16
next_ds_nodash 20220716
next_execution_date 2022-07-16T04:12:58.698129+00:00
outlets
params
prev_data_interval_start_success 2022-07-16T03:38:00.597026+00:00
prev_data_interval_end_success 2022-07-16T03:38:00.597026+00:00
prev_ds 2022-07-16
prev_ds_nodash 20220716
prev_execution_date 2022-07-16T04:12:58.698129+00:00
prev_execution_date_success
prev_start_date_success 2022-07-16T03:38:01.381156+00:00
run_id manual__2022-07-16T04:12:58.698129+00:00
task
task_instance TaskInstance instance
task_instance_key_str task_experiment_2__task_d__20220716
test_mode False
ti TaskInstance instance See task_instance.
tomorrow_ds 2022-07-17
tomorrow_ds_nodash 20220717
ts 2022-07-16T04:12:58.698129+00:00
ts_nodash 20220716T041258
ts_nodash_with_tz 20220716T041258.698129+0000
var {'json': None, 'value': None}
conn None
yesterday_ds 2022-07-15
yesterday_ds_nodash 20220715
templates_dict None

TaskGroup

https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#concepts-taskgroups

This is a pure UI concept.

Task Timeout

https://airflow.apache.org/docs/apache-airflow/stable/concepts/tasks.html#timeouts

Task SLA

https://airflow.apache.org/docs/apache-airflow/stable/concepts/tasks.html#slas

An SLA, or a Service Level Agreement, is an expectation for the maximum time a Task should take.

Zombie/Undead Tasks

https://airflow.apache.org/docs/apache-airflow/stable/concepts/tasks.html#zombie-undead-tasks

Per-Task Executor Configuration

https://airflow.apache.org/docs/apache-airflow/stable/concepts/tasks.html#executor-configuration

Task Logging

See:

Airflow Logging and Monitoring | Logging for Tasks

Branching

https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#branching

Workload

Scheduler

https://airflow.apache.org/docs/apache-airflow/stable/concepts/scheduler.html

Executor

https://airflow.apache.org/docs/apache-airflow/stable/executor/index.html

Executors are the mechanism by which task instances get run. All executors have a common API and they are "pluggable", meaning they can be swapped based on operational needs.

There is no need to run a separate executor process (though you can). For local executors, the executor’s logic runs inside the scheduler process. If a scheduler is running, then the executor is running.

Executor Types

Local Executors

Local executors run tasks locally inside the scheduler process.

Remote Executors

Remote executors run tasks remotely, usually via a pool of workers.

Worker

Worker slot.

Metadata Database

Connections & Hooks

https://airflow.apache.org/docs/apache-airflow/stable/concepts/connections.html

Connection

https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html

Hook

https://airflow.apache.org/docs/apache-airflow/stable/concepts/connections.html#hooks
https://airflow.apache.org/docs/apache-airflow/stable/python-api-ref.html#pythonapi-hooks

Pool

https://airflow.apache.org/docs/apache-airflow/stable/concepts/pools.html

Macros

https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html#templates-ref

Timetables

https://airflow.apache.org/docs/apache-airflow/stable/concepts/timetable.html

Priority Weights

https://airflow.apache.org/docs/apache-airflow/stable/concepts/priority-weight.html

Cluster Policies

https://airflow.apache.org/docs/apache-airflow/stable/concepts/cluster-policies.html

Plugins

https://airflow.apache.org/docs/apache-airflow/stable/plugins.html

Security

Airflow Security

Logging and Monitoring

Logging and Monitoring

Integration

https://airflow.apache.org/docs/apache-airflow/stable/integration.html

Python Module Management

https://airflow.apache.org/docs/apache-airflow/stable/modules_management.html

Trigger Rule

https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#trigger-rules

Jinja Support

https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#jinja-templating

Airflow Programming Model

Airflow Programming Model