
Airflow issues


I want to share code between Airflow DAGs

The easiest way to share Airflow code and operators across multiple projects and environments is to use the Conveyor packages feature. Packages allow you to centrally define and version code that can easily be accessed by any of your projects in any of your environments.

My Airflow DAG configuration is different from the code I deployed

All the DAGs defined in different projects are deployed to the same Airflow server for a given environment. DAGs with the same name in different projects overwrite each other, so the DAG configuration shown in Airflow may come from another project's code. Make sure to give DAGs in different projects unique names to prevent this.
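
A simple way to guarantee unique names, sketched below with illustrative values, is to prefix the dag_id with the project name:

import pendulum
from airflow import DAG

# Prefixing the dag_id with the project name keeps it unique on the shared
# Airflow server, even if another project contains a similarly named DAG file.
with DAG(
    dag_id="project1_daily_ingest",
    schedule_interval="@daily",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
) as dag:
    ...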

I added pools in Airflow and updated my DAG, but my task is still scheduled in the default_pool

When you clear (restart) a task in the Airflow UI, the task instance details are not reset. You can inspect these details by clicking on the task (as if you were going to clear it) and then clicking the "Task Instance Details" button instead of "Clear". The attributes shown there, such as the pool, are kept when you clear the task. Be aware that only newly created task instances will be scheduled in your pool.
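
For reference, a minimal sketch of assigning a pool to a task (the DAG, task, and pool names are illustrative):

import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="pool_example",
    schedule_interval=None,
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
) as dag:
    # The pool is set per task. Existing task instances keep the pool they were
    # created with (default_pool); only task instances created after this change
    # are scheduled in "my_pool".
    BashOperator(
        task_id="ingest",
        bash_command="echo ingest",
        pool="my_pool",
    )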

I can't import a file in another file in my dags folder

Suppose you have a project called project1, and your dags folder looks like the following structure:

/dags
    /util.py
    /dag.py

You want to import util.py in dag.py. To do this, you need to know that projects are structured in Airflow as follows:

/dags
    /project1
    /project2

Where project1 is the name of your project and project2 is another project deployed onto the same environment. The structure is like this to avoid name clashes between different projects with overlapping file names.

To use the util.py module, you need to import it with your project name as prefix:

from project1.util import *
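
For example, dag.py could look like the following sketch, where DEFAULT_ARGS is a hypothetical constant assumed to be defined in util.py:

# dags/dag.py in project1
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator

from project1.util import *  # exposes, for example, DEFAULT_ARGS from util.py

with DAG(
    dag_id="project1_example",
    default_args=DEFAULT_ARGS,  # hypothetical: defined in util.py
    schedule_interval=None,
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
) as dag:
    BashOperator(task_id="hello", bash_command="echo hello")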

I can't import my module when my project name contains a hyphen

Let's say you have a project called test-project. The following statement won't be parsed correctly, since hyphens are not allowed in Python import statements.

from test-project import utils

You can work around this issue by using the importlib package:

import importlib

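# import_module takes the module path as a plain string,
# so the hyphen in the project name is not a problem here.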
utils = importlib.import_module("test-project.utils")

My Airflow DAG is showing strange behavior for runs in the past

You might see this if you have changed the schedule parameter of an existing DAG. Previously run TaskInstances will not align with the new scheduling interval, which can cause the Airflow scheduler to consider them missing.

This is a long-standing open issue in Airflow, but no true fix has been developed yet for it. The recommendation from the development team behind Airflow is to create a new DAG with a new name when the scheduling needs to be modified. This can for example be done by adding a _v2 suffix.
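
A minimal sketch of this approach, with an illustrative dag_id and schedule:

import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator

# The DAG previously ran as "daily_export"; the new schedule is defined under
# "daily_export_v2" so the scheduler starts from a clean run history.
with DAG(
    dag_id="daily_export_v2",
    schedule_interval="0 6 * * *",  # the new schedule
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
) as dag:
    BashOperator(task_id="export", bash_command="echo export")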

I cannot use the Airflow CLI with Conveyor

Conveyor indeed does not support using the Airflow CLI directly. The most common use for the Airflow CLI is running the backfill command. We have described how to best execute this pattern on Conveyor in our Airflow DAG backfilling guide.

If you want to use another Airflow CLI feature that is missing in Conveyor, please contact us for support.

I want to decrease the queuing time of my Airflow tasks

The queued time is the time a task spends in the queue before it is picked up by a worker. Conveyor runs Airflow on top of Kubernetes, which means an Airflow worker is spun up for each task. In this setup, the queued time is the time spent waiting until that worker is active.

The time it takes to start the Airflow worker is the sum of:

  • The container startup time
  • The time it takes for Python to load
  • The time taken to start Airflow on the worker and process the DAG

In the worst-case scenario, the cluster also needs to scale up because there is not enough room on the available nodes to launch the additional worker. This might add another 40 seconds to the queued time, but only when scaling is actually needed.

Conveyor has already optimized these processes to be as fast as possible by:

  • Speeding up node boot times through optimized node-images
  • Caching the container image
  • Optimizing node shutdown times in order to maximize node-reuse for downstream tasks

Unfortunately, the worst case can still add a minute of queuing time when a new node needs to be provisioned. As these queued times are a result of how Airflow works on Kubernetes, further reducing them is not possible at this time.