Is this the Next Best Way to Create DAGs?

Reuse one or more tasks in more than one DAG without duplicating your code.

Probably not, but it was fun writing it nevertheless. It won't be much fun reading, though, as it's meant to be rude. So if you are easily offended or a true Python purist, please click away now.

Check this out:

  1. Do you want to be able to reuse one or more tasks in more than one DAG without duplicating your code?
  2. Do you despise jinja and templating with a passion because they make your code look like sh*t?

Here's what we're going to do. We'll use good ol' Python in a weird and creative way that may make you raise your eyebrows. Tough potatoes. Whatever that means. Let's go.

This is the directory structure. If you have import issues, you are in the wrong place. Ask ChatGPT what to do and leave me alone.

|--i_am_not_common
|  |--config.py
|  |--model.py
|--dags
|  |--the_big_tamale.py
|--tasks
|  |--dance.py
|  |--finish.py
|  |--start.py

Model

First, let's create our model that, once initialized, will contain the id of the DAG and its tasks.

from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class Pipeline:
    """A DAG id plus an ordered list of task definitions."""

    dag_id: str
    tasks: List[Dict[str, Any]]
model.py

Tasks

Now let's create three tasks in separate files.

import os

from airflow.decorators import task

# Derive the task id from the file name, e.g. "start.py" -> "start".
TASK_ID = os.path.basename(__file__).partition(".")[0]


@task(task_id=TASK_ID)
def run() -> None:
    print("I'm starting")
start.py
import os

from airflow.decorators import task

TASK_ID = os.path.basename(__file__).partition(".")[0]


@task(task_id=TASK_ID)
def run() -> None:
    print("I'm dancing")
dance.py
import os

from airflow.decorators import task

TASK_ID = os.path.basename(__file__).partition(".")[0]


@task(task_id=TASK_ID)
def run() -> None:
    print("I'm finished")
finish.py

Configuration

Let's describe the task sequence. Note that this exercise demos the ability to execute a linear task progression (ordinal is here only for illustration purposes, and maybe as inspiration for a later exercise; there's a small hint after the code). Where's the fun in giving you everything?

from i_am_not_common.model import Pipeline

from tasks import dance, finish, start

the_big_tamale = Pipeline(
    dag_id="the_big_tamale",
    tasks=[
        {"ordinal": 1, "name": "start", "callable": start.run},
        {"ordinal": 2, "name": "dance", "callable": dance.run},
        {"ordinal": 3, "name": "finish", "callable": finish.run},
    ],
)
config.py
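
The ordinal key isn't wired up to anything yet. If you wanted it to drive execution order, a hypothetical one-liner in config.py might look like this (just a sketch; ordered_tasks is a name I made up):

# Hypothetical: order the task definitions by their ordinal before chaining.
ordered_tasks = sorted(the_big_tamale.tasks, key=lambda t: t["ordinal"])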

The Big Tamale

Proceed with caution. You might hurt yourself.

import importlib
from datetime import datetime

from airflow.models.baseoperator import chain
from airflow.models.dag import DAG

from i_am_not_common import config

for task_info in config.the_big_tamale.tasks:
    importlib.import_module(name=f"tasks.{task_info['name']}")

LOCALS = locals()

with DAG(
    catchup=False,
    dag_id=config.the_big_tamale.dag_id,
    description="experiment",
    start_date=datetime(year=2023, month=1, day=1),
    tags=["experiment"],
    max_active_runs=1,
) as dag:

    for task_info in config.the_big_tamale.tasks:
        LOCALS[task_info["name"]] = task_info["callable"]


    callables = []
    for task_info in config.the_big_tamale.tasks:
        task_name = task_info["name"]
        task_callable = task_info["callable"]
        task_callable.dag = dag
        callables.append(task_callable())

    chain(*callables)
the_big_tamale.py

Well, hello? Does your DAG graph look like this: start → dance → finish?

If not, then good luck with that. If yes, then life is beautiful. Either way, let's unpack it for fun.

The unpacking

for task_info in config.the_big_tamale.tasks:
    importlib.import_module(name=f"tasks.{task_info['name']}")
lines: 9-10

importlib dynamically imports each task module, with the module path in the f-string built from your directory/module structure notation.
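
If the dynamic import looks spooky, it's only the runtime equivalent of a plain import statement. A minimal sketch using our tree:

import importlib

# These two lines load the same module; import_module just lets the
# module path be a runtime value instead of a hardcoded literal.
import tasks.start
start_module = importlib.import_module("tasks.start")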

LOCALS = locals()
line: 12

I use this because when we later add keys to our locals, the value assignment looks marginally better. E.g. LOCALS["key"] = "value" looks better than locals()["key"] = "value". Eeerm, or not; they both look like crap, so it doesn't matter. It doesn't look pythonic... or does it? I told you, you may raise your eyebrows. This is dangerous territory, especially if you overwrite a local you shouldn't, so pay attention to the naming conventions.
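
For the record, the reason this trick works at all: at module level, locals() and globals() are the same dictionary, so planting a key really does create a module-level name. A minimal sketch:

LOCALS = locals()

# At module scope this mutation creates a real name...
LOCALS["greet"] = lambda: print("hello")
greet()  # prints "hello"

# ...whereas inside a function body, writes to locals() are generally ignored.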

with DAG(
    catchup=False,
    dag_id=config.the_big_tamale.dag_id,
    description="experiment",
    start_date=datetime(year=2023, month=1, day=1),
    tags=["experiment"],
    max_active_runs=1,
) as dag:
lines: 14-21

Hang on, why is this hardcoded? Isn't the point of this exercise to be fully parameterised? Keep reading, you lazy bum.

Exercise 1
Use the model.py and config.py to further parameterise this DAG. Ha!

Let's continue unpacking:

    for task_info in config.the_big_tamale.tasks:
        LOCALS[task_info["name"]] = task_info["callable"]
lines: 23-24

Hang on, what's that? It's interpreted the same as:

@task
def start():
    ...

@task
def dance():
    ...

@task
def finish():
    ...

It just makes the DAG a little more compact and mysterious.

    callables = []
    for task_info in config.the_big_tamale.tasks:
        task_name = task_info["name"]
        task_callable = task_info["callable"]
        task_callable.dag = dag
        callables.append(task_callable())

    chain(*callables)
lines: 27-34

This monstrosity above iterates through the tasks of the Pipeline class instance, assigning the dag object to each task (otherwise you are doomed to fail with a treacherous airflow.exceptions.AirflowException: Tried to create relationships between tasks that don't have DAGs yet. Set the DAG for at least one task and try again), and at the very end it chains everything together nicely.
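
And if chain() itself looks mysterious: it just wires each argument to the next. A throwaway sketch, with EmptyOperator standing in for our tasks:

from datetime import datetime

from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator

with DAG(dag_id="chain_demo", start_date=datetime(2023, 1, 1)) as demo:
    a, b, c = (EmptyOperator(task_id=i) for i in "abc")

    chain(a, b, c)  # exactly equivalent to: a >> b >> c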

Exercise 2
1. Create a new task called spin.py that prints "I am spinning".
2. spin.py is kicked off by start.py and is executed at the same time as dance.py.
3. finish.py will only execute once both dance.py and spin.py have completed.
4. I could have used a diagram but couldn't be bothered.
Exercise 3
Can you design this another way? E.g. could the dag object be assigned to a task after Pipeline class initialization, and if yes, what would your DAG look like?

This questionable yet clean design will allow you to test your tasks independently, remove duplication and unlock reusability across multiple DAGs.
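
On the testing claim: a minimal pytest sketch, assuming Airflow 2.x, where a @task-decorated function exposes the original callable as .function, so you can call it without any scheduler in sight:

from tasks import start


def test_start_prints(capsys):  # pytest's capsys fixture captures stdout
    start.run.function()  # call the undecorated function directly
    assert "I'm starting" in capsys.readouterr().out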

There are of course other ways of doing the same thing, such as gusty or dag-factory, which look pretty good if you want an existing solution.

Thanks for reading. Bye.

Peace and love