Spark Declarative Pipelines

Skill: databricks-spark-declarative-pipelines

You can stand up a complete medallion-architecture pipeline — bronze ingestion through gold analytics — using streaming tables, materialized views, and Auto Loader. Everything runs on serverless compute with Unity Catalog governance. Ask your AI coding assistant to scaffold the project and it will generate the Asset Bundle config, pipeline files, and multi-environment deployment in one pass.

“Create a SQL bronze-silver-gold pipeline for e-commerce order data. Ingest JSON files from a Volume with Auto Loader, clean and deduplicate in silver, and aggregate daily revenue in gold.”

-- bronze: ingest raw JSON with Auto Loader
CREATE OR REFRESH STREAMING TABLE bronze_orders
CLUSTER BY (order_date)
AS
SELECT
  *,
  current_timestamp() AS _ingested_at,
  _metadata.file_path AS _source_file
FROM STREAM read_files(
  '/Volumes/catalog/schema/raw/orders/',
  format => 'json',
  schemaHints => 'order_id STRING, customer_id STRING, amount DECIMAL(10,2), order_date DATE'
);

-- silver: validate, deduplicate, cast types
CREATE OR REFRESH STREAMING TABLE silver_orders
CLUSTER BY (customer_id)
AS
SELECT
  order_id,
  customer_id,
  CAST(amount AS DECIMAL(10,2)) AS amount,
  order_date,
  _ingested_at
FROM STREAM bronze_orders
WHERE order_id IS NOT NULL;

-- gold: daily revenue aggregation
CREATE OR REFRESH MATERIALIZED VIEW gold_daily_revenue
AS
SELECT
  order_date,
  COUNT(*) AS order_count,
  SUM(amount) AS total_revenue,
  AVG(amount) AS avg_order_value
FROM silver_orders
GROUP BY order_date;

Key decisions:

  • STREAM read_files() — the STREAM keyword activates Auto Loader for incremental ingestion. Without it, read_files() is interpreted as a batch scan and the pipeline fails with “cannot create streaming table from batch query.”
  • schemaHints on critical columns — hint types for IDs, amounts, and dates so they don’t silently drift. Let the rest auto-infer.
  • CLUSTER BY instead of PARTITION BY — Liquid Clustering is the modern default. Pick 1-4 columns your downstream queries filter on most.
  • Streaming table for bronze/silver, materialized view for gold — streaming tables handle append-only incremental data. Materialized views recompute on refresh, which is what you want for aggregations.
  • Metadata columns _ingested_at and _source_file — they cost nothing and save hours of debugging late-arriving or corrupt files.
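
The “deduplicate in silver” step from the prompt boils down to keep-latest-per-key logic. Here is a minimal pure-Python sketch of that rule (row shapes and the `dedupe_latest` helper are illustrative; in the pipeline itself you would express this with `dropDuplicates` or a `ROW_NUMBER` window over `order_id`):

```python
# Keep-latest-per-key dedup, mirroring the silver-layer rule in plain Python.
# Row shape is hypothetical; the real pipeline operates on the bronze table.

def dedupe_latest(rows, key="order_id", seq="_ingested_at"):
    """Keep the most recently ingested row per key, dropping null keys."""
    latest = {}
    for row in rows:
        k = row.get(key)
        if k is None:
            continue  # silver also filters out null order_ids
        if k not in latest or row[seq] > latest[k][seq]:
            latest[k] = row
    return list(latest.values())

rows = [
    {"order_id": "A1", "amount": 10.0, "_ingested_at": 1},
    {"order_id": "A1", "amount": 10.0, "_ingested_at": 2},  # duplicate from a later file
    {"order_id": None, "amount": 99.0, "_ingested_at": 3},  # dropped: null key
    {"order_id": "B2", "amount": 25.0, "_ingested_at": 1},
]
clean = dedupe_latest(rows)  # two rows survive: latest A1 and B2
```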

“Set up a new SDP project with Asset Bundles for dev and prod environments”

databricks pipelines init

This generates a production-ready project structure:

my_pipeline/
├── databricks.yml             # Multi-environment config (dev/prod)
├── resources/
│   └── *_etl.pipeline.yml     # Pipeline resource definition
└── src/
    └── *_etl/
        ├── explorations/      # Exploratory code in .ipynb
        └── transformations/   # Your .sql or .py files here

Deploy with databricks bundle deploy for dev, databricks bundle deploy --target prod for production. The same SQL or Python files work across both environments because catalog and schema are parameterized in the bundle config.
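
A minimal databricks.yml along these lines shows how catalog and schema are parameterized per target. This is a sketch, not the generated file verbatim; the bundle name, catalog names, and schema names are placeholders:

```yaml
# Sketch of a multi-target bundle config; all values are illustrative.
bundle:
  name: my_pipeline

variables:
  catalog:
    default: dev_catalog
  schema:
    default: dev_schema

targets:
  dev:
    default: true
    mode: development
  prod:
    mode: production
    variables:
      catalog: prod_catalog
      schema: prod_schema
```

The pipeline resource then references ${var.catalog} and ${var.schema}, so the same transformation files deploy unchanged to either target.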

Python pipeline with parameterized schemas

Section titled “Python pipeline with parameterized schemas”

“Write a Python SDP pipeline where bronze uses the default schema but silver and gold write to separate schemas via pipeline parameters”

from pyspark import pipelines as dp
from pyspark.sql import functions as F

silver_schema = spark.conf.get("silver_schema")
gold_schema = spark.conf.get("gold_schema")

@dp.table(name="bronze_orders", cluster_by=["order_date"])
def bronze_orders():
    schema_location = spark.conf.get("schema_location_base")
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", f"{schema_location}/bronze_orders")
        .option("cloudFiles.inferColumnTypes", "true")
        .load("/Volumes/catalog/schema/raw/orders/")
        .withColumn("_ingested_at", F.current_timestamp())
    )

@dp.table(name=f"{silver_schema}.orders_clean")
def orders_clean():
    return (
        # streaming table: read the bronze table incrementally, not as a batch
        spark.readStream.table("bronze_orders")
        .filter(F.col("order_id").isNotNull())
    )

@dp.materialized_view(name=f"{gold_schema}.daily_revenue")
def daily_revenue():
    return (
        spark.read.table(f"{silver_schema}.orders_clean")
        .groupBy("order_date")
        .agg(F.sum("amount").alias("total_revenue"))
    )

Python uses cloudFiles format instead of read_files(). The schemaLocation is mandatory with schema inference — it stores the inferred schema so Auto Loader skips re-scanning on every run. Store it in a metadata Volume, never alongside source data.
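
The silver_schema, gold_schema, and schema_location_base values read via spark.conf.get would be supplied in the pipeline resource definition. Roughly, as a sketch (the resource key, pipeline name, and paths here are assumptions, not generated output):

```yaml
# resources/orders_etl.pipeline.yml — illustrative fragment
resources:
  pipelines:
    orders_etl:
      name: orders_etl
      catalog: main
      schema: bronze          # default schema for unqualified tables
      configuration:
        silver_schema: silver
        gold_schema: gold
        schema_location_base: /Volumes/main/metadata/autoloader_schemas
```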

“Add SCD Type 2 history tracking to my customer dimension table so I can query point-in-time state”

CREATE OR REFRESH STREAMING TABLE dim_customers;

APPLY CHANGES INTO dim_customers
FROM STREAM silver_customers
KEYS (customer_id)
APPLY AS DELETE WHEN operation = 'DELETE'
SEQUENCE BY updated_at
STORED AS SCD TYPE 2;

The CDC flow (APPLY CHANGES, called AUTO CDC in newer releases) tracks the full change history. Query current state with WHERE __END_AT IS NULL. For point-in-time analysis, filter with WHERE __START_AT <= '2025-06-01' AND (__END_AT IS NULL OR __END_AT > '2025-06-01'). Note the double underscores on __START_AT and __END_AT.
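
The point-in-time predicate is easy to get wrong, so here is the same logic as a small pure-Python check (the row shape and the `as_of` helper are illustrative; in the warehouse this is just the WHERE clause above):

```python
from datetime import date

def as_of(rows, ts):
    """Rows valid at ts: __START_AT <= ts < __END_AT, where a null __END_AT means current."""
    return [
        r for r in rows
        if r["__START_AT"] <= ts and (r["__END_AT"] is None or r["__END_AT"] > ts)
    ]

history = [
    {"customer_id": "C1", "tier": "basic",
     "__START_AT": date(2025, 1, 1), "__END_AT": date(2025, 5, 1)},
    {"customer_id": "C1", "tier": "gold",
     "__START_AT": date(2025, 5, 1), "__END_AT": None},
]

# State as of 2025-06-01: only the 'gold' version is valid.
snapshot = as_of(history, date(2025, 6, 1))
```

The half-open interval (inclusive start, exclusive end) ensures each key matches exactly one version at any timestamp.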

  • Missing STREAM keyword — FROM read_files(...) is batch; FROM STREAM read_files(...) is incremental. Getting this wrong means the pipeline re-scans every file on every run, then errors on streaming tables.
  • Schema location in the source Volume — storing Auto Loader schema metadata alongside source data causes permission conflicts and checkpoint corruption. Use a dedicated metadata Volume.
  • import dlt instead of pyspark.pipelines — the legacy DLT API is replaced by from pyspark import pipelines as dp. Use @dp.table(), @dp.materialized_view(), and @dp.temporary_view().
  • SCD2 column names — Lakeflow uses __START_AT and __END_AT (double underscore), not START_AT/END_AT. Queries filtering on single-underscore columns silently return no results.