Part 5a: Schedule dbt models on Airflow
My quick-start guide to running dbt models on Airflow, with some tips and tricks
Previously — on my #dataseries
A Beginner’s Guide to Apache Airflow — Part 3: Intro and Setup
In the previous episode, we finished setting up Airflow. In this episode, I will focus on the workflow: how to create a DAG file and schedule a run for your dbt models on Airflow. Let's get started :)
Table of Contents
- Workflows
1.1 Create a DAG
1.2 Create the tasks
1.3 Write date macros
1.4 Test the DAG
1.5 Activate the DAG
- Tips
- Final Word
1. Workflows
To schedule your dbt models on Airflow, the workflow consists of four key steps:
- Create a DAG file
- Create the tasks
- Test the DAG
- Activate the DAG
1.1 Creating a DAG file
In Airflow, a DAG (Directed Acyclic Graph) is a Python script that is a collection of all the tasks you want to run, organised in a way that reflects their relationships and dependencies.
Go to the folder that you've designated as your AIRFLOW_HOME and find the dags folder. Create a Python file named my_first_dag.py there. Your workflow will automatically be picked up and scheduled to run.
First, let's start by importing the necessary modules:
import datetime as dt

# We'll need this to instantiate a DAG object
from airflow import DAG

# Then import the Operators to be used in the following tasks
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
Then, we define a dictionary containing the default arguments that we want to pass to our DAG. These arguments will then be applied to all of the DAG's operators (here, the BashOperator and PythonOperator).
# Define the function print_world to be used by the PythonOperator
def print_world():
    print('world')

# Create a Python dictionary of default arguments
default_args = {
    'owner': 'me',
    'start_date': dt.datetime(2021, 7, 11),
    'end_date': dt.datetime(2021, 7, 14),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=5),
}
Here, Airflow will schedule DAG runs from the start_date (11th July 2021) to the end_date (14th July 2021). Keep in mind that Airflow triggers each run at the end of its schedule interval, so the run for 11 July actually executes just after midnight on 12 July.
1.2 Creating the tasks
Then, we create our first DAG object.
with DAG('airflow_tutorial_v10',
         default_args=default_args,
         schedule_interval='0 0 * * *',
         ) as dag:

    print_report = BashOperator(
        task_id='dbt_run_first',
        bash_command=f"""cd $DBT_PROFILES_DIR && dbt run --models shiba_ecommerce.daily_order_count --vars '{{"ingestion_date": "{execution_date}"}}'""")

    sleep = BashOperator(task_id='sleep',
                         bash_command='sleep 10')

    print_world = PythonOperator(task_id='print_world',
                                 python_callable=print_world)
The DAG object is named airflow_tutorial_v10 and has the default arguments passed into it. The DAG consists of 3 tasks that will be scheduled to run on a daily basis:
- run my dbt model daily_order_count.sql (more about this model in a previous episode)
- wait for 10 seconds
- print world
With schedule_interval='0 0 * * *' we've specified a run once a day at midnight (00:00). You can refer to the Airflow documentation or crontab.guru to help decipher cron schedule expressions.
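For reference, here are a few other cron expressions you could pass as schedule_interval. These particular schedules are just illustrative, not part of the tutorial's setup:
# Illustrative alternatives for schedule_interval (not part of the tutorial DAG):
every_day_at_midnight = '0 0 * * *'    # what we use here; same as the '@daily' preset
weekday_mornings      = '0 6 * * 1-5'  # 06:00, Monday to Friday
every_fourth_hour     = '30 */4 * * *' # minute 30 of every 4th hour

# e.g. DAG('my_dag', default_args=default_args, schedule_interval=weekday_mornings)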
Dependencies between tasks are added by setting other tasks as upstream (or downstream).
print_report >> sleep >> print_world
print_report doesn't have any dependencies because it is the first task in the DAG and doesn't depend on any upstream task. sleep has one dependency: the upstream task print_report. print_world depends on the upstream task sleep, so it only runs after both sleep and print_report have completed.
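If you prefer explicit method calls over the bitshift operators, the same wiring can be expressed with set_downstream / set_upstream. This is just an equivalent sketch, not a change to the tutorial's DAG:
# Equivalent to: print_report >> sleep >> print_world
print_report.set_downstream(sleep)
sleep.set_downstream(print_world)

# Or, read from the other direction:
# print_world.set_upstream(sleep)
# sleep.set_upstream(print_report)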
1.3 Writing the date macros
Dbt Date Macro
This is the command line I run for the dbt model:
dbt run --models shiba_ecommerce.daily_order_count --vars '{"ingestion_date": "2021-08-01"}'
In my previous episode Part 4: DBT — Automation, Test and Templating, I explained the Variables concept which I find very useful in dbt.
Imagine you need to generate a daily incremental report before 10 AM every day. Visiting each of those .sql files to edit a date by hand would be super time-consuming and error-prone. My suggestion here is to change from hardcoding the shipping_limit_date values to a variable, '{{ var("ingestion_date") }}'. This variable (ingestion_date) can be used across multiple models within a package, and its value is changed and passed into the dbt run command each day those reports are generated.
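As a side note, if you build the --vars payload in Python (as the DAG below does with an f-string), serialising it with json.dumps is a safe way to avoid quoting mistakes. This is only a sketch under that assumption, not part of the original DAG:
import json

ingestion_date = '2021-08-01'  # illustrative value
dbt_vars = json.dumps({'ingestion_date': ingestion_date})
bash_command = (
    'cd $DBT_PROFILES_DIR && '
    f"dbt run --models shiba_ecommerce.daily_order_count --vars '{dbt_vars}'"
)
print(bash_command)  # preview the exact command dbt will receive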
This is the task specified in the DAG file. I made some changes to the dbt run command. Can you spot the change?
print_report = BashOperator(
    task_id='dbt_run_first',
    bash_command=f"""cd $DBT_PROFILES_DIR && dbt run --models shiba_ecommerce.daily_order_count --vars '{{"ingestion_date": "{execution_date}"}}'""")
Airflow Date Macro
I added another Airflow date macro, execution_date, inside the dbt variable '{{ var("ingestion_date") }}', and moved the hardcoded "2021-08-01" out of the command into a Python variable named execution_date:
execution_date = "2021-08-01"
Then, let’s quickly define this execution_date
macro as follows.
# Define a custom macro!
def quick_format(ds, format_string):
    ds = dt.datetime.strptime(ds, '%Y-%m-%d')
    return ds.strftime(format_string)

execution_date = '{{ prev_ds }}'
What is '{{ prev_ds }}'?
{{ prev_ds }} means the previous execution date as YYYY-MM-DD: if {{ ds }} is 2018-07-12 and schedule_interval is @daily (schedule_interval='0 0 * * *'), {{ prev_ds }} will be 2018-07-11. I use {{ prev_ds }} because in this case I want to generate the daily order count report for yesterday.
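One caveat worth noting: quick_format is only callable inside Jinja templates if the DAG is told about it, and the final DAG below doesn't show that wiring. Here is a hedged sketch of how it could be registered via user_defined_macros, reusing the imports and default_args from above; the dag_id and task here are purely illustrative:
# Sketch: expose quick_format to Jinja templates via user_defined_macros
with DAG('airflow_tutorial_macros_demo',                     # illustrative dag_id
         default_args=default_args,
         schedule_interval='0 0 * * *',
         user_defined_macros={'quick_format': quick_format},  # make the helper available in templates
         ) as macro_demo_dag:

    echo_formatted_date = BashOperator(
        task_id='echo_formatted_date',                        # illustrative task
        bash_command="echo {{ quick_format(prev_ds, '%Y%m%d') }}",
    )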
Final DAG
After rearranging the code, here is my final DAG:
import datetime as dt
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

# Define the function print_world to be used by the PythonOperator
def print_world():
    print('world')

# Define a custom macro!
def quick_format(ds, format_string):
    ds = dt.datetime.strptime(ds, '%Y-%m-%d')
    return ds.strftime(format_string)

# Create a Python dictionary of default arguments
default_args = {
    'owner': 'me',
    'start_date': dt.datetime(2021, 7, 11),
    'end_date': dt.datetime(2021, 7, 14),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=5),
}

execution_date = '{{ prev_ds }}'

# Create a DAG object
with DAG('airflow_tutorial_v10',
         default_args=default_args,
         schedule_interval='0 0 * * *',
         ) as dag:

    print_report = BashOperator(
        task_id='dbt_run_first',
        bash_command=f"""cd $DBT_PROFILES_DIR && dbt run --models shiba_ecommerce.daily_order_count --vars '{{"ingestion_date": "{execution_date}"}}'""")

    sleep = BashOperator(task_id='sleep', bash_command='sleep 10')

    print_world = PythonOperator(task_id='print_world', python_callable=print_world)

    print_report >> sleep >> print_world

# (Preview) render the dbt command with a concrete date to check it locally
execution_date = "2021-08-01"
bash_command = f"""cd $DBT_PROFILES_DIR && dbt run --models shiba_ecommerce.daily_order_count --vars '{{"ingestion_date": "{execution_date}"}}'"""
print(bash_command)
1.4 Testing the DAG
You can test the DAG file in your terminal to see if it works, which saves time compared with running it directly on Airflow.
Run cd $DBT_PROFILES_DIR && dbt run --models shiba_ecommerce.daily_order_count --vars '{"ingestion_date": "2018-06-30"}'
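Beyond running the dbt command by hand, one extra sanity check I'd suggest (not from the original post) is to load your dags folder with Airflow's DagBag and confirm there are no import errors before touching the UI. The dag_folder path below is whatever your own dags folder is:
from airflow.models import DagBag

# Parse every DAG file in the dags folder without running any task
dag_bag = DagBag(dag_folder='dags/', include_examples=False)

print(dag_bag.import_errors)                    # should be an empty dict
print('airflow_tutorial_v10' in dag_bag.dags)   # should be True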
1.5 Activating the DAG
Once your DAG works, let's set it to run automatically on Airflow.
Here's my tip: first of all, go to your dags folder and delete the Python cache (the __pycache__ folder).
Then go to Airflow UI > DAGs in your web browser, refresh, and wait for your airflow_tutorial_version to appear.
Once it appears, click into your airflow_tutorial_version and switch the toggle from OFF to ON. Once the scheduler is up and running, refresh the DAGs page in the web browser UI. You will see the tasks running along with their job status. Dark green means successful; if there is any error, you will see yellow (up_for_retry). In that case, click into the yellow boxes and view the log.
2. My tips
- Whenever you fail or cannot run your model, create a new version of airflow_tutorial_version so that later on you can still keep your job history.
- Especially when your tasks include running a dbt or SQL model, test your DAG file in your terminal before scheduling it on Airflow. It will save you some time uploading, waiting and refreshing the Airflow UI.
- Lastly, I believe this is the most important point and it took me a while to realise it: if you want to schedule a run of your dbt model on Airflow, make sure the two very important files, dbt_project.yml and profiles.yml, use exactly the same profile name in both the AIRFLOW and DBT folders. The profiles.yml in your AIRFLOW folder must contain a profile matching the one named in your dbt_project.yml file.
Final Word
Weee, so that's all for my journal: how I schedule my dbt models on Airflow, with some of my tips. I hope you now have a clearer sense of how to schedule your dbt models on Airflow. There are definitely many more advanced features of Airflow that I will surely explore further and share here very soon!
I’ll be back so stay tuned!