
Checkpoint Best Practices

Skill: databricks-spark-structured-streaming

Checkpoints are the foundation of exactly-once semantics in Structured Streaming. They store offset positions, state metadata, and commit records that let Spark resume exactly where it left off after a failure. Getting checkpoint storage, naming, and lifecycle management right prevents data loss, duplicate processing, and recovery failures that are notoriously hard to debug.

“Write a Python function that generates checkpoint paths tied to the target table, and show the correct way to start a stream with persistent checkpoint storage.”

def get_checkpoint_path(table_name, environment="prod"):
    """Tie checkpoint to the target table, not the source."""
    return f"/Volumes/{environment}/checkpoints/{table_name}"

stream.writeStream \
    .format("delta") \
    .option("checkpointLocation", get_checkpoint_path("orders")) \
    .trigger(processingTime="30 seconds") \
    .start("/delta/orders")

Key decisions:

  • Target-tied naming, not source-tied. The checkpoint already stores source information (topic, offsets). Naming by target gives systematic organization, easy backup, and clear ownership. When you see /Volumes/prod/checkpoints/orders, you know exactly which table this belongs to.
  • Unity Catalog Volumes, not DBFS. UC Volumes are backed by S3 or ADLS — persistent, durable cloud storage. DBFS is ephemeral and workspace-local. A cluster termination can lose your DBFS checkpoints, which means reprocessing from scratch.
  • One checkpoint per stream, always. Two streams writing to the same checkpoint corrupt each other’s offset tracking. This causes silent data loss that’s extremely hard to diagnose.
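The target-tied and one-checkpoint-per-stream rules can be enforced in the path helper itself. A minimal sketch, where the optional `stream_name` component is a hypothetical addition for tables written by more than one stream:

```python
def get_checkpoint_path(table_name, stream_name=None, environment="prod"):
    """Build a unique checkpoint path per target table and per stream.

    stream_name is a hypothetical extra path component for tables fed by
    multiple streams, so no two streams ever share a checkpoint location.
    """
    base = f"/Volumes/{environment}/checkpoints/{table_name}"
    return f"{base}/{stream_name}" if stream_name else base

# Each stream gets its own location, even when they share a target table:
print(get_checkpoint_path("orders"))              # /Volumes/prod/checkpoints/orders
print(get_checkpoint_path("orders", "backfill"))  # /Volumes/prod/checkpoints/orders/backfill
```

Deriving the path from the table name keeps the naming convention out of individual notebooks, so ownership stays obvious from the path alone.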

“Write Python code to read the latest offset and commit files from a checkpoint to diagnose whether a batch completed.”

import json

offset_file = "/checkpoints/stream/offsets/223"
content = dbutils.fs.head(offset_file)

# An offset file is not one JSON document: line 1 is a version marker
# ("v1"), line 2 is batch metadata, and each remaining line holds one
# source's offset JSON.
lines = content.strip().split("\n")
metadata = json.loads(lines[1])
print(json.dumps(metadata, indent=2))
# Key metadata fields:
# - batchWatermarkMs: current watermark timestamp
# - batchTimestampMs: processing time of the batch
for source_line in lines[2:]:
    print(source_line)  # e.g. Kafka: {"orders":{"0":1500}} -- per-partition end offsets

The offsets in the file mark where the batch ends; the per-source startOffset, endOffset, and latestOffset fields surface in query.lastProgress, and the gap between endOffset and latestOffset tells you how far behind the stream is. If a matching commit file exists for this batch ID, the batch completed successfully. If not, Spark will reprocess it on restart.
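That lag calculation can be done without touching the checkpoint at all. A sketch using a hypothetical lastProgress payload — the real one comes from query.lastProgress on an active StreamingQuery, and its exact shape varies by source:

```python
# Hypothetical Kafka-style progress payload; topic name "orders" and the
# partition layout are illustrative assumptions.
last_progress = {
    "sources": [{
        "startOffset": {"orders": {"0": 1000}},
        "endOffset": {"orders": {"0": 1500}},
        "latestOffset": {"orders": {"0": 1520}},
    }]
}

src = last_progress["sources"][0]
# Records available in the source but not yet scheduled, per partition
lag = {
    p: src["latestOffset"]["orders"][p] - src["endOffset"]["orders"][p]
    for p in src["endOffset"]["orders"]
}
print(lag)  # {'0': 20}
```

Alerting on this per-partition lag catches a stalled partition that an aggregate lag number would hide.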

“Write Python code to query the state store from a checkpoint to audit state size and partition balance.”

state_df = spark.read.format("statestore").load("/checkpoints/stream/state")
state_df.show()
state_metadata = spark.read.format("state-metadata").load("/checkpoints/stream")
state_metadata.show()

The statestore reader exposes key, value, partition ID, and expiration timestamp for every state entry. The state-metadata reader shows operator names, partition counts, and batch ID ranges. Use these to diagnose state growth issues without stopping the stream.
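A quick way to audit partition balance is to collect per-partition key counts from the statestore reader (e.g. `state_df.groupBy("partition_id").count().collect()`) and flag outliers. The counts below are a hypothetical sample standing in for that collect:

```python
# Hypothetical per-partition state key counts, as collected from the
# statestore reader; real values come from the running checkpoint.
counts = {0: 10_000, 1: 9_800, 2: 45_000, 3: 10_200}

mean = sum(counts.values()) / len(counts)
# Flag partitions holding more than twice the mean -- an arbitrary
# illustrative threshold, tune for your workload.
skewed = {p: n for p, n in counts.items() if n > 2 * mean}
print(f"mean keys/partition: {mean:.0f}, skewed partitions: {skewed}")
```

Heavy skew usually traces back to a hot grouping key; the statestore reader's key column tells you which one.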

Recovery from Lost or Corrupted Checkpoint


“Write Python code to recover a stream after checkpoint loss, including backup before risky changes.”

# Backup before major changes (schema evolution, code refactor)
dbutils.fs.cp(
    "/Volumes/prod/checkpoints/orders",
    "/Volumes/prod/checkpoints/orders_backup_20240101",
    recurse=True
)
# If checkpoint is lost or corrupted:
dbutils.fs.rm("/Volumes/prod/checkpoints/orders", recurse=True)
# Restart with earliest offsets -- Delta idempotent writes prevent duplicates
stream.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/Volumes/prod/checkpoints/orders") \
    .trigger(processingTime="30 seconds") \
    .start("/delta/orders")

Deleting a checkpoint and restarting with startingOffsets=earliest reprocesses all data from the source. This is safe if your sink uses idempotent writes (txnVersion/txnAppId), because Delta skips batches it has already committed. Without idempotent writes, you get duplicates.
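In Spark, txnAppId and txnVersion are write options used with foreachBatch sinks. The skip logic they imply can be modeled in a few lines of plain Python — names here are illustrative, not the Delta implementation:

```python
# Minimal model of txnAppId/txnVersion idempotency: the sink remembers the
# highest version committed per application ID and silently skips any
# batch at or below it, which is exactly what makes replay-after-
# checkpoint-loss safe.
class IdempotentSink:
    def __init__(self):
        self.last_version = {}  # txnAppId -> highest committed txnVersion

    def write(self, txn_app_id, txn_version, rows):
        if txn_version <= self.last_version.get(txn_app_id, -1):
            return "skipped"  # already committed: a replayed batch
        self.last_version[txn_app_id] = txn_version
        return f"committed {len(rows)} rows"

sink = IdempotentSink()
print(sink.write("orders-stream", 0, ["a", "b"]))  # committed 2 rows
print(sink.write("orders-stream", 0, ["a", "b"]))  # skipped
```

In a real foreachBatch function, you would pass the batch ID as txnVersion and a stable stream identifier as txnAppId, so reprocessed batches dedupe against the table's transaction log.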

“Write Python code to monitor checkpoint folder size and detect unprocessed batches.”

# Check offset/commit sync. Offset files are named by batch ID, so sort
# numerically -- a lexicographic sort would put batch 9 after batch 10.
offset_files = [f for f in dbutils.fs.ls("/checkpoints/stream/offsets")
                if f.name.isdigit()]
latest_offset = sorted(offset_files, key=lambda f: int(f.name))[-1]
batch_id = latest_offset.name
# dbutils.fs has no exists(); ls() raises if the path is missing.
try:
    dbutils.fs.ls(f"/checkpoints/stream/commits/{batch_id}")
    commit_exists = True
except Exception:
    commit_exists = False
print(f"Batch {batch_id}: {'Committed' if commit_exists else 'Pending (will reprocess)'}")

# Track checkpoint folder size (top-level files only)
files = dbutils.fs.ls("/checkpoints/stream")
total_mb = sum(f.size for f in files if f.isFile()) / (1024 * 1024)
print(f"Checkpoint size: {total_mb:.2f} MB")

A missing commit file for the latest offset is normal after a crash — Spark wrote the intent (offset) but didn’t finish processing. It will reprocess on restart. Steadily growing checkpoint size without state operations indicates a retention issue; check minBatchesToRetain.

  • Shared checkpoints between streams cause silent data loss — each stream must have its own checkpoint location. When two streams share one, they overwrite each other’s offsets and skip or duplicate data with no error message.
  • DBFS checkpoints lost on cluster termination — DBFS is not durable storage. Use Unity Catalog Volumes (backed by S3/ADLS) for any checkpoint that matters.
  • State folder growing unboundedly — this means your watermark isn’t expiring state. Check that the watermark column has valid, increasing timestamps and that the watermark duration matches your data’s latency profile.
  • Can’t resume after code change — certain changes (adding/removing stateful operators, changing the number of shuffle partitions) are incompatible with existing checkpoints. Back up the checkpoint before making changes, and be prepared to delete and reprocess if the schema is incompatible.
  • Forgetting to back up before migration — always dbutils.fs.cp the checkpoint folder before schema changes, code refactors, or cluster migrations. A corrupted checkpoint with no backup means full reprocessing.
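The backup habit from the last point is easier to keep when the backup name is generated, not typed. A small illustrative helper (the function name and timestamp format are assumptions):

```python
from datetime import datetime, timezone

def backup_path(checkpoint_path, now=None):
    """Build a timestamped backup location next to the checkpoint.

    Hypothetical helper; pair it with
    dbutils.fs.cp(checkpoint_path, backup_path(checkpoint_path), recurse=True)
    before any schema change, refactor, or migration.
    """
    now = now or datetime.now(timezone.utc)
    return f"{checkpoint_path}_backup_{now:%Y%m%d_%H%M%S}"

print(backup_path("/Volumes/prod/checkpoints/orders",
                  datetime(2024, 1, 1, tzinfo=timezone.utc)))
# /Volumes/prod/checkpoints/orders_backup_20240101_000000
```

Including the time of day, not just the date, means two backups on the same day never collide.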