Developing Conveyor packages
This how-to guide describes how to create and use a Conveyor package. It also proposes a development flow that makes it easy to build your own packages.
Creating a package
To create a new package, run the following CLI command:

```shell
conveyor package create --name <your-package>
```
The structure of a package is largely analogous to that of a project.
The main difference is that packages look for Airflow files in the /pkgs folder by default (see footnote 1), instead of the /dags folder used by projects.
By providing a /pkgs folder in your package, you make shared functionality available to projects for use in Airflow DAGs.
If you provide a Dockerfile in your package, it will be used to create a container image as part of your build.
By attaching a container image to your package, you can easily create and distribute custom operators that provide common functionality
(a typical example is calling an alerting endpoint).
Example
A minimal example of a package that exposes a common container for use by other projects could look as follows.
Note the use of the packages.image() function, which resolves the correct image belonging to the package you have published.
```python
from conveyor import packages
from conveyor.operators import ConveyorContainerOperatorV2


class ExampleOperator(ConveyorContainerOperatorV2):
    """Example of a reusable operator"""

    def __init__(self, task_id: str, **kwargs):
        super().__init__(
            task_id=task_id,
            # Resolve the container image that belongs to the published package.
            image=packages.image(),
            **kwargs,
        )
```
Testing out your package
While you are developing your package, we recommend creating trial builds of it with the conveyor package trial CLI command.
This command generates a unique version from the semver version you provide and the build time.
For example, running conveyor package trial --version 1.2.3 results in a build tagged with a version such as "1.2.3-20241121T081104".
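The following sketch shows how a tag like the one above can be derived from a version and a build time. It is illustrative only. The trial_tag helper is hypothetical, and both the exact timestamp layout and the time zone (UTC here) are assumptions inferred from the single example tag, not Conveyor's documented behavior.

```python
from datetime import datetime, timezone


def trial_tag(version: str, built_at: datetime) -> str:
    """Append a compact build timestamp to a MAJOR.MINOR.PATCH version.

    Hypothetical helper: the layout mirrors the example tag
    "1.2.3-20241121T081104"; Conveyor's real format and time zone may differ.
    """
    return f"{version}-{built_at.strftime('%Y%m%dT%H%M%S')}"


# A build made on 21 November 2024 at 08:11:04 (UTC assumed).
print(trial_tag("1.2.3", datetime(2024, 11, 21, 8, 11, 4, tzinfo=timezone.utc)))
# 1.2.3-20241121T081104
```

Because the timestamp is fixed-width and ordered from year down to second, trial tags of the same base version also sort chronologically as plain strings.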
Conveyor expects versions that follow the MAJOR.MINOR.PATCH semantic versioning scheme.
Attempting to attach a differently formatted version to a build results in an error.
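To make the expected shape concrete, here is a minimal check you could run locally before calling the CLI. It is a sketch under assumptions: the regular expression follows the core MAJOR.MINOR.PATCH part of semver 2.0.0, which also forbids leading zeros, and whether Conveyor enforces exactly these rules is not stated in this guide. The function name is hypothetical.

```python
import re

# MAJOR.MINOR.PATCH: three non-negative integers without leading zeros, as in
# the core of semver 2.0.0. Conveyor's own validation rules are assumed here.
SEMVER_CORE = re.compile(r"(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)")


def is_valid_release_version(version: str) -> bool:
    """Return True if the whole string is a plain MAJOR.MINOR.PATCH version."""
    return SEMVER_CORE.fullmatch(version) is not None


for candidate in ["1.2.3", "1.2", "v1.2.3", "01.2.3"]:
    print(candidate, is_valid_release_version(candidate))
```

Only "1.2.3" passes. "1.2" lacks a patch number, "v1.2.3" carries a prefix, and "01.2.3" has a leading zero.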
Using a trial build
While developing a new package version, you will likely want to try it out in a test project to verify that everything works as expected.
To load a trial build in a DAG of your project, use the packages.load() function.
Suppose you have deployed the ExampleOperator from the example above as part of a package named "my_package", with the version "1.2.3".
The following example shows how to import this component into a test DAG defined by a separate project.
```python
from airflow import DAG
from conveyor import packages

# Load the trial build of version 1.2.3 of the my_package.example module.
example = packages.load("my_package.example", version="1.2.3", trial=True)

with DAG("my_dag"):
    my_task = example.ExampleOperator(task_id="my_task")
```
Only one trial build per version is active at a time, because deploying a newer trial automatically cleans up older trial builds of the same version. Trials are therefore referred to simply by the semver version that was provided when building.
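The bookkeeping described above can be pictured as a map from each base version to its single active trial. The following is a hypothetical model for intuition only, not Conveyor's implementation. TrialSlots, deploy and active are invented names, and the tags follow the format of the earlier example.

```python
from typing import Dict, Optional


class TrialSlots:
    """Hypothetical model of "one active trial build per version"."""

    def __init__(self) -> None:
        # Maps a base version such as "1.2.3" to its currently active trial tag.
        self._active: Dict[str, str] = {}

    def deploy(self, trial_tag: str) -> Optional[str]:
        """Activate a trial tag and return the older trial it replaces, if any."""
        base_version = trial_tag.split("-", 1)[0]
        replaced = self._active.get(base_version)
        self._active[base_version] = trial_tag
        return replaced

    def active(self, base_version: str) -> Optional[str]:
        """Return the trial tag a consumer asking for base_version would get."""
        return self._active.get(base_version)


slots = TrialSlots()
slots.deploy("1.2.3-20241121T081104")
print(slots.deploy("1.2.3-20241121T093000"))  # 1.2.3-20241121T081104 (cleaned up)
print(slots.active("1.2.3"))                  # 1.2.3-20241121T093000
```

Because consumers ask for "1.2.3" rather than a full tag, the lookup always lands on the most recently deployed trial.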
Project DAGs automatically refresh and load the newly deployed trial version.
This means that while developing a package, you only need to deploy a DAG that loads your package with trial=True once.
Every time you update your trial build, the changes are automatically reflected in your project DAG, without redeploying the Conveyor project.
For DAG validation, the required package versions are resolved and the packages are downloaded automatically.
Releasing your package to users
Once you are satisfied with the behavior and shape of your package code, publish a release version of the package using the conveyor package release CLI command.
Running conveyor package release --version 1.2.3 tags your build with the exact version "1.2.3".
Be aware that build versions are immutable: once a build has been tagged with a certain version, that version cannot be reassigned to a new build.
(Trial builds follow the same constraint, but because the timestamp is part of their tag, conflicts are unlikely.)
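The immutability rule behaves like a write-once mapping from version to build. The sketch below models it with a hypothetical ReleaseRegistry class and made-up build identifiers. It only illustrates the constraint and says nothing about how Conveyor stores builds.

```python
from typing import Dict


class ReleaseRegistry:
    """Hypothetical registry in which a version, once tagged, is never reassigned."""

    def __init__(self) -> None:
        # Maps a release version to the identifier of the build it was assigned to.
        self._builds: Dict[str, str] = {}

    def release(self, version: str, build_id: str) -> None:
        """Tag build_id with version, refusing to reuse an existing version."""
        if version in self._builds:
            raise ValueError(
                f"version {version} is already assigned to build {self._builds[version]}"
            )
        self._builds[version] = build_id


registry = ReleaseRegistry()
registry.release("1.2.3", "build-a")
try:
    registry.release("1.2.3", "build-b")  # a new build cannot take over 1.2.3
except ValueError as err:
    print(err)  # version 1.2.3 is already assigned to build build-a
```

In practice, this means a fix to a released package ships under a new version, such as 1.2.4.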
Using a release build
The real value of publishing packages lies in reusing those components in your workflows.
Switching from a trial build to a release build is as simple as removing the trial=True option.
The following snippet shows how to load release version "1.2.3" of the example package illustrated above.
```python
from airflow import DAG
from conveyor import packages

# Load release version 1.2.3 (no trial=True, so only release builds are considered).
example = packages.load("my_package.example", version="1.2.3")

with DAG("my_dag"):
    my_task = example.ExampleOperator(task_id="my_task")
```
Versioning details
Conveyor allows you to select not only specific versions of a package but also version ranges.
If the packages.load(name) function is called without additional parameters, it selects the highest available release version by default (according to semver).
As shown in the examples above, you request a specific version by passing it as a parameter, as in packages.load(name, version="1.2.3").
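"Highest according to semver" means the MAJOR, MINOR and PATCH parts are compared numerically, not as text. The short sketch below shows the difference using a hypothetical semver_key helper. It ignores trial suffixes and is not Conveyor's resolver.

```python
from typing import Tuple


def semver_key(version: str) -> Tuple[int, int, int]:
    """Turn "MAJOR.MINOR.PATCH" into a tuple of ints so versions compare numerically."""
    major, minor, patch = (int(part) for part in version.split("."))
    return (major, minor, patch)


releases = ["1.2.3", "1.9.0", "1.10.0"]
print(max(releases))                  # 1.9.0  (plain string comparison, wrong)
print(max(releases, key=semver_key))  # 1.10.0 (semver comparison, correct)
```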
The version parameter accepts not only a single version but also version ranges.
The following snippet illustrates these patterns.
```python
from conveyor import packages

# matches the latest semver version
latest = packages.load("my_package.example")

# matches the exact semver version
exact = packages.load("my_package.example", version="1.2.3")

# matches the latest semver version in the range
# (named in_range to avoid shadowing Python's built-in range)
in_range = packages.load("my_package.example", version=">=1.0.0,<2.0.0")
```
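The three patterns above boil down to one rule: keep the releases that satisfy the constraint, then take the highest by semver. Here is a minimal sketch of that rule. The parse, matches and resolve helpers are hypothetical, and the operator set (>=, <=, ==, >, <, comma meaning "and") is an assumption based on the range string in the snippet. Conveyor may accept a different or richer syntax.

```python
import operator
from typing import List, Optional, Tuple

Version = Tuple[int, int, int]

# Operators this sketch understands (assumed; Conveyor may accept more).
# Two-character operators come first so ">=" is not mistaken for ">".
OPERATORS = [
    (">=", operator.ge),
    ("<=", operator.le),
    ("==", operator.eq),
    (">", operator.gt),
    ("<", operator.lt),
]


def parse(version: str) -> Version:
    major, minor, patch = (int(part) for part in version.split("."))
    return (major, minor, patch)


def matches(version: str, spec: str) -> bool:
    """Check a version against comma-separated clauses such as ">=1.0.0,<2.0.0"."""
    for clause in spec.split(","):
        clause = clause.strip()
        for symbol, compare in OPERATORS:
            if clause.startswith(symbol):
                if not compare(parse(version), parse(clause[len(symbol):])):
                    return False
                break
        else:
            # A bare version such as "1.2.3" is treated as an exact pin.
            if parse(version) != parse(clause):
                return False
    return True


def resolve(available: List[str], spec: Optional[str] = None) -> Optional[str]:
    """Return the highest available version that satisfies spec (any version if None)."""
    candidates = [v for v in available if spec is None or matches(v, spec)]
    return max(candidates, key=parse, default=None)


releases = ["0.9.0", "1.2.3", "1.4.0", "2.0.0"]
print(resolve(releases))                    # 2.0.0
print(resolve(releases, "1.2.3"))           # 1.2.3
print(resolve(releases, ">=1.0.0,<2.0.0"))  # 1.4.0
```

If no release satisfies the constraint, this sketch returns None. How Conveyor reports that case is not covered here.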
Floating versions are supported not only for release builds; they can be combined with the trial=True option as well.
```python
from conveyor import packages

# matches the trial version of the latest semver version
trial_latest = packages.load("my_package.example", trial=True)

# matches the trial version of the exact semver version
trial_exact = packages.load("my_package.example", version="1.2.3", trial=True)

# matches the trial version of the latest semver version in the range
trial_range = packages.load("my_package.example", version=">=1.0.0,<2.0.0", trial=True)
```
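For trials, the same selection can be read as applying the constraint to the semver part of each trial tag. The sketch below is hypothetical. To keep it short, the constraint is passed as a Python predicate instead of a version string, and the trial tags are made-up examples in the format described earlier. Conveyor's actual trial resolution may differ.

```python
from typing import Callable, List, Optional, Tuple

Version = Tuple[int, int, int]


def parse(version: str) -> Version:
    major, minor, patch = (int(part) for part in version.split("."))
    return (major, minor, patch)


def split_trial(tag: str) -> Tuple[Version, str]:
    """Split a trial tag like "1.2.3-20241121T081104" into ((1, 2, 3), "20241121T081104")."""
    base, timestamp = tag.split("-", 1)
    return parse(base), timestamp


def resolve_trial(
    tags: List[str], accept: Callable[[Version], bool] = lambda v: True
) -> Optional[str]:
    """Return the trial whose base version is the highest one accepted.

    The fixed-width timestamp only breaks ties, which should not occur when a
    single trial per version is active.
    """
    candidates = [tag for tag in tags if accept(split_trial(tag)[0])]
    return max(candidates, key=split_trial, default=None)


trials = ["1.2.3-20241121T081104", "1.4.0-20241122T100000", "2.0.0-20241123T120000"]
print(resolve_trial(trials))                                        # 2.0.0-20241123T120000
print(resolve_trial(trials, lambda v: v == (1, 2, 3)))              # 1.2.3-20241121T081104
print(resolve_trial(trials, lambda v: (1, 0, 0) <= v < (2, 0, 0)))  # 1.4.0-20241122T100000
```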
Updating to a new package version
When you release a new version of a package, the Airflow DAGs that consume this package are automatically notified. Projects that depend on the latest version or on a version range automatically receive the new version if it matches their constraints. (Projects that depend on an exact version still load only the exact version they requested.)
It is therefore not necessary to redeploy your projects to pick up an update; this is handled for you. This makes it easy to keep dependencies up to date across multiple projects and environments.
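The effect of a new release on different consumers can be simulated by resolving each constraint before and after publishing. In this sketch, the consumers, their constraints (written as Python predicates) and the release numbers are all hypothetical. Only the rule itself comes from the text above: latest and range consumers move along, and exact pins stay put.

```python
from typing import Callable, Dict, List, Optional, Tuple

Version = Tuple[int, int, int]


def parse(version: str) -> Version:
    major, minor, patch = (int(part) for part in version.split("."))
    return (major, minor, patch)


def resolve(releases: List[str], accept: Callable[[Version], bool]) -> Optional[str]:
    """Highest release accepted by a consumer's constraint, or None."""
    return max((r for r in releases if accept(parse(r))), key=parse, default=None)


# Hypothetical consumers, one per kind of constraint packages.load() accepts.
CONSUMERS: Dict[str, Callable[[Version], bool]] = {
    "latest": lambda v: True,
    'version="1.2.3"': lambda v: v == (1, 2, 3),
    'version=">=1.0.0,<2.0.0"': lambda v: (1, 0, 0) <= v < (2, 0, 0),
}


def snapshot(releases: List[str]) -> Dict[str, Optional[str]]:
    """What each consumer would load given the currently published releases."""
    return {name: resolve(releases, accept) for name, accept in CONSUMERS.items()}


before = snapshot(["1.2.3"])
after = snapshot(["1.2.3", "1.3.0", "2.0.0"])  # two new releases are published
for name in CONSUMERS:
    print(f"{name}: {before[name]} -> {after[name]}")
```

After the two releases, the "latest" consumer moves from 1.2.3 to 2.0.0. The range consumer moves to 1.3.0, because 2.0.0 falls outside its range. The exact pin stays on 1.2.3.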
Footnotes
1. If desired, this default can be changed by modifying the .conveyor/package.yaml file.