Spark Declarative Pipelines
Skill: databricks-spark-declarative-pipelines
What You Can Build
You can stand up a complete medallion-architecture pipeline — bronze ingestion through gold analytics — using streaming tables, materialized views, and Auto Loader. Everything runs on serverless compute with Unity Catalog governance. Ask your AI coding assistant to scaffold the project and it will generate the Asset Bundle config, pipeline files, and multi-environment deployment in one pass.
In Action
“Create a SQL bronze-silver-gold pipeline for e-commerce order data. Ingest JSON files from a Volume with Auto Loader, clean and deduplicate in silver, and aggregate daily revenue in gold.”
```sql
-- bronze: ingest raw JSON with Auto Loader
CREATE OR REFRESH STREAMING TABLE bronze_orders
CLUSTER BY (order_date)
AS SELECT
  *,
  current_timestamp() AS _ingested_at,
  _metadata.file_path AS _source_file
FROM STREAM read_files(
  '/Volumes/catalog/schema/raw/orders/',
  format => 'json',
  schemaHints => 'order_id STRING, customer_id STRING, amount DECIMAL(10,2), order_date DATE'
);
```
```sql
-- silver: validate, deduplicate, cast types
CREATE OR REFRESH STREAMING TABLE silver_orders
CLUSTER BY (customer_id)
AS SELECT
  order_id,
  customer_id,
  CAST(amount AS DECIMAL(10,2)) AS amount,
  order_date,
  _ingested_at
FROM STREAM bronze_orders
WHERE order_id IS NOT NULL;
```
```sql
-- gold: daily revenue aggregation
CREATE OR REFRESH MATERIALIZED VIEW gold_daily_revenue
AS SELECT
  order_date,
  COUNT(*) AS order_count,
  SUM(amount) AS total_revenue,
  AVG(amount) AS avg_order_value
FROM silver_orders
GROUP BY order_date;
```

Key decisions:
- `STREAM read_files()` with the `STREAM` keyword — activates Auto Loader for incremental ingestion. Without `STREAM`, it runs as a batch scan and fails with “cannot create streaming table from batch query.”
- `schemaHints` on critical columns — hint types for IDs, amounts, and dates so they don’t silently drift. Let the rest auto-infer.
- `CLUSTER BY` instead of `PARTITION BY` — Liquid Clustering is the modern default. Pick 1-4 columns your downstream queries filter on most.
- Streaming table for bronze/silver, materialized view for gold — streaming tables handle append-only incremental data. Materialized views recompute on refresh, which is what you want for aggregations.
- Metadata columns — `_ingested_at` and `_source_file` cost nothing and save hours debugging late-arriving or corrupt files.
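The deduplication the silver layer aims at is keep-the-latest-record-per-key, sequenced by the ingestion timestamp. A minimal pure-Python sketch of that idea, outside Spark (the sample rows and helper name are hypothetical):

```python
# Sketch of keep-latest-per-key deduplication, the same idea silver applies
# per order_id, sequenced by _ingested_at. Sample records are hypothetical.
def dedupe_latest(records, key="order_id", seq="_ingested_at"):
    latest = {}
    for r in records:
        if r[key] is None:
            continue  # silver also drops rows with a NULL key
        if r[key] not in latest or r[seq] > latest[r[key]][seq]:
            latest[r[key]] = r
    return list(latest.values())

rows = [
    {"order_id": "A1", "amount": 10.0, "_ingested_at": 1},
    {"order_id": "A1", "amount": 12.0, "_ingested_at": 2},  # late correction wins
    {"order_id": None, "amount": 99.0, "_ingested_at": 3},  # dropped: NULL key
]
deduped = dedupe_latest(rows)
```

This is also why the `_ingested_at` metadata column earns its keep: without a sequence column there is no principled way to decide which duplicate to retain.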
More Patterns
Initialize with Asset Bundles
“Set up a new SDP project with Asset Bundles for dev and prod environments”
```shell
databricks pipelines init
```

This generates a production-ready project structure:
```
my_pipeline/
├── databricks.yml            # Multi-environment config (dev/prod)
├── resources/
│   └── *_etl.pipeline.yml    # Pipeline resource definition
└── src/
    └── *_etl/
        ├── explorations/     # Exploratory code in .ipynb
        └── transformations/  # Your .sql or .py files here
```

Deploy with `databricks bundle deploy` for dev, `databricks bundle deploy --target prod` for production. The same SQL or Python files work across both environments because catalog and schema are parameterized in the bundle config.
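That parameterization lives in `databricks.yml`. A sketch of what such a config can look like — the bundle name, variable names, and catalog/schema values here are illustrative assumptions, not generated output:

```yaml
# Illustrative databricks.yml sketch; names and values are assumptions.
bundle:
  name: my_pipeline

variables:
  catalog:
    default: dev_catalog
  schema:
    default: dev_schema

targets:
  dev:
    mode: development
    default: true
  prod:
    mode: production
    variables:
      catalog: prod_catalog
      schema: prod_schema
```

Resources reference `${var.catalog}` and `${var.schema}`, so switching targets swaps the destination without touching any transformation code.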
Python pipeline with parameterized schemas
“Write a Python SDP pipeline where bronze uses the default schema but silver and gold write to separate schemas via pipeline parameters”
```python
from pyspark import pipelines as dp
from pyspark.sql import functions as F

silver_schema = spark.conf.get("silver_schema")
gold_schema = spark.conf.get("gold_schema")


@dp.table(name="bronze_orders", cluster_by=["order_date"])
def bronze_orders():
    schema_location = spark.conf.get("schema_location_base")
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", f"{schema_location}/bronze_orders")
        .option("cloudFiles.inferColumnTypes", "true")
        .load("/Volumes/catalog/schema/raw/orders/")
        .withColumn("_ingested_at", F.current_timestamp())
    )


@dp.table(name=f"{silver_schema}.orders_clean")
def orders_clean():
    return (
        spark.read.table("bronze_orders")
        .filter(F.col("order_id").isNotNull())
    )


@dp.materialized_view(name=f"{gold_schema}.daily_revenue")
def daily_revenue():
    return (
        spark.read.table(f"{silver_schema}.orders_clean")
        .groupBy("order_date")
        .agg(F.sum("amount").alias("total_revenue"))
    )
```

Python uses the `cloudFiles` format instead of `read_files()`. The `schemaLocation` is mandatory with schema inference — it stores the inferred schema so Auto Loader skips re-scanning on every run. Store it in a metadata Volume, never alongside source data.
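The values returned by `spark.conf.get(...)` come from the pipeline's configuration map. A sketch of the pipeline resource definition that would supply them — the resource name, catalog, and paths are illustrative assumptions:

```yaml
# Illustrative pipeline resource sketch (resources/*.pipeline.yml);
# names, catalog, and the metadata Volume path are assumptions.
resources:
  pipelines:
    orders_etl:
      name: orders_etl
      serverless: true
      catalog: main
      schema: bronze           # default schema, used by bronze_orders
      configuration:
        silver_schema: silver
        gold_schema: gold
        schema_location_base: /Volumes/main/metadata/autoloader
```

Note that `schema_location_base` points at a dedicated metadata Volume, matching the advice above to keep Auto Loader schema state away from the source data.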
SCD Type 2 with AUTO CDC
“Add SCD Type 2 history tracking to my customer dimension table so I can query point-in-time state”
```sql
CREATE OR REFRESH STREAMING TABLE dim_customers;

APPLY CHANGES INTO dim_customers
FROM STREAM silver_customers
KEYS (customer_id)
APPLY AS DELETE WHEN operation = 'DELETE'
SEQUENCE BY updated_at
STORED AS SCD TYPE 2;
```

AUTO CDC tracks the full change history. Query current state with `WHERE __END_AT IS NULL`. For point-in-time analysis, filter with `WHERE __START_AT <= '2025-06-01' AND (__END_AT IS NULL OR __END_AT > '2025-06-01')`. Note the double underscores on `__START_AT` and `__END_AT`.
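The point-in-time filter is easy to sanity-check in plain Python against a toy history table. The rows below are hypothetical; ISO-format date strings compare correctly as plain strings:

```python
# Sketch of the SCD2 point-in-time filter: a row is "current as of" a date
# when __START_AT <= date and __END_AT is either open (None, i.e. SQL NULL)
# or ends after that date. Toy history for one customer; rows are hypothetical.
history = [
    {"customer_id": "C1", "tier": "bronze", "__START_AT": "2025-01-01", "__END_AT": "2025-03-01"},
    {"customer_id": "C1", "tier": "silver", "__START_AT": "2025-03-01", "__END_AT": "2025-08-01"},
    {"customer_id": "C1", "tier": "gold",   "__START_AT": "2025-08-01", "__END_AT": None},
]

def as_of(rows, date):
    return [
        r for r in rows
        if r["__START_AT"] <= date and (r["__END_AT"] is None or r["__END_AT"] > date)
    ]

june_state = as_of(history, "2025-06-01")      # the "silver" version of C1
current_state = as_of(history, "2025-09-01")   # the open row (__END_AT IS NULL)
```

The half-open interval (`<=` on start, `>` on end) matters: on the exact day a new version starts, the query returns the new row, not both.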
Watch Out For
- Missing `STREAM` keyword — `FROM read_files(...)` is batch. `FROM STREAM read_files(...)` is incremental. Getting this wrong means the pipeline re-scans every file on every run, then errors on streaming tables.
- Schema location in the source Volume — storing Auto Loader schema metadata alongside source data causes permission conflicts and checkpoint corruption. Use a dedicated metadata Volume.
- `import dlt` instead of `pyspark.pipelines` — the legacy DLT API is replaced by `from pyspark import pipelines as dp`. Use `@dp.table()`, `@dp.materialized_view()`, and `@dp.temporary_view()`.
- SCD2 column names — Lakeflow uses `__START_AT` and `__END_AT` (double underscore), not `START_AT`/`END_AT`. Queries filtering on single-underscore columns silently return no results.