
Ingest from Cloud Storage and Kafka in SQL

Skill: databricks-spark-declarative-pipelines

Every data pipeline starts with ingestion. Spark Declarative Pipelines expose two SQL-native entry points — read_files() for cloud storage (Auto Loader under the hood) and read_kafka() for message brokers. Your AI coding assistant can generate production-grade bronze tables that handle schema drift, quarantine malformed records, and track lineage metadata, all from a single prompt describing your source.

“Create a bronze streaming table that ingests JSON order files from /Volumes/retail/prod/raw/orders/. Pin the schema for order_id and amount so they don’t drift, and add ingestion tracking columns.”

CREATE OR REFRESH STREAMING TABLE bronze_orders
COMMENT 'Raw order files via Auto Loader'
AS
SELECT
  *,
  current_timestamp() AS _ingested_at,
  _metadata.file_path AS _source_file,
  _metadata.file_modification_time AS _file_modified_at
FROM STREAM read_files(
  '/Volumes/retail/prod/raw/orders/',
  format => 'json',
  schemaHints => 'order_id STRING, amount DECIMAL(10,2), order_date DATE'
);

Key decisions:

  • STREAM read_files() over bare read_files() — the STREAM keyword activates Auto Loader’s incremental file tracking so only new files get processed on each pipeline refresh
  • schemaHints for critical columns — pins types on the columns that matter for downstream joins and aggregations while still allowing Auto Loader to infer the rest
  • _metadata columns for lineage — file_path and file_modification_time let you trace any row back to the exact file and landing time, which is essential for debugging late-arriving data
  • No explicit schemaEvolutionMode — when no full schema is supplied, Auto Loader defaults to evolving the schema as new columns appear and routing type mismatches into _rescued_data; set schemaEvolutionMode explicitly only when you need different behavior, such as freezing the schema
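
If you do want the table schema frozen, the evolution mode can be set explicitly. A hedged sketch reusing the orders path above (schemaEvolutionMode => 'rescue' is a read_files option; the table name here is hypothetical):

-- Hypothetical variant: keep the declared schema fixed so new or
-- mismatched fields land in _rescued_data instead of widening the table
CREATE OR REFRESH STREAMING TABLE bronze_orders_frozen
AS
SELECT
  *,
  current_timestamp() AS _ingested_at
FROM STREAM read_files(
  '/Volumes/retail/prod/raw/orders/',
  format => 'json',
  schemaHints => 'order_id STRING, amount DECIMAL(10,2), order_date DATE',
  schemaEvolutionMode => 'rescue'
);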

“Set up a bronze ingestion layer for event JSON files that separates clean records from malformed ones into a quarantine table.”

-- Bronze: ingest everything, flag parse failures
CREATE OR REFRESH STREAMING TABLE bronze_events
COMMENT 'Raw events with rescue column for malformed records'
AS
SELECT
  *,
  current_timestamp() AS _ingested_at,
  _metadata.file_path AS _source_file,
  CASE
    WHEN _rescued_data IS NOT NULL THEN true
    ELSE false
  END AS _has_parse_errors
FROM STREAM read_files(
  '/Volumes/retail/prod/raw/events/',
  format => 'json'
);

-- Quarantine: isolate bad records for investigation
CREATE OR REFRESH STREAMING TABLE bronze_events_quarantine
COMMENT 'Malformed event records for review'
AS
SELECT * FROM STREAM bronze_events
WHERE _rescued_data IS NOT NULL;

-- Clean feed for silver layer
CREATE OR REFRESH STREAMING TABLE bronze_events_clean
AS
SELECT * FROM STREAM bronze_events
WHERE _rescued_data IS NULL;

Auto Loader automatically populates _rescued_data with any JSON fields that failed to parse against the inferred or hinted schema. Splitting clean and quarantined records into separate streaming tables means your silver layer never sees bad data, but you still retain everything for root cause analysis.
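
Once records land in quarantine, an ad hoc batch query shows which source files are producing failures. A sketch run outside the pipeline, using the table and tracking columns from the example above:

-- Triage: rank source files by parse-failure volume
SELECT
  _source_file,
  count(*) AS bad_records,
  min(_ingested_at) AS first_seen
FROM bronze_events_quarantine
GROUP BY _source_file
ORDER BY bad_records DESC;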

“Ingest from a Kafka topic called clickstream-events. Deserialize the value as JSON with known fields event_id, event_type, and user_id. Use pipeline variables for the broker address.”

-- Bronze: raw Kafka envelope
CREATE OR REFRESH STREAMING TABLE bronze_clickstream
COMMENT 'Raw Kafka messages from clickstream topic'
AS
SELECT
  CAST(key AS STRING) AS event_key,
  CAST(value AS STRING) AS raw_payload,
  topic,
  partition,
  offset,
  timestamp AS kafka_timestamp,
  current_timestamp() AS _ingested_at
FROM read_kafka(
  bootstrapServers => '${kafka_brokers}',
  subscribe => 'clickstream-events',
  startingOffsets => 'latest'
);

-- Silver: parsed JSON payload
CREATE OR REFRESH STREAMING TABLE silver_clickstream
COMMENT 'Parsed clickstream events'
AS
SELECT
  event_key,
  parsed.event_id,
  parsed.event_type,
  parsed.user_id,
  parsed.page_url,
  kafka_timestamp,
  _ingested_at
FROM (
  SELECT
    *,
    from_json(
      raw_payload,
      'event_id STRING, event_type STRING, user_id STRING, page_url STRING'
    ) AS parsed
  FROM STREAM bronze_clickstream
);

Two-stage Kafka ingestion is the standard pattern: bronze preserves the raw envelope (key, offset, partition) for replay and debugging, silver deserializes the payload into typed columns. The from_json() schema string acts as a contract — fields missing from the payload come through as null rather than failing the pipeline.
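
The contract can also be enforced rather than merely tolerated: declarative pipelines support expectations, so a missing event_id can drop the row (or fail the run) instead of passing a null downstream. A sketch extending the silver table above (the constraint name is hypothetical):

-- Expectation: drop rows whose payload failed the contract;
-- violation counts are recorded in the pipeline event log
CREATE OR REFRESH STREAMING TABLE silver_clickstream (
  CONSTRAINT valid_event_id EXPECT (event_id IS NOT NULL) ON VIOLATION DROP ROW
)
AS
SELECT
  event_key,
  parsed.event_id,
  parsed.event_type,
  parsed.user_id,
  kafka_timestamp
FROM (
  SELECT
    *,
    from_json(
      raw_payload,
      'event_id STRING, event_type STRING, user_id STRING'
    ) AS parsed
  FROM STREAM bronze_clickstream
);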

“Ingest CSV files with headers from a volume path. The files have pipe delimiters and the amount column should be DECIMAL.”

CREATE OR REFRESH STREAMING TABLE bronze_transactions
COMMENT 'CSV transaction files with pipe delimiter'
AS
SELECT
  *,
  current_timestamp() AS _ingested_at,
  _metadata.file_path AS _source_file
FROM STREAM read_files(
  '/Volumes/finance/prod/raw/transactions/',
  format => 'csv',
  header => 'true',
  delimiter => '|',
  schemaHints => 'amount DECIMAL(12,2), transaction_date DATE'
);

CSV options like header, delimiter, and quote pass through directly to Auto Loader. Always set schemaHints on numeric and date columns in CSV sources — type inference on string-formatted files is unreliable for precision-sensitive fields like currency.

Common pitfalls:

  • read_files() without STREAM — FROM read_files(...) runs a batch scan that reprocesses every file on each refresh. Always use FROM STREAM read_files(...) for streaming tables unless you intentionally want full recomputation.
  • Skipping schemaHints on production pipelines — Auto Loader infers types from the first batch of files. If early files have nulls in a DECIMAL column, it may infer STRING. Pin critical column types with schemaHints from the start.
  • Kafka startingOffsets set to earliest in production — this replays the entire topic history on first run, which can overwhelm the pipeline. Use latest for new pipelines and earliest only for intentional backfills.
  • Secrets in plain text — use {{secrets/scope/key}} syntax for Kafka credentials and Event Hub connection strings. Never hardcode passwords in pipeline SQL, even in development.
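
One workable pattern (scope and key names here are hypothetical): reference the secret in the pipeline configuration, then substitute it in SQL with the same ${} variable syntax used for the broker address earlier.

-- Pipeline configuration (JSON), not SQL:
--
--   "configuration": {
--     "kafka_brokers": "broker-1.example.com:9092",
--     "kafka_password": "{{secrets/kafka_creds/sasl_password}}"
--   }
--
-- The SQL then references variables and never sees the plaintext value;
-- which kafka.* auth options you pass alongside depends on the broker's
-- SASL/SSL setup.
CREATE OR REFRESH STREAMING TABLE bronze_clickstream_secure
AS
SELECT CAST(value AS STRING) AS raw_payload
FROM read_kafka(
  bootstrapServers => '${kafka_brokers}',
  subscribe => 'clickstream-events'
);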