Stateful Operations
Skill: databricks-spark-structured-streaming
What You Can Build
Stateless streaming works until you need to deduplicate, aggregate over time windows, or track running totals. At that point, Spark needs to remember what it’s seen — and that state must be bounded, or your job will eventually OOM. Watermarks define how long state lives; RocksDB handles state that exceeds memory. Together, they make stateful operations production-safe.
In Action
“Write a Python streaming pipeline that deduplicates Kafka events by event_id using a watermark, then computes 5-minute windowed aggregations per user.”
```python
from pyspark.sql.functions import col, count, from_json, sum, window
from pyspark.sql.types import (
    DoubleType, StringType, StructField, StructType, TimestampType,
)

# Use RocksDB so large state spills to disk instead of exhausting executor memory
spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "com.databricks.sql.streaming.state.RocksDBStateStoreProvider",
)

# Schema of the JSON payload carried in the Kafka value column
schema = StructType([
    StructField("event_id", StringType()),
    StructField("user_id", StringType()),
    StructField("event_time", TimestampType()),
    StructField("value", DoubleType()),
])

df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "<your-brokers:9092>")  # required; replace with your brokers
    .option("subscribe", "user-activity")
    .load()
    .select(from_json(col("value").cast("string"), schema).alias("data"))
    .select("data.*")
    .withWatermark("event_time", "10 minutes")  # must come before dropDuplicates
    .dropDuplicates(["event_id"])
)

windowed = (
    df.groupBy(window(col("event_time"), "5 minutes"), col("user_id"))
    .agg(
        count("*").alias("event_count"),
        sum("value").alias("total_value"),
    )
)

(
    windowed.writeStream
    .outputMode("update")
    .format("delta")
    .option("checkpointLocation", "/Volumes/catalog/checkpoints/user_activity")
    .trigger(processingTime="30 seconds")
    .start("/delta/user_activity_metrics")
)
```

Key decisions:
- Watermark before stateful operations — the `.withWatermark("event_time", "10 minutes")` call must appear before `dropDuplicates` or `groupBy`. It tells Spark that events arriving more than 10 minutes late can be dropped and their state cleaned up.
- Watermark duration = 2-3x your p95 latency. Start with 10 minutes for general streaming. Financial transactions may need an hour; real-time analytics can use 5 minutes. Monitor your late data rate and adjust.
- RocksDB from the start — the default in-memory state store works for small state but fails hard when you exceed memory. RocksDB stores state on disk with in-memory caching. Set it preemptively rather than waiting for the first OOM.
- `outputMode("update")` emits corrected results as late data arrives within the watermark. Use `append` only if you need final, immutable results (emitted once the window closes past the watermark).
- Deduplication before aggregation ensures the counts and sums in the windowed output are accurate. Reversing the order means duplicates inflate your metrics.
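The 2-3x sizing rule above can be encoded as a small helper. A sketch in plain Python, where `recommended_watermark`, the multiplier default, and the 5-minute floor are illustrative assumptions, not part of any Spark API:

```python
def recommended_watermark(p95_latency_minutes: float, multiplier: float = 2.5) -> str:
    """Size a watermark from observed p95 event latency (2-3x rule of thumb),
    with a conservative 5-minute floor."""
    minutes = max(5, round(p95_latency_minutes * multiplier))
    return f"{minutes} minutes"

# A 4-minute p95 latency lands on the general-purpose 10-minute starting point
print(recommended_watermark(4))   # 10 minutes
print(recommended_watermark(24))  # 60 minutes, e.g. slow financial feeds
```

Feed the result straight into `.withWatermark("event_time", recommended_watermark(observed_p95))`, then revisit the multiplier once you have real late-data metrics.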
More Patterns
Session Windows
“Write a Python streaming pipeline that groups user events into 15-minute inactivity-based sessions.”
```python
from pyspark.sql.functions import col, count, max, min, session_window

sessions = (
    df.withWatermark("event_time", "30 minutes")
    .groupBy(
        col("user_id"),
        session_window(col("event_time"), "15 minutes"),
    )
    .agg(
        count("*").alias("event_count"),
        min("event_time").alias("session_start"),
        max("event_time").alias("session_end"),
    )
)

(
    sessions.writeStream
    .outputMode("update")
    .format("delta")
    .option("checkpointLocation", "/Volumes/catalog/checkpoints/sessions")
    .start("/delta/user_sessions")
)
```

Session windows group events by inactivity gaps rather than fixed time boundaries. A 15-minute gap means a new session starts when a user is idle for more than 15 minutes. Set the watermark wider than the session gap — here 30 minutes gives time for late events to land in the correct session.
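To make the inactivity-gap semantics concrete, here is a plain-Python sketch of the same grouping logic. The `split_sessions` helper is illustrative only, not a Spark API; it operates on event times expressed in minutes:

```python
def split_sessions(timestamps, gap_minutes=15):
    """Group sorted event times (in minutes) into inactivity-based sessions:
    a new session starts when the idle gap exceeds gap_minutes."""
    sessions = []
    for t in sorted(timestamps):
        if sessions and t - sessions[-1][-1] <= gap_minutes:
            sessions[-1].append(t)  # within the gap: extend the current session
        else:
            sessions.append([t])    # gap exceeded (or first event): new session
    return sessions

# Events at minutes 0, 5, and 40: the 35-minute idle gap splits them in two
print(split_sessions([0, 5, 40]))  # [[0, 5], [40]]
```

This mirrors what `session_window` computes per key, except that Spark also extends windows incrementally as new micro-batches arrive and finalizes them once the watermark passes the gap.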
Monitoring State Health
“Write a Python function that checks state store size and partition balance for a running stream.”
```python
for stream in spark.streams.active:
    progress = stream.lastProgress
    if progress and "stateOperators" in progress:
        for op in progress["stateOperators"]:
            rows = op.get("numRowsTotal", 0)
            memory = op.get("memoryUsedBytes", 0)
            disk = op.get("diskBytesUsed", 0)
            print(f"State: {rows} rows, {memory / 1e6:.1f} MB memory, {disk / 1e6:.1f} MB disk")
```

Check state metrics after every deployment. If `numRowsTotal` grows monotonically across batches, your watermark isn’t expiring state fast enough — either the watermark is too long or events don’t have valid timestamps.
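The monotonic-growth check can be automated by sampling `numRowsTotal` over several batches and flagging steady growth. A sketch; the `state_keeps_growing` helper and the five-sample threshold are assumptions, not Spark functionality:

```python
def state_keeps_growing(num_rows_samples, min_samples=5):
    """Return True if numRowsTotal strictly increased across every sampled batch,
    suggesting the watermark is not expiring state."""
    if len(num_rows_samples) < min_samples:
        return False  # not enough evidence yet
    return all(b > a for a, b in zip(num_rows_samples, num_rows_samples[1:]))

# Samples collected from stream.lastProgress["stateOperators"][0]["numRowsTotal"]
print(state_keeps_growing([100, 250, 400, 900, 1500]))  # True: state never shrinks
print(state_keeps_growing([100, 250, 240, 260, 250]))   # False: expiry is working
```

Healthy state plateaus or oscillates as windows expire; unbroken growth over many batches is the signal to inspect watermark configuration and event timestamps.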
Detecting State Partition Skew
“Write a Python function that reads the state store checkpoint and checks for partition imbalance.”
```python
from pyspark.sql.functions import desc

def check_state_balance(checkpoint_path):
    # Read checkpointed state with the state store reader
    state_df = spark.read.format("statestore").load(f"{checkpoint_path}/state")
    partition_counts = state_df.groupBy("partitionId").count().orderBy(desc("count"))
    partition_counts.show()

    counts = [row["count"] for row in partition_counts.collect()]
    if counts:
        skew_ratio = max(counts) / min(counts) if min(counts) > 0 else float("inf")
        if skew_ratio > 10:
            print(f"WARNING: State skew ratio {skew_ratio:.1f}x -- consider salting keys")
```

A skew ratio above 10x means one partition holds an outsized share of the state, creating a bottleneck. This usually happens with hot keys (e.g., a single user_id generating 90% of events). Salt the key by appending a hash prefix to distribute state more evenly.
Watch Out For
- Missing watermark on stateful operations — `dropDuplicates`, `groupBy`, and stream-stream joins all accumulate state. Without a watermark, state grows forever. This is the most common cause of OOM failures in streaming jobs.
- Watermark too long inflates state and cost — a 72-hour watermark on 1M events/minute means billions of keys in state. Start conservative (10 minutes) and widen only if monitoring shows unacceptable late data drops.
- Watermark too short drops legitimate events — if your p95 latency is 8 minutes and your watermark is 5 minutes, you’re dropping real data. Check late event rates before tightening.
- State not expiring despite watermark — if event timestamps are constant (e.g., all set to epoch 0), the watermark never advances and state never expires. Validate that your event time column contains meaningful, increasing timestamps.
- High-cardinality deduplication keys — `dropDuplicates(["user_id"])` across millions of users creates millions of state entries. Use naturally expiring keys like `session_id` or composite keys with a date component to partition the state space.
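The stuck-watermark pitfall can be caught before deployment with a sanity check on a sample of event-time values pulled from the source. A sketch; `timestamps_advance` is a hypothetical helper operating on epoch seconds, not a Spark API:

```python
def timestamps_advance(sample_epochs, min_distinct=2):
    """Return True if sampled event-time epochs actually vary, so the watermark
    (driven by the max event time seen) can advance."""
    distinct = set(sample_epochs)
    if len(distinct) < min_distinct:
        return False  # constant timestamps: the watermark will never move
    return max(distinct) > 0  # reject all-zero / epoch-0 defaults

# All events stamped epoch 0 would freeze the watermark forever
print(timestamps_advance([0, 0, 0]))                  # False
print(timestamps_advance([1700000000, 1700000060]))   # True
```

Run it against a small batch read of the topic (e.g. collect `event_time` from a `limit(1000)` sample) before trusting any watermark-based state expiry.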