
ConveyorExternalTaskSensor

The ConveyorExternalTaskSensor lets you wait until a task from a certain dag, in a certain environment, has finished. It is based on Airflow's ExternalTaskSensor, with the added benefit that you can check for a task in another environment.

The following is an example of a ConveyorExternalTaskSensor:

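from conveyor.sensors import ConveyorExternalTaskSensor

# `dag` is the DAG object this sensor belongs to, defined elsewhere in your file.
ConveyorExternalTaskSensor(
    dag=dag,
    task_id="check-other-task",
    external_dag_id='a_dag',
    external_task_id='a_task',
    poke_interval=300,  # check every 300 seconds (5 minutes)
    mode='reschedule',  # release the worker slot between checks
    timeout=100 * 60,  # give up after 6000 seconds (100 minutes)
)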

The parameters supported by the ConveyorExternalTaskSensor are:

| Parameter | Type | Default | Explanation |
|---|---|---|---|
| external_dag_id | str | | The dag you want to check the state of (templated). |
| external_task_id | str | None | The task you want to check the state of (templated). If neither external_task_id nor external_task_ids is specified, the state of the dag itself is checked. |
| external_task_ids | list[str] | None | The tasks you want to check the state of (templated). If neither external_task_id nor external_task_ids is specified, the state of the dag itself is checked. |
| environment | str | current environment | The environment in which the dag you want to check is deployed. |
| allowed_states | list[str] | ['success'] | The states you want to check for. |
| execution_delta | datetime.timedelta | | Time difference with the previous execution to look at. By default, the sensor looks at the same execution_date as the current task or DAG. For yesterday, use a positive datetime.timedelta(days=1). Pass either execution_delta or execution_date_fn, not both. |
| execution_date_fn | Callable | | Function that receives the current execution date as the first positional argument and the context dictionary as the second positional argument, and returns the execution date to query. Pass either execution_delta or execution_date_fn, not both. |
| instance_life_cycle | str | spot | The lifecycle of the instance used to run this sensor. Options are on-demand and spot. |
| mode | str | poke | The mode the sensor runs in. Options are poke and reschedule. |
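How the options in the table decide what the sensor looks at can be sketched in plain Python. This is a hypothetical model for illustration only, not Conveyor's implementation: `resolve_target` and `resolve_execution_date` are made-up names that do not exist in `conveyor.sensors`, and rejecting external_task_id and external_task_ids together with a `ValueError` is an assumption modelled on Airflow's ExternalTaskSensor.

```python
import datetime
from typing import Any, Callable, Dict, List, Optional

# Hypothetical helpers that model the documented rules. They are NOT part of
# conveyor.sensors and do not query Airflow.

DateFn = Callable[[datetime.datetime, Dict[str, Any]], datetime.datetime]


def resolve_target(
    external_task_id: Optional[str] = None,
    external_task_ids: Optional[List[str]] = None,
) -> Optional[List[str]]:
    """Return the task ids to check, or None when the whole dag is checked."""
    if external_task_id is not None and external_task_ids is not None:
        # Assumption: like Airflow's ExternalTaskSensor, reject both at once.
        raise ValueError("pass external_task_id or external_task_ids, not both")
    if external_task_id is not None:
        return [external_task_id]
    if external_task_ids is not None:
        return list(external_task_ids)
    return None  # neither given: check the state of the dag itself


def resolve_execution_date(
    current: datetime.datetime,
    context: Dict[str, Any],
    execution_delta: Optional[datetime.timedelta] = None,
    execution_date_fn: Optional[DateFn] = None,
) -> datetime.datetime:
    """Return the execution date of the external run to look at."""
    if execution_delta is not None and execution_date_fn is not None:
        raise ValueError("pass execution_delta or execution_date_fn, not both")
    if execution_delta is not None:
        return current - execution_delta  # a positive delta looks back in time
    if execution_date_fn is not None:
        return execution_date_fn(current, context)
    return current  # default: same execution date as the current run
```

For example, `resolve_target()` returns `None`, which in this model means the sensor checks the dag as a whole rather than individual tasks.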

Using execution_delta

The execution_delta is the time difference with the previous execution to look at. By default, the sensor looks at the same execution_date as the current task or DAG. The delta is positive and points back in time: for yesterday, use datetime.timedelta(days=1); for the previous hour, use datetime.timedelta(hours=1).

Use this when, for example, your dag is scheduled at 02:00 every day and the dag you want to check is scheduled at 01:00 every day: an execution_delta of one hour makes the sensor look at the 01:00 run.
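To make the arithmetic concrete, the snippet below applies the subtraction described above to a sample execution date using only the standard `datetime` module. The dates are made up for illustration, and the snippet does not call the sensor itself.

```python
import datetime

# Sample execution date of your dag: a daily run at 02:00 (illustrative date).
current_execution_date = datetime.datetime(2024, 1, 2, 2, 0)

# The other dag runs daily at 01:00, one hour earlier.
execution_delta = datetime.timedelta(hours=1)

# The run to look at is the current execution date minus the positive delta.
date_to_check = current_execution_date - execution_delta
print(date_to_check)  # 2024-01-02 01:00:00

# For "yesterday", a positive one-day delta moves back a full day.
print(current_execution_date - datetime.timedelta(days=1))  # 2024-01-01 02:00:00
```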

For example:

import datetime
from conveyor.sensors import ConveyorExternalTaskSensor

ConveyorExternalTaskSensor(
    dag=dag,
    task_id="check-other-task",
    external_dag_id='a_dag',
    external_task_id='a_task',
    poke_interval=300,
    mode='reschedule',
    timeout=100 * 60,
    # Look at the external run one hour before this run's execution date.
    execution_delta=datetime.timedelta(hours=1),
)

Using execution_date_fn

The execution_date_fn parameter expects a function that receives the current execution date as the first positional argument and the context dictionary as the second positional argument. It has to return the execution date to query.

You can use this, for example, if your dag is scheduled every 12 hours but the dag you depend on is scheduled every day at 02:00. Such a function lets every scheduled run of your dag during a day check the same execution date of the dag it depends on.

| Your scheduled time | Scheduled time to check |
|---|---|
| 00:00 | 02:00 |
| 12:00 | 02:00 |

The resulting code for this schedule is the following:

from conveyor.sensors import ConveyorExternalTaskSensor
import datetime

def execution_date_to_check(current_execution_date: datetime.datetime, context) -> datetime.datetime:
    # Every run on a given day checks the external run at 02:00 on that same day.
    return current_execution_date.replace(hour=2)

ConveyorExternalTaskSensor(
    dag=dag,
    task_id="check-other-task",
    external_dag_id='a_dag',
    external_task_id='a_task',
    poke_interval=300,
    mode='reschedule',
    timeout=100 * 60,
    execution_date_fn=execution_date_to_check,
)
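Because execution_date_to_check is a plain Python function, you can check it against the schedule table without running Airflow. The function body is copied from the example above; the dates are made-up sample values, and an empty dictionary stands in for the Airflow context, which this function does not use.

```python
import datetime


def execution_date_to_check(current_execution_date: datetime.datetime, context) -> datetime.datetime:
    # Map every run on a given day to 02:00 on that same day.
    return current_execution_date.replace(hour=2)


# Your dag runs every 12 hours: at 00:00 and at 12:00 (sample date).
for hour in (0, 12):
    run = datetime.datetime(2024, 1, 2, hour, 0)
    print(run.time(), "->", execution_date_to_check(run, context={}).time())
# 00:00:00 -> 02:00:00
# 12:00:00 -> 02:00:00
```

Both runs on 2024-01-02 resolve to the same 02:00 execution date, matching the table.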