ConveyorDbtTaskFactory
The ConveyorDbtTaskFactory
allows you to split up the dbt models in your project in multiple Airflow dags.
A dag will contain tasks corresponding to each dbt model that need to be executed using the ConveyorContainerOperatorV2
.
You can filter on dbt models through tags and looking at the manifest
file generated by the dbt compile
command.
You can use the ConveyorDbtTaskFactory
as follows:
from airflow import DAG
from conveyor.factories import ConveyorDbtTaskFactory
dag = DAG("your-dag", schedule_interval="@daily")
role = "job-role-{{ macros.conveyor.env() }}"
factory = ConveyorDbtTaskFactory(task_aws_role=role)
start_task, end_task = factory.add_tasks_to_dag(dag=dag)
These are the configuration parameters present in ConveyorDbtTaskFactory:
Parameter | Type | Default | Explanation |
---|---|---|---|
manifest_file | str | manifest.json | Name of the dbt manifest file used to generate the tasks. Must be present in the dags folder. |
task_name_prefix | str | None | Prefix to apply to tasks. |
task_name_suffix | str | None | Suffix to apply to tasks. |
task_cmd | List[str] | [] | Commands passed to the container. |
task_arguments | List[str] | ["--no-use-colors", "{command}", "--target", "{{ macros.conveyor.env() }}", "--profiles-dir", "./..", "--select", "{model}",] | Arguments passed to the container. Needs to contain 2 templated fields {command} and {model} that conveyor will fill in. |
task_instance_type | str | mx.micro | Conveyor instance type to use for the tasks. |
task_instance_life_cycle | str | spot | The lifecycle of the instance used to run this job. Options are on-demand or spot . |
task_aws_role | str | None | The AWS role |
task_azure_application_client_id | string | The Azure service principal used by the container. | |
task_env_vars | dict | Environment variables passed to the tasks. | |
start_task_name_override | str | None | This override the name of the start task, setting this will ignore the prefix and suffix. |
end_task_name_override | str | None | This override the name of the end task, setting this will ignore the prefix and suffix. |
ConveyorDbtTaskFactory.add_tasks_to_dag
The factory has a method called add_tasks_to_dag
with the following arguments:
Parameter | Type | Default | Explanation |
---|---|---|---|
dag | DAG | Dag to add the airflow tasks to. | |
tags | List[str] | [] | Tags to match when adding models. |
any_tag | bool | True | Controls the matching behaviour of tags. When True, at least 1 tag needs to match. When False, all tags need to match. |
test_tasks | bool | True | Add test tasks to the Airflow dag for each model task. |
The method returns a start and stop task that can be used to attach to other tasks. For example:
from airflow import DAG
from conveyor.factories import ConveyorDbtTaskFactory
from conveyor.operators import ConveyorContainerOperatorV2
dag = DAG(
"your-dag-id", schedule_interval="@daily", max_active_runs=1
)
role = "job-role-{{ macros.conveyor.env() }}"
factory = ConveyorDbtTaskFactory(task_aws_role=role)
start_task, end_task = factory.add_tasks_to_dag(dag=dag)
before = ConveyorContainerOperatorV2(
dag=dag,
task_id="before",
)
before >> start_task
This will add a before
task that the start task of our generated dbt tasks will depend on.
ConveyorDbtTaskFactory.add_tasks_to_task_group
The factory has a method called add_tasks_to_task_group
with the following arguments:
Parameter | Type | Default | Explanation |
---|---|---|---|
dag | DAG | Dag to add the airflow tasks to. | |
tags | List[str] | [] | Tags to match when adding models. |
task_group_name | str | dbt_run | The name for the task group containing the model tasks. |
test_task_group_name | str | dbt_test | The name of the task group containing the test tasks. |
any_tag | bool | True | Controls the matching behaviour of tags. When True , at least 1 tag needs to match. When False , all tags need to match. |
The method returns the model_task_group
and the test_task_group
that can be combined with other tasks
or taskGroups
.
For example:
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from conveyor.factories import ConveyorDbtTaskFactory
dag = DAG("your-dag", schedule_interval="@daily")
role = "job-role-{{ macros.conveyor.env() }}"
factory = ConveyorDbtTaskFactory(task_aws_role=role)
run_group, test_group = factory.add_tasks_to_task_group(dag=dag)
start = EmptyOperator(dag=dag, task_id="start")
end = EmptyOperator(dag=dag, task_id="end")
start >> run_group >> test_group >> end
This way your DAG starts with a start task and then runs the two TaskGroups consecutively and finishes with an end task.
When grouping the tasks in TaskGroups
, the dependencies between the model tasks follow the dependencies
of the dbt models, but the tests all run in parallel as they run after all model tasks have finished.
Environment variables
You can configure environment variables on the factory to pass to the generated tasks. This is a convenient and often recommended approach to pass configuration that is likely to vary between deploys. A simple example of a static environment variable looks as follows:
from conveyor.factories import ConveyorDbtTaskFactory
ConveyorDbtTaskFactory(
...,
task_env_vars={
"STATIC_KEY": "hello world",
},
)
Sometimes these configurations contain sensitive information and should not be part of the operator configuration. To support these use cases, you can load environment variables dynamically from a secrets store. We support the following stores:
AWS secrets
To access the secrets, the AWS IAM role configured in the operator needs to have the permissions to access the secret.
The following snippet shows how to use an AWS secret in the ConveyorDbtTaskFactory
:
from conveyor.factories import ConveyorDbtTaskFactory
from conveyor.secrets import AWSParameterStoreValue, AWSSecretsManagerValue
ConveyorDbtTaskFactory(
...,
task_env_vars={
"USERNAME": AWSParameterStoreValue(name="/example/username"),
"PASSWORD": AWSSecretsManagerValue(name="example-password"),
},
task_aws_role="role-with-access-to-secrets",
)
Both stores also support selecting properties from JSON stored secrets using JMESPath syntax.
{
"username": "ADMIN",
"password": "MYSECRETPASSWORD"
}
Aws Secrets manager IAM access
To be able to access a secret from AWS Secrets Manager, you need to add the following actions to your IAM role:
secretsmanager:GetSecretValue
secretsmanager:DescribeSecret
You should scope these actions to the resulting resource for example:
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Action": ["secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret"],
"Resource": ["arn:*:secretsmanager:*:*:secret:MySecret-??????"]
}]
}
The reason for the 6 ?
's is that AWS always adds 6 random characters at the end of the ARN of a secret. For more info look
at the AWS docs.
Aws SSM Parameter Store IAM access
To be able to get secrets from the AWS SSM Parameter Store, you need to add the following actions to your IAM role:
ssm:GetParameters
ssm:GetParametersByPath
You should scope these actions to the correct parameter, for example:
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Action": ["ssm:GetParameters", "ssm:GetParametersByPath"],
"Resource": ["arn:*:ssm:*:*:parameter/my-parameter"]
}]
}
For more info, you can refer to the AWS docs.
Azure secrets
In order to access the secrets, you must give the Azure client id access to the respective keyvault, which contains your secrets.
The following snipptet show how to use a secret in a keyvault in the ConveyorDbtTaskFactory
:
from conveyor.factories import ConveyorDbtTaskFactory
from conveyor.secrets import AzureKeyVaultValue
ConveyorDbtTaskFactory(
...,
env_vars={
"PASSWORD": AzureKeyVaultValue(name="mySecretKey",vault="myKeyVault",vault_type="secret")
},
azure_application_client_id="azure-client-id-with-access-to-secrets",
)
The vault_type
indicates which type of resource the value is in your keyvault. The following resource types exist: secret
, key
, certificate
.
The default value in the operator is secret and can thus be omitted.
For more details, take a look at the azure documentation
Azure keyvault rbac permissions
To be able to access a secret from Azure Key Vault, you need to provide your Azure application client ID access to your Key Vault. The recommended practice is to use Azure role based access control with Key Vaults. You can then give your application access by assigning both of the following Azure roles:
Key Vault Reader
: allows metadata operations on the Key Vault but not reading sensitive valuesKey Vault Secrets User
: allows to read sensitive values in the Key Vault
For more details take a look at the Azure documentation
An example of how to do this in terraform can look as follows:
resource "azuread_application" "azure_application" {
display_name = "azure-application"
}
resource "azuread_service_principal" "azure_application" {
client_id = azuread_application.azure_application.client_id
app_role_assignment_required = false
}
resource "azurerm_role_assignment" "keyvault_read_project" {
scope = var.azure_keyvault_id
role_definition_name = "Key Vault Reader"
principal_id = azuread_service_principal.azure_application.id
}
resource "azurerm_role_assignment" "keyvault_read_secret_project" {
scope = var.azure_keyvault_id
role_definition_name = "Key Vault Secrets User"
principal_id = azuread_service_principal.azure_application.id
}
Instances
Conveyor supports the following instances types for all jobs:
Instance type | CPU | Total Memory (AWS) | Total Memory (Azure) |
---|---|---|---|
mx.nano | 1* | 0.438 GB | 0.434 GB |
mx.micro | 1* | 0.875 GB | 0.868 GB |
mx.small | 1* | 1.75 GB | 1.736 GB |
mx.medium | 1 | 3.5 GB | 3.47 GB |
mx.large | 2 | 7 GB | 6.94 GB |
mx.xlarge | 4 | 14 GB | 13.89 GB |
mx.2xlarge | 8 | 29 GB | 30.65 GB |
mx.4xlarge | 16 | 59 GB | 64.16 GB |
cx.nano | 1* | 0.219 GB | Not supported |
cx.micro | 1* | 0.438 GB | Not supported |
cx.small | 1* | 0.875 GB | Not supported |
cx.medium | 1 | 1.75 GB | Not supported |
cx.large | 2 | 3.5 GB | Not supported |
cx.xlarge | 4 | 7 GB | Not supported |
cx.2xlarge | 8 | 14 GB | Not supported |
cx.4xlarge | 16 | 29 GB | Not supported |
rx.xlarge | 4 | 28 GB | Not supported |
rx.2xlarge | 8 | 59 GB | Not supported |
rx.4xlarge | 16 | 120 GB | Not supported |
(*) These instance types don't get a guaranteed full CPU but only a slice of a full CPU, but they are allowed to burst up to a full CPU if the cluster allows.
The numbers for AWS and Azure differ because nodes on both clouds run different DaemonSets and have different reservation requirements set by the provider. We aim to minimize the node overhead as much as possible while still obeying the minimum requirements of each cloud provider.