Sharing code between Airflow DAGs
This guide describes a pattern that has been superseded by the packages feature. We encourage all new development to make use of packages instead of the pattern described on this page.
Only refer to this guide when maintaining legacy projects that already use this pattern. We also encourage you to migrate these projects to packages. If you need help with such a migration, don't hesitate to reach out to the Conveyor team.
When to use
This pattern can be useful if you have internally developed operators or some simple common code that you want to reuse.
We recommend not adopting this pattern hastily, because it introduces a tight coupling between the common project and all the projects that depend on it.
Implementation
Create one Conveyor project that contains this common code. Let's call it airflow_common for now.
In Airflow, your projects are deployed as follows:
```
/dags
  /project1/*.py
  /project2/*.py
```
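Airflow adds the DAGs folder to sys.path, and airflow_common is deployed as one of these project folders. As a result, a function X defined in airflow_common/utils.py can be imported from any DAG as:
```python
from airflow_common.utils import X
```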
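The sketch below shows why this import resolves. It recreates the deployment layout described above in a temporary directory, with one folder per project under the DAGs folder. It then adds that folder to sys.path by hand, which is the step Airflow performs for its own DAGs folder. The helper default_args, the file my_dag.py and their contents are hypothetical and only serve as an illustration.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def build_dags_folder(root: Path) -> None:
    """Recreate a hypothetical /dags layout: one shared project and one consumer."""
    # The shared project is deployed as its own folder, next to the other projects.
    common = root / "airflow_common"
    common.mkdir()
    (common / "__init__.py").write_text("")
    # Hypothetical shared helper: default task arguments reused by several DAGs.
    (common / "utils.py").write_text(
        "def default_args(owner):\n"
        "    return {'owner': owner, 'retries': 2}\n"
    )
    # A consuming project whose DAG module imports the shared helper.
    project = root / "project1"
    project.mkdir()
    (project / "__init__.py").write_text("")
    (project / "my_dag.py").write_text(
        "from airflow_common.utils import default_args\n"
        "ARGS = default_args('team-a')\n"
    )


with tempfile.TemporaryDirectory() as tmp:
    build_dags_folder(Path(tmp))
    # Airflow puts its DAGs folder on sys.path; here we do that step by hand.
    sys.path.insert(0, tmp)
    importlib.invalidate_caches()
    try:
        from project1.my_dag import ARGS
    finally:
        sys.path.remove(tmp)

print(ARGS)  # {'owner': 'team-a', 'retries': 2}
```

The same mechanism explains the coupling listed below. Every project resolves airflow_common from the single deployed copy, so any change to it immediately affects all of them.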
Considerations when using this approach
| Advantages | Disadvantages |
|---|---|
| Less code duplication | Strict coupling between projects: bugs in the common code impact all projects. Make sure your changes are backwards compatible, or validate that all dependent projects still work after changing the common code. |
| You can write custom operators | You cannot test projects without first deploying the common project to an environment. |
Recommended way of working
Try to keep your Airflow DAGs as simple as possible. Airflow should only orchestrate your application, and most of the logic belongs in the application itself. That way, you should need little extra code in your DAGs.
If you do use this pattern, we recommend writing tests for your Airflow DAGs, as well as making sure that when the common code is updated, all your projects are still able to load their DAGs.
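Airflow's own DagBag class exposes an import_errors mapping, and a test can assert that this mapping is empty. The sketch below illustrates the same idea without any dependencies. It imports every .py file under a DAGs folder, with that folder on sys.path as Airflow does, and collects the exception raised by each file that fails. Run it against a folder that holds the common project together with all dependent projects, and it catches changes to the shared code that break a consumer. The function find_import_errors and the example project names are hypothetical. They are not part of Airflow or Conveyor.

```python
import importlib
import importlib.util
import sys
import tempfile
from pathlib import Path


def find_import_errors(dags_folder: Path) -> dict[str, str]:
    """Import every .py file under dags_folder; return {relative path: error}."""
    errors: dict[str, str] = {}
    modules_before = set(sys.modules)
    sys.path.insert(0, str(dags_folder))  # mimic Airflow adding its DAGs folder
    importlib.invalidate_caches()
    try:
        for index, path in enumerate(sorted(dags_folder.rglob("*.py"))):
            # A unique module name per file avoids clashes between equal file names.
            spec = importlib.util.spec_from_file_location(f"_dag_check_{index}", path)
            module = importlib.util.module_from_spec(spec)
            try:
                spec.loader.exec_module(module)
            except Exception as exc:  # any failure counts as an import error
                rel = path.relative_to(dags_folder).as_posix()
                errors[rel] = f"{type(exc).__name__}: {exc}"
    finally:
        sys.path.remove(str(dags_folder))
        # Drop modules imported during the check so a rerun sees fresh code.
        for name in set(sys.modules) - modules_before:
            del sys.modules[name]
    return errors


# Usage: a shared project "common" plus two projects that depend on it.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    for folder in ("common", "project1", "project2"):
        (root / folder).mkdir()
        (root / folder / "__init__.py").write_text("")
    (root / "common" / "utils.py").write_text("def greet():\n    return 'hi'\n")
    (root / "project1" / "dag.py").write_text("from common.utils import greet\n")
    # project2 still imports a helper that was removed from the common project.
    (root / "project2" / "dag.py").write_text("from common.utils import removed_helper\n")
    errors = find_import_errors(root)

print(errors)
```

Only project2/dag.py should be reported here, with an ImportError. In a real test suite you would assert that the result is empty, or use DagBag directly if Airflow is installed in your test environment.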
Using the Conveyor build and run commands with shared code
If your DAGs reference code from a shared project, you might notice that the DAG validation performed during conveyor build or conveyor run cannot resolve these dependencies.
Solution
When your DAGs reference code from another project and you validate them with the conveyor build command, specify this dependency in your .conveyor/project.yaml so that Conveyor also loads the DAGs of that project during validation.
The configuration that you need to add is the following:
```yaml
dependencies:
  projects:
    - name: common
  environment: dev
```
Because projects is a list, you can specify multiple projects that you depend on. For every project that you add, Conveyor loads the Airflow code of its active build in the specified environment.
We use the same environment for fetching the active build of all dependent projects.
If you made recent changes to a dependent project and want to test them, you can override the default environment (set in .conveyor/project.yaml) by specifying the project-dependencies-environment argument of the conveyor build command.
Alternatively, you can disable DAG validation at build time with the --skip-dag-validation flag, which is described in the next section.
Cross-project permissions using RBAC
To download the Airflow DAGs of dependent projects, you need at least the Contributor permission on those projects.
If you do not have the necessary permissions on dependent projects but still want to build your project, you can do the following:
- Use the --skip-dag-validation flag when building the Conveyor project. The downside is that none of the DAG code in your project is validated.
- Catch the import error for the shared code and import a dummy function instead. All other DAG code is then still validated. The drawback is that the same fallback is also active in production, so you do not get a DAG import error if you make a mistake when using the common library.
To handle the import error, you can use the following pattern:
```python
try:
    from shared_code import getName
except ImportError:
    from current_project import getName
```
In the __init__.py file of the current project, you can define a dummy getName function as follows:
```python
def getName():
    return ""
```
During validation, this dummy function will be used instead of the implementation in the shared code, which avoids the validation error.
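The fallback can be made narrower so that it only applies when the shared package itself cannot be found. The sketch below does this to soften the drawback described above. ModuleNotFoundError is the subclass of ImportError that Python raises for a missing module, and its name attribute holds the name of that module. A misspelled or removed function name raises a plain ImportError instead, so that mistake is no longer hidden. The sketch assumes the shared package is called shared_code, as in the example above. The dummy is defined inline only to keep the block self-contained; in a project it would stay in __init__.py. If shared_code really is absent at runtime, the dummy is still used, and the only sign of this is the log warning.

```python
import logging

log = logging.getLogger(__name__)

try:
    from shared_code import getName
except ModuleNotFoundError as err:
    # Re-raise unless the missing module is the shared package itself, so that
    # broken imports inside shared_code are still reported.
    if err.name != "shared_code":
        raise
    log.warning("shared_code is not available; falling back to a dummy getName")

    def getName():
        """Placeholder used when the shared project cannot be imported."""
        return ""
```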