
Using the dbt task factory

Description

We created the DbtTaskFactory to give pipeline developers more control over which dbt models are executed in a given task. By default, running dbt run for your project executes all of your models at once, which can cause several issues:

  • updating all models can be slow
  • updating all models can be unnecessary if only one input table changed
  • a single model with an error can cause the entire run to fail

The DbtTaskFactory lets you select a set of models and runs each of them in its own Airflow task. You select models by adding tags to them in dbt. These tasks are executed with the ConveyorContainerOperatorV2, which is described in more detail in the ConveyorContainerOperatorV2 documentation.
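To illustrate what running each model in its own task amounts to, the sketch below turns a list of selected models into one dbt run --select <model> command per model, instead of one dbt run over the whole project. The helper per_model_commands and the model names are hypothetical, and how ConveyorContainerOperatorV2 actually invokes dbt is an assumption here; --select is dbt's standard node-selection flag (dbt itself can also select by tag, e.g. dbt run --select tag:first).

```python
from typing import List


def per_model_commands(models: List[str]) -> List[List[str]]:
    """Build one dbt command per model (hypothetical helper, not part of Conveyor).

    Running each model in its own command, and therefore its own task, means a
    failing model only fails its own task instead of the entire run.
    """
    return [["dbt", "run", "--select", model] for model in models]


if __name__ == "__main__":
    # Made-up model names standing in for the models tagged "first".
    for cmd in per_model_commands(["orders", "customers"]):
        print(" ".join(cmd))
```

With the sample names this prints dbt run --select orders followed by dbt run --select customers, each of which could be retried or fail independently.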

How to use it

When defining your SQL model in dbt, add the tag(s) to the model config as follows:

{{ config(tags=["first"]) }}

with someModel as (
select ...

To filter models by tag in Airflow, the dbt manifest file (manifest.json) must be present in your dags folder. You can generate the manifest file with the following commands:

make shell
make manifest target=<env>
Note: to generate the manifest file, dbt needs access to the target data warehouse (DWH) from the environment where you run the dbt commands.
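To make the role of manifest.json more concrete, here is a minimal sketch of tag-based model selection. It reads the nodes section of a dbt manifest, where each key is a unique ID such as model.my_project.orders and each node has resource_type, name and tags fields, and keeps the models that carry any of the requested tags. The helper names models_with_tags and load_manifest, the "any tag matches" rule and the sample manifest are assumptions for illustration; the DbtTaskFactory's own selection logic may differ.

```python
import json
from typing import Dict, Iterable, List


def models_with_tags(manifest: Dict, tags: Iterable[str]) -> List[str]:
    """Return the sorted names of dbt models carrying at least one of `tags`.

    Non-model nodes (tests, seeds, snapshots, ...) are skipped.
    """
    wanted = set(tags)
    return sorted(
        node["name"]
        for node in manifest.get("nodes", {}).values()
        if node.get("resource_type") == "model" and wanted & set(node.get("tags", []))
    )


def load_manifest(path: str) -> Dict:
    """Load a manifest.json file from disk, e.g. the copy in the dags folder."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)


if __name__ == "__main__":
    # A trimmed-down, made-up manifest: two tagged models and one tagged test.
    manifest = {
        "nodes": {
            "model.my_project.orders": {"resource_type": "model", "name": "orders", "tags": ["first"]},
            "model.my_project.customers": {"resource_type": "model", "name": "customers", "tags": ["second"]},
            "test.my_project.not_null_orders_id": {"resource_type": "test", "name": "not_null_orders_id", "tags": ["first"]},
        }
    }
    print(models_with_tags(manifest, ["first"]))  # ['orders']
```

A real manifest contains many more fields per node, but only resource_type, name and tags are needed for this kind of filtering.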

Once you have generated manifest.json, you can pass tags to the DbtTaskFactory to create multiple DAGs, each executing a subset of the models defined in your project.

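from datetime import datetime

from airflow import DAG
from conveyor.factories import ConveyorDbtTaskFactory

# Airflow needs a start_date before it can schedule tasks; this date is only an example.
default_args = {"start_date": datetime(2023, 1, 1)}

dag1 = DAG("sample-1", default_args=default_args, schedule_interval="@daily")
dag2 = DAG("sample-2", default_args=default_args, schedule_interval="@daily")

# The AWS role name is templated with the current Conveyor environment.
factory = ConveyorDbtTaskFactory(
    task_aws_role="sample-dbt-{{ macros.conveyor.env() }}"
)

# Each call adds the tasks for the models with the given tag(s) to the DAG and
# returns a start and end task that you can use to set further dependencies.
start_task1, end_task1 = factory.add_tasks_to_dag(dag=dag1, tags=["first"])
start_task2, end_task2 = factory.add_tasks_to_dag(dag=dag2, tags=["second"])
...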
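When several selected models depend on each other, their per-model tasks have to run in dependency order. The sketch below shows one plausible way to derive that order from the depends_on.nodes lists in manifest.json, using Python's standard graphlib.TopologicalSorter (Python 3.9+). It is a hypothetical illustration, not the ConveyorDbtTaskFactory implementation: the function name execution_order is made up, and dependencies on models outside the selection are simply ignored.

```python
from graphlib import TopologicalSorter
from typing import Dict, List


def execution_order(manifest: Dict, selected: List[str]) -> List[str]:
    """Order selected model unique IDs so each model comes after its parents.

    Only edges between selected models are kept; upstream nodes that were not
    selected (other models, sources) are assumed to be refreshed elsewhere.
    Raises graphlib.CycleError if the selected models form a cycle.
    """
    nodes = manifest["nodes"]
    chosen = set(selected)
    # Map each selected model to the selected models it depends on.
    graph = {
        uid: [dep for dep in nodes[uid].get("depends_on", {}).get("nodes", []) if dep in chosen]
        for uid in selected
    }
    # static_order() yields every node after all of its predecessors.
    return list(TopologicalSorter(graph).static_order())


if __name__ == "__main__":
    # Made-up manifest fragment: a simple chain stg_orders -> orders -> revenue.
    manifest = {
        "nodes": {
            "model.p.stg_orders": {"depends_on": {"nodes": ["source.p.raw.orders"]}},
            "model.p.orders": {"depends_on": {"nodes": ["model.p.stg_orders"]}},
            "model.p.revenue": {"depends_on": {"nodes": ["model.p.orders"]}},
        }
    }
    print(execution_order(manifest, ["model.p.revenue", "model.p.orders", "model.p.stg_orders"]))
    # ['model.p.stg_orders', 'model.p.orders', 'model.p.revenue']
```

In an Airflow DAG, each kept edge would correspond to an upstream_task >> downstream_task dependency between two per-model tasks.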

The final step is to build and deploy your project so that it can be scheduled by Airflow:

conveyor project build && conveyor project deploy --env=<environment>