
Airflow

A Conveyor environment comes with a managed Airflow installation. If you are new to Airflow, take a look at the architecture overview. The high-level architecture is as follows:

Based on the general architecture, let's go over how Conveyor configures Airflow:

  • Scheduler: The Airflow scheduler is responsible for triggering tasks based on the schedule information provided in the dags folder of your project. Conveyor ensures that the DAGs of all projects deployed in the same environment are mounted so that the scheduler can manage them.
  • Executor: We configure Airflow to use the Kubernetes executor, meaning that every task that needs to run is scheduled as a Kubernetes pod in your Kubernetes cluster. More details can be found here.
  • Webserver: The Airflow web interface, which we also deploy in every Conveyor environment so that users can easily monitor and interact with their DAGs. We use a couple of hooks to navigate from Airflow to the Conveyor UI, most importantly to go from the Airflow task details to the corresponding Conveyor application runs for inspecting the logs of Conveyor pods.
  • Airflow Database: We create one database server for every Kubernetes cluster, and every Conveyor environment uses a separate database within that server.
  • Workers: Every task in Airflow gets a dedicated Airflow worker pod that is responsible for executing the task and updating the state of its task_id in the Airflow database. Most tasks in Conveyor use a predefined template, called an operator (e.g. ConveyorContainerOperatorV2, ConveyorSparkSubmitOperatorV2). These operators make it easy to run a container or Spark job through Airflow, as shown in the sketch below.
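
For illustration, here is a minimal sketch of a DAG that uses one of these operators. The import path, operator arguments, and DAG parameters are assumptions for illustration only; consult the Conveyor operator reference for the exact signature.

```python
from datetime import datetime

from airflow import DAG
from conveyor.operators import ConveyorContainerOperatorV2  # assumed import path

# Assumed DAG parameters; `schedule` requires Airflow >= 2.4.
with DAG(
    dag_id="sample_project",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    # The operator runs the project's container image as a Kubernetes pod;
    # the arguments shown here are illustrative only.
    ConveyorContainerOperatorV2(
        task_id="run_container_job",
        arguments=["--date", "{{ ds }}"],
    )
```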

Airflow task execution

Scheduling a task involves many components, and it is important to understand how they interact in order to work well with Conveyor. The following sequence diagram describes the full flow for a container job. A Spark job follows the same sequence from Airflow's perspective; Conveyor just has more moving parts internally.

This also shows why there are two different places to look for your task's logs.

  • On one side we have the Airflow worker logs, which are managed by Airflow and accessible through the Airflow web UI.
  • On the other side we have the Conveyor logs, which are stored in CloudWatch and made available in the Conveyor UI.

Since Conveyor has no control over the Airflow worker and its logs, we cannot show them in the Conveyor UI.

Airflow DAG validation

Writing Airflow DAGs is error-prone, as it is easy to make a small mistake (e.g. forgetting an import statement or specifying the wrong method argument). In order to catch these issues as early as possible, we validate your DAG code before building your Docker image. Alternatively, you can run the validation explicitly by using the conveyor project validate-dags command.

DAG validation works by running our Airflow Docker image, which contains the Conveyor plugin, and mounting all relevant DAG files into the container, namely the DAGs of the current project as well as the DAGs of every dependent project. These files are stored under /opt/airflow/dags, which is the default location for DAGs in Airflow. Next, we create a DagBag based on all the DAG files; more details on the DagBag can be found here.

Finally, using the DagBag we can report all Airflow warnings as well as the import errors for every DAG file. The validation command fails when there are any errors. The sketch below illustrates the idea.
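
As a rough illustration of this validation step, the following sketch uses Airflow's DagBag API to collect import errors from the mounted DAG folder; the actual implementation in Conveyor may differ.

```python
from airflow.models import DagBag

# Build a DagBag from the mounted DAG files, using Airflow's default DAG location.
dag_bag = DagBag(dag_folder="/opt/airflow/dags", include_examples=False)

# import_errors maps each DAG file to the error raised while parsing it,
# e.g. a missing import statement or a wrong operator argument.
for dag_file, error in dag_bag.import_errors.items():
    print(f"{dag_file}: {error}")

# Fail the validation when any DAG file could not be imported.
if dag_bag.import_errors:
    raise SystemExit(1)
```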

Airflow updates

There are two ways in which an Airflow environment gets updated:

  • When a user deploys a project to the environment by issuing conveyor deploy --env <env_name>. This causes the Airflow webserver and scheduler to be refreshed and restarted, ensuring that the new DAG information is loaded into both components.
  • When the Conveyor team releases a new version of Conveyor. This will change the Airflow image tag and thus cause a rollout of both the scheduler and the web component.

What happens to running jobs when the Airflow instance that started them is itself restarted?

Conveyor runs its containerized jobs as independent pods in Kubernetes. As these pods run independently of the Airflow instance that started them, they are unaffected by an Airflow restart and continue running as if nothing happened. Upon completion, they update the status of their task_id in the Airflow database. The new Airflow scheduler resumes watching the existing Airflow workers so that it knows when they finish. For more details, look here.

In summary, your running jobs are unaffected by updates to the Conveyor environment and will complete normally.

Airflow upgrades

In order to benefit from fixes and improvements in newer versions, the Conveyor team upgrades Airflow regularly. We do not aim to be on the latest version, since we have noticed that it can contain unexpected issues. Therefore, we wait for one or two bugfix releases before upgrading.

We deliberately limit the number of Airflow providers supported in Conveyor so that we can update the Airflow images in a Conveyor release without impacting our users. This way, all environments are on the latest version, and we do not have to support multiple versions at the same time.