Setting up alerting

Conveyor-integrated Airflow alerts

Conveyor has an integrated alert system that can send out emails when specific DAGs fail in a given environment.

To add alerting, you need to specify the alert configurations on a project, which consists of the following three parameters:

  • The environment for which alerts will be sent out. This makes it possible to change the alert configuration for a DAG failing in the development environment vs the production environment.
  • The emails to which the alert will be sent.
  • The IDs of the DAGs for which an alert will be sent out when they fail. We support LIKE-style matching by using the .* suffix (for example, ingest.* matches every DAG ID that starts with ingest).

How to configure it

You can specify the alerting rules through the UI, the CLI, or Terraform.

UI

Navigate to the project you would like to add alerts to and click on the Alerts tab. Here you can manage all your current alert configurations.

To add a new alert configuration, press the button in the top-right corner of the tab.

You can change and delete alert configurations by using the buttons on the right side of the table.

CLI

The same functionality is also available through the CLI with the conveyor project add-alert-config command; for more details, look here. Additionally, we also support listing all alert configurations and deleting an alert configuration.

Terraform

The final way to configure Conveyor-managed alerts is through Terraform. We created the conveyor_project_alert_config resource for this; full details can be found here.

Custom Airflow alerting

If you want to configure the alerts yourself in Airflow, you have two options:

  • use the Slack provider in Airflow
  • use the ConveyorContainerOperatorV2 operator in Airflow

In both cases, you implement a failure callback function and register it as the on_failure_callback of your Airflow DAG:

from airflow import DAG


def failure_callback(context):
    # The callback receives the Airflow context of the failed task instance.
    print("We failed")


sample_python_oom_dag = DAG(
    "failing-dag",
    default_args={
        "on_failure_callback": failure_callback,
    },
    schedule_interval="@hourly",
    max_active_runs=1,
)

By setting this on_failure_callback in the default_args at the DAG level, you are guaranteed that it is added to every task. Inside the on_failure_callback you can use any Airflow operator.

Since alerting is something common to every project, it is often a good idea to create an alerting package to make it available in any environment that has to send out alerts (e.g. your production environment). If you want to only send out alerts for a specific environment, you can do the following:

from airflow import DAG
from airflow.models import Variable
from conveyor import packages

alerting = packages.load("alerting")


def f_callback(context):
    # We only send alerts in the production environment
    if Variable.get("environment") == "production":
        alerting.failure_callback(context)


sample_python_oom_dag = DAG(
    "failing-dag",
    default_args={"on_failure_callback": f_callback},
    schedule_interval="@hourly",
    max_active_runs=1,
)

By using an alerting package, you have the freedom to do whatever you need inside your alerting code and add the necessary custom logic. Alternatively, you can keep the alerting logic in Airflow to the bare minimum, and send the alert message to an external service that contains the logic to route alerts to the correct teams.
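
As a sketch of the second approach, the callback below forwards the failure details to an external alerting service. The endpoint URL and payload fields are assumptions for illustration only; Conveyor does not provide such a service, so you would point this at an endpoint owned by your team:

import requests
from airflow.models import Variable


def forward_failure_to_alert_service(context):
    # Hypothetical endpoint of a team-owned alert router; replace it with your own service.
    alert_service_url = "https://alerts.example.com/api/v1/airflow-failures"
    payload = {
        "dag_id": context.get("dag_run").dag_id,
        "execution_date": str(context.get("execution_date")),
        "reason": context.get("reason"),
        "environment": Variable.get("environment"),
    }
    # The external service decides which team to notify and over which channel.
    response = requests.post(alert_service_url, json=payload, timeout=10)
    response.raise_for_status()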

Slack failure callback

One of the few providers that we package with Airflow is the Slack provider, which makes it possible to automatically send Slack notifications when DAGs fail. Slack alerting can be configured as follows:

from airflow.models import Variable
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
from conveyor.alerting import conveyor_executions_url


def slack_failed_dag_notification(context):
    # The "environment" Airflow variable is the same one used in the example above.
    environment = Variable.get("environment")
    slack_msg = """
:red_circle: DAG `{dag}` in environment `{env}` failed.
*DAG*: {dag}
*Environment*: {env}
*Execution Time*: {exec_date}
*Reason*: {reason}
*Url*: https://app.dev.conveyordata.com/environments/{env}/airflow/tree?dag_id={dag}
*ConveyorURL*: {url}
""".format(
        dag=context.get('dag_run').dag_id,
        env=environment,
        exec_date=context.get('execution_date'),
        reason=context.get('reason'),
        url=conveyor_executions_url(context),
    )
    hook = SlackWebhookHook(
        http_conn_id="slack_webhook",
        message=slack_msg,
    )
    hook.execute()
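
To activate this callback, register it the same way as in the earlier examples, through the DAG's default_args. The sketch below reuses the slack_failed_dag_notification function defined above and assumes an Airflow connection named slack_webhook has been configured for the webhook:

from airflow import DAG

# Wire the Slack callback into the DAG through default_args, following the pattern shown earlier.
slack_alerted_dag = DAG(
    "failing-dag",
    default_args={"on_failure_callback": slack_failed_dag_notification},
    schedule_interval="@hourly",
    max_active_runs=1,
)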

ConveyorContainerOperatorV2 failure callback

If you want to send out alerts in a way other than Slack (e.g. emails), you can use the ConveyorContainerOperatorV2 in an on_failure_callback in your DAG.

An example of how to use it in a failure callback:

from conveyor.operators import ConveyorContainerOperatorV2


def failure_callback(context):
    operator = ConveyorContainerOperatorV2(
        task_id="failure-callback",
        image="{{ macros.conveyor.image('alerting') }}",  # We have to specify this to use the image of the alerting project
        validate_docker_image_exists=False,
    )
    operator.render_template_fields(context, jinja_env=context['dag'].get_template_env())
    operator.execute(context=context)

Note that we need to explicitly render the template fields, since Airflow does not do this automatically for an operator that is instantiated inside a callback.

Adding the Conveyor alerting URL

You can add a link to the Conveyor task executions page to your alert by using the conveyor_executions_url function from the conveyor.alerting package. You need to pass it the context of the failure callback.

caution

The conveyor_executions_url function only produces a correct URL when the on_failure_callback runs at the task level. If you set the callback through the default_args of the DAG or directly on a task, it works as expected. If you set it on the DAG object itself, the generated URL will be wrong.

For example:

from conveyor.operators import ConveyorContainerOperatorV2
from conveyor.alerting import conveyor_executions_url


def failure_callback(context):
    operator = ConveyorContainerOperatorV2(
        task_id="failure-callback",
        image="{{ macros.conveyor.image('alerting') }}",  # We have to specify this to use the image of the alerting project
        validate_docker_image_exists=False,
        arguments=["--conveyor-executions-url", conveyor_executions_url(context)],
    )
    operator.render_template_fields(context, jinja_env=context['dag'].get_template_env())
    operator.execute(context=context)