Part 5a: Schedule dbt models on Airflow

My quick-start guide of running dbt models on Airflow with some tips and tricks

Jessica Le
7 min read · Jul 15, 2021
Schedule dbt models on Airflow

Previously — on my #dataseries
A Beginner’s Guide to Apache Airflow — Part 3: Intro and Setup

In my previous episode, we finished setting up Airflow. In this episode, I will focus on the workflow: how to create a DAG file and schedule a run for your dbt models on Airflow. Let's get started :)

Previously, in episode 3, we ended here. Now let's continue from there.

Table of Contents

  1. Workflows
    1.1 Create a DAG
    1.2 Create the tasks
    1.3 Write date macros
    1.4 Test the DAG
    1.5 Activate the DAG
  2. Tips
  3. Final Word

1. Workflows

To schedule your dbt models on Airflow, the workflow consists of five key steps:

  1. Create a DAG file
  2. Create the tasks
  3. Write the date macros
  4. Test the DAG
  5. Activate the DAG

1.1 Creating a DAG file

In Airflow, a DAG (Directed Acyclic Graph) is a Python script that is a collection of all the tasks you want to run, organised in a way that reflects their relationships and dependencies.

Go to the folder that you've designated as your AIRFLOW_HOME and find the dags folder. Create a Python file named my_first_dag.py. Your workflow will automatically be picked up and scheduled to run.

Let's start by importing the necessary modules:

import datetime as dt

# We'll start by importing this to instantiate a DAG object
from airflow import DAG

# Then import the Operators to be used in the following tasks
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

Then, we define a dictionary containing the default arguments that we want to pass to our DAG. These arguments will then be applied to all of the DAG's operators (here, the BashOperator and PythonOperator).

# Define function print_world to be used by the PythonOperator
def print_world():
    print('world')

# Create a Python dictionary of default arguments
default_args = {
    'owner': 'me',
    'start_date': dt.datetime(2021, 7, 11),
    'end_date': dt.datetime(2021, 7, 14),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=5),
}

Here, Airflow will schedule DAG runs from the start_date of 11th July 2021 to the end_date of 14th July 2021.

1.2 Creating the tasks

Then, we create our first DAG object.

with DAG('airflow_tutorial_v10',
         default_args=default_args,
         schedule_interval='0 0 * * *',
         ) as dag:
    print_report = BashOperator(
        task_id='dbt_run_first',
        bash_command=f"""cd $DBT_PROFILES_DIR && dbt run --models shiba_ecommerce.daily_order_count --vars '{{"ingestion_date": "{execution_date}"}}'""")
    sleep = BashOperator(task_id='sleep',
                         bash_command='sleep 10')
    print_world = PythonOperator(task_id='print_world',
                                 python_callable=print_world)

The DAG object is named airflow_tutorial_v10 and has the default arguments passed into it.

The DAG consists of 3 tasks that will be scheduled to run on a daily basis.

  • run my dbt model daily_order_count.sql (more about this model in my previous episode)
  • wait for 10 seconds
  • print world

With schedule_interval='0 0 * * *' we've specified a run once a day at midnight (00:00). You can refer to the Airflow documentation or to crontab.guru to help decipher cron schedule expressions.
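For reference, here are a few other values you could drop into schedule_interval (illustrative only; the DAG in this tutorial uses the midnight one):

# Illustrative schedule_interval values (cron syntax); only '0 0 * * *' is used in this tutorial
schedule_interval = '0 0 * * *'   # once a day at midnight (00:00)
schedule_interval = '0 10 * * *'  # once a day at 10:00
schedule_interval = '0 0 * * 1'   # once a week, Mondays at midnight
schedule_interval = '@daily'      # Airflow preset, equivalent to '0 0 * * *'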

Dependencies in tasks are added by setting other actions as upstream (or downstream).

print_report >> sleep >> print_world

print_report has no dependencies because it is the first task in the DAG and doesn't depend on any upstream task. sleep has one dependency: its upstream task, print_report. print_world depends directly on sleep and, through it, on print_report as well.
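If the bitshift syntax feels unfamiliar, the same wiring can also be declared with set_upstream/set_downstream. This is just an equivalent sketch; the chained form above is what this DAG uses:

# Equivalent ways of wiring the same dependencies
print_report >> sleep >> print_world    # bitshift chaining (used in this DAG)
sleep.set_upstream(print_report)        # sleep runs after print_report
print_world.set_upstream(sleep)         # print_world runs after sleep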

1.3 Writing the date macros

Dbt Date Macro

This is the command line I run for the dbt model:

dbt run --models shiba_ecommerce.daily_order_count --vars '{"ingestion_date": "2021-08-01"}'

In my previous episode Part 4: DBT — Automation, Test and Templating, I explained the Variables concept which I find very useful in dbt.

Imagine you need to generate a daily incremental report before 10 AM every day. Visiting each of those .sql files to edit the date would be super time-consuming and error-prone. My suggestion is to change the hardcoded shipping_limit_date values to a variable, '{{ var("ingestion_date") }}'. This variable (ingestion_date) can be used across multiple models within a package, and it is changed and passed into the dbt run command each day those reports are generated.
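To make the mechanics concrete, here is a small illustrative sketch (not from the original DAG) showing that the --vars payload is just a JSON dictionary built for a given day and appended to the dbt run command:

import json

# Hypothetical illustration: build the dbt run command for a given ingestion date
ingestion_date = "2021-08-01"
vars_payload = json.dumps({"ingestion_date": ingestion_date})
print(f"dbt run --models shiba_ecommerce.daily_order_count --vars '{vars_payload}'")
# -> dbt run --models shiba_ecommerce.daily_order_count --vars '{"ingestion_date": "2021-08-01"}'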

This is the task specified in the DAG file. I made a change to the dbt run command. Can you spot it?

print_report = BashOperator(task_id='dbt_run_first', bash_command=f"""cd $DBT_PROFILES_DIR && dbt run --models shiba_ecommerce.daily_order_count --vars '{{"ingestion_date": "{execution_date}"}}'""")

Airflow Date Macro

I added another Airflow date macro, execution_date, inside the dbt variable '{{ var("ingestion_date") }}', and changed the hardcoded "2021-08-01" into the Airflow variable execution_date. Previously, it would have been hardcoded as:

execution_date = "2021-08-01"

Then, let’s quickly define this execution_date macro as follows.

# Define a custom macro!
def quick_format(ds, format_string):
    ds = dt.datetime.strptime(ds, '%Y-%m-%d')
    return ds.strftime(format_string)

execution_date = '{{ prev_ds }}'
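Note that quick_format is not referenced anywhere else in this DAG. If you do want to call a helper like it from inside a templated field, one option (a sketch on my part, not something from the original post) is to register it on the DAG through Airflow's user_defined_macros parameter:

# Sketch: exposing quick_format to the Jinja template engine via user_defined_macros
with DAG('airflow_tutorial_macros_demo',   # hypothetical DAG id, just for this sketch
         default_args=default_args,
         schedule_interval='0 0 * * *',
         user_defined_macros={'quick_format': quick_format},
         ) as macro_demo_dag:
    echo_date = BashOperator(
        task_id='echo_date',               # hypothetical task
        bash_command="echo {{ quick_format(prev_ds, '%Y%m%d') }}")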

What is ‘{{ prev_ds }}’ ?

{{ prev_ds }} means the previous execution date as YYYY-MM-DD. If {{ ds }} is 2018-07-12 and the schedule_interval is @daily (schedule_interval='0 0 * * *'), then {{ prev_ds }} will be 2018-07-11. I use {{ prev_ds }} because, in this case, I want to generate the daily order count report for yesterday.
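To see what the double braces in the bash_command actually produce, here is a quick illustrative check (not part of the DAG itself): after Python's f-string formatting, the doubled {{ }} collapse to single braces, and the remaining {{ prev_ds }} is left for Airflow's template engine to fill in at run time.

execution_date = '{{ prev_ds }}'
bash_command = f"""cd $DBT_PROFILES_DIR && dbt run --models shiba_ecommerce.daily_order_count --vars '{{"ingestion_date": "{execution_date}"}}'"""
print(bash_command)
# cd $DBT_PROFILES_DIR && dbt run --models shiba_ecommerce.daily_order_count --vars '{"ingestion_date": "{{ prev_ds }}"}'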

Final DAG

After rearranging the code, here is my final DAG:

import datetime as dt
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

# Define function print_world to be used by the PythonOperator
def print_world():
    print('world')

# Define a custom macro!
def quick_format(ds, format_string):
    ds = dt.datetime.strptime(ds, '%Y-%m-%d')
    return ds.strftime(format_string)

# Create a Python dictionary of default arguments
default_args = {
    'owner': 'me',
    'start_date': dt.datetime(2021, 7, 11),
    'end_date': dt.datetime(2021, 7, 14),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=5),
}

# Airflow macro for the previous execution date, rendered at run time
execution_date = '{{ prev_ds }}'

# Create a DAG object
with DAG('airflow_tutorial_v10',
         default_args=default_args,
         schedule_interval='0 0 * * *',
         ) as dag:
    print_report = BashOperator(
        task_id='dbt_run_first',
        bash_command=f"""cd $DBT_PROFILES_DIR && dbt run --models shiba_ecommerce.daily_order_count --vars '{{"ingestion_date": "{execution_date}"}}'""")
    sleep = BashOperator(task_id='sleep', bash_command='sleep 10')
    print_world = PythonOperator(task_id='print_world',
                                 python_callable=print_world)

    print_report >> sleep >> print_world

# Print the bash command with a sample date so it can be tested in the terminal
execution_date = "2021-08-01"
bash_command = f"""cd $DBT_PROFILES_DIR && dbt run --models shiba_ecommerce.daily_order_count --vars '{{"ingestion_date": "{execution_date}"}}'"""
print(bash_command)

1.4 Testing the DAG

To save time, you can test the DAG file in your terminal to check that it works, instead of running it directly on Airflow.

Run cd $DBT_PROFILES_DIR && dbt run --models shiba_ecommerce.daily_order_count --vars '{"ingestion_date": "2018-06-30"}'

1.5 Activating the DAG

Once your DAG works, let’s set it to run automatically on Airflow.

Here's my tip: first of all, go to your dags folder and delete the Python cache.

Delete the __pycache__ folder

Then go to Airflow UI > DAGs in your web browser, refresh, and wait for your airflow_tutorial_version to appear.

Screenshot of my Airflow UI on my web browser. (FYI I couldn’t run dbt model until v10)

Once it appears, click into your airflow_tutorial_version, then switch the toggle from OFF to ON. Once the scheduler is up and running, refresh the DAGs page in the web browser UI. You will see the task running along with its job status. If it is dark green as above, the run was successful. If there is any error, you will see yellow (up_for_retry); in that case, click into the yellow boxes and select View Log.

Switch the button from OFF to ON (bottom left corner)
Click View Log to read Log

2. My tips

  • Whenever a run fails or you cannot run your model, create a new version of airflow_tutorial_version so that you can still keep your job history later on.
  • Especially when your tasks include running a dbt or SQL model, test your DAG file in your terminal before scheduling it on Airflow. It will save you time spent uploading, waiting, and refreshing the Airflow UI.
  • Lastly, and I believe this is the most important point (it took me a while to realise): if you want to schedule a run of your dbt model on Airflow, make sure that the two very important files, dbt_project.yml and profiles.yml, in both your AIRFLOW and DBT folders use exactly the same profile name. The profiles.yml in your AIRFLOW folder must match the profile in your dbt_project.yml file.

Final Word

Weee, so that's all for my journal: how I schedule my dbt models on Airflow, with some of my tips. I hope you now have a clearer sense of how to schedule your dbt models on Airflow. There are definitely many more advanced features in Airflow that I will explore further and share here very soon!

I’ll be back so stay tuned!
