Spark Structured Streaming
Skill: databricks-spark-structured-streaming
What You Can Build
Section titled “What You Can Build”Production streaming pipelines that ingest from Kafka, transform in-flight, and land in Delta tables with exactly-once guarantees. Your AI coding assistant handles the full surface area — trigger selection, checkpoint configuration, watermarked stateful aggregations, stream-stream joins, and multi-sink fan-out — so you get a pipeline that runs reliably on Databricks without chasing down subtle streaming gotchas yourself.
In Action
Section titled “In Action”“Set up a Kafka-to-Delta streaming pipeline that parses JSON events, applies a 10-minute watermark for late arrivals, and writes to a Delta table with a 30-second processing trigger.”
from pyspark.sql.functions import col, from_json, window, count, to_timestampfrom pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType
schema = StructType([ StructField("event_id", StringType()), StructField("event_time", StringType()), StructField("device_id", StringType()), StructField("temperature", DoubleType()),])
raw = (spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") .option("subscribe", "iot-events") .option("startingOffsets", "latest") .option("maxOffsetsPerTrigger", 100000) .load())
parsed = (raw .select(from_json(col("value").cast("string"), schema).alias("data")) .select("data.*") .withColumn("event_time", to_timestamp("event_time")) .withWatermark("event_time", "10 minutes"))
query = (parsed .writeStream .format("delta") .outputMode("append") .option("checkpointLocation", "/Volumes/prod/checkpoints/iot_events") .trigger(processingTime="30 seconds") .toTable("prod.bronze.iot_events"))Key decisions:
processingTime="30 seconds"— balances latency and cost. Sub-second needs Real-Time Mode (RTM); batch catch-up jobs useavailableNow=Trueto process all pending data and stop.maxOffsetsPerTrigger=100000— caps records per micro-batch to prevent OOMs when recovering from a lag spike. Without this, the first batch after downtime tries to consume everything.- Watermark at 10 minutes — any event arriving more than 10 minutes late is dropped from stateful aggregations. This bounds state store growth and is required for stream-stream joins.
- Checkpoint on UC Volumes — checkpoints must be persistent and unique per stream. Never use DBFS root (
dbfs:/) for production checkpoints; UC Volumes survive workspace resets. toTableover path-based writes — registers the table in Unity Catalog, giving you lineage, governance, and discoverability out of the box.
More Patterns
Section titled “More Patterns”Stream-stream join with event-time window
Section titled “Stream-stream join with event-time window”“Join a clickstream and a transactions stream on user_id within a 5-minute window to attribute purchases to page views.”
clicks = (spark.readStream.format("kafka") .option("kafka.bootstrap.servers", brokers) .option("subscribe", "clicks") .load() .select(from_json(col("value").cast("string"), click_schema).alias("c")) .select("c.*") .withColumn("click_time", to_timestamp("click_time")) .withWatermark("click_time", "10 minutes"))
transactions = (spark.readStream.format("kafka") .option("kafka.bootstrap.servers", brokers) .option("subscribe", "transactions") .load() .select(from_json(col("value").cast("string"), txn_schema).alias("t")) .select("t.*") .withColumn("txn_time", to_timestamp("txn_time")) .withWatermark("txn_time", "10 minutes"))
attributed = clicks.join( transactions, expr(""" c_user_id = t_user_id AND txn_time BETWEEN click_time AND click_time + INTERVAL 5 MINUTES """), "inner")
attributed.writeStream \ .format("delta") \ .outputMode("append") \ .option("checkpointLocation", "/Volumes/prod/checkpoints/attributed") \ .toTable("prod.silver.click_attribution")Both streams need watermarks for the join to work. Spark uses the watermarks to know when it can safely emit joined rows and evict old state. If you omit either watermark, state grows unbounded and you will eventually OOM.
Multi-sink fan-out with foreachBatch
Section titled “Multi-sink fan-out with foreachBatch”“Write each micro-batch to both a Delta table and a Kafka topic for downstream consumers.”
def write_to_sinks(batch_df, batch_id): # Sink 1: Delta table batch_df.write.format("delta").mode("append").saveAsTable("prod.silver.events")
# Sink 2: Kafka topic for downstream (batch_df .selectExpr("CAST(event_id AS STRING) AS key", "to_json(struct(*)) AS value") .write .format("kafka") .option("kafka.bootstrap.servers", "broker:9092") .option("topic", "enriched-events") .save() )
query = (parsed .writeStream .foreachBatch(write_to_sinks) .option("checkpointLocation", "/Volumes/prod/checkpoints/fan_out") .trigger(processingTime="1 minute") .start())foreachBatch gives you a static DataFrame for each micro-batch, so you can write to any number of sinks with standard DataFrame APIs. The checkpoint guarantees exactly-once for the Delta sink. The Kafka sink is at-least-once since Kafka does not participate in the checkpoint transaction.
Batch catch-up with availableNow
Section titled “Batch catch-up with availableNow”“Backfill my streaming table with all historical Kafka data, then stop the job automatically.”
(spark.readStream .format("kafka") .option("kafka.bootstrap.servers", brokers) .option("subscribe", "events") .option("startingOffsets", "earliest") .load() .select(from_json(col("value").cast("string"), schema).alias("d")) .select("d.*") .writeStream .format("delta") .outputMode("append") .option("checkpointLocation", "/Volumes/prod/checkpoints/backfill") .trigger(availableNow=True) .toTable("prod.bronze.events_backfill") .awaitTermination())availableNow=True processes all available data in multiple micro-batches (respecting maxOffsetsPerTrigger), then terminates. This is the right trigger for backfill and catch-up jobs because you get incremental processing without paying for a long-running cluster.
Watch Out For
Section titled “Watch Out For”- Autoscaling clusters for streaming — Streaming jobs need stable shuffle state across executors. Autoscaling adds and removes nodes, which causes state rebalancing, checkpoint corruption, and query restarts. Use a fixed-size cluster for streaming workloads.
- Reusing checkpoint paths — Each stream must have its own unique checkpoint directory. If two streams share a path, or you point a modified query at an old checkpoint, Spark will throw
StreamingQueryExceptionor silently produce wrong results. When changing a query’s schema or logic, start with a fresh checkpoint. - Missing watermarks on stateful operations — Aggregations and joins without watermarks accumulate state forever. On Databricks, this means the RocksDB state store grows until the executor runs out of disk, causing silent query failures hours or days after deployment.
- Inner joins on stream-static lookups — Use left joins for stream-static patterns. Inner joins silently drop streaming rows that have no match in the static table, and the static table is only read once at query start unless you explicitly refresh it.