Airflow Concepts: Difference between revisions
(39 intermediate revisions by the same user not shown) | |||
Line 16: | Line 16: | ||
==Declaring a DAG== | ==Declaring a DAG== | ||
DAGs are declared in Airflow Python script files, which are just configuration file specifying the DAG's structure as code. The actual tasks defined in it will run in a different context from the context of the script. Different tasks run on different workers at different points in time, which means that the script cannot be used to communicate between tasks. People sometimes think of the DAG definition file as a place where they can do some actual data processing. That is not the case at all. The script’s purpose is to define a DAG object. It needs to evaluate quickly (seconds, not minutes) since the scheduler will execute it periodically to reflect the changes if any. | DAGs are declared in Airflow Python script files, which are just configuration file specifying the DAG's structure as code. The actual tasks defined in it will run in a different context from the context of the script. Different tasks run on different workers at different points in time, which means that the script cannot be used to communicate between tasks. People sometimes think of the DAG definition file as a place where they can do some actual data processing. That is not the case at all. The script’s purpose is to define a DAG object. It needs to evaluate quickly (seconds, not minutes) since the scheduler will execute it periodically to reflect the changes if any. | ||
<font color=darkkhaki>The DAG definition is executed at task scheduling time, which allows for [[Airflow_Dynamic_Task_Mapping#Overview|dynamic task mapping]].</font> | |||
===With <tt>@dag</tt> Decorator=== | ===With <tt>@dag</tt> Decorator=== | ||
{{Internal|Airflow_Programming_Model#With_.40dag_Decorator|Declare a DAG with <code>@dag</code> decorator}} | {{Internal|Airflow_Programming_Model#With_.40dag_Decorator|Declare a DAG with <code>@dag</code> decorator}} | ||
Line 26: | Line 28: | ||
Programming model usage example: {{Internal|Airflow_Programming_Model#With_.40dag_Decorator|DAG Configuration Parameters Programming Model}} | Programming model usage example: {{Internal|Airflow_Programming_Model#With_.40dag_Decorator|DAG Configuration Parameters Programming Model}} | ||
====<tt>schedule_interval</tt>==== | ====<tt>schedule_interval</tt>==== | ||
Optional parameter, if not specified, <font color=darkkhaki>the default is <code>(* * * * * )</code> which means the DAG will be scheduled every minute.</font> | |||
To prevent the DAG from being automatically scheduled at all, which is appropriate for manual triggering, use: | |||
<syntaxhighlight lang='py'> | |||
@dag( | |||
schedule_interval=None, | |||
[...] | |||
) | |||
</syntaxhighlight> | |||
====<tt>start_date</tt>==== | ====<tt>start_date</tt>==== | ||
Required parameter. | Required parameter. Example: | ||
<syntaxhighlight lang='py'> | |||
from datetime import datetime | |||
@dag( | |||
start_date=datetime(2022, 7, 13, 0), | |||
[...] | |||
) | |||
</syntaxhighlight> | |||
====<tt>catchup</tt>==== | ====<tt>catchup</tt>==== | ||
====<tt>max_active_runs</tt>==== | ====<tt>max_active_runs</tt>==== | ||
Line 39: | Line 57: | ||
{{External|https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html}} | {{External|https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html}} | ||
A DAG instantiates in a DAG Run at runtime. A DAG Run has an associated [[#Execution_Context|execution context]]. | A DAG instantiates in a DAG Run at runtime. A DAG Run has an associated [[#Execution_Context|execution context]]. | ||
===Data Interval=== | |||
<font color=darkkhaki>TO PROCESS: https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html#data-interval</font> | |||
===Logical Date=== | |||
<font color=darkkhaki>TO PROCESS: https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html#data-interval</font> | |||
===External Triggers=== | ===External Triggers=== | ||
{{External|https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html#external-triggers}} | {{External|https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html#external-triggers}} | ||
Line 79: | Line 103: | ||
===Operator=== | ===Operator=== | ||
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/operators.html}} | {{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/operators.html}} | ||
{{External|https://airflow.apache.org/docs/apache-airflow/stable/operators-and-hooks-ref.html}} | |||
{{External|https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#operators}} | {{External|https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#operators}} | ||
An Operator is a predefined task template. | {{External|https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/index.html}} | ||
An Operator is a predefined task template. Popular Airflow operators are: | |||
* [[Airflow EmptyOperator#Overview|EmptyOperator]] | |||
* [[Airflow BashOperator#Overview|BashOperator]] | |||
* [[Airflow PythonOperator#Overview|PythonOperator]] | |||
To differentiate these task types from the [[#.40task_Task_Decorator.2C_TaskFlow-decorated_Task|TaskFlow-style, @task-annotated tasks]], documentation refers to these as "classic style operators". | |||
<font color=darkkhaki>Document the operator's <code>execute()</code> method.</font> | |||
For usage examples, see: {{Internal|Airflow_Programming_Model#Overview|Airflow Programming Model}} | |||
====Sensor==== | ====Sensor==== | ||
{{ | {{Internal|Airflow Sensor#Overview|Airflow Sensors}} | ||
====Deferrable Operators and Triggers==== | |||
{{ | {{Internal|Airflow Deferrable Operators|Airflow Deferrable Operators}} | ||
===<span id='Task_Decorator_.40task'></span><span id='TaskFlow-decorated_Task'></span><span id='TaskFlow'></span><span id='Bjy4L'></span><tt>@task</tt> Task Decorator, TaskFlow-decorated Task=== | ===<span id='Task_Decorator_.40task'></span><span id='TaskFlow-decorated_Task'></span><span id='TaskFlow'></span><span id='Bjy4L'></span><tt>@task</tt> Task Decorator, TaskFlow-decorated Task=== | ||
Line 118: | Line 149: | ||
The task had an error during execution and failed to run. | The task had an error during execution and failed to run. | ||
====<tt>skipped</tt>==== | ====<tt>skipped</tt>==== | ||
The task was skipped due to branching, LatestOnly or | The task was skipped due to branching, LatestOnly or mapping with <code>[[Airflow_Dynamic_Task_Mapping#expand.28.29_Argument|expand()]]</code>. | ||
====<tt>upstream_failed</tt>==== | ====<tt>upstream_failed</tt>==== | ||
An upstream task failed and the Trigger Rule says we needed it. | An upstream task failed and the Trigger Rule says we needed it. | ||
Line 180: | Line 212: | ||
An execution context is associated with a [[#DAG_Run|DAG run]]. The current execution context can be retrieved from a task, during the task execution. <font color=darkkhaki>The context is not accessible during <code>pre_execute</code> or <code>post_execute</code>.</font>. | An execution context is associated with a [[#DAG_Run|DAG run]]. The current execution context can be retrieved from a task, during the task execution. <font color=darkkhaki>The context is not accessible during <code>pre_execute</code> or <code>post_execute</code>.</font>. | ||
Programming | ====Execution Context Programming Model==== | ||
<syntaxhighlight lang='py'> | |||
@task | |||
def task_a(): | |||
context = get_current_context() | |||
print(context['dag']) | |||
print(context['task']) | |||
</syntaxhighlight> | |||
====Execution Context as Data Sharing Medium==== | |||
The context DOES NOT seem to be a valid method to exchange data between tasks. Even if a key/value pair can be placed in the context and retrieved locally by a task, the value does not seem to propagate to a subsequent task, even if the subsequent task declares a explicit dependency on the first task. | |||
<syntaxhighlight lang='py'> | <syntaxhighlight lang='py'> | ||
@task | |||
def task_a(): | |||
context = get_current_context() | |||
key = 'test_key' | |||
context[key] = 'test_value' | |||
print(f">>> task_a, context[{key}] = {context.get(key)}") | |||
@task | @task | ||
def | def task_b(): | ||
context = get_current_context() | |||
key = 'test_key' | |||
print(f">>> task_b, context[{key}] = {context.get(key)}") | |||
task_a() >> task_b() | |||
</syntaxhighlight> | |||
task_a will display: | |||
<syntaxhighlight lang='text'> | |||
>>> task_a, context[test_key] = test_value | |||
</syntaxhighlight> | |||
task_b will display: | |||
<syntaxhighlight lang='text'> | |||
>>> task_b, context[test_key] = None | |||
</syntaxhighlight> | </syntaxhighlight> | ||
====Execution Context Keys==== | |||
Airflow populates the context with keys like "dag", "task", and the associated values: | Airflow populates the context with keys like "dag", "task", and the associated values: | ||
{| class="wikitable" style="text-align: left;" | {| class="wikitable" style="text-align: left;" | ||
Line 212: | Line 269: | ||
| ds_nodash || 20220716 || | | ds_nodash || 20220716 || | ||
|- | |- | ||
| execution_date || 2022-07-16T04:12:58.698129+00:00 || | | <span id='execution_date'></span>execution_date || 2022-07-16T04:12:58.698129+00:00 || Deprecated. The execution date (logical date), same as [[#logical_date|dag_run.logical_date]]. | ||
|- | |- | ||
| inlets || || | | inlets || || | ||
|- | |- | ||
| logical_date || 2022-07-16T04:12:58.698129+00:00 || | | <span id='logical_date'></span>logical_date || 2022-07-16T04:12:58.698129+00:00 || See [[#Logical_Date|DAG Run Logical Date]]. | ||
|- | |- | ||
| macros || || | | macros || || | ||
Line 277: | Line 334: | ||
|- | |- | ||
|} | |} | ||
==TaskGroup== | ==TaskGroup== | ||
Line 292: | Line 347: | ||
==Per-Task Executor Configuration== | ==Per-Task Executor Configuration== | ||
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/tasks.html#executor-configuration}} | {{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/tasks.html#executor-configuration}} | ||
==Task Logging== | ==Task Logging== | ||
See: {{Internal|Airflow_Logging_and_Monitoring#Logging_for_Tasks|Airflow Logging and Monitoring | Logging for Tasks}} | See: {{Internal|Airflow_Logging_and_Monitoring#Logging_for_Tasks|Airflow Logging and Monitoring | Logging for Tasks}} | ||
Line 325: | Line 378: | ||
=Worker= | =Worker= | ||
Worker slot. | |||
=Metadata Database= | =Metadata Database= | ||
=Connections & Hooks= | =Connections & Hooks= |
Latest revision as of 23:05, 18 July 2022
External
Internal
Workflow
DAG
The edges can be labeled in the UI.
DAG Name
When the DAG is declared with the DAG()
constructor, the name is the first argument of the constructor. When the DAG is declared with the @dag
decorator, the name of the DAG is the name of the function.
Declaring a DAG
DAGs are declared in Airflow Python script files, which are just configuration file specifying the DAG's structure as code. The actual tasks defined in it will run in a different context from the context of the script. Different tasks run on different workers at different points in time, which means that the script cannot be used to communicate between tasks. People sometimes think of the DAG definition file as a place where they can do some actual data processing. That is not the case at all. The script’s purpose is to define a DAG object. It needs to evaluate quickly (seconds, not minutes) since the scheduler will execute it periodically to reflect the changes if any.
The DAG definition is executed at task scheduling time, which allows for dynamic task mapping.
With @dag Decorator
With DAG() Constructor
Via ContextManager
The DAG can be declared via a context manager.
DAG Configuration Parameters
Programming model usage example:
schedule_interval
Optional parameter, if not specified, the default is (* * * * * )
which means the DAG will be scheduled every minute.
To prevent the DAG from being automatically scheduled at all, which is appropriate for manual triggering, use:
@dag(
schedule_interval=None,
[...]
)
start_date
Required parameter. Example:
from datetime import datetime
@dag(
start_date=datetime(2022, 7, 13, 0),
[...]
)
catchup
max_active_runs
dagrun_timeout
default_args
DAG Run
A DAG instantiates in a DAG Run at runtime. A DAG Run has an associated execution context.
Data Interval
TO PROCESS: https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html#data-interval
Logical Date
TO PROCESS: https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html#data-interval
External Triggers
Control Flow
Dynamic DAG or Dynamic Task Mapping
The DAGs can be purely declarative, or they can be declared in Python code, by adding tasks dynamically. For more details, see:
DAG File Processing
DAG Serialization
DAG Scope
DAG Configuration
Default Arguments
Task
A Task is the basic unit of execution in Airflow. Every task must be assigned to a DAG to run. Tasks have dependencies on each other. There could be upstream dependencies (if B depends on A, A → B, then A is an upstream dependency of B). To be scheduled, a task have all its dependencies met.
Task Relationships
The task relationships, are a key part in using Tasks.
There are two types of relationships: dependency and
Upstream and Downstream Dependency
If a task B has a dependency on task A (A → B), it is said that A is upstream of B and B is downstream of A. The dependencies are the directed edges of the directed acyclic graph.
The upstream term has a very strict semantics: an upstream task is the task that is directly preceding the other task. This concept does not describe the tasks that are higher in the task hierarchy (they are not a direct parent of the task). Same constrains apply to a downstream task, which need to be a direct child of the other task.
Previous and Next
There may also be instances of the same task, but for different data intervals - from other runs of the same DAG. These are previous and next.
Task Types
Airflow has three types of tasks: Operator, Sensor, which is a subclass of Operator, and TaskFlow-decorated Task. All these are subclasses of Airflow's BaseOperator
. Operators and Sensor are templates, and when one is called in a DAG, it is made into a Task.
Operator
An Operator is a predefined task template. Popular Airflow operators are:
To differentiate these task types from the TaskFlow-style, @task-annotated tasks, documentation refers to these as "classic style operators".
Document the operator's execute()
method.
For usage examples, see:
Sensor
Deferrable Operators and Triggers
@task Task Decorator, TaskFlow-decorated Task
Decorated with @task
. A custom Python function packaged up as a Task. For more details see:
Task Assignment to DAG
Task Instance
The same way a DAG is instantiated at runtime into a DAG Run, the tasks under a DAG are instantiated into task instances.
The task instance is passed as the first argument of the methods annotated with @task
, which provides the task behavior.
Task States
none
The task has not yet been queued for execution because its dependencies are not yet met.
scheduled
The task dependencies have been met, and the scheduled has determined that the task should run.
queued
The task has been assigned to an executor and it is awaiting a worker.
running
The task is running on a worker or on a local/synchronous executor.
success
The task finished running without errors.
shutdown
The task was externally requested to shut down when it was running.
restarting
The task was externally requested to restart when it was running.
failed
The task had an error during execution and failed to run.
skipped
The task was skipped due to branching, LatestOnly or mapping with expand()
.
upstream_failed
An upstream task failed and the Trigger Rule says we needed it.
up_for_retry
The task failed, but has retry attempts left and will be rescheduled.
up_for_reschedule
The task is a sensor that is in reschedule mode.
sensing
The task is a Smart Sensor.
deferred
The task has been deferred to a trigger.
removed
The task has vanished from the DAG since the run started.
Task Lifecycle
The normal lifecycle of a task instance is none → scheduled → queued → running → success.
Task Configuration and Data Exchange
Variables
Variables are an Airflow runtime configuration concept. Variables are maintained in a general key/value store, which is global and shared by the entire Airflow instance, and which can be queried from the tasks. While the variables can be created via API, they can also be created and updated via UI, gy going to Admin → Variables. Internally, the variables are stored in the variable table. They can be encrypted.
Programming model:
from airflow.models import Variable
# Normal call style
some_variable = Variable.get("some_variable")
# Auto-deserializes a JSON value
some_other_variable = Variable.get("some_other_variable", deserialize_json=True)
# Returns the value of default_var (None) if the variable is not set
some_variable_2 = Variable.get("some_variable_2", default_var=None)
The variables can be used from templates.
Variables are global and should be only used for overall configuration that covers the entire installation. To pass data to and from tasks, XComs should be used instead.
Params
Params are used to provide runtime configuration to tasks. When a DAG is started manually, its Params can be modified before the DAG run starts.
DAG-level Params
Task-level Params
XComs
Tasks pass data among each other using:
- XComs, when the amount of metadata to be exchanged is small.
- Uploading and downloading large files from a storage service.
More details:
Execution Context
What is the execution context?
An execution context is associated with a DAG run. The current execution context can be retrieved from a task, during the task execution. The context is not accessible during pre_execute
or post_execute
..
Execution Context Programming Model
@task
def task_a():
context = get_current_context()
print(context['dag'])
print(context['task'])
Execution Context as Data Sharing Medium
The context DOES NOT seem to be a valid method to exchange data between tasks. Even if a key/value pair can be placed in the context and retrieved locally by a task, the value does not seem to propagate to a subsequent task, even if the subsequent task declares a explicit dependency on the first task.
@task
def task_a():
context = get_current_context()
key = 'test_key'
context[key] = 'test_value'
print(f">>> task_a, context[{key}] = {context.get(key)}")
@task
def task_b():
context = get_current_context()
key = 'test_key'
print(f">>> task_b, context[{key}] = {context.get(key)}")
task_a() >> task_b()
task_a will display:
>>> task_a, context[test_key] = test_value
task_b will display:
>>> task_b, context[test_key] = None
Execution Context Keys
Airflow populates the context with keys like "dag", "task", and the associated values:
Key | Value | Note |
---|---|---|
conf | airflow.configuration.AirflowConfigParser instance |
|
dag | DAG instance |
|
dag_run | DAG Run instance | |
data_interval_start | 2022-07-16T04:12:58.698129+00:00 | |
data_interval_end | 2022-07-16T04:12:58.698129+00:00 | |
ds | 2022-07-16 | |
ds_nodash | 20220716 | |
execution_date | 2022-07-16T04:12:58.698129+00:00 | Deprecated. The execution date (logical date), same as dag_run.logical_date. |
inlets | ||
logical_date | 2022-07-16T04:12:58.698129+00:00 | See DAG Run Logical Date. |
macros | ||
next_ds | 2022-07-16 | |
next_ds_nodash | 20220716 | |
next_execution_date | 2022-07-16T04:12:58.698129+00:00 | |
outlets | ||
params | ||
prev_data_interval_start_success | 2022-07-16T03:38:00.597026+00:00 | |
prev_data_interval_end_success | 2022-07-16T03:38:00.597026+00:00 | |
prev_ds | 2022-07-16 | |
prev_ds_nodash | 20220716 | |
prev_execution_date | 2022-07-16T04:12:58.698129+00:00 | |
prev_execution_date_success | ||
prev_start_date_success | 2022-07-16T03:38:01.381156+00:00 | |
run_id | manual__2022-07-16T04:12:58.698129+00:00 | |
task | ||
task_instance | TaskInstance instance | |
task_instance_key_str | task_experiment_2__task_d__20220716 | |
test_mode | False | |
ti | TaskInstance instance | See task_instance. |
tomorrow_ds | 2022-07-17 | |
tomorrow_ds_nodash | 20220717 | |
ts | 2022-07-16T04:12:58.698129+00:00 | |
ts_nodash | 20220716T041258 | |
ts_nodash_with_tz | 20220716T041258.698129+0000 | |
var | {'json': None, 'value': None} | |
conn | None | |
yesterday_ds | 2022-07-15 | |
yesterday_ds_nodash | 20220715 | |
templates_dict | None |
TaskGroup
This is a pure UI concept.
Task Timeout
Task SLA
An SLA, or a Service Level Agreement, is an expectation for the maximum time a Task should take.
Zombie/Undead Tasks
Per-Task Executor Configuration
Task Logging
See:
Branching
Workload
Scheduler
Executor
Executors are the mechanism by which task instances get run. All executors have a common API and they are "pluggable", meaning they can be swapped based on operational needs.
There is no need to run a separate executor process (though you can). For local executors, the executor’s logic runs inside the scheduler process. If a scheduler is running, then the executor is running.
Executor Types
Local Executors
Local executors run tasks locally inside the scheduler process.
- Debug Executor
- Local Executor
- Sequential Executor. Airflow comes configured with the Sequential Executor by default.
Remote Executors
Remote executors run tasks remotely, usually via a pool of workers.
- Celery Executor
- CeleryKubernetes Executor
- Dask Executor
- Kuberentes Executor
- LocalKubernetes Executor
Worker
Worker slot.