
Python SDK

caution

The Python SDK is currently in preview; please provide feedback to help us improve it.

The SDK supports Python 3.10 and above.

Find the latest version of the Conveyor Python SDK on PyPI. If you want to start using the Conveyor Python SDK, please have a look at the how-to guide.

Classes

TaskRunner

The SDK contains two task runners:

  • The ContainerTaskRunner for running regular containers (equivalent to the ContainerOperator in Airflow).
  • The SparkTaskRunner for running Spark applications (equivalent to the SparkSubmitOperator in Airflow).

The task runners can be imported from the runner package as:

from conveyor.runner import ContainerTaskRunner, SparkTaskRunner

Initializing the ContainerTaskRunner

The parameters you can pass to the ContainerTaskRunner are equivalent to those available to the Airflow ContainerOperatorV2.

from typing import Mapping, Optional, Sequence, Union

from conveyor.secrets import SecretValue
from conveyor.types import InstanceLifecycle, InstanceType

class ContainerTaskRunner(
    task_name: str,
    project_name: str,
    environment_name: str,
    build_id: Optional[str] = None,
    command: Optional[Sequence[str]] = None,
    args: Optional[Sequence[str]] = None,
    env_vars: Optional[Mapping[str, Union[str, SecretValue]]] = None,
    iam_identity: Optional[str] = None,
    instance_type: InstanceType = InstanceType.mx_micro,
    instance_lifecycle: InstanceLifecycle = InstanceLifecycle.spot,
    disk_size: Optional[int] = None,
    disk_mount_path: Optional[str] = None,
    show_output: bool = True,
):
    ...
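
A minimal initialization could look as follows; the task, project, and environment names are placeholders for illustration:

from conveyor.runner import ContainerTaskRunner
from conveyor.types import InstanceLifecycle, InstanceType

runner = ContainerTaskRunner(
    task_name="sample-task",
    project_name="my-project",
    environment_name="dev",
    args=["--date", "2024-01-01"],
    env_vars={"LOG_LEVEL": "info"},
    instance_type=InstanceType.mx_micro,
    instance_lifecycle=InstanceLifecycle.spot,
)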

Initializing the SparkTaskRunner

The parameters you can pass to the SparkTaskRunner are equivalent to those available to the Airflow SparkSubmitOperatorV2.

from typing import Mapping, Optional, Sequence, Union

from conveyor.secrets import SecretValue
from conveyor.types import InstanceLifecycle, InstanceType

class SparkTaskRunner(
    task_name: str,
    project_name: str,
    environment_name: str,
    build_id: Optional[str] = None,
    application: str = "",
    application_args: Optional[Sequence[str]] = None,
    conf: Optional[Mapping[str, str]] = None,
    env_vars: Optional[Mapping[str, Union[str, SecretValue]]] = None,
    iam_identity: Optional[str] = None,
    num_executors: Optional[int] = None,
    driver_instance_type: InstanceType = InstanceType.mx_micro,
    executor_instance_type: InstanceType = InstanceType.mx_small,
    instance_lifecycle: InstanceLifecycle = InstanceLifecycle.spot,
    s3_committer: Optional[str] = "file",
    abfs_committer: Optional[str] = "file",
    executor_disk_size: Optional[int] = None,
    mode: Optional[str] = "cluster-v2",
    aws_availability_zone: Optional[str] = None,
    verbose: bool = False,
):
    ...
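
A minimal initialization could look as follows; the application path, Spark configuration, and names are placeholders for illustration:

from conveyor.runner import SparkTaskRunner
from conveyor.types import InstanceType

runner = SparkTaskRunner(
    task_name="sample-spark-task",
    project_name="my-project",
    environment_name="dev",
    application="local:///opt/spark/work-dir/app.py",
    application_args=["--date", "2024-01-01"],
    conf={"spark.sql.shuffle.partitions": "20"},
    num_executors=2,
    driver_instance_type=InstanceType.mx_micro,
    executor_instance_type=InstanceType.mx_small,
)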

Available methods

The Conveyor task runners support two main methods: run and start. The difference between these two methods is that run waits for your application to complete, whereas start only submits your application to the cluster and returns immediately (you can think of this as a "fire-and-forget" mode).

The run method is recommended in most scenarios, except when you explicitly don't want to wait for the result.

from conveyor.runner import ApplicationRunResult

def run(self) -> ApplicationRunResult:
    """Run waits for the application to complete and returns the result."""
    ...

def start(self) -> None:
    """Start submits the application to the cluster but does not wait for the result."""
    ...
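
To illustrate the difference, the following sketch runs one task synchronously and starts another without waiting for it; the runner configurations are placeholders:

from conveyor.runner import ContainerTaskRunner

# Blocks until the task completes and returns an ApplicationRunResult.
result = ContainerTaskRunner(
    task_name="blocking-task",
    project_name="my-project",
    environment_name="dev",
).run()

# Fire-and-forget: returns as soon as the task is submitted to the cluster.
ContainerTaskRunner(
    task_name="background-task",
    project_name="my-project",
    environment_name="dev",
).start()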

ApplicationRunResult

The ApplicationRunResult class is returned by the run() method of a runner class. It can also be imported from the runner module:

from conveyor.runner import ApplicationRunResult

Available methods

def has_failed(self) -> bool:
    ...

def conveyor_url(self) -> str:
    ...
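
A typical pattern is to check the outcome and surface the Conveyor URL on failure. A minimal sketch, assuming runner is an initialized task runner:

result = runner.run()
if result.has_failed():
    raise RuntimeError(f"Task failed, see {result.conveyor_url()} for details")
print(f"Task succeeded: {result.conveyor_url()}")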

TaskSubmitter

A TaskSubmitter allows you to specify a group of tasks and launch them at the same time. It can also be imported from the runner module:

from conveyor.runner import TaskSubmitter

Initializing the TaskSubmitter

from conveyor.runner import TaskRunner

class TaskSubmitter(
    *tasks: TaskRunner
):
    ...

Available methods

Run the submitted tasks:

from typing import Iterator

from conveyor.runner import ApplicationRunResult

def run(self) -> Iterator[ApplicationRunResult]:
    ...

Alternatively, you can create a TaskSubmitter from a list of TaskRunner objects:

from typing import Iterable
from conveyor.runner import TaskRunner, TaskSubmitter

def from_list(tasks: Iterable[TaskRunner]) -> TaskSubmitter:
    ...
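
Putting this together, a sketch that launches a group of tasks and collects any failures could look as follows; the task configuration is a placeholder, and from_list is assumed to be exposed as a static method on TaskSubmitter, as the signature above suggests:

from conveyor.runner import ContainerTaskRunner, TaskSubmitter

tasks = [
    ContainerTaskRunner(
        task_name=f"task-{index}",
        project_name="my-project",
        environment_name="dev",
    )
    for index in range(3)
]

# run() yields one ApplicationRunResult per submitted task.
failures = [result for result in TaskSubmitter.from_list(tasks).run() if result.has_failed()]
if failures:
    raise RuntimeError(f"{len(failures)} task(s) failed")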

ProjectBuilder

The ProjectBuilder is the Python version of the conveyor project build CLI command. It can be imported as:

from conveyor.project import ProjectBuilder

Initializing the ProjectBuilder

from typing import Optional, Sequence

class ProjectBuilder(
    project_path: str,
    build_args: Optional[Sequence[str]] = None,
):
    ...

The project_path can be either a relative or an absolute path to your Conveyor project.

The build_args parameter is an optional list of key-value pairs (key=value) that is supplied to the container builder process (more info).

Available methods

def build(self) -> str:
    ...

The build function returns the ID of the newly created build.
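
This makes it possible to build a project and immediately run a task against the resulting build. A minimal sketch with placeholder names:

from conveyor.project import ProjectBuilder
from conveyor.runner import ContainerTaskRunner

build_id = ProjectBuilder(project_path=".").build()

ContainerTaskRunner(
    task_name="sample-task",
    project_name="my-project",
    environment_name="dev",
    build_id=build_id,  # run the task on the build we just created
).run()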

Environment variables

Similar to the Airflow operators, the Python SDK allows you to set environment variables for your applications. This is a convenient approach to pass slowly-changing or environment-dependent configuration to your applications.

A simple example of a static environment variable looks as follows:

ContainerTaskRunner(
    env_vars={
        "STATIC_KEY": "hello world",
    },
)

Sometimes these configurations contain sensitive information and should not be part of the task runner configuration. To support these use cases, you can load environment variables dynamically from a secrets store. We support the following stores:

AWS secrets

To access the secrets, the AWS IAM role configured in the task runner needs permission to read the secret. The following snippet shows how to use an AWS secret in your task runner:

from conveyor.secrets import AWSParameterStoreValue, AWSSecretsManagerValue

ContainerTaskRunner(
    env_vars={
        "USERNAME": AWSParameterStoreValue(name="/example/username"),
        "PASSWORD": AWSSecretsManagerValue(name="example-password"),
    },
    iam_identity="role-with-access-to-secrets",
)

Both stores also support selecting properties from JSON-stored secrets using JMESPath syntax. For example, when storing a secret as JSON:

{
  "username": "ADMIN",
  "password": "MYSECRETPASSWORD"
}

You can access the secret as follows:

from conveyor.secrets import AWSSecretsManagerValue

ContainerTaskRunner(
    env_vars={
        "USERNAME": AWSSecretsManagerValue(name="example", path="username"),
        "PASSWORD": AWSSecretsManagerValue(name="example", path="password"),
    },
    iam_identity="role-with-access-to-secrets",
)

AWS Secrets Manager IAM access

To be able to access a secret from AWS Secrets Manager, you need to add the following actions to your IAM role:

  • secretsmanager:GetSecretValue
  • secretsmanager:DescribeSecret

You should scope these actions to the relevant secret resource, for example:

{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Action": ["secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret"],
    "Resource": ["arn:*:secretsmanager:*:*:secret:MySecret-??????"]
  }]
}

The six question marks are needed because AWS appends six random characters to the end of a secret's ARN. For more info, look at the AWS docs.

AWS Parameter Store IAM access

To be able to get secrets from the AWS SSM Parameter Store, you need to add the following actions to your IAM role:

  • ssm:GetParameters
  • ssm:GetParametersByPath

You should scope these actions to the correct parameter, for example:

{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Action": ["ssm:GetParameters", "ssm:GetParametersByPath"],
    "Resource": ["arn:*:ssm:*:*:parameter/my-parameter"]
  }]
}

For more info, you can refer to the AWS docs.

Azure secrets

To access the secrets, you must give the Azure client ID configured in the task runner access to the Key Vault that contains your secrets. The following snippet shows how to use an Azure secret in your task runner:

from conveyor.secrets import AzureKeyVaultValue

ContainerTaskRunner(
    env_vars={
        "PASSWORD": AzureKeyVaultValue(name="mySecretKey",vault="myKeyVault",vault_type="secret")
    },
    iam_identity="azure-client-id-with-access-to-secrets",
)
important

The vault_type indicates the type of resource that holds the value in your Key Vault. The following resource types exist: secret, key, and certificate. The default value in the task runner is secret and can thus be omitted. For more details, have a look at the Azure documentation.

Azure Key Vault RBAC permissions

To be able to access a secret from Azure Key Vault, you need to give your Azure application's client ID access to your Key Vault. The recommended practice is to use Azure role-based access control (RBAC) with Key Vaults. You can then grant your application access by assigning both of the following Azure roles:

  • Key Vault Reader: allows metadata operations on the Key Vault but not reading sensitive values
  • Key Vault Secrets User: allows reading sensitive values in the Key Vault

For more details, take a look at the Azure documentation.

An example of how to do this in Terraform looks as follows:

resource "azuread_application" "azure_application" {
display_name = "azure-application"
}

resource "azuread_service_principal" "azure_application" {
client_id = azuread_application.azure_application.client_id
app_role_assignment_required = false
}

resource "azurerm_role_assignment" "keyvault_read_project" {
scope = var.azure_keyvault_id
role_definition_name = "Key Vault Reader"
principal_id = azuread_service_principal.azure_application.id
}

resource "azurerm_role_assignment" "keyvault_read_secret_project" {
scope = var.azure_keyvault_id
role_definition_name = "Key Vault Secrets User"
principal_id = azuread_service_principal.azure_application.id
}