Using the Conveyor Python SDK
The Python SDK was introduced to:
- run jobs that are not scheduled but are triggered by some other action (e.g. manually, on file upload, ...)
- simplify testing of your jobs within a Jupyter notebook environment
- support running multiple ad-hoc jobs in parallel
The Python SDK provides the same advantages as conveyor run, but for workloads running outside of Airflow.
For full details on the SDK, take a look at the documentation.
Prerequisites
The Python SDK supports Python 3.10 and above.
You can install the PyPI package in your Python environment as follows:
pip install sdk-conveyor
Additionally, you will need to have the Conveyor CLI installed on your machine. You can find the installation instructions here.
If you have already installed the Conveyor CLI, make sure you have at least version 1.18.10 installed.
You can verify this by running: conveyor --version
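The version requirement above can also be checked from Python, for example at the top of a notebook. The following is a minimal sketch, not part of the SDK: the helper names are hypothetical, and it assumes that the output of conveyor --version contains a MAJOR.MINOR.PATCH number somewhere in its text, which may not hold for every release.

```python
import re
import shutil
import subprocess

MINIMUM_CLI_VERSION = (1, 18, 10)  # minimum CLI version stated in the prerequisites


def parse_version(text):
    """Return the first MAJOR.MINOR.PATCH triple found in text, or None."""
    match = re.search(r"(\d+)\.(\d+)\.(\d+)", text)
    return tuple(int(part) for part in match.groups()) if match else None


def cli_is_recent_enough(version_output, minimum=MINIMUM_CLI_VERSION):
    """Compare numerically per component, so that 1.18.9 < 1.18.10."""
    version = parse_version(version_output)
    return version is not None and version >= minimum


def installed_cli_version_output():
    """Run `conveyor --version`; return its output, or None if the CLI is not on PATH."""
    if shutil.which("conveyor") is None:
        return None
    result = subprocess.run(
        ["conveyor", "--version"], capture_output=True, text=True, check=False
    )
    # Assumption: the version may be printed on stdout or stderr, so look at both.
    return result.stdout + result.stderr


if __name__ == "__main__":
    output = installed_cli_version_output()
    if output is None:
        print("conveyor CLI not found on PATH")
    elif cli_is_recent_enough(output):
        print("conveyor CLI version is recent enough")
    else:
        print(f"conveyor CLI is too old or its version could not be parsed: {output.strip()}")
```

Comparing tuples of integers rather than raw strings avoids the usual pitfall where "1.18.9" sorts after "1.18.10" as text.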
Using the Python SDK
The Python SDK provides four main classes to interact with Conveyor:
- ContainerTaskRunner: this class is used to run a single container task and wait for your task to finish.
- SparkTaskRunner: this class is used to run a single Spark application task and wait for your task to finish.
- TaskSubmitter: this class accepts a list of tasks (e.g. ContainerTaskRunner or SparkTaskRunner) and executes them in parallel. It returns the results of each task as an iterator when the tasks are finished. You will not get the logs of each task as output, but you can get a link to the Conveyor logs.
- ProjectBuilder: this class is used to build a project. Use it to build a project and use that build_id to run your jobs.
Running a single task
The following example shows how to run a single task using the Python SDK:
import logging
from conveyor.runner import ContainerTaskRunner
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
with ContainerTaskRunner(
    task_name="a fun task",
    project_name="<your-project>",
    environment_name="<your-environment>",
).run() as runner:
    logger.info(f"The run has {'failed' if runner.has_failed() else 'succeeded'}")
    logger.info(f"Look at the task details using: {runner.conveyor_url()}")
In this case we do not pass the build_id to the ContainerTaskRunner, which means that we will look up the latest build for the given project and use that.
Running multiple tasks in parallel
In this example we create 3 ContainerTaskRunner instances and submit them to the TaskSubmitter.
All 3 jobs run in parallel, and when they finish we can inspect their status as well as request the application run details.
import logging
from conveyor.runner import (ContainerTaskRunner, TaskSubmitter)
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
# Create three runners; the submitter executes them in parallel.
runners = [
    ContainerTaskRunner(
        task_name=f"instance number {i}",
        project_name="sample-python",
        environment_name="dev",
    )
    for i in range(3)
]
# run() returns an iterator over the finished jobs.
for finished_job in TaskSubmitter.from_list(runners).run():
    logger.info(f"The run with id {finished_job.application_run_id} has {'failed' if finished_job.has_failed() else 'succeeded'}")
    logger.info(finished_job.conveyor_url())
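Because the results come back as an iterator, it can be convenient to consume it once and group the jobs by outcome, for example to report only the failures. The sketch below is one possible way to do that. It relies only on the has_failed(), application_run_id and conveyor_url() members used in the example above. FakeFinishedJob, split_by_outcome and failure_report are hypothetical names introduced for illustration, and the fake class exists only so the sketch runs without a Conveyor installation.

```python
from dataclasses import dataclass


@dataclass
class FakeFinishedJob:
    """Hypothetical stand-in for a finished job; not part of the Conveyor SDK."""

    application_run_id: str
    failed: bool

    def has_failed(self):
        return self.failed

    def conveyor_url(self):
        # Placeholder URL, only for the demo below.
        return f"https://example.invalid/runs/{self.application_run_id}"


def split_by_outcome(finished_jobs):
    """Consume the iterator once and return (succeeded, failed) lists."""
    succeeded, failed = [], []
    for job in finished_jobs:
        (failed if job.has_failed() else succeeded).append(job)
    return succeeded, failed


def failure_report(failed_jobs):
    """One line per failed run: its id and the link to its Conveyor details."""
    return [f"{job.application_run_id}: {job.conveyor_url()}" for job in failed_jobs]


# Demo with stand-ins; with the SDK you would pass the iterator returned by
# TaskSubmitter.from_list(...).run() instead.
demo_jobs = iter([
    FakeFinishedJob("run-0", failed=False),
    FakeFinishedJob("run-1", failed=True),
    FakeFinishedJob("run-2", failed=False),
])
succeeded, failed = split_by_outcome(demo_jobs)
for line in failure_report(failed):
    print(line)
```

Collecting the results into lists trades the streaming behaviour of the iterator for the ability to inspect the outcomes more than once.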
Building a project
In the last example, we will show how to build a project and use the build_id to run a job.
import logging
from conveyor.project import ProjectBuilder
from conveyor.runner import ContainerTaskRunner
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
# Build the project; build() returns the id of the new build.
build_id = ProjectBuilder(project_path="../../../../samples/python").build()
logger.info(f"Using build id: {build_id}")
# Run the task against that specific build instead of the latest one.
with ContainerTaskRunner(
    task_name="a fun task",
    project_name="sample-python",
    environment_name="dev",
    build_id=build_id,
).run() as runner:
    logger.info(f"The run has {'failed' if runner.has_failed() else 'succeeded'}")
This code will build the project located at ../../../../samples/python and use the resulting build_id to run the job.
The output of the build is piped to your console so you will see any potential issues.