Using the dbt task factory
Description
We created the DbtTaskFactory to give pipeline developers more control over which dbt models are executed in a given task.
By default, executing dbt run for your project runs all your models at once. This can cause several issues:
- updating all models can be slow
- updating all models can be unnecessary if only one input table changed
- a single model with an error can cause the entire run to fail
The DbtTaskFactory allows you to specify a set of models that each run in an individual Airflow task.
You can select models by using the tags property of a dbt model.
These tasks are then executed using the ConveyorContainerOperatorV2, which is described in more detail here.
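To illustrate the difference between a single dbt run and one invocation per model, here is a minimal sketch. It is not how the DbtTaskFactory is implemented; it only shows the idea using the standard dbt flags --select and --target. The helper name per_model_commands and the model names are hypothetical.

```python
import shlex


def per_model_commands(models, target):
    """Build one `dbt run` invocation per model instead of one run for all models."""
    return [
        ["dbt", "run", "--select", model, "--target", target]
        for model in models
    ]


# Hypothetical model names, used only for illustration.
commands = per_model_commands(["stg_orders", "orders_daily"], target="dev")

for cmd in commands:
    # Print each command as it would be typed in a shell.
    print(shlex.join(cmd))
```

Because every model gets its own invocation, a failure in one model is reported for that model only, and each invocation can be retried on its own.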
How to use it
When defining your SQL model in dbt, you should add the tag(s) to the model config, which can be done as follows:
{{ config(tags=["first"]) }}
with someModel as (
select ...
To make it possible to filter models based on tags in Airflow, the manifest file inside the dags folder needs to be checked.
You can generate the manifest file by using the following commands:
make shell
make manifest target=<env>
To generate the manifest file, dbt needs access to the target data warehouse (DWH) from the environment where you execute the dbt commands.
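To check which models a tag would select, you can inspect the generated manifest yourself. The sketch below assumes the usual dbt manifest layout, where nodes is a mapping whose entries carry resource_type, name and tags. The helper models_with_tags and the sample data are hypothetical, and this is not necessarily how the factory reads the file.

```python
import json


def models_with_tags(manifest, tags):
    """Return the sorted names of dbt models that carry at least one of the given tags."""
    wanted = set(tags)
    return sorted(
        node["name"]
        for node in manifest.get("nodes", {}).values()
        if node.get("resource_type") == "model" and wanted & set(node.get("tags", []))
    )


# Hand-written stand-in for the content of a manifest.json (illustration only).
# For a real file, load it with json.load(open(path_to_manifest)).
sample_manifest = json.loads("""
{
  "nodes": {
    "model.sample.some_model": {"resource_type": "model", "name": "some_model", "tags": ["first"]},
    "model.sample.other_model": {"resource_type": "model", "name": "other_model", "tags": ["second"]},
    "test.sample.not_null_id": {"resource_type": "test", "name": "not_null_id", "tags": ["first"]}
  }
}
""")

print(models_with_tags(sample_manifest, ["first"]))
```

Tests and other non-model nodes are skipped even when they carry a matching tag, since only models are of interest here.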
Once you have generated the manifest.json, you can use the tags in the DbtTaskFactory to create multiple DAGs that each execute a subset of the models defined in your project.
from airflow import DAG
from conveyor.factories import ConveyorDbtTaskFactory

# Two DAGs, each of which will run a different subset of the dbt models.
dag1 = DAG("sample-1", schedule_interval="@daily")
dag2 = DAG("sample-2", schedule_interval="@daily")

# One factory can be reused to add tasks to several DAGs.
factory = ConveyorDbtTaskFactory(
    task_aws_role="sample-dbt-{{ macros.conveyor.env() }}"
)

# Add the models tagged "first" to dag1 and the models tagged "second" to dag2.
start_task1, end_task1 = factory.add_tasks_to_dag(dag=dag1, tags=["first"])
start_task2, end_task2 = factory.add_tasks_to_dag(dag=dag2, tags=["second"])
...
The final step is to build and deploy your project so that it can be scheduled by Airflow:
conveyor project build && conveyor project deploy --env=<environment>