
DAG Factories — A better way to Airflow

Tameem Iftikhar

If you’ve been on the hunt for a workflow management platform, my guess is you’ve come across Apache Airflow already. Originally created at Airbnb, this widely used project is now under the Apache banner and is the tool of choice for many data teams. Airflow allows you to write complex workflows in a declarative manner and offers many out-of-the-box operators for complex tasks.

At Flywheel Software we use Airflow heavily for complex ETL pipelines, machine learning pipelines, and statistical analysis. We also manage multiple airflow deployments and run massive multi-tenant airflow clusters with a wide variety of workloads. As we began to push airflow to its limits, we reworked how we deploy to our airflow clusters and, dare I say, found a better way to use airflow.

Our main goal was to move away from the declarative format of deploying airflow and towards dynamically generated DAGs for flexibility and scalability, allowing us to quickly change what was running on airflow with as little as a feature flag modification. This led to the inception of DAG factories.

DAG Factories — Using a factory pattern with python classes that generate DAGs automatically based on dynamic input to the system.

Enough with the backstory, it’s time to get to the exciting part. To set the stage, throughout this article we will assume that we want to execute two complex tasks in airflow: process_messages and process_invoices.

Before Picture

Before we dive into the new setup, it’s important to take a quick detour to see how this would be generally done in airflow. One possible approach is to have a monorepo with individual folders for each of your projects. You can either duplicate DAGs in each project folder or have all your shared DAGs in a shared_dags folder. Your deployment pipeline can then pick the correct DAGs from the monorepo to push to each of the airflow clusters. In practice, the file structure might look something like this snippet.

CODE: https://gist.github.com/tameem92/65715c34fee1515c36f111717a40721e.js

As your project starts to grow, the complexity of the DAGs and the deployment process will increase quickly. In a multi-tenant cluster, you would need the same DAG duplicated once per tenant for airflow to treat each tenant's copy as a separate DAG.

After Picture — DAG Factories

Now, with DAG factories you get a better organizational structure that consolidates duplicated code across projects or tenants. By abstracting out the code into python classes that are responsible for the generation of DAG objects we make the setup more configurable. And best of all, it looks like Marie Kondo did a sweep of our monorepo and removed everything that didn’t spark joy.

CODE: https://gist.github.com/tameem92/cd400701341cc62c3da1f4440e88eb3e.js

Quick component breakdown 🕺🏽

  • projects/<name>/config.py — a file to fetch configuration from airflow variables or from a centralized config store
  • projects/<name>/main.py — the core file where we will call the factory methods to generate DAGs we want to run for a project
  • dag_factory — folder with all our DAGs in a factory pattern with a set format of standardized methods.

Dynamic Configuration

The first step towards this architecture was to get our airflow clusters to talk to a centralized configuration store on our platform. This allows airflow to dynamically fetch configurations from other services on the platform — like our web app, feature flags, or other business logic. We won’t dig too deep into the store itself but this is usually accomplished using tools like Etcd, Consul, or building your own thin configuration API.

On the airflow side, the communication with the config store happens in the config.py file. There we can fetch airflow variables or fetch configuration using the API for the config store. Because the scheduler re-parses DAG files periodically, config.py runs again on each parse, so changed values are picked up and the DAGs are regenerated without a redeploy.

CODE: https://gist.github.com/tameem92/1e2eab3dbaa8693715bd4a2e36c94bb1.js
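The gist is not reproduced here, so as a rough illustration only, a config.py along these lines might look like the sketch below. The names `ProjectConfig`, `load_config`, and the `project_config` key are hypothetical, and the `fetch` callable is a stand-in for whatever backs the lookup (an airflow variable or a call to the config store API).

```python
import json
from dataclasses import dataclass, field
from typing import Any, Callable, Dict


@dataclass
class ProjectConfig:
    """Hypothetical container for the settings a project's main.py needs."""
    environment: str
    process_messages: Dict[str, Any] = field(default_factory=dict)
    process_invoices: Dict[str, Any] = field(default_factory=dict)


def load_config(fetch: Callable[[str], str]) -> ProjectConfig:
    """Build a ProjectConfig from the JSON document returned by `fetch`.

    `fetch` is an assumption: it takes a key and returns a JSON string.
    """
    raw = json.loads(fetch("project_config"))
    return ProjectConfig(
        environment=raw.get("environment", "dev"),
        process_messages=raw.get("process_messages", {}),
        process_invoices=raw.get("process_invoices", {}),
    )


# Usage with an in-memory dict standing in for the config store.
fake_store = {
    "project_config": json.dumps(
        {"environment": "staging", "process_messages": {"schedule": "@hourly"}}
    )
}
config = load_config(fake_store.__getitem__)
print(config.environment, config.process_messages)
```

Passing the lookup in as a callable keeps the parsing logic testable without a running airflow instance or config store.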

The Factory

Moving on to the centerpiece, all our heavy lifting is being done in the dag_factory folder. The DAGFactory() class is responsible for mapping our supported dags in the factory and dynamically calling on the correct module based on the provided key.

CODE: https://gist.github.com/tameem92/fb81df44b0c7b91266f3dc717bffa5e6.js

As you can see in the gist above, the create() function simply returns the correctly mapped DAG builder class, which can be called like this:

factory = DAGFactory(config.environment)
factory.create('ProcessMessages', config.process_messages)
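For readers without the gist at hand, here is a minimal sketch of what such a factory could look like. It assumes create() returns a builder instance and that unknown keys should fail loudly. The two builder classes are stand-ins that return a string instead of a real DAG, and the `_builders` registry name is hypothetical.

```python
from typing import Any, Dict, Type


class ProcessMessages:
    """Stand-in builder; a real one would return an Airflow DAG from build()."""

    def __init__(self, environment: str, config: Dict[str, Any]) -> None:
        self.environment = environment
        self.config = config

    def build(self) -> str:
        return f"process_messages_{self.environment}"


class ProcessInvoices:
    """Second stand-in builder with the same two-method shape."""

    def __init__(self, environment: str, config: Dict[str, Any]) -> None:
        self.environment = environment
        self.config = config

    def build(self) -> str:
        return f"process_invoices_{self.environment}"


class DAGFactory:
    # Maps each supported key to the builder class responsible for it.
    _builders: Dict[str, Type] = {
        "ProcessMessages": ProcessMessages,
        "ProcessInvoices": ProcessInvoices,
    }

    def __init__(self, environment: str) -> None:
        self.environment = environment

    def create(self, key: str, config: Dict[str, Any]):
        """Return a builder for `key`, configured for this environment."""
        try:
            builder_cls = self._builders[key]
        except KeyError:
            raise ValueError(f"Unsupported DAG type: {key!r}") from None
        return builder_cls(self.environment, config)


factory = DAGFactory("prod")
builder = factory.create("ProcessMessages", {"schedule": "@hourly"})
print(builder.build())
```

Supporting a new DAG type then means writing one builder class and adding one entry to the registry.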

The python classes for the generation of DAGs for our process_invoices and process_messages tasks follow a specific format in order to be triggered from the DAGFactory. Each file has the following standardized structure:

  • init() — sets up global & dag specific configuration parameters.
  • build() — builds and returns the actual DAG object with all the tasks under it. This is where the actual bulk of the functionality for a task is defined.

As an example, in this snippet, we take a look at the process_messages factory. It defines two tasks using KubernetesPodOperators to fetch and process messages. This DAG can now be called multiple times through our factory create() method with different variables — giving us dynamic variations of the same DAG.

CODE: https://gist.github.com/tameem92/ac72dee0298f6190daddbccbca5543fc.js
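To make the two-method structure concrete, the sketch below follows the same shape without requiring Airflow to be installed. `FakeDag` is a stand-in for a real DAG object, the task names are hypothetical, and the `name` and `schedule` config keys are assumptions. In the actual factory the two tasks would be KubernetesPodOperator instances.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Tuple


@dataclass
class FakeDag:
    """Stand-in for an Airflow DAG so the sketch runs without Airflow."""
    dag_id: str
    schedule: str
    tasks: List[str] = field(default_factory=list)
    edges: List[Tuple[str, str]] = field(default_factory=list)


class ProcessMessages:
    def __init__(self, environment: str, config: Dict[str, Any]) -> None:
        # Global setting shared by every DAG in this environment.
        self.environment = environment
        # DAG-specific settings, with defaults for anything not supplied.
        self.name = config.get("name", "default")
        self.schedule = config.get("schedule", "@daily")

    def build(self) -> FakeDag:
        dag = FakeDag(
            dag_id=f"process_messages_{self.name}_{self.environment}",
            schedule=self.schedule,
        )
        # Two tasks, with fetch running before process.
        dag.tasks += ["fetch_messages", "process_messages"]
        dag.edges.append(("fetch_messages", "process_messages"))
        return dag


# The same builder produces distinct DAGs from different configuration.
hourly = ProcessMessages("prod", {"name": "hourly", "schedule": "@hourly"}).build()
daily = ProcessMessages("prod", {"name": "daily"}).build()
print(hourly.dag_id, hourly.schedule)
print(daily.dag_id, daily.schedule)
```

Deriving the dag_id from the configuration is what lets several variations of the same DAG coexist in one cluster.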

Putting it all together

Everything eventually comes together in the projects/<project_name>/main.py files — where all that is left to do is call the build method on our factories and reconfigure those returned DAG objects under a parent. In this example, we can see how easily we can call on our factory methods to generate our DAG objects. Then we just add them to the main DAG using our trusty SubDagOperators.

CODE: https://gist.github.com/tameem92/118521a9de3a274ddbb43d66a964ab71.js
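One detail worth keeping in mind when nesting factory-built DAGs this way: SubDagOperator expects the child DAG's dag_id to have the form `<parent_dag_id>.<task_id>`. A small helper like the hypothetical `child_dag_ids` below, which is not part of the original gists, can compute those ids so the builders receive the right name.

```python
from typing import Dict, List


def child_dag_ids(parent_dag_id: str, task_ids: List[str]) -> Dict[str, str]:
    """Map each SubDagOperator task_id to the dag_id its child DAG must use."""
    return {task_id: f"{parent_dag_id}.{task_id}" for task_id in task_ids}


ids = child_dag_ids("project_a", ["process_messages", "process_invoices"])
print(ids["process_messages"])
```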

Once everything comes together, this setup also allows us to run a single cluster with multi-tenant DAG workloads. Let’s assume that our configuration store returns client_config, an array of configurations for multiple clients. We can simply loop over our client configs and call our factories to build variations of multiple DAGs for each client as shown in the next snippet. 🤯

CODE: https://gist.github.com/tameem92/14547743b03730fb8ef9e0e361d1a6ef.js
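The loop can be sketched roughly as follows. This assumes each client entry carries a `client_id` and an `enabled_dags` list, both hypothetical keys, and it uses plain dicts as stand-ins for the DAG objects a real factory would return.

```python
from typing import Any, Dict, List


def build_client_dags(
    environment: str, client_config: List[Dict[str, Any]]
) -> Dict[str, Dict[str, Any]]:
    """Build one DAG stand-in per (client, enabled DAG type) pair."""
    dags: Dict[str, Dict[str, Any]] = {}
    for client in client_config:
        client_id = client["client_id"]
        # Only DAG types switched on for this client are generated.
        for dag_type in client.get("enabled_dags", []):
            dag_id = f"{client_id}_{dag_type}_{environment}"
            if dag_id in dags:
                raise ValueError(f"Duplicate dag_id: {dag_id}")
            # A real implementation would call factory.create(...).build() here.
            dags[dag_id] = {"dag_id": dag_id, "client": client_id}
    return dags


client_config = [
    {"client_id": "acme", "enabled_dags": ["process_messages", "process_invoices"]},
    {"client_id": "globex", "enabled_dags": ["process_messages"]},
]
dags = build_client_dags("prod", client_config)
print(sorted(dags))
```

Turning a workload on or off for one tenant then becomes a change to that tenant's entry in the config store rather than a code deployment.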

The bottom line

Just like with any other tool, once you start to scale up it’s imperative to take a pause and rethink how things can be improved or optimized. Our need for a more dynamic and configuration-driven approach to airflow led us to build with dag factories, and hopefully, this will be helpful to the wider community using airflow. As for us, our airflow clusters are humming along and automatically building DAGs listening to configuration changes — allowing us to scale multi-tenant airflow clusters.

Love talking tech or data?

You’re in luck, me too! If you want to chat about cutting-edge technology, entrepreneurship, or the perils of being a startup founder, find me on Twitter or on LinkedIn.

