# Ingest from Cloud Storage and Kafka in SQL

Skill: `databricks-spark-declarative-pipelines`
## What You Can Build

Every data pipeline starts with ingestion. Spark Declarative Pipelines expose two SQL-native entry points — `read_files()` for cloud storage (Auto Loader under the hood) and `read_kafka()` for message brokers. Your AI coding assistant can generate production-grade bronze tables that handle schema drift, quarantine malformed records, and track lineage metadata, all from a single prompt describing your source.
## In Action

“Create a bronze streaming table that ingests JSON order files from /Volumes/retail/prod/raw/orders/. Pin the schema for order_id and amount so they don’t drift, and add ingestion tracking columns.”
```sql
CREATE OR REFRESH STREAMING TABLE bronze_orders
COMMENT 'Raw order files via Auto Loader'
AS SELECT
  *,
  current_timestamp() AS _ingested_at,
  _metadata.file_path AS _source_file,
  _metadata.file_modification_time AS _file_modified_at
FROM STREAM read_files(
  '/Volumes/retail/prod/raw/orders/',
  format => 'json',
  schemaHints => 'order_id STRING, amount DECIMAL(10,2), order_date DATE'
);
```

Key decisions:
- `STREAM read_files()` over bare `read_files()` — the STREAM keyword activates Auto Loader’s incremental file tracking so only new files get processed on each pipeline refresh
- `schemaHints` for critical columns — pins types on the columns that matter for downstream joins and aggregations while still allowing Auto Loader to infer the rest
- `_metadata` columns for lineage — `file_path` and `file_modification_time` let you trace any row back to the exact file and landing time, which is essential for debugging late-arriving data
- No explicit schema evolution mode — Auto Loader defaults to schema inference with rescue; adding `mode => 'PERMISSIVE'` is only needed when you want to keep parsing even when the entire schema shifts
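If you would rather make the evolution behavior explicit instead of relying on the default, a sketch along these lines is possible. It assumes your runtime's `read_files()` accepts a `schemaEvolutionMode` option (check your platform's `read_files` reference before relying on it):

```sql
-- Hedged sketch: pin schema evolution behavior explicitly.
-- Assumes schemaEvolutionMode is supported by your runtime's read_files().
CREATE OR REFRESH STREAMING TABLE bronze_orders_strict
AS SELECT
  *,
  current_timestamp() AS _ingested_at
FROM STREAM read_files(
  '/Volumes/retail/prod/raw/orders/',
  format => 'json',
  schemaHints => 'order_id STRING, amount DECIMAL(10,2)',
  schemaEvolutionMode => 'rescue'  -- new columns land in _rescued_data instead of widening the table
);
```

With `'rescue'`, unexpected new columns do not change the table schema; they are captured in `_rescued_data`, which keeps downstream contracts stable.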
## More Patterns

### Quarantine malformed records

“Set up a bronze ingestion layer for event JSON files that separates clean records from malformed ones into a quarantine table.”
```sql
-- Bronze: ingest everything, flag parse failures
CREATE OR REFRESH STREAMING TABLE bronze_events
COMMENT 'Raw events with rescue column for malformed records'
AS SELECT
  *,
  current_timestamp() AS _ingested_at,
  _metadata.file_path AS _source_file,
  _rescued_data IS NOT NULL AS _has_parse_errors
FROM STREAM read_files(
  '/Volumes/retail/prod/raw/events/',
  format => 'json'
);
```
```sql
-- Quarantine: isolate bad records for investigation
CREATE OR REFRESH STREAMING TABLE bronze_events_quarantine
COMMENT 'Malformed event records for review'
AS SELECT * FROM STREAM bronze_events
WHERE _rescued_data IS NOT NULL;
```
```sql
-- Clean feed for silver layer
CREATE OR REFRESH STREAMING TABLE bronze_events_clean
AS SELECT * FROM STREAM bronze_events
WHERE _rescued_data IS NULL;
```

Auto Loader automatically populates `_rescued_data` with any JSON fields that failed to parse against the inferred or hinted schema. Splitting clean and quarantined records into separate streaming tables means your silver layer never sees bad data, but you still retain everything for root cause analysis.
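Once rows land in the quarantine table, the rescued payload can be triaged directly in SQL. A minimal sketch, assuming Databricks' colon path syntax for extracting fields from a JSON string; `amount` is just an example field name:

```sql
-- Triage the most recent quarantined rows.
-- _rescued_data is a JSON string, so the colon operator can pull fields out.
SELECT
  _source_file,
  _rescued_data,
  _rescued_data:amount AS rescued_amount  -- example field; extracted as a string
FROM bronze_events_quarantine
ORDER BY _ingested_at DESC
LIMIT 100;
```

Grouping by `_source_file` in a variant of this query is a quick way to tell whether bad records come from one upstream producer or are spread across files.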
### Kafka ingestion with JSON parsing

“Ingest from a Kafka topic called clickstream-events. Deserialize the value as JSON with known fields event_id, event_type, and user_id. Use pipeline variables for the broker address.”
```sql
-- Bronze: raw Kafka envelope
CREATE OR REFRESH STREAMING TABLE bronze_clickstream
COMMENT 'Raw Kafka messages from clickstream topic'
AS SELECT
  CAST(key AS STRING) AS event_key,
  CAST(value AS STRING) AS raw_payload,
  topic,
  partition,
  offset,
  timestamp AS kafka_timestamp,
  current_timestamp() AS _ingested_at
FROM STREAM read_kafka(
  bootstrapServers => '${kafka_brokers}',
  subscribe => 'clickstream-events',
  startingOffsets => 'latest'
);
```
```sql
-- Silver: parsed JSON payload
CREATE OR REFRESH STREAMING TABLE silver_clickstream
COMMENT 'Parsed clickstream events'
AS SELECT
  event_key,
  parsed.event_id,
  parsed.event_type,
  parsed.user_id,
  parsed.page_url,
  kafka_timestamp,
  _ingested_at
FROM (
  SELECT
    *,
    from_json(
      raw_payload,
      'event_id STRING, event_type STRING, user_id STRING, page_url STRING'
    ) AS parsed
  FROM STREAM bronze_clickstream
);
```

Two-stage Kafka ingestion is the standard pattern: bronze preserves the raw envelope (key, offset, partition) for replay and debugging, silver deserializes the payload into typed columns. The `from_json()` schema string acts as a contract — fields missing from the payload come through as null rather than failing the pipeline.
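To see the contract in action, you can run `from_json()` against a literal payload that omits a field; in standard Spark SQL the missing field surfaces as NULL rather than an error:

```sql
-- The payload omits user_id, so the parsed struct carries it as NULL.
SELECT from_json(
  '{"event_id": "e-1", "event_type": "click"}',
  'event_id STRING, event_type STRING, user_id STRING'
) AS parsed;
-- parsed.event_id = 'e-1', parsed.event_type = 'click', parsed.user_id IS NULL
```

The flip side of this leniency is that a renamed upstream field silently becomes a column of nulls, which is worth a data quality expectation on the silver table.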
### CSV ingestion with header handling

“Ingest CSV files with headers from a volume path. The files have pipe delimiters and the amount column should be DECIMAL.”
```sql
CREATE OR REFRESH STREAMING TABLE bronze_transactions
COMMENT 'CSV transaction files with pipe delimiter'
AS SELECT
  *,
  current_timestamp() AS _ingested_at,
  _metadata.file_path AS _source_file
FROM STREAM read_files(
  '/Volumes/finance/prod/raw/transactions/',
  format => 'csv',
  header => 'true',
  delimiter => '|',
  schemaHints => 'amount DECIMAL(12,2), transaction_date DATE'
);
```

CSV options like `header`, `delimiter`, and `quote` pass through directly to Auto Loader. Always set `schemaHints` on numeric and date columns in CSV sources — type inference on string-formatted files is unreliable for precision-sensitive fields like currency.
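When text fields can themselves contain the delimiter, the `quote` option matters too. A sketch assuming double-quoted fields; the volume path and table name here are hypothetical:

```sql
-- Hedged sketch: pipe-delimited CSV whose text fields are double-quoted,
-- so a '|' inside a quoted value is not treated as a field separator.
CREATE OR REFRESH STREAMING TABLE bronze_vendor_feed
COMMENT 'Quoted CSV vendor feed'
AS SELECT
  *,
  current_timestamp() AS _ingested_at
FROM STREAM read_files(
  '/Volumes/finance/prod/raw/vendor_feed/',  -- hypothetical path
  format => 'csv',
  header => 'true',
  delimiter => '|',
  quote => '"'
);
```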
## Watch Out For

- `read_files()` without STREAM — `FROM read_files(...)` runs a batch scan that reprocesses every file on each refresh. Always use `FROM STREAM read_files(...)` for streaming tables unless you intentionally want full recomputation.
- Skipping `schemaHints` on production pipelines — Auto Loader infers types from the first batch of files. If early files have nulls in a DECIMAL column, it may infer STRING. Pin critical column types with `schemaHints` from the start.
- Kafka `startingOffsets` set to `earliest` in production — this replays the entire topic history on first run, which can overwhelm the pipeline. Use `latest` for new pipelines and `earliest` only for intentional backfills.
- Secrets in plain text — use `{{secrets/scope/key}}` syntax for Kafka credentials and Event Hubs connection strings. Never hardcode passwords in pipeline SQL, even in development.
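When a full-history replay is the goal, run it as a deliberate, separate backfill table rather than flipping the main pipeline to `earliest`. A sketch reusing the clickstream topic from above:

```sql
-- Intentional backfill: reads the topic from the beginning.
-- Run in its own pipeline and retire it once the history is loaded.
CREATE OR REFRESH STREAMING TABLE bronze_clickstream_backfill
COMMENT 'One-off replay of full clickstream history'
AS SELECT
  CAST(key AS STRING) AS event_key,
  CAST(value AS STRING) AS raw_payload,
  timestamp AS kafka_timestamp,
  current_timestamp() AS _ingested_at
FROM STREAM read_kafka(
  bootstrapServers => '${kafka_brokers}',
  subscribe => 'clickstream-events',
  startingOffsets => 'earliest'
);
```

Keeping the backfill as its own table means the steady-state `bronze_clickstream` pipeline never carries the replay load, and the two feeds can be unioned downstream if needed.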