Python SDK
The Python SDK is currently in preview. Please provide feedback to help us improve it.
The SDK supports Python 3.10 and above.
Find the latest version of the Conveyor Python SDK on PyPI. If you want to start using the Conveyor Python SDK, please have a look at the how-to guide.
Classes
TaskRunner
The SDK contains two task runners:
- The ContainerTaskRunner for running regular containers (equivalent to the ContainerOperator in Airflow).
- The SparkTaskRunner for running Spark applications (equivalent to the SparkSubmitOperator in Airflow).
The task runners can be imported from the runner package as:
from conveyor.runner import ContainerTaskRunner, SparkTaskRunner
Initializing the ContainerTaskRunner
The parameters you can pass to the ContainerTaskRunner are equivalent to those available to the Airflow ContainerOperatorV2.
from typing import Mapping, Optional, Sequence, Union
from conveyor.secrets import SecretValue
from conveyor.types import InstanceLifecycle, InstanceType
class ContainerTaskRunner(
    task_name: str,
    project_name: str,
    environment_name: str,
    build_id: Optional[str] = None,
    command: Optional[Sequence[str]] = None,
    args: Optional[Sequence[str]] = None,
    env_vars: Optional[Mapping[str, Union[str, SecretValue]]] = None,
    iam_identity: Optional[str] = None,
    instance_type: InstanceType = InstanceType.mx_micro,
    instance_lifecycle: InstanceLifecycle = InstanceLifecycle.spot,
    disk_size: Optional[int] = None,
    disk_mount_path: Optional[str] = None,
    show_output: bool = True,
):
    ...
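As a rough illustration of how these parameters fit together, the sketch below collects keyword arguments for a ContainerTaskRunner and runs the task. Only parameter names from the signature above are used. The task name, command, arguments and LOG_LEVEL variable are made-up values, and the SDK import is kept inside the function so that the module still loads where the SDK is not installed.

```python
from typing import Any, Dict


def container_task_kwargs(project_name: str, environment_name: str) -> Dict[str, Any]:
    """Collect keyword arguments for a ContainerTaskRunner (hypothetical values)."""
    return {
        "task_name": "ingest",
        "project_name": project_name,
        "environment_name": environment_name,
        # command and args are sequences of strings, as in the signature.
        "command": ["python"],
        "args": ["-m", "ingest.app", "--date", "2024-01-01"],
        "env_vars": {"LOG_LEVEL": "info"},
        "show_output": True,
    }


def run_container_task(project_name: str, environment_name: str):
    """Create the runner and wait for the task to complete."""
    # Imported lazily: this line requires the Conveyor Python SDK.
    from conveyor.runner import ContainerTaskRunner

    runner = ContainerTaskRunner(**container_task_kwargs(project_name, environment_name))
    return runner.run()
```

Keeping the arguments in a plain dictionary is only one possible design; it makes the configuration easy to inspect or reuse before a runner is created.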
Initializing the SparkTaskRunner
The parameters you can pass to the SparkTaskRunner are equivalent to those available to the Airflow SparkSubmitOperatorV2.
from typing import Mapping, Optional, Sequence, Union
from conveyor.secrets import SecretValue
from conveyor.types import InstanceLifecycle, InstanceType
class SparkTaskRunner(
    task_name: str,
    project_name: str,
    environment_name: str,
    build_id: Optional[str] = None,
    application: str = "",
    application_args: Optional[Sequence[str]] = None,
    conf: Optional[Mapping[str, str]] = None,
    env_vars: Optional[Mapping[str, Union[str, SecretValue]]] = None,
    iam_identity: Optional[str] = None,
    num_executors: Optional[int] = None,
    driver_instance_type: InstanceType = InstanceType.mx_micro,
    executor_instance_type: InstanceType = InstanceType.mx_small,
    instance_lifecycle: InstanceLifecycle = InstanceLifecycle.spot,
    s3_committer: Optional[str] = "file",
    abfs_committer: Optional[str] = "file",
    executor_disk_size: Optional[int] = None,
    mode: Optional[str] = "cluster-v2",
    aws_availability_zone: Optional[str] = None,
    verbose: bool = False,
):
    ...
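Because conf is typed as a mapping from strings to strings, numeric or boolean Spark settings have to be rendered as text before they are passed in. The sketch below shows one way to do that. The spark_conf helper is not part of the SDK, and the task name, application path and argument values are hypothetical.

```python
from typing import Any, Dict, Mapping


def spark_conf(settings: Mapping[str, Any]) -> Dict[str, str]:
    """Render Spark settings as strings, because conf is typed Mapping[str, str]."""
    rendered: Dict[str, str] = {}
    for key, value in settings.items():
        if isinstance(value, bool):
            # Spark configuration conventionally spells booleans in lower case.
            rendered[key] = "true" if value else "false"
        else:
            rendered[key] = str(value)
    return rendered


# Keyword arguments for a SparkTaskRunner; every value here is hypothetical.
spark_task_kwargs: Dict[str, Any] = {
    "task_name": "aggregate",
    "project_name": "demo",
    "environment_name": "dev",
    "application": "local:///opt/spark/work-dir/src/demo/app.py",
    "application_args": ["--date", "2024-01-01"],
    "conf": spark_conf({
        "spark.sql.shuffle.partitions": 200,
        "spark.sql.adaptive.enabled": True,
    }),
    "num_executors": 2,
}
```

With the SDK installed, these arguments could be passed on as SparkTaskRunner(**spark_task_kwargs).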
Available methods
The Conveyor task runners support two main methods: run and start.
The difference between these two methods is that run waits for your application to complete, whereas start only submits your application to the cluster and returns before it completes (you can think of this as a "fire-and-forget" mode).
The run method is recommended in most scenarios, except where you explicitly don't want to wait for results.
from conveyor.runner import ApplicationRunResult
def run(self) -> ApplicationRunResult:
    """Runs the application, waits for it to complete, and returns the result"""
    ...

def start(self) -> None:
    """Submits the application to the cluster but does not wait for a result"""
    ...
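The choice between the two methods can be sketched as a small helper that either waits for a result or only submits the task. The helper and the stub classes below are illustrative and not part of the SDK. The stubs only stand in for a real task runner so that the example runs without a cluster.

```python
from typing import List, Optional, Protocol


class Result(Protocol):
    def has_failed(self) -> bool: ...


class Runner(Protocol):
    def run(self) -> Result: ...
    def start(self) -> None: ...


def execute(runner: Runner, wait: bool = True) -> Optional[Result]:
    """Run the task and return its result, or only submit it when wait is False."""
    if wait:
        return runner.run()  # blocks until the application completes
    runner.start()  # returns once the application has been submitted
    return None


class StubResult:
    """Stand-in for a run result; not part of the SDK."""

    def has_failed(self) -> bool:
        return False


class StubRunner:
    """Stand-in that records which method was called; not part of the SDK."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def run(self) -> StubResult:
        self.calls.append("run")
        return StubResult()

    def start(self) -> None:
        self.calls.append("start")
```

Note that the fire-and-forget path returns None, so the caller has no result to inspect afterwards.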
ApplicationRunResult
The ApplicationRunResult class is returned by the run() method of a runner class.
Like the runners, it can be imported from the runner module:
from conveyor.runner import ApplicationRunResult
Available methods
def has_failed(self) -> bool:
    ...

def conveyor_url(self) -> str:
    ...
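A minimal sketch of how these two methods might be used to report on a run, for instance at the end of a script. It assumes that conveyor_url() returns a link to the run, which this page does not state explicitly. StubResult is a stand-in for illustration only and the URL is made up.

```python
from dataclasses import dataclass


@dataclass
class StubResult:
    """Stand-in with the two methods listed above; not part of the SDK."""

    failed: bool
    url: str

    def has_failed(self) -> bool:
        return self.failed

    def conveyor_url(self) -> str:
        return self.url


def summarize(result) -> str:
    """Build a one-line status message for a finished run."""
    status = "FAILED" if result.has_failed() else "SUCCEEDED"
    return f"{status}: {result.conveyor_url()}"


def exit_code(result) -> int:
    """Map the result to a process exit code: 1 on failure, 0 otherwise."""
    return 1 if result.has_failed() else 0
```

In a script, the value of exit_code(result) could be handed to sys.exit so that a failed task also fails the calling process.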
TaskSubmitter
A TaskSubmitter allows you to specify a group of tasks and launch them at the same time.
It can also be imported from the runner module:
from conveyor.runner import TaskSubmitter
Initializing the TaskSubmitter
from conveyor.runner import TaskRunner
class TaskSubmitter(
    *tasks: TaskRunner
):
    ...
Available methods
Run the submitted tasks:
from typing import Iterator
from conveyor.runner import ApplicationRunResult
def run(self) -> Iterator[ApplicationRunResult]:
    ...
Alternative way to create a TaskSubmitter from a list of TaskRunner objects:
from typing import Iterable
from conveyor.runner import TaskRunner, TaskSubmitter
def from_list(tasks: Iterable[TaskRunner]) -> TaskSubmitter:
    ...
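Since run() on a TaskSubmitter yields one result per task, a caller will usually want to walk the iterator and separate successes from failures. The sketch below does that with a hypothetical split_results helper. The stub generator stands in for TaskSubmitter(...).run() so that the example runs without the SDK, and the URLs are made up.

```python
from typing import Iterable, Iterator, List, Tuple


def split_results(results: Iterable) -> Tuple[List[str], List[str]]:
    """Return (succeeded_urls, failed_urls) for the results of a group of tasks."""
    succeeded: List[str] = []
    failed: List[str] = []
    for result in results:
        target = failed if result.has_failed() else succeeded
        target.append(result.conveyor_url())
    return succeeded, failed


class StubResult:
    """Stand-in for an ApplicationRunResult; not part of the SDK."""

    def __init__(self, failed: bool, url: str) -> None:
        self._failed = failed
        self._url = url

    def has_failed(self) -> bool:
        return self._failed

    def conveyor_url(self) -> str:
        return self._url


def stub_run() -> Iterator[StubResult]:
    """Stand-in for TaskSubmitter(...).run(): yields one result per task."""
    yield StubResult(False, "https://example.invalid/a")
    yield StubResult(True, "https://example.invalid/b")
```

With the SDK installed, the iterator returned by a TaskSubmitter's run() method would take the place of stub_run().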
ProjectBuilder
The ProjectBuilder is the Python version of the conveyor project build CLI command.
It can be imported as:
from conveyor.project import ProjectBuilder
Initializing the ProjectBuilder
from typing import Optional, Sequence
class ProjectBuilder(
    project_path: str,
    build_args: Optional[Sequence[str]] = None,
):
    ...
The project_path can be either a relative or an absolute path to your Conveyor project.
The build_args parameter is an optional list of key-value pairs (key=value) that is supplied to the container builder process (more info).
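If the build arguments are kept in a dictionary, they can be rendered into the key=value form with a one-line helper. The helper name and the argument names in the usage note are hypothetical.

```python
from typing import List, Mapping


def to_build_args(values: Mapping[str, str]) -> List[str]:
    """Render a mapping as the key=value strings that build_args expects."""
    return [f"{key}={value}" for key, value in values.items()]
```

For example, to_build_args({"PYTHON_VERSION": "3.11"}) produces a list containing the single string PYTHON_VERSION=3.11, which could then be passed as build_args.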
Available methods
def build(self) -> str:
    ...
The build function returns the build ID of the created build.
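One plausible use of the returned build id is to pass it to a task runner through its build_id parameter, so that the task runs against the build that was just created. This page does not spell out that workflow, so treat the sketch below as an assumption. The stub classes stand in for ProjectBuilder and a task runner, and all names and values are hypothetical.

```python
from typing import Any, Callable


def build_then_run(builder: Any, make_runner: Callable[[str], Any]) -> Any:
    """Build the project, then run a task against the build that was just created."""
    build_id = builder.build()  # build() returns the build id as a string
    return make_runner(build_id).run()  # run() waits for the task to complete


class StubBuilder:
    """Stand-in for ProjectBuilder; not part of the SDK."""

    def build(self) -> str:
        return "build-123"


class StubRunner:
    """Stand-in for a task runner that remembers its build id."""

    def __init__(self, build_id: str) -> None:
        self.build_id = build_id

    def run(self) -> str:
        return f"ran {self.build_id}"


# With the SDK installed, the call might look like this:
#
# from conveyor.project import ProjectBuilder
# from conveyor.runner import ContainerTaskRunner
#
# build_then_run(
#     ProjectBuilder(project_path="."),
#     lambda build_id: ContainerTaskRunner(
#         task_name="ingest",
#         project_name="demo",
#         environment_name="dev",
#         build_id=build_id,
#     ),
# )
```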
Environment variables
Similar to the Airflow operators, the Python SDK allows you to set environment variables for your applications. This is a convenient way to pass slowly-changing or environment-dependent configuration to your applications.
A simple example of a static environment variable looks as follows:
ContainerTaskRunner(
    env_vars={
        "STATIC_KEY": "hello world",
    },
)
Sometimes these configurations contain sensitive information and should not be part of the task runner configuration. To support these use cases, you can load environment variables dynamically from a secrets store. We support the following stores:
AWS secrets
To access the secrets, the AWS IAM role configured in the task runner needs to have the permissions to access the secret. The following snippet shows how to use an AWS secret in your task runner:
from conveyor.secrets import AWSParameterStoreValue, AWSSecretsManagerValue
ContainerTaskRunner(
    env_vars={
        "USERNAME": AWSParameterStoreValue(name="/example/username"),
        "PASSWORD": AWSSecretsManagerValue(name="example-password"),
    },
    iam_identity="role-with-access-to-secrets",
)
Both stores also support selecting properties from secrets stored as JSON using JMESPath syntax. For example, when storing a secret as JSON:
{
  "username": "ADMIN",
  "password": "MYSECRETPASSWORD"
}
You can access the secret as follows:
from conveyor.secrets import AWSSecretsManagerValue
ContainerTaskRunner(
    env_vars={
        "USERNAME": AWSSecretsManagerValue(name="example", path="username"),
        "PASSWORD": AWSSecretsManagerValue(name="example", path="password"),
    },
    iam_identity="role-with-access-to-secrets",
)
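To make the effect of the path argument concrete, the sketch below reproduces the selection locally for the JSON secret shown above. It only handles a single top-level key, which is the simplest form of a JMESPath expression. The select helper is hypothetical and says nothing about how Conveyor resolves the path internally.

```python
import json

# The JSON secret from the example above, as it would be stored in the secret store.
secret_string = '{"username": "ADMIN", "password": "MYSECRETPASSWORD"}'


def select(secret_json: str, path: str) -> str:
    """Pick a top-level property, mimicking a single-identifier JMESPath expression."""
    return json.loads(secret_json)[path]
```

Here select(secret_string, "username") returns the value stored under the username key, which is the value the USERNAME environment variable would be expected to receive.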
AWS Secrets Manager IAM access
To be able to access a secret from AWS Secrets Manager, you need to add the following actions to your IAM role:
secretsmanager:GetSecretValue
secretsmanager:DescribeSecret
You should scope these actions to the relevant secret, for example:
{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Action": ["secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret"],
    "Resource": ["arn:*:secretsmanager:*:*:secret:MySecret-??????"]
  }]
}
The six ? characters are needed because AWS always adds 6 random characters at the end of the ARN of a secret. For more info, look at the AWS docs.
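The effect of the six ? wildcards can be checked locally with Python's fnmatch module, whose * and ? behave much like the wildcards in an IAM resource pattern. This is only an approximation of how IAM evaluates policies, and the ARNs below (account id and random suffixes) are made up.

```python
from fnmatch import fnmatchcase

SCOPED = "arn:*:secretsmanager:*:*:secret:MySecret-??????"
BROAD = "arn:*:secretsmanager:*:*:secret:MySecret-*"

# Hypothetical ARNs: the secret we want, and an unrelated secret sharing the prefix.
wanted = "arn:aws:secretsmanager:eu-west-1:123456789012:secret:MySecret-a1B2c3"
other = "arn:aws:secretsmanager:eu-west-1:123456789012:secret:MySecret-staging-x9Y8z7"


def covers(pattern: str, arn: str) -> bool:
    """Return True when the wildcard pattern matches the ARN (case-sensitive)."""
    return fnmatchcase(arn, pattern)
```

In this sketch the scoped pattern covers only the wanted secret, whereas a trailing * would also cover the secret named MySecret-staging.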
AWS Parameter Store IAM access
To be able to get secrets from the AWS SSM Parameter Store, you need to add the following actions to your IAM role:
ssm:GetParameters
ssm:GetParametersByPath
You should scope these actions to the correct parameter, for example:
{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Action": ["ssm:GetParameters", "ssm:GetParametersByPath"],
    "Resource": ["arn:*:ssm:*:*:parameter/my-parameter"]
  }]
}
For more info, you can refer to the AWS docs.
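When a parameter name starts with a slash, such as /example/username, the slash is not repeated after parameter in the ARN. The sketch below builds a policy like the one above for any set of parameter names. Both helper names are hypothetical, and the wildcards for partition, region and account are copied from the example policy.

```python
import json


def parameter_resource(name: str) -> str:
    """Return a wildcard ARN for an SSM parameter name, with or without a leading slash."""
    return "arn:*:ssm:*:*:parameter/" + name.lstrip("/")


def parameter_policy(*names: str) -> str:
    """Render an IAM policy document that allows reading the given parameters."""
    policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": ["ssm:GetParameters", "ssm:GetParametersByPath"],
            "Resource": [parameter_resource(name) for name in names],
        }],
    }
    return json.dumps(policy, indent=2)
```

Calling parameter_policy("/example/username") yields a document whose only resource is arn:*:ssm:*:*:parameter/example/username.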
Azure secrets
In order to access the secrets, you must give the Azure Client ID access to the Key Vault that contains your secrets. The following snippet shows how to use an Azure secret in your task runner:
from conveyor.secrets import AzureKeyVaultValue
ContainerTaskRunner(
    env_vars={
        "PASSWORD": AzureKeyVaultValue(name="mySecretKey", vault="myKeyVault", vault_type="secret"),
    },
    iam_identity="azure-client-id-with-access-to-secrets",
)
The vault_type indicates which type of resource the value is in your Key Vault.
The following resource types exist: secret, key, certificate.
The default value in the task runner is secret, so it can be omitted.
For more details, have a look at the Azure documentation.
Azure Key Vault RBAC permissions
To be able to access a secret from Azure Key Vault, you need to give your Azure application client ID access to your Key Vault. The recommended practice is to use Azure role-based access control with Key Vaults. You can then give your application access by assigning both of the following Azure roles:
- Key Vault Reader: allows metadata operations on the Key Vault but not reading sensitive values
- Key Vault Secrets User: allows reading sensitive values in the Key Vault
For more details, take a look at the Azure documentation.
An example of how to do this in Terraform can look as follows:
resource "azuread_application" "azure_application" {
  display_name = "azure-application"
}

resource "azuread_service_principal" "azure_application" {
  client_id                    = azuread_application.azure_application.client_id
  app_role_assignment_required = false
}

resource "azurerm_role_assignment" "keyvault_read_project" {
  scope                = var.azure_keyvault_id
  role_definition_name = "Key Vault Reader"
  principal_id         = azuread_service_principal.azure_application.id
}

resource "azurerm_role_assignment" "keyvault_read_secret_project" {
  scope                = var.azure_keyvault_id
  role_definition_name = "Key Vault Secrets User"
  principal_id         = azuread_service_principal.azure_application.id
}