# Ingestion Patterns

Skill: databricks-spark-declarative-pipelines
## What You Can Build

Ingestion is the entry point of every pipeline — getting raw data from cloud storage, message queues, or external systems into bronze streaming tables. Ask your AI coding assistant to generate the right ingestion pattern and it will wire up Auto Loader, schema hints, metadata columns, and error handling so new files or events flow into your lakehouse automatically.
## In Action

“Create a SQL streaming table that ingests JSON order files from a Volume, adds ingestion metadata, and uses schema hints for the key columns”
```sql
CREATE OR REFRESH STREAMING TABLE bronze_orders
CLUSTER BY (order_date)
AS
SELECT
  *,
  current_timestamp() AS _ingested_at,
  _metadata.file_path AS _source_file,
  _metadata.file_modification_time AS _file_timestamp
FROM STREAM read_files(
  '/Volumes/catalog/schema/raw/orders/',
  format => 'json',
  schemaHints => 'order_id STRING, customer_id STRING, amount DECIMAL(10,2), order_date DATE'
);
```

Key decisions:

- `STREAM read_files()` — the `STREAM` keyword activates Auto Loader mode so new files are picked up incrementally. Without it, `read_files()` runs as a batch scan and the pipeline errors with “cannot create streaming table from batch query.”
- `schemaHints` for critical columns — hint the types for columns that matter (IDs, amounts, dates). Let the rest auto-infer. This prevents silent type drift on key business fields without locking down the entire schema.
- Metadata columns — `_ingested_at` and `_source_file` cost nothing and save hours when debugging late-arriving or corrupt files.
- `CLUSTER BY` on query filter columns — Liquid Clustering replaces `PARTITION BY`. Pick 1-4 columns your downstream queries filter on most.
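The `schemaHints` string is just comma-separated `name TYPE` pairs. If you hint the same critical columns across many tables, a tiny helper can keep them in one place — a plain-Python sketch, not part of any Databricks API:

```python
def build_schema_hints(columns: dict) -> str:
    """Assemble an Auto Loader schemaHints string from column -> type pairs."""
    return ", ".join(f"{name} {dtype}" for name, dtype in columns.items())

# Hypothetical critical columns for the orders feed above.
critical_columns = {
    "order_id": "STRING",
    "customer_id": "STRING",
    "amount": "DECIMAL(10,2)",
    "order_date": "DATE",
}
hints = build_schema_hints(critical_columns)
# hints == "order_id STRING, customer_id STRING, amount DECIMAL(10,2), order_date DATE"
```

The resulting string can be interpolated into the `schemaHints` argument of `read_files()`.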
## More Patterns

### Ingest with rescued data and quarantine routing

“Set up a SQL bronze ingestion that captures parse failures into a quarantine table and routes clean records downstream”
```sql
CREATE OR REFRESH STREAMING TABLE bronze_events AS
SELECT
  *,
  current_timestamp() AS _ingested_at,
  CASE WHEN _rescued_data IS NOT NULL THEN TRUE ELSE FALSE END AS _has_errors
FROM STREAM read_files(
  '/Volumes/catalog/schema/raw/events/',
  format => 'json',
  schemaHints => 'event_id STRING, event_time TIMESTAMP'
);

CREATE OR REFRESH STREAMING TABLE bronze_events_quarantine AS
SELECT * FROM STREAM bronze_events WHERE _rescued_data IS NOT NULL;

CREATE OR REFRESH STREAMING TABLE silver_events_clean AS
SELECT * FROM STREAM bronze_events WHERE _rescued_data IS NULL;
```

The `_rescued_data` column is populated automatically by Auto Loader when a record does not match the inferred or hinted schema. Routing these records to a quarantine table keeps the silver layer clean while preserving the bad records for investigation.
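The split reduces to a single predicate on `_rescued_data`: any non-null value means the record failed to parse cleanly. Sketched in plain Python for clarity — in the pipeline this is exactly what the two `WHERE` clauses above express:

```python
def route(record: dict) -> str:
    """Mimic the quarantine split: non-null _rescued_data goes to quarantine."""
    return "quarantine" if record.get("_rescued_data") is not None else "clean"

# Hypothetical records: the second has a field Auto Loader could not fit the schema.
events = [
    {"event_id": "e1", "_rescued_data": None},
    {"event_id": "e2", "_rescued_data": '{"event_time": "not-a-timestamp"}'},
]
routed = [route(e) for e in events]
# routed == ["clean", "quarantine"]
```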
### Ingest from Kafka with secret references

“Create a SQL streaming table that reads from a Kafka topic with SASL authentication and casts the key and value to strings”
```sql
CREATE OR REFRESH STREAMING TABLE bronze_kafka_events AS
SELECT
  CAST(key AS STRING) AS event_key,
  CAST(value AS STRING) AS event_value,
  topic,
  partition,
  offset,
  timestamp AS kafka_timestamp,
  current_timestamp() AS _ingested_at
FROM read_stream(
  format => 'kafka',
  kafka.bootstrap.servers => '${kafka_brokers}',
  subscribe => 'events-topic',
  startingOffsets => 'latest',
  kafka.security.protocol => 'SASL_SSL',
  kafka.sasl.mechanism => 'PLAIN',
  kafka.sasl.jaas.config => 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{{secrets/kafka/username}}" password="{{secrets/kafka/password}}";'
);
```

Kafka values arrive as binary — cast them to strings immediately at bronze. Use `{{secrets/scope/key}}` references for credentials so they stay out of code. The `startingOffsets` parameter controls whether you replay from `earliest` or pick up from `latest` on first run.
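Downstream of bronze, the string-cast `event_value` typically gets parsed as JSON (in Spark you would use `from_json`; the silver table names here are hypothetical). A plain-Python sketch of the tolerant parse that keeps malformed payloads from failing the run:

```python
import json

def parse_event_value(value):
    """Return the parsed payload, or None if the value is not valid JSON."""
    try:
        return json.loads(value)
    except (json.JSONDecodeError, TypeError):
        return None

payload = parse_event_value('{"event_id": "e1", "amount": 42}')
# payload["amount"] == 42

bad = parse_event_value("not json")
# bad is None — the record can be flagged instead of crashing the parse
```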
### Python Auto Loader with schema location

“Write a Python bronze table that ingests JSON files with Auto Loader, infers column types, and stores schema metadata in a Volume”
```python
from pyspark import pipelines as dp
from pyspark.sql import functions as F

schema_location_base = spark.conf.get("schema_location_base")

@dp.table(name="bronze_orders", cluster_by=["order_date"])
def bronze_orders():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", f"{schema_location_base}/bronze_orders")
        .option("cloudFiles.inferColumnTypes", "true")
        .load("/Volumes/catalog/schema/raw/orders/")
        .withColumn("_ingested_at", F.current_timestamp())
        .withColumn("_source_file", F.col("_metadata.file_path"))
    )
```

Python ingestion uses the `cloudFiles` format directly instead of `read_files()`. The `schemaLocation` is mandatory when using schema inference — it stores the inferred schema so Auto Loader does not re-scan all files on every run. Store it in a separate Volume like `/Volumes/catalog/schema/pipeline_metadata/schemas/`, never in the source data Volume.
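Each table needs its own subdirectory under the shared schema-location base, as in the `f"{schema_location_base}/bronze_orders"` expression above. A trivial helper (illustrative, not a Databricks API) makes that convention explicit and tolerant of trailing slashes:

```python
def schema_location(base: str, table: str) -> str:
    """Build a per-table Auto Loader schemaLocation path under the metadata Volume."""
    return f"{base.rstrip('/')}/{table}"

loc = schema_location("/Volumes/catalog/schema/pipeline_metadata/schemas/", "bronze_orders")
# loc == "/Volumes/catalog/schema/pipeline_metadata/schemas/bronze_orders"
```

Keeping one subdirectory per table prevents two streams from corrupting each other's inferred-schema history.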
### Parameterized multi-environment ingestion

“Show the Asset Bundle config for a pipeline that parameterizes catalog, schema, and schema location across dev and prod”
```yaml
variables:
  catalog:
    description: Target catalog
  schema:
    description: Target schema
  schema_location_base:
    description: Base path for Auto Loader schema metadata

targets:
  dev:
    variables:
      catalog: dev_catalog
      schema: dev_${workspace.current_user.short_name}
      schema_location_base: /Volumes/dev_catalog/pipeline_metadata/schemas
  prod:
    variables:
      catalog: prod_catalog
      schema: production
      schema_location_base: /Volumes/prod_catalog/pipeline_metadata/schemas
```

Access these in pipeline code with `spark.conf.get("schema_location_base")`. Parameterizing at the bundle level means the same SQL or Python files deploy to any environment without edits.
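The resolution the bundle performs at deploy time can be pictured as a per-target lookup — a plain-Python sketch of the idea, not the actual bundle implementation (the `dev_alice` value stands in for the resolved `${workspace.current_user.short_name}`):

```python
targets = {
    "dev": {
        "catalog": "dev_catalog",
        "schema": "dev_alice",  # hypothetical resolved short name
        "schema_location_base": "/Volumes/dev_catalog/pipeline_metadata/schemas",
    },
    "prod": {
        "catalog": "prod_catalog",
        "schema": "production",
        "schema_location_base": "/Volumes/prod_catalog/pipeline_metadata/schemas",
    },
}

def resolve(target: str, key: str) -> str:
    """Look up a bundle variable for a deployment target."""
    return targets[target][key]

# Pipeline code then reads the chosen value via spark.conf.get(key).
prod_schemas = resolve("prod", "schema_location_base")
```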
## Watch Out For

- Missing `STREAM` keyword — `FROM read_files(...)` is batch. `FROM STREAM read_files(...)` is incremental. Getting this wrong means the pipeline re-scans every file on every run, then fails if the target is a streaming table.
- Schema location in the source Volume — storing Auto Loader schema metadata alongside source data causes permission conflicts and checkpoint corruption. Always use a dedicated metadata Volume.
- No `schemaHints` on critical columns — fully inferred schemas can silently drift when source data changes. Hint at least your primary keys, amounts, and dates.
- Kafka `startingOffsets` on first run — `latest` skips all existing data. Use `earliest` if you need to backfill, but be ready for a large initial load.