Using XComs
Description
In some cases you might want to use data calculated in one task in a subsequent task.
For example, one task calls an external API using the ConveyorContainerOperatorV2 and, based on the response, you want to dynamically create a number of tasks to process the result.
In this use case your Airflow DAG code needs access to the result of the previous task, which is why Airflow introduced its XComs concept.
Keep in mind, however, that in most use cases you do not want to create an explicit dependency between tasks, as it makes it impossible to run a task in isolation.
How to do it
There are two parts to using XComs:
- writing an XCom value during the execution of the ConveyorContainerOperatorV2 task
- using the XCom value from the previous task in subsequent tasks
Writing an XCom value with the ConveyorContainerOperatorV2
To write an XCom value with the ConveyorContainerOperatorV2, you need to enable it by setting the xcom_push argument on the operator as follows:
from conveyor.operators import ConveyorContainerOperatorV2

xcom_push = ConveyorContainerOperatorV2(
    task_id="task-with-xcom-push",
    arguments=["--date", "{{ ds }}"],
    xcom_push=True,
)
Next, you need to write a JSON value to the following path: /opt/airflow/xcom/return_value.json.
To ensure that Conveyor can retrieve the XCom result, it is crucial that you write to this specific path and no other location.
This path is mounted in the pod running your project code. Conveyor will make sure that the content of this file ends up in the Airflow database.
You can use the following Python example to set the XCom value to "hello world", or any other desired value:
import json

def main():
    # Conveyor only collects the XCom value from this exact path
    with open("/opt/airflow/xcom/return_value.json", "w") as file:
        json.dump("hello world", file)

if __name__ == "__main__":
    main()
You can check whether the XCom value was stored successfully through the Airflow UI: click Admin > XComs in the Airflow top navigation bar. If all went well, you will see your XCom value in the list.
XComs should only be used for small data, as the maximum size allowed in Airflow is 48KB. Additionally, the value you write to the file should be valid JSON; otherwise an exception will be thrown.
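Both constraints above (valid JSON, small payload) can be checked before the file is written, so a task fails with a clear error instead of later in Airflow. The sketch below is a hypothetical helper, `write_xcom`, not part of Conveyor; it assumes "48KB" means 48 * 1024 bytes and takes a `path` override so it can be tried outside the pod.

```python
import json
from pathlib import Path

# Path documented above; overridable so the helper can be tested locally.
XCOM_PATH = "/opt/airflow/xcom/return_value.json"
# Assumption: the 48KB limit mentioned above, interpreted as 48 * 1024 bytes.
MAX_XCOM_BYTES = 48 * 1024


def write_xcom(value, path=XCOM_PATH):
    """Serialize `value` to JSON and write it to the XCom file.

    Raises TypeError if `value` is not JSON-serializable and ValueError if the
    encoded payload is larger than MAX_XCOM_BYTES. Returns the payload size.
    """
    payload = json.dumps(value)  # raises TypeError for e.g. sets or custom objects
    size = len(payload.encode("utf-8"))
    if size > MAX_XCOM_BYTES:
        raise ValueError(f"XCom payload is {size} bytes; limit is {MAX_XCOM_BYTES}")
    target = Path(path)
    # In the pod the directory is already mounted; creating it is a no-op there.
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(payload, encoding="utf-8")
    return size
```

Inside your project code you would call, for example, `write_xcom({"executors": 12})` from your entry point instead of opening the file directly.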
Using the XCom result from a task in your DAG
When you have successfully written an XCom value, you can use it in subsequent tasks.
An Airflow task instance can use the following method to pull the XCom value from the previously defined task with the ID task-with-xcom-push:
ti.xcom_pull(key="return_value", task_ids="task-with-xcom-push")
You can pass the XCom value as an argument to a task using Airflow Jinja templates. Here are two examples of how XCom values can be used:
from airflow.operators.bash import BashOperator
from conveyor.operators import ConveyorContainerOperatorV2

bash_example = BashOperator(
    task_id="echo-xcom-value",
    bash_command='echo "{{ ti.xcom_pull(key="return_value", task_ids="task-with-xcom-push") }}"',
)

container_example = ConveyorContainerOperatorV2(
    task_id="use-xcom-container-operator",
    cmds=["python3", "-m", "python_greeter.xcom_pull"],
    # Pull the "return_value" key pushed by the task with ID task-with-xcom-push
    arguments=["--xcom-value", "{{ ti.xcom_pull(key='return_value', task_ids='task-with-xcom-push') }}"],
)
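The container example runs `python3 -m python_greeter.xcom_pull --xcom-value <rendered value>`. The contents of that module are not shown in this guide, so the sketch below is a hypothetical version that reads the flag with `argparse`. By default Jinja renders the pulled value to a plain string (a dict becomes its Python repr, which is not JSON; Jinja's `tojson` filter can be applied in the template if you need JSON), so the sketch tries to decode JSON and falls back to the raw string.

```python
import argparse
import json


def parse_xcom_value(raw):
    """Decode a JSON-encoded XCom value, or return the raw string if it is not JSON."""
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return raw


def main(argv=None):
    parser = argparse.ArgumentParser(description="Consume an XCom value passed on the command line.")
    parser.add_argument("--xcom-value", required=True, help="Rendered XCom value from the upstream task.")
    args = parser.parse_args(argv)
    value = parse_xcom_value(args.xcom_value)
    print(f"Received XCom value: {value!r}")
    return value
```

To run it with `python3 -m`, call `main()` from an `if __name__ == "__main__":` guard at the bottom of the module. Note that a rendered string such as `12` will be decoded to a number; skip the JSON step if you always expect plain strings.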
Using JSON in XCom
Let's say you are writing the following XCom value:
{
  "executors": 12,
  "executorSize": "mx.large"
}
And you want to use the executors value in a subsequent job.
Inside a Jinja template, you can then fetch the executors value with xcom_pull as follows:
ti.xcom_pull(key="return_value", task_ids="task-with-xcom-push").executors
For example:
from airflow.operators.bash import BashOperator

bash_example = BashOperator(
    task_id="echo-xcom-value",
    bash_command='echo "{{ ti.xcom_pull(key="return_value", task_ids="task-with-xcom-push").executors }}"',
)
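The `.executors` syntax works in Jinja templates because Jinja falls back to an item lookup when a dictionary has no attribute of that name (so a key named like a dict method, such as `items`, resolves to the method instead). In plain Python code, assuming the pulled XCom is the decoded JSON object (a `dict`), you subscript it instead. In the sketch below, `json.loads` on the example value stands in for the `xcom_pull` call; no Airflow API is used.

```python
import json

# Stand-in for ti.xcom_pull(key="return_value", task_ids="task-with-xcom-push"):
# decode the example JSON shown above into a dict.
pulled = json.loads('{"executors": 12, "executorSize": "mx.large"}')

executors = pulled["executors"]  # dict subscription; pulled.executors would raise AttributeError
executor_size = pulled.get("executorSize")  # .get() returns None instead of raising for missing keys

print(executors, executor_size)
```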
Using XCom with dynamic task mapping
You can also use an XCom result to dynamically map tasks. One use case is listing files on S3 and then launching a new job for every file found.
Suppose your first task returns the following XCom value:
[
  "file1",
  "file2",
  "file3"
]
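The task producing this list could look something like the hypothetical sketch below. It lists a local directory as a stand-in for an S3 listing (no S3 client is used) and writes the sorted file names as a JSON array to the XCom path; `push_file_list` and its parameters are illustrative names, not part of Conveyor.

```python
import json
import os
from pathlib import Path

XCOM_PATH = "/opt/airflow/xcom/return_value.json"


def push_file_list(directory, xcom_path=XCOM_PATH):
    """Write the names of the regular files in `directory` to the XCom file as a JSON array."""
    # Stand-in for an S3 listing: only regular files are kept, subdirectories are skipped.
    names = sorted(entry.name for entry in os.scandir(directory) if entry.is_file())
    Path(xcom_path).write_text(json.dumps(names), encoding="utf-8")
    return names
```

In a real task you would replace the `os.scandir` call with your S3 listing; each element of the written array becomes one mapped task instance downstream.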
Let's say we have a task xcom_push that pushes this list, and a task xcom_pull that uses it. Your DAG would then look something like this:
from conveyor.operators import ConveyorContainerOperatorV2

xcom_push = ConveyorContainerOperatorV2(
    task_id="xcom-push-list",
    xcom_push=True,
)

xcom_pull = ConveyorContainerOperatorV2.partial(
    task_id="xcom_pull",
    cmds=["python3", "-m", "python_greeter.xcom_pull", "--xcom-value"],
).expand(
    arguments=xcom_push.output,
)