Airflow XComs: Difference between revisions
(34 intermediate revisions by the same user not shown) | |||
Line 7: | Line 7: | ||
* [[Airflow_Concepts#XComs|Airflow Concepts]] | * [[Airflow_Concepts#XComs|Airflow Concepts]] | ||
=Overview= | =<span id='Concepts'></span>Overview= | ||
XComs is one of the methods [[Airflow_Concepts#Task|tasks]] use to exchange data. | XComs is one of the methods [[Airflow_Concepts#Task|tasks]] use to exchange data. Tasks communicate using inputs and outputs, and the XComs ("cross-communications") mechanism is an implementation of this pattern. By default, tasks are entirely isolated and may be running on entirely different machines so when they exchange data, the data must be serializable. | ||
Tasks communicate using inputs and outputs, and the XComs ("cross-communications") mechanism is an implementation of this pattern. By default, tasks are entirely isolated and may be running on entirely different machines so when they exchange data, the data must be serializable. | |||
An XCom is identified by a <code>key</code>, which is the XCom's name, as well as the <code>task_id</code> and <code>dag_id</code> it came from. The XCom can have any serializable value, however it must be relatively small. If there is need to exchange large amounts of data, this is usually done uploading and downloading large files from a storage service. | An XCom is identified by a <code>key</code>, which is the XCom's name, as well as the <code>task_id</code> and <code>dag_id</code> it came from. The XCom can have any serializable value, however it must be relatively small. If there is need to exchange large amounts of data, this is usually done uploading and downloading large files from a storage service. | ||
XComs are explicitly “pushed” and “pulled” to/from their storage using the <code>xcom_push()</code> and <code>xcom_pull()</code> methods on [[Airflow_Concepts#Task_Instance|task instances]]. For more details see [[# | XComs are explicitly “pushed” and “pulled” to/from their storage using the <code>xcom_push()</code> and <code>xcom_pull()</code> methods on [[Airflow_Concepts#Task_Instance|task instances]]. For more details see [[#Ingesting_Input_into_a_Task_with_xcom_pull.28.29|Ingesting Input into a Task with <code>xcom_pull()</code>]] and [[#Exposing_Task_Output_via_the_Return_Value_or_with_xcom_push.28.29|Exposing Task Output with <code>xcom_push()</code> or via the Return Value]] below. The XComs are stored in the <code>xcom</code> table and they need to be explicitly deleted after use, otherwise they'll leak in the table. | ||
The XComs are stored in the xcom table and they need to be explicitly deleted after use, otherwise they'll leak in the table. | |||
[[Airflow_Concepts#Variable|Variables]] are an alternative mechanism for tasks to share data. However, variables are global and should be used for overall configuration that covers the entire installation. To pass data to and from tasks, XComs are preferable. | [[Airflow_Concepts#Variable|Variables]] are an alternative mechanism for tasks to share data. However, variables are global and should be used for overall configuration that covers the entire installation. To pass data to and from tasks, XComs are preferable. | ||
Also see | Also see: {{Internal|Airflow_Concepts#Execution_Context|DAG run execution context}} | ||
=Programming Model= | =Programming Model= | ||
Also see: {{Internal|Airflow_Programming_Model#XComs_Programming_Model|Airflow Programming Model}} | Also see: {{Internal|Airflow_Programming_Model#XComs_Programming_Model|Airflow Programming Model}} | ||
Other examples: | |||
{{External|https://github.com/apache/airflow/blob/main/airflow/example_dags/example_xcom.py}} | |||
==Ingesting Input into a Task with <tt>xcom_pull()</tt>== | |||
Data externally generated by a preceding task can be ingested by a task with <code>xcom_pull</code>: | |||
<syntaxhighlight lang='py'> | |||
@task | |||
def task_b(ti=None): | |||
v = ti.xcom_pull(key="return_value", task_ids='task_a') | |||
print(v) | |||
</syntaxhighlight> | |||
<code>xcom_pull</code> pulls XComs that optionally meet certain criteria. If there is no XCom that match the criteria (no such key for an existing task, no such task, etc.), <code>xcom_pull()</code> returns <code>None</code>. | |||
It has the following parameters: | |||
====<tt>key</tt>==== | |||
The XCom's key. If provided, only XComs with matching keys will be returned. The default key is 'return_value', also available as constant <code>XCOM_RETURN_KEY</code>. This key is automatically associated with the XComs returned by the task, as opposed to being pushed with <code>xcom_push()</code>. To remove the key filter, pass <code>None</code>. | |||
====<tt>task_ids</tt>==== | |||
Only XComs from tasks with matching ids will be pulled. Pass <code>None</code> to remove the filter. To specify multiple task IDs, provide a non-string iterable. | |||
====<tt>dag_id</tt>==== | |||
If provided, only pulls XComs from the specified DAG. If <code>None</code> is provided, which is the default, the DAG of the calling task is used. | |||
====<tt>map_indexes</tt>==== | |||
If provided, only pull XComs with matching indexes. If <code>None</code> (default), this is inferred from the task(s) being pulled. When pulling one single task (<code>task_ids</code> is <code>None</code> or a str) without specifying <code>map_indexes</code>, the return value is inferred from whether the specified task is mapped. If not, value from the one single task instance is returned. If the task to pull is mapped, an iterator (not a list) yielding XComs from mapped task instances is returned. In either case, <code>default</code> (<code>None</code> if not specified) is returned if no matching XComs are found. When pulling multiple tasks (i.e. either <code>task_ids</code> or <code>map_index</code> is a non-str iterable), a list of matching XComs is returned. Elements in the list is ordered by item ordering in <code>task_id</code> and <code>map_index</code>. | |||
====<tt>include_prior_dates</tt>==== | |||
If <code>False</code>, only XComs from the current <code>[[Airflow_Concepts#execution_date|execution_date]]</code> are returned. If <code>True</code>, XComs from previous dates are returned as well. | |||
===<tt>task_ids</tt>=== | |||
===To Clarify=== | |||
<font color=darkkhaki> | |||
* There is also a static method of xcom_pull(). When to use that? | |||
</font> | |||
==Exposing Task Output via the Return Value or with <tt>xcom_push()</tt>== | |||
Simply returning a value out of a <code>@task</code> function automatically exposes it as a "return_value" XCom. | |||
<syntaxhighlight lang='py'> | <syntaxhighlight lang='py'> | ||
@task | @task | ||
def task_a(): | def task_a(): | ||
return "something" | |||
</syntaxhighlight> | |||
is equivalent with: | |||
<syntaxhighlight lang='py'> | |||
@task | |||
def task_a(ti=None): | |||
ti.xcom_push('return_value', 'something') | |||
</syntaxhighlight> | |||
==Deleting XComs== | |||
<font color=darkkhaki>TO PROCESS: https://stackoverflow.com/questions/46707132/how-to-delete-xcom-objects-once-the-dag-finishes-its-run-in-airflow</font> | |||
==<tt>XComArg</tt>== | |||
<font color=darkkhaki>When you call a [[Airflow Concepts#TaskFlow|TaskFlow]] function in the DAG file, rather than executing it, you will get an object representing the XCom for the result (an <code>XComArg</code>, that you can use as inputs to downstream tasks and operators.</font> | |||
=Operations= | |||
The values of the created XComs, tabulated by timestamp, DAG ID, Task ID, key and value, are available via Admin → XComs. | |||
=Backends= | =Backends= | ||
{{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html#custom-xcom-backends}} | {{External|https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html#custom-xcom-backends}} |
Latest revision as of 03:10, 18 July 2022
External
- https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html
- https://airflow.apache.org/docs/apache-airflow/stable/concepts/taskflow.html
- https://airflow.apache.org/docs/apache-airflow/2.0.0/concepts.html#xcoms
Internal
Overview
XComs is one of the methods tasks use to exchange data. Tasks communicate using inputs and outputs, and the XComs ("cross-communications") mechanism is an implementation of this pattern. By default, tasks are entirely isolated and may be running on entirely different machines so when they exchange data, the data must be serializable.
An XCom is identified by a key
, which is the XCom's name, as well as the task_id
and dag_id
it came from. The XCom can have any serializable value, however it must be relatively small. If there is need to exchange large amounts of data, this is usually done uploading and downloading large files from a storage service.
XComs are explicitly “pushed” and “pulled” to/from their storage using the xcom_push()
and xcom_pull()
methods on task instances. For more details see Ingesting Input into a Task with xcom_pull()
and Exposing Task Output with xcom_push()
or via the Return Value below. The XComs are stored in the xcom
table and they need to be explicitly deleted after use, otherwise they'll leak in the table.
Variables are an alternative mechanism for tasks to share data. However, variables are global and should be used for overall configuration that covers the entire installation. To pass data to and from tasks, XComs are preferable.
Also see:
Programming Model
Also see:
Other examples:
Ingesting Input into a Task with xcom_pull()
Data externally generated by a preceding task can be ingested by a task with xcom_pull
:
@task
def task_b(ti=None):
v = ti.xcom_pull(key="return_value", task_ids='task_a')
print(v)
xcom_pull
pulls XComs that optionally meet certain criteria. If there is no XCom that match the criteria (no such key for an existing task, no such task, etc.), xcom_pull()
returns None
.
It has the following parameters:
key
The XCom's key. If provided, only XComs with matching keys will be returned. The default key is 'return_value', also available as constant XCOM_RETURN_KEY
. This key is automatically associated with the XComs returned by the task, as opposed to being pushed with xcom_push()
. To remove the key filter, pass None
.
task_ids
Only XComs from tasks with matching ids will be pulled. Pass None
to remove the filter. To specify multiple task IDs, provide a non-string iterable.
dag_id
If provided, only pulls XComs from the specified DAG. If None
is provided, which is the default, the DAG of the calling task is used.
map_indexes
If provided, only pull XComs with matching indexes. If None
(default), this is inferred from the task(s) being pulled. When pulling one single task (task_ids
is None
or a str) without specifying map_indexes
, the return value is inferred from whether the specified task is mapped. If not, value from the one single task instance is returned. If the task to pull is mapped, an iterator (not a list) yielding XComs from mapped task instances is returned. In either case, default
(None
if not specified) is returned if no matching XComs are found. When pulling multiple tasks (i.e. either task_ids
or map_index
is a non-str iterable), a list of matching XComs is returned. Elements in the list is ordered by item ordering in task_id
and map_index
.
include_prior_dates
If False
, only XComs from the current execution_date
are returned. If True
, XComs from previous dates are returned as well.
task_ids
To Clarify
- There is also a static method of xcom_pull(). When to use that?
Exposing Task Output via the Return Value or with xcom_push()
Simply returning a value out of a @task
function automatically exposes it as a "return_value" XCom.
@task
def task_a():
return "something"
is equivalent with:
@task
def task_a(ti=None):
ti.xcom_push('return_value', 'something')
Deleting XComs
XComArg
When you call a TaskFlow function in the DAG file, rather than executing it, you will get an object representing the XCom for the result (an XComArg
, that you can use as inputs to downstream tasks and operators.
Operations
The values of the created XComs, tabulated by timestamp, DAG ID, Task ID, key and value, are available via Admin → XComs.