Auto CDC

Skill: databricks-spark-declarative-pipelines

Auto CDC takes a stream of change events (inserts, updates, deletes) and applies them to a target table with correct ordering and deduplication. You declare your keys and a sequence column — the framework handles merge logic, late-arriving data, and optionally tracks full history with SCD Type 2 temporal columns.
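The core contract can be pictured with a short plain-Python sketch (illustrative only, not the framework's merge implementation): for each key, the event with the highest sequence value wins, whatever order the events arrive in.

```python
# Plain-Python sketch of Auto CDC ordering semantics (illustrative only):
# for each key, keep the event with the highest sequence value.
def latest_per_key(events):
    state = {}
    for e in events:
        key, seq = e["customer_id"], e["updated_at"]
        # A later sequence value wins; earlier or duplicate events are ignored.
        if key not in state or seq > state[key]["updated_at"]:
            state[key] = e
    return state

events = [
    {"customer_id": 1, "updated_at": 2, "email": "new@x.com"},
    {"customer_id": 1, "updated_at": 1, "email": "old@x.com"},  # late arrival
]
print(latest_per_key(events)[1]["email"])  # the updated_at=2 event wins
```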

“Build a Python pipeline that tracks full customer history as an SCD Type 2 dimension from a CDC feed”

from pyspark import pipelines as dp

dp.create_streaming_table(name="dim_customers")

dp.create_auto_cdc_flow(
    target="dim_customers",
    source="customers_clean",
    keys=["customer_id"],
    sequence_by="updated_at",
    stored_as_scd_type=2
)

Key decisions:

  • SCD Type 2 — every change creates a new row with __START_AT and __END_AT timestamps. Current records have __END_AT IS NULL. This preserves the full audit trail for point-in-time analysis.
  • sequence_by on updated_at — this is how Auto CDC resolves out-of-order arrivals. If two events arrive for the same key, the one with the later timestamp wins.
  • Clean source first — the CDC flow reads from customers_clean, not raw bronze. Type casting, null filtering, and validation happen upstream so the dimension table stays trustworthy.
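To make the Type 2 bookkeeping concrete, here is a plain-Python sketch of the versioning semantics (illustrative only; the framework implements this with streaming merges, though __START_AT and __END_AT are the real column names): each change closes the currently open row and appends a new one.

```python
# Illustrative SCD Type 2 bookkeeping: append-only history per key.
def apply_scd2(history, change):
    key, ts = change["customer_id"], change["updated_at"]
    for row in history:
        if row["customer_id"] == key and row["__END_AT"] is None:
            row["__END_AT"] = ts  # close the previously current version
    history.append({**change, "__START_AT": ts, "__END_AT": None})

history = []
apply_scd2(history, {"customer_id": 1, "tier": "bronze", "updated_at": 10})
apply_scd2(history, {"customer_id": 1, "tier": "gold", "updated_at": 20})

# "Current" records are the ones with __END_AT IS NULL.
current = [r for r in history if r["__END_AT"] is None]
print(len(history), current[0]["tier"])  # 2 gold
```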

“Apply SCD Type 1 to keep only the latest order record per order_id using SQL”

CREATE OR REFRESH STREAMING TABLE orders_current;
CREATE FLOW orders_cdc AS AUTO CDC INTO orders_current
FROM STREAM(orders_clean)
KEYS (order_id)
SEQUENCE BY updated_at;

SCD Type 1 is the default — it overwrites the existing row when a newer version arrives. No __START_AT/__END_AT columns, no history. Use this when you only need current state or are deduplicating a source with duplicate deliveries.
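The overwrite behavior can also be sketched in plain Python (illustrative, not the engine's merge logic): a row with a not-older sequence value replaces the current row in place, which is also what makes duplicate deliveries harmless.

```python
# Illustrative SCD Type 1: overwrite in place, keyed by order_id.
def scd1_upsert(table, row):
    key, seq = row["order_id"], row["updated_at"]
    existing = table.get(key)
    if existing is None or seq >= existing["updated_at"]:
        table[key] = row  # newer (or re-delivered identical) version overwrites

table = {}
scd1_upsert(table, {"order_id": 7, "status": "shipped", "updated_at": 5})
scd1_upsert(table, {"order_id": 7, "status": "shipped", "updated_at": 5})  # duplicate
print(len(table), table[7]["status"])  # 1 shipped
```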

Handle deletes and truncates from the source system


“Process a CDC feed where the op column signals DELETE and TRUNCATE operations, using Python”

from pyspark import pipelines as dp
from pyspark.sql.functions import expr

dp.create_streaming_table(name="orders")

dp.create_auto_cdc_flow(
    target="orders",
    source="order_changes",
    keys=["order_id"],
    sequence_by="event_time",
    apply_as_deletes=expr("op = 'DELETE'"),
    apply_as_truncates=expr("op = 'TRUNCATE'"),
    ignore_null_updates=True
)

The apply_as_deletes expression maps source operations to actual DELETE behavior on the target. ignore_null_updates prevents partial records (with null fields) from overwriting good data — common when CDC tools emit sparse change events.
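A plain-Python sketch of these two options (illustrative only; sequencing is omitted for brevity): a DELETE op removes the key, and with ignore_null_updates a null field in an update keeps the existing value instead of overwriting it.

```python
# Illustrative delete / null-update semantics, keyed by order_id.
def apply_change(table, change):
    key = change["order_id"]
    if change.get("op") == "DELETE":
        table.pop(key, None)  # apply_as_deletes: remove the row
        return
    current = table.get(key, {})
    # ignore_null_updates=True: keep the existing value where the update is null
    merged = {k: v if v is not None else current.get(k)
              for k, v in change.items() if k != "op"}
    table[key] = merged

table = {}
apply_change(table, {"order_id": 1, "op": "UPSERT", "amount": 100, "status": "open"})
apply_change(table, {"order_id": 1, "op": "UPSERT", "amount": None, "status": "paid"})
apply_change(table, {"order_id": 2, "op": "UPSERT", "amount": 50, "status": "open"})
apply_change(table, {"order_id": 2, "op": "DELETE"})
print(table)  # order 1 keeps amount=100 with status 'paid'; order 2 is gone
```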

Track history only for columns that matter


“Set up SCD Type 2 on a products table but only create new versions when price or cost changes, in SQL”

CREATE OR REFRESH STREAMING TABLE products_history;
CREATE FLOW products_scd2 AS AUTO CDC INTO products_history
FROM STREAM(products_clean)
KEYS (product_id)
SEQUENCE BY modified_at
STORED AS SCD TYPE 2
TRACK HISTORY ON price, cost;

Without TRACK HISTORY ON, every column change creates a new version — including trivial updates like a description edit. Selective tracking keeps the history table lean and the temporal queries fast.
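The effect of selective tracking can be sketched in plain Python (illustrative only; a single key is assumed and sequencing is omitted): a new version is created only when a tracked column changes, while untracked edits update the current row in place.

```python
# Illustrative TRACK HISTORY ON semantics for one product.
TRACKED = {"price", "cost"}

def apply_versioned(history, change, ts):
    current = next((r for r in history if r["__END_AT"] is None), None)
    if current and all(current.get(c) == change.get(c) for c in TRACKED):
        current.update(change)  # untracked-only change: no new version
        return
    if current:
        current["__END_AT"] = ts  # tracked column changed: close current row
    history.append({**change, "__START_AT": ts, "__END_AT": None})

history = []
apply_versioned(history, {"price": 10, "cost": 6, "description": "v1"}, ts=1)
apply_versioned(history, {"price": 10, "cost": 6, "description": "v2"}, ts=2)  # in place
apply_versioned(history, {"price": 12, "cost": 6, "description": "v2"}, ts=3)  # new row
print(len(history))  # 2
```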

Use a temporary view for preprocessing before CDC


“Clean and calculate fields on raw order data before applying SCD Type 1 deduplication, in Python”

from pyspark import pipelines as dp
from pyspark.sql.functions import col

@dp.temporary_view()
def orders_prepared():
    return (
        spark.readStream.table("bronze.orders")
        .withColumn("order_total", col("quantity") * col("unit_price"))
        .filter(col("order_id").isNotNull())
        .filter(col("order_total") > 0)
    )

dp.create_streaming_table(name="orders_current")

dp.create_auto_cdc_flow(
    target="orders_current",
    source="orders_prepared",
    keys=["order_id"],
    sequence_by="order_date",
    stored_as_scd_type="1"
)

Temporary views exist only during pipeline execution — nothing is persisted and there is no storage cost. They’re the right tool when you need to transform data before CDC but don’t need the intermediate result for anything else.

  • Target table must exist first — always call dp.create_streaming_table() or CREATE STREAMING TABLE before defining the CDC flow. Auto CDC writes into an existing table; it doesn’t create one.
  • SCD type syntax differs between Python and SQL — in Python, pass stored_as_scd_type=1 or stored_as_scd_type=2 (the parameter accepts an int or its string form). In SQL, use STORED AS SCD TYPE 2, or omit the clause for Type 1 (the default).
  • TRUNCATE only works with SCD Type 1 — SCD Type 2 preserves history by design, so truncation is not supported. If your source sends truncate signals and you’re using Type 2, filter them out upstream.
  • Clause order matters in SQL — put APPLY AS DELETE WHEN before SEQUENCE BY. Getting this wrong produces a parse error that doesn’t point at the real cause.
  • Query SCD Type 2 with double-underscore columns — the temporal columns are __START_AT and __END_AT (two underscores), not START_AT/END_AT. Use WHERE __END_AT IS NULL to get current records.