Skip to main content

Using Conveyor Python SDK

The Python SDK was introduced for the following reasons:

  • running jobs that should not run on a schedule but are triggered by some other action (e.g. manual, file upload,...)
  • simplify testing of your jobs within a Jupyter notebook environment
  • support running multiple ad-hoc jobs in parallel

The Python SDK provides the same advantages as conveyor run but for workloads running outside of Airflow. For full details on the SDK, take a look at the documentation.

Prerequisites

info

The Python SDK supports Python 3.10 and above.

You can install the PyPI package in your Python environment as follows:

pip install sdk-conveyor

Additionally, you will need to have the Conveyor CLI installed on your machine. You can find the installation instructions here.

warning

If you have already installed the Conveyor CLI, make sure you have at least version 1.18.10 installed. You can verify this by running: conveyor --version.

Using the Python SDK

The Python SDK provides four main classes to interact with Conveyor:

  • ContainerTaskRunner: this class is used to run a single container task and wait for your task to finish.
  • SparkTaskRunner: this class is used to run a single Spark application task and wait for your task to finish.
  • TaskSubmitter: this class accepts a list of tasks (e.g. ContainerTaskRunner or SparkTaskRunner) and executes them in parallel. It returns the results of each task as an iterator when the tasks are finished. You will not get the logs of each task as output, but you can get a link to the Conveyor logs.
  • ProjectBuilder: this class is used to build a project. Use it to build a project and use that build_id to run your jobs.

Running a single task

The following example shows how to run a single task using the Python SDK:

import logging

from conveyor.runner import ContainerTaskRunner

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

with ContainerTaskRunner(
task_name="a fun task",
project_name="<your-project>",
environment_name="<your-environment>",
).run() as runner:
logger.info(f"The run has {'failed' if runner.has_failed() else 'succeeded'}")
logger.info(f"Look at the task details using: {runner.conveyor_url()}")

In this case we do not pass the build_id to the ContainerTaskRunner, which means that we will look up the latest build for the given project and use that.

Running multiple tasks in parallel

In this case we create 3 ContainerTaskRunner instances and submit them to the TaskSubmitter. All 3 jobs will be run in parallel and when they finish we can inspect the status as well as request the application run details.

import logging

from conveyor.runner import (ContainerTaskRunner, TaskSubmitter)

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

for finished_job in TaskSubmitter.from_list([
ContainerTaskRunner(
task_name=f"instance number {i}",
project_name="sample-python",
environment_name="dev",
) for i in range(3)]).run():
logger.info(f"The run has with id: {finished_job.application_run_id} has {'failed' if finished_job.has_failed() else 'succeeded'}")
logger.info(finished_job.conveyor_url())

Building a project

In the last example, we will show how to build a project and use the build_id to run a job.

import logging

from conveyor.project import ProjectBuilder
from conveyor.runner import (ContainerTaskRunner)

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

build_id = ProjectBuilder(project_path="../../../../samples/python").build()
logger.info(f"Using build id: {build_id}")

with ContainerTaskRunner(
task_name="a fun task",
project_name="sample-python",
environment_name="dev",
build_id=build_id,
).run() as runner:
logger.info(f"The run has {'failed' if runner.has_failed() else 'succeeded'}")

This code will build the project located at ../../samples/python and use the build_id to run the job. The output of the build is piped to your console so you will see any potential issues.