Ingestion Patterns

Skill: databricks-spark-declarative-pipelines

Ingestion is the entry point of every pipeline — getting raw data from cloud storage, message queues, or external systems into bronze streaming tables. Ask your AI coding assistant to generate the right ingestion pattern and it will wire up Auto Loader, schema hints, metadata columns, and error handling so new files or events flow into your lakehouse automatically.

“Create a SQL streaming table that ingests JSON order files from a Volume, adds ingestion metadata, and uses schema hints for the key columns”

CREATE OR REFRESH STREAMING TABLE bronze_orders
CLUSTER BY (order_date)
AS
SELECT
  *,
  current_timestamp() AS _ingested_at,
  _metadata.file_path AS _source_file,
  _metadata.file_modification_time AS _file_timestamp
FROM STREAM read_files(
  '/Volumes/catalog/schema/raw/orders/',
  format => 'json',
  schemaHints => 'order_id STRING, customer_id STRING, amount DECIMAL(10,2), order_date DATE'
);

Key decisions:

  • STREAM read_files() — the STREAM keyword activates Auto Loader mode so new files are picked up incrementally. Without it, read_files() runs as a batch scan and the pipeline errors with “cannot create streaming table from batch query.”
  • schemaHints for critical columns — hint the types for columns that matter (IDs, amounts, dates). Let the rest auto-infer. This prevents silent type drift on key business fields without locking down the entire schema.
  • Metadata columns — _ingested_at and _source_file cost nothing and save hours when debugging late-arriving or corrupt files.
  • CLUSTER BY on query filter columns — Liquid Clustering replaces PARTITION BY. Pick 1-4 columns your downstream queries filter on most.
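The schemaHints string is just a comma-separated list of `column TYPE` pairs, the same shape as a DDL column list. One subtlety worth seeing concretely: a type like DECIMAL(10,2) contains a comma that is not a column separator. The following parser is a hypothetical illustration (not any Databricks API), sketching how such a string decomposes:

```python
import re


def parse_schema_hints(hints: str) -> dict:
    """Split a schemaHints string such as 'amount DECIMAL(10,2)' into {column: type}.

    Commas inside parentheses (e.g. DECIMAL(10,2)) belong to the type, so we
    split only on commas that are not followed by a closing paren without an
    intervening opening paren.
    """
    parts = re.split(r",\s*(?![^()]*\))", hints)
    return {name: col_type for name, _, col_type in (p.strip().partition(" ") for p in parts)}


hints = parse_schema_hints(
    "order_id STRING, customer_id STRING, amount DECIMAL(10,2), order_date DATE"
)
# hints["amount"] is "DECIMAL(10,2)" — the inner comma survives intact
```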

Ingest with rescued data and quarantine routing

“Set up a SQL bronze ingestion that captures parse failures into a quarantine table and routes clean records downstream”

CREATE OR REFRESH STREAMING TABLE bronze_events AS
SELECT
  *,
  current_timestamp() AS _ingested_at,
  _rescued_data IS NOT NULL AS _has_errors
FROM STREAM read_files(
  '/Volumes/catalog/schema/raw/events/',
  format => 'json',
  schemaHints => 'event_id STRING, event_time TIMESTAMP'
);

CREATE OR REFRESH STREAMING TABLE bronze_events_quarantine AS
SELECT * FROM STREAM bronze_events WHERE _rescued_data IS NOT NULL;

CREATE OR REFRESH STREAMING TABLE silver_events_clean AS
SELECT * FROM STREAM bronze_events WHERE _rescued_data IS NULL;

The _rescued_data column is populated automatically by Auto Loader when a record does not match the inferred or hinted schema. Routing these to a quarantine table keeps the silver layer clean while preserving the bad records for investigation.
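As a mental model, the routing above behaves like this pure-Python sketch. It is a toy stand-in for Auto Loader with hypothetical helper names, not real ingestion code: any record that fails to parse, or is missing a hinted column, carries a non-null rescued payload and lands in quarantine.

```python
import json


def route_record(raw: str, required: set) -> tuple:
    """Parse one raw JSON event; failures populate a stand-in for _rescued_data."""
    try:
        rec = json.loads(raw)
    except json.JSONDecodeError:
        return {}, raw  # unparseable payload is rescued whole
    missing = required - rec.keys()
    if missing:
        return rec, json.dumps({"missing": sorted(missing)})
    return rec, None  # clean record: rescued payload is NULL


def split_streams(raws, required):
    """Route clean records downstream and rescued ones to quarantine."""
    clean, quarantine = [], []
    for raw in raws:
        rec, rescued = route_record(raw, required)
        (clean if rescued is None else quarantine).append((rec, rescued))
    return clean, quarantine
```

Feeding it one valid event, one with a missing column, and one non-JSON payload yields one clean record and two quarantined ones, mirroring the `_rescued_data IS NULL` / `IS NOT NULL` split in the SQL.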

“Create a SQL streaming table that reads from a Kafka topic with SASL authentication and casts the key and value to strings”

CREATE OR REFRESH STREAMING TABLE bronze_kafka_events AS
SELECT
  CAST(key AS STRING) AS event_key,
  CAST(value AS STRING) AS event_value,
  topic,
  partition,
  offset,
  timestamp AS kafka_timestamp,
  current_timestamp() AS _ingested_at
FROM read_stream(
  format => 'kafka',
  kafka.bootstrap.servers => '${kafka_brokers}',
  subscribe => 'events-topic',
  startingOffsets => 'latest',
  kafka.security.protocol => 'SASL_SSL',
  kafka.sasl.mechanism => 'PLAIN',
  kafka.sasl.jaas.config => 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{{secrets/kafka/username}}" password="{{secrets/kafka/password}}";'
);

Kafka values arrive as binary — cast immediately at bronze. Use {{secrets/scope/key}} for credentials so they stay out of code. The startingOffsets parameter controls whether you replay from earliest or pick up from latest on first run.
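The earliest-vs-latest choice is easiest to picture as offsets in the topic log. A minimal sketch (not Kafka client code; the function name is illustrative) of what a brand-new consumer reads on its first run:

```python
def first_run_offsets(log_end: int, starting_offsets: str) -> range:
    """Offsets a fresh consumer reads from a topic partition whose log ends at log_end."""
    if starting_offsets == "earliest":
        return range(0, log_end)        # replay the whole retained log
    if starting_offsets == "latest":
        return range(log_end, log_end)  # empty: only events produced after startup
    raise ValueError(f"unknown startingOffsets: {starting_offsets}")
```

With 5 retained events, 'earliest' reads offsets 0 through 4 (the backfill), while 'latest' reads nothing until new events arrive.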

“Write a Python bronze table that ingests JSON files with Auto Loader, infers column types, and stores schema metadata in a Volume”

from pyspark import pipelines as dp
from pyspark.sql import functions as F

schema_location_base = spark.conf.get("schema_location_base")

@dp.table(name="bronze_orders", cluster_by=["order_date"])
def bronze_orders():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", f"{schema_location_base}/bronze_orders")
        .option("cloudFiles.inferColumnTypes", "true")
        .load("/Volumes/catalog/schema/raw/orders/")
        .withColumn("_ingested_at", F.current_timestamp())
        .withColumn("_source_file", F.col("_metadata.file_path"))
    )

Python ingestion uses cloudFiles format directly instead of read_files(). The schemaLocation is mandatory when using schema inference — it stores the inferred schema so Auto Loader does not re-scan all files on every run. Store it in a separate Volume like /Volumes/catalog/schema/pipeline_metadata/schemas/, never in the source data Volume.
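The rule that the schema location must live outside the source Volume can be checked mechanically. Here is a hypothetical guard function (not part of Auto Loader) using only path arithmetic:

```python
from pathlib import PurePosixPath


def schema_location_is_safe(source_path: str, schema_location: str) -> bool:
    """True only when the schema location is neither the source path nor nested inside it."""
    src = PurePosixPath(source_path)   # trailing slashes normalize away
    loc = PurePosixPath(schema_location)
    return loc != src and src not in loc.parents
```

A dedicated metadata Volume such as /Volumes/catalog/pipeline_metadata/schemas/bronze_orders passes; anything under the source path, like a _schemas subfolder of the raw orders Volume, fails.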

“Show the Asset Bundle config for a pipeline that parameterizes catalog, schema, and schema location across dev and prod”

databricks.yml

variables:
  catalog:
    description: Target catalog
  schema:
    description: Target schema
  schema_location_base:
    description: Base path for Auto Loader schema metadata

targets:
  dev:
    variables:
      catalog: dev_catalog
      schema: dev_${workspace.current_user.short_name}
      schema_location_base: /Volumes/dev_catalog/pipeline_metadata/schemas
  prod:
    variables:
      catalog: prod_catalog
      schema: production
      schema_location_base: /Volumes/prod_catalog/pipeline_metadata/schemas

Access these in pipeline code with spark.conf.get("schema_location_base"). Parameterizing at the bundle level means the same SQL or Python files deploy to any environment without edits.
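The override semantics are simple: a target's variables win over the top-level defaults. This small sketch mimics (rather than reimplements) that resolution; the sample values mirror the bundle config above:

```python
def resolve_variables(defaults: dict, targets: dict, target: str) -> dict:
    """Target-level variable values override top-level defaults."""
    return {**defaults, **targets.get(target, {})}


# Sample data mirroring the databricks.yml above
defaults = {"catalog": None, "schema": None, "schema_location_base": None}
targets = {
    "dev": {
        "catalog": "dev_catalog",
        "schema": "dev_${workspace.current_user.short_name}",
        "schema_location_base": "/Volumes/dev_catalog/pipeline_metadata/schemas",
    },
    "prod": {
        "catalog": "prod_catalog",
        "schema": "production",
        "schema_location_base": "/Volumes/prod_catalog/pipeline_metadata/schemas",
    },
}
```

Resolving for "prod" yields catalog prod_catalog and schema production; the pipeline code itself never changes between environments.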

  • Missing STREAM keyword — FROM read_files(...) is batch; FROM STREAM read_files(...) is incremental. Getting this wrong means the pipeline re-scans every file on every run, then fails if the target is a streaming table.
  • Schema location in the source Volume — storing Auto Loader schema metadata alongside source data causes permission conflicts and checkpoint corruption. Always use a dedicated metadata Volume.
  • No schemaHints on critical columns — fully inferred schemas can silently drift when source data changes. Hint at least your primary keys, amounts, and dates.
  • Kafka startingOffsets on first run — latest skips all existing data. Use earliest if you need to backfill, but be ready for a large initial load.