Exchange Data between tasks in Airflow(XCOM)

Source: https://afiahealth.com/challenges-sharing-data-separate-ehrs/

Introduction

Airflow is a workflow scheduling tool similar to Control-M, Oozie, and Automic. In Automic, a SET parameter updates a variable in the database, and later tasks in the same workflow can read that variable. Suppose we want the same behavior in Airflow: 1.) a first task lists the file names in a given path, and 2.) a second task downloads the listed files.

There are two ways to achieve this in Airflow.

Different Approaches: Option 1 / Option 2

1.) Use an external tool such as a database: the first task pushes data to it and the second task pulls that data back. This requires setting up a connection to the external tool, and the tool must be available whenever the tasks need to share data.

2.) Without any external tool, we can share data between two tasks using a built-in Airflow feature called XCom. XCom (cross-communication) lets tasks exchange SMALL amounts of data. XCom stores this intermediate data in the Airflow metadata database, and we discuss this option below.
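As a rough illustration of option 2 applied to the list-then-download scenario from the introduction, the sketch below shows two task callables. It relies on the Airflow 2 behavior that a PythonOperator callable's return value is pushed as an XCom under the default key return_value, and that xcom_pull without a key reads that default key. The function names, the task id list_files, and the file names are hypothetical, and a mock object stands in for the TaskInstance so the snippet runs outside Airflow.

```python
from unittest.mock import MagicMock


def list_files():
    """First task: return the file names found in the source path."""
    # Hypothetical, hard-coded names keep the sketch runnable; a real task
    # would list a directory or a remote location here.
    return ["sales_2022_10_01.csv", "sales_2022_10_02.csv"]


def download_files(ti):
    """Second task: read the file list published by the first task."""
    # With no key given, xcom_pull reads the default key "return_value".
    file_names = ti.xcom_pull(task_ids="list_files")
    for name in file_names:
        print(f"downloading {name}")  # placeholder for the real download
    return len(file_names)


# Outside Airflow there is no TaskInstance, so a mock stands in for it
# and is told to hand back what the first task would have returned.
fake_ti = MagicMock()
fake_ti.xcom_pull.return_value = list_files()
downloaded = download_files(ti=fake_ti)
```

In a real DAG, each function would be wrapped in a PythonOperator whose task_id matches the one passed to xcom_pull.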

XCOM in Airflow:

After adding the XCOM_PUSH_PULL.py file to the DAGs folder, the DAG appears in the UI as shown below.

Figure 1: Shows the DAG view in UI

The processing tasks item_price_a, item_price_b, and item_price_c each call xcom_push to set a value, using the code below.

Figure 2: Shows the code to push data to database

Task4 in the above DAG pulls the data from the metadata database using xcom_pull and prints it, using the code below.

Figure 3: Shows the code to pull data from database

How xcom_push works in Airflow:

Input required by xcom_push: a key and a value, which are pushed to the Airflow metadata database as shown below.

Figure 4: Shows the input required by xcom push
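Since the figure is an image, here is a minimal sketch of what such a push can look like. It assumes Airflow 2, where a PythonOperator callable that declares a ti parameter receives the running TaskInstance. The function name push_item_price is hypothetical, the value is written as the integer 50, and a mock replaces the TaskInstance so the snippet can run without an Airflow installation.

```python
from unittest.mock import MagicMock


def push_item_price(ti):
    """Task callable: publish the item price as an XCom."""
    # xcom_push takes the key to store under and the value to store.
    ti.xcom_push(key="item_price", value=50)


# A mock stands in for the TaskInstance and records the call it receives.
fake_ti = MagicMock()
push_item_price(ti=fake_ti)

# The keyword arguments that would have been sent to the metadata database.
pushed = fake_ti.xcom_push.call_args.kwargs
print(pushed)
```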

If we trigger the DAG with the above code, a record is inserted into the Airflow metadata database with the key “item_price” and the value “50”. It is visible in the Airflow UI under Admin -> XComs, as shown below.

Figure 5: Shows the inserted value in UI

How xcom_pull works in Airflow:

After pushing data to the Airflow metadata database, we pull the same data back with the task instance method xcom_pull. Like xcom_push, xcom_pull takes two input parameters:

  1. ) task_ids → Only XComs from tasks with matching IDs will be pulled
  2. ) key → Only XComs with a matching key will be returned

We pass these parameters as shown below.

Figure 6: Shows the input required by xcom pull
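The figure is an image, so the following is a minimal sketch of such a pull rather than the article's exact code. Note that the keyword in Airflow is task_ids (plural), even when a single task id is passed. The function name print_item_price is hypothetical, and a mock stands in for the TaskInstance and is primed with the value 50.

```python
from unittest.mock import MagicMock


def print_item_price(ti):
    """Task callable: read the price pushed by item_price_a and print it."""
    price = ti.xcom_pull(task_ids="item_price_a", key="item_price")
    print(f"item_price from item_price_a: {price}")
    return price


# The mock returns 50, the value the upstream task is assumed to have pushed.
fake_ti = MagicMock()
fake_ti.xcom_pull.return_value = 50
result = print_item_price(ti=fake_ti)
```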

We can specify multiple task IDs, so XComs from several tasks can be pulled at once, and the key selects which of their XComs are returned.
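To make the two filters concrete, the sketch below pulls the item_price key from three tasks in one call. FakeTaskInstance is a hypothetical in-memory stand-in, not Airflow's class. It only mimics filtering by task id and key, and the values 60 and 70 are invented for illustration. In real Airflow, passing a list of task IDs returns a collection of values, and the ordering may depend on the Airflow version.

```python
class FakeTaskInstance:
    """Hypothetical stand-in that keeps XComs in a dict keyed by (task_id, key)."""

    def __init__(self, stored):
        self._stored = stored

    def xcom_pull(self, task_ids, key):
        # Keep only entries whose task id is requested AND whose key matches.
        return [
            self._stored[(task_id, key)]
            for task_id in task_ids
            if (task_id, key) in self._stored
        ]


def print_all_prices(ti):
    """Task callable: pull the same key from several upstream tasks."""
    prices = ti.xcom_pull(
        task_ids=["item_price_a", "item_price_b", "item_price_c"],
        key="item_price",
    )
    print(prices)
    return prices


fake_ti = FakeTaskInstance({
    ("item_price_a", "item_price"): 50,
    ("item_price_b", "item_price"): 60,  # invented value
    ("item_price_c", "item_price"): 70,  # invented value
    ("item_price_a", "other_key"): -1,   # ignored: the key does not match
})
prices = print_all_prices(ti=fake_ti)
```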

Here we pull the XCom with the key item_price that was created by the task item_price_a. After triggering the DAG, we can see the value in the log as shown below.

Figure 7: Shows the log with inserted data

Limitations of XCom:

Airflow is an orchestrator, not a data processing framework, so avoid sending huge DataFrames between tasks.

If we try to transfer large amounts of data between tasks, we may run into memory issues. The XCom size limit in Airflow depends on the database used for the Airflow metadata.

For SQLite → 2 GB, Postgres → 1 GB, and MySQL → 64 KB
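One way to stay well inside these limits is to check the payload size before pushing. The sketch below is only an illustration: the helper names and the 48 KB threshold are hypothetical choices, and it assumes values are serialized as JSON, which is the Airflow 2 default when XCom pickling is disabled.

```python
import json

# Hypothetical safety threshold, chosen to sit below the smallest limit above.
MAX_XCOM_BYTES = 48 * 1024


def xcom_payload_size(value):
    """Return the size in bytes of the value once serialized as JSON."""
    return len(json.dumps(value).encode("utf-8"))


def is_small_enough_for_xcom(value, limit=MAX_XCOM_BYTES):
    """True when the serialized value fits under the chosen threshold."""
    return xcom_payload_size(value) <= limit


small = {"item_price": 50}
large = "x" * 100_000  # roughly 100 KB of text

print(is_small_enough_for_xcom(small))
print(is_small_enough_for_xcom(large))
```

For anything larger, a common pattern is to write the data to external storage and pass only its path or identifier through XCom.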

References:

https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html

Conclusion:

Today you have learned how to use XCom to share data between tasks in Airflow. You now have what you need to communicate effectively between the tasks in your DAGs. Just remember that Airflow is an orchestrator, not a data processing framework. Don’t use XComs to exchange huge datasets and you’re good to go.

Exchange Data between tasks in Airflow(XCOM) was originally published in Walmart Global Tech Blog on Medium.

Article Link: Exchange Data between tasks in Airflow(XCOM) | by Dilliraja Baskaran | Walmart Global Tech Blog | Oct, 2022 | Medium