Let us learn how to build DBT models with Apache Spark on an AWS EMR cluster, using a denormalized JSON data set.

Here is the high-level agenda for this session.

  • DBT for ELT (Extract, Load, and Transform)
  • Overview of DBT CLI and DBT Cloud
  • Setting up EMR Cluster with Thrift Server using Step
  • Overview of Semi Structured Data Set used for the Demo
  • Develop required queries using Spark SQL on AWS EMR
  • Develop the DBT Models using Spark on AWS EMR
  • Run the DBT Models using Spark on AWS EMR
  • Overview of Orchestration using Tools like Airflow

DBT for ELT (Extract, Load, and Transform)

First, let us understand what ELT is and where DBT comes into play.

  • ELT stands for Extract, Load, and Transform.
  • DBT is a tool used purely for the Transform step. It leverages the resources of the target database to process the data.

Based on the requirements and design, we modularize the transformation logic and develop it as DBT models. When the models are run, DBT compiles them into SQL queries and executes those queries on the target database.

The DBT open source community has developed adapters for all leading data platforms, such as Spark, Databricks, Redshift, and Snowflake.

Overview of DBT CLI and DBT Cloud

DBT CLI and DBT Cloud can be used to develop DBT Models based on the requirements.

  • DBT CLI is completely open source and can be set up on Windows, Mac, or Linux desktops.
  • As part of the DBT CLI installation, we install dbt-core along with the adapter for the target database.

Setting up EMR Cluster with Thrift Server using Step

As we are not processing a significantly large amount of data, we will set up a single-node EMR cluster using the latest version. If you are not familiar with AWS EMR, you can sign up for this course on Udemy.

DBT connects to Spark on EMR through the Spark Thrift Server (Spark's JDBC/ODBC endpoint), so we need to make sure the Thrift Server is started once the EMR cluster comes up with Spark. While configuring the single-node cluster, add a step that uses command-runner.jar with the command sudo /usr/lib/spark/sbin/start-thriftserver.sh so that the Spark Thrift Server is started after the cluster is up.
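
As a rough sketch, the same step can be written as the dictionary that the EMR API accepts for a step (for example, as an entry in the Steps argument of boto3's run_job_flow). The step name and the ActionOnFailure value are assumptions chosen for illustration; only the JAR and the command come from the text above.

```python
import json


def thrift_server_step(name="Start Spark Thrift Server"):
    """Build an EMR step definition that starts the Spark Thrift Server.

    The step name and ActionOnFailure value are illustrative assumptions.
    """
    return {
        "Name": name,
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            # command-runner.jar runs the given command on the master node.
            "Jar": "command-runner.jar",
            "Args": ["sudo", "/usr/lib/spark/sbin/start-thriftserver.sh"],
        },
    }


step = thrift_server_step()
print(json.dumps(step, indent=2))
```

The printed structure mirrors what is entered in the console when the step is added by hand.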

Here is the screenshot to configure the step.

Overview of Semi Structured Data Set used for the Demo

Here are the details of the semi-structured data set used for the demo. The data set has 5 columns.

  1. order_id which is of type integer
  2. order_date which is string representation of the date
  3. order_customer_id which is of type integer
  4. order_status which is of type string
  5. order_items which is of type string; the string holds a valid JSON array

We can convert a string that contains a JSON array into a Spark array<struct> column using the from_json function of Spark SQL. We need to pass the schema as the second argument when invoking from_json on the order_items column of our data set.

Develop required queries using Spark SQL on AWS EMR

Here are the queries to process the semi-structured JSON Data using Spark SQL.

Spark SQL lets us query files directly by specifying their path in the FROM clause of a SELECT query.

SELECT *
FROM JSON.`s3://airetail/order_details`

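The column order_items is of type string and holds a JSON array. We can convert it to a Spark array using from_json and turn each array element into its own row using explode_outer, as below. The following queries assume that the data is available as a table or view named order_details.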

SELECT order_id, order_date, order_customer_id, order_status,
    explode_outer(from_json(order_items, 'array<struct<order_item_id:INT, order_item_order_id:INT, order_item_product_id:INT, order_item_quantity:INT, order_item_subtotal:FLOAT, order_item_product_price:FLOAT>>')) AS order_item
FROM order_details
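
To make the behaviour of from_json and explode_outer concrete, here is a small plain-Python sketch (not Spark code) that mimics them on three made-up rows. The sample values are hypothetical and only two of the six item fields are shown for brevity. Each array element becomes its own row, and an order without items is kept with a null order_item rather than dropped.

```python
import json


def explode_outer_order_items(row):
    """Mimic explode_outer(from_json(order_items, ...)) for a single row.

    Yields one output row per array element. If the array is missing or
    empty, yields a single row whose order_item is None.
    """
    base = {k: v for k, v in row.items() if k != "order_items"}
    raw = row.get("order_items")
    items = json.loads(raw) if raw else []
    if not items:
        yield {**base, "order_item": None}
        return
    for item in items:
        yield {**base, "order_item": item}


# Hypothetical sample rows shaped like the data set described above.
rows = [
    {"order_id": 1, "order_date": "2013-07-25 00:00:00.0",
     "order_customer_id": 11599, "order_status": "CLOSED",
     "order_items": '[{"order_item_id": 1, "order_item_subtotal": 100.5}]'},
    {"order_id": 2, "order_date": "2013-07-25 00:00:00.0",
     "order_customer_id": 256, "order_status": "PENDING",
     "order_items": '[{"order_item_id": 2, "order_item_subtotal": 50.0},'
                    ' {"order_item_id": 3, "order_item_subtotal": 25.0}]'},
    {"order_id": 3, "order_date": "2013-08-01 00:00:00.0",
     "order_customer_id": 12111, "order_status": "COMPLETE",
     "order_items": None},
]

exploded = [out for row in rows for out in explode_outer_order_items(row)]
for out in exploded:
    print(out)
```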

Here is the final query, which has the core logic to compute monthly revenue from COMPLETE or CLOSED orders.

WITH order_details_exploded AS (
    SELECT order_id, order_date, order_customer_id, order_status,
        explode_outer(from_json(order_items, 'array<struct<order_item_id:INT, order_item_order_id:INT, order_item_product_id:INT, order_item_quantity:INT, order_item_subtotal:FLOAT, order_item_product_price:FLOAT>>')) AS order_item
    FROM order_details
) SELECT date_format(order_date, 'yyyy-MM') AS order_month,
    round(sum(order_item.order_item_subtotal), 2) AS revenue
FROM order_details_exploded
WHERE order_status IN ('COMPLETE', 'CLOSED')
GROUP BY 1
ORDER BY 1
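
The aggregation is easy to check by hand. The following plain-Python sketch (again, not Spark code) applies the same filter, grouping, and rounding to a few made-up exploded rows. The sample values are hypothetical, and taking the first seven characters of order_date assumes the date strings start with yyyy-MM.

```python
def monthly_revenue(exploded_rows):
    """Sum order_item_subtotal per month for COMPLETE or CLOSED orders.

    Returns (order_month, revenue) tuples sorted by month. A month whose
    matching rows have no items gets a revenue of None, like SUM over NULLs.
    """
    subtotals_by_month = {}
    for row in exploded_rows:
        if row["order_status"] not in ("COMPLETE", "CLOSED"):
            continue
        month = row["order_date"][:7]  # 'yyyy-MM' prefix of the date string
        bucket = subtotals_by_month.setdefault(month, [])
        item = row["order_item"]
        if item is not None and item.get("order_item_subtotal") is not None:
            bucket.append(item["order_item_subtotal"])
    return [
        (month, round(sum(values), 2) if values else None)
        for month, values in sorted(subtotals_by_month.items())
    ]


# Hypothetical rows, already exploded to one row per order item.
sample = [
    {"order_date": "2013-07-25 00:00:00.0", "order_status": "CLOSED",
     "order_item": {"order_item_subtotal": 100.5}},
    {"order_date": "2013-07-26 00:00:00.0", "order_status": "COMPLETE",
     "order_item": {"order_item_subtotal": 200.25}},
    {"order_date": "2013-07-27 00:00:00.0", "order_status": "PENDING",
     "order_item": {"order_item_subtotal": 50.0}},
    {"order_date": "2013-08-01 00:00:00.0", "order_status": "CLOSED",
     "order_item": {"order_item_subtotal": 129.75}},
]

revenue = monthly_revenue(sample)
print(revenue)
```

The PENDING row is filtered out, so July only includes the two CLOSED and COMPLETE items.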

Develop the DBT Models using Spark on AWS EMR

Let us go ahead and set up the project to develop the DBT models required to compute monthly revenue. We'll break the overall logic into two dependent DBT models.

Here are the steps involved in the development process.

  1. Set up the DBT project using the Spark adapter
  2. Run the example models and confirm that the project is set up successfully
  3. Develop Required DBT Models with core logic
  4. Update Project File (change project name and also make required changes related to the models)

Here is the code for the first model, order_details_exploded.sql, which preserves the logic for the exploded order details in the form of a view.

{{ config(materialized='view') }}

SELECT order_id, order_date, order_customer_id, order_status,
    explode_outer(from_json(order_items, 'array<struct<order_item_id:INT, order_item_order_id:INT, order_item_product_id:INT, order_item_quantity:INT, order_item_subtotal:FLOAT, order_item_product_price:FLOAT>>')) AS order_item
FROM JSON.`s3://airetail/order_details`

Here is the code for the second model, monthly_revenue.sql, which preserves the results in a table at a specified S3 location. The configuration that points the table to a specific S3 location can be set either in this model or at the project level in dbt_project.yml.

{{ 
    config(
        materialized='table',
        location_root='s3://airetail/monthly_revenue'
    ) 
}}

SELECT date_format(order_date, 'yyyy-MM') AS order_month,
    round(sum(order_item.order_item_subtotal), 2) AS revenue
FROM {{ ref('order_details_exploded') }}
WHERE order_status IN ('COMPLETE', 'CLOSED')
GROUP BY 1
ORDER BY 1
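
When DBT compiles this model, the ref call is replaced by the name of the referenced relation, and the reference also tells DBT that order_details_exploded has to be built first. The following toy Python sketch imitates only the substitution step with a regular expression. It is not how DBT is implemented, and the schema name retail is a hypothetical placeholder.

```python
import re

# Matches {{ ref('model_name') }} with single or double quotes.
REF_PATTERN = re.compile(r"\{\{\s*ref\(\s*['\"]([^'\"]+)['\"]\s*\)\s*\}\}")


def compile_refs(model_sql, schema):
    """Replace each ref(...) with schema.model and list the referenced models."""
    dependencies = REF_PATTERN.findall(model_sql)
    compiled = REF_PATTERN.sub(lambda m: f"{schema}.{m.group(1)}", model_sql)
    return compiled, dependencies


model_sql = """SELECT date_format(order_date, 'yyyy-MM') AS order_month,
    round(sum(order_item.order_item_subtotal), 2) AS revenue
FROM {{ ref('order_details_exploded') }}
WHERE order_status IN ('COMPLETE', 'CLOSED')
GROUP BY 1
ORDER BY 1"""

# "retail" is a made-up schema name used only for this illustration.
compiled_sql, dependencies = compile_refs(model_sql, schema="retail")
print(compiled_sql)
print(dependencies)
```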

Run the DBT Models using Spark on AWS EMR

Now that the DBT models are developed using the Spark adapter, let us see how to run and validate them.

  1. Run the DBT project with the 2 models
  2. Log in to the EMR cluster and launch Spark SQL
  3. Run a query against the target location in which the monthly revenue data is preserved
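
For the last step, it helps to know where the data actually lands. The dbt-spark adapter documents location_root as a root directory to which the table alias is appended. Below is a small Python sketch of that rule and of two validation queries built from it. The schema name retail is a hypothetical placeholder, and reading the files with parquet. assumes the table was written in Parquet format, which should be verified for your project.

```python
def table_location(location_root, alias):
    """Return the directory a table is expected to use under location_root."""
    return f"{location_root.rstrip('/')}/{alias}"


location = table_location("s3://airetail/monthly_revenue", "monthly_revenue")

# Query the table by name ("retail" is a hypothetical schema name).
query_by_name = "SELECT * FROM retail.monthly_revenue ORDER BY order_month"

# Query the files directly (assumes the Parquet file format).
query_by_path = f"SELECT * FROM parquet.`{location}` ORDER BY order_month"

print(location)
print(query_by_name)
print(query_by_path)
```

Either query can be pasted into the Spark SQL prompt on the cluster.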

Overview of Orchestration using Tools like Airflow

DBT applications are primarily developed to implement the required transformation logic using the ELT pattern. The overall pipeline usually needs more than the transformation logic, such as extracting and loading the data, so we need to make sure that the entire pipeline is orchestrated.

One way to orchestrate the pipeline is to use orchestration tools such as AWS Step Functions or Airflow.
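
At its core, an orchestrator runs the tasks of a pipeline in order and stops when one of them fails. The following minimal Python sketch shows that idea. It is not Airflow or Step Functions code. The task names and the load and validation commands are hypothetical placeholders, while dbt run is the DBT CLI command that builds the models.

```python
import subprocess
from types import SimpleNamespace


def run_pipeline(tasks, runner=subprocess.run):
    """Run (name, command) tasks in order and stop at the first failure.

    Returns the names of the tasks that completed successfully.
    """
    completed = []
    for name, command in tasks:
        result = runner(command)
        if result.returncode != 0:
            print(f"Task {name!r} failed with exit code {result.returncode}")
            break
        completed.append(name)
    return completed


# Hypothetical pipeline: only `dbt run` comes from this article.
PIPELINE = [
    ("load_raw_orders", ["echo", "copy raw JSON files to S3"]),  # placeholder
    ("dbt_run", ["dbt", "run"]),
    ("validate_revenue", ["echo", "query monthly_revenue"]),  # placeholder
]


def dry_run(command):
    """Stand-in runner that prints the command instead of executing it."""
    print("would run:", " ".join(command))
    return SimpleNamespace(returncode=0)


completed = run_pipeline(PIPELINE, runner=dry_run)
print(completed)
```

Dropping the runner argument makes run_pipeline execute the commands for real through subprocess.run. Tools such as Airflow add scheduling, retries, and monitoring on top of this basic idea.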

Here is one of the common designs for building an end-to-end pipeline in which DBT plays a critical role.