Airflow Programming Model: Difference between revisions
Jump to navigation
Jump to search
(27 intermediate revisions by the same user not shown) | |||
Line 1: | Line 1: | ||
=External= | |||
* Examples: https://github.com/apache/airflow/tree/main/airflow/example_dags | |||
=Internal= | =Internal= | ||
* [[Airflow#Subjects|Airflow]] | * [[Airflow#Subjects|Airflow]] | ||
* [[Airflow Concepts]] | |||
=Overview= | =Overview= | ||
Airflow DAGs are programmed in Python. | |||
=Airflow DAG Examples= | |||
{{External|https://github.com/apache/airflow/tree/main/airflow/example_dags}} | |||
=<tt>requirements.txt</tt>= | =<tt>requirements.txt</tt>= | ||
<syntaxhighlight lang='text'> | <syntaxhighlight lang='text'> | ||
Line 8: | Line 17: | ||
=Declaring a DAG= | =Declaring a DAG= | ||
==With <tt>@dag</tt> Decorator== | |||
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#the-dag-decorator}} | |||
The <code>@dag</code> decorator turns a Python function into a '''DAG generator function'''. <code>@dag</code> is only available in Airflow 2 and newer. The decorator can be configured with the a [[Airflow_Concepts#DAG_Configuration_Parameters|number of pre-defined parameters]]. The function includes task declaration and task relationship declaration, as follows: | |||
<span id='Decorator_Example'></span><syntaxhighlight lang='py'> | |||
from airflow.decorators import dag, task | |||
from datetime import datetime | |||
@dag( | |||
schedule_interval=None, | |||
start_date=datetime(2022, 7, 13, 0), | |||
catchup=False | |||
) | |||
def some_dag(): | |||
@task | |||
def task_a(): | |||
print(">>> A") | |||
@task | |||
def task_b(): | |||
print(">>> B") | |||
@task | |||
def task_c(): | |||
print(">>> C") | |||
task_a() >> task_b() >> task_c() | |||
dag = some_dag() | |||
</syntaxhighlight> | |||
Also see: {{Internal|Airflow_Concepts#Declaring_a_DAG|Airflow Concepts | Declaring a DAG}} | |||
==With <tt>DAG()</tt> Constructor== | ==With <tt>DAG()</tt> Constructor== | ||
Also see: {{Internal|Airflow_Concepts#Declaring_a_DAG|Airflow Concepts | Declaring a DAG}} | Also see: {{Internal|Airflow_Concepts#Declaring_a_DAG|Airflow Concepts | Declaring a DAG}} | ||
== | <span id='Constructor_Example'></span><syntaxhighlight lang='py'> | ||
import time | |||
from datetime import datetime, timedelta | |||
from airflow import DAG | |||
from airflow.operators.empty import EmptyOperator | |||
from airflow.operators.python import PythonOperator | |||
default_args = { | |||
'owner': 'somebody', | |||
"depends_on_past": False, | |||
"email": ["somebody@example.com"], | |||
"email_on_failure": True, | |||
"email_on_retry": False | |||
} | |||
dag = DAG('some_dag', | |||
max_active_runs=1, | |||
catchup=True, | |||
start_date=datetime(2022, 7, 13, 0), | |||
schedule_interval="* * * * *", | |||
dagrun_timeout=timedelta(minutes=2), | |||
default_args=default_args | |||
) | |||
def some_function(): | |||
print("something") | |||
with dag: | |||
start = EmptyOperator(task_id='start', dag=dag) | |||
end = EmptyOperator(task_id='end', dag=dag) | |||
some_function = PythonOperator( | |||
task_id='some_function', | |||
python_callable=some_function, | |||
dag=dag | |||
) | |||
start >> some_function >> end | |||
</syntaxhighlight> | |||
=Declaring a Task= | |||
==Using the <tt>PythonOperator</tt>== | |||
<span id='Constructor_Example'></span><syntaxhighlight lang='py'> | |||
from airflow import DAG | |||
from airflow.operators.python import PythonOperator | |||
dag = DAG('some_dag', [...]) | |||
def some_function(): | |||
print("something") | |||
with dag: | |||
some_function = PythonOperator( | |||
task_id='some_function', | |||
python_callable=some_function, | |||
dag=dag | |||
) | |||
[...] | |||
</syntaxhighlight> | |||
==<span id='Decorator_Example'></span>Using the <tt>@task</tt> Decorator== | |||
{{Internal|Airflow_TaskFlow#Programming_Model|Airflow TaskFlow | Programming Model}} | |||
==Combining <tt>@task</tt> and Operators== | |||
<syntaxhighlight lang='py'> | |||
from airflow.models.dag import DAG | |||
from airflow.decorators import task | |||
from airflow.operators.empty import EmptyOperator | |||
from datetime import datetime | |||
@task | |||
def task_a(): | |||
print(">>> task_a") | |||
with DAG( | |||
dag_id='some_dag', | |||
start_date=datetime(2022, 7, 13, 0) | |||
) as dag: | |||
start = EmptyOperator(task_id='start', dag=dag) | |||
end = EmptyOperator(task_id='end', dag=dag) | |||
@task | |||
def task_b(): | |||
print(">>> task_b") | |||
start >> task_a() >> task_b() >> end | |||
</syntaxhighlight> | |||
=XComs Programming Model= | |||
{{Internal|Airflow_XComs#Programming_Model|Airflow XComs | Programming Model}} | |||
=Module Management= | =Module Management= | ||
<font color=darkkhaki>TO PROCESS: https://airflow.apache.org/docs/apache-airflow/2.3.2/modules_management.html?highlight=import</font> | <font color=darkkhaki>TO PROCESS: https://airflow.apache.org/docs/apache-airflow/2.3.2/modules_management.html?highlight=import</font> |
Latest revision as of 01:46, 18 July 2022
External
Internal
Overview
Airflow DAGs are programmed in Python.
Airflow DAG Examples
requirements.txt
apache-airflow == 2.3.3
Declaring a DAG
With @dag Decorator
The @dag
decorator turns a Python function into a DAG generator function. @dag
is only available in Airflow 2 and newer. The decorator can be configured with the a number of pre-defined parameters. The function includes task declaration and task relationship declaration, as follows:
from airflow.decorators import dag, task
from datetime import datetime
@dag(
schedule_interval=None,
start_date=datetime(2022, 7, 13, 0),
catchup=False
)
def some_dag():
@task
def task_a():
print(">>> A")
@task
def task_b():
print(">>> B")
@task
def task_c():
print(">>> C")
task_a() >> task_b() >> task_c()
dag = some_dag()
Also see:
With DAG() Constructor
Also see:
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
default_args = {
'owner': 'somebody',
"depends_on_past": False,
"email": ["somebody@example.com"],
"email_on_failure": True,
"email_on_retry": False
}
dag = DAG('some_dag',
max_active_runs=1,
catchup=True,
start_date=datetime(2022, 7, 13, 0),
schedule_interval="* * * * *",
dagrun_timeout=timedelta(minutes=2),
default_args=default_args
)
def some_function():
print("something")
with dag:
start = EmptyOperator(task_id='start', dag=dag)
end = EmptyOperator(task_id='end', dag=dag)
some_function = PythonOperator(
task_id='some_function',
python_callable=some_function,
dag=dag
)
start >> some_function >> end
Declaring a Task
Using the PythonOperator
from airflow import DAG
from airflow.operators.python import PythonOperator
dag = DAG('some_dag', [...])
def some_function():
print("something")
with dag:
some_function = PythonOperator(
task_id='some_function',
python_callable=some_function,
dag=dag
)
[...]
Using the @task Decorator
Combining @task and Operators
from airflow.models.dag import DAG
from airflow.decorators import task
from airflow.operators.empty import EmptyOperator
from datetime import datetime
@task
def task_a():
print(">>> task_a")
with DAG(
dag_id='some_dag',
start_date=datetime(2022, 7, 13, 0)
) as dag:
start = EmptyOperator(task_id='start', dag=dag)
end = EmptyOperator(task_id='end', dag=dag)
@task
def task_b():
print(">>> task_b")
start >> task_a() >> task_b() >> end
XComs Programming Model
Module Management
TO PROCESS: https://airflow.apache.org/docs/apache-airflow/2.3.2/modules_management.html?highlight=import