
Streaming Patterns

Skill: databricks-spark-declarative-pipelines

Streaming patterns transform raw event streams into deduplicated, windowed, and enriched datasets. Ask your AI coding assistant for deduplication logic, windowed aggregations, stream-to-stream joins, or session detection and it will generate the SQL or Python with correct watermarking, state management, and time semantics.

“Write a SQL streaming table that deduplicates bronze events by event_id, keeping the earliest occurrence based on event_timestamp”

CREATE OR REFRESH STREAMING TABLE silver_events_dedup AS
SELECT
  event_id, user_id, event_type, event_timestamp, _ingested_at
FROM (
  SELECT
    *,
    ROW_NUMBER() OVER (PARTITION BY event_id ORDER BY event_timestamp) AS rn
  FROM STREAM bronze_events
)
WHERE rn = 1;

Key decisions:

  • ROW_NUMBER() over DISTINCT — gives you control over which duplicate wins. DISTINCT cannot express “keep the first arrival” or “keep the latest update.”
  • Partition by the business key (event_id here). The window function groups by this key and the ORDER BY picks the winner.
  • Dedup at silver, not bronze — bronze stays append-only and faithful to the source. Deduplication is a data quality decision that belongs in the silver layer.
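To make the keep-first semantics concrete, here is a plain-Python sketch of what the ROW_NUMBER() dedup computes; this is illustrative only, not pipeline code, and the field names simply follow the example above:

```python
# Keep the earliest event per event_id, mirroring
# ROW_NUMBER() OVER (PARTITION BY event_id ORDER BY event_timestamp) ... WHERE rn = 1
def dedup_keep_earliest(events):
    winners = {}
    for e in events:
        key = e["event_id"]
        # A later-arriving row only wins if its event_timestamp is earlier.
        if key not in winners or e["event_timestamp"] < winners[key]["event_timestamp"]:
            winners[key] = e
    return list(winners.values())

events = [
    {"event_id": "a", "event_timestamp": 2, "event_type": "click"},
    {"event_id": "a", "event_timestamp": 1, "event_type": "click"},  # earlier duplicate wins
    {"event_id": "b", "event_timestamp": 5, "event_type": "view"},
]
print(dedup_keep_earliest(events))
```

Flipping the comparison to `>` gives the "keep the latest update" variant, which is exactly the flexibility DISTINCT cannot express.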

Windowed aggregation for time-series metrics

“Create a SQL streaming table that computes 5-minute average, min, and max temperature per sensor”

CREATE OR REFRESH STREAMING TABLE silver_sensor_5min AS
SELECT
  sensor_id,
  window(event_timestamp, '5 minutes') AS time_window,
  AVG(temperature) AS avg_temperature,
  MIN(temperature) AS min_temperature,
  MAX(temperature) AS max_temperature,
  COUNT(*) AS event_count
FROM STREAM bronze_sensor_events
GROUP BY sensor_id, window(event_timestamp, '5 minutes');

The window() function creates non-overlapping (tumbling) windows. Each window closes after 5 minutes of event time, emitting one row per sensor per window. Use event timestamps, not processing time — otherwise late-arriving data lands in the wrong bucket.
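The bucketing a tumbling window performs is just flooring the event time to the window size. A minimal Python sketch of that semantics (illustrative only; sample readings are made up):

```python
# Bucket events into 5-minute tumbling windows by flooring the event timestamp,
# mirroring GROUP BY sensor_id, window(event_timestamp, '5 minutes').
WINDOW_SECONDS = 5 * 60

def window_start(ts_seconds):
    return ts_seconds - (ts_seconds % WINDOW_SECONDS)

def aggregate(readings):
    buckets = {}
    for r in readings:
        key = (r["sensor_id"], window_start(r["event_timestamp"]))
        buckets.setdefault(key, []).append(r["temperature"])
    return {
        key: {
            "avg_temperature": sum(t) / len(t),
            "min_temperature": min(t),
            "max_temperature": max(t),
            "event_count": len(t),
        }
        for key, t in buckets.items()
    }

readings = [
    {"sensor_id": "s1", "event_timestamp": 10, "temperature": 20.0},
    {"sensor_id": "s1", "event_timestamp": 200, "temperature": 22.0},  # same 0-300s window
    {"sensor_id": "s1", "event_timestamp": 400, "temperature": 30.0},  # next window
]
print(aggregate(readings))
```

Because the bucket is derived from the event timestamp, a late-arriving reading still lands in the window where it belongs, which is the point of the event-time-versus-processing-time advice above.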

“Join streaming orders with streaming payments, matching within a 1-hour window of the order timestamp, in SQL”

CREATE OR REFRESH STREAMING TABLE silver_orders_with_payments AS
SELECT
  o.order_id, o.customer_id, o.order_timestamp, o.amount AS order_amount,
  p.payment_id, p.payment_timestamp, p.payment_method, p.amount AS payment_amount
FROM STREAM bronze_orders o
INNER JOIN STREAM bronze_payments p
  ON o.order_id = p.order_id
  AND p.payment_timestamp BETWEEN o.order_timestamp
    AND o.order_timestamp + INTERVAL 1 HOUR;

Stream-to-stream joins require a time bound — without one, the engine must buffer all events from both sides indefinitely. The BETWEEN clause limits state to a 1-hour window, keeping memory bounded. If payments can arrive days later, widen the interval or use a materialized view with batch semantics instead.
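A plain-Python sketch of the join predicate shows why the time bound keeps state finite (illustrative only; sample rows are made up):

```python
ONE_HOUR = 3600  # seconds

def join_orders_payments(orders, payments):
    # A payment matches only if it lands within 1 hour of the order timestamp.
    # Anything outside the bound can never match, so a streaming engine can
    # evict it from state instead of buffering it forever.
    matched = []
    for o in orders:
        for p in payments:
            if (p["order_id"] == o["order_id"]
                    and o["order_timestamp"] <= p["payment_timestamp"]
                    <= o["order_timestamp"] + ONE_HOUR):
                matched.append((o["order_id"], p["payment_id"]))
    return matched

orders = [{"order_id": 1, "order_timestamp": 0}]
payments = [
    {"order_id": 1, "payment_id": "p-ok", "payment_timestamp": 1800},    # within 1h
    {"order_id": 1, "payment_id": "p-late", "payment_timestamp": 7200},  # outside bound
]
print(join_orders_payments(orders, payments))
```

The late payment is dropped by the join, which is the trade-off the paragraph above describes: a tighter bound means less state but more unmatched stragglers.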

“Enrich a streaming sales table with product names and categories from a static dimension table, in SQL”

CREATE OR REFRESH TABLE dim_products AS
SELECT * FROM catalog.reference.products;

CREATE OR REFRESH STREAMING TABLE silver_sales_enriched AS
SELECT
  s.sale_id, s.product_id, s.quantity, s.sale_timestamp,
  p.product_name, p.category, p.price,
  s.quantity * p.price AS total_amount
FROM STREAM bronze_sales s
LEFT JOIN dim_products p ON s.product_id = p.product_id;

The dimension table is a regular (non-streaming) table that refreshes on each pipeline run. The streaming side drives the join — each new sale looks up the current product info. Use LEFT JOIN so sales with unknown products still flow through rather than silently dropping.
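The stream-static join behaves like a dictionary lookup against the current dimension snapshot. A minimal Python sketch under that assumption (illustrative only; product data is made up):

```python
# Static dimension snapshot: one lookup entry per product_id.
dim_products = {
    "p1": {"product_name": "Widget", "category": "tools", "price": 2.50},
}

def enrich(sales):
    out = []
    for s in sales:
        p = dim_products.get(s["product_id"])  # LEFT JOIN: missing product -> None
        out.append({
            "sale_id": s["sale_id"],
            "product_name": p["product_name"] if p else None,
            "category": p["category"] if p else None,
            "total_amount": s["quantity"] * p["price"] if p else None,
        })
    return out

sales = [
    {"sale_id": 1, "product_id": "p1", "quantity": 4},
    {"sale_id": 2, "product_id": "p9", "quantity": 1},  # unknown product still flows through
]
print(enrich(sales))
```

The second sale survives with null product fields, which is the LEFT JOIN behavior the paragraph recommends; an inner join would silently drop it.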

“Detect user sessions from a clickstream with a 30-minute inactivity timeout, in SQL”

CREATE OR REFRESH STREAMING TABLE silver_user_sessions AS
SELECT
  user_id,
  session_window(event_timestamp, '30 minutes') AS session,
  MIN(event_timestamp) AS session_start,
  MAX(event_timestamp) AS session_end,
  COUNT(*) AS event_count,
  COLLECT_LIST(event_type) AS event_sequence
FROM STREAM bronze_user_events
GROUP BY user_id, session_window(event_timestamp, '30 minutes');

session_window() groups events by user with a dynamic gap — if no event arrives within 30 minutes, the session closes. This is different from window() which uses fixed-size buckets. Session windows produce variable-length windows that match real user behavior.
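The gap-based grouping can be sketched in a few lines of plain Python for a single user's timestamps (illustrative only; not how the engine maintains state incrementally):

```python
GAP_SECONDS = 30 * 60

def sessionize(timestamps):
    # Start a new session whenever the gap since the previous event exceeds
    # 30 minutes, mirroring session_window(event_timestamp, '30 minutes').
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] <= GAP_SECONDS:
            sessions[-1].append(ts)  # within the gap: extend the open session
        else:
            sessions.append([ts])    # gap exceeded: open a new session
    return [
        {"session_start": s[0], "session_end": s[-1], "event_count": len(s)}
        for s in sessions
    ]

# Events at t=0s and t=1000s fall in one session; t=5000s is >30min later.
print(sessionize([0, 1000, 5000]))
```

Note the resulting windows have different lengths per session, unlike the fixed 5-minute buckets from window().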

“Apply SCD Type 2 change tracking on a customer CDC feed, excluding operational columns, in SQL”

CREATE OR REFRESH STREAMING TABLE silver_customers_history;
CREATE FLOW customers_scd2_flow AS
AUTO CDC INTO silver_customers_history
FROM stream(bronze_customer_cdc)
KEYS (customer_id)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY event_timestamp
COLUMNS * EXCEPT (operation, _ingested_at, _source_file)
STORED AS SCD TYPE 2;

Auto CDC handles deduplication, ordering, and merge logic in one declaration. The COLUMNS * EXCEPT clause strips operational metadata before writing to the target. Put APPLY AS DELETE WHEN before SEQUENCE BY — clause order matters and the error message does not point at the real cause.
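A toy Python sketch of the SCD Type 2 apply logic for one key can make the merge behavior concrete. This assumes events are already deduplicated and sorted by event_timestamp (which is what KEYS plus SEQUENCE BY guarantee); the __START_AT/__END_AT column names and the sample feed are illustrative:

```python
def apply_scd2(cdc_events):
    # Each change closes the currently open version and, unless it is a
    # delete, opens a new one. Open rows have __END_AT = None.
    history = []
    for ev in cdc_events:
        if history and history[-1]["__END_AT"] is None:
            history[-1]["__END_AT"] = ev["event_timestamp"]  # close current version
        if ev["operation"] != "DELETE":
            row = {k: v for k, v in ev.items() if k != "operation"}  # COLUMNS * EXCEPT
            row["__START_AT"] = ev["event_timestamp"]
            row["__END_AT"] = None
            history.append(row)
    return history

feed = [
    {"operation": "INSERT", "customer_id": 1, "email": "a@x.com", "event_timestamp": 1},
    {"operation": "UPDATE", "customer_id": 1, "email": "b@x.com", "event_timestamp": 2},
    {"operation": "DELETE", "customer_id": 1, "email": None, "event_timestamp": 3},
]
print(apply_scd2(feed))
```

The insert-update-delete feed yields two closed history rows and no open row, which is the full version history Auto CDC maintains for you declaratively.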

Common pitfalls:

  • Unbounded state from high-cardinality GROUP BY: a GROUP BY user_id, product_id, session_id creates state for every unique combination. Aggregate at a higher level (product_category instead of product_id) or use windowed aggregations to bound state.
  • Processing time instead of event time — grouping by _ingested_at instead of the source event timestamp puts late-arriving data in the wrong window. Always use the event timestamp for business logic; keep _ingested_at for debugging only.
  • Missing time bounds on stream-stream joins — without a BETWEEN clause or INTERVAL, the engine buffers all unmatched events forever. State grows until the pipeline runs out of memory.
  • TRACK HISTORY ON * causing parse errors — if you hit “end of input” errors in Auto CDC, remove the TRACK HISTORY ON * clause. Tracking all columns is the default behavior; the explicit clause sometimes causes parser issues.