Airflow Programming Model

From NovaOrdis Knowledge Base
Latest revision as of 01:46, 18 July 2022

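External

Examples: https://github.com/apache/airflow/tree/main/airflow/example_dags

Internal

Airflow
Airflow Concepts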

Overview

Airflow DAGs are programmed in Python.

Airflow DAG Examples

https://github.com/apache/airflow/tree/main/airflow/example_dags

requirements.txt

apache-airflow == 2.3.3

Declaring a DAG

With @dag Decorator

https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#the-dag-decorator

The @dag decorator turns a Python function into a DAG generator function. @dag is only available in Airflow 2 and newer. The decorator can be configured with a number of pre-defined parameters. The function body declares the tasks and the relationships between them, as follows:

from airflow.decorators import dag, task
from datetime import datetime


@dag(
    schedule_interval=None,
    start_date=datetime(2022, 7, 13, 0),
    catchup=False
)
def some_dag():
    @task
    def task_a():
        print(">>> A")

    @task
    def task_b():
        print(">>> B")

    @task
    def task_c():
        print(">>> C")

    task_a() >> task_b() >> task_c()


dag = some_dag()
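
The term "DAG generator function" can be illustrated without Airflow. The sketch below is a toy model only, not Airflow's implementation: toy_dag and the dictionary it builds are hypothetical names invented for this illustration. It shows the general shape of the pattern: the decorator receives configuration, the function body runs only when the decorated function is called, and that call returns the assembled object.

```python
import functools


def toy_dag(**dag_kwargs):
    """Hypothetical decorator factory that mimics the shape of @dag."""
    def decorator(func):
        @functools.wraps(func)
        def generator(*args, **kwargs):
            # The body of func runs only when the generator is called.
            built = {"dag_id": func.__name__, "config": dag_kwargs, "tasks": []}
            func(built, *args, **kwargs)
            return built
        return generator
    return decorator


@toy_dag(schedule_interval=None, catchup=False)
def some_dag(dag):
    # Stand-in for task declarations inside the function body.
    dag["tasks"].extend(["task_a", "task_b", "task_c"])


result = some_dag()  # nothing is built until this call
print(result["dag_id"], result["tasks"])
```

Unlike this toy, the real @dag does not pass the DAG to the function as an argument. The point of the sketch is only that the decorated function has to be called, as in dag = some_dag() above, for anything to be built.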

Also see:

Airflow Concepts | Declaring a DAG

With DAG() Constructor

Also see:

Airflow Concepts | Declaring a DAG

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator

# Defaults applied to every task in the DAG unless a task overrides them.
default_args = {
    'owner': 'somebody',
    "depends_on_past": False,
    "email": ["somebody@example.com"],
    "email_on_failure": True,
    "email_on_retry": False
}

dag = DAG('some_dag',
          max_active_runs=1,
          catchup=True,
          start_date=datetime(2022, 7, 13, 0),
          schedule_interval="* * * * *",  # cron expression: every minute
          dagrun_timeout=timedelta(minutes=2),
          default_args=default_args
          )


def some_function():
    print("something")


with dag:
    start = EmptyOperator(task_id='start', dag=dag)
    end = EmptyOperator(task_id='end', dag=dag)
    # Bind the operator to its own name so the callable is not shadowed.
    some_task = PythonOperator(
        task_id='some_function',
        python_callable=some_function,
        dag=dag
    )

    # start runs first, then some_task, then end.
    start >> some_task >> end
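
The >> operator in the last line of the example above is ordinary Python operator overloading. The sketch below is a toy model, not Airflow code: ToyTask is a hypothetical class invented for this illustration. It shows why chaining works: a >> b records b as downstream of a and returns b, so the next >> applies to b.

```python
class ToyTask:
    """Hypothetical stand-in for an Airflow operator; not part of Airflow."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []  # task_ids that run after this task

    def __rshift__(self, other):
        # a >> b records b as downstream of a and returns b,
        # which lets a >> b >> c chain from left to right.
        self.downstream.append(other.task_id)
        return other


start = ToyTask("start")
work = ToyTask("work")
end = ToyTask("end")

start >> work >> end

edges = [(t.task_id, d) for t in (start, work, end) for d in t.downstream]
print(edges)  # [('start', 'work'), ('work', 'end')]
```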

Declaring a Task

Using the PythonOperator

from airflow import DAG
from airflow.operators.python import PythonOperator

dag = DAG('some_dag', [...])

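def some_function():
    print("something")

with dag:
    # Bind the operator to its own name so the callable is not shadowed.
    some_task = PythonOperator(
        task_id='some_function',
        python_callable=some_function,  # pass the function itself, do not call it
        dag=dag
    )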

    [...]

Using the @task Decorator

Airflow TaskFlow | Programming Model

Combining @task and Operators

from airflow.models.dag import DAG
from airflow.decorators import task
from airflow.operators.empty import EmptyOperator
from datetime import datetime

@task
def task_a():
    print(">>> task_a")

with DAG(
        dag_id='some_dag',
        start_date=datetime(2022, 7, 13, 0)
) as dag:
    start = EmptyOperator(task_id='start', dag=dag)
    end = EmptyOperator(task_id='end', dag=dag)

    @task
    def task_b():
        print(">>> task_b")

    start >> task_a() >> task_b() >> end

XComs Programming Model

Airflow XComs | Programming Model

Module Management

TO PROCESS: https://airflow.apache.org/docs/apache-airflow/2.3.2/modules_management.html?highlight=import