Airflow XComs

From NovaOrdis Knowledge Base

Overview

XComs ("cross-communications") are one of the mechanisms tasks use to exchange data. Tasks communicate through inputs and outputs, and XComs are an implementation of this pattern. By default, tasks are entirely isolated and may run on entirely different machines, so any data they exchange must be serializable.

An XCom is identified by a key, which is the XCom's name, together with the task_id and dag_id it came from. An XCom can hold any serializable value, but the value must be relatively small. Large amounts of data are usually exchanged by uploading files to a storage service from the producing task and downloading them in the consuming task.
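The identification and size constraints above can be illustrated with a toy in-memory store. This is a conceptual sketch only, not Airflow's implementation: the `ToyXComStore` class, its `push`/`pull` methods, the `MAX_BYTES` limit and the example URI are hypothetical names chosen for illustration, and JSON stands in for whatever serializer the configured backend actually uses.

```python
import json

# Hypothetical size cap for illustration; the real limit depends on the
# metadata database and the configured XCom backend.
MAX_BYTES = 1024


class ToyXComStore:
    """Toy model: values are addressed by (dag_id, task_id, key)."""

    def __init__(self):
        self._rows = {}

    def push(self, dag_id, task_id, key, value):
        # Serialize first: a value that cannot be serialized cannot be shared.
        payload = json.dumps(value).encode("utf-8")
        if len(payload) > MAX_BYTES:
            raise ValueError("value too large; store it externally and pass a reference")
        self._rows[(dag_id, task_id, key)] = payload

    def pull(self, dag_id, task_id, key="return_value", default=None):
        payload = self._rows.get((dag_id, task_id, key))
        return default if payload is None else json.loads(payload.decode("utf-8"))


store = ToyXComStore()
store.push("etl", "extract", "return_value", {"rows": 3})
# Large data: exchange a reference (here, a made-up object-store URI) instead.
store.push("etl", "extract", "dump_uri", "s3://example-bucket/dump.parquet")
print(store.pull("etl", "extract"))
print(store.pull("etl", "extract", key="dump_uri"))
```

The design point is that the small value travels through the store, while the large artifact lives elsewhere and only its location is exchanged.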

XComs are explicitly “pushed” to and “pulled” from their storage using the xcom_push() and xcom_pull() methods on task instances. For more details see Ingesting Input into a Task with xcom_pull() and Exposing Task Output via the Return Value or with xcom_push() below. XComs are stored in the xcom table and must be explicitly deleted after use; otherwise they accumulate in the table.

Variables are an alternative mechanism for tasks to share data. However, variables are global and should be used for overall configuration that covers the entire installation. To pass data to and from tasks, XComs are preferable.

Also see:

DAG run execution context

Programming Model

Also see:

Airflow Programming Model

Other examples:

https://github.com/apache/airflow/blob/main/airflow/example_dags/example_xcom.py

Ingesting Input into a Task with xcom_pull()

Data generated by an upstream task can be ingested by a downstream task with xcom_pull():

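from airflow.decorators import task

@task
def task_b(ti=None):
    # ti is the current task instance, injected by Airflow at run time
    v = ti.xcom_pull(key="return_value", task_ids="task_a")
    print(v)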

xcom_pull() pulls XComs that optionally meet certain criteria. Its docstring reads:

       """Pull XComs that optionally meet certain criteria.
       :param key: A key for the XCom. If provided, only XComs with matching
           keys will be returned. The default key is ``'return_value'``, also
           available as constant ``XCOM_RETURN_KEY``. This key is automatically
           given to XComs returned by tasks (as opposed to being pushed
           manually). To remove the filter, pass *None*.
       :param task_ids: Only XComs from tasks with matching ids will be
           pulled. Pass *None* to remove the filter.
       :param dag_id: If provided, only pulls XComs from this DAG. If *None*
           (default), the DAG of the calling task is used.
       :param map_indexes: If provided, only pull XComs with matching indexes.
           If *None* (default), this is inferred from the task(s) being pulled
           (see below for details).
       :param include_prior_dates: If False, only XComs from the current
           execution_date are returned. If *True*, XComs from previous dates
           are returned as well.
       When pulling one single task (``task_id`` is *None* or a str) without
       specifying ``map_indexes``, the return value is inferred from whether
       the specified task is mapped. If not, value from the one single task
       instance is returned. If the task to pull is mapped, an iterator (not a
       list) yielding XComs from mapped task instances is returned. In either
       case, ``default`` (*None* if not specified) is returned if no matching
       XComs are found.
       When pulling multiple tasks (i.e. either ``task_id`` or ``map_index`` is
       a non-str iterable), a list of matching XComs is returned. Elements in
       the list is ordered by item ordering in ``task_id`` and ``map_index``.
       """


If there is no such XCom key for an existing task, or no such task, xcom_pull() returns None.
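Because a missing XCom and a missing task both yield None, a downstream task may want to fail fast rather than carry None forward. The following is a minimal sketch of that idea: `pull_or_fail` and `FakeTaskInstance` are hypothetical names introduced here, and the fake class only mimics the `task_ids` and `key` arguments of xcom_pull() so that the helper can be tried outside Airflow.

```python
def pull_or_fail(ti, task_id, key="return_value"):
    """Pull one XCom and raise if it is missing instead of passing None along."""
    value = ti.xcom_pull(task_ids=task_id, key=key)
    if value is None:
        raise ValueError(f"no XCom {key!r} found for task {task_id!r}")
    return value


class FakeTaskInstance:
    """Hypothetical stand-in for a task instance, for use outside Airflow."""

    def __init__(self, xcoms):
        self._xcoms = xcoms  # maps (task_id, key) -> value

    def xcom_pull(self, task_ids=None, key="return_value"):
        # Mirrors the documented behavior: None when nothing matches.
        return self._xcoms.get((task_ids, key))


ti = FakeTaskInstance({("task_a", "return_value"): "something"})
print(pull_or_fail(ti, "task_a"))
```

Note that this check cannot distinguish a missing XCom from an upstream task that legitimately returned None, so it only suits tasks whose outputs are never None.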

task_ids

To Clarify

  • There is also a static method of xcom_pull(). When to use that?

Exposing Task Output via the Return Value or with xcom_push()

Simply returning a value out of a @task function automatically exposes it as a "return_value" XCom.

@task
def task_a():
    return "something"

is equivalent to:

@task
def task_a(ti=None):
    ti.xcom_push('return_value', 'something')
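The two styles can also be combined: a task may return its main result and additionally push a secondary value under a custom key. The sketch below shows the body of such a function without the @task decorator so that it runs outside Airflow. `RecordingTaskInstance`, `summarize` and the key "line_count" are hypothetical names used for illustration; only the xcom_push(key, value) call shape is taken from the text above.

```python
class RecordingTaskInstance:
    """Hypothetical stand-in that records pushes, for running outside Airflow."""

    def __init__(self):
        self.pushed = {}

    def xcom_push(self, key, value):
        self.pushed[key] = value


def summarize(ti=None):
    """Body of a would-be @task function: one custom-key XCom plus a return value."""
    lines = ["a", "b", "c"]
    # Extra output under a custom key; a downstream task would pull it
    # with key="line_count".
    ti.xcom_push(key="line_count", value=len(lines))
    # Under @task, the returned value would be stored under "return_value".
    return ",".join(lines)


ti = RecordingTaskInstance()
result = summarize(ti=ti)
print(result, ti.pushed)
```

A downstream task would then read the main result with the default key and the secondary value with key="line_count".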

Deleting XComs

TO PROCESS: https://stackoverflow.com/questions/46707132/how-to-delete-xcom-objects-once-the-dag-finishes-its-run-in-airflow

XComArg

When you call a TaskFlow function in the DAG file, it is not executed. Instead, you get an object representing the XCom for the result (an XComArg), which you can use as input to downstream tasks and operators.

Operations

The values of the created XComs, tabulated by timestamp, DAG ID, Task ID, key and value, are available via Admin → XComs.

Backends

https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html#custom-xcom-backends