Stream-Static Joins
Skill: databricks-spark-structured-streaming
What You Can Build
Most streaming events arrive thin — a device ID, a user ID, a transaction code. The business context lives in dimension tables. Stream-static joins let you enrich events with that context in real time, without managing state or worrying about watermarks. The static side re-reads from Delta each microbatch, so dimension updates propagate automatically.
In Action
“Write a Python streaming pipeline that reads IoT events from Kafka, enriches them with a Delta device dimension table using a left join, and writes the result to a Delta table.”
```python
from pyspark.sql.functions import col, from_json, broadcast

iot_stream = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "iot-events")
    .load()
    .select(from_json(col("value").cast("string"), event_schema).alias("data"))
    .select("data.*")
)

device_dim = spark.table("device_dimensions")

enriched = (
    iot_stream
    .join(broadcast(device_dim), "device_id", "left")
    .select(
        iot_stream["*"],
        device_dim["device_type"],
        device_dim["location"],
        device_dim["manufacturer"],
    )
)

(
    enriched.writeStream.format("delta")
    .option("checkpointLocation", "/Volumes/catalog/checkpoints/enriched_iot")
    .trigger(processingTime="30 seconds")
    .start("/delta/enriched_iot_events")
)
```

Key decisions:
- Always use a left join — new devices may send data before the dimension table catches up. A left join preserves every streaming event; you backfill null dimensions later. An inner join silently drops those events, and you won’t notice until someone asks why counts don’t match.
- Delta tables, not Parquet — Delta’s versioning means Spark checks for a new version each microbatch and reads it if changed. A Parquet dimension is read once at startup and never refreshes.
- Broadcast hint — dimension tables under 100MB should be broadcast to avoid shuffle joins. Verify in the Spark UI SQL tab that you see BroadcastHashJoin, not SortMergeJoin.
- Select only needed columns before joining. A wide dimension table wastes broadcast memory on columns you’ll never use downstream.
More Patterns
Multi-Table Enrichment
“Enrich a stream with three different dimension tables in Python.”
```python
devices = spark.table("device_dimensions")
locations = spark.table("location_dimensions")
categories = spark.table("category_dimensions")

enriched = (
    iot_stream
    .join(broadcast(devices), "device_id", "left")
    .join(broadcast(locations), "location_id", "left")
    .join(broadcast(categories), "category_id", "left")
)
```

Each join is stateless and refreshes independently each microbatch. Chain as many as you need, but watch aggregate broadcast size — if total dimension data exceeds the broadcast threshold, the last join falls back to a shuffle join.
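The threshold governing automatic broadcasts is an ordinary session config. A hedged sketch, assuming an active `SparkSession` named `spark`; the 64MB value is illustrative, not a recommendation:

```python
# Current threshold in bytes (Spark's default is 10MB); -1 disables
# automatic broadcasting entirely.
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

# Raise it when combined dimension tables are larger, e.g. 64MB (illustrative).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(64 * 1024 * 1024))
```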
Dimension Freshness Audit
“Add freshness tracking to a stream-static join in Python so you can monitor how stale dimensions are.”
```python
from pyspark.sql.functions import col, unix_timestamp, current_timestamp

enriched = (
    iot_stream
    .join(broadcast(device_dim), "device_id", "left")
    .withColumn(
        "dim_lag_seconds",
        unix_timestamp(current_timestamp()) - unix_timestamp(col("dim_updated_at")),
    )
    .withColumn("dim_fresh", col("dim_lag_seconds") < 3600)
)
```

The dim_lag_seconds column tells you how old the dimension data is at enrichment time. Set an alert when the average exceeds your threshold — a stale dimension table means your enriched data is silently wrong, not missing.
Backfill Missing Dimensions with MERGE
“Write a SQL MERGE statement that backfills null dimension columns on previously enriched events.”
```sql
MERGE INTO enriched_events target
USING device_dimensions source
ON target.device_id = source.device_id
   AND target.device_type IS NULL
WHEN MATCHED THEN UPDATE SET
  device_type = source.device_type,
  location = source.location,
  manufacturer = source.manufacturer
```

Run this as a scheduled batch job after dimension table updates. It fixes events that arrived before the dimension was available — the expected consequence of using left joins in production.
Watch Out For
- Inner join silently drops events — this is the most common mistake in stream-static joins. Switch to a left join and monitor the null rate on dimension columns to track unmatched events.
- Shuffle join instead of broadcast — if you see SortMergeJoin in the query plan, the dimension table exceeded the broadcast threshold. Filter to active records, select only needed columns, or increase spark.sql.autoBroadcastJoinThreshold (up to 1GB).
- Stale dimension data with non-Delta formats — Parquet and CSV dimensions are read once at stream startup and never refreshed. Convert to Delta for automatic per-microbatch version checking.
- High null rate climbing over time — dimension updates are lagging behind event arrival. Add the freshness audit pattern above and schedule a backfill job to patch nulls retroactively.