Using Conveyor Python SDK

The Python SDK was introduced for the following use cases:

  • running jobs that should not run on a schedule but are triggered by some other action (e.g. a manual trigger or a file upload)
  • simplifying the testing of your jobs in a Jupyter notebook environment
  • running multiple ad-hoc jobs in parallel

The Python SDK provides the same advantages as conveyor run but for workloads running without Airflow. For full details on the SDK, take a look at the documentation.

Prerequisites

Note: The Python SDK supports Python 3.10 and above.

You can install the PyPI package in your Python environment as follows:

pip install sdk-conveyor

Additionally, you will need to have the Conveyor CLI installed on your machine. You can find the installation instructions here.

Warning: If you have already installed the Conveyor CLI, make sure it is at least version 1.18.10. You can verify this by running: conveyor --version.
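Because the SDK depends on both a recent Python and a recent CLI, a script can check both before submitting anything. The following is a minimal sketch of such a pre-flight check; the helper names are hypothetical, and it assumes that the output of conveyor --version contains the version number as x.y.z (the exact output format is an assumption). Versions are compared as integer tuples, because plain string comparison would rank "1.9.0" above "1.18.10".

```python
import re
import shutil
import subprocess
import sys
from typing import Optional, Tuple

MIN_PYTHON = (3, 10)
MIN_CLI_VERSION = (1, 18, 10)


def parse_version(text: str) -> Optional[Tuple[int, ...]]:
    """Return the first x.y.z version found in text as a tuple of ints."""
    match = re.search(r"(\d+)\.(\d+)\.(\d+)", text)
    if match is None:
        return None
    return tuple(int(part) for part in match.groups())


def check_prerequisites() -> None:
    """Raise RuntimeError if Python or the Conveyor CLI is missing or too old."""
    if sys.version_info < MIN_PYTHON:
        raise RuntimeError("The Conveyor Python SDK needs Python 3.10 or above.")
    if shutil.which("conveyor") is None:
        raise RuntimeError("The Conveyor CLI was not found on your PATH.")
    # Assumption: the version appears as x.y.z somewhere in this output.
    result = subprocess.run(
        ["conveyor", "--version"], capture_output=True, text=True, check=True
    )
    version = parse_version(result.stdout + result.stderr)
    # Tuples compare element by element, so (1, 9, 0) < (1, 18, 10).
    if version is None or version < MIN_CLI_VERSION:
        raise RuntimeError(
            f"Conveyor CLI 1.18.10 or above is required, found: {result.stdout.strip()!r}"
        )
```

Calling check_prerequisites() at the top of a job script makes it fail early with a readable message instead of failing later inside the SDK.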

Using the Python SDK

The Python SDK provides four main classes to interact with Conveyor:

  • ContainerTaskRunner: this class is used to run a single container task and wait for your task to finish.
  • SparkTaskRunner: this class is used to run a single Spark application task and wait for your task to finish.
  • TaskSubmitter: this class accepts a list of tasks (e.g. ContainerTaskRunner or SparkTaskRunner instances) and executes them in parallel. It returns the results of the tasks as an iterator when they are finished. You do not get the logs of each task as output, but you can get a link to the logs in Conveyor.
  • ProjectBuilder: this class is used to build a project. It returns a build_id that you can pass to the task runners to run your jobs against that build.

Running a single container task

The following example shows how to run a single Container task using the Python SDK:

```python
import logging

from conveyor.runner import ContainerTaskRunner

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Start the container task and wait for it to finish.
with ContainerTaskRunner(
    task_name="a fun task",
    project_name="<your-project>",
    environment_name="<your-environment>",
    args=["<arguments for your python job>"],
    command=["python3", "-m", "<python module>"],
    env_vars={
        "SOME_KEY": "SOME_VALUE",
    },
).run() as runner:
    logger.info(f"The run has {'failed' if runner.has_failed() else 'succeeded'}")
    logger.info(f"Look at the task details using: {runner.conveyor_url()}")
```

Because no build_id is passed to the ContainerTaskRunner, the SDK looks up the latest build of the given project and uses that.
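When the same task is sometimes run against the latest build and sometimes against a pinned build, it can help to assemble the runner arguments in one place and only pass build_id when it is actually set. The sketch below is a hypothetical helper, container_task_kwargs, that uses only the keyword arguments shown in the examples on this page; it assumes build_id is a string and does not replace anything in the SDK.

```python
from typing import Any, Dict, List, Optional


def container_task_kwargs(
    task_name: str,
    project_name: str,
    environment_name: str,
    command: List[str],
    args: Optional[List[str]] = None,
    env_vars: Optional[Dict[str, str]] = None,
    build_id: Optional[str] = None,  # assumed to be a string
) -> Dict[str, Any]:
    """Hypothetical helper that collects ContainerTaskRunner keyword arguments.

    Optional arguments are only included when they are set, so leaving
    build_id out keeps the default behaviour of using the latest build.
    """
    kwargs: Dict[str, Any] = {
        "task_name": task_name,
        "project_name": project_name,
        "environment_name": environment_name,
        "command": command,
    }
    optional = {"args": args, "env_vars": env_vars, "build_id": build_id}
    kwargs.update({key: value for key, value in optional.items() if value is not None})
    return kwargs
```

You would then create the runner with ContainerTaskRunner(**container_task_kwargs(...)) and pass build_id only when a specific build must be used.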

Running a single Spark task

The following example shows how to run a single Spark task using the Python SDK:

```python
import logging

from conveyor.runner import SparkTaskRunner

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Start the Spark application and wait for it to finish.
with SparkTaskRunner(
    task_name="a fun spark task",
    project_name="<your-project>",
    environment_name="<your-environment>",
    conf={"<spark configuration key>": "<value>"},
    application="local://<path to your spark application entrypoint>",
).run() as runner:
    logger.info(f"The run has {'failed' if runner.has_failed() else 'succeeded'}")
    logger.info(f"Look at the task details using: {runner.conveyor_url()}")
```

Because no build_id is passed to the SparkTaskRunner, the SDK looks up the latest build of the given project and uses that.
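The application value uses the local:// scheme. In Spark on Kubernetes, local:// refers to a file that already exists inside the container image, so the part after the scheme is an absolute path in the image and the resulting URI has three slashes. Assuming Conveyor follows that Spark convention, a small hypothetical helper can build and validate the URI; the path /opt/spark/work-dir/app.py used below is only an example, not a path Conveyor guarantees.

```python
from pathlib import PurePosixPath


def local_application_uri(path_in_image: str) -> str:
    """Build a Spark local:// URI for a file that already exists in the image.

    The path must be absolute inside the container, which is why the
    result starts with three slashes, e.g. local:///opt/...
    """
    path = PurePosixPath(path_in_image)
    if not path.is_absolute():
        raise ValueError(f"expected an absolute path inside the image, got {path_in_image!r}")
    return f"local://{path}"
```

For example, local_application_uri("/opt/spark/work-dir/app.py") returns "local:///opt/spark/work-dir/app.py", which can be passed as the application argument.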

Running multiple tasks in parallel

In this example, we create three ContainerTaskRunner instances and submit them to the TaskSubmitter. The same approach works for the SparkTaskRunner. All three jobs run in parallel, and when they finish we can inspect their status and request the application run details.

```python
import logging

from conveyor.runner import ContainerTaskRunner, TaskSubmitter

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Create three container tasks that will run in parallel.
tasks = [
    ContainerTaskRunner(
        task_name=f"instance number {i}",
        project_name="sample-python",
        environment_name="dev",
        args=["<arguments for your python job>"],
        command=["python3", "-m", "<python module>"],
        env_vars={
            "SOME_KEY": "SOME_VALUE",
        },
    )
    for i in range(3)
]

# run() returns an iterator over the finished tasks.
for finished_job in TaskSubmitter.from_list(tasks).run():
    logger.info(
        f"The run with id {finished_job.application_run_id} has "
        f"{'failed' if finished_job.has_failed() else 'succeeded'}"
    )
    logger.info(finished_job.conveyor_url())
```
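Since the TaskSubmitter reports a status and a link rather than the task logs, a script that launches several tasks may want to collect the failures and surface them through its exit code. The sketch below is a hypothetical helper, summarize, that relies only on the attributes used in the parallel example (application_run_id, has_failed() and conveyor_url()); the FinishedJob protocol is an illustrative type, not part of the SDK, and application_run_id is assumed to be a string.

```python
import logging
from typing import Iterable, List, Protocol

logger = logging.getLogger(__name__)


class FinishedJob(Protocol):
    """Illustrative type: the attributes used from each finished task."""

    application_run_id: str

    def has_failed(self) -> bool: ...

    def conveyor_url(self) -> str: ...


def summarize(finished_jobs: Iterable[FinishedJob]) -> List[str]:
    """Log every finished task and return the run ids of the failed ones."""
    failed: List[str] = []
    for job in finished_jobs:
        if job.has_failed():
            failed.append(job.application_run_id)
            logger.error("Run %s failed, see %s", job.application_run_id, job.conveyor_url())
        else:
            logger.info("Run %s succeeded", job.application_run_id)
    return failed
```

For example, passing TaskSubmitter.from_list([...]).run() to summarize and then calling sys.exit(1) when the returned list is non-empty lets a CI pipeline or an external trigger see that one of the parallel jobs failed.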

Building a project

The last example shows how to build a project and use the resulting build_id to run a job.

```python
import logging

from conveyor.project import ProjectBuilder
from conveyor.runner import ContainerTaskRunner

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Build the project; the build output is shown in your console.
build_id = ProjectBuilder(project_path="../../../../samples/python").build()
logger.info(f"Using build id: {build_id}")

# Run the task against the build that was just created.
with ContainerTaskRunner(
    task_name="a fun task",
    project_name="sample-python",
    environment_name="dev",
    build_id=build_id,
    args=["<arguments for your python job>"],
    command=["python3", "-m", "<python module>"],
    env_vars={
        "SOME_KEY": "SOME_VALUE",
    },
).run() as runner:
    logger.info(f"The run has {'failed' if runner.has_failed() else 'succeeded'}")
```

This code builds the project located at the given project_path and uses the resulting build_id to run the job. The build output is piped to your console, so you will see any potential issues.
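The project_path in the example is a relative path. In Python, relative file paths are normally resolved against the current working directory rather than the script's location; assuming ProjectBuilder behaves the same way, the script only finds the project when it is started from the right directory. A hypothetical helper, path_relative_to_script, anchors the path to the script file instead:

```python
from pathlib import Path


def path_relative_to_script(script_file: str, relative_path: str) -> str:
    """Resolve relative_path against the directory that contains script_file.

    Returns an absolute path string with any ".." segments collapsed, so the
    result no longer depends on the current working directory.
    """
    return str((Path(script_file).resolve().parent / relative_path).resolve())
```

In a job script you would call it as ProjectBuilder(project_path=path_relative_to_script(__file__, "<relative path to your project>")).build().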