ConveyorContainerOperatorV2
The ConveyorContainerOperatorV2 allows you to run your custom ingestion or processing jobs that are not based on Spark. If you can package it in a Docker container, it can be run by the ConveyorContainerOperatorV2. For running Spark applications, we provide a dedicated ConveyorSparkSubmitOperatorV2.
The following is an example of a ConveyorContainerOperatorV2
:
from conveyor.operators import ConveyorContainerOperatorV2
role = "JOBROLE-{{ macros.conveyor.env() }}"
ConveyorContainerOperatorV2(
task_id="ingest-weather",
arguments=["--date", "{{ ds }}"],
aws_role=role,
instance_type='mx.micro',
)
The ConveyorContainerOperatorV2
extends the Airflow BaseOperator, so any parameter supported by the BaseOperator can also be set on the Conveyor operator.
More details on the supported parameters can be found here
The Conveyor specific parameters supported by the ConveyorContainerOperatorV2
are:
Parameter | Type | Default | Explanation |
---|---|---|---|
image | str | Project image | Docker image you wish to launch, the default is the image and version of your project. If you want to use the image of another project you can use macros.conveyor.image('PROJECT_NAME') here (templated). |
cmds | list[str] | [] | Entrypoint of the container (templated). The docker images’s entrypoint is used if this is not provided. |
arguments | list[str] | [] | Arguments of the entrypoint (templated). The docker image’s CMD is used if this is not provided. |
instance_type | str | mx.micro | The Conveyor instance type to use for this containers. This specifies the CPU/Memory this container can use. |
env_vars | dict | Environment variables initialized in the container (templated). | |
aws_role | string | The AWS role used by the container. | |
azure_application_client_id | string | The Azure service principal used by the container. | |
instance_life_cycle | string | spot | The lifecycle of the instance used to run this job. Options are on-demand or spot . |
disk_size | int | 0 | The size in gigabytes for the external disk to mounted to the container. |
disk_mount_path | string | /var/data | The path where the external disk should be mounted (should be an absolute path without a colon). |
xcom_push | bool | False | Indicates whether you want to register an XCom value in this task. |
In case the aws_role
or azure_application_client_id
is not supplied to the operator,
the default identity configured on the project will be used instead.
Setting CPU/Memory
We provide the Conveyor instance types for you to set CPU/Memory for your job. That way choosing CPU/Memory is as easy as:
from conveyor.operators import ConveyorContainerOperatorV2
ConveyorContainerOperatorV2(
...,
instance_type='mx.micro',
)
Conveyor supports the following instances types for all jobs:
Instance type | CPU | Total Memory (AWS) | Total Memory (Azure) |
---|---|---|---|
mx.nano | 1* | 0.438 Gb | 0.434 Gb |
mx.micro | 1* | 0.875 Gb | 0.868 Gb |
mx.small | 1* | 1.75 Gb | 1.736 Gb |
mx.medium | 1 | 3.5 Gb | 3.47 Gb |
mx.large | 2 | 7 Gb | 6.94 Gb |
mx.xlarge | 4 | 14 Gb | 13.89 Gb |
mx.2xlarge | 8 | 29 Gb | 30.65 Gb |
mx.4xlarge | 16 | 59 Gb | 64.16 Gb |
cx.nano | 1* | 0.219 Gb | Not supported |
cx.micro | 1* | 0.438 Gb | Not supported |
cx.small | 1* | 0.875 Gb | Not supported |
cx.medium | 1 | 1.75 Gb | Not supported |
cx.large | 2 | 3.5 Gb | Not supported |
cx.xlarge | 4 | 7 Gb | Not supported |
cx.2xlarge | 8 | 14 Gb | Not supported |
cx.4xlarge | 16 | 29 Gb | Not supported |
rx.xlarge | 4 | 28 Gb | Not supported |
rx.2xlarge | 8 | 59 Gb | Not supported |
rx.4xlarge | 16 | 120 Gb | Not supported |
(*) These instance types don't get a guaranteed full CPU but only a slice of a full CPU, but they are allowed to burst up to a full CPU if the cluster allows.
The numbers for AWS and Azure differ because nodes on both clouds run different DaemonSets and have different reservation requirements set by the provider. We aim to minimize the node overhead as much as possible while still obeying the minimum requirements of each cloud provider.
Instance life cycle
You can specify the instance life cycle for your job in the operator, which determines whether your job runs on-demand or on spot instances. Spot instances can provide discounts of up to 90% compared to on-demand prices, but it's possible that AWS reclaims the spot instance, which is what we call a spot interrupt. Luckily, this does not happen often.
It's important to note that the Airflow executor running your job will follow the instance life cycle that you have set for your job. So if you select the spot instance life cycle, both your job and the Airflow executor will run on spot instances, and if you choose the on-demand instance life cycle, both will run on on-demand instances.
Here's an example configuration using the instance life cycle:
from conveyor.operators import ConveyorContainerOperatorV2
ConveyorContainerOperatorV2(
...,
instance_life_cycle='spot',
)
Environment variables
Similar to the Airflow operators, the Python SDK allows you to set environment variables for your applications. This is a convenient approach to pass slowly-changing or environment-dependent configuration to your applications.A simple example of a static environment variable looks as follows:
ConveyorContainerOperatorV2(
env_vars={
"STATIC_KEY": "hello world",
},
)
Sometimes these configurations contain sensitive information and should not be part of the task runner configuration. To support these use cases, you can load environment variables dynamically from a secrets store. We support the following stores:
AWS secrets
To access the secrets, the AWS IAM role configured in the task runner needs to have the permissions to access the secret. The following snippet shows how to use an AWS secret in your task runner:
from conveyor.secrets import AWSParameterStoreValue, AWSSecretsManagerValue
ConveyorContainerOperatorV2(
env_vars={
"USERNAME": AWSParameterStoreValue(name="/example/username"),
"PASSWORD": AWSSecretsManagerValue(name="example-password"),
},
iam_identity="role-with-access-to-secrets",
)
Both stores also support selecting properties from JSON stored secrets using jmesPath syntax. For example, when storing a secret as JSON:
{
"username": "ADMIN",
"password": "MYSECRETPASSWORD"
}
You can access the secret as follows:
from conveyor.secrets import AWSSecretsManagerValue
ConveyorContainerOperatorV2(
env_vars={
"USERNAME": AWSSecretsManagerValue(name="example", path="username"),
"PASSWORD": AWSSecretsManagerValue(name="example", path="password"),
},
iam_identity="role-with-access-to-secrets",
)
AWS Secrets Manager IAM access
To be able to access a secret from AWS Secrets Manager, you need to add the following actions to your IAM role:
secretsmanager:GetSecretValue
secretsmanager:DescribeSecret
You should scope these actions to the resulting resource for example:
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Action": ["secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret"],
"Resource": ["arn:*:secretsmanager:*:*:secret:MySecret-??????"]
}]
}
The reason for the 6 ?
's is that AWS always adds 6 random characters at the end of the ARN of a secret. For more info look
at the AWS docs.
AWS Parameter Store IAM access
To be able to get secrets from the AWS SSM Parameter Store, you need to add the following actions to your IAM role:
ssm:GetParameters
ssm:GetParametersByPath
You should scope these actions to the correct parameter, for example:
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Action": ["ssm:GetParameters", "ssm:GetParametersByPath"],
"Resource": ["arn:*:ssm:*:*:parameter/my-parameter"]
}]
}
For more info, you can refer to the AWS docs.
Azure secrets
In order to access the secrets, you must give the Azure Client ID access to the Key Vault that contains your secrets. The following snippet shows how to use an Azure secret in your task runner:
from conveyor.secrets import AzureKeyVaultValue
ConveyorContainerOperatorV2(
env_vars={
"PASSWORD": AzureKeyVaultValue(name="mySecretKey",vault="myKeyVault",vault_type="secret")
},
iam_identity="azure-client-id-with-access-to-secrets",
)
The vault_type
indicates which type of resource the value is in your keyvault.
The following resource types exist: secret
, key
, certificate
.
The default value in the task runner is secret and can thus be omitted.
For more details, have a look at the Azure documentation
Azure Key Vault RBAC permissions
To be able to access a secret from Azure Key Vault, you need to provide your Azure application client ID access to your Key Vault. The recommended practice is to use Azure role based access control with Key Vaults. You can then give your application access by assigning both of the following Azure roles:
Key Vault Reader
: allows metadata operations on the Key Vault but not reading sensitive valuesKey Vault Secrets User
: allows to read sensitive values in the Key Vault
For more details take a look at the Azure documentation
An example of how to do this in terraform can look as follows:
resource "azuread_application" "azure_application" {
display_name = "azure-application"
}
resource "azuread_service_principal" "azure_application" {
client_id = azuread_application.azure_application.client_id
app_role_assignment_required = false
}
resource "azurerm_role_assignment" "keyvault_read_project" {
scope = var.azure_keyvault_id
role_definition_name = "Key Vault Reader"
principal_id = azuread_service_principal.azure_application.id
}
resource "azurerm_role_assignment" "keyvault_read_secret_project" {
scope = var.azure_keyvault_id
role_definition_name = "Key Vault Secrets User"
principal_id = azuread_service_principal.azure_application.id
}