Using the Conveyor Python SDK
The Python SDK was introduced for the following use cases:
- running jobs that should not run on a schedule but are triggered by some other action (e.g. a manual trigger or a file upload)
- simplifying testing of your jobs within a Jupyter notebook environment
- running multiple ad-hoc jobs in parallel
The Python SDK provides the same advantages as conveyor run, but for workloads that run without Airflow.
For full details on the SDK, take a look at the documentation.
Prerequisites
The Python SDK supports Python 3.10 and above.
You can install the PyPI package in your Python environment as follows:

```bash
pip install sdk-conveyor
```
Additionally, you will need to have the Conveyor CLI installed on your machine. You can find the installation instructions here.
If you have already installed the Conveyor CLI, make sure you have at least version 1.18.10. You can verify this by running:

```bash
conveyor --version
```
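The SDK needs both a recent Python and a recent CLI, so a script that triggers jobs could check both before it starts. The sketch below compares version strings numerically. This matters because a plain string comparison would wrongly rank 1.18.10 below 1.9.0. It assumes that the output of conveyor --version contains a dotted version number such as 1.18.10. The exact output format and the helper names are assumptions, not part of the SDK.

```python
import re
import sys


def parse_version(text: str) -> tuple[int, ...]:
    """Extract the first dotted version number (e.g. "1.18.10") from text."""
    match = re.search(r"\d+(?:\.\d+)+", text)
    if match is None:
        raise ValueError(f"no version number found in {text!r}")
    return tuple(int(part) for part in match.group(0).split("."))


def cli_version_ok(version_output: str, minimum: str = "1.18.10") -> bool:
    """Return True if the reported CLI version is at least `minimum`."""
    # Tuples of ints compare element by element, so (1, 18, 10) > (1, 9, 0).
    return parse_version(version_output) >= parse_version(minimum)


def python_version_ok() -> bool:
    """The SDK supports Python 3.10 and above."""
    return sys.version_info >= (3, 10)
```

To check the installed CLI, you could pass the helper the output of subprocess.run(["conveyor", "--version"], capture_output=True, text=True).stdout. This assumes the CLI prints its version to standard output.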
Using the Python SDK
The Python SDK provides four main classes to interact with Conveyor:
- ContainerTaskRunner: runs a single container task and waits for it to finish.
- SparkTaskRunner: runs a single Spark application task and waits for it to finish.
- TaskSubmitter: accepts a list of tasks (e.g. ContainerTaskRunner or SparkTaskRunner instances) and executes them in parallel. It returns the results of the tasks as an iterator once they finish. You do not get the logs of each task as output, but you can get a link to the logs in Conveyor.
- ProjectBuilder: builds a project. Use it to build your project and then pass the resulting build_id when running your jobs.
Running a single container task
The following example shows how to run a single Container task using the Python SDK:
```python
import logging

from conveyor.runner import ContainerTaskRunner

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

with ContainerTaskRunner(
    task_name="a fun task",
    project_name="<your-project>",
    environment_name="<your-environment>",
    args=["<arguments for your python job>"],
    command=["python3", "-m", "<python module>"],
    env_vars={
        "SOME_KEY": "SOME_VALUE",
    },
).run() as runner:
    logger.info(f"The run has {'failed' if runner.has_failed() else 'succeeded'}")
    logger.info(f"Look at the task details using: {runner.conveyor_url()}")
```
In this case we do not pass the build_id to the ContainerTaskRunner, which means that the latest build for the given project is looked up and used.
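When another action triggers a task, such as a file upload, the calling process usually needs to fail when the task fails instead of only logging the outcome. The following is a minimal sketch that relies only on the has_failed() and conveyor_url() methods from the example above. The names raise_on_failure, TaskFailedError and FinishedRun are hypothetical and are not part of the SDK.

```python
from typing import Protocol


class FinishedRun(Protocol):
    """The two runner methods used in the example above."""

    def has_failed(self) -> bool: ...

    def conveyor_url(self) -> str: ...


class TaskFailedError(RuntimeError):
    """Hypothetical error raised when a Conveyor task fails."""


def raise_on_failure(run: FinishedRun, task_name: str) -> str:
    """Raise if the run failed; otherwise return the link to its details."""
    url = run.conveyor_url()
    if run.has_failed():
        raise TaskFailedError(f"task {task_name!r} failed, see {url}")
    return url
```

Inside the with block, you would call raise_on_failure(runner, "a fun task") instead of only logging the result. An uncaught exception then makes the triggering script exit with a non-zero status.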
Running a single Spark task
The following example shows how to run a single Spark task using the Python SDK:
```python
import logging

from conveyor.runner import SparkTaskRunner

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

with SparkTaskRunner(
    task_name="a fun spark task",
    project_name="<your-project>",
    environment_name="<your-environment>",
    conf={"<spark configuration key>": "<value>"},  # custom Spark configuration
    application="local://<path to your spark application entrypoint>",
).run() as runner:
    logger.info(f"The run has {'failed' if runner.has_failed() else 'succeeded'}")
    logger.info(f"Look at the task details using: {runner.conveyor_url()}")
```
In this case we do not pass the build_id to the SparkTaskRunner, which means that the latest build for the given project is looked up and used.
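The conf argument carries your custom Spark configuration. The sketch below shows one way to keep shared defaults separate from per-run overrides. It assumes conf is a flat mapping of Spark property names to string values, which the placeholder in the example suggests, but check the SDK documentation to confirm. spark.sql.shuffle.partitions and spark.executor.memory are standard Spark properties. The values, DEFAULT_SPARK_CONF and spark_conf are illustrative only.

```python
from __future__ import annotations

# Hypothetical project-wide defaults; standard Spark property names,
# illustrative values (not tuning recommendations).
DEFAULT_SPARK_CONF: dict[str, str] = {
    "spark.sql.shuffle.partitions": "200",
    "spark.executor.memory": "4g",
}


def spark_conf(overrides: dict[str, str] | None = None) -> dict[str, str]:
    """Return a new dict with per-run overrides applied on top of the defaults."""
    conf = dict(DEFAULT_SPARK_CONF)  # copy so the defaults are never mutated
    conf.update(overrides or {})
    return conf
```

You could then pass conf=spark_conf({"spark.executor.memory": "8g"}) to SparkTaskRunner.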
Running multiple tasks in parallel
In this case we create 3 ContainerTaskRunner instances and submit them to the TaskSubmitter. You could use the same approach to trigger SparkTaskRunner tasks. All 3 jobs run in parallel, and when they finish we can inspect their status and request the application run details.
```python
import logging

from conveyor.runner import ContainerTaskRunner, TaskSubmitter

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

tasks = [
    ContainerTaskRunner(
        task_name=f"instance number {i}",
        project_name="sample-python",
        environment_name="dev",
        args=["<arguments for your python job>"],
        command=["python3", "-m", "<python module>"],
        env_vars={
            "SOME_KEY": "SOME_VALUE",
        },
    )
    for i in range(3)
]

for finished_job in TaskSubmitter.from_list(tasks).run():
    status = "failed" if finished_job.has_failed() else "succeeded"
    logger.info(f"The run with id {finished_job.application_run_id} has {status}")
    logger.info(finished_job.conveyor_url())
```
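Conceptually, running tasks in parallel and iterating over the results as they finish is the same pattern that Python's concurrent.futures.as_completed provides. The sketch below illustrates that pattern with a hypothetical stand-in task, so you can reason about or test the loop locally. It assumes results are yielded as each task finishes. It is not the SDK's implementation, and FakeResult, fake_task and run_in_parallel are illustrative names.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass


@dataclass
class FakeResult:
    """Hypothetical stand-in for a finished task; not an SDK class."""

    name: str
    failed: bool

    def has_failed(self) -> bool:
        return self.failed


def fake_task(name: str, seconds: float, fail: bool) -> FakeResult:
    """Hypothetical task: sleeps to simulate work, then reports its outcome."""
    time.sleep(seconds)
    return FakeResult(name, fail)


def run_in_parallel(specs: list[tuple[str, float, bool]]) -> list[FakeResult]:
    """Start all tasks at once and collect each result as soon as it finishes."""
    finished: list[FakeResult] = []
    # One worker per task so that every task starts immediately.
    with ThreadPoolExecutor(max_workers=max(1, len(specs))) as pool:
        futures = [pool.submit(fake_task, *spec) for spec in specs]
        for future in as_completed(futures):  # yields in completion order
            finished.append(future.result())
    return finished
```

The same bookkeeping applies to the SDK loop. For example, you could collect every finished_job whose has_failed() returns True and report them together after the loop.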
Building a project
In the last example, we show how to build a project and use the resulting build_id to run a job.
```python
import logging

from conveyor.project import ProjectBuilder
from conveyor.runner import ContainerTaskRunner

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

build_id = ProjectBuilder(project_path="../../../../samples/python").build()
logger.info(f"Using build id: {build_id}")

with ContainerTaskRunner(
    task_name="a fun task",
    project_name="sample-python",
    environment_name="dev",
    build_id=build_id,
    args=["<arguments for your python job>"],
    command=["python3", "-m", "<python module>"],
    env_vars={
        "SOME_KEY": "SOME_VALUE",
    },
).run() as runner:
    logger.info(f"The run has {'failed' if runner.has_failed() else 'succeeded'}")
```
This code builds the project located at ../../../../samples/python and uses the resulting build_id to run the job. The build output is piped to your console, so you will see any potential issues.
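Without a build_id, the SDK looks up the latest build, so tasks in one batch could presumably end up on different builds if a new build lands in between. Building once and passing the same build_id to every runner avoids that. The sketch below generates keyword arguments for several ContainerTaskRunner instances that are all pinned to one build. pinned_task_kwargs is a hypothetical helper. The argument names are the ones used in the examples above.

```python
def pinned_task_kwargs(build_id: str, inputs: list[str]) -> list[dict]:
    """Hypothetical helper: one set of ContainerTaskRunner kwargs per input, all on one build."""
    return [
        {
            "task_name": f"process {item}",
            "project_name": "sample-python",
            "environment_name": "dev",
            "build_id": build_id,  # same build for every task in the batch
            "args": [item],
            "command": ["python3", "-m", "<python module>"],
        }
        for item in inputs
    ]
```

You could then create the runners with ContainerTaskRunner(**kwargs) and pass them to TaskSubmitter.from_list(...).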