
Stateful Operations

Skill: databricks-spark-structured-streaming

Stateless streaming works until you need to deduplicate, aggregate over time windows, or track running totals. At that point, Spark needs to remember what it’s seen — and that state must be bounded, or your job will eventually OOM. Watermarks define how long state lives; RocksDB handles state that exceeds memory. Together, they make stateful operations production-safe.

“Write a Python streaming pipeline that deduplicates Kafka events by event_id using a watermark, then computes 5-minute windowed aggregations per user.”

from pyspark.sql.functions import col, from_json, window, count, sum
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType

# Use RocksDB so state can spill to disk instead of OOMing the executors
spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
)

schema = StructType([
    StructField("event_id", StringType()),
    StructField("user_id", StringType()),
    StructField("event_time", TimestampType()),
    StructField("value", DoubleType()),
])

df = (spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")  # replace with your brokers
    .option("subscribe", "user-activity")
    .load()
    .select(from_json(col("value").cast("string"), schema).alias("data"))
    .select("data.*")
    .withWatermark("event_time", "10 minutes")
    .dropDuplicates(["event_id"]))

windowed = (df
    .groupBy(window(col("event_time"), "5 minutes"), col("user_id"))
    .agg(
        count("*").alias("event_count"),
        sum("value").alias("total_value")))

windowed.writeStream \
    .outputMode("update") \
    .format("delta") \
    .option("checkpointLocation", "/Volumes/catalog/checkpoints/user_activity") \
    .trigger(processingTime="30 seconds") \
    .start("/delta/user_activity_metrics")

Key decisions:

  • Watermark before stateful operations — the .withWatermark("event_time", "10 minutes") call must appear before dropDuplicates or groupBy. It tells Spark that events arriving more than 10 minutes late can be dropped and their state cleaned up.
  • Watermark duration = 2-3x your p95 latency. Start with 10 minutes for general streaming. Financial transactions may need an hour; real-time analytics can use 5 minutes. Monitor your late data rate and adjust.
  • RocksDB from the start — the default in-memory state store works for small state but fails hard when you exceed memory. RocksDB stores state on disk with in-memory caching. Set it preemptively rather than waiting for the first OOM.
  • outputMode("update") emits corrected results as late data arrives within the watermark. Use append only if you need final, immutable results (emitted once the window closes past the watermark).
  • Deduplication before aggregation ensures the counts and sums in the windowed output are accurate. Reversing the order means duplicates inflate your metrics.
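The watermark rule in the bullets above can be modeled in plain Python, without Spark: the watermark is the maximum event time seen so far minus the configured delay, and an event whose timestamp has fallen behind it is dropped (and its deduplication state becomes eligible for cleanup). A minimal sketch; the class and method names are illustrative, not Spark APIs:

```python
from datetime import datetime, timedelta

class WatermarkTracker:
    """Illustrative model of Spark's watermark: max event time seen minus the delay."""

    def __init__(self, delay: timedelta):
        self.delay = delay
        self.max_event_time = None  # advances monotonically as events arrive

    def observe(self, event_time: datetime) -> None:
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time

    @property
    def watermark(self):
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.delay

    def is_late(self, event_time: datetime) -> bool:
        """An event is dropped once its timestamp falls behind the watermark."""
        return self.watermark is not None and event_time < self.watermark

wm = WatermarkTracker(timedelta(minutes=10))
wm.observe(datetime(2024, 1, 1, 12, 0))          # watermark is now 11:50
print(wm.is_late(datetime(2024, 1, 1, 11, 55)))  # False: within the 10-minute delay
print(wm.is_late(datetime(2024, 1, 1, 11, 45)))  # True: more than 10 minutes late
```

This is also why the watermark only advances with the data: if no new events arrive, the watermark stalls and no state expires.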

“Write a Python streaming pipeline that groups user events into 15-minute inactivity-based sessions.”

from pyspark.sql.functions import col, session_window, count, min, max

sessions = (df
    .withWatermark("event_time", "30 minutes")
    .groupBy(
        col("user_id"),
        session_window(col("event_time"), "15 minutes"))
    .agg(
        count("*").alias("event_count"),
        min("event_time").alias("session_start"),
        max("event_time").alias("session_end")))

# Session-window aggregations do not support update mode; append emits each
# session once the watermark passes its end.
sessions.writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", "/Volumes/catalog/checkpoints/sessions") \
    .start("/delta/user_sessions")

Session windows group events by inactivity gaps rather than fixed time boundaries. A 15-minute gap means a new session starts when a user is idle for more than 15 minutes. Set the watermark wider than the session gap — here 30 minutes gives time for late events to land in the correct session.
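The inactivity-gap rule can be shown in plain Python: sorted event times split into a new session whenever the gap since the previous event exceeds 15 minutes. A sketch with an illustrative helper name:

```python
from datetime import datetime, timedelta

def sessionize(event_times, gap=timedelta(minutes=15)):
    """Group sorted event timestamps into sessions split on inactivity gaps."""
    sessions = []
    for t in sorted(event_times):
        if sessions and t - sessions[-1][-1] <= gap:
            sessions[-1].append(t)   # within the gap: extend the current session
        else:
            sessions.append([t])     # idle longer than the gap: start a new session
    return sessions

events = [
    datetime(2024, 1, 1, 9, 0),
    datetime(2024, 1, 1, 9, 10),  # 10-minute gap: same session
    datetime(2024, 1, 1, 9, 40),  # 30-minute gap: new session
]
print([len(s) for s in sessionize(events)])  # [2, 1]
```

Note how a single event can stretch a session indefinitely as long as each gap stays under 15 minutes, which is why session-window state can grow larger than fixed windows.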

“Write a Python function that checks state store size and partition balance for a running stream.”

for stream in spark.streams.active:
    progress = stream.lastProgress
    if progress and "stateOperators" in progress:
        for op in progress["stateOperators"]:
            rows = op.get("numRowsTotal", 0)
            memory = op.get("memoryUsedBytes", 0)
            disk = op.get("diskBytesUsed", 0)
            print(f"State: {rows} rows, {memory / 1e6:.1f} MB memory, {disk / 1e6:.1f} MB disk")

Check state metrics after every deployment. If numRowsTotal grows monotonically across batches, your watermark isn’t expiring state fast enough — either the watermark is too long or events don’t have valid timestamps.
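The monotonic-growth check can be automated over a recorded history of lastProgress snapshots. A sketch in plain Python; the function name is illustrative, but the dict shape mirrors the stateOperators entries shown above:

```python
def state_growing_monotonically(progress_history, operator_index=0):
    """True if numRowsTotal strictly increased across every recorded batch for
    the given state operator -- a sign the watermark isn't expiring state."""
    rows = [
        p["stateOperators"][operator_index].get("numRowsTotal", 0)
        for p in progress_history
        if p and p.get("stateOperators")
    ]
    # Need at least two batches to call it a trend
    return len(rows) >= 2 and all(a < b for a, b in zip(rows, rows[1:]))

history = [{"stateOperators": [{"numRowsTotal": n}]} for n in (100, 250, 400, 900)]
print(state_growing_monotonically(history))  # True: state never shrank
```

In practice you would append each stream.lastProgress to the history on a schedule and alert when this returns True over, say, an hour of batches.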

“Write a Python function that reads the state store checkpoint and checks for partition imbalance.”

from pyspark.sql.functions import desc

def check_state_balance(checkpoint_path):
    # The statestore reader takes the checkpoint root path
    state_df = spark.read.format("statestore").load(checkpoint_path)
    partition_counts = state_df.groupBy("partition_id").count().orderBy(desc("count"))
    partition_counts.show()
    counts = [row["count"] for row in partition_counts.collect()]
    if counts:
        skew_ratio = max(counts) / min(counts) if min(counts) > 0 else float("inf")
        if skew_ratio > 10:
            print(f"WARNING: State skew ratio {skew_ratio:.1f}x -- consider salting keys")

A skew ratio above 10x means one partition holds an outsized share of the state, creating a bottleneck. This usually happens with hot keys (e.g., a single user_id generating 90% of events). Salt the key by appending a hash prefix to distribute state more evenly.
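Salting can be sketched without Spark: derive a small salt from something that varies per event (here the event_id) so one hot grouping key spreads across several state sub-keys, with partial aggregates merged in a second pass. Bucket count and helper name are illustrative:

```python
import hashlib

def salted_key(user_id: str, event_id: str, buckets: int = 8) -> tuple:
    """Spread a hot user_id over `buckets` sub-keys using an event-derived salt.
    Partial aggregates per (user_id, salt) are merged in a second aggregation."""
    salt = int(hashlib.md5(event_id.encode()).hexdigest(), 16) % buckets
    return (user_id, salt)

# One hot user generating every event now lands in several state sub-keys:
keys = [salted_key("hot_user", f"evt-{i}") for i in range(1000)]
print(len(set(keys)))  # 8 distinct sub-keys instead of 1
```

The trade-off is a second aggregation step to merge the per-salt partials, so reserve salting for keys your skew check actually flags.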

  • Missing watermark on stateful operations — dropDuplicates, groupBy, and stream-stream joins all accumulate state. Without a watermark, state grows forever. This is the most common cause of OOM failures in streaming jobs.
  • Watermark too long inflates state and cost — a 72-hour watermark on 1M events/minute means billions of keys in state. Start conservative (10 minutes) and widen only if monitoring shows unacceptable late data drops.
  • Watermark too short drops legitimate events — if your p95 latency is 8 minutes and your watermark is 5 minutes, you’re dropping real data. Check late event rates before tightening.
  • State not expiring despite watermark — if event timestamps are constant (e.g., all set to epoch 0), the watermark never advances and state never expires. Validate that your event time column contains meaningful, increasing timestamps.
  • High-cardinality deduplication keys — dropDuplicates(["user_id"]) across millions of users creates millions of state entries. Use naturally expiring keys like session_id or composite keys with a date component to partition the state space.
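The date-scoped composite key from the last bullet is easy to sketch: prefixing the key with the event date means each day's dedup state stops growing once the watermark passes that day. Key format and helper name are illustrative:

```python
from datetime import datetime

def dedup_key(user_id: str, event_time: datetime) -> str:
    """Composite dedup key with a date component; state for a given day
    becomes expirable once the watermark moves past that day."""
    return f"{event_time:%Y-%m-%d}:{user_id}"

print(dedup_key("u42", datetime(2024, 3, 5, 14, 30)))  # 2024-03-05:u42
```

In the streaming pipeline this would be a derived column passed to dropDuplicates, so the state space is partitioned by day rather than unbounded over all users ever seen.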