Airflow Concepts
=External=
=Internal=
=Workflow=
=DAG=
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html}}
{{External|https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#dags}}
{{Internal|Graph_Concepts#Directed_Acyclic_Graph_.28DAG.29|Graph Concepts | Directed Acyclic Graph}}
The edges can be [https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#edge-labels labeled] in the UI.
==DAG Name==
When the DAG is declared with the [[Airflow_Programming_Model#With_DAG.28.29_Constructor|<code>DAG()</code> constructor]], the [[Airflow_Programming_Model#Constructor_Example|name is the first argument of the constructor]]. When the DAG is declared with the [[Airflow_Programming_Model#With_.40dag_Decorator|<code>@dag</code> decorator]], the name of the DAG is [[Airflow_Programming_Model#Decorator_Example|the name of the function]].
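A minimal sketch illustrating both naming styles (the DAG IDs and dates below are made up):
<syntaxhighlight lang='py'>
from datetime import datetime
from airflow import DAG
from airflow.decorators import dag

# DAG() constructor: the name, "constructor_example", is the first argument
constructor_example = DAG("constructor_example",
                          start_date=datetime(2022, 7, 13),
                          schedule_interval=None)

# @dag decorator: the DAG is named after the decorated function, "decorator_example"
@dag(start_date=datetime(2022, 7, 13), schedule_interval=None)
def decorator_example():
    ...

decorator_example()
</syntaxhighlight>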
==Declaring a DAG==
DAGs are declared in Airflow Python script files, which are just configuration files specifying the DAG's structure as code. The actual tasks defined in such a file run in a different context from the context of the script: different tasks run on different workers at different points in time, which means that the script cannot be used to communicate between tasks. People sometimes think of the DAG definition file as a place where they can do some actual data processing; that is not the case at all. The script's purpose is to define a DAG object, and it needs to evaluate quickly (seconds, not minutes) since the scheduler executes it periodically to pick up any changes.
<font color=darkkhaki>The DAG definition is executed at task scheduling time, which allows for [[Airflow_Dynamic_Task_Mapping#Overview|dynamic task mapping]].</font>
===With <tt>@dag</tt> Decorator===
{{Internal|Airflow_Programming_Model#With_.40dag_Decorator|Declare a DAG with <code>@dag</code> decorator}}
===With <tt>DAG()</tt> Constructor===
{{Internal|Airflow_Programming_Model#With_DAG.28.29_Constructor|Declare a DAG with a <code>DAG()</code> constructor}}
===Via Context Manager===
<span id='DAG_Decorator_.40dag'></span>The DAG can be declared <font color=darkkhaki>via a context manager</font>.
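A minimal sketch of the context manager form, with made-up DAG and task IDs; tasks instantiated inside the <code>with</code> block are automatically assigned to the DAG:
<syntaxhighlight lang='py'>
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

# The DAG declared as a context manager; no explicit dag=... argument is
# needed on the operator below, it is picked up from the "with" block.
with DAG("context_manager_example",
         start_date=datetime(2022, 7, 13),
         schedule_interval=None) as dag:
    hello = BashOperator(task_id="hello", bash_command="echo hello")
</syntaxhighlight>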
==DAG Configuration Parameters==
Programming model usage example: {{Internal|Airflow_Programming_Model#With_.40dag_Decorator|DAG Configuration Parameters Programming Model}}
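The parameters documented in the subsections below can be combined in a single declaration. A sketch with illustrative values (the DAG ID and all values are made up):
<syntaxhighlight lang='py'>
from datetime import datetime, timedelta
from airflow.decorators import dag

@dag(
    schedule_interval=None,
    start_date=datetime(2022, 7, 13, 0),
    catchup=False,
    max_active_runs=1,
    dagrun_timeout=timedelta(hours=1),
    default_args={"retries": 2},
)
def configuration_example():
    ...

configuration_example()
</syntaxhighlight>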
====<tt>schedule_interval</tt>====
Optional parameter. If not specified, the default is <code>timedelta(days=1)</code>, meaning the DAG is scheduled once a day.
To prevent the DAG from being automatically scheduled at all, which is appropriate for manual triggering, use:
<syntaxhighlight lang='py'>
@dag(
    schedule_interval=None,
    [...]
)
</syntaxhighlight>
====<tt>start_date</tt>====
Required parameter. Example:
<syntaxhighlight lang='py'>
from datetime import datetime

@dag(
    start_date=datetime(2022, 7, 13, 0),
    [...]
)
</syntaxhighlight>
====<tt>catchup</tt>====
====<tt>max_active_runs</tt>====
====<tt>dagrun_timeout</tt>====
====<tt>default_args</tt>====
==DAG Run==
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#concepts-dag-run}}
{{External|https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#dag-runs}}
{{External|https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html}}
A DAG is instantiated into a DAG Run at runtime. A DAG Run has an associated [[#Execution_Context|execution context]].
===Data Interval===
<font color=darkkhaki>TO PROCESS: https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html#data-interval</font>
===Logical Date===
<font color=darkkhaki>TO PROCESS: https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html#data-interval</font>
===External Triggers===
{{External|https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html#external-triggers}}
==Control Flow==
==Dynamic DAG or Dynamic Task Mapping==
The DAGs can be purely declarative, or they can be declared in Python code, by adding tasks dynamically. For more details, see:
{{Internal|Airflow Dynamic Task Mapping|Dynamic Task Mapping}}
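As a quick illustration of adding tasks dynamically, a sketch assuming Airflow 2.3+ dynamic task mapping and a made-up TaskFlow task (see the dedicated page above for details):
<syntaxhighlight lang='py'>
from datetime import datetime
from airflow.decorators import dag, task

@dag(start_date=datetime(2022, 7, 13), schedule_interval=None)
def mapping_example():

    @task
    def add_one(x):
        return x + 1

    # expand() creates one mapped task instance per element of the list
    add_one.expand(x=[1, 2, 3])

mapping_example()
</syntaxhighlight>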
==DAG File Processing==
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/dagfile-processing.html}}
==DAG Serialization==
{{External|https://airflow.apache.org/docs/apache-airflow/stable/dag-serialization.html}}
==DAG Scope==
{{External|https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#scope}}
==DAG Configuration==
===Default Arguments===
{{External|https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#default-arguments}}
=Task=
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/tasks.html}}
{{External|https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#tasks}}
A Task is the basic unit of execution in Airflow. Every task must be assigned to a [[#DAG|DAG]] to run. Tasks have dependencies on each other. There can be upstream dependencies: if B depends on A (A → B), then A is an upstream dependency of B. To be [[#scheduled|scheduled]], a task must have all its dependencies met.
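A minimal sketch of two tasks and an upstream dependency (DAG and task IDs are made up):
<syntaxhighlight lang='py'>
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG("dependency_example", start_date=datetime(2022, 7, 13), schedule_interval=None):
    a = BashOperator(task_id="a", bash_command="echo A")
    b = BashOperator(task_id="b", bash_command="echo B")
    # A is an upstream dependency of B: B is scheduled only after A completes
    a >> b
</syntaxhighlight>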
==Task Relationships==
The task relationships are a key part of using Tasks. There are two types of relationships: dependency (upstream/downstream) and previous/next.
===Upstream and Downstream Dependency===
If a task B has a dependency on task A (A → B), it is said that A is upstream of B and B is downstream of A. The dependencies are the directed edges of the directed acyclic graph.
The upstream term has very strict semantics: an upstream task is the task that directly precedes the other task. The concept does not describe tasks that are merely higher in the task hierarchy but are not a direct parent of the task. The same constraint applies to a downstream task, which needs to be a direct child of the other task.
===Previous and Next===
There may also be instances of the same task, but for different data intervals, from other runs of the same DAG. These are called previous and next.
==Task Types==
Airflow has three types of tasks: Operator, Sensor (which is a subclass of Operator) and TaskFlow-decorated Task. All of these are subclasses of Airflow's <code>BaseOperator</code>. Operators and Sensors are templates, and when one is called in a DAG, it is made into a Task.
===Operator===
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/operators.html}}
{{External|https://airflow.apache.org/docs/apache-airflow/stable/operators-and-hooks-ref.html}}
{{External|https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#operators}}
{{External|https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/index.html}}
An Operator is a predefined task template. Popular Airflow operators are:
* [[Airflow EmptyOperator#Overview|EmptyOperator]]
* [[Airflow BashOperator#Overview|BashOperator]]
* [[Airflow PythonOperator#Overview|PythonOperator]]
To differentiate these task types from the [[#.40task_Task_Decorator.2C_TaskFlow-decorated_Task|TaskFlow-style, @task-annotated tasks]], the documentation refers to them as "classic style operators".
<font color=darkkhaki>Document the operator's <code>execute()</code> method.</font>
For usage examples, see: {{Internal|Airflow_Programming_Model#Overview|Airflow Programming Model}}
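A sketch of a classic style operator being instantiated into a task (DAG ID, task ID and callable are made up):
<syntaxhighlight lang='py'>
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def _greet():
    print("hello from a classic style operator")

with DAG("classic_operator_example", start_date=datetime(2022, 7, 13), schedule_interval=None):
    # Calling the operator template in a DAG turns it into a task
    greet = PythonOperator(task_id="greet", python_callable=_greet)
</syntaxhighlight>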
====Sensor====
{{Internal|Airflow Sensor#Overview|Airflow Sensors}}
====Deferrable Operators and Triggers====
{{Internal|Airflow Deferrable Operators|Airflow Deferrable Operators}}
===<span id='Task_Decorator_.40task'></span><span id='TaskFlow-decorated_Task'></span><span id='TaskFlow'></span><span id='Bjy4L'></span><tt>@task</tt> Task Decorator, TaskFlow-decorated Task===
Decorated with <code>@task</code>. A custom Python function packaged up as a Task. For more details see: {{Internal|Airflow_TaskFlow#Overview|TaskFlow}}
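A minimal TaskFlow sketch (names are made up); the return value of one task is passed to the next, which wires the dependency and moves the data via XCom:
<syntaxhighlight lang='py'>
from datetime import datetime
from airflow.decorators import dag, task

@dag(start_date=datetime(2022, 7, 13), schedule_interval=None)
def taskflow_example():

    @task
    def extract():
        return 42

    @task
    def load(value):
        print(f"loaded {value}")

    # Passing the return value creates the extract -> load dependency
    load(extract())

taskflow_example()
</syntaxhighlight>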
==Task Assignment to DAG==
==Task Instance==
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/tasks.html#task-instances}}
{{External|https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#task-instances}}
The same way a [[#DAG|DAG]] is instantiated at runtime into a [[#DAG_Run|DAG Run]], the tasks under a DAG are instantiated into '''task instances'''.
The task instance is passed as the first argument of the [[Airflow_TaskFlow#Programming_Model|methods annotated with <code>@task</code>]], which provide the task behavior.
===Task States===
====<tt>none</tt>====
The task has not yet been queued for execution because its dependencies are not yet met.
====<tt>scheduled</tt>====
The task dependencies have been met, and the scheduler has determined that the task should run.
====<tt>queued</tt>====
The task has been assigned to an executor and it is awaiting a worker.
====<tt>running</tt>====
The task is running on a worker or on a local/synchronous executor.
====<tt>success</tt>====
The task finished running without errors.
====<tt>shutdown</tt>====
The task was externally requested to shut down when it was running.
====<tt>restarting</tt>====
The task was externally requested to restart when it was running.
====<tt>failed</tt>====
The task had an error during execution and failed to run.
====<tt>skipped</tt>====
The task was skipped due to branching, LatestOnly, or mapping with <code>[[Airflow_Dynamic_Task_Mapping#expand.28.29_Argument|expand()]]</code>.
====<tt>upstream_failed</tt>====
An upstream task failed and the Trigger Rule says we needed it.
====<tt>up_for_retry</tt>====
The task failed, but has retry attempts left and will be rescheduled.
====<tt>up_for_reschedule</tt>====
The task is a sensor that is in reschedule mode.
====<tt>sensing</tt>====
The task is a Smart Sensor.
====<tt>deferred</tt>====
The task has been deferred to a trigger.
====<tt>removed</tt>====
The task has vanished from the DAG since the run started.
==Task Lifecycle==
{{External|https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#task-lifecycle}}
The normal lifecycle of a task instance is [[#none|none]] → [[#scheduled|scheduled]] → [[#queued|queued]] → [[#running|running]] → [[#success|success]].
==Task Configuration and Data Exchange==
===<span id='Variable'></span>Variables===
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html}}
{{External|https://airflow.apache.org/docs/apache-airflow/stable/howto/variable.html}}
{{External|https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#variables}}
Variables are an Airflow runtime configuration concept. They are maintained in a general key/value store, which is global, shared by the entire Airflow instance, and can be queried from tasks. Variables can be created via the API, and they can also be created and updated in the UI, by going to Admin → Variables. Internally, the variables are stored in the <code>variable</code> metadata database table. They can be encrypted.
Programming model:
<syntaxhighlight lang='py'>
from airflow.models import Variable

# Normal call style
some_variable = Variable.get("some_variable")

# Auto-deserializes a JSON value
some_other_variable = Variable.get("some_other_variable", deserialize_json=True)

# Returns the value of default_var (None) if the variable is not set
some_variable_2 = Variable.get("some_variable_2", default_var=None)
</syntaxhighlight>
The variables can also be used from templates, as sketched below.
Variables are '''global''' and should only be used for overall configuration that covers the entire installation. To pass data to and from tasks, [[Airflow XComs#Overview|XComs]] should be used instead.
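A sketch of template usage, assuming a Variable named <code>some_variable</code> exists (DAG and task IDs are made up):
<syntaxhighlight lang='py'>
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG("variable_template_example", start_date=datetime(2022, 7, 13), schedule_interval=None):
    # "{{ var.value.some_variable }}" is resolved when the task runs
    show = BashOperator(task_id="show",
                        bash_command="echo {{ var.value.some_variable }}")
</syntaxhighlight>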
===<span id='Param'></span>Params===
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/params.html}}
Params are used to provide runtime configuration to tasks. When a DAG is started manually, its Params can be modified before the DAG run starts.
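A minimal sketch, with made-up names, of a DAG-level param and a task reading it from the [[#Execution_Context|execution context]] (it is also available in templates as <code>params.environment</code>):
<syntaxhighlight lang='py'>
from datetime import datetime
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context

@dag(start_date=datetime(2022, 7, 13), schedule_interval=None,
     params={"environment": "dev"})  # DAG-level param with a default value
def params_example():

    @task
    def report():
        # Params are exposed through the execution context
        context = get_current_context()
        print(context["params"]["environment"])

    report()

params_example()
</syntaxhighlight>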
====DAG-level Params====
====Task-level Params====
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/params.html#task-level-params}}
===XComs===
Tasks pass data among each other using:
* XComs, when the amount of metadata to be exchanged is small.
* Uploading and downloading large files from a storage service.
More details: {{Internal|Airflow XComs#Overview|XComs}}
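A sketch of explicit XCom push/pull between two TaskFlow tasks (names are made up; see the XComs page above for details):
<syntaxhighlight lang='py'>
from datetime import datetime
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context

@dag(start_date=datetime(2022, 7, 13), schedule_interval=None)
def xcom_example():

    @task
    def push():
        # "ti" is the task instance, taken from the execution context
        ti = get_current_context()["ti"]
        ti.xcom_push(key="sample", value="a small piece of metadata")

    @task
    def pull():
        ti = get_current_context()["ti"]
        print(ti.xcom_pull(task_ids="push", key="sample"))

    push() >> pull()

xcom_example()
</syntaxhighlight>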
===Execution Context===
{{External|https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#accessing-current-context}}
<font color=darkkhaki>What is the execution context?</font>
An execution context is associated with a [[#DAG_Run|DAG run]]. The current execution context can be retrieved from a task during the task's execution. <font color=darkkhaki>The context is not accessible during <code>pre_execute</code> or <code>post_execute</code>.</font>
====Execution Context Programming Model====
<syntaxhighlight lang='py'>
from airflow.decorators import task
from airflow.operators.python import get_current_context

@task
def task_a():
    context = get_current_context()
    print(context['dag'])
    print(context['task'])
</syntaxhighlight>
====Execution Context as Data Sharing Medium====
The context '''does not''' seem to be a valid method of exchanging data between tasks. Even if a key/value pair can be placed in the context and retrieved locally by a task, the value does not propagate to a subsequent task, even if the subsequent task declares an explicit dependency on the first task.
<syntaxhighlight lang='py'>
from airflow.decorators import task
from airflow.operators.python import get_current_context

@task
def task_a():
    context = get_current_context()
    key = 'test_key'
    context[key] = 'test_value'
    print(f">>> task_a, context[{key}] = {context.get(key)}")

@task
def task_b():
    context = get_current_context()
    key = 'test_key'
    print(f">>> task_b, context[{key}] = {context.get(key)}")

task_a() >> task_b()
</syntaxhighlight>
<code>task_a</code> will display:
<syntaxhighlight lang='text'>
>>> task_a, context[test_key] = test_value
</syntaxhighlight>
<code>task_b</code> will display:
<syntaxhighlight lang='text'>
>>> task_b, context[test_key] = None
</syntaxhighlight>
====Execution Context Keys====
Airflow populates the context with keys like "dag", "task", and the associated values:
{| class="wikitable" style="text-align: left;"
! Key
! Value
! Note
|-
| conf || <code>airflow.configuration.AirflowConfigParser</code> instance ||
|-
| dag || <code>DAG</code> instance ||
|-
| dag_run || DAG Run instance ||
|-
| data_interval_start || 2022-07-16T04:12:58.698129+00:00 ||
|-
| data_interval_end || 2022-07-16T04:12:58.698129+00:00 ||
|-
| ds || 2022-07-16 ||
|-
| ds_nodash || 20220716 ||
|-
| <span id='execution_date'></span>execution_date || 2022-07-16T04:12:58.698129+00:00 || Deprecated. The execution date (logical date), same as [[#logical_date|dag_run.logical_date]].
|-
| inlets || ||
|-
| <span id='logical_date'></span>logical_date || 2022-07-16T04:12:58.698129+00:00 || See [[#Logical_Date|DAG Run Logical Date]].
|-
| macros || ||
|-
| next_ds || 2022-07-16 ||
|-
| next_ds_nodash || 20220716 ||
|-
| next_execution_date || 2022-07-16T04:12:58.698129+00:00 ||
|-
| outlets || ||
|-
| params || ||
|-
| prev_data_interval_start_success || 2022-07-16T03:38:00.597026+00:00 ||
|-
| prev_data_interval_end_success || 2022-07-16T03:38:00.597026+00:00 ||
|-
| prev_ds || 2022-07-16 ||
|-
| prev_ds_nodash || 20220716 ||
|-
| prev_execution_date || 2022-07-16T04:12:58.698129+00:00 ||
|-
| prev_execution_date_success || ||
|-
| prev_start_date_success || 2022-07-16T03:38:01.381156+00:00 ||
|-
| run_id || manual__2022-07-16T04:12:58.698129+00:00 ||
|-
| task || ||
|-
| <span id='task_instance'></span>task_instance || TaskInstance instance ||
|-
| task_instance_key_str || task_experiment_2__task_d__20220716 ||
|-
| test_mode || False ||
|-
| ti || TaskInstance instance || See [[#task_instance|task_instance]].
|-
| tomorrow_ds || 2022-07-17 ||
|-
| tomorrow_ds_nodash || 20220717 ||
|-
| ts || 2022-07-16T04:12:58.698129+00:00 ||
|-
| ts_nodash || 20220716T041258 ||
|-
| ts_nodash_with_tz || 20220716T041258.698129+0000 ||
|-
| var || {'json': None, 'value': None} ||
|-
| conn || None ||
|-
| yesterday_ds || 2022-07-15 ||
|-
| yesterday_ds_nodash || 20220715 ||
|-
| templates_dict || None ||
|}
==TaskGroup==
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#concepts-taskgroups}}
This is a pure UI concept.
==Task Timeout==
==Task SLA==
An SLA, or a Service Level Agreement, is an expectation for the maximum time a Task should take.
==Zombie/Undead Tasks==
==Per-Task Executor Configuration==
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/tasks.html#executor-configuration}}
==Task Logging==
See: {{Internal|Airflow_Logging_and_Monitoring#Logging_for_Tasks|Airflow Logging and Monitoring | Logging for Tasks}}
=Branching=
{{External|https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#branching}}
=Workload=
=Scheduler=
=Executor=
{{External|https://airflow.apache.org/docs/apache-airflow/stable/executor/index.html}}
Executors are the mechanism by which [[#Task_Instance|task instances]] get run. All executors have a common API and they are "pluggable", meaning they can be swapped based on operational needs.
There is no need to run a separate executor process (though you can). For [[#Local_Executors|local executors]], the executor's logic runs inside the [[#Scheduler|scheduler]] process: if a scheduler is running, then the executor is running.
==Executor Types==
===Local Executors===
Local executors run tasks locally, inside the [[#Scheduler|scheduler]] process.
* <span id='Debug_Executor'></span>[[Airflow Debug Executor#Overview|Debug Executor]]
* <span id='Local_Executor'></span>[[Airflow Local Executor#Overview|Local Executor]]
* <span id='Sequential_Executor'></span>[[Airflow Sequential Executor#Overview|Sequential Executor]]. Airflow comes configured with the Sequential Executor by default.
===Remote Executors===
Remote executors run tasks remotely, usually via a [[#Pool|pool]] of [[#Worker|workers]].
* <span id='Celery_Executor'></span>[[Airflow Celery Executor#Overview|Celery Executor]]
* <span id='CeleryKubernetes_Executor'></span>[[Airflow CeleryKubernetes Executor#Overview|CeleryKubernetes Executor]]
* <span id='Dask_Executor'></span>[[Airflow Dask Executor#Overview|Dask Executor]]
* <span id='Kuberentes_Executor'></span>[[Airflow Kuberentes Executor#Overview|Kubernetes Executor]]
* <span id='LocalKubernetes_Executor'></span>[[Airflow LocalKubernetes Executor#Overview|LocalKubernetes Executor]]
=Worker=
Worker slot.
=Metadata Database=
=Connections & Hooks=
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/connections.html}}
==Connection==
{{External|https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html}}
==Hook==
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/connections.html#hooks}}
{{External|https://airflow.apache.org/docs/apache-airflow/stable/python-api-ref.html#pythonapi-hooks}}
=Pool=
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/pools.html}}
=<span id='Macro'></span>Macros=
{{External|https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html#templates-ref}}
=Timetables=
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/timetable.html}}
=Priority Weights=
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/priority-weight.html}}
=Cluster Policies=
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/cluster-policies.html}}
=Plugins=
{{External|https://airflow.apache.org/docs/apache-airflow/stable/plugins.html}}
=Security=
{{Internal|Airflow Security#Overview|Airflow Security}}
=Logging and Monitoring=
{{Internal|Airflow Logging and Monitoring#Overview|Logging and Monitoring}}
=Integration=
{{External|https://airflow.apache.org/docs/apache-airflow/stable/integration.html}}
=Python Module Management=
{{External|https://airflow.apache.org/docs/apache-airflow/stable/modules_management.html}}
=Trigger Rule=
{{External|https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#trigger-rules}}
=Jinja Support=
{{External|https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#jinja-templating}}
=Airflow Programming Model=
{{Internal|Airflow_Programming_Model#Overview|Airflow Programming Model}}