Auto Loader

Skill: databricks-spark-declarative-pipelines

Auto Loader watches a directory in cloud storage and processes only the files that are new since the last run. You point it at a landing zone, tell it the file format, and it handles discovery, deduplication, and schema inference. No manual file listing, no tracking state yourself.
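The bookkeeping that Auto Loader takes off your hands can be pictured with a small plain-Python sketch. This is a conceptual model only, not Auto Loader's implementation: the `discover_new_files` helper and the in-memory `seen` set are hypothetical stand-ins for the state Auto Loader tracks for you between runs.

```python
from __future__ import annotations

import tempfile
from pathlib import Path


def discover_new_files(landing_zone: Path, seen: set[str]) -> list[str]:
    """Return files not seen in earlier runs and record them as seen.

    Hypothetical helper: `seen` stands in for the durable state that
    Auto Loader manages on your behalf.
    """
    current = sorted(p.name for p in landing_zone.iterdir() if p.is_file())
    new_files = [name for name in current if name not in seen]
    seen.update(new_files)
    return new_files


def demo() -> tuple[list[str], list[str], list[str]]:
    """Simulate three runs against a temporary landing zone."""
    with tempfile.TemporaryDirectory() as tmp:
        zone = Path(tmp)
        seen: set[str] = set()

        (zone / "events_001.json").write_text('{"id": 1}')
        first = discover_new_files(zone, seen)   # picks up the first file

        (zone / "events_002.json").write_text('{"id": 2}')
        second = discover_new_files(zone, seen)  # only the newly arrived file

        third = discover_new_files(zone, seen)   # nothing new since last run
        return first, second, third
```

Each run returns only the files that arrived since the previous one, which is the behavior you would otherwise have to build and persist yourself.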

“Create a SQL streaming table that ingests JSON files from our events landing zone with automatic schema inference”

CREATE STREAMING TABLE bronze_events
AS SELECT
  *,
  current_timestamp() AS _ingested_at,
  _metadata.file_path AS _source_file
FROM STREAM(read_files(
  '/Volumes/catalog/raw/events/',
  format => 'json',
  inferColumnTypes => true
));

Key decisions:

  • STREAM(read_files(...)) — the STREAM keyword is what activates Auto Loader’s incremental behavior. Without it, read_files runs as a one-time batch scan.
  • inferColumnTypes — Auto Loader infers the actual data types instead of defaulting everything to strings. Good for prototyping; use schemaHints in production for stability.
  • Ingestion metadata columns: _ingested_at and _source_file cost nothing to add and save hours when debugging data quality issues downstream.

“Ingest CSV order files with an explicit schema and header parsing in SQL”

CREATE STREAMING TABLE bronze_orders (
  order_id STRING,
  customer_id STRING,
  amount DECIMAL(10,2),
  order_date DATE
)
AS SELECT * FROM STREAM(read_files(
  '/Volumes/catalog/raw/orders/',
  format => 'csv',
  header => true,
  schemaHints => 'order_id STRING, customer_id STRING, amount DECIMAL(10,2), order_date DATE'
));

Declaring the schema in both the table definition and schemaHints acts as a double guard — if the upstream file format drifts, you get a clear error instead of silent type coercion.

“Set up a Python Auto Loader pipeline that tolerates new columns appearing in JSON files over time”

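from pyspark import pipelines as dp
from pyspark.sql.functions import current_date, current_timestamp

# Base path for Auto Loader schema snapshots, read from the pipeline configuration.
schema_location_base = spark.conf.get("schema_location_base")

@dp.table(name="bronze_customers", cluster_by=["ingestion_date"])
def bronze_customers():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        # Persist inferred schema snapshots so new columns are detected across runs.
        .option("cloudFiles.schemaLocation", f"{schema_location_base}/bronze_customers")
        # Keep fields that do not match the schema instead of dropping them.
        .option("rescuedDataColumn", "_rescued_data")
        .load("/Volumes/catalog/raw/customers/")
        .withColumn("_ingested_at", current_timestamp())
        .withColumn("ingestion_date", current_date())
    )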

The schemaLocation stores inferred schema snapshots so Auto Loader detects when new columns appear. Records that don’t match the current schema land in _rescued_data instead of failing the pipeline.
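The idea behind a rescued-data column can be illustrated in plain Python. This is a simplified sketch, not Auto Loader's logic: `apply_schema` is a hypothetical helper, and it only handles unexpected fields, whereas the real column also covers cases such as type mismatches.

```python
import json


def apply_schema(record: dict, known_columns: set) -> dict:
    """Keep known columns and park everything else in `_rescued_data`.

    Hypothetical helper that mimics the idea of a rescued-data column.
    """
    row = {col: record.get(col) for col in sorted(known_columns)}
    extras = {k: v for k, v in record.items() if k not in known_columns}
    # Serialize unexpected fields as JSON; None means nothing was rescued.
    row["_rescued_data"] = json.dumps(extras, sort_keys=True) if extras else None
    return row


known = {"id", "name"}
clean = apply_schema({"id": 1, "name": "Ada"}, known)
drifted = apply_schema({"id": 2, "name": "Lin", "tier": "gold"}, known)
```

Here `clean` carries a null `_rescued_data`, while `drifted` keeps the unexpected `tier` field as a JSON string rather than losing it or raising an error.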

Route malformed records to a quarantine table

“Split ingested events into a clean stream and a quarantine table for records with parsing errors, in SQL”

CREATE STREAMING TABLE bronze_events
AS SELECT
  *,
  current_timestamp() AS _ingested_at,
  CASE WHEN _rescued_data IS NOT NULL THEN true ELSE false END AS _has_errors
FROM STREAM(read_files(
  '/Volumes/catalog/raw/events/',
  format => 'json'
));

CREATE STREAMING TABLE quarantine_events
AS SELECT * FROM STREAM(bronze_events)
WHERE _has_errors = true;

CREATE STREAMING TABLE silver_events
AS SELECT * FROM STREAM(bronze_events)
WHERE _has_errors = false;

The _rescued_data column captures any fields that didn’t parse correctly. Routing them to a quarantine table keeps your silver layer clean while preserving the raw evidence for investigation.
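The routing rule itself is small enough to state in plain Python. The sketch below assumes rows are dictionaries with a `_rescued_data` key; `route` is a hypothetical helper that mirrors the two WHERE clauses, not part of any Databricks API.

```python
def route(rows: list) -> tuple:
    """Split rows into (clean, quarantine) based on `_rescued_data`.

    Mirrors the _has_errors flag: a non-null `_rescued_data` marks a row
    for quarantine, and every row lands in exactly one output.
    """
    clean, quarantine = [], []
    for row in rows:
        has_errors = row.get("_rescued_data") is not None
        (quarantine if has_errors else clean).append(row)
    return clean, quarantine


rows = [
    {"event_id": "a", "_rescued_data": None},
    {"event_id": "b", "_rescued_data": '{"ts": "not-a-timestamp"}'},
    {"event_id": "c", "_rescued_data": None},
]
clean_rows, quarantined_rows = route(rows)
```

Because the two predicates are exact complements, no record is dropped and none appears in both tables.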

  • Missing STREAM keyword: FROM read_files(...) without STREAM runs a one-time batch read, not Auto Loader. You’ll get all files once and never pick up new arrivals.
  • Python pipelines require schemaLocation — when using spark.readStream.format("cloudFiles") with schema inference, you must set cloudFiles.schemaLocation. Store it in a Volume separate from your source data (e.g., /Volumes/catalog/schema/pipeline_metadata/schemas/).
  • inferColumnTypes can drift — auto-inferred schemas change when new files have different shapes. Use schemaHints for columns that must stay stable in production.
  • Notification mode for high-volume directories — if your landing zone has thousands of files per hour, set cloudFiles.useNotifications to true so Auto Loader uses cloud-native events instead of directory listing.
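One way to keep the schemaLocation and notification settings consistent across Python tables is to build the option map in a single place. The sketch below is an assumption about how you might organize this: `auto_loader_options` is a hypothetical helper, and only the option keys (`cloudFiles.format`, `cloudFiles.schemaLocation`, `cloudFiles.useNotifications`) come from the text above.

```python
from __future__ import annotations


def auto_loader_options(
    file_format: str,
    schema_location: str,
    use_notifications: bool = False,
) -> dict[str, str]:
    """Build the Auto Loader option map for a Python streaming read.

    Hypothetical helper: it refuses to return options without a schema
    location, so the requirement cannot be forgotten per table.
    """
    if not schema_location:
        raise ValueError("cloudFiles.schemaLocation is required for schema inference")
    options = {
        "cloudFiles.format": file_format,
        "cloudFiles.schemaLocation": schema_location,
    }
    if use_notifications:
        # Opt in to cloud-native file events instead of directory listing.
        options["cloudFiles.useNotifications"] = "true"
    return options


opts = auto_loader_options(
    "json",
    "/Volumes/catalog/schema/pipeline_metadata/schemas/bronze_customers",
    use_notifications=True,
)
```

Inside a pipeline, the result could be passed to the reader with `spark.readStream.format("cloudFiles").options(**opts).load(...)`.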