
Using XComs in Conveyor

Description

In some cases you might want to use data that is calculated in one task in a subsequent task. For example, one task calls an external API using the ConveyorContainerOperatorV2, and based on the response you want to dynamically create a number of tasks to process the result. In this use case your Airflow DAG code needs access to the result of the previous task, which is why Airflow introduced its XComs concept.

note

In most use cases you do not want to create an explicit dependency between tasks, as it makes it impossible to run a task in isolation.

How to do it

There are two parts to using XComs:

  • writing an XCom value during the execution of the ConveyorContainerOperatorV2 task
  • using the XCom value from the previous task in subsequent tasks

Writing an XCom value with the ConveyorContainerOperatorV2

To write an XCom value with the ConveyorContainerOperatorV2, you first need to enable it by setting the xcom_push argument on the operator:

from conveyor.operators import ConveyorContainerOperatorV2

xcom_push = ConveyorContainerOperatorV2(
    dag=dag,
    task_id="task-with-xcom-push",
    arguments=["--date", "{{ ds }}"],
    xcom_push=True,
)

Next, your code needs to write a JSON value to the following path: /opt/airflow/xcom/return_value.json. Conveyor only retrieves the XCom result from this exact path, so do not write it to any other location.

note

This path is mounted in the pod running your project code. Conveyor will make sure that the content of this file ends up in the Airflow database.

You can use the following Python example to set the XCom value to "hello world", or any other desired value:

import json

def main():
    # Conveyor only picks up the XCom value from this exact path.
    with open("/opt/airflow/xcom/return_value.json", "w") as file:
        json.dump("hello world", file)

You can check whether the XCom value was stored in Airflow through the Airflow UI: open Admin > XComs in the top navigation bar. If all went well, your XCom value appears in the list.

caution

XComs should only be used for small data, as the maximum size allowed in Airflow is 48KB. Additionally, the value you write to the file must be valid JSON; otherwise an exception is raised.
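To catch both problems inside your container before the task finishes, you can validate the value before writing it. The sketch below uses a hypothetical helper, write_xcom_value (not part of the Conveyor API), that serializes the value with json.dumps, which raises TypeError for values that are not JSON-serializable, and refuses payloads above the 48KB limit mentioned above. Interpreting 48KB as 48 * 1024 bytes of UTF-8 encoded JSON is an assumption; adjust MAX_XCOM_BYTES if your setup differs. The path parameter exists only so the helper can be tried outside a Conveyor pod.

```python
import json
from pathlib import Path

# Path that Conveyor reads the XCom value from.
XCOM_PATH = "/opt/airflow/xcom/return_value.json"

# Assumption: the 48KB limit counted as 48 * 1024 bytes of UTF-8 encoded JSON.
MAX_XCOM_BYTES = 48 * 1024


def write_xcom_value(value, path=XCOM_PATH):
    """Serialize value to JSON, check its size, and write it to path.

    Hypothetical helper for illustration; returns the number of bytes written.
    """
    # json.dumps raises TypeError for objects such as sets or datetimes,
    # so invalid values fail here instead of after the task has run.
    payload = json.dumps(value)
    size = len(payload.encode("utf-8"))
    if size > MAX_XCOM_BYTES:
        raise ValueError(
            f"XCom value is {size} bytes, above the {MAX_XCOM_BYTES}-byte limit"
        )
    Path(path).write_text(payload, encoding="utf-8")
    return size
```

For example, write_xcom_value({"executors": 12, "executorSize": "mx.large"}) writes a small JSON object, while write_xcom_value({"a", "b"}) raises TypeError because sets are not JSON-serializable. Checking the size before writing means nothing is written when the value is too large.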

Using the XCom result from a task in your DAG

Once you have written an XCom value, you can use it in subsequent tasks. An Airflow task instance can pull the XCom value of the task with ID task-with-xcom-push using the following method:

ti.xcom_pull(key="return_value", task_ids="task-with-xcom-push")

You can pass the XCom value as an argument to a task using Airflow Jinja templates. Here are two examples of how XCom values can be used:

from airflow.operators.bash import BashOperator
from conveyor.operators import ConveyorContainerOperatorV2

bash_example = BashOperator(
    dag=dag,
    task_id="echo-xcom-value",
    bash_command='echo "{{ ti.xcom_pull(key="return_value", task_ids="task-with-xcom-push") }}"',
)

container_example = ConveyorContainerOperatorV2(
    dag=dag,
    task_id="use-xcom-container-operator",
    cmds=["python3", "-m", "python_greeter.xcom_pull"],
    # Pull the value that task-with-xcom-push stored under the default key.
    arguments=["--xcom-value", "{{ task_instance.xcom_pull(task_ids='task-with-xcom-push', key='return_value') }}"],
)
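The container in container_example runs python3 -m python_greeter.xcom_pull, whose contents this guide does not show. The sketch below is a hypothetical version of that module (for example python_greeter/xcom_pull.py in your project) that reads the value passed with --xcom-value. Command-line arguments are always strings, so the script receives the rendered text of the template, not a Python object.

```python
import argparse


def parse_args(argv=None):
    # The DAG passes the rendered XCom value right after --xcom-value.
    parser = argparse.ArgumentParser(description="Print an XCom value pulled by Airflow")
    parser.add_argument("--xcom-value", required=True)
    return parser.parse_args(argv)


def main(argv=None):
    args = parse_args(argv)
    # argparse stores --xcom-value under the attribute name xcom_value.
    print(f"Received XCom value: {args.xcom_value}")
    return args.xcom_value


if __name__ == "__main__":
    main()
```

Locally you can try it with python3 -m python_greeter.xcom_pull --xcom-value "hello world", which mirrors what the task runs after Airflow has rendered the template.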

Using JSON in XCom

Let's say you write the following XCom value:

{
  "executors": 12,
  "executorSize": "mx.large"
}

If you want to use the executors value in a following task, you can fetch it with xcom_pull in a Jinja template as follows:

ti.xcom_pull(key="return_value", task_ids="task-with-xcom-push").executors

For example:

bash_example = BashOperator(
    dag=dag,
    task_id="echo-xcom-value",
    bash_command='echo "{{ ti.xcom_pull(key="return_value", task_ids="task-with-xcom-push").executors }}"',
)
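Attribute-style access such as .executors works in the template because Jinja falls back to a dictionary key lookup when an attribute does not exist. The sketch below, using only the standard library and assuming the XCom holds the parsed JSON object, shows two things to keep in mind outside that pattern: in plain Python you need subscript access, and rendering the whole dictionary with {{ ... }} converts it to text with str() under Jinja's default settings, which gives a Python repr rather than JSON. Jinja's built-in tojson filter renders JSON instead.

```python
import json

# The XCom value from this section, as the task wrote it to return_value.json.
raw = '{"executors": 12, "executorSize": "mx.large"}'
value = json.loads(raw)  # assumption: the XCom holds this parsed dict

# In plain Python, read keys with subscript access.
executors = value["executors"]
print(executors)  # 12

# Attribute access is a Jinja convenience; a Python dict has no such attribute.
try:
    value.executors
except AttributeError as err:
    print(f"AttributeError: {err}")

# str() yields a Python repr with single quotes, which is not valid JSON.
as_text = str(value)
print(as_text)  # {'executors': 12, 'executorSize': 'mx.large'}

# json.dumps (which tojson builds on, adding HTML-safe escaping) round-trips.
as_json = json.dumps(value)
print(json.loads(as_json) == value)  # True
```

If a container needs the whole object, rendering it with {{ ti.xcom_pull(key="return_value", task_ids="task-with-xcom-push") | tojson }} and parsing the argument with json.loads on the receiving side avoids the repr problem.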

Using XCom with dynamic task mapping

You can also use an XCom result for dynamic task mapping. A typical use case is to list files on S3 in one task and then launch a new task for every file found.

Suppose your first task returns the following XCom value:

[
  "file1",
  "file2",
  "file3"
]

Let's say the task xcom_push pushes this list and the task xcom_pull uses it. Your DAG would then look something like this:

from conveyor.operators import ConveyorContainerOperatorV2

xcom_push = ConveyorContainerOperatorV2(
    dag=dag,
    task_id="xcom-push-list",
    xcom_push=True,
)

xcom_pull = ConveyorContainerOperatorV2.partial(
    dag=dag,
    task_id="xcom_pull",
    cmds=["python3", "-m", "python_greeter.xcom_pull", "--xcom-value"],
).expand(arguments=xcom_push.output)
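The xcom-push-list task still has to write the list to the XCom path. The sketch below is a hypothetical project module that lists the regular files in a local directory as a stand-in for an S3 listing; in practice you would call your S3 client of choice instead. The function names and the directory parameter are illustrative only. Because the written value is a JSON array, expand creates one mapped task instance per element.

```python
import json
import os

# Path that Conveyor reads the XCom value from.
XCOM_PATH = "/opt/airflow/xcom/return_value.json"


def list_files(directory):
    """Return the sorted names of regular files in directory.

    Hypothetical stand-in for listing objects on S3.
    """
    with os.scandir(directory) as entries:
        return sorted(entry.name for entry in entries if entry.is_file())


def push_file_list(directory, xcom_path=XCOM_PATH):
    """Write the file names as a JSON array so they can be used for mapping."""
    files = list_files(directory)
    with open(xcom_path, "w") as file:
        json.dump(files, file)
    return files
```

Sorting the names keeps the order of the mapped task instances stable between runs, which makes them easier to compare in the Airflow UI.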