Setting up alerting
Conveyor-integrated Airflow alerts
Conveyor has an integrated alert system that can send out emails when certain DAGs fail in a given environment.
To add alerting, you specify an alert configuration on a project, which consists of the following three parameters:
- The environment for which alerts will be sent out. This makes it possible to use a different alert configuration for a DAG failing in the development environment vs. the production environment.
- The emails to which the alert will be sent.
- The IDs of the DAGs for which an alert will be sent out when they fail. We support LIKE semantics by using the `.*` suffix (e.g. `ingest.*` matches every DAG ID starting with `ingest`).
How to configure it
You can specify the alerting rules in the UI, the CLI, or Terraform.
UI
Navigate to the project you would like to add alerts to and click on the Alerts tab.
Here you can manage all your current alert configurations as shown in the following example:
To add a new alert configuration, press the button in the top-right corner of the tab, which looks as follows:
You can change and delete alert configurations by using the buttons on the right side of the table.
CLI
The same functionality is also supported through the CLI using the `conveyor project add-alert-config` command; for more details, look here.
Additionally, we also support listing all alert configurations and deleting an alert configuration.
Terraform
The final way in which you can configure Conveyor-managed alerts is through Terraform.
We created the `conveyor_project_alert_config` resource for this; full details can be found here.
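As an illustration, a minimal sketch of such a resource is shown below. The attribute names (`environment`, `emails`, `dag_ids`) are assumptions based on the three parameters described above; check the resource documentation for the exact schema.
resource "conveyor_project_alert_config" "example" {
  # Attribute names are assumptions; see the resource docs for the exact schema.
  environment = "production"
  emails      = ["data-team@example.com"]
  dag_ids     = ["ingest.*"]
}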
Custom Airflow alerting
If you want to configure the alerts yourself in Airflow, you have two options:
- use the Slack provider in Airflow
- use the `ConveyorContainerOperatorV2` operator in Airflow
In both cases you implement a `failure_callback` function and specify it in the `on_failure_callback` of your Airflow DAG:
from airflow import DAG

def failure_callback(context):
    # Airflow passes the task context to the callback
    print("We failed")

sample_python_oom_dag = DAG(
    "failing-dag",
    default_args={
        "on_failure_callback": failure_callback,
    },
    schedule_interval="@hourly",
    max_active_runs=1,
)
By setting this `on_failure_callback` at the DAG level, you are guaranteed that it will be added to every task.
Inside the `on_failure_callback` you can use any Airflow operator.
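For example, here is a minimal sketch using Airflow's built-in `EmailOperator` inside the callback; the recipient address is a placeholder, and this assumes SMTP is configured for your Airflow deployment:
from airflow.operators.email import EmailOperator

def email_failure_callback(context):
    # Instantiate the operator inside the callback and execute it directly
    operator = EmailOperator(
        task_id="email-failure-callback",
        to="data-team@example.com",  # placeholder recipient
        subject=f"DAG {context['dag_run'].dag_id} failed",
        html_content=f"Reason: {context.get('reason')}",
    )
    operator.execute(context=context)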
Since alerting is something common to every project, it is often a good idea to create an alerting package to make it available in any environment that has to send out alerts (e.g. your production environment).
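What the package exposes is up to you; the example below relies only on it providing a `failure_callback(context)` function. A minimal sketch of such a module might look like this (the module layout is an assumption):
def failure_callback(context):
    # Route the failure to your alerting channel of choice
    dag_id = context["dag_run"].dag_id
    task_id = context["task_instance"].task_id
    print(f"Task {task_id} in DAG {dag_id} failed")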
If you want to send out alerts only for a specific environment, you can do the following:
from airflow import DAG
from airflow.models import Variable
from conveyor import packages

alerting = packages.load("alerting")

def f_callback(context):
    # We only send alerts in the production environment
    if Variable.get("environment") == "production":
        alerting.failure_callback(context)

sample_python_oom_dag = DAG(
    "failing-dag",
    default_args={"on_failure_callback": f_callback},
    schedule_interval="@hourly",
    max_active_runs=1,
)
By using an alerting package, you have the freedom to do whatever you need inside your alerting code and add the necessary custom logic. Alternatively, you can keep the alerting logic in Airflow to the bare minimum, and send the alert message to an external service that contains the logic to route alerts to the correct teams.
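As a minimal sketch of that second approach, the callback below forwards the failure details to a hypothetical routing endpoint (`https://alerts.example.com/...` is a placeholder; swap in your own service and authentication):
import json
import urllib.request

def forward_failure_callback(context):
    # Keep the Airflow-side logic minimal: just forward the failure details
    payload = {
        "dag_id": context["dag_run"].dag_id,
        "task_id": context["task_instance"].task_id,
        "reason": context.get("reason"),
    }
    request = urllib.request.Request(
        "https://alerts.example.com/api/v1/alerts",  # placeholder endpoint
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    urllib.request.urlopen(request)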
Slack failure callback
One of the few providers that we package with Airflow is the Slack provider, which makes it possible to automatically send Slack notifications when DAGs fail. Slack alerting can be configured as follows:
from airflow.models import Variable
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
from conveyor.alerting import conveyor_executions_url

def slack_failed_dag_notification(context):
    slack_msg = """
    :red_circle: DAG `{dag}` in environment `{env}` failed.
    *DAG*: {dag}
    *Environment*: {env}
    *Execution Time*: {exec_date}
    *Reason*: {reason}
    *Url*: https://app.dev.conveyordata.com/environments/{env}/airflow/tree?dag_id={dag}
    *ConveyorURL*: {url}
    """.format(
        dag=context.get('dag_run').dag_id,
        env=Variable.get("environment"),
        exec_date=context.get('execution_date'),
        reason=context.get('reason'),
        url=conveyor_executions_url(context),
    )
    # Requires an Airflow connection named "slack_webhook" with your webhook details
    hook = SlackWebhookHook(
        http_conn_id="slack_webhook",
        message=slack_msg,
    )
    hook.execute()
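To activate this callback, attach it to your DAG's `default_args`, just like in the earlier examples (the DAG id and schedule here are placeholders):
from airflow import DAG

slack_alerting_dag = DAG(
    "failing-dag",
    default_args={"on_failure_callback": slack_failed_dag_notification},
    schedule_interval="@hourly",
    max_active_runs=1,
)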
ConveyorContainerOperatorV2 failure callback
If you want to send out alerts in a way other than Slack (e.g. emails), you can use the `ConveyorContainerOperatorV2` in an `on_failure_callback` in your DAG.
An example of how to implement such a failure callback is as follows:
from conveyor.operators import ConveyorContainerOperatorV2

def failure_callback(context):
    operator = ConveyorContainerOperatorV2(
        task_id="failure-callback",
        image="{{ macros.conveyor.image('alerting') }}",  # We have to specify this to use the image of the alerting project
        validate_docker_image_exists=False,
    )
    operator.render_template_fields(context, jinja_env=context['dag'].get_template_env())
    operator.execute(context=context)
Note that we need to explicitly render the template fields, since Airflow doesn't do so automatically for operators instantiated inside a callback.
Adding the Conveyor alerting URL
You can add a link to the Conveyor task executions page to your alert.
Use the `conveyor_executions_url` function from the `conveyor.alerting` package, and pass it the `context` of the failure callback.
`conveyor_executions_url` only works properly in the `on_failure_callback` of tasks.
If you pass the callback in the `default_args` of the DAG or set it directly on the tasks, it works perfectly; if you set it on the DAG object itself, the generated URL will be wrong.
For example:
from conveyor.operators import ConveyorContainerOperatorV2
from conveyor.alerting import conveyor_executions_url

def failure_callback(context):
    operator = ConveyorContainerOperatorV2(
        task_id="failure-callback",
        image="{{ macros.conveyor.image('alerting') }}",  # We have to specify this to use the image of the alerting project
        validate_docker_image_exists=False,
        arguments=["--conveyor-executions-url", conveyor_executions_url(context)],
    )
    operator.render_template_fields(context, jinja_env=context['dag'].get_template_env())
    operator.execute(context=context)