Skip to main content

ConveyorContainerSensor

The ConveyorContainerSensor allows you to use any code you have written as a sensor in Airflow. The ConveyorContainerSensor extends the Airflow BaseSensorOperator and has the same semantics, more details can be found here. The sensor task succeeds if your container has exit code 0, and fails for any other exit code. A new sensor task will be scheduled as long as the sensor did not succeed and the timeout was not reached. The underlying code is based on the ConveyorContainerOperatorV2 and can be used in a similar way.

The following snippet illustrates how to use a ConveyorContainerSensor:

from conveyor.sensors import ConveyorContainerSensor

role = "JOBROLE-{{ macros.conveyor.env() }}"

ConveyorContainerSensor(
task_id="sensor",
cmds=["python3", "-m", "python_greeter.sensor"],
aws_role=role,
poke_interval=300,
mode='reschedule',
timeout=100 * 60,
)

The default parameters of a sensor are supported augmented with the following parameters:

ParameterTypeDefaultExplanation
imagestrProject imageDocker image you wish to launch, the default is the image and version of your project. If you want to use the image of another project you can use macros.conveyor.image('PROJECT_NAME') here (templated).
cmdslist[str][]entrypoint of the container (templated). The docker images’s entrypoint is used if this is not provided.
argumentslist[str][]arguments of the entrypoint (templated). The docker image’s CMD is used if this is not provided.
instance_typestrmx.microThe Conveyor instance type to use for this containers. This specifies the CPU/Memory this container can use.
instance_life_cyclestringspotThe lifecycle of the instance used to run this job. Options are on-demand or spot.
env_varsdictEnvironment variables initialized in the container (templated).
aws_rolestringThe AWS role used by the container.
azure_application_client_idstringThe Azure service principal used by the container.
modestringpokeThe mode determines how the sensor operates. Options are poke and reschedule.
xcom_pushboolFalseIndicates whether you want to register an XCom value in this task.

Setting CPU/Memory

We provide the Conveyor instance types for you to set CPU/Memory for your job. That way choosing CPU/Memory is as easy as:

from conveyor.sensors import ConveyorContainerSensor

ConveyorContainerSensor(
...,
instance_type='mx.micro',
)

Conveyor supports the following instances types for any jobs:

Instance typeCPUTotal Memory (AWS)Total Memory (Azure)
mx.nano1*0.438 Gb0.375 Gb
mx.micro1*0.875 Gb0.75 Gb
mx.small1*1.75 Gb1.5 Gb
mx.medium13.5 Gb3 Gb
mx.large27 Gb6 Gb
mx.xlarge414 Gb12 Gb
mx.2xlarge829 Gb26 Gb
mx.4xlarge1659 Gb55 Gb
cx.nano1*0.219 GbNot supported
cx.micro1*0.438 GbNot supported
cx.small1*0.875 GbNot supported
cx.medium11.75 GbNot supported
cx.large23.5 GbNot supported
cx.xlarge47 GbNot supported
cx.2xlarge814 GbNot supported
cx.4xlarge1629 GbNot supported
rx.xlarge428 GbNot supported
rx.2xlarge859 GbNot supported
rx.4xlarge16120 GbNot supported
info

(*) These instance types don't get a guaranteed full CPU but only a slice of a full CPU, but they are allowed to burst up to a full CPU if the cluster allows.

The numbers for AWS and Azure differ because nodes on both clouds run different DaemonSets and have different reservation requirements set by the provider. We aim to minimize the node overhead as much as possible while still obeying the minimum requirements of each cloud provider.

Sensor mode

When setting the sensor mode to poke the sensor is taking up a worker slot for its whole execution time and sleeps between pokes. When using the reschedule mode, the sensor task frees up a worker slot when the criteria are not met and will be rescheduled at a later time.

When to use which mode

  • Use mode poke if the expected runtime of the sensor is short or if a short poke interval (less than a minute) is required.
  • Use mode reschedule when the time before the criteria is met is expected to be quite long and the poke interval is more than one minute

Handling spot interrupts

There are 2 ways in which you can handle spot interruptions in Conveyor when using sensors:

  • Use instance_life_cycle="on-demand" such that a spot interrupt cannot occur. The downside here is that you pay more for on-demand instances.
  • Use mode="reschedule" and Conveyor will make sure that the sensor will be rescheduled instead of fail when a spot interruption occurs.