Skip to content
NextLytics
Megamenü_2023_Über-uns

Shaping Business Intelligence

Ob clevere Zusatzprodukte für SAP BI, Entwicklung aussagekräftiger Dashboards oder Implementierung KI-basierter Anwendungen - wir gestalten zusammen mit Ihnen die Zukunft von Business Intelligence. 

Megamenü_2023_Über-uns_1

Über uns

Als Partner mit tiefem Prozess-Know-how, Wissen der neuesten SAP-Technologien sowie hoher sozialer Kompetenz und langjähriger Projekterfahrung gestalten wir die Zukunft von Business Intelligence auch in Ihrem Unternehmen.

Megamenü_2023_Methodik

Unsere Methodik

Die Mischung aus klassischem Wasserfallmodell und agiler Methodik garantiert unseren Projekten eine hohe Effizienz und Zufriedenheit auf beiden Seiten. Erfahren Sie mehr über unsere Vorgehensweise.

Produkte
Megamenü_2023_NextTables

NextTables

Daten in SAP BW out of the Box bearbeiten: Mit NextTables wird das Editieren von Tabellen einfacher, schneller und intuitiver, egal ob Sie SAP BW on HANA, SAP S/4HANA oder SAP BW 4/HANA nutzen.

Megamenü_2023_Connector

NextLytics Connectoren

Die zunehmende Automatisierung von Prozessen erfordert die Konnektivität von IT-Systemen. Die NextLytics Connectoren ermöglichen eine Verbindung Ihres SAP Ökosystems mit diversen open-source Technologien.

IT-Services
Megamenü_2023_Data-Science

Data Science & Engineering

Bereit für die Zukunft? Als starker Partner stehen wir Ihnen bei der Konzeption, Umsetzung und Optimierung Ihrer KI-Anwendung zur Seite.

Megamenü_2023_Planning

SAP Planning

Wir gestalten neue Planungsanwendungen mithilfe von SAP BPC Embedded, IP oder  SAC Planning, die einen Mehrwert für Ihr Unternehmen schaffen.

Megamenü_2023_Dashboarding

Business Intelligence

Mit unserer Expertise verhelfen wir Ihnen auf Basis von Tableau, Power BI, SAP Analytics Cloud oder SAP Lumira zu aussagekräftigen Dashboards. 

Megamenü_2023_Data-Warehouse-1

SAP Data Warehouse

Planen Sie eine Migration auf SAP HANA? Wir zeigen Ihnen, welche Herausforderungen zu beachten sind und welche Vorteile eine Migration bringt.

Business Analytics
Megamenü_2023_Procurement

Procurement Analytics

Transparente und valide Zahlen sind vor allem in Unternehmen mit dezentraler Struktur wichtig. SAP Procurement Analytics ermöglicht die Auswertung von SAP ERP-Daten in SAP BI.

Megamenü_2023_Reporting

SAP HR Reporting & Analytics

Mit unserem Standardmodell für Reporting von SAP HCM mit SAP BW beschleunigen Sie administrative Tätigkeiten und stellen Daten aus verschiedenen Systemen zentral und valide zur Verfügung.

Megamenü_2023_Dataquality

Data Quality Management

In Zeiten von Big Data und IoT kommt der Vorhaltung einer hohen Datenqualität eine enorm wichtige Bedeutung zu. Mit unserer Lösung für Datenqualitätsmanagement (DQM) behalten Sie stets den Überblick.

Karriere
Megamenü_2023_Karriere-2b

Arbeiten bei NextLytics

Wenn Du mit Freude zur Arbeit gehen möchtest und dabei Deine berufliche und persönliche Weiterentwicklung nicht zu kurz kommen soll, dann bist Du bei uns genau richtig! 

Megamenü_2023_Karriere-1

Berufserfahrene

Zeit für etwas Neues? Gehe Deinen nächsten beruflichen Schritt und gestalte Innovation und Wachstum in einem spannenden Umfeld zusammen mit uns!

Megamenü_2023_Karriere-5

Berufseinsteigende

Schluss mit grauer Theorie - Zeit, die farbenfrohe Praxis kennenzulernen! Gestalte bei uns Deinen Einstieg ins Berufsleben mit lehrreichen Projekten und Freude an der Arbeit.

Megamenü_2023_Karriere-4-1

Studierende

Du möchtest nicht bloß die Theorie studieren, sondern Dich gleichzeitig auch praktisch von ihr überzeugen? Teste mit uns Theorie und Praxis und erlebe wo sich Unterschiede zeigen.

Megamenü_2023_Karriere-3

Offene Stellen

Hier findest Du alle offenen Stellenangebote. Schau Dich um und bewirb Dich - wir freuen uns! Falls keine passende Stelle dabei ist, sende uns gerne Deine Initiativbewerbung zu.

Blog
NextLytics Newsletter
Abonnieren Sie jetzt unseren monatlichen Newsletter:
Newsletter abonnieren
 

Effizientes Data Warehousing in Databricks mit dbt Makros & Snapshots

Die heutige Datenwelt ist ein sich rasant entwickelndes Feld, in dem Unternehmen zunehmend auf datengestützte Entscheidungen setzen, um organisatorische Flexibilität und die Fähigkeit zur Gewinnung wertvoller Echtzeit-Einsichten zu gewährleisten. Daher ist die Nachfrage nach skalierbaren und effizienten Data-Warehousing- und Analyseplattformen größer denn je. In den letzten Jahren hat Databricks seine Position unter den Mitbewerbern gefestigt und wurde von Gartner-Analysten Jahr für Jahr als führender Anbieter im Bereich Cloud-Data-Warehousing ausgezeichnet. Databricks zeichnet sich weiterhin durch seine einheitliche Analyseplattform aus, die Datenverarbeitung, maschinelles Lernen, Analyse und Planungsfunktionen nahtlos miteinander verbindet. Durch die Kombination der besten Funktionen des Data Lake und des Data Warehouse in seiner Lakehouse-Architektur ermöglicht Databricks Unternehmen die Speicherung und Verarbeitung großer Datenmengen und bietet gleichzeitig die Möglichkeit für erweiterte und maßgeschneiderte Analysen innerhalb der Plattform.

Finden Sie hier unsere Blogserie „Hands-on Databricks“:

1. So nutzen Sie SAP Databricks zur Integration von ServiceNow-Daten
2. Datenmodellierung mit dbt auf Databricks: Ein Praxisleitfaden
3. Databricks AutoML für Zeitreihen: Schnelle und zuverlässige Absatzprognosen
4. Effizientes Data Warehousing in Databricks mit dbt Makros & Snapshots


Das Datenframework dbt (Data Build Tool) hat bereits bewiesen, dass es hervorragend zu den Funktionen und Arbeitsabläufen von Databricks passt. Es ermöglicht Datenteams, Daten über die verschiedenen Ebenen des Lakehouse hinweg einfach zu modellieren, zu laden, zu transformieren, zu dokumentieren und zu testen, während es sich gleichzeitig gut in das interne Planungsframework von Databricks, Databricks Jobs, integrieren lässt.

In einem früheren Artikel haben wir bereits gezeigt, wie man eine dbt-Pipeline in Databricks einrichtet und das Framework nutzt, um ein Datenmodell in der Medallion-Architektur zu erstellen. Heute möchten wir noch einen Schritt weiter gehen, da die Anwendungen von Datentransformationen in der Praxis oft nicht so einfach sind, wie nur Daten aus ihren jeweiligen Quellen mit minimalen Transformationen zu laden. Die Verfolgung von Datenänderungen im Zeitverlauf erfordert beispielsweise standardisierte Tabellenmetadaten und Zusammenführungsprozesse, die die Grundlage für die Erstellung logischer Warehouse-Datenmodelle nach dem Stern- oder Schneeflockenschema oder der Data Vault 2.0-Methodik bilden.

Insbesondere möchten wir zwei mögliche Ansätze zur Durchführung einer historisierten Datenzusammenführung, auch Slowly Changing Dimensions (SCD) Typ 2 innerhalb von dbt in Databricks genannt, beleuchten – mit Hilfe von Makros und Snapshots.
graphic_medallion (2)
Grundlegendes Datenmodell gemäß der Medaillonarchitektur

Slowly Changing Dimensions (SCD) verstehen

In den Bereichen Business Intelligence und Analytics haben Unternehmen häufig mit sich zeitlich verändernden Daten zu tun. Nehmen wir zum Beispiel die Adresse eines Kunden, die sich im Laufe der Zeit ändern kann, oder die Berufsbezeichnung eines Mitarbeiters, die bei einer Beförderung aktualisiert wird. Die genaue Verfolgung dieser Änderungen ist entscheidend für einen vollständigen Überblick über die Geschäftsabläufe, während die historische Betrachtung dieser Daten und ihrer Veränderungen im Laufe der Zeit relevant ist, um Trends im Zeitverlauf zu verstehen und zu analysieren.

SCDs sind eine Methodik, die im Data Warehousing verwendet wird, um die Historie von Änderungen an Dimensionsdaten zu verwalten und zu bewahren. Dimensionen beziehen sich in diesem Fall auf beschreibende Attribute von Geschäftseinheiten wie Kunden, Produkten oder Mitarbeitern, wobei sich diese Attribute im Laufe der Zeit ändern können.

Es gibt mehrere Arten von SCDs, aber SCD Typ 2 wurde speziell entwickelt, um historische Änderungen zu verfolgen und gleichzeitig die Möglichkeit zu erhalten, die aktuellsten Datenpunkte zu extrahieren. Die Kernidee hinter SCD Typ 2 besteht darin, jede Änderung der Daten im Laufe der Zeit zu bewahren, indem für jede Änderung neue Datensätze erstellt werden, anstatt bestehende Werte zu aktualisieren oder zu löschen. Jede Zeile innerhalb einer Dimensionstabelle enthält Metadaten-Spalten (z. B. Zeitstempel-Spalten „valid_from” und „valid_to”), die den Zeitraum der Aktualität eines einzelnen Datensatzes detailliert angeben. Mithilfe dieser Metadaten-Spalten können Sie auch ganz einfach die Datenquelle abfragen und historisierte Datensätze herausfiltern, sodass nur die aktuell gültigen Datensätze angezeigt werden, was für tagesaktuelle Analyseanwendungen wichtig ist.

Slowly Changing Dimensions Type 2-merge-Operation
Beispiel für den resultierenden Datensatz einer Slowly Changing Dimensions Type 2-merge-Operation


Sehen Sie sich die Aufzeichnung unseres Webinars an: "Bridging Business and Analytics: The Plug-and-Play Future of Data Platforms"

Neuer Call-to-Action


Implementierung von SCD Typ 2 in dbt

Wir gehen davon aus, dass Sie mit der Einrichtung Ihrer Databricks-Instanz für die Verwendung mit dbt vertraut sind. Wenn dies nicht der Fall ist, empfehlen wir Ihnen, den vorherigen Artikel dieser Reihe zu lesen, da dieser Ihnen das notwendige Wissen für den Start Ihres dbt-Projekts vermittelt und den Datenanwendungsfall beschreibt, auf den wir hier näher eingehen werden.

In unserem Beispiel nehmen wir bereits Rohdaten aus einem Datenexport und einer Postgres-Datenbank auf und verfeinern sie in aufeinanderfolgenden Schritten gemäß der Medaillon-Architektur in den folgenden Schritten.

Bronze-Ebene
• Klonen der Daten aus unseren Quellen in Databricks-Delta-Tabellen.

Silber-Ebene
• Zusammenführen der beiden Tabellen der Bronze-Ebene und Bereinigen der Daten.

Gold-Ebene
• Erstellen einer Tabelle auf Berichtsebene mit berechneten Metriken auf der Grundlage der Daten aus der Tabelle der Silber-Ebene.


Jede der Tabellen in dieser Architektur wird als dbt-model implementiert.

Für diese Demonstration möchten wir unsere historisierte SCD-Typ-2-merge auf die Bronze-Layer-Tabelle anwenden, die aus unserer Postgres-Rohdatenquelle gespeist wird. Dies ist ein häufiger Anwendungsfall, bei dem das Quellsystem selbst die Änderungen in den Daten nicht verfolgt und beispielsweise Löschungen in dieser Rohdatenquelle dazu führen können, dass „Datenzombies” in weiteren Ebenen Ihres Datenmodells weiterhin vorhanden sind, da selbst eine inkrementelle Datenladung, bei der nur neue Zeilen eingefügt werden, in der Regel Löschungen nicht berücksichtigt.
Das bisherige dbt-model für diese Bronze-Layer-Tabelle sieht wie folgt aus:


{{ config(materialized='incremental', tags=["bronze"]) }}

SELECT *
FROM {{ source('external_postgres', 'raw_maschinendaten') }}

{% if is_incremental() %}
 -- this operation will only be triggered on an incremental run
where ts >= (select coalesce(max(ts),'1900-01-01') from {{ this }} )

{% endif %} 

Wenn dieses Modell zum ersten Mal ausgeführt wird, lädt es alle Daten aus unserer Rohdatenquelle, während bei nachfolgenden Ausführungen nur Daten mit einem Zeitstempel (Spalte „ts“) aufgenommen werden, der neuer ist als die aktuellsten Daten in unserer Bronze-Level-Tabelle. Dies funktioniert für die Verfolgung neuer Zeilen, hat jedoch Schwierigkeiten mit Aktualisierungen bestehender Zeilen, die den Zeitstempel nicht verändern, oder mit Löschungen.

Von hier an haben wir mehrere Optionen, wie wir diese Herausforderung angehen können.

Der einfache Weg - mit dbt-Snapshots

Snapshots sind die allgemein bevorzugte Methode zur Implementierung von SCD Typ 2 in dbt. Der Grund dafür ist, dass sie die Fähigkeiten des Frameworks bei minimalem Overhead und minimaler Komplexität in Ihren Datenpipelines voll ausschöpfen. Im Grunde genommen archiviert ein Snapshot alle früheren Datenzustände aus ihrer jeweiligen Datenquelle in einer veränderbaren Tabelle im Zeitverlauf und implementiert und füllt alle erforderlichen Metadaten-Spalten automatisch bei der Ausführung des Snapshots.

Für unsere Beispiele bedeutet dies, dass wir die Modelldatei „b_maschinendaten.sql” für diese Tabelle aus dem dbt-Ordner „models” durch eine Datei „maschinendaten_snapshot.sql” im dbt-Ordner „snapshots” mit folgendem Inhalt ersetzen:


{% snapshot maschinendaten_snapshot %}


{{
   config(
     unique_key="snapshot_pk",
     strategy="check",
     check_cols=["laufzeit_min", "stillstand_min", "gesamtteile", "gutteile", "ausschuss"],
     file_format="delta",
     hard_deletes="invalidate"
   )
}}


select
   *,
   -- Create a unique hash over key columns
   md5(concat_ws('-', cast(maschinenid as string), cast(ts as string))) as snapshot_pk


from {{ source("external_postgres", "raw_maschinendaten") }}


{% endsnapshot %}

In diesem Fall berechnen wir einen Hash über die Spalten „maschinenid” und „ts”, da diese beiden zusammen unseren zusammengesetzten Primärschlüssel für die Daten bilden, aber ein dbt-Snapshot erfordert einen eindeutigen Schlüssel als Einzelwert.


Jetzt müssen wir nur noch den dbt-Befehl ändern, um diese Datenladung auszuführen, und zwar von „dbt run –select b_maschinendaten“ zu „dbt snapshot –select maschinendaten_snapshot“, und die Pipeline ausführen.

Daraus entsteht eine Tabelle wie diese:

[Databricks-Delta-TabelleDatabricks-Delta-Tabelle, die aus einer dbt-Snapshot-Zusammenführung entsteht

Wie Sie sehen können, enthält diese Tabelle die Daten aus der Quelle mit einigen zusätzlichen Metadaten-Spalten. Relevant für unseren Fall sind die Spalten „dbt_valid_from” und „dbt_valid_to”, die den Zeitraum beschreiben, für den ein Datensatz aktuell war. Wenn „dbt_valid_to” null ist, bedeutet dies, dass der jeweilige Datensatz noch aktuell ist.


Wie Sie in der hervorgehobenen Zeile sehen können, wurde dieser Datensatz aktualisiert, wodurch eine Korrektur in der Maschinenausgabestatistik nach einer ersten Datenladung simuliert wurde. Dies wird dann im Snapshot dargestellt, indem der alte Datensatz finalisiert, „valid_to“ auf den Zeitpunkt und das Datum der Zusammenführung gesetzt und ein neuer Datensatz mit „valid_from“ eingefügt wird, der den Zeitpunkt und das Datum des Merge-Prozesses enthält.

Obwohl dies im Allgemeinen einfach zu implementieren ist, gibt es einige Einschränkungen, die es erforderlich machen könnten, über die (derzeitigen) Funktionen des Snapshots hinauszugehen, was uns zu unserem nächsten Schritt bringt.

Der maßgeschneiderte Weg - mit dbt-Makros

Es gibt mehrere Gründe, warum der Snapshot-Merge-Ansatz für Ihren speziellen Anwendungsfall möglicherweise nicht geeignet ist. Möglicherweise haben Sie Tabellen mit sehr großen Datenmengen, die während des Merge-Vorgangs spezielle Optimierungen erfordern, z. B. die Berücksichtigung nur einer Teilmenge von Zeilen für den Merge mithilfe von Filtern. Oder da die dbt-Snapshot-Merges die Metadaten-Spalten in ihrem spezifischen Format mit dem Präfix „dbt_“ generieren, kann dies zu Problemen in Systemen führen, die Ihre versionierten Daten aufnehmen. Es kann Fälle geben, in denen sich der Primärschlüssel Ihrer Datenquelle verschiebt, was eine spezielle Behandlung erfordert, um die vorhandenen Datensätze so zu migrieren, dass sie mit ihren vorherigen Zeilen übereinstimmen, und noch vieles mehr.

Unabhängig vom Grund liegt die Ursache meist in der mangelnden Flexibilität einer vereinfachten Standardlösung wie dem dbt-Snapshot. In diesem Fall können Sie sich dafür entscheiden, Ihre Daten-Merges über ein dbt-Makro auszuführen.

Makros in dbt bieten eine Möglichkeit, SQL und Jinja zu kombinieren, um Programmierbefehle in Ihren Transformationen auszuführen, die normalerweise in SQL nicht verfügbar sind, wie beispielsweise Schleifen oder Bedingungen.

Diese Makros sind generische Code-Teile, die mehrfach wiederverwendet werden können.

Für unsere Demonstration möchten wir einen SCD-Typ-2-Merge mit den folgenden zusätzlichen Funktionen implementieren:

  • Bereitstellung parametrisierter Namen für die Metadaten-Spalten.
  • Implementierung einer Spalte „is_current“, um aktive Zeilen einfach zu filtern.
  • Bereitstellung eines optionalen Parameters, um nur Daten aus den letzten X Tagen für den Merge zu berücksichtigen und so die Leistung zu verbessern.

Der Code für das Makro sieht wie folgt aus:


{% macro scd_type_2_merge(
       target,
       source,
       key_columns,
       effective_from_column,
       effective_from_expression=None,
       payload_columns=None,
       current_flag_column='is_current',
       valid_from_column='valid_from',
       valid_to_column='valid_to',
       lookback_window_days=None
   )
%}
   {% if is_incremental() %}


       {% if key_columns is string %}
           {% set key_cols = [key_columns] %}
       {% else %}
           {% set key_cols = key_columns %}
       {% endif %}


       {% if payload_columns is none %}
           {% set src_cols = adapter.get_columns_in_relation(source) %}
           {% set kc_lower = key_cols | map('lower') | list %}
           {% set payload_cols = [] %}
           {% for col in src_cols %}
               {% if col.name | lower not in kc_lower
                   and col.name | lower != effective_from_column | lower
                   and col.name | lower != current_flag_column | lower
                   and col.name | lower != valid_from_column | lower
                   and col.name | lower != valid_to_column | lower
               %}
                   {% do payload_cols.append(col.name) %}
               {% endif %}
           {% endfor %}
       {% else %}
           {% if payload_columns is string %}
               {% set payload_cols = [payload_columns] %}
           {% else %}
               {% set payload_cols = payload_columns %}
           {% endif %}
       {% endif %}


       {% set src_hash_parts = [] %}
       {% set dim_hash_parts = [] %}
       {% for c in payload_cols %}
           {% do src_hash_parts.append("coalesce(cast(src." ~ c ~ " as string), '')") %}
           {% do dim_hash_parts.append("coalesce(cast(dim." ~ c ~ " as string), '')") %}
       {% endfor %}


       {% set src_hash_expr = "md5(concat_ws('|', " ~ src_hash_parts | join(', ') ~ "))" %}
       {% set dim_hash_expr = "md5(concat_ws('|', " ~ dim_hash_parts | join(', ') ~ "))" %}


       {% set key_join_parts = [] %}
       {% for k in key_cols %}
           {% do key_join_parts.append("dim." ~ k ~ " = src." ~ k) %}
       {% endfor %}
       {% set key_join = key_join_parts | join(' and ') %}
      
       with src as (
           select
               *
               {% if effective_from_expression is not none %}
               , {{ effective_from_expression }} as {{ effective_from_column }}
               {% endif %}
           from {{ source }}
           {% if lookback_window_days is not none %}
           where
               {% if effective_from_expression is not none %}
                    {{ effective_from_expression }}
               {% else %}
                    {{ effective_from_column }}
               {% endif %}
               >= date_add(current_date(), -{{ lookback_window_days }})
           {% endif %}
          
           {# If multiple updates for the same key exist in the batch, pick only the latest one. #}
           qualify row_number() over (
               partition by {{ key_cols | join(', ') }}
               order by
               {% if effective_from_expression is not none %}
                    {{ effective_from_expression }}
               {% else %}
                    {{ effective_from_column }}
               {% endif %}
               desc
           ) = 1
       ),


       current_dim as (
           select *
           from {{ target }} as dim
           where dim.{{ current_flag_column }} = 1
          
           {% if lookback_window_days is not none %}
           and exists (
               select 1
               from src
               where {{ key_join }}
           )
           {% endif %}
       ),


       changes as (
           select
               {% for k in key_cols %}
                   src.{{ k }},
               {% endfor %}
               {% for c in payload_cols %}
                   src.{{ c }},
               {% endfor %}
               src.{{ effective_from_column }},
              
               {% for k in key_cols %}
                   dim.{{ k }} as dim_{{ k }},
               {% endfor %}
               {% for c in payload_cols %}
                   dim.{{ c }} as dim_{{ c }},
               {% endfor %}
               dim.{{ valid_from_column }}   as dim_{{ valid_from_column }},
               dim.{{ valid_to_column }}     as dim_{{ valid_to_column }},
               dim.{{ current_flag_column }} as dim_{{ current_flag_column }},


               {{ src_hash_expr }} as src_hash,
               {{ dim_hash_expr }} as dim_hash,


               case
                   when dim.{{ key_cols[0] }} is null then 'INSERT'
                   when {{ src_hash_expr }} <> {{ dim_hash_expr }} then 'UPDATE'
                   else 'NOCHANGE'
               end as change_type


           from src
           left join current_dim as dim
             on {{ key_join }}
       ),


       records_to_merge as (
           select
               {% for k in key_cols %}
                   dim_{{ k }} as {{ k }},
               {% endfor %}
               {% for c in payload_cols %}
                   dim_{{ c }} as {{ c }},
               {% endfor %}
               dim_{{ valid_from_column }} as {{ valid_from_column }},
               ({{ effective_from_column }} - interval '1' second) as {{ valid_to_column }},
               0 as {{ current_flag_column }},
               'UPDATE' as merge_action
           from changes
           where change_type = 'UPDATE'


           union all


           select
               {% for k in key_cols %}
                   {{ k }},
               {% endfor %}
               {% for c in payload_cols %}
                   {{ c }},
               {% endfor %}
               {{ effective_from_column }} as {{ valid_from_column }},
               to_timestamp('9999-12-31 23:59:59') as {{ valid_to_column }},
               1 as {{ current_flag_column }},
               'INSERT' as merge_action
           from changes
           where change_type in ('INSERT', 'UPDATE')
       )


       merge into {{ target }} as tgt
       using records_to_merge as s
       on
       (
           {% for k in key_cols %}
               tgt.{{ k }} = s.{{ k }}
               and
           {% endfor %}
           tgt.{{ valid_from_column }} = s.{{ valid_from_column }}
       )
       when matched and s.merge_action = 'UPDATE' then
           update set
               tgt.{{ valid_to_column }} = s.{{ valid_to_column }},
               tgt.{{ current_flag_column }} = s.{{ current_flag_column }}
              
       when not matched and s.merge_action = 'INSERT' then
           insert (
               {% for k in key_cols %}{{ k }}, {% endfor %}
               {% for c in payload_cols %}{{ c }}, {% endfor %}
               {{ valid_from_column }},
               {{ valid_to_column }},
               {{ current_flag_column }}
           )
           values (
               {% for k in key_cols %}s.{{ k }}, {% endfor %}
               {% for c in payload_cols %}s.{{ c }}, {% endfor %}
               s.{{ valid_from_column }},
               s.{{ valid_to_column }},
               s.{{ current_flag_column }}
           );
          
   {% else %}
       select 1 as scd_merge_skipped
   {% endif %}
{% endmacro %}

Dieses Makro kann dann aus der model .sql-Datei, für die Sie den Merge ausführen möchten, wie folgt ausgeführt werden:


{{ config(
   materialized='incremental',
   incremental_strategy='append',
   post_hook="{{ scd_type_2_merge(
       target=this,
       source=source('external_postgres', 'raw_maschinendaten'),
       key_columns=['maschinenid', 'ts'],
       effective_from_column='_loadtime',
       lookback_window_days=7
   ) }}"
) }}


-- Dbt requires a SELECT statement for the model body.
-- 1. On the FIRST run, we select everything to initialize the table structure including the SCD 2 metadata columns
-- 2. On INCREMENTAL runs, we select NOTHING (where 1=0).
--    This prevents dbt from trying to insert rows automatically, letting the
--    'post_hook' macro handle the complex SCD2 Merge logic entirely.


select
   *,
   1 as is_current,
   cast(_loadtime as timestamp) as valid_from,
   cast('9999-12-31 23:59:59' as timestamp) as valid_to
from {{ source('external_postgres', 'raw_maschinendaten') }}
{% if is_incremental() %}
   where 1=0
{% endif %}

Dieser Ansatz ist zwar deutlich komplexer und erfordert technische Kenntnisse sowohl in Jinja als auch in SQL, erfüllt jedoch alle zuvor genannten Anforderungen. Er gibt uns mehr Kontrolle über die Strukturierung der Metadaten und optimiert die Merge- Performance, indem nur Daten aus (in diesem Fall) den letzten sieben Tagen ausgewertet werden.
Ein Merge mit diesem Ansatz führt zu einer Tabelle der Bronze- Ebene, die wie folgt aussieht:

screenshot_macro_tableDatabricks-Delta-Tabelle, die aus einer Merge-Operation unter Verwendung des vorgestellten benutzerdefinierten dbt-Makros resultiert, einschließlich der Metadaten-Spalten, die die Aktualität der Datensätze beschreiben.

DBT-Makros und Snapshots für fortgeschrittenes Data Warehousing in Databricks: Unser Fazit

Letztendlich können beide vorgestellten Ansätze zur Implementierung eines historisierten Datenmodells in Ihrem Unternehmen verwendet werden. Wie so oft kommt es auf einen Kompromiss zwischen Benutzerfreundlichkeit und Flexibilität an, und die Entscheidung für einen der beiden Ansätze hängt von Ihren individuellen Geschäftsanforderungen ab. Beim Aufbau einer unternehmensweiten Warehouse-Umgebung und eines strukturierten Datenmodells sind standardisierte Tabellenmetadaten und Merge-Utility-Funktionen in Databricks von entscheidender Bedeutung. Selbst komplexe makrobasierte Merges können frühzeitig in Ihrem Einführungsprojekt definiert werden und bieten dann eine stabile Grundlage für den langfristigen Warehouse-Betrieb auf der kosteneffizienten, hoch skalierbaren Lakehouse-Plattform.

Falls Sie wissen möchten, welcher Ansatz für Ihre Bedürfnisse besser geeignet ist oder wie Sie mit dbt die Leistungsfähigkeit einer modernen Dateninfrastruktur für Ihr Unternehmen voll ausschöpfen können, wenden Sie sich gerne an uns.

 

Erfahren Sie mehr über  Databricks


FAQ - Effizientes Data Warehousing in Databricks

Hier finden Sie einige häufig gestellte Fragen zum Thema „Fortgeschrittenes Data Warehousing in Databricks mit dbt-Makros und Snapshots“.

Was ist das Hauptziel der Kombination von Databricks und dbt in diesem Szenario? Das Ziel besteht darin, mit dbt ein skalierbares, historisiertes Data Warehouse auf dem Databricks Lakehouse aufzubauen. Databricks bietet die einheitliche Rechen- und Speicherplattform, während dbt die Modellierung, Transformationen, Dokumentation, Tests und Orchestrierungsintegration über Databricks Jobs übernimmt.
Was sind Slowly Changing Dimensions (SCD) Typ 2 und warum sind diese relevant? SCD Typ 2 ist eine Modellierungsmethode, die die vollständige Historie der Änderungen in Dimensionsdaten bewahrt, indem für jede Änderung eine neue Zeile erstellt wird. Mit Metadatenfeldern wie „valid_from“, „valid_to“ (und optional einem „is_current“-Flag) lässt sich leicht analysieren, wie ein Attribut zu einem bestimmten Zeitpunkt aussah, und gleichzeitig lässt sich der aktuelle Status für das operative Reporting abrufen.
Wie fügt sich die Medaillon-Architektur in die Lösung ein? Die Lösung folgt der gängigen Medaillon-Datenarchitektur:
• Bronze: Rohdaten aus Dateien und PostgreSQL
• Silber: bereinigte und zusammengeführte Daten
• Gold: Reporting-Tabellen mit Geschäftsmetriken
Die SCD-Typ-2-Logik wird auf Bronze-Ebene für eine PostgreSQL-Quelle angewendet, deren Quellsystem selbst keine Historie pflegt.
Wie setzen dbt Snapshots SCD Typ 2 in Databricks um? Ein dbt Snapshot schreibt historische Versionen einer Quelltabelle in eine Delta-Tabelle und erzeugt automatisch Metadaten wie “dbt_valid_from” und “dbt_valid_to”. Über Konfiguration (Unique Key, Strategie “check”, zu prüfende Spalten) erkennt dbt Änderungen bei jedem Lauf und fügt neue Zeilen für geänderte Datensätze ein.
Wann ist es sinnvoll, Snapshots durch ein benutzerdefiniertes dbt-Makro zu ersetzen? Ein benutzerdefiniertes Makro ist hilfreich, wenn Sie mehr Kontrolle benötigen, zum Beispiel: benutzerdefinierte Namen für Metadaten-Spalten, ein „is_current”-Flag, Leistungsoptimierungen (z. B. nur Daten aus den letzten X Tagen zusammenführen) oder die Handhabung spezieller Primärschlüsseländerungen. In solchen Fällen kann sich die Abstraktion von Snapshots als zu starr erweisen, und eine maßgeschneiderte SCD2-Zusammenführung über ein Makro bietet die erforderliche Flexibilität.
Wie funktioniert das benutzerdefinierte SCD2-Makro im Beispiel des Artikels? Das Makro bildet anhand von Schlüssel- und Payload-Hash den Unterschied zwischen Quelle und aktueller Dimension ab, klassifiziert Zeilen als INSERT, UPDATE oder NOCHANGE und führt anschließend ein Delta MERGE aus. Dabei werden “valid_from”, “valid_to” und “is_current” gepflegt und über Parameter wie” lookback_window_days” kann die Menge der pro Lauf verarbeiteten Daten begrenzt werden.

 

,

avatar

Robin Brandt

Robin Brandt ist Berater für Machine Learning und Data Engineering. Durch langjährige Erfahrung im Software und Data Engineering bringt er Expertise in den Bereichen Automatisierung, Datentransformation und Datenbankmanagement mit - speziell im Bereich der Open Source Lösungen. Seine Freizeit verbringt er mit Musizieren oder dem Kreieren scharfer Gerichte.

Sie haben eine Frage zum Blog?
Fragen Sie Robin Brandt

Gender Hinweis Aufgrund der besseren Lesbarkeit wird im Text das generische Maskulinum verwendet. Gemeint sind jedoch immer alle Menschen.
Effizientes Data Warehousing in Databricks mit dbt Makros & Snapshots
13:45

Blog - NextLytics AG 

Welcome to our blog. In this section we regularly report on news and background information on topics such as SAP Business Intelligence (BI), SAP Dashboarding with Lumira Designer or SAP Analytics Cloud, Machine Learning with SAP BW, Data Science and Planning with SAP Business Planning and Consolidation (BPC), SAP Integrated Planning (IP) and SAC Planning and much more.

Informieren Sie mich über Neuigkeiten

Verwandte Beiträge

Letzte Beiträge