Checkpoint Best Practices
Skill: databricks-spark-structured-streaming
What You Can Build
Checkpoints are the foundation of exactly-once semantics in Structured Streaming. They store offset positions, state metadata, and commit records that let Spark resume exactly where it left off after a failure. Getting checkpoint storage, naming, and lifecycle management right prevents data loss, duplicate processing, and recovery failures that are notoriously hard to debug.
In Action
“Write a Python function that generates checkpoint paths tied to the target table, and show the correct way to start a stream with persistent checkpoint storage.”
```python
def get_checkpoint_path(table_name, environment="prod"):
    """Tie checkpoint to the target table, not the source."""
    return f"/Volumes/{environment}/checkpoints/{table_name}"

stream.writeStream \
    .format("delta") \
    .option("checkpointLocation", get_checkpoint_path("orders")) \
    .trigger(processingTime="30 seconds") \
    .start("/delta/orders")
```

Key decisions:
- Target-tied naming, not source-tied. The checkpoint already stores source information (topic, offsets). Naming by target gives systematic organization, easy backup, and clear ownership. When you see `/Volumes/prod/checkpoints/orders`, you know exactly which table it belongs to.
- Unity Catalog Volumes, not DBFS. UC Volumes are backed by S3 or ADLS — persistent, durable cloud storage. DBFS is ephemeral and workspace-local. A cluster termination can lose your DBFS checkpoints, which means reprocessing from scratch.
- One checkpoint per stream, always. Two streams writing to the same checkpoint corrupt each other’s offset tracking. This causes silent data loss that’s extremely hard to diagnose.
More Patterns
Inspecting Checkpoint Contents
“Write Python code to read the latest offset and commit files from a checkpoint to diagnose whether a batch completed.”
```python
import json

offset_file = "/checkpoints/stream/offsets/223"
content = dbutils.fs.head(offset_file)
offset_data = json.loads(content)
print(json.dumps(offset_data, indent=2))

# Key fields:
# - batchWatermarkMs: current watermark timestamp
# - source[0].startOffset: beginning of batch (inclusive)
# - source[0].endOffset: end of batch (exclusive)
# - source[0].latestOffset: newest offset available in source
```

The gap between endOffset and latestOffset tells you how far behind the stream is. If a matching commit file exists for this batch ID, the batch completed successfully. If not, Spark will reprocess it on restart.
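That gap can be computed directly from the parsed offset JSON. A minimal sketch for a Kafka-style payload — the exact field layout varies by source, and this assumes the Kafka shape of per-topic, per-partition integer offsets:

```python
def compute_lag(end_offset, latest_offset):
    """Sum per-partition lag between a batch's endOffset and the source's
    latestOffset. Both arguments follow the Kafka source shape:
    {topic: {partition: offset}}.
    """
    lag = 0
    for topic, partitions in latest_offset.items():
        for partition, latest in partitions.items():
            processed = end_offset.get(topic, {}).get(partition, 0)
            lag += latest - processed
    return lag

# Example: two partitions, stream is 150 records behind
end = {"orders": {"0": 1000, "1": 2000}}
latest = {"orders": {"0": 1100, "1": 2050}}
print(compute_lag(end, latest))  # 150
```

A lag that grows batch over batch means the trigger interval or cluster size can't keep up with the source.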
Reading State Store Directly
“Write Python code to query the state store from a checkpoint to audit state size and partition balance.”
```python
state_df = spark.read.format("statestore").load("/checkpoints/stream/state")
state_df.show()

state_metadata = spark.read.format("state-metadata").load("/checkpoints/stream")
state_metadata.show()
```

The statestore reader exposes key, value, partition ID, and expiration timestamp for every state entry. The state-metadata reader shows operator names, partition counts, and batch ID ranges. Use these to diagnose state growth issues without stopping the stream.
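Once you have per-partition row counts (for example, by grouping the statestore DataFrame on its partition ID column and collecting the counts), a simple max-over-mean ratio flags imbalance. A minimal pure-Python sketch with an illustrative helper name:

```python
def partition_skew(counts):
    """Return the max/mean ratio for a {partition_id: row_count} mapping.

    A ratio near 1.0 means state is evenly spread; a large ratio means a
    hot key is concentrating state in one partition.
    """
    values = list(counts.values())
    mean = sum(values) / len(values)
    return max(values) / mean

balanced = {0: 100, 1: 105, 2: 95}
skewed = {0: 1000, 1: 10, 2: 10}
print(round(partition_skew(balanced), 2))  # 1.05
print(round(partition_skew(skewed), 2))    # 2.94
```

Severe skew usually points at the grouping key, not the cluster: one customer ID or device ID dominating the stream.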
Recovery from Lost or Corrupted Checkpoint
“Write Python code to recover a stream after checkpoint loss, including backup before risky changes.”
```python
# Backup before major changes (schema evolution, code refactor)
dbutils.fs.cp(
    "/Volumes/prod/checkpoints/orders",
    "/Volumes/prod/checkpoints/orders_backup_20240101",
    recurse=True
)

# If checkpoint is lost or corrupted:
dbutils.fs.rm("/Volumes/prod/checkpoints/orders", recurse=True)

# Restart with earliest offsets (set startingOffsets="earliest" on the
# source read) -- Delta idempotent writes prevent duplicates
stream.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/Volumes/prod/checkpoints/orders") \
    .trigger(processingTime="30 seconds") \
    .start("/delta/orders")
```

Deleting a checkpoint and restarting with startingOffsets=earliest reprocesses all data from the source. This is safe if your sink uses idempotent writes (txnVersion/txnAppId), because Delta skips batches it has already committed. Without idempotent writes, you get duplicates.
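The idempotent-write pattern pairs a stable txnAppId (constant for the stream's lifetime) with the monotonically increasing batch ID as txnVersion. A hedged sketch of a foreachBatch writer — the `idempotent_write_options` and `write_batch` helper names are illustrative, while txnAppId/txnVersion are the Delta writer options the prose above refers to:

```python
def idempotent_write_options(app_id, batch_id):
    """Build the Delta writer options that make a foreachBatch write replay-safe.

    Delta records (txnAppId, txnVersion) per commit and skips any batch it
    has already applied, so a restart that replays batch N becomes a no-op.
    """
    return {"txnAppId": app_id, "txnVersion": batch_id}

def write_batch(batch_df, batch_id):
    # Inside foreachBatch: app_id stays fixed; batch_id is supplied by Spark
    # and increases monotonically across restarts of the same checkpoint.
    opts = idempotent_write_options("orders-stream", batch_id)
    batch_df.write.format("delta").options(**opts).mode("append").save("/delta/orders")

print(idempotent_write_options("orders-stream", 42))
```

Wire it up with `.writeStream.foreachBatch(write_batch)` instead of a direct Delta sink when you need replay safety across checkpoint resets.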
Monitoring Checkpoint Health
“Write Python code to monitor checkpoint folder size and detect unprocessed batches.”
```python
# Check offset/commit sync. Offset files are named by batch number, so sort
# numerically -- a lexicographic sort would rank batch "9" above "10".
offset_files = [f for f in dbutils.fs.ls("/checkpoints/stream/offsets") if f.name.isdigit()]
batch_id = max(offset_files, key=lambda f: int(f.name)).name

# dbutils.fs has no exists() helper; probe with ls() and catch the failure
try:
    dbutils.fs.ls(f"/checkpoints/stream/commits/{batch_id}")
    commit_exists = True
except Exception:
    commit_exists = False
print(f"Batch {batch_id}: {'Committed' if commit_exists else 'Pending (will reprocess)'}")

# Track checkpoint folder size
files = dbutils.fs.ls("/checkpoints/stream")
total_mb = sum(f.size for f in files if f.isFile()) / (1024 * 1024)
print(f"Checkpoint size: {total_mb:.2f} MB")
```

A missing commit file for the latest offset is normal after a crash — Spark wrote the intent (offset) but didn’t finish processing. It will reprocess on restart. Steadily growing checkpoint size without state operations indicates a retention issue; check minBatchesToRetain.
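Retention is governed by the `spark.sql.streaming.minBatchesToRetain` setting (default 100): Spark keeps at least that many offset/commit pairs before cleaning up older ones. A hedged config sketch, applied before the stream starts:

```python
# Fewer retained batches -> smaller checkpoint folder, but less history
# available for debugging failed batches. Must be set before stream start.
spark.conf.set("spark.sql.streaming.minBatchesToRetain", "50")
```

Lowering it trades debuggability for checkpoint size; raising it is rarely needed unless you routinely do post-mortems on old batches.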
Watch Out For
Section titled “Watch Out For”- Shared checkpoints between streams cause silent data loss — each stream must have its own checkpoint location. When two streams share one, they overwrite each other’s offsets and skip or duplicate data with no error message.
- DBFS checkpoints lost on cluster termination — DBFS is not durable storage. Use Unity Catalog Volumes (backed by S3/ADLS) for any checkpoint that matters.
- State folder growing unboundedly — this means your watermark isn’t expiring state. Check that the watermark column has valid, increasing timestamps and that the watermark duration matches your data’s latency profile.
- Can’t resume after code change — certain changes (adding/removing stateful operators, changing the number of shuffle partitions) are incompatible with existing checkpoints. Back up the checkpoint before making changes, and be prepared to delete and reprocess if the schema is incompatible.
- Forgetting to back up before migration — always `dbutils.fs.cp` the checkpoint folder before schema changes, code refactors, or cluster migrations. A corrupted checkpoint with no backup means full reprocessing.
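For the unbounded-state symptom above, a quick sanity check is whether your event timestamps would actually advance the watermark, since state only expires when the watermark moves forward. A minimal pure-Python sketch (the `watermark_stuck` helper name is illustrative):

```python
def watermark_stuck(batch_max_event_times_ms, allowed_lateness_ms):
    """Return True if the watermark would stop advancing after the first batch.

    The watermark tracks max(event_time) - allowed_lateness; if the max event
    time never increases across batches, state is never expired.
    """
    watermarks = [t - allowed_lateness_ms for t in batch_max_event_times_ms]
    return all(w <= watermarks[0] for w in watermarks[1:])

# Increasing batch maxima -> watermark moves, state can expire
print(watermark_stuck([1000, 2000, 3000], 500))  # False (healthy)
# Frozen timestamps (e.g. a constant default value) -> state grows forever
print(watermark_stuck([1000, 1000, 990], 500))   # True (problem)
```

A common real-world cause of the stuck case is a parsing bug that writes a constant or null-coalesced default into the watermark column.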