Probably not but it was fun writing nevertheless. Not much fun reading it though as it's meant to be rude. So if you are easily offended or a true python purist, please click away now.
Check this out:
- Do you want to be able to reuse one or more tasks in more than one DAG without duplicating your code?
- Do you despise jinja and templating with a passion because they make your code look like sh*t?
Here's what we're going to do. We'll use good ol' python in a weird and creative way that may make you raise your eyebrows. Tough potatoes. Whatever that means. Let's go.
This is the directory structure. If you have import issues, you are in the wrong place. Ask ChatGPT what to do and leave me alone.
First let's create our model that, once initialized, will contain the id of the DAG and its tasks.
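A minimal sketch of what such a model could look like, assuming a plain dataclass does the job. The field names dag_id and tasks and the example values are my assumptions for illustration, so rename them to taste.

```python
from dataclasses import dataclass, field
from typing import Any, List


@dataclass
class Pipeline:
    """Holds the id of a DAG and the tasks that belong to it."""

    dag_id: str
    # Any task-like objects will do here; in a real DAG these would be
    # Airflow operators (that part is an assumption of this sketch).
    tasks: List[Any] = field(default_factory=list)


# Hypothetical usage: the id and the task placeholders are made up.
pipeline = Pipeline(dag_id="example_dag", tasks=["start", "finish"])
print(pipeline.dag_id, len(pipeline.tasks))
```

The default_factory matters: a bare `tasks = []` default would be rejected by dataclasses, precisely because one shared list across every Pipeline instance is a bug waiting to happen.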
Now let's create three tasks in separate files.
Let's describe the task sequence. Note that this exercise demos the capability to execute a linear task progression (ordinal is here only for illustration purposes and maybe as inspiration for a later exercise). Where is the fun in giving you everything?
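If you want a nudge anyway, here is one way an ordinal could be put to work: sort by it and you get the linear order back regardless of how the sequence was declared. This is a sketch under my own assumptions. The Step class, the SEQUENCE list and the task name "middle" are hypothetical and not taken from the DAG in this post.

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass(frozen=True)
class Step:
    ordinal: int  # position in the linear progression
    name: str     # hypothetical task name


# Deliberately declared out of order to show that the ordinal decides.
SEQUENCE = [Step(3, "finish"), Step(1, "start"), Step(2, "middle")]


def linear_order(steps: Iterable[Step]) -> List[str]:
    """Return task names sorted by their ordinal."""
    return [step.name for step in sorted(steps, key=lambda s: s.ordinal)]


print(linear_order(SEQUENCE))
```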
The Big Tamale
Proceed with caution. You might hurt yourself.
Well, hello? Does your DAG look like this?
If not, then good luck with that. If yes, then life is beautiful. Either way, let's unpack it for fun.
I use this because when we later add keys to our locals, the value assignment looks marginally better. E.g. LOCALS["key"] = "value" is better than locals()["key"] = "value". Eeerm, or not; they both look like crap so it doesn't matter. It doesn't look pythonic... or does it? I told you, you may raise your eyebrows. This is dangerous territory, especially if you overwrite a local you shouldn't, so pay attention to the naming conventions.
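To see why this works at all, here is a small standalone sketch with no Airflow involved. It assumes the code runs at module level, where locals() hands back the module's own namespace; inside a function the same trick does not create variables. The names greeting, dag_a and dag_b are made up for the demo.

```python
# At module level, locals() returns the module's namespace dict,
# so writing a key into it creates a real module-level name.
LOCALS = locals()

LOCALS["greeting"] = "hello"  # same effect as: greeting = "hello"

# Names can be built at runtime, which is the whole point of the trick.
for suffix in ("a", "b"):
    LOCALS[f"dag_{suffix}"] = f"placeholder for dag {suffix}"

print(greeting)  # noqa: F821 (the name only exists at runtime)
print(sorted(name for name in LOCALS if name.startswith("dag_")))
```

As far as I know, Airflow picks up DAG objects from the names defined at the top level of a DAG file, which is why injecting names this way can be useful there. It also means a sloppy key silently replaces whatever already lived under that name.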
Hang on, why is this hardcoded? Isn't the point of this exercise to be fully parameterised? Keep reading you lazy bum.
config.py to further parameterise this DAG. Ha!
Let's continue unpacking:
Hang on what's that? That's interpreted the same as:
It just makes the DAG a little more compact and mysterious.
This above monstrosity is iterating through the tasks of the Pipeline class instance, assigning a dag object to each task (otherwise you are doomed to fail with a treacherous airflow.exceptions.AirflowException: Tried to create relationships between tasks that don't have DAGs yet. Set the DAG for at least one task and try again), and at the very end it chains everything together nicely.
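The same idea, spelled out the boring way with stand-ins so it runs without Airflow installed. FakeTask and wire are hypothetical names invented for this sketch; FakeTask only mimics the two things that matter here, a dag attribute and the >> operator, and its error is a loose imitation of the real one.

```python
from functools import reduce


class FakeTask:
    """Stand-in for an Airflow operator: carries a dag and supports >>."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.dag = None
        self.downstream = []

    def __rshift__(self, other):
        # Loosely mimics the failure you get when neither side has a DAG.
        if self.dag is None and other.dag is None:
            raise RuntimeError("tasks don't have DAGs yet")
        self.downstream.append(other)
        return other  # lets a >> b >> c link left to right


def wire(tasks, dag):
    """Assign the dag to every task, then link them in list order."""
    for task in tasks:
        task.dag = dag
    if tasks:
        reduce(lambda upstream, downstream: upstream >> downstream, tasks)
    return tasks


# "middle" is a made-up task name; the dag is just a string placeholder.
tasks = wire([FakeTask("start"), FakeTask("middle"), FakeTask("finish")], dag="example_dag")
print([(t.task_id, [d.task_id for d in t.downstream]) for t in tasks])
```

In real Airflow 2.x code the linking step would more likely be the chain helper from airflow.models.baseoperator than a hand-rolled reduce, but the order of operations is the same: give the tasks a DAG first, wire them second.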
1. Create a new task called spin.py that prints "I am spinning".
2. spin.py is kicked off by start.py and is executed at the same time as the third existing task.
3. finish.py will only execute once both that task and spin.py have completed.
4. I could have used a diagram but couldn't be bothered.
Can you design this in another way? E.g. could the dag object be assigned to a task post Pipeline class initialization, and if yes, what would your DAG look like?
This questionable yet clean design will allow you to test your tasks independently, remove duplication and unlock reusability across multiple DAGs.
Thanks for reading. Bye.