Using the Conveyor Python SDK
The Python SDK was introduced for the following use cases:
- running jobs that should not run on a schedule but are triggered by some other action (e.g. a manual trigger, a file upload, ...)
- simplifying the testing of your jobs within a Jupyter notebook environment
- supporting multiple ad-hoc jobs running in parallel
The Python SDK provides the same advantages as `conveyor run`, but for workloads that run without Airflow.
For full details on the SDK, take a look at the documentation.
Prerequisites
The Python SDK supports Python 3.10 and above.
You can install the PyPI package in your Python environment as follows:
pip install conveyor-sdk
Additionally, you will need to have the Conveyor CLI installed on your machine. You can find the installation instructions here.
If you have already installed the Conveyor CLI, make sure you have at least version 1.18.10 installed.
You can verify this by running `conveyor --version`.
Using the Python SDK
The Python SDK provides four main classes to interact with Conveyor:
- `ContainerTaskRunner`: runs a single container task and waits for your task to finish.
- `SparkTaskRunner`: runs a single Spark application task and waits for your task to finish.
- `TaskSubmitter`: accepts a list of tasks (e.g. `ContainerTaskRunner` or `SparkTaskRunner`) and executes them in parallel. It returns the result of each task as an iterator when the tasks are finished. You will not get the logs of each task as output, but you can get a link to the Conveyor logs.
- `ProjectBuilder`: builds a project. Use it to build a project and use the resulting `build_id` to run your jobs.
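All four classes can be imported from the SDK. A minimal sketch of the imports, based on the module paths used in the examples below:
from conveyor.project import ProjectBuilder
from conveyor.runner import ContainerTaskRunner, SparkTaskRunner, TaskSubmitter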
Running a single container task
The following example shows how to run a single Container task using the Python SDK:
import logging

from conveyor.runner import ContainerTaskRunner

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

with ContainerTaskRunner(
    task_name="a fun task",
    project_name="<your-project>",
    environment_name="<your-environment>",
    command=["python3", "-m", "<python module>"],
    arguments=["arguments", "for", "your", "python", "job"],
    env_vars={
        "SOME_KEY": "SOME_VALUE",
    },
).run() as runner:
    logger.info(f"The run has {'failed' if runner.has_failed() else 'succeeded'}")
    logger.info(f"Look at the task details using: {runner.conveyor_url()}")
In this case we do not pass a `build_id` to the `ContainerTaskRunner`, which means the SDK will look up the latest build for the given project and use that.
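If you want to pin the task to a specific build instead, you could pass `build_id` explicitly. A minimal sketch reusing the example above (the build id value is a placeholder):
with ContainerTaskRunner(
    task_name="a fun task",
    project_name="<your-project>",
    environment_name="<your-environment>",
    build_id="<an-existing-build-id>",  # placeholder: pin this run to a specific build
    command=["python3", "-m", "<python module>"],
).run() as runner:
    logger.info(f"The run has {'failed' if runner.has_failed() else 'succeeded'}")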
Running a single Spark task
The following example shows how to run a single Spark task using the Python SDK:
import logging

from conveyor.runner import SparkTaskRunner

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

with SparkTaskRunner(
    task_name="a fun spark task",
    project_name="<your-project>",
    environment_name="<your-environment>",
    conf={"some.spark.configuration.option": "your_value"},
    application="local://<path to your spark application entrypoint>",
).run() as runner:
    logger.info(f"The run has {'failed' if runner.has_failed() else 'succeeded'}")
    logger.info(f"Look at the task details using: {runner.conveyor_url()}")
As before, we do not pass a `build_id` to the `SparkTaskRunner`, so the SDK will look up the latest build for the given project and use that.
Running multiple tasks in parallel
In this example we create 3 `ContainerTaskRunner` instances and submit them to the `TaskSubmitter`. You could use the same approach to trigger `SparkTaskRunner` instances. All 3 jobs run in parallel, and when they finish we can inspect their status as well as request the application run details.
import logging

from conveyor.runner import ContainerTaskRunner, TaskSubmitter

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

for finished_job in TaskSubmitter.from_list([
    ContainerTaskRunner(
        task_name=f"instance number {i}",
        project_name="sample-python",
        environment_name="dev",
        command=["python3", "-m", "<python module>"],
        arguments=["arguments", "for", "your", "python", "job"],
        env_vars={
            "SOME_KEY": "SOME_VALUE",
        },
    ) for i in range(3)
]).run():
    logger.info(f"The run with id: {finished_job.application_run_id} has {'failed' if finished_job.has_failed() else 'succeeded'}")
    logger.info(finished_job.conveyor_url())
Building a project
In the last example, we will show how to build a project and use the resulting `build_id` to run a job.
import logging

from conveyor.project import ProjectBuilder
from conveyor.runner import ContainerTaskRunner

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

build_id = ProjectBuilder(project_path="../../../../samples/python").build()
logger.info(f"Using build id: {build_id}")

with ContainerTaskRunner(
    task_name="a fun task",
    project_name="sample-python",
    environment_name="dev",
    build_id=build_id,
    command=["python3", "-m", "<python module>"],
    arguments=["arguments", "for", "your", "python", "job"],
    env_vars={
        "SOME_KEY": "SOME_VALUE",
    },
).run() as runner:
    logger.info(f"The run has {'failed' if runner.has_failed() else 'succeeded'}")
This code builds the project located at `../../../../samples/python` and uses the resulting `build_id` to run the job. The output of the build is piped to your console, so you will see any potential issues.
Using the SDK in tests
You might want to use the SDK for certain integration-testing scenarios. For these types of applications, it is important to know that the SDK expects to be able to communicate via stdin/stdout for the authentication flow. Frameworks such as Pytest capture all output sent to stdout by default, which will prevent the SDK from working correctly.
In the case of Pytest, you can run your tests using the `-s` or `--capture=no` option to instruct Pytest not to capture any output. This change should allow the SDK to function normally.
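For example, assuming your tests live in a tests directory (the directory name is just an illustration), you could disable output capturing like this:
pytest -s tests/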
Different testing frameworks might use different configuration options; please refer to the documentation of your chosen framework to check how you can configure this behavior.