ConveyorExternalTaskSensor
The ConveyorExternalTaskSensor lets you wait until a task from a certain DAG in a certain environment has finished. It is based on Airflow's ExternalTaskSensor, but has the added benefit that you can check for a task in another environment.
The following is an example of a ConveyorExternalTaskSensor:
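```python
from conveyor.sensors import ConveyorExternalTaskSensor

ConveyorExternalTaskSensor(
    dag=dag,  # the DAG object this sensor task belongs to
    task_id="check-other-task",
    external_dag_id='a_dag',  # the DAG to wait for
    external_task_id='a_task',  # the task in that DAG to wait for
    poke_interval=300,  # check every 300 seconds (5 minutes)
    mode='reschedule',  # release the worker slot between checks
    timeout=100 * 60,  # give up after 6000 seconds (100 minutes)
)
```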
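The poke_interval and timeout values in this example are in seconds: the sensor checks every 300 seconds and gives up after 100 * 60 = 6000 seconds. The sketch below is a simplified, hypothetical model of that polling loop, not Conveyor's or Airflow's implementation. The function `simulate_sensor` and its `check_state` argument are illustrative names, and the clock is simulated so the example runs instantly.

```python
from typing import Callable


def simulate_sensor(
    check_state: Callable[[int], bool],
    poke_interval: int = 300,
    timeout: int = 100 * 60,
) -> int:
    """Hypothetical model: return the elapsed seconds of the first successful check.

    Checks happen at t = 0, poke_interval, 2 * poke_interval, ...
    A TimeoutError is raised once the elapsed time passes timeout.
    """
    elapsed = 0
    while elapsed <= timeout:
        if check_state(elapsed):
            return elapsed
        elapsed += poke_interval  # wait (simulated) until the next poke
    raise TimeoutError(f"external task not done after {timeout} seconds")


# Hypothetical upstream task that finishes 2400 seconds (40 minutes) in.
print(simulate_sensor(lambda t: t >= 2400))  # → 2400
```

With these values the sensor checks at most about 20 times before timing out; a shorter poke_interval detects completion sooner at the cost of more checks.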
The parameters supported by the ConveyorExternalTaskSensor are:
Parameter | Type | Default | Explanation |
---|---|---|---|
external_dag_id | str | (required) | The DAG you want to check the state of (templated). |
external_task_id | str | None | The task you want to check the state of (templated). If neither external_task_id nor external_task_ids is specified, the state of the whole DAG is checked. |
external_task_ids | list[str] | None | The tasks you want to check the state of (templated). If neither external_task_ids nor external_task_id is specified, the state of the whole DAG is checked. |
environment | str | current environment | The environment in which the DAG you want to check is deployed. |
allowed_states | list[str] | ["success"] | The states you want to check for. |
execution_delta | datetime.timedelta | None | The time difference with the previous execution to look at. By default, the sensor checks the same execution_date as the current task or DAG. For yesterday, use a positive datetime.timedelta(days=1). Either execution_delta or execution_date_fn can be passed to ConveyorExternalTaskSensor, but not both. |
execution_date_fn | Callable | None | A function that receives the current execution date as the first positional argument and the context dictionary as the second positional argument, and returns the execution date to query. Either execution_delta or execution_date_fn can be passed to ConveyorExternalTaskSensor, but not both. |
instance_life_cycle | str | spot | The lifecycle of the instance used to run this job. Options are on-demand or spot. |
mode | str | poke | The mode used by the sensor. Options are poke and reschedule. |
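To illustrate how execution_delta and execution_date_fn interact, here is a hypothetical helper, `resolve_execution_date`, that applies the rules from the table: with neither argument the current execution date is used, a delta is subtracted from it, a function is called with the date and the context, and passing both is an error. It is a sketch of the documented behaviour, not Conveyor's actual code; the helper name and the use of ValueError are assumptions.

```python
import datetime
from typing import Any, Callable, Dict, Optional


def resolve_execution_date(
    current: datetime.datetime,
    context: Dict[str, Any],
    execution_delta: Optional[datetime.timedelta] = None,
    execution_date_fn: Optional[
        Callable[[datetime.datetime, Dict[str, Any]], datetime.datetime]
    ] = None,
) -> datetime.datetime:
    """Hypothetical helper mirroring the documented parameter rules."""
    if execution_delta is not None and execution_date_fn is not None:
        raise ValueError("pass execution_delta or execution_date_fn, not both")
    if execution_delta is not None:
        # A positive delta looks back in time: timedelta(days=1) means yesterday.
        return current - execution_delta
    if execution_date_fn is not None:
        # The function receives the date first and the context dictionary second.
        return execution_date_fn(current, context)
    # Default: the same execution date as the current task or DAG.
    return current


now = datetime.datetime(2024, 1, 2, 2, 0)
print(resolve_execution_date(now, {}, execution_delta=datetime.timedelta(days=1)))
# → 2024-01-01 02:00:00
```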
Using execution_delta
The execution_delta parameter is the time difference with the previous execution to look at. By default, the sensor checks the same execution_date as the current task or DAG. For yesterday, use a positive datetime.timedelta(days=1); for the previous hour, use datetime.timedelta(hours=1).
Use this when, for example, your DAG is scheduled at 02:00 every day and the DAG you want to check is scheduled at 01:00 every day.
For example:
```python
import datetime

from conveyor.sensors import ConveyorExternalTaskSensor


# Callable equivalent of execution_delta=datetime.timedelta(hours=1).
# It is not used below: pass either execution_delta or execution_date_fn, not both.
def execution_date_to_check(current_execution_date: datetime.datetime, context) -> datetime.datetime:
    return current_execution_date - datetime.timedelta(hours=1)


ConveyorExternalTaskSensor(
    dag=dag,
    task_id="check-other-task",
    external_dag_id='a_dag',
    external_task_id='a_task',
    poke_interval=300,
    mode='reschedule',
    timeout=100 * 60,
    # Your DAG runs at 02:00; check the upstream run at 01:00.
    execution_delta=datetime.timedelta(hours=1),
)
```
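As a quick check of the arithmetic, the sketch below uses plain datetime values (no Airflow or Conveyor needed) to show that, for a run scheduled at 02:00, an execution_delta of one hour and the equivalent callable both point at the 01:00 run of the same day. The date is arbitrary, and an empty dict stands in for the Airflow context.

```python
import datetime


def execution_date_to_check(current_execution_date: datetime.datetime, context) -> datetime.datetime:
    # Callable equivalent of execution_delta=datetime.timedelta(hours=1).
    return current_execution_date - datetime.timedelta(hours=1)


current = datetime.datetime(2024, 1, 2, 2, 0)  # your DAG's 02:00 run (arbitrary date)
delta = datetime.timedelta(hours=1)

print(current - delta)  # → 2024-01-02 01:00:00
print(execution_date_to_check(current, {}))  # → 2024-01-02 01:00:00
```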
Using execution_date_fn
The execution_date_fn parameter expects a function that receives the current execution date as the first positional argument and the context dictionary as the second positional argument. The function has to return the execution date to query.
You can use this, for example, if your DAG is scheduled every 12 hours but the DAG you depend on is scheduled every day at 02:00. The function lets every scheduled run during a day check the same execution date of the DAG you depend on.
Your scheduled time | Scheduled time to check |
---|---|
00:00 | 02:00 |
12:00 | 02:00 |
The resulting code for this schedule is the following:
```python
import datetime

from conveyor.sensors import ConveyorExternalTaskSensor


def execution_date_to_check(current_execution_date: datetime.datetime, context) -> datetime.datetime:
    # Map both the 00:00 and the 12:00 run to the upstream DAG's 02:00 run of the same day.
    return current_execution_date.replace(hour=2, minute=0, second=0, microsecond=0)


ConveyorExternalTaskSensor(
    dag=dag,
    task_id="check-other-task",
    external_dag_id='a_dag',
    external_task_id='a_task',
    poke_interval=300,
    mode='reschedule',
    timeout=100 * 60,
    execution_date_fn=execution_date_to_check,
)
```
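To confirm that a function like execution_date_to_check produces the mapping in the table, you can call it directly with the two daily run times. The date below is arbitrary, and an empty dict stands in for the Airflow context, which this function does not use.

```python
import datetime


def execution_date_to_check(current_execution_date: datetime.datetime, context) -> datetime.datetime:
    # Pin every run of the day to the upstream DAG's 02:00 run.
    return current_execution_date.replace(hour=2, minute=0, second=0, microsecond=0)


for hour in (0, 12):
    run = datetime.datetime(2024, 1, 2, hour, 0)  # your 00:00 and 12:00 runs
    print(run.time(), "->", execution_date_to_check(run, {}).time())
# → 00:00:00 -> 02:00:00
# → 12:00:00 -> 02:00:00
```

Resetting the minutes, seconds and microseconds as well as the hour keeps the mapping correct even if a run's execution date is not exactly on the hour.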