Streaming Patterns
Skill: databricks-spark-declarative-pipelines
What You Can Build
Streaming patterns transform raw event streams into deduplicated, windowed, and enriched datasets. Ask your AI coding assistant for deduplication logic, windowed aggregations, stream-to-stream joins, or session detection, and it will generate the SQL or Python with correct watermarking, state management, and time semantics.
In Action
“Write a SQL streaming table that deduplicates bronze events by event_id, keeping the earliest occurrence based on event_timestamp”
```sql
CREATE OR REFRESH STREAMING TABLE silver_events_dedup AS
SELECT event_id, user_id, event_type, event_timestamp, _ingested_at
FROM (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY event_id ORDER BY event_timestamp) AS rn
  FROM STREAM bronze_events
)
WHERE rn = 1;
```

Key decisions:

- `ROW_NUMBER()` over `DISTINCT` — gives you control over which duplicate wins. `DISTINCT` cannot express “keep the first arrival” or “keep the latest update.”
- Partition by the business key — `event_id` here. The window function groups by this key and the `ORDER BY` picks the winner.
- Dedup at silver, not bronze — bronze stays append-only and faithful to the source. Deduplication is a data quality decision that belongs in the silver layer.
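To make the keep-the-earliest semantics concrete, here is a plain-Python sketch of what the `ROW_NUMBER() ... WHERE rn = 1` pattern computes. The event dicts and field names mirror the example above; this illustrates the result only, not how the streaming engine maintains state across micro-batches.

```python
from operator import itemgetter

def dedup_keep_earliest(events, key="event_id", order_by="event_timestamp"):
    """Keep one row per key: the one with the smallest order_by value,
    mirroring ROW_NUMBER() OVER (PARTITION BY key ORDER BY order_by) = 1."""
    winners = {}
    for row in events:
        k = row[key]
        # First row seen for this key, or an earlier event_timestamp, wins.
        if k not in winners or row[order_by] < winners[k][order_by]:
            winners[k] = row
    return sorted(winners.values(), key=itemgetter(key))

events = [
    {"event_id": "e1", "event_timestamp": 105, "event_type": "click"},
    {"event_id": "e1", "event_timestamp": 100, "event_type": "view"},  # earlier duplicate wins
    {"event_id": "e2", "event_timestamp": 200, "event_type": "click"},
]
print(dedup_keep_earliest(events))
```

Note that `DISTINCT` could not express this: swapping the comparison to `>` is all it takes to keep the latest update instead.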
More Patterns
Windowed aggregation for time-series metrics
“Create a SQL streaming table that computes 5-minute average, min, and max temperature per sensor”
```sql
CREATE OR REFRESH STREAMING TABLE silver_sensor_5min AS
SELECT
  sensor_id,
  window(event_timestamp, '5 minutes') AS time_window,
  AVG(temperature) AS avg_temperature,
  MIN(temperature) AS min_temperature,
  MAX(temperature) AS max_temperature,
  COUNT(*) AS event_count
FROM STREAM bronze_sensor_events
GROUP BY sensor_id, window(event_timestamp, '5 minutes');
```

The `window()` function creates non-overlapping (tumbling) windows. Each window closes after 5 minutes of event time, emitting one row per sensor per window. Use event timestamps, not processing time — otherwise late-arriving data lands in the wrong bucket.
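The tumbling-window bucketing can be sketched in plain Python: each event lands in exactly one fixed-size bucket derived from its event timestamp, never from arrival time. This is a minimal sketch assuming integer epoch-second timestamps; names mirror the example above.

```python
from collections import defaultdict

def tumbling_window_stats(events, width_s=300):
    """Bucket events into fixed 5-minute windows by event time and aggregate
    per (sensor_id, window_start), like GROUP BY window(ts, '5 minutes')."""
    buckets = defaultdict(list)
    for e in events:
        start = (e["event_timestamp"] // width_s) * width_s  # window start
        buckets[(e["sensor_id"], start)].append(e["temperature"])
    return {
        key: {"avg": sum(t) / len(t), "min": min(t), "max": max(t), "count": len(t)}
        for key, t in buckets.items()
    }

events = [
    {"sensor_id": "s1", "event_timestamp": 10,  "temperature": 20.0},
    {"sensor_id": "s1", "event_timestamp": 290, "temperature": 22.0},  # same window
    {"sensor_id": "s1", "event_timestamp": 310, "temperature": 30.0},  # next window
]
print(tumbling_window_stats(events))
```

Because the bucket is a pure function of the event timestamp, a late-arriving event still lands in its correct window — which is exactly why grouping by processing time breaks.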
Stream-to-stream join with time bounds
“Join streaming orders with streaming payments, matching within a 1-hour window of the order timestamp, in SQL”
```sql
CREATE OR REFRESH STREAMING TABLE silver_orders_with_payments AS
SELECT
  o.order_id,
  o.customer_id,
  o.order_timestamp,
  o.amount AS order_amount,
  p.payment_id,
  p.payment_timestamp,
  p.payment_method,
  p.amount AS payment_amount
FROM STREAM bronze_orders o
INNER JOIN STREAM bronze_payments p
  ON o.order_id = p.order_id
  AND p.payment_timestamp BETWEEN o.order_timestamp
                              AND o.order_timestamp + INTERVAL 1 HOUR;
```

Stream-to-stream joins require a time bound — without one, the engine must buffer all events from both sides indefinitely. The `BETWEEN` clause limits state to a 1-hour window, keeping memory bounded. If payments can arrive days later, widen the interval or use a materialized view with batch semantics instead.
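What the time bound buys you can be shown outside the engine: a payment only matches an order whose timestamp is within the window, so anything older can be discarded. A minimal sketch, assuming one order per `order_id` and integer epoch-second timestamps (the streaming engine's actual buffering and watermarking are not modeled here):

```python
def join_orders_payments(orders, payments, window_s=3600):
    """Match each payment to the order with the same order_id whose timestamp
    falls in [order_ts, order_ts + window_s]. The bound is what lets a
    streaming engine evict buffered orders instead of keeping them forever."""
    orders_by_id = {o["order_id"]: o for o in orders}  # assumes unique order_id
    out = []
    for p in payments:
        o = orders_by_id.get(p["order_id"])
        if o and o["order_timestamp"] <= p["payment_timestamp"] <= o["order_timestamp"] + window_s:
            out.append({**o, "payment_id": p["payment_id"],
                        "payment_timestamp": p["payment_timestamp"]})
    return out

orders = [{"order_id": 1, "order_timestamp": 0, "amount": 50}]
payments = [
    {"order_id": 1, "payment_id": "p1", "payment_timestamp": 1800},  # within 1 hour: joins
    {"order_id": 1, "payment_id": "p2", "payment_timestamp": 7200},  # too late: dropped
]
print(join_orders_payments(orders, payments))
```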
Stream-to-static enrichment join
“Enrich a streaming sales table with product names and categories from a static dimension table, in SQL”
```sql
CREATE OR REFRESH TABLE dim_products AS
SELECT * FROM catalog.reference.products;

CREATE OR REFRESH STREAMING TABLE silver_sales_enriched AS
SELECT
  s.sale_id,
  s.product_id,
  s.quantity,
  s.sale_timestamp,
  p.product_name,
  p.category,
  p.price,
  s.quantity * p.price AS total_amount
FROM STREAM bronze_sales s
LEFT JOIN dim_products p ON s.product_id = p.product_id;
```

The dimension table is a regular (non-streaming) table that refreshes on each pipeline run. The streaming side drives the join — each new sale looks up the current product info. Use `LEFT JOIN` so sales with unknown products still flow through rather than silently dropping.
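The `LEFT JOIN` behavior — every sale emitted, unknown products yielding nulls instead of dropped rows — can be sketched as a dictionary lookup (field names mirror the example; illustrative only):

```python
def enrich_sales(sales, products):
    """LEFT JOIN semantics: every sale flows through; an unknown product_id
    yields None for the enrichment columns instead of dropping the row."""
    dim = {p["product_id"]: p for p in products}  # static dimension lookup
    out = []
    for s in sales:
        p = dim.get(s["product_id"])
        out.append({
            **s,
            "product_name": p["product_name"] if p else None,
            "total_amount": s["quantity"] * p["price"] if p else None,
        })
    return out

products = [{"product_id": 1, "product_name": "widget", "price": 2.5}]
sales = [
    {"sale_id": "a", "product_id": 1, "quantity": 4},
    {"sale_id": "b", "product_id": 99, "quantity": 1},  # unknown product, still emitted
]
print(enrich_sales(sales, products))
```

An `INNER JOIN` would instead drop sale "b" entirely — usually the wrong default for a silver table you may later backfill.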
Session window detection
“Detect user sessions from a clickstream with a 30-minute inactivity timeout, in SQL”
```sql
CREATE OR REFRESH STREAMING TABLE silver_user_sessions AS
SELECT
  user_id,
  session_window(event_timestamp, '30 minutes') AS session,
  MIN(event_timestamp) AS session_start,
  MAX(event_timestamp) AS session_end,
  COUNT(*) AS event_count,
  COLLECT_LIST(event_type) AS event_sequence
FROM STREAM bronze_user_events
GROUP BY user_id, session_window(event_timestamp, '30 minutes');
```

`session_window()` groups events by user with a dynamic gap — if no event arrives within 30 minutes, the session closes. This is different from `window()`, which uses fixed-size buckets. Session windows produce variable-length windows that match real user behavior.
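The gap-based grouping can be sketched in plain Python: walk the events in time order and start a new session whenever the gap to the previous event exceeds the timeout. This minimal sketch assumes a single user's events already sorted together (in the real query, `GROUP BY user_id` handles the per-user split) and integer epoch-second timestamps:

```python
def sessionize(events, gap_s=1800):
    """Split one user's time-ordered event stream into sessions: a gap of
    more than gap_s between consecutive events closes the current session,
    like session_window(event_timestamp, '30 minutes')."""
    sessions = []
    current = None
    for e in sorted(events, key=lambda e: e["event_timestamp"]):
        if current and e["event_timestamp"] - current["session_end"] <= gap_s:
            current["session_end"] = e["event_timestamp"]  # extend the session
            current["event_count"] += 1
        else:
            current = {"user_id": e["user_id"],
                       "session_start": e["event_timestamp"],
                       "session_end": e["event_timestamp"],
                       "event_count": 1}
            sessions.append(current)
    return sessions

clicks = [  # gap between 1000 and 4000 exceeds the 1800s timeout
    {"user_id": "u1", "event_timestamp": 0},
    {"user_id": "u1", "event_timestamp": 1000},
    {"user_id": "u1", "event_timestamp": 4000},
]
print(sessionize(clicks))
```

Note how the window end is not fixed in advance — each new event can extend it, which is exactly what fixed `window()` buckets cannot express.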
CDC deduplication with Auto CDC
“Apply SCD Type 2 change tracking on a customer CDC feed, excluding operational columns, in SQL”
```sql
CREATE OR REFRESH STREAMING TABLE silver_customers_history;

CREATE FLOW customers_scd2_flow AS
AUTO CDC INTO silver_customers_history
FROM stream(bronze_customer_cdc)
KEYS (customer_id)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY event_timestamp
COLUMNS * EXCEPT (operation, _ingested_at, _source_file)
STORED AS SCD TYPE 2;
```

Auto CDC handles deduplication, ordering, and merge logic in one declaration. The `COLUMNS * EXCEPT` clause strips operational metadata before writing to the target. Put `APPLY AS DELETE WHEN` before `SEQUENCE BY` — clause order matters and the error message does not point at the real cause.
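What `STORED AS SCD TYPE 2` produces can be sketched outside the engine: ordered changes per key become history rows with validity ranges, each new version closing the previous one. A minimal sketch — the `__START_AT`/`__END_AT` column names follow the convention Databricks uses for SCD2 targets, the `email` payload column is hypothetical, and real Auto CDC also handles out-of-order and duplicate events via `SEQUENCE BY`:

```python
def apply_scd2(changes, key="customer_id", seq="event_timestamp"):
    """Turn a CDC feed into SCD Type 2 history rows: each change closes the
    currently open version of the same key and (unless a DELETE) opens a new one."""
    history = []
    open_rows = {}  # key -> index of the currently open history row
    for c in sorted(changes, key=lambda c: c[seq]):  # SEQUENCE BY ordering
        k = c[key]
        if k in open_rows:
            history[open_rows[k]]["__END_AT"] = c[seq]  # close previous version
        if c.get("operation") == "DELETE":             # APPLY AS DELETE WHEN
            open_rows.pop(k, None)
            continue
        row = {key: k, "email": c["email"], "__START_AT": c[seq], "__END_AT": None}
        open_rows[k] = len(history)
        history.append(row)
    return history

cdc = [
    {"customer_id": 1, "email": "a@x.com", "event_timestamp": 1, "operation": "INSERT"},
    {"customer_id": 1, "email": "b@x.com", "event_timestamp": 2, "operation": "UPDATE"},
]
print(apply_scd2(cdc))
```

The first version ends when the second begins; the latest version stays open with a null end timestamp — the row a "current state" view would select.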
Watch Out For
- Unbounded state from high-cardinality GROUP BY — `GROUP BY user_id, product_id, session_id` creates state for every unique combination. Aggregate at a higher level (`product_category` instead of `product_id`) or use windowed aggregations to bound state.
- Processing time instead of event time — grouping by `_ingested_at` instead of the source event timestamp puts late-arriving data in the wrong window. Always use the event timestamp for business logic; keep `_ingested_at` for debugging only.
- Missing time bounds on stream-stream joins — without a `BETWEEN` clause or `INTERVAL`, the engine buffers all unmatched events forever. State grows until the pipeline runs out of memory.
- `TRACK HISTORY ON *` causing parse errors — if you hit “end of input” errors in Auto CDC, remove the `TRACK HISTORY ON *` clause. Tracking all columns is the default behavior; the explicit clause sometimes causes parser issues.