Auto Loader
Skill: databricks-spark-declarative-pipelines
What You Can Build
Auto Loader watches a directory in cloud storage and processes only the files that are new since the last run. You point it at a landing zone, tell it the file format, and it handles discovery, deduplication, and schema inference. No manual file listing, no tracking state yourself.
In Action
“Create a SQL streaming table that ingests JSON files from our events landing zone with automatic schema inference”
```sql
CREATE STREAMING TABLE bronze_events
AS SELECT
  *,
  current_timestamp() AS _ingested_at,
  _metadata.file_path AS _source_file
FROM STREAM(read_files(
  '/Volumes/catalog/raw/events/',
  format => 'json',
  inferColumnTypes => true
));
```

Key decisions:
- `STREAM(read_files(...))` — the `STREAM` keyword is what activates Auto Loader’s incremental behavior. Without it, `read_files` runs as a one-time batch scan.
- `inferColumnTypes` — Auto Loader infers the actual data types instead of defaulting everything to strings. Good for prototyping; use `schemaHints` in production for stability.
- Ingestion metadata columns — `_ingested_at` and `_source_file` cost nothing to add and save hours when debugging data quality issues downstream.
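The same ingestion can be written in Python with Auto Loader’s `cloudFiles` reader. A minimal sketch, reusing the table name and volume path from the SQL example (both are assumptions about your workspace):

```python
# Hedged Python equivalent of the SQL streaming table above.
# The path and column names mirror the SQL example; adjust to your workspace.
def bronze_events(spark):
    # Imported lazily so this module can be loaded without a Spark runtime.
    from pyspark.sql.functions import current_timestamp, col

    return (
        spark.readStream.format("cloudFiles")           # "cloudFiles" = Auto Loader
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", "true")  # real types, not all-strings
        .load("/Volumes/catalog/raw/events/")
        .withColumn("_ingested_at", current_timestamp())
        .withColumn("_source_file", col("_metadata.file_path"))
    )
```

As with the SQL version, `readStream` (rather than `read`) is what makes the scan incremental.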
More Patterns
Section titled “More Patterns”Lock down the schema for production
“Ingest CSV order files with an explicit schema and header parsing in SQL”
```sql
CREATE STREAMING TABLE bronze_orders (
  order_id STRING,
  customer_id STRING,
  amount DECIMAL(10,2),
  order_date DATE
)
AS SELECT * FROM STREAM(read_files(
  '/Volumes/catalog/raw/orders/',
  format => 'csv',
  header => true,
  schemaHints => 'order_id STRING, customer_id STRING, amount DECIMAL(10,2), order_date DATE'
));
```

Declaring the schema in both the table definition and `schemaHints` acts as a double guard — if the upstream file format drifts, you get a clear error instead of silent type coercion.
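In Python, the same locked-down schema can be expressed with the `cloudFiles.schemaHints` option. A sketch assuming the same orders path and columns:

```python
# Hedged sketch: CSV ingestion with pinned column types via schema hints.
# Path and column list are assumptions carried over from the SQL example.
def bronze_orders(spark):
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .option(
            "cloudFiles.schemaHints",
            "order_id STRING, customer_id STRING, "
            "amount DECIMAL(10,2), order_date DATE",
        )
        .load("/Volumes/catalog/raw/orders/")
    )
```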
Handle schema evolution gracefully
“Set up a Python Auto Loader pipeline that tolerates new columns appearing in JSON files over time”
```python
schema_location_base = spark.conf.get("schema_location_base")

@dp.table(name="bronze_customers", cluster_by=["ingestion_date"])
def bronze_customers():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", f"{schema_location_base}/bronze_customers")
        .option("rescuedDataColumn", "_rescued_data")
        .load("/Volumes/catalog/raw/customers/")
        .withColumn("_ingested_at", current_timestamp())
        .withColumn("ingestion_date", current_date())
    )
```

The `schemaLocation` stores inferred schema snapshots so Auto Loader detects when new columns appear. Records that don’t match the current schema land in `_rescued_data` instead of failing the pipeline.
Route malformed records to a quarantine table
“Split ingested events into a clean stream and a quarantine table for records with parsing errors, in SQL”
```sql
CREATE STREAMING TABLE bronze_events
AS SELECT
  *,
  current_timestamp() AS _ingested_at,
  CASE WHEN _rescued_data IS NOT NULL THEN true ELSE false END AS _has_errors
FROM STREAM(read_files(
  '/Volumes/catalog/raw/events/',
  format => 'json'
));

CREATE STREAMING TABLE quarantine_events
AS SELECT * FROM STREAM(bronze_events)
WHERE _has_errors = true;

CREATE STREAMING TABLE silver_events
AS SELECT * FROM STREAM(bronze_events)
WHERE _has_errors = false;
```

The `_rescued_data` column captures any fields that didn’t parse correctly. Routing them to a quarantine table keeps your silver layer clean while preserving the raw evidence for investigation.
Watch Out For
- Missing `STREAM` keyword — `FROM read_files(...)` without `STREAM` runs a one-time batch read, not Auto Loader. You’ll get all files once and never pick up new arrivals.
- Python pipelines require `schemaLocation` — when using `spark.readStream.format("cloudFiles")` with schema inference, you must set `cloudFiles.schemaLocation`. Store it in a Volume separate from your source data (e.g., `/Volumes/catalog/schema/pipeline_metadata/schemas/`).
- `inferColumnTypes` can drift — auto-inferred schemas change when new files have different shapes. Use `schemaHints` for columns that must stay stable in production.
- Notification mode for high-volume directories — if your landing zone has thousands of files per hour, set `cloudFiles.useNotifications` to `true` so Auto Loader uses cloud-native events instead of directory listing.
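The last point can be sketched in Python. A minimal example of switching Auto Loader to notification mode, assuming a hypothetical events path; the cloud-side setup (permissions to create queues and event subscriptions) is not shown:

```python
# Hedged sketch: file-notification mode for a high-volume landing zone.
# The path is an assumption; notification mode also needs cloud permissions
# to create and read storage event queues, which are configured separately.
def high_volume_events(spark):
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.useNotifications", "true")  # events, not directory listing
        .load("/Volumes/catalog/raw/events/")
    )
```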