NextLytics Blog

Advanced Data Warehousing in Databricks with DBT Macros and Snapshots

Written by Robin | 27 November 2025

Today’s data landscape is a rapidly evolving field, in which companies are relying more and more on data-driven decision-making to provide organizational flexibility and the capability of extracting real time valuable insights. Thus the demand for scalable and efficient data warehousing and analytics platforms has never been greater. For the last years, Databricks has cemented its position among its contenders, named a Leader in cloud data warehousing by Gartner analysts year after year. Databricks continues to stand out through its unified analytics platform, which seamlessly blends data engineering, machine learning, analytics and scheduling functionality. Combining the best features of the data lake and the data warehouse in its lakehouse architecture, Databricks allows companies to store and process huge amounts of data while also providing the capabilities for advanced and custom-tailored analytics from within the platform.

Read our "Hands-on Databricks" Blog-Series:

1. How to leverage SAP Databricks to integrate ServiceNow Data
2. Data Modeling with dbt on Databricks: A Practical Guide
3. Databricks AutoML for Time-Series: Fast, Reliable Sales Forecasting
4. Advanced Data Warehousing in Databricks with DBT Macros and Snapshots

The data framework dbt (Data Build Tool) has already proven to be a great match with the feature scope and workflow that Databricks provides. It enables data teams to model, load, transform, document and test data across the different layers of the lakehouse easily, while integrating well into Databricks internal scheduling framework, Databricks Jobs.

In a previous article we already demonstrated how to set up a dbt pipeline in Databricks and leverage the framework to create a data model in the medallion architecture. Today, we want to go a bit further, since many times real world applications of data transformations are not as straightforward as just loading data from their respective sources with minimal transformations. Tracking data changes over time for example requires standardized table metadata and merges processes which are the foundation of building logical warehouse data models according to star- or snowflake-schema or Data Vault 2.0 methodology.
Specifically, we want to shed light on two possible approaches to execute a historized data merge, also called Slowly Changing Dimensions (SCD) type 2 within dbt in Databricks - using Macros and Snapshots.

Basic data model following the medallion architecture

Understanding Slowly Changing Dimensions (SCD)

In the areas of business intelligence and analytics, organizations often deal with data that evolves over time. Take for instance a customer’s address, which might change over time or an employee’s job title, which may be updated as they are promoted. Accurately tracking these changes is crucial for maintaining a complete view of business operations, while having a historical view of this data and its changes over time is relevant to understand and analyze trends over time.

SCDs are a methodology used in data warehousing to manage and preserve the history of changes in dimension data. Dimensions in this case refer to descriptive attributes of business entities like customers, products or employees, where over time these attributes might change.

There are multiple types of SCDs, but SCD type 2 is specifically designed to track historical changes while maintaining the ability to extract the most current data points. The core idea behind SCD type 2 is to preserve every change in data over time by creating new records for each change rather than updating or deleting existing values. Each row within a dimension table contains metadata columns (for example timestamp columns “valid_from” & “valid_to”), that will detail the timespan of the actuality of a single record. Using those metadata columns, you can also easily query the data source and filter out historized records and only show those that are currently active, which is important for up-to-date analytics use cases.


Example of the resulting dataset of a Slowly Changing Dimensions Type 2 merge operation

Watch the recording of our Webinar:

"Bridging Business and Analytics: The Plug-and-Play Future of Data Platforms"

Implementing SCD type 2 in dbt

Starting from this section, we’ll assume that you are familiar with how to set up your instance of Databricks for use with dbt. If you’re not, please refer to the previous article of this series, since this will give you the knowledge you need to kick off your dbt project, as well as describing the data use case we will expand on here.

In our example we already ingest raw data from a file export and a postgres database and refine it in successive steps following the medallion architecture in the following steps.

Bronze Layer

• Clone the data from our sources to databricks delta tables.

Silver Layer

• Join the two bronze layer tables together and clean up the data.

Gold Layer

• Generate a reporting level table with calculated metrics based on the data from the silver layer table.

Each of the tables in this architecture is implemented as dbt models.

For this showcase, we want to implement our historized SCD type 2 merge on the bronze layer table, that is fed from our postgres raw data source. This is a common use case, where the source system itself is not tracking the changes in the data itself and for example deletions in this raw data source might cause for “data zombies” to continually exist in further layers of your data model, because even an incremental data load inserting only new rows will usually not take into account deletions.
The previous model for this bronze layer table looks as follows:


{{ config(materialized='incremental', tags=["bronze"]) }}

SELECT *
FROM {{ source('external_postgres', 'raw_maschinendaten') }}

{% if is_incremental() %}
 -- this operation will only be triggered on an incremental run
where ts >= (select coalesce(max(ts),'1900-01-01') from {{ this }} )

{% endif %} 

The first time this model is executed, it will load all data from our raw data source, while for subsequent runs only data with a timestamp (column “ts”) newer than the most recent data in our bronze level table will be ingested. This works for tracking new rows, but will struggle with updates to existing rows that do not alter the timestamp or deletions.

From here on, we have multiple options on how to approach this challenge.

The simple way - using dbt snapshots

Snapshots are the generally preferred way to implement scd type 2 in dbt. This is, because it fully utilizes the framework's capabilities with minimal overhead and complexity in your data pipelines. Basically a snapshot will archive all previous data states from their respective data source in a mutable table over time, implementing and filling all the necessary metadata columns automatically on execution of the snapshot.

For our examples, that means we replace the model “b_maschinendaten.sql” file for this table from the dbt folder “models” with a file “maschinendaten_snapshot.sql” in the dbt folder “snapshots” with the following contents:


{% snapshot maschinendaten_snapshot %}


{{
   config(
     unique_key="snapshot_pk",
     strategy="check",
     check_cols=["laufzeit_min", "stillstand_min", "gesamtteile", "gutteile", "ausschuss"],
     file_format="delta",
     hard_deletes="invalidate"
   )
}}


select
   *,
   -- Create a unique hash over key columns
   md5(concat_ws('-', cast(maschinenid as string), cast(ts as string))) as snapshot_pk


from {{ source("external_postgres", "raw_maschinendaten") }}


{% endsnapshot %}

In this case we compute a hash over the columns “maschinenid” and “ts” since these two combined build our composite primary key for the data, but a dbt snapshot requires a unique key as a single value.
Now we just have to alter the dbt command to execute this data load from “dbt run –select b_maschinendaten” to “dbt snapshot –select maschinendaten_snapshot” and execute the pipeline.


Databricks delta table resulting from a dbt snapshot merge

As you can see, this table contains the data from the source with some added metadata columns. Relevant in our case are the columns “dbt_valid_from” & “dbt_valid_to”, which describe the timespan for which a record was up to date. If “dbt_valid_to” is null, it means that the respective record is still current.
As you can see in the highlighted row, this record was updated, simulating a correction in the machine output statistics after an initial data load. This is then represented in the snapshot as finalizing the old record, setting “valid_to” to the time and date of the merge and inserting a new record with “valid_from” having the time and date of the merge process.

While this is easy to implement in general, there are some limitations which might require you to go beyond what the snapshot feature (currently) does, which brings us to our next step.

The customized way - using dbt macros

There are a couple of reasons why the snapshot merge approach might not work for your specific use case. You might have tables with a very large amount of data, requiring specific optimizations to be run during the merge like only considering a subset of rows for the merge using filters, or since the dbt snapshot merges generate the metadata columns in their specific format with the “dbt_” prefix it might cause some issues in systems that ingest your versioned data. There might be cases where the primary key of your data source might shift, requiring specific handling to migrate the existing records to match their previous rows and so on.

Whatever the reason, it is most commonly caused by the lack of flexibility a simplified out of the box solution like the dbt snapshot provides. In this case, you might opt to execute your data merge via a dbt macro. 
Macros in dbt offer a way to combine SQL and Jinja to execute programming commands in your transformations that aren’t normally available in SQL, like loops or conditionals for example.
These macros are generic code pieces that can be reused multiple times.

For our demonstration we want to implement an SCD type 2 merge with the following additional features:

  • Provide parametrized names for the metadata columns.
  • Implement an “is_current” column to easily filter for active rows.
  • Provide an optional parameter to only take data from the last X days into consideration for the merge to improve performance.

The code for the macro looks like this: 

 
{% macro scd_type_2_merge(
       target,
       source,
       key_columns,
       effective_from_column,
       effective_from_expression=None,
       payload_columns=None,
       current_flag_column='is_current',
       valid_from_column='valid_from',
       valid_to_column='valid_to',
       lookback_window_days=None
   )
%}
   {% if is_incremental() %}


       {% if key_columns is string %}
           {% set key_cols = [key_columns] %}
       {% else %}
           {% set key_cols = key_columns %}
       {% endif %}


       {% if payload_columns is none %}
           {% set src_cols = adapter.get_columns_in_relation(source) %}
           {% set kc_lower = key_cols | map('lower') | list %}
           {% set payload_cols = [] %}
           {% for col in src_cols %}
               {% if col.name | lower not in kc_lower
                   and col.name | lower != effective_from_column | lower
                   and col.name | lower != current_flag_column | lower
                   and col.name | lower != valid_from_column | lower
                   and col.name | lower != valid_to_column | lower
               %}
                   {% do payload_cols.append(col.name) %}
               {% endif %}
           {% endfor %}
       {% else %}
           {% if payload_columns is string %}
               {% set payload_cols = [payload_columns] %}
           {% else %}
               {% set payload_cols = payload_columns %}
           {% endif %}
       {% endif %}


       {% set src_hash_parts = [] %}
       {% set dim_hash_parts = [] %}
       {% for c in payload_cols %}
           {% do src_hash_parts.append("coalesce(cast(src." ~ c ~ " as string), '')") %}
           {% do dim_hash_parts.append("coalesce(cast(dim." ~ c ~ " as string), '')") %}
       {% endfor %}


       {% set src_hash_expr = "md5(concat_ws('|', " ~ src_hash_parts | join(', ') ~ "))" %}
       {% set dim_hash_expr = "md5(concat_ws('|', " ~ dim_hash_parts | join(', ') ~ "))" %}


       {% set key_join_parts = [] %}
       {% for k in key_cols %}
           {% do key_join_parts.append("dim." ~ k ~ " = src." ~ k) %}
       {% endfor %}
       {% set key_join = key_join_parts | join(' and ') %}
      
       with src as (
           select
               *
               {% if effective_from_expression is not none %}
               , {{ effective_from_expression }} as {{ effective_from_column }}
               {% endif %}
           from {{ source }}
           {% if lookback_window_days is not none %}
           where
               {% if effective_from_expression is not none %}
                    {{ effective_from_expression }}
               {% else %}
                    {{ effective_from_column }}
               {% endif %}
               >= date_add(current_date(), -{{ lookback_window_days }})
           {% endif %}
          
           {# If multiple updates for the same key exist in the batch, pick only the latest one. #}
           qualify row_number() over (
               partition by {{ key_cols | join(', ') }}
               order by
               {% if effective_from_expression is not none %}
                    {{ effective_from_expression }}
               {% else %}
                    {{ effective_from_column }}
               {% endif %}
               desc
           ) = 1
       ),


       current_dim as (
           select *
           from {{ target }} as dim
           where dim.{{ current_flag_column }} = 1
          
           {% if lookback_window_days is not none %}
           and exists (
               select 1
               from src
               where {{ key_join }}
           )
           {% endif %}
       ),


       changes as (
           select
               {% for k in key_cols %}
                   src.{{ k }},
               {% endfor %}
               {% for c in payload_cols %}
                   src.{{ c }},
               {% endfor %}
               src.{{ effective_from_column }},
              
               {% for k in key_cols %}
                   dim.{{ k }} as dim_{{ k }},
               {% endfor %}
               {% for c in payload_cols %}
                   dim.{{ c }} as dim_{{ c }},
               {% endfor %}
               dim.{{ valid_from_column }}   as dim_{{ valid_from_column }},
               dim.{{ valid_to_column }}     as dim_{{ valid_to_column }},
               dim.{{ current_flag_column }} as dim_{{ current_flag_column }},


               {{ src_hash_expr }} as src_hash,
               {{ dim_hash_expr }} as dim_hash,


               case
                   when dim.{{ key_cols[0] }} is null then 'INSERT'
                   when {{ src_hash_expr }} <> {{ dim_hash_expr }} then 'UPDATE'
                   else 'NOCHANGE'
               end as change_type


           from src
           left join current_dim as dim
             on {{ key_join }}
       ),


       records_to_merge as (
           select
               {% for k in key_cols %}
                   dim_{{ k }} as {{ k }},
               {% endfor %}
               {% for c in payload_cols %}
                   dim_{{ c }} as {{ c }},
               {% endfor %}
               dim_{{ valid_from_column }} as {{ valid_from_column }},
               ({{ effective_from_column }} - interval '1' second) as {{ valid_to_column }},
               0 as {{ current_flag_column }},
               'UPDATE' as merge_action
           from changes
           where change_type = 'UPDATE'


           union all


           select
               {% for k in key_cols %}
                   {{ k }},
               {% endfor %}
               {% for c in payload_cols %}
                   {{ c }},
               {% endfor %}
               {{ effective_from_column }} as {{ valid_from_column }},
               to_timestamp('9999-12-31 23:59:59') as {{ valid_to_column }},
               1 as {{ current_flag_column }},
               'INSERT' as merge_action
           from changes
           where change_type in ('INSERT', 'UPDATE')
       )


       merge into {{ target }} as tgt
       using records_to_merge as s
       on
       (
           {% for k in key_cols %}
               tgt.{{ k }} = s.{{ k }}
               and
           {% endfor %}
           tgt.{{ valid_from_column }} = s.{{ valid_from_column }}
       )
       when matched and s.merge_action = 'UPDATE' then
           update set
               tgt.{{ valid_to_column }} = s.{{ valid_to_column }},
               tgt.{{ current_flag_column }} = s.{{ current_flag_column }}
              
       when not matched and s.merge_action = 'INSERT' then
           insert (
               {% for k in key_cols %}{{ k }}, {% endfor %}
               {% for c in payload_cols %}{{ c }}, {% endfor %}
               {{ valid_from_column }},
               {{ valid_to_column }},
               {{ current_flag_column }}
           )
           values (
               {% for k in key_cols %}s.{{ k }}, {% endfor %}
               {% for c in payload_cols %}s.{{ c }}, {% endfor %}
               s.{{ valid_from_column }},
               s.{{ valid_to_column }},
               s.{{ current_flag_column }}
           );
          
   {% else %}
       select 1 as scd_merge_skipped
   {% endif %}
{% endmacro %}

 

 This macro can then be executed from the model .sql file you want to execute the merge for like this:


{{ config(
   materialized='incremental',
   incremental_strategy='append',
   post_hook="{{ scd_type_2_merge(
       target=this,
       source=source('external_postgres', 'raw_maschinendaten'),
       key_columns=['maschinenid', 'ts'],
       effective_from_column='_loadtime',
       lookback_window_days=7
   ) }}"
) }}


-- Dbt requires a SELECT statement for the model body.
-- 1. On the FIRST run, we select everything to initialize the table structure including the SCD 2 metadata columns
-- 2. On INCREMENTAL runs, we select NOTHING (where 1=0).
--    This prevents dbt from trying to insert rows automatically, letting the
--    'post_hook' macro handle the complex SCD2 Merge logic entirely.


select
   *,
   1 as is_current,
   cast(_loadtime as timestamp) as valid_from,
   cast('9999-12-31 23:59:59' as timestamp) as valid_to
from {{ source('external_postgres', 'raw_maschinendaten') }}
{% if is_incremental() %}
   where 1=0
{% endif %}


While this approach is significantly more complex, requiring technical knowledge of both Jinja and SQL, it fulfills all of our previously stated requirements, giving us more control over how the metadata is structured and optimizing the merge performance by only evaluating data from (in this case) the last seven days.

A merge using this approach will result in a bronze level table that looks like this:


Databricks delta table resulting from a merge operation using the demonstrated custom dbt macro including the metadata columns describing the currentness of the records

Using DBT Macros and Snapshots for advanced Data Warehousing in Databricks: Our Conclusion

At the end of the day, both of the demonstrated approaches can be used to implement a historized data model in your organisation. As it is so often the case, it comes down to a trade off between ease of use and flexibility and deciding which path to take depends on your unique business requirements. When building a company-wide warehouse environment and structured data model, standardized table metadata and merge utility functions are key in Databricks. Even the complex macro-based merge can be defined early-on in your adoption project and then offers a stable basis for long-term warehouse operations on the cost-efficient, highly scalable Lakehouse platform.

If you are curious, which approach might be better suited for your needs or how you can use dbt to fully leverage the power of modern data infrastructure for your company, don’t hesitate to contact us.