
Dynamic task mapping in Airflow

Airflow recently introduced dynamic task mapping, which makes it possible to create tasks dynamically based on the output of a previous task. The scheduler generates these tasks at runtime, instead of you having to write a for loop in your DAG definition. More information on how to use this feature can be found in the Airflow documentation.
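
To illustrate the idea in plain Airflow, the following is a minimal sketch that maps a task over the output of an upstream task using the TaskFlow API. The DAG name, task names, and file list are made up for the example:

from datetime import datetime

from airflow import DAG
from airflow.decorators import task

with DAG("plain_airflow_dynamic_example", start_date=datetime(2022, 1, 1)) as dag:

    @task
    def list_files():
        # In a real pipeline this could, for example, list objects in a bucket.
        return ["a.csv", "b.csv", "c.csv"]

    @task
    def process(file_name: str):
        print(f"processing {file_name}")

    # The scheduler creates one mapped "process" task at runtime
    # for every file name returned by list_files.
    process.expand(file_name=list_files())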

Airflow dynamic task mapping in Conveyor

Similar to other Airflow operators, the Conveyor operators also support dynamic task mapping. An example using the ConveyorContainerOperatorV2 looks as follows:

from airflow import DAG
from datetime import datetime
from conveyor.operators import ConveyorContainerOperatorV2

default_args = {
    "start_date": datetime.fromisocalendar(2022, 1, 1),
}

with DAG("dynamic", default_args=default_args) as dag:
    custom_dynamic = ConveyorContainerOperatorV2.partial(
        task_id="dynamic-task",
        instance_type="mx.micro",
    ).expand(arguments=[1, 2])

In this example, the partial function defines the static properties of the operator. It can be seen as a partially applied function, which can be executed once the remaining arguments are supplied. As a last step, the dynamic argument is passed to expand, which creates one task instance at runtime for every value in the list.

The previous code is equivalent to:


from airflow import DAG
from datetime import datetime
from conveyor.operators import ConveyorContainerOperatorV2

default_args = {
    "start_date": datetime.fromisocalendar(2022, 1, 1),
}

with DAG("dynamic", default_args=default_args) as dag:
    task1 = ConveyorContainerOperatorV2(
        task_id="dynamic-task-1",
        instance_type="mx.micro",
        arguments=1,
    )
    task2 = ConveyorContainerOperatorV2(
        task_id="dynamic-task-2",
        instance_type="mx.micro",
        arguments=2,
    )