ConveyorContainerSensor
The ConveyorContainerSensor allows you to use any code you have written as a sensor in Airflow.
The ConveyorContainerSensor extends the Airflow BaseSensorOperator and has the same semantics; more details can be found in the Airflow documentation.
The sensor task succeeds if your container has exit code 0, and fails for any other exit code.
A new sensor task is scheduled as long as the sensor has not succeeded and the timeout has not been reached.
The underlying code is based on the ConveyorContainerOperatorV2 and can be used in a similar way.
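As a minimal sketch of what the code inside the container could look like, the example below turns a check into the exit code that the sensor relies on: 0 when the condition is met, 1 otherwise. The functions `condition_met` and `exit_code_for`, and the file-existence check itself, are hypothetical and chosen only for illustration; your own check (for example, polling an API) would take their place.

```python
import os


def condition_met(marker_path: str) -> bool:
    """Hypothetical check: the condition holds once the given path exists."""
    return os.path.exists(marker_path)


def exit_code_for(marker_path: str) -> int:
    """Translate the check result into a container exit code."""
    if condition_met(marker_path):
        print(f"Found {marker_path}: condition met")
        return 0  # exit code 0: the sensor task succeeds
    print(f"{marker_path} not found yet")
    return 1  # any other exit code: this attempt does not succeed
```

The module that the container runs would then end with something like `sys.exit(exit_code_for(path))`, so that the result of the check becomes the exit code of the container.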
The following snippet illustrates how to use a ConveyorContainerSensor:
from conveyor.sensors import ConveyorContainerSensor
role = "JOBROLE-{{ macros.conveyor.env() }}"
ConveyorContainerSensor(
    task_id="sensor",
    cmds=["python3", "-m", "python_greeter.sensor"],
    aws_role=role,
    poke_interval=300,
    mode="reschedule",
    timeout=100 * 60,
)
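To make the timing in the snippet above concrete, the following back-of-the-envelope calculation estimates how many attempts fit into the timeout. It is only an approximation: it ignores container start-up and run time, and the exact behaviour at the timeout boundary depends on Airflow.

```python
poke_interval = 300  # seconds between attempts, as in the snippet above
timeout = 100 * 60   # seconds before the sensor gives up (100 minutes)

# Rough number of poke intervals that fit into the timeout.
approx_attempts = timeout // poke_interval

print(f"timeout: {timeout} s ({timeout / 60:.0f} min)")
print(f"about {approx_attempts} attempts, one every {poke_interval / 60:.0f} min")
```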
All default sensor parameters are supported, augmented with the following parameters:
Parameter | Type | Default | Explanation |
---|---|---|---|
image | str | Project image | Docker image you wish to launch, the default is the image and version of your project. If you want to use the image of another project you can use macros.conveyor.image('PROJECT_NAME') here (templated). |
cmds | list[str] | [] | Entrypoint of the container (templated). The Docker image's ENTRYPOINT is used if this is not provided. |
arguments | list[str] | [] | Arguments of the entrypoint (templated). The Docker image's CMD is used if this is not provided. |
instance_type | str | mx.micro | The Conveyor instance type to use for this container. This specifies the CPU/Memory this container can use. |
instance_life_cycle | str | spot | The lifecycle of the instance used to run this job. Options are on-demand or spot. |
env_vars | dict | | Environment variables initialized in the container (templated). |
aws_role | str | | The AWS role used by the container. |
azure_application_client_id | str | | The Azure service principal used by the container. |
mode | str | poke | The mode determines how the sensor operates. Options are poke and reschedule. |
xcom_push | bool | False | Indicates whether you want to register an XCom value in this task. |
Setting CPU/Memory
Conveyor instance types let you set the CPU and memory for your job, so choosing CPU/Memory is as easy as:
from conveyor.sensors import ConveyorContainerSensor
ConveyorContainerSensor(
    ...,
    instance_type='mx.micro',
)
Conveyor supports the following instance types for all jobs:
Instance type | CPU | Total Memory (AWS) | Total Memory (Azure) |
---|---|---|---|
mx.nano | 1* | 0.438 GB | 0.434 GB |
mx.micro | 1* | 0.875 GB | 0.868 GB |
mx.small | 1* | 1.75 GB | 1.736 GB |
mx.medium | 1 | 3.5 GB | 3.47 GB |
mx.large | 2 | 7 GB | 6.94 GB |
mx.xlarge | 4 | 14 GB | 13.89 GB |
mx.2xlarge | 8 | 29 GB | 30.65 GB |
mx.4xlarge | 16 | 59 GB | 64.16 GB |
cx.nano | 1* | 0.219 GB | Not supported |
cx.micro | 1* | 0.438 GB | Not supported |
cx.small | 1* | 0.875 GB | Not supported |
cx.medium | 1 | 1.75 GB | Not supported |
cx.large | 2 | 3.5 GB | Not supported |
cx.xlarge | 4 | 7 GB | Not supported |
cx.2xlarge | 8 | 14 GB | Not supported |
cx.4xlarge | 16 | 29 GB | Not supported |
rx.xlarge | 4 | 28 GB | Not supported |
rx.2xlarge | 8 | 59 GB | Not supported |
rx.4xlarge | 16 | 120 GB | Not supported |
(*) These instance types are not guaranteed a full CPU, only a slice of one, but they are allowed to burst up to a full CPU if the cluster allows.
The numbers for AWS and Azure differ because nodes on the two clouds run different DaemonSets and have different reservation requirements set by the provider. We aim to keep the node overhead as low as possible while still meeting the minimum requirements of each cloud provider.
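When sizing a job, it can help to pick the smallest instance type that covers a memory requirement. The sketch below does this for the mx family, using the Total Memory (AWS) column of the table above. The helper `smallest_mx_instance` is hypothetical and not part of Conveyor, and the memory your code needs should leave some headroom below the listed total.

```python
from typing import Optional

# Values copied from the "Total Memory (AWS)" column for the mx family.
MX_MEMORY_AWS = {
    "mx.nano": 0.438,
    "mx.micro": 0.875,
    "mx.small": 1.75,
    "mx.medium": 3.5,
    "mx.large": 7,
    "mx.xlarge": 14,
    "mx.2xlarge": 29,
    "mx.4xlarge": 59,
}


def smallest_mx_instance(required_memory: float) -> Optional[str]:
    """Return the smallest mx instance type whose total memory covers the requirement."""
    for name, memory in sorted(MX_MEMORY_AWS.items(), key=lambda item: item[1]):
        if memory >= required_memory:
            return name
    return None  # nothing in the mx family is large enough


print(smallest_mx_instance(2.0))
```

The returned name can then be passed as the `instance_type` parameter of the sensor.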
Sensor mode
When the sensor mode is set to poke, the sensor takes up a worker slot for its whole execution time and sleeps between pokes.
When using the reschedule mode, the sensor task frees up its worker slot when the criteria are not met and is rescheduled at a later time.
When to use which mode
- Use mode poke if the expected runtime of the sensor is short or if a short poke interval (less than a minute) is required.
- Use mode reschedule when the time before the criteria are met is expected to be quite long and the poke interval is more than one minute.
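The rule of thumb above can be sketched as a small helper. The function `choose_mode` is hypothetical, and the 5-minute default for what counts as a "short" wait is an assumption made for this example, since the text does not give an exact threshold; a poke interval of exactly one minute is treated here as long enough for reschedule.

```python
def choose_mode(poke_interval: float, expected_wait: float, short_wait: float = 300.0) -> str:
    """Rule-of-thumb mode selection; all values are in seconds.

    short_wait is an assumed threshold below which the wait counts as short.
    """
    if poke_interval < 60:
        return "poke"  # a short poke interval is required
    if expected_wait <= short_wait:
        return "poke"  # the sensor is expected to finish quickly
    return "reschedule"  # long wait with a poke interval of a minute or more


print(choose_mode(poke_interval=300, expected_wait=100 * 60))
```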
Handling spot interrupts
There are two ways to handle spot interruptions in Conveyor when using sensors:
- Use instance_life_cycle="on-demand" so that a spot interrupt cannot occur. The downside is that you pay more for on-demand instances.
- Use mode="reschedule", and Conveyor will make sure that the sensor is rescheduled instead of failing when a spot interruption occurs.
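As an illustration, the two options can be written as keyword arguments for the sensor. The dictionaries and the helper `handles_spot_interrupts` below are hypothetical; the helper only encodes the two rules from this section, with the defaults (spot and poke) taken from the parameter table.

```python
# Option 1: run on on-demand instances, so a spot interrupt cannot occur.
on_demand_kwargs = {
    "task_id": "sensor_on_demand",
    "instance_life_cycle": "on-demand",
}

# Option 2: stay on spot instances and rely on rescheduling after an interrupt.
spot_reschedule_kwargs = {
    "task_id": "sensor_spot",
    "instance_life_cycle": "spot",
    "mode": "reschedule",
    "poke_interval": 300,
}


def handles_spot_interrupts(kwargs: dict) -> bool:
    """Return True if the settings follow one of the two options described above."""
    life_cycle = kwargs.get("instance_life_cycle", "spot")  # default from the table
    mode = kwargs.get("mode", "poke")  # default from the table
    return life_cycle == "on-demand" or mode == "reschedule"


# In a DAG file these would be passed on as, for example:
#   ConveyorContainerSensor(**spot_reschedule_kwargs)
print(handles_spot_interrupts(spot_reschedule_kwargs))
```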