Auto CDC
Skill: databricks-spark-declarative-pipelines
What You Can Build
Auto CDC takes a stream of change events (inserts, updates, deletes) and applies them to a target table with correct ordering and deduplication. You declare your keys and a sequence column — the framework handles merge logic, late-arriving data, and optionally tracks full history with SCD Type 2 temporal columns.
In Action
“Build a Python pipeline that tracks full customer history as an SCD Type 2 dimension from a CDC feed”
```python
dp.create_streaming_table(name="dim_customers")

dp.create_auto_cdc_flow(
    target="dim_customers",
    source="customers_clean",
    keys=["customer_id"],
    sequence_by="updated_at",
    stored_as_scd_type=2,
)
```

Key decisions:

- SCD Type 2 — every change creates a new row with `__START_AT` and `__END_AT` timestamps. Current records have `__END_AT IS NULL`. This preserves the full audit trail for point-in-time analysis.
- `sequence_by` on `updated_at` — this is how Auto CDC resolves out-of-order arrivals. If two events arrive for the same key, the one with the later timestamp wins.
- Clean source first — the CDC flow reads from `customers_clean`, not raw bronze. Type casting, null filtering, and validation happen upstream so the dimension table stays trustworthy.
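The sequencing rule is the part worth internalizing. As a minimal pure-Python sketch of the semantics described above — illustrative only, not the engine's actual merge implementation — `sequence_by` deduplication keeps, per key, the event with the highest sequence value regardless of arrival order:

```python
# Illustrative sketch of sequence_by semantics: for each key, the event
# with the highest sequence value wins, no matter when it arrived.
def resolve_latest(events, key="customer_id", sequence_by="updated_at"):
    latest = {}
    for event in events:  # events may arrive out of order
        k = event[key]
        if k not in latest or event[sequence_by] > latest[k][sequence_by]:
            latest[k] = event
    return latest

events = [
    {"customer_id": 1, "updated_at": "2024-01-03", "email": "new@x.com"},
    {"customer_id": 1, "updated_at": "2024-01-01", "email": "old@x.com"},  # late arrival
    {"customer_id": 2, "updated_at": "2024-01-02", "email": "b@x.com"},
]
current = resolve_latest(events)
print(current[1]["email"])  # the 2024-01-03 version wins, not the late 2024-01-01 one
```

The late-arriving 2024-01-01 event is discarded even though it was processed last — exactly the guarantee `sequence_by` gives you.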
More Patterns
Deduplicate without keeping history
“Apply SCD Type 1 to keep only the latest order record per order_id using SQL”
```sql
CREATE OR REFRESH STREAMING TABLE orders_current;

CREATE FLOW orders_cdc AS AUTO CDC INTO orders_current
FROM STREAM(orders_clean)
KEYS (order_id)
SEQUENCE BY updated_at;
```

SCD Type 1 is the default — it overwrites the existing row when a newer version arrives. No `__START_AT`/`__END_AT` columns, no history. Use this when you only need current state or are deduplicating a source with duplicate deliveries.
Handle deletes and truncates from the source system
“Process a CDC feed where the `op` column signals DELETE and TRUNCATE operations, using Python”
```python
from pyspark.sql.functions import expr

dp.create_streaming_table(name="orders")

dp.create_auto_cdc_flow(
    target="orders",
    source="order_changes",
    keys=["order_id"],
    sequence_by="event_time",
    apply_as_deletes=expr("op = 'DELETE'"),
    apply_as_truncates=expr("op = 'TRUNCATE'"),
    ignore_null_updates=True,
)
```

The `apply_as_deletes` expression maps source operations to actual DELETE behavior on the target. `ignore_null_updates` prevents partial records (with null fields) from overwriting good data — common when CDC tools emit sparse change events.
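To see what `ignore_null_updates` buys you, here is a pure-Python sketch of the merge behavior described above — a simplification for illustration, not the engine's implementation — where a sparse change event is applied to an existing target row:

```python
# Sketch of ignore_null_updates semantics: fields that are None in the
# incoming change event do not overwrite existing values on the target row.
def merge_sparse(target_row, change_event):
    merged = dict(target_row)
    for field, value in change_event.items():
        if value is not None:  # null fields in the event are skipped
            merged[field] = value
    return merged

existing = {"order_id": 7, "status": "shipped", "address": "12 Main St"}
sparse_update = {"order_id": 7, "status": "delivered", "address": None}

merged = merge_sparse(existing, sparse_update)
print(merged)  # address survives; only status changes
```

Without this guard, a CDC tool that only emits changed columns would null out every field it omitted.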
Track history only for columns that matter
“Set up SCD Type 2 on a products table but only create new versions when price or cost changes, in SQL”
```sql
CREATE OR REFRESH STREAMING TABLE products_history;

CREATE FLOW products_scd2 AS AUTO CDC INTO products_history
FROM STREAM(products_clean)
KEYS (product_id)
SEQUENCE BY modified_at
STORED AS SCD TYPE 2
TRACK HISTORY ON price, cost;
```

Without `TRACK HISTORY ON`, every column change creates a new version — including trivial updates like a description edit. Selective tracking keeps the history table lean and the temporal queries fast.
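The versioning decision reduces to a simple predicate. A pure-Python sketch of the `TRACK HISTORY ON` semantics just described (illustrative, not the actual engine logic):

```python
# Sketch of TRACK HISTORY ON semantics: a new SCD Type 2 version is cut
# only when one of the tracked columns actually changes value.
TRACKED = ("price", "cost")

def needs_new_version(current_version, incoming):
    return any(current_version[c] != incoming[c] for c in TRACKED)

current = {"product_id": 1, "price": 9.99, "cost": 4.00, "description": "Widget"}

# A description edit does not cut a new version; a price change does.
assert not needs_new_version(current, {**current, "description": "Blue widget"})
assert needs_new_version(current, {**current, "price": 10.99})
```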
Use a temporary view for preprocessing before CDC
“Clean and calculate fields on raw order data before applying SCD Type 1 deduplication, in Python”
```python
from pyspark.sql.functions import col

@dp.temporary_view()
def orders_prepared():
    return (
        spark.readStream.table("bronze.orders")
        .withColumn("order_total", col("quantity") * col("unit_price"))
        .filter(col("order_id").isNotNull())
        .filter(col("order_total") > 0)
    )

dp.create_streaming_table(name="orders_current")

dp.create_auto_cdc_flow(
    target="orders_current",
    source="orders_prepared",
    keys=["order_id"],
    sequence_by="order_date",
    stored_as_scd_type="1",
)
```

Temporary views exist only during pipeline execution — nothing is persisted, so there is no storage cost. They’re the right tool when you need to transform data before CDC but don’t need that intermediate result for anything else.
Watch Out For
- Target table must exist first — always call `dp.create_streaming_table()` or `CREATE STREAMING TABLE` before defining the CDC flow. Auto CDC writes into an existing table; it doesn’t create one.
- SCD type syntax differs between Python and SQL — in Python, Type 2 is the integer `2` and Type 1 is the string `"1"`. In SQL, use `STORED AS SCD TYPE 2` or omit the clause for Type 1 (the default).
- TRUNCATE only works with SCD Type 1 — SCD Type 2 preserves history by design, so truncation is not supported. If your source sends truncate signals and you’re using Type 2, filter them out upstream.
- Clause order matters in SQL — put `APPLY AS DELETE WHEN` before `SEQUENCE BY`. Getting this wrong produces a parse error that doesn’t point at the real cause.
- Query SCD Type 2 with double-underscore columns — the temporal columns are `__START_AT` and `__END_AT` (two underscores), not `START_AT`/`END_AT`. Use `WHERE __END_AT IS NULL` to get current records.
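The double-underscore columns behave like validity intervals. A simplified pure-Python sketch of how SCD Type 2 versions are opened and closed (a list of dicts stands in for the Delta table here — illustration only, not how the engine stores data):

```python
# Sketch of SCD Type 2 interval maintenance: applying a change closes the
# open version (sets __END_AT) and opens a new one (__END_AT is None).
def apply_change(history, key_col, key, new_values, sequence_value):
    for row in history:
        if row[key_col] == key and row["__END_AT"] is None:
            row["__END_AT"] = sequence_value  # close the current version
    history.append({key_col: key, **new_values,
                    "__START_AT": sequence_value, "__END_AT": None})

history = []
apply_change(history, "customer_id", 1, {"city": "Oslo"}, "2024-01-01")
apply_change(history, "customer_id", 1, {"city": "Bergen"}, "2024-02-01")

# Equivalent of: SELECT * FROM dim WHERE __END_AT IS NULL
current = [r for r in history if r["__END_AT"] is None]
print(current)  # one open version per key; the Oslo row is closed
```

The `__END_AT IS NULL` filter is the canonical way to read current state out of a Type 2 table; every older version stays queryable by its `[__START_AT, __END_AT)` interval.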