Airflow Programming Model: Difference between revisions
Jump to navigation
Jump to search
Line 10: | Line 10: | ||
==With <tt>DAG()</tt> Constructor== | ==With <tt>DAG()</tt> Constructor== | ||
Also see: {{Internal|Airflow_Concepts#Declaring_a_DAG|Airflow Concepts | Declaring a DAG}} | Also see: {{Internal|Airflow_Concepts#Declaring_a_DAG|Airflow Concepts | Declaring a DAG}} | ||
<syntaxhighlight lang='py'> | |||
import time | |||
from datetime import datetime, timedelta | |||
from airflow import DAG | |||
from airflow.operators.empty import EmptyOperator | |||
from airflow.operators.python import PythonOperator | |||
default_args = { | |||
'owner': 'somebody', | |||
"depends_on_past": False, | |||
"email": ["somebody@example.com"], | |||
"email_on_failure": True, | |||
"email_on_retry": False | |||
} | |||
dag = DAG('some_dag', | |||
max_active_runs=1, | |||
catchup=True, | |||
start_date=datetime(2022, 7, 13, 0), | |||
schedule_interval="* * * * *", | |||
dagrun_timeout=timedelta(minutes=2), | |||
default_args=default_args | |||
) | |||
def some_function(): | |||
print("something") | |||
with dag: | |||
start = EmptyOperator(task_id='start', dag=dag) | |||
end = EmptyOperator(task_id='end', dag=dag) | |||
some_function = PythonOperator( | |||
task_id='sleep', | |||
python_callable=some_function, | |||
dag=dag | |||
) | |||
start >> some_function >> end | |||
</syntaxhighlight> | |||
==With <tt>@dag</tt> Decorator== | ==With <tt>@dag</tt> Decorator== | ||
Also see: {{Internal|Airflow_Concepts#Declaring_a_DAG|Airflow Concepts | Declaring a DAG}} | Also see: {{Internal|Airflow_Concepts#Declaring_a_DAG|Airflow Concepts | Declaring a DAG}} |
Revision as of 01:43, 14 July 2022
Internal
Overview
requirements.txt
apache-airflow == 2.3.3
Declaring a DAG
With DAG() Constructor
Also see:
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
default_args = {
'owner': 'somebody',
"depends_on_past": False,
"email": ["somebody@example.com"],
"email_on_failure": True,
"email_on_retry": False
}
dag = DAG('some_dag',
max_active_runs=1,
catchup=True,
start_date=datetime(2022, 7, 13, 0),
schedule_interval="* * * * *",
dagrun_timeout=timedelta(minutes=2),
default_args=default_args
)
def some_function():
print("something")
with dag:
start = EmptyOperator(task_id='start', dag=dag)
end = EmptyOperator(task_id='end', dag=dag)
some_function = PythonOperator(
task_id='sleep',
python_callable=some_function,
dag=dag
)
start >> some_function >> end
With @dag Decorator
Also see:
TO PARSE:
- https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#the-dag-decorator
- https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#dag-decorator.
Module Management
TO PROCESS: https://airflow.apache.org/docs/apache-airflow/2.3.2/modules_management.html?highlight=import