Using Conveyor run

Conveyor run was introduced to shorten the development cycle. It was created to simplify the following flow, which is often performed by users:

  • build a project
  • deploy the project to an Airflow environment
  • access Airflow and (re)run a specific task

This flow is quite slow, especially the wait for a DAG to be updated in the Airflow UI.

The conveyor run command allows you to specify which task and environment you want to use. From there, Conveyor will take care of:

  • building your project
  • parsing the DAG logic such that we know what task needs to be executed
  • launching the task in the requested environment

If you use conveyor run, you can skip the manual steps of building and deploying your project. You do not need to run conveyor build before conveyor run, because the run command creates a new build as part of its execution.

note

There is one exception: if you still use Terraform resources and have changed them, those changes are not applied automatically by conveyor run. In that case, run conveyor deploy to apply the Terraform resources.

Using Conveyor run with Airflow dynamic tasks

When you are using dynamic tasks, you can use the --dynamic-task-args argument of conveyor run to supply values for the expanded fields.

An example DAG could be:

from airflow import DAG
from datetime import datetime
from conveyor.operators import ConveyorContainerOperatorV2

default_args = {
    # ISO year 2022, week 1, day 1 (Monday, 3 January 2022)
    "start_date": datetime.fromisocalendar(2022, 1, 1),
}

with DAG("dynamic", default_args=default_args) as dag:
    # One task instance is created per element passed to expand()
    custom_dynamic = ConveyorContainerOperatorV2.partial(
        task_id="dynamic-task",
        instance_type="mx.micro",
    ).expand(arguments=[1, 2])

If you want to run the task dynamic-task, you should supply the arguments field through --dynamic-task-args:

conveyor run --dag dynamic --task dynamic-task --dynamic-task-args arguments=1

This will run your task with the value for arguments set to 1.
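The difference between a scheduled run of a mapped task and a one-off run with a single supplied value can be sketched in plain Python. This is only a conceptual stand-in: it does not use Airflow or Conveyor, the helper names (mapped_instances, parse_dynamic_task_arg, single_run_kwargs) are hypothetical, and how Conveyor actually parses or converts the supplied value is not covered by this page.

```python
# Conceptual stand-in for dynamic task mapping; not Airflow or Conveyor code.
# All helper names below are hypothetical and exist only for illustration.


def mapped_instances(partial_kwargs, field, values):
    """Build one kwargs dict per expanded value, like .partial(...).expand(field=values)."""
    return [{**partial_kwargs, field: value} for value in values]


def parse_dynamic_task_arg(raw):
    """Split a 'key=value' string as it would be typed on the command line.

    The value stays a string here; any type conversion is left out of this sketch.
    """
    key, separator, value = raw.partition("=")
    if not separator or not key:
        raise ValueError(f"expected key=value, got {raw!r}")
    return key, value


def single_run_kwargs(partial_kwargs, raw):
    """Combine the fixed fields with the one value supplied for this run."""
    key, value = parse_dynamic_task_arg(raw)
    return {**partial_kwargs, key: value}


partial_kwargs = {"task_id": "dynamic-task", "instance_type": "mx.micro"}

# A scheduled DAG run fans out into one task instance per expanded value.
scheduled = mapped_instances(partial_kwargs, "arguments", [1, 2])
print(len(scheduled))

# A one-off run receives exactly one value for the expanded field.
print(single_run_kwargs(partial_kwargs, "arguments=1"))
```

The point of the sketch is that a one-off run has no list to fan out over, so the expanded field has to be given a single value explicitly.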

For more information on what you can do with dynamic tasks, have a look at the Airflow docs.

Using Conveyor run with Airflow Params

Airflow has the concept of Params to provide runtime configuration to tasks. Conveyor also allows you to override these when using conveyor run.

To override a Param value, supply the --params argument to the conveyor run invocation.

Suppose you have the following DAG:

from airflow import DAG
from airflow.models.param import Param
from conveyor.operators import ConveyorContainerOperatorV2

with DAG(
    "parameterized",
    params={"my_param": Param("default")},
) as dag:
    ConveyorContainerOperatorV2(
        task_id="param-task",
        arguments=["--configurable-arg", "{{ params.my_param }}"],
    )

Here you can override my_param with a value of your choice:

conveyor run --dag parameterized --task param-task --params my_param="my value"

note

It is important that you access your Param value through the template syntax ({{ params.<your_key> }}). The Airflow documentation includes an example that accesses the params of the DAG directly (dag.params['my_int_param']), but this notation is not supported by the mechanism that conveyor run uses. Your task will still run, but the externally supplied value will not be used, and the execution will fall back on the default.
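Why the two notations behave differently can be illustrated with a small simulation. This is a sketch under stated assumptions, not Airflow's or Conveyor's implementation: the render function is a hypothetical stand-in for Jinja templating, and run_task is a hypothetical stand-in for a run that merges externally supplied values over the defaults.

```python
import re

# Hypothetical stand-ins for illustration only; Airflow itself renders templates with Jinja.
PLACEHOLDER = re.compile(r"\{\{\s*params\.(\w+)\s*\}\}")


def render(template, params):
    """Replace every {{ params.<key> }} placeholder with the matching value."""
    return PLACEHOLDER.sub(lambda match: str(params[match.group(1)]), template)


def run_task(arguments, defaults, overrides):
    """Merge run-time overrides over the defaults, then render each argument."""
    runtime_params = {**defaults, **overrides}
    return [render(argument, runtime_params) for argument in arguments]


dag_params = {"my_param": "default"}

# Direct access: the default is read once, while the DAG definition is evaluated.
frozen_arguments = ["--configurable-arg", dag_params["my_param"]]

# Template access: the placeholder survives until the task is rendered at run time.
templated_arguments = ["--configurable-arg", "{{ params.my_param }}"]

overrides = {"my_param": "my value"}
print(run_task(templated_arguments, dag_params, overrides))
print(run_task(frozen_arguments, dag_params, overrides))
```

In this simulation the templated list picks up "my value", while the list built through direct access still carries "default", because the value was already baked in before any override existed.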