Developing Conveyor packages

This how-to guide describes how to create and use a Conveyor package. It also proposes a development flow that makes it easy to build your own packages.

Creating a package

In order to create a new package, run the following CLI command:

conveyor package create --name <your-package>

The structure of a package is largely analogous to that of a project. The main difference is that packages look for Airflow files in the /pkgs folder by default [1], instead of the /dags folder which is used by projects.

By providing a /pkgs folder in your package, shared functionality can be made available to projects for usage in Airflow DAGs. If you provide a Dockerfile in your package, this will be used to create a container image as part of your build. By attaching a container image to your package, you can easily create and distribute custom operators that provide common functionality (a typical example is calling an alerting endpoint).

Example

A minimal example of a package that exposes a common container for use by other projects could look as follows. Note the use of the packages.image() function, which resolves the image belonging to the package you have published.

Package: pkgs/example.py
from conveyor import packages
from conveyor.operators import ConveyorContainerOperatorV2


class ExampleOperator(ConveyorContainerOperatorV2):
    """Example of a reusable operator"""

    def __init__(self, task_id: str, **kwargs):
        super().__init__(
            task_id=task_id,
            image=packages.image(),
            **kwargs,
        )

Testing out your package

While you are developing your package, we recommend you create trial builds of your package through the package trial CLI command. This command will generate a unique version based on the semver version you provide, and the build time.

For example, running the command conveyor package trial --version 1.2.3 results in a build tagged with a version such as "1.2.3-20240909T081717".

Info: Conveyor expects you to provide versions that follow the MAJOR.MINOR.PATCH semantic versioning scheme. Attempting to attach a differently formatted version to your builds will result in an error.
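
To catch a badly formatted version before invoking the CLI, a small local check can help. The following is a minimal sketch and not Conveyor's own validation: the helper names is_major_minor_patch and illustrate_trial_tag are hypothetical, the regular expression is a strict reading of MAJOR.MINOR.PATCH, and the tag format is inferred only from the example tag shown above.

```python
import re
from datetime import datetime

# Strict MAJOR.MINOR.PATCH: three dot-separated non-negative integers,
# without leading zeros (an assumption based on the semver core format).
SEMVER_CORE = re.compile(r"(0|[1-9][0-9]*)\.(0|[1-9][0-9]*)\.(0|[1-9][0-9]*)")


def is_major_minor_patch(version: str) -> bool:
    """Return True if `version` has the MAJOR.MINOR.PATCH shape."""
    return SEMVER_CORE.fullmatch(version) is not None


def illustrate_trial_tag(version: str, built_at: datetime) -> str:
    """Hypothetical helper: mimic the shape of a trial tag (version plus build time)."""
    if not is_major_minor_patch(version):
        raise ValueError(f"not a MAJOR.MINOR.PATCH version: {version!r}")
    # Same layout as the example tag: <version>-<YYYYMMDD>T<HHMMSS>
    return f"{version}-{built_at.strftime('%Y%m%dT%H%M%S')}"
```

Calling illustrate_trial_tag("1.2.3", datetime(2024, 9, 9, 8, 17, 17)) reproduces the tag from the example, while inputs such as "1.2" or "v1.2.3" are rejected by the check.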

Using a trial build

While you are developing your new package version, you likely want to try it out in a test project, to verify that everything works as expected. In order to load a trial build in the DAG of your project, you should use the packages.load() function.

Suppose you have deployed the ExampleOperator from the example above as part of a package named "my_package", with the version "1.2.3". The following example shows how to import this component into a test DAG defined by a separate project.

Project: dags/my_test_dag.py
from airflow import DAG
from conveyor import packages

example = packages.load("my_package.example", version="1.2.3", trial=True)

with DAG("my_dag"):
    my_task = example.ExampleOperator(task_id="my_task")

Only a single trial build is active at a time (older trial builds of the same version are automatically cleaned up when you deploy a newer trial), so trials are simply referred to by the same semver version that was provided when building.

Project DAGs will automatically refresh and load the newly deployed trial version. This means that when you're developing a package, you can simply deploy a DAG that loads your package with trial=True. Every time that you update your trial build, the changes will be automatically reflected in your project DAG, without having to redeploy the Conveyor project.

During DAG validation, the necessary package versions are resolved automatically and the corresponding packages are downloaded.

Releasing your package to users

Once you are satisfied with the behavior and shape of your package code, you can publish a release version of the package using the package release CLI command.

By running the command conveyor package release --version 1.2.3, your build will be tagged with the exact version "1.2.3". Be aware that these build versions are immutable, meaning that once a build has been tagged with a certain version, that same version cannot be reassigned to a new build.

Trial builds follow the same constraints and are therefore immutable as well, but because the timestamp is part of the tag, conflicts are unlikely to happen.
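
The immutability rule can be pictured with a toy model. This is an illustrative sketch only: ToyRegistry and TagAlreadyExists are hypothetical names, and nothing here reflects how Conveyor stores builds internally. It shows a tag that can be assigned once and never rebound, and why timestamped trial tags rarely collide.

```python
class TagAlreadyExists(Exception):
    """Raised when a version tag is already bound to a build."""


class ToyRegistry:
    """Hypothetical in-memory stand-in for a registry with immutable tags."""

    def __init__(self) -> None:
        self._builds: dict[str, str] = {}

    def publish(self, tag: str, build_id: str) -> None:
        # A tag may be assigned exactly once; reassigning it is an error.
        if tag in self._builds:
            raise TagAlreadyExists(f"{tag} is already assigned to {self._builds[tag]}")
        self._builds[tag] = build_id

    def build_for(self, tag: str) -> str:
        """Return the build that a tag points to."""
        return self._builds[tag]
```

In this model, publishing "1.2.3" a second time raises TagAlreadyExists, whereas two trial tags such as "1.2.3-20240909T081717" and "1.2.3-20240909T093000" are distinct keys and can coexist.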

Using a release build

The true value of publishing packages lies of course in being able to reuse those components in your workflows. Switching from a trial build to a release build is as simple as removing the trial=True option. The following snippet shows how to load release version "1.2.3" of the example package illustrated above.

Project: dags/my_production_dag.py
from airflow import DAG
from conveyor import packages

example = packages.load("my_package.example", version="1.2.3")

with DAG("my_dag"):
    my_task = example.ExampleOperator(task_id="my_task")

Versioning details

Conveyor not only allows selecting specific versions of a package, but also using version ranges. If the packages.load(name) function is called without additional parameters, it will select the highest available release version by default (according to semver).

As shown in the examples above, a specific version is requested by adding it as a parameter to the call as packages.load(name, version="1.2.3"). The version parameter not only accepts a single version, but also handles version ranges. These different patterns are illustrated in the following snippet.

from conveyor import packages

# matches the latest semver version
latest = packages.load("my_package.example")

# matches the exact semver version
exact = packages.load("my_package.example", version="1.2.3")

# matches the latest semver version in the range
# (named in_range to avoid shadowing Python's built-in range)
in_range = packages.load("my_package.example", version=">=1.0.0,<2.0.0")

Floating versions are not only supported for release builds, but can be combined with the trial=True option as well.

from conveyor import packages

# matches the trial version of the latest semver version
trial_latest = packages.load("my_package.example", trial=True)

# matches the trial version of the exact semver version
trial_exact = packages.load("my_package.example", version="1.2.3", trial=True)

# matches the trial version of the latest semver version in the range
trial_range = packages.load("my_package.example", version=">=1.0.0,<2.0.0", trial=True)
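
The selection rule described in this section (highest matching version, compared numerically per semver) can be sketched in plain Python. This is not Conveyor's resolver: the function names parse, satisfies and pick_highest are hypothetical, and the set of supported operators is an assumption beyond the ">=" and "<" forms shown above.

```python
import operator
from typing import Optional

# Comparison operators assumed for this sketch; two-character forms are listed
# first so that ">=" is not mistaken for ">".
_OPS = {
    ">=": operator.ge,
    "<=": operator.le,
    "==": operator.eq,
    ">": operator.gt,
    "<": operator.lt,
}


def parse(version: str) -> tuple[int, int, int]:
    """Turn "1.10.0" into (1, 10, 0) so that versions compare numerically."""
    major, minor, patch = (int(part) for part in version.split("."))
    return (major, minor, patch)


def satisfies(version: str, constraint: str) -> bool:
    """Check a version against comma-separated clauses such as ">=1.0.0,<2.0.0"."""
    candidate = parse(version)
    for clause in (c.strip() for c in constraint.split(",")):
        for symbol, compare in _OPS.items():
            if clause.startswith(symbol):
                if not compare(candidate, parse(clause[len(symbol):].strip())):
                    return False
                break
        else:
            # A bare version such as "1.2.3" is treated as an exact match.
            if candidate != parse(clause):
                return False
    return True


def pick_highest(available: list[str], constraint: Optional[str] = None) -> Optional[str]:
    """Return the highest available version that satisfies the constraint, if any."""
    matching = [v for v in available if constraint is None or satisfies(v, constraint)]
    return max(matching, key=parse, default=None)
```

With the available versions "0.9.0", "1.2.3", "1.10.0" and "2.0.0", no constraint selects "2.0.0", the constraint "1.2.3" selects exactly that version, and ">=1.0.0,<2.0.0" selects "1.10.0" rather than "1.2.3", because components are compared as numbers and not as strings.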

Updating to a new package version

When you release a new version of a package, the Airflow DAGs that consume this package automatically pick up the change. This means that projects that depend on the latest version or on a version range will automatically receive the updated version if it matches their constraints. Projects that depend on an exact version will of course still only load the exact version that was requested.

It is therefore not necessary to redeploy your projects to trigger an update; this is already handled for you. This feature allows you to easily keep dependencies up to date across multiple projects and environments.

Footnotes

  1. If desired, this default can be changed by modifying the .conveyor/package.yaml file.