Die heutige Datenwelt ist ein sich rasant entwickelndes Feld, in dem Unternehmen zunehmend auf datengestützte Entscheidungen setzen, um organisatorische Flexibilität und die Fähigkeit zur Gewinnung wertvoller Echtzeit-Einsichten zu gewährleisten. Daher ist die Nachfrage nach skalierbaren und effizienten Data-Warehousing- und Analyseplattformen größer denn je. In den letzten Jahren hat Databricks seine Position unter den Mitbewerbern gefestigt und wurde von Gartner-Analysten Jahr für Jahr als führender Anbieter im Bereich Cloud-Data-Warehousing ausgezeichnet. Databricks zeichnet sich weiterhin durch seine einheitliche Analyseplattform aus, die Datenverarbeitung, maschinelles Lernen, Analyse und Planungsfunktionen nahtlos miteinander verbindet. Durch die Kombination der besten Funktionen des Data Lake und des Data Warehouse in seiner Lakehouse-Architektur ermöglicht Databricks Unternehmen die Speicherung und Verarbeitung großer Datenmengen und bietet gleichzeitig die Möglichkeit für erweiterte und maßgeschneiderte Analysen innerhalb der Plattform.
Finden Sie hier unsere Blogserie „Hands-on Databricks“: |
| 1. So nutzen Sie SAP Databricks zur Integration von ServiceNow-Daten |
| 2. Datenmodellierung mit dbt auf Databricks: Ein Praxisleitfaden |
| 3. Databricks AutoML für Zeitreihen: Schnelle und zuverlässige Absatzprognosen |
| 4. Effizientes Data Warehousing in Databricks mit dbt Makros & Snapshots |
Das Datenframework dbt (Data Build Tool) hat bereits bewiesen, dass es hervorragend zu den Funktionen und Arbeitsabläufen von Databricks passt. Es ermöglicht Datenteams, Daten über die verschiedenen Ebenen des Lakehouse hinweg einfach zu modellieren, zu laden, zu transformieren, zu dokumentieren und zu testen, während es sich gleichzeitig gut in das interne Planungsframework von Databricks, Databricks Jobs, integrieren lässt.
In einem früheren Artikel haben wir bereits gezeigt, wie man eine dbt-Pipeline in Databricks einrichtet und das Framework nutzt, um ein Datenmodell in der Medallion-Architektur zu erstellen. Heute möchten wir noch einen Schritt weiter gehen, da die Anwendungen von Datentransformationen in der Praxis oft nicht so einfach sind, wie nur Daten aus ihren jeweiligen Quellen mit minimalen Transformationen zu laden. Die Verfolgung von Datenänderungen im Zeitverlauf erfordert beispielsweise standardisierte Tabellenmetadaten und Zusammenführungsprozesse, die die Grundlage für die Erstellung logischer Warehouse-Datenmodelle nach dem Stern- oder Schneeflockenschema oder der Data Vault 2.0-Methodik bilden.
Insbesondere möchten wir zwei mögliche Ansätze zur Durchführung einer historisierten Datenzusammenführung, auch Slowly Changing Dimensions (SCD) Typ 2 innerhalb von dbt in Databricks genannt, beleuchten – mit Hilfe von Makros und Snapshots..png?width=800&height=300&name=graphic_medallion%20(2).png)
Grundlegendes Datenmodell gemäß der Medaillonarchitektur
Slowly Changing Dimensions (SCD) verstehen
In den Bereichen Business Intelligence und Analytics haben Unternehmen häufig mit sich zeitlich verändernden Daten zu tun. Nehmen wir zum Beispiel die Adresse eines Kunden, die sich im Laufe der Zeit ändern kann, oder die Berufsbezeichnung eines Mitarbeiters, die bei einer Beförderung aktualisiert wird. Die genaue Verfolgung dieser Änderungen ist entscheidend für einen vollständigen Überblick über die Geschäftsabläufe, während die historische Betrachtung dieser Daten und ihrer Veränderungen im Laufe der Zeit relevant ist, um Trends im Zeitverlauf zu verstehen und zu analysieren.
SCDs sind eine Methodik, die im Data Warehousing verwendet wird, um die Historie von Änderungen an Dimensionsdaten zu verwalten und zu bewahren. Dimensionen beziehen sich in diesem Fall auf beschreibende Attribute von Geschäftseinheiten wie Kunden, Produkten oder Mitarbeitern, wobei sich diese Attribute im Laufe der Zeit ändern können.
Es gibt mehrere Arten von SCDs, aber SCD Typ 2 wurde speziell entwickelt, um historische Änderungen zu verfolgen und gleichzeitig die Möglichkeit zu erhalten, die aktuellsten Datenpunkte zu extrahieren. Die Kernidee hinter SCD Typ 2 besteht darin, jede Änderung der Daten im Laufe der Zeit zu bewahren, indem für jede Änderung neue Datensätze erstellt werden, anstatt bestehende Werte zu aktualisieren oder zu löschen. Jede Zeile innerhalb einer Dimensionstabelle enthält Metadaten-Spalten (z. B. Zeitstempel-Spalten „valid_from” und „valid_to”), die den Zeitraum der Aktualität eines einzelnen Datensatzes detailliert angeben. Mithilfe dieser Metadaten-Spalten können Sie auch ganz einfach die Datenquelle abfragen und historisierte Datensätze herausfiltern, sodass nur die aktuell gültigen Datensätze angezeigt werden, was für tagesaktuelle Analyseanwendungen wichtig ist.

Beispiel für den resultierenden Datensatz einer Slowly Changing Dimensions Type 2-merge-Operation
Sehen Sie sich die Aufzeichnung unseres Webinars an: "Bridging Business and Analytics: The Plug-and-Play Future of Data Platforms"
Implementierung von SCD Typ 2 in dbt
Wir gehen davon aus, dass Sie mit der Einrichtung Ihrer Databricks-Instanz für die Verwendung mit dbt vertraut sind. Wenn dies nicht der Fall ist, empfehlen wir Ihnen, den vorherigen Artikel dieser Reihe zu lesen, da dieser Ihnen das notwendige Wissen für den Start Ihres dbt-Projekts vermittelt und den Datenanwendungsfall beschreibt, auf den wir hier näher eingehen werden.
In unserem Beispiel nehmen wir bereits Rohdaten aus einem Datenexport und einer Postgres-Datenbank auf und verfeinern sie in aufeinanderfolgenden Schritten gemäß der Medaillon-Architektur in den folgenden Schritten.
Bronze-Ebene
• Klonen der Daten aus unseren Quellen in Databricks-Delta-Tabellen.
Silber-Ebene
• Zusammenführen der beiden Tabellen der Bronze-Ebene und Bereinigen der Daten.
Gold-Ebene
• Erstellen einer Tabelle auf Berichtsebene mit berechneten Metriken auf der Grundlage der Daten aus der Tabelle der Silber-Ebene.
Jede der Tabellen in dieser Architektur wird als dbt-model implementiert.
Für diese Demonstration möchten wir unsere historisierte SCD-Typ-2-merge auf die Bronze-Layer-Tabelle anwenden, die aus unserer Postgres-Rohdatenquelle gespeist wird. Dies ist ein häufiger Anwendungsfall, bei dem das Quellsystem selbst die Änderungen in den Daten nicht verfolgt und beispielsweise Löschungen in dieser Rohdatenquelle dazu führen können, dass „Datenzombies” in weiteren Ebenen Ihres Datenmodells weiterhin vorhanden sind, da selbst eine inkrementelle Datenladung, bei der nur neue Zeilen eingefügt werden, in der Regel Löschungen nicht berücksichtigt.
Das bisherige dbt-model für diese Bronze-Layer-Tabelle sieht wie folgt aus:
{{ config(materialized='incremental', tags=["bronze"]) }}
SELECT *
FROM {{ source('external_postgres', 'raw_maschinendaten') }}
{% if is_incremental() %}
-- this operation will only be triggered on an incremental run
where ts >= (select coalesce(max(ts),'1900-01-01') from {{ this }} )
{% endif %}
Wenn dieses Modell zum ersten Mal ausgeführt wird, lädt es alle Daten aus unserer Rohdatenquelle, während bei nachfolgenden Ausführungen nur Daten mit einem Zeitstempel (Spalte „ts“) aufgenommen werden, der neuer ist als die aktuellsten Daten in unserer Bronze-Level-Tabelle. Dies funktioniert für die Verfolgung neuer Zeilen, hat jedoch Schwierigkeiten mit Aktualisierungen bestehender Zeilen, die den Zeitstempel nicht verändern, oder mit Löschungen.
Von hier an haben wir mehrere Optionen, wie wir diese Herausforderung angehen können.
Der einfache Weg - mit dbt-Snapshots
Snapshots sind die allgemein bevorzugte Methode zur Implementierung von SCD Typ 2 in dbt. Der Grund dafür ist, dass sie die Fähigkeiten des Frameworks bei minimalem Overhead und minimaler Komplexität in Ihren Datenpipelines voll ausschöpfen. Im Grunde genommen archiviert ein Snapshot alle früheren Datenzustände aus ihrer jeweiligen Datenquelle in einer veränderbaren Tabelle im Zeitverlauf und implementiert und füllt alle erforderlichen Metadaten-Spalten automatisch bei der Ausführung des Snapshots.
Für unsere Beispiele bedeutet dies, dass wir die Modelldatei „b_maschinendaten.sql” für diese Tabelle aus dem dbt-Ordner „models” durch eine Datei „maschinendaten_snapshot.sql” im dbt-Ordner „snapshots” mit folgendem Inhalt ersetzen:
{% snapshot maschinendaten_snapshot %}
{{
config(
unique_key="snapshot_pk",
strategy="check",
check_cols=["laufzeit_min", "stillstand_min", "gesamtteile", "gutteile", "ausschuss"],
file_format="delta",
hard_deletes="invalidate"
)
}}
select
*,
-- Create a unique hash over key columns
md5(concat_ws('-', cast(maschinenid as string), cast(ts as string))) as snapshot_pk
from {{ source("external_postgres", "raw_maschinendaten") }}
{% endsnapshot %}
In diesem Fall berechnen wir einen Hash über die Spalten „maschinenid” und „ts”, da diese beiden zusammen unseren zusammengesetzten Primärschlüssel für die Daten bilden, aber ein dbt-Snapshot erfordert einen eindeutigen Schlüssel als Einzelwert.
Jetzt müssen wir nur noch den dbt-Befehl ändern, um diese Datenladung auszuführen, und zwar von „dbt run –select b_maschinendaten“ zu „dbt snapshot –select maschinendaten_snapshot“, und die Pipeline ausführen.
Daraus entsteht eine Tabelle wie diese:
Databricks-Delta-Tabelle, die aus einer dbt-Snapshot-Zusammenführung entsteht
Wie Sie sehen können, enthält diese Tabelle die Daten aus der Quelle mit einigen zusätzlichen Metadaten-Spalten. Relevant für unseren Fall sind die Spalten „dbt_valid_from” und „dbt_valid_to”, die den Zeitraum beschreiben, für den ein Datensatz aktuell war. Wenn „dbt_valid_to” null ist, bedeutet dies, dass der jeweilige Datensatz noch aktuell ist.
Wie Sie in der hervorgehobenen Zeile sehen können, wurde dieser Datensatz aktualisiert, wodurch eine Korrektur in der Maschinenausgabestatistik nach einer ersten Datenladung simuliert wurde. Dies wird dann im Snapshot dargestellt, indem der alte Datensatz finalisiert, „valid_to“ auf den Zeitpunkt und das Datum der Zusammenführung gesetzt und ein neuer Datensatz mit „valid_from“ eingefügt wird, der den Zeitpunkt und das Datum des Merge-Prozesses enthält.
Obwohl dies im Allgemeinen einfach zu implementieren ist, gibt es einige Einschränkungen, die es erforderlich machen könnten, über die (derzeitigen) Funktionen des Snapshots hinauszugehen, was uns zu unserem nächsten Schritt bringt.
Der maßgeschneiderte Weg - mit dbt-Makros
Es gibt mehrere Gründe, warum der Snapshot-Merge-Ansatz für Ihren speziellen Anwendungsfall möglicherweise nicht geeignet ist. Möglicherweise haben Sie Tabellen mit sehr großen Datenmengen, die während des Merge-Vorgangs spezielle Optimierungen erfordern, z. B. die Berücksichtigung nur einer Teilmenge von Zeilen für den Merge mithilfe von Filtern. Oder da die dbt-Snapshot-Merges die Metadaten-Spalten in ihrem spezifischen Format mit dem Präfix „dbt_“ generieren, kann dies zu Problemen in Systemen führen, die Ihre versionierten Daten aufnehmen. Es kann Fälle geben, in denen sich der Primärschlüssel Ihrer Datenquelle verschiebt, was eine spezielle Behandlung erfordert, um die vorhandenen Datensätze so zu migrieren, dass sie mit ihren vorherigen Zeilen übereinstimmen, und noch vieles mehr.
Unabhängig vom Grund liegt die Ursache meist in der mangelnden Flexibilität einer vereinfachten Standardlösung wie dem dbt-Snapshot. In diesem Fall können Sie sich dafür entscheiden, Ihre Daten-Merges über ein dbt-Makro auszuführen.
Makros in dbt bieten eine Möglichkeit, SQL und Jinja zu kombinieren, um Programmierbefehle in Ihren Transformationen auszuführen, die normalerweise in SQL nicht verfügbar sind, wie beispielsweise Schleifen oder Bedingungen.
Diese Makros sind generische Code-Teile, die mehrfach wiederverwendet werden können.
Für unsere Demonstration möchten wir einen SCD-Typ-2-Merge mit den folgenden zusätzlichen Funktionen implementieren:
- Bereitstellung parametrisierter Namen für die Metadaten-Spalten.
- Implementierung einer Spalte „is_current“, um aktive Zeilen einfach zu filtern.
- Bereitstellung eines optionalen Parameters, um nur Daten aus den letzten X Tagen für den Merge zu berücksichtigen und so die Leistung zu verbessern.
Der Code für das Makro sieht wie folgt aus:
{% macro scd_type_2_merge(
target,
source,
key_columns,
effective_from_column,
effective_from_expression=None,
payload_columns=None,
current_flag_column='is_current',
valid_from_column='valid_from',
valid_to_column='valid_to',
lookback_window_days=None
)
%}
{% if is_incremental() %}
{% if key_columns is string %}
{% set key_cols = [key_columns] %}
{% else %}
{% set key_cols = key_columns %}
{% endif %}
{% if payload_columns is none %}
{% set src_cols = adapter.get_columns_in_relation(source) %}
{% set kc_lower = key_cols | map('lower') | list %}
{% set payload_cols = [] %}
{% for col in src_cols %}
{% if col.name | lower not in kc_lower
and col.name | lower != effective_from_column | lower
and col.name | lower != current_flag_column | lower
and col.name | lower != valid_from_column | lower
and col.name | lower != valid_to_column | lower
%}
{% do payload_cols.append(col.name) %}
{% endif %}
{% endfor %}
{% else %}
{% if payload_columns is string %}
{% set payload_cols = [payload_columns] %}
{% else %}
{% set payload_cols = payload_columns %}
{% endif %}
{% endif %}
{% set src_hash_parts = [] %}
{% set dim_hash_parts = [] %}
{% for c in payload_cols %}
{% do src_hash_parts.append("coalesce(cast(src." ~ c ~ " as string), '')") %}
{% do dim_hash_parts.append("coalesce(cast(dim." ~ c ~ " as string), '')") %}
{% endfor %}
{% set src_hash_expr = "md5(concat_ws('|', " ~ src_hash_parts | join(', ') ~ "))" %}
{% set dim_hash_expr = "md5(concat_ws('|', " ~ dim_hash_parts | join(', ') ~ "))" %}
{% set key_join_parts = [] %}
{% for k in key_cols %}
{% do key_join_parts.append("dim." ~ k ~ " = src." ~ k) %}
{% endfor %}
{% set key_join = key_join_parts | join(' and ') %}
with src as (
select
*
{% if effective_from_expression is not none %}
, {{ effective_from_expression }} as {{ effective_from_column }}
{% endif %}
from {{ source }}
{% if lookback_window_days is not none %}
where
{% if effective_from_expression is not none %}
{{ effective_from_expression }}
{% else %}
{{ effective_from_column }}
{% endif %}
>= date_add(current_date(), -{{ lookback_window_days }})
{% endif %}
{# If multiple updates for the same key exist in the batch, pick only the latest one. #}
qualify row_number() over (
partition by {{ key_cols | join(', ') }}
order by
{% if effective_from_expression is not none %}
{{ effective_from_expression }}
{% else %}
{{ effective_from_column }}
{% endif %}
desc
) = 1
),
current_dim as (
select *
from {{ target }} as dim
where dim.{{ current_flag_column }} = 1
{% if lookback_window_days is not none %}
and exists (
select 1
from src
where {{ key_join }}
)
{% endif %}
),
changes as (
select
{% for k in key_cols %}
src.{{ k }},
{% endfor %}
{% for c in payload_cols %}
src.{{ c }},
{% endfor %}
src.{{ effective_from_column }},
{% for k in key_cols %}
dim.{{ k }} as dim_{{ k }},
{% endfor %}
{% for c in payload_cols %}
dim.{{ c }} as dim_{{ c }},
{% endfor %}
dim.{{ valid_from_column }} as dim_{{ valid_from_column }},
dim.{{ valid_to_column }} as dim_{{ valid_to_column }},
dim.{{ current_flag_column }} as dim_{{ current_flag_column }},
{{ src_hash_expr }} as src_hash,
{{ dim_hash_expr }} as dim_hash,
case
when dim.{{ key_cols[0] }} is null then 'INSERT'
when {{ src_hash_expr }} <> {{ dim_hash_expr }} then 'UPDATE'
else 'NOCHANGE'
end as change_type
from src
left join current_dim as dim
on {{ key_join }}
),
records_to_merge as (
select
{% for k in key_cols %}
dim_{{ k }} as {{ k }},
{% endfor %}
{% for c in payload_cols %}
dim_{{ c }} as {{ c }},
{% endfor %}
dim_{{ valid_from_column }} as {{ valid_from_column }},
({{ effective_from_column }} - interval '1' second) as {{ valid_to_column }},
0 as {{ current_flag_column }},
'UPDATE' as merge_action
from changes
where change_type = 'UPDATE'
union all
select
{% for k in key_cols %}
{{ k }},
{% endfor %}
{% for c in payload_cols %}
{{ c }},
{% endfor %}
{{ effective_from_column }} as {{ valid_from_column }},
to_timestamp('9999-12-31 23:59:59') as {{ valid_to_column }},
1 as {{ current_flag_column }},
'INSERT' as merge_action
from changes
where change_type in ('INSERT', 'UPDATE')
)
merge into {{ target }} as tgt
using records_to_merge as s
on
(
{% for k in key_cols %}
tgt.{{ k }} = s.{{ k }}
and
{% endfor %}
tgt.{{ valid_from_column }} = s.{{ valid_from_column }}
)
when matched and s.merge_action = 'UPDATE' then
update set
tgt.{{ valid_to_column }} = s.{{ valid_to_column }},
tgt.{{ current_flag_column }} = s.{{ current_flag_column }}
when not matched and s.merge_action = 'INSERT' then
insert (
{% for k in key_cols %}{{ k }}, {% endfor %}
{% for c in payload_cols %}{{ c }}, {% endfor %}
{{ valid_from_column }},
{{ valid_to_column }},
{{ current_flag_column }}
)
values (
{% for k in key_cols %}s.{{ k }}, {% endfor %}
{% for c in payload_cols %}s.{{ c }}, {% endfor %}
s.{{ valid_from_column }},
s.{{ valid_to_column }},
s.{{ current_flag_column }}
);
{% else %}
select 1 as scd_merge_skipped
{% endif %}
{% endmacro %}
Dieses Makro kann dann aus der model .sql-Datei, für die Sie den Merge ausführen möchten, wie folgt ausgeführt werden:
{{ config(
materialized='incremental',
incremental_strategy='append',
post_hook="{{ scd_type_2_merge(
target=this,
source=source('external_postgres', 'raw_maschinendaten'),
key_columns=['maschinenid', 'ts'],
effective_from_column='_loadtime',
lookback_window_days=7
) }}"
) }}
-- Dbt requires a SELECT statement for the model body.
-- 1. On the FIRST run, we select everything to initialize the table structure including the SCD 2 metadata columns
-- 2. On INCREMENTAL runs, we select NOTHING (where 1=0).
-- This prevents dbt from trying to insert rows automatically, letting the
-- 'post_hook' macro handle the complex SCD2 Merge logic entirely.
select
*,
1 as is_current,
cast(_loadtime as timestamp) as valid_from,
cast('9999-12-31 23:59:59' as timestamp) as valid_to
from {{ source('external_postgres', 'raw_maschinendaten') }}
{% if is_incremental() %}
where 1=0
{% endif %}
Dieser Ansatz ist zwar deutlich komplexer und erfordert technische Kenntnisse sowohl in Jinja als auch in SQL, erfüllt jedoch alle zuvor genannten Anforderungen. Er gibt uns mehr Kontrolle über die Strukturierung der Metadaten und optimiert die Merge- Performance, indem nur Daten aus (in diesem Fall) den letzten sieben Tagen ausgewertet werden.
Ein Merge mit diesem Ansatz führt zu einer Tabelle der Bronze- Ebene, die wie folgt aussieht:
Databricks-Delta-Tabelle, die aus einer Merge-Operation unter Verwendung des vorgestellten benutzerdefinierten dbt-Makros resultiert, einschließlich der Metadaten-Spalten, die die Aktualität der Datensätze beschreiben.
DBT-Makros und Snapshots für fortgeschrittenes Data Warehousing in Databricks: Unser Fazit
Letztendlich können beide vorgestellten Ansätze zur Implementierung eines historisierten Datenmodells in Ihrem Unternehmen verwendet werden. Wie so oft kommt es auf einen Kompromiss zwischen Benutzerfreundlichkeit und Flexibilität an, und die Entscheidung für einen der beiden Ansätze hängt von Ihren individuellen Geschäftsanforderungen ab. Beim Aufbau einer unternehmensweiten Warehouse-Umgebung und eines strukturierten Datenmodells sind standardisierte Tabellenmetadaten und Merge-Utility-Funktionen in Databricks von entscheidender Bedeutung. Selbst komplexe makrobasierte Merges können frühzeitig in Ihrem Einführungsprojekt definiert werden und bieten dann eine stabile Grundlage für den langfristigen Warehouse-Betrieb auf der kosteneffizienten, hoch skalierbaren Lakehouse-Plattform.
Falls Sie wissen möchten, welcher Ansatz für Ihre Bedürfnisse besser geeignet ist oder wie Sie mit dbt die Leistungsfähigkeit einer modernen Dateninfrastruktur für Ihr Unternehmen voll ausschöpfen können, wenden Sie sich gerne an uns.
FAQ - Effizientes Data Warehousing in Databricks
Hier finden Sie einige häufig gestellte Fragen zum Thema „Fortgeschrittenes Data Warehousing in Databricks mit dbt-Makros und Snapshots“.
• Bronze: Rohdaten aus Dateien und PostgreSQL
• Silber: bereinigte und zusammengeführte Daten
• Gold: Reporting-Tabellen mit Geschäftsmetriken
Die SCD-Typ-2-Logik wird auf Bronze-Ebene für eine PostgreSQL-Quelle angewendet, deren Quellsystem selbst keine Historie pflegt.
Data Science & Engineering, Databricks
/Logo%202023%20final%20dunkelgrau.png?width=221&height=97&name=Logo%202023%20final%20dunkelgrau.png)


