Python API: Modern vs Legacy
Skill: databricks-spark-declarative-pipelines
What You Can Build
The Python API for Spark Declarative Pipelines moved from `import dlt` to `from pyspark import pipelines as dp`. Ask your AI coding assistant to write pipeline code and it will use the modern API — but when you are migrating existing DLT pipelines, knowing the exact mapping between old and new saves hours of trial and error.
In Action
“Write a Python bronze streaming table using the modern dp API that ingests JSON from cloud storage with Auto Loader”
```python
from pyspark import pipelines as dp
from pyspark.sql import functions as F

schema_location_base = spark.conf.get("schema_location_base")

@dp.table(name="bronze_events", cluster_by=["event_date"])
def bronze_events():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", f"{schema_location_base}/bronze_events")
        .load("/Volumes/catalog/schema/raw/events/")
        .withColumn("_ingested_at", F.current_timestamp())
        .withColumn("event_date", F.current_date())
    )
```

Key decisions:
- `from pyspark import pipelines as dp` — this is the modern import. The `dp` alias is the convention. Never use `import dlt` in new code.
- `spark.read.table()` and `spark.readStream.table()` — replace `dlt.read()` and `dlt.read_stream()`. Standard Spark reads work directly; no special pipeline-specific read functions needed.
- `cluster_by` instead of `partition_cols` — Liquid Clustering is a parameter on the decorator. The old `partition_cols` parameter still works but should not be used in new pipelines.
- `@dp.table()` is context-aware — if the function returns a streaming DataFrame, SDP creates a streaming table. If it returns a batch DataFrame, it creates a materialized view (see the sketch after this list). No separate decorators needed.
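Because the decorator is context-aware, the same `@dp.table()` produces a materialized view when the function performs a batch read. A minimal sketch; the aggregate table name is illustrative, and `bronze_events` refers to the table defined above:

```python
from pyspark import pipelines as dp
from pyspark.sql import functions as F

# Batch read, so @dp.table creates a materialized view rather than a streaming table.
@dp.table(name="daily_event_counts")
def daily_event_counts():
    return (
        spark.read.table("bronze_events")
        .groupBy("event_date")
        .agg(F.count("*").alias("event_count"))
    )
```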
More Patterns
Data quality expectations
“Add quality expectations to a Python silver table that drops rows with null IDs and fails the pipeline if timestamps are missing”
@dp.table(name="silver_validated")@dp.expect_or_drop("valid_id", "id IS NOT NULL")@dp.expect_or_drop("valid_amount", "amount > 0")@dp.expect_or_fail("critical_field", "timestamp IS NOT NULL")def silver_validated(): return spark.read.table("bronze_events")expect_or_drop silently removes bad rows. expect_or_fail halts the pipeline — use it for invariants that indicate a broken source. The expressions are SQL predicates evaluated per row. Check pipeline event logs to see how many rows each expectation filtered.
Auto CDC with the modern API
“Set up SCD Type 2 change tracking on a customer CDC feed using the dp API in Python”
```python
from pyspark import pipelines as dp
from pyspark.sql.functions import col

dp.create_streaming_table("customers_history")

dp.create_auto_cdc_flow(
    target="customers_history",
    source="customers_cdc",
    keys=["customer_id"],
    sequence_by=col("event_timestamp"),
    stored_as_scd_type="2",
    track_history_column_list=["*"],
)
```

`create_auto_cdc_flow()` replaces the legacy `dlt.apply_changes()`. The parameters are nearly identical, but `sequence_by` now takes a Column object (`col("...")`) instead of a plain string. The target streaming table must be created first with `create_streaming_table()`.
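For comparison, the same feed tracked as SCD Type 1 (the latest row per key overwrites in place, no history rows) changes only `stored_as_scd_type` and omits the history column list. A sketch with illustrative table names:

```python
from pyspark import pipelines as dp
from pyspark.sql.functions import col

# SCD Type 1: keep only the current row per key.
# Note sequence_by is still a Column, not a string.
dp.create_streaming_table("customers_current")

dp.create_auto_cdc_flow(
    target="customers_current",
    source="customers_cdc",
    keys=["customer_id"],
    sequence_by=col("event_timestamp"),
    stored_as_scd_type="1",
)
```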
Table properties and Liquid Clustering
“Create a Python bronze table with Liquid Clustering, auto-optimize, and auto-compact enabled”
```python
@dp.table(
    name="bronze_events",
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true"
    },
    cluster_by=["event_type", "event_date"]
)
def bronze_events():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", f"{schema_location_base}/bronze_events")
        .load("/Volumes/catalog/schema/raw/events/")
    )
```

The `table_properties` dict maps directly to Delta table properties. `cluster_by` accepts a list of column names. For automatic clustering, pass `cluster_by=["AUTO"]` and Databricks will choose columns based on query patterns.
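A minimal sketch of the automatic-clustering variant mentioned above; the table name is illustrative, and `bronze_events` refers to the streaming table defined earlier:

```python
from pyspark import pipelines as dp

# cluster_by=["AUTO"] lets Databricks pick clustering columns from query patterns.
@dp.table(name="bronze_events_auto", cluster_by=["AUTO"])
def bronze_events_auto():
    return spark.readStream.table("bronze_events")
```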
Side-by-side API mapping
“Show me the key differences between the legacy dlt API and the modern dp API so I can update my imports”
```python
# === Import ===
# Legacy: import dlt
# Modern: from pyspark import pipelines as dp

# === Table decorator ===
# Legacy: @dlt.table(name="my_table")
# Modern: @dp.table(name="my_table")

# === Reading tables ===
# Legacy: dlt.read("source_table")
# Modern: spark.read.table("source_table")

# Legacy: dlt.read_stream("source_table")
# Modern: spark.readStream.table("source_table")

# === CDC ===
# Legacy: dlt.apply_changes(target=..., source=..., ...)
# Modern: dp.create_auto_cdc_flow(target=..., source=..., ...)

# === Clustering ===
# Legacy: @dlt.table(partition_cols=["date"])
# Modern: @dp.table(cluster_by=["date", "region"])
```

The mapping is mechanical — a find-and-replace covers most of it. The significant behavioral change is table reads: `dlt.read()` resolved names within an implicit LIVE schema, while `spark.read.table()` uses Unity Catalog paths directly. Unqualified names resolve to the pipeline’s configured target catalog and schema.
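Applied to a single table definition, the rewrite looks like this; a sketch with illustrative table names, with the legacy version kept in comments for reference:

```python
# Legacy definition being migrated:
#   import dlt
#
#   @dlt.table(name="silver_orders", partition_cols=["order_date"])
#   def silver_orders():
#       return dlt.read_stream("bronze_orders")

# Modern equivalent after the mechanical find-and-replace:
from pyspark import pipelines as dp

@dp.table(name="silver_orders", cluster_by=["order_date"])
def silver_orders():
    return spark.readStream.table("bronze_orders")
```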
Watch Out For
- `sequence_by` type change — in the legacy API, `sequence_by` accepted a string. In the modern API, it requires a Column object: `col("event_timestamp")`, not `"event_timestamp"`.
- `stored_as_scd_type` is a string — SCD Type 2 in Python is `"2"` (string), not `2` (integer). SCD Type 1 is `"1"`. In SQL, the syntax is `STORED AS SCD TYPE 2` with no quotes.
- Implicit LIVE schema is gone — `dlt.read("my_table")` used an implicit namespace. `spark.read.table("my_table")` resolves against the pipeline’s target catalog and schema. For cross-schema reads, use fully qualified names (see the sketch after this list).
- `dp.read()` and `dp.read_stream()` do not exist — some migration guides reference these. They were never part of the modern API. Use `spark.read.table()` and `spark.readStream.table()` directly.