Stream-Stream Joins

Skill: databricks-spark-structured-streaming

Some questions require correlating events from two different streams in real time — matching orders to payments, attributing conversions to impressions, or pairing sensor readings across devices. Stream-stream joins handle this by buffering both sides in state and matching events within a bounded time window. They require watermarks to control state size, making them more complex than stream-static joins but essential for event-time correlation.
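To make the buffering idea concrete, here is a toy, single-process model of a keyed join with a time-range condition and watermark-driven eviction. It is a sketch of the concept only, not Spark's implementation: the class `ToyStreamJoin` and its methods are hypothetical, timestamps are plain seconds, and events are handled one at a time rather than in micro-batches.

```python
from collections import defaultdict


class ToyStreamJoin:
    """Toy inner join: match when left_ts - lower <= right_ts <= left_ts + upper."""

    def __init__(self, lower, upper, watermark_delay):
        self.lower = lower
        self.upper = upper
        self.delay = watermark_delay
        self.left = defaultdict(list)   # key -> buffered left timestamps
        self.right = defaultdict(list)  # key -> buffered right timestamps
        self.max_left = None
        self.max_right = None

    def watermark(self):
        # Global watermark: the slower side's max event time minus the delay.
        if self.max_left is None or self.max_right is None:
            return None
        return min(self.max_left, self.max_right) - self.delay

    def _is_late(self, ts):
        wm = self.watermark()
        return wm is not None and ts < wm

    def _evict(self):
        # A buffered row is dropped once no future, non-late row can match it.
        wm = self.watermark()
        if wm is None:
            return
        for buf, slack in ((self.left, self.upper), (self.right, self.lower)):
            for key in list(buf):
                buf[key] = [ts for ts in buf[key] if ts + slack >= wm]
                if not buf[key]:
                    del buf[key]

    def on_left(self, key, ts):
        if self._is_late(ts):
            return []
        matches = [(key, ts, r) for r in self.right.get(key, [])
                   if ts - self.lower <= r <= ts + self.upper]
        self.left[key].append(ts)
        self.max_left = ts if self.max_left is None else max(self.max_left, ts)
        self._evict()
        return matches

    def on_right(self, key, ts):
        if self._is_late(ts):
            return []
        matches = [(key, l, ts) for l in self.left.get(key, [])
                   if l - self.lower <= ts <= l + self.upper]
        self.right[key].append(ts)
        self.max_right = ts if self.max_right is None else max(self.max_right, ts)
        self._evict()
        return matches

    def state_size(self):
        return (sum(len(v) for v in self.left.values())
                + sum(len(v) for v in self.right.values()))
```

Feeding it an order at t=1000 and a payment at t=1300 yields one match. Once both sides advance far enough, the old rows are evicted, which is why a bounded time condition plus watermarks keeps state finite.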

“Write a Python streaming pipeline that joins an orders stream with a payments stream, matching payments that arrive within 10 minutes of the order, and writes matched pairs to a Delta table.”

from pyspark.sql.functions import col, expr, from_json

# order_schema, payment_schema, and kafka_brokers are assumed to be defined elsewhere.
orders = (spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", kafka_brokers)
    .option("subscribe", "orders").load()
    .select(from_json(col("value").cast("string"), order_schema).alias("data"))
    .select("data.*")
    .withWatermark("order_time", "10 minutes")
    .alias("orders"))  # alias so the join expression can reference orders.*

payments = (spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", kafka_brokers)
    .option("subscribe", "payments").load()
    .select(from_json(col("value").cast("string"), payment_schema).alias("data"))
    .select("data.*")
    .withWatermark("payment_time", "10 minutes")
    .alias("payments"))

# Accept payments from 5 minutes before to 10 minutes after the order.
matched = (orders.join(payments, expr("""
        orders.order_id = payments.order_id AND
        payments.payment_time >= orders.order_time - interval 5 minutes AND
        payments.payment_time <= orders.order_time + interval 10 minutes
    """), "inner")
    # Keep a single order_id; Delta rejects duplicate column names.
    .drop(col("payments.order_id")))

(matched.writeStream.format("delta")
    .option("checkpointLocation", "/Volumes/catalog/checkpoints/order_payments")
    .trigger(processingTime="30 seconds")
    .start("/delta/order_payments"))

Key decisions:

  • Watermarks on both sides tell Spark when to stop waiting for late events and clean up state. Without them, state grows indefinitely until the job runs out of memory.
  • Bounded time conditions in the join expression (BETWEEN ... AND ...) constrain how long Spark holds each event in state. An open-ended condition like s2.ts >= s1.ts means state never expires.
  • Watermark duration = 2-3x your p95 latency. A 10-minute watermark tolerates events that arrive up to 10 minutes behind the latest event time Spark has seen on that stream. Tighter watermarks reduce state size but drop more late events; wider watermarks increase completeness but cost more memory.
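  • Inner join here means you only see matched pairs. Use leftOuter if you need to see all orders regardless of whether payment arrived; unmatched orders are then emitted with null payment columns only after the watermark guarantees that no match can still arrive.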
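The 2-3x p95 rule of thumb can be turned into a small calculation. The sketch below assumes you already have a sample of observed delays in seconds (event time to arrival time). The helper names `percentile` and `suggest_watermark_seconds` and the default multiplier of 2.5 are illustrative choices, not part of any Spark API.

```python
import math


def percentile(values, pct):
    """Nearest-rank percentile of a non-empty list (pct in 1..100)."""
    ordered = sorted(values)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


def suggest_watermark_seconds(delays_seconds, multiplier=2.5):
    """Watermark delay as a multiple of the p95 observed delay, rounded up."""
    return math.ceil(percentile(delays_seconds, 95) * multiplier)


# Hypothetical sample: delays of 1..100 seconds.
delays = list(range(1, 101))
seconds = suggest_watermark_seconds(delays)
print(f"{seconds} seconds")  # a string in the form withWatermark accepts
```

The resulting string can be passed as the second argument to withWatermark. Re-run the calculation on fresh delay samples from time to time, since latency distributions drift.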

“Write a Python stream-stream join that attributes conversions to ad impressions within a 24-hour window.”

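from pyspark.sql.functions import col, expr, from_json

# impression_schema, conversion_schema, and kafka_brokers are assumed to be defined elsewhere.
impressions = (spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", kafka_brokers)
    .option("subscribe", "impressions").load()
    .select(from_json(col("value").cast("string"), impression_schema).alias("data"))
    .select("data.*")
    .withWatermark("impression_time", "1 hour")
    .alias("impressions"))  # alias so the join expression can reference impressions.*

conversions = (spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", kafka_brokers)
    .option("subscribe", "conversions").load()
    .select(from_json(col("value").cast("string"), conversion_schema).alias("data"))
    .select("data.*")
    .withWatermark("conversion_time", "1 hour")
    .alias("conversions"))

# A conversion is attributed if it occurs within 24 hours after the impression.
attributed = (impressions.join(conversions, expr("""
        impressions.user_id = conversions.user_id AND
        impressions.ad_id = conversions.ad_id AND
        conversions.conversion_time >= impressions.impression_time AND
        conversions.conversion_time <= impressions.impression_time + interval 24 hours
    """), "inner")
    # Drop the duplicated join keys; Delta rejects duplicate column names.
    .drop(col("conversions.user_id"))
    .drop(col("conversions.ad_id")))

(attributed.writeStream.format("delta")
    .option("checkpointLocation", "/Volumes/catalog/checkpoints/attribution")
    .start("/delta/attributed_conversions"))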

The 24-hour attribution window means Spark holds each impression in state for roughly 24 hours plus the 1-hour watermark delay while it waits for a matching conversion. That’s a lot of state for high-volume ad streams, so you’ll want RocksDB enabled for this pattern.
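A back-of-the-envelope estimate shows why this pattern is state-heavy. The sketch below assumes a steady arrival rate and that each impression stays buffered for the attribution window plus the watermark delay. The rate (1,000 impressions per second) and row size (200 bytes) are made-up illustration values, and real state stores add overhead on top.

```python
def estimate_state_rows(events_per_second, window_seconds, watermark_delay_seconds):
    """Approximate number of rows buffered on one side of the join."""
    retention_seconds = window_seconds + watermark_delay_seconds
    return events_per_second * retention_seconds


# Hypothetical workload: 1,000 impressions/s, 24-hour window, 1-hour watermark.
rows = estimate_state_rows(1_000, 24 * 3600, 3600)
approx_bytes = rows * 200  # assumed 200 bytes per buffered row
print(rows, approx_bytes / 1e9)  # rows held, and size in GB
```

With these assumed numbers the impression side alone holds about 90 million rows, roughly 18 GB before overhead, which is well beyond what a default in-memory state store handles comfortably.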

“Join three Kafka streams together in Python with watermarks on each.”

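from pyspark.sql.functions import col, expr

# stream_a, stream_b, and stream_c are assumed to be parsed streams with key and ts columns.
a = stream_a.withWatermark("ts", "10 minutes").alias("a")
b = stream_b.withWatermark("ts", "10 minutes").alias("b")
c = stream_c.withWatermark("ts", "10 minutes").alias("c")

ab = (a.join(b, expr("""a.key = b.key AND
        b.ts BETWEEN a.ts - interval 5 minutes AND a.ts + interval 5 minutes"""), "inner")
    # Keep one key and rename b's timestamp so ab.key and ab.ts are unambiguous; add payload columns as needed.
    .select(col("a.key").alias("key"), col("a.ts").alias("ts"), col("b.ts").alias("b_ts"))
    .alias("ab"))

abc = ab.join(c, expr("""ab.key = c.key AND
    c.ts BETWEEN ab.ts - interval 5 minutes AND ab.ts + interval 5 minutes"""), "inner")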

Each additional join adds its own state, because Spark buffers both inputs of every join independently. When a query has several watermarked inputs, Spark tracks a single global watermark, by default the minimum across those inputs, so the slowest stream sets the pace of state cleanup for every join. If you need to join more than three streams, consider whether a shared staging Delta table and sequential stream-static joins would be simpler.

“Write a Python foreachBatch that separates on-time and late events into different Delta tables.”

from pyspark.sql.functions import col, current_timestamp, lit, unix_timestamp

def write_with_late_data_handling(batch_df, batch_id):
    # Delay between event time and processing time, in seconds.
    processed = batch_df.withColumn(
        "delay_seconds",
        unix_timestamp(current_timestamp()) - unix_timestamp(col("event_time")))
    processed.persist()  # reused by the filters, the count, and both writes

    on_time = processed.filter(col("delay_seconds") < 600)
    late = processed.filter(col("delay_seconds") >= 600)

    # txnAppId + txnVersion make each append idempotent if the batch is retried.
    (on_time.write.format("delta").mode("append")
        .option("txnVersion", batch_id)
        .option("txnAppId", "stream_join_job")
        .saveAsTable("matched_events"))

    if late.count() > 0:
        (late.withColumn("dlq_reason", lit("LATE_ARRIVAL"))
            .write.format("delta").mode("append")
            .option("txnVersion", batch_id)
            .option("txnAppId", "stream_join_job")
            .saveAsTable("late_data_dlq"))

    processed.unpersist()

Routing late data to a separate table lets you audit how much arrives outside your watermark window. If late volumes are consistently high, your watermark is too aggressive — widen it.
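One way to act on the audit is to compute the share of late events from a sample of delay_seconds values and compare it to a tolerance. This is a minimal sketch: the helper names and the 1% tolerance are hypothetical, and in practice the delays would come from a query over the matched and dead-letter tables.

```python
def late_fraction(delays_seconds, threshold_seconds=600):
    """Share of events whose delay is at or beyond the threshold."""
    if not delays_seconds:
        return 0.0
    late = sum(1 for d in delays_seconds if d >= threshold_seconds)
    return late / len(delays_seconds)


def should_widen_watermark(delays_seconds, threshold_seconds=600,
                           max_late_fraction=0.01):
    """True when more events arrive late than the assumed tolerance allows."""
    return late_fraction(delays_seconds, threshold_seconds) > max_late_fraction


# Hypothetical sample of delays in seconds; 700 and 600 count as late.
sample = [10, 700, 30, 599, 600]
print(late_fraction(sample), should_widen_watermark(sample))
```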

  • Missing watermarks cause unbounded state growth — the job will eventually OOM. Always define a watermark on both streaming sides before joining. This is the number one production issue with stream-stream joins.
  • Unbounded time conditions (s2.ts >= s1.ts) prevent state cleanup even with watermarks. Always use a two-sided bound: s2.ts BETWEEN s1.ts - interval X AND s1.ts + interval Y.
  • State too large for in-memory store: if you’re joining high-cardinality keys or using long watermarks, enable RocksDB before the query starts: spark.conf.set("spark.sql.streaming.stateStore.providerClass", "com.databricks.sql.streaming.state.RocksDBStateStoreProvider").
  • No matches appearing — check your time condition units. Mixing minutes and hours or using the wrong column name in the join expression are common bugs. Log a sample from each side and verify the time ranges overlap.
  • OOM despite watermarks: with several watermarked inputs Spark uses one global watermark, by default min(left, right), so a single source that lags holds the watermark back and state on both sides is retained longer. Investigate why the slow source is falling behind, or tighten its watermark delay.
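The effect of a lagging source on the global watermark can be seen with plain arithmetic. The sketch below models each stream's watermark as its maximum observed event time minus its delay, then combines them with either the "min" or "max" policy. These are the two values accepted by spark.sql.streaming.multipleWatermarkPolicy, with "min" as the default. The function name and the numbers are illustrative only.

```python
def global_watermark(max_event_times, delays, policy="min"):
    """Combine per-stream watermarks (max event time - delay) into one value."""
    per_stream = [t - d for t, d in zip(max_event_times, delays)]
    return min(per_stream) if policy == "min" else max(per_stream)


# Hypothetical state: stream 1 has reached t=1000 s, stream 2 lags at t=400 s.
print(global_watermark([1000, 400], [60, 60]))                # min policy
print(global_watermark([1000, 400], [60, 60], policy="max"))  # max policy
```

Under the default policy the lagging stream pins the watermark at 340, so rows from the fast stream cannot be evicted until the slow one catches up. The "max" policy advances faster, at the cost of dropping data from the slower stream as late.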