Using Conveyor run

Conveyor run was introduced to shorten the development cycle. It was created to simplify the following flow, which is often performed by users:

  • build a project
  • deploy the project to an Airflow environment
  • access Airflow and (re)run a specific task

This flow is quite slow, especially the time spent waiting for the DAG to be updated in the Airflow UI.

The conveyor run command allows you to specify which task and environment you want to use. From there, Conveyor will take care of:

  • building your project
  • parsing the DAG logic to determine which task needs to be executed
  • launching the task in the requested environment

If you use conveyor run, you can skip the manual steps of building and deploying your project. You do not need to run conveyor build before executing conveyor run, because the run command already creates a new build as part of its execution.

note

There is one exception: if you are still using Terraform resources and have changed them, those changes are not applied automatically by conveyor run. In that case, you have to run conveyor deploy to apply the Terraform resources.

Using Conveyor run with Airflow dynamic tasks

When you use dynamic tasks, you can pass the --dynamic-task-args argument to conveyor run to supply the additional arguments.

An example DAG could be:

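from airflow import DAG
from datetime import datetime
from conveyor.operators import ConveyorContainerOperatorV2

default_args = {
    # ISO year 2022, week 1, day 1, which is Monday 3 January 2022
    "start_date": datetime.fromisocalendar(2022, 1, 1),
}

with DAG("dynamic", default_args=default_args) as dag:
    # partial() sets the parameters shared by every mapped task instance;
    # expand() creates one task instance per value in the list
    custom_dynamic = ConveyorContainerOperatorV2.partial(
        task_id="dynamic-task",
        instance_type="mx.micro",
    ).expand(arguments=[1, 2])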
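
As a rough mental model of what partial(...).expand(...) does in the DAG above, the plain-Python sketch below builds one set of keyword arguments per mapped value. It does not import Airflow or Conveyor, and the helper name expand_task is hypothetical. It only illustrates the idea and is not how Airflow implements task mapping.

```python
def expand_task(partial_kwargs, name, values):
    """Return one kwargs dict per mapped value (conceptual model only)."""
    return [{**partial_kwargs, name: value} for value in values]


# Shared parameters come from partial(); the mapped parameter comes from expand().
instances = expand_task(
    {"task_id": "dynamic-task", "instance_type": "mx.micro"},
    "arguments",
    [1, 2],
)

for map_index, kwargs in enumerate(instances):
    print(map_index, kwargs)
```

Each entry corresponds to one mapped task instance, which is why conveyor run needs to be told which value of arguments to use.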

If you want to run the task dynamic-task, you should supply the arguments field through --dynamic-task-args:

conveyor run --dag dynamic --task dynamic-task --dynamic-task-args arguments=1

This will run your task with the value for arguments set to 1.
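
If you want to try each mapped value in turn, a small script can assemble the command for you. The sketch below is only an illustration: the helper build_run_command is hypothetical, and it uses only the flags shown on this page (--dag, --task and --dynamic-task-args) with a single key=value pair per invocation.

```python
import shlex


def build_run_command(dag, task, arg_name, arg_value):
    """Build the argument list for one conveyor run invocation."""
    return [
        "conveyor", "run",
        "--dag", dag,
        "--task", task,
        "--dynamic-task-args", f"{arg_name}={arg_value}",
    ]


# One command per value passed to expand() in the example DAG.
commands = [
    build_run_command("dynamic", "dynamic-task", "arguments", value)
    for value in [1, 2]
]

for command in commands:
    # Print the command instead of running it, so the sketch has no side effects.
    print(shlex.join(command))
```

To actually launch a run, each list could be passed to subprocess.run(command, check=True), assuming the conveyor CLI is installed and you are in your project directory.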

For more information on what you can do with dynamic tasks, have a look at the Airflow docs.