Python API: Modern vs Legacy

Skill: databricks-spark-declarative-pipelines

The Python API for Spark Declarative Pipelines moved from import dlt to from pyspark import pipelines as dp. Ask your AI coding assistant to write pipeline code and it will use the modern API — but when you are migrating existing DLT pipelines, knowing the exact mapping between old and new saves hours of trial and error.

“Write a Python bronze streaming table using the modern dp API that ingests JSON from cloud storage with Auto Loader”

from pyspark import pipelines as dp
from pyspark.sql import functions as F

schema_location_base = spark.conf.get("schema_location_base")

@dp.table(name="bronze_events", cluster_by=["event_date"])
def bronze_events():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", f"{schema_location_base}/bronze_events")
        .load("/Volumes/catalog/schema/raw/events/")
        .withColumn("_ingested_at", F.current_timestamp())
        .withColumn("event_date", F.current_date())
    )

Key decisions:

  • from pyspark import pipelines as dp — this is the modern import. The dp alias is the convention. Never use import dlt in new code.
  • spark.read.table() and spark.readStream.table() — replace dlt.read() and dlt.read_stream(). Standard Spark reads work directly; no special pipeline-specific read functions needed.
  • cluster_by instead of partition_cols — Liquid Clustering is a parameter on the decorator. The old partition_cols parameter still works but should not be used in new pipelines.
  • @dp.table() is context-aware — if the function returns a streaming DataFrame, SDP creates a streaming table. If it returns a batch DataFrame, it creates a materialized view. No separate decorators needed.
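To illustrate the context-aware behavior, the same decorator with a batch read produces a materialized view instead of a streaming table. A minimal sketch — the table name and aggregation are illustrative, and like all pipeline source files it only runs inside a declared pipeline:

```python
from pyspark import pipelines as dp

# Same @dp.table decorator as the streaming example above, but the
# function returns a batch DataFrame, so SDP creates a materialized
# view rather than a streaming table.
@dp.table(name="daily_event_counts")
def daily_event_counts():
    return (
        spark.read.table("bronze_events")  # batch read -> materialized view
        .groupBy("event_date")
        .count()
    )
```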

“Add quality expectations to a Python silver table that drops rows with null IDs and fails the pipeline if timestamps are missing”

@dp.table(name="silver_validated")
@dp.expect_or_drop("valid_id", "id IS NOT NULL")
@dp.expect_or_drop("valid_amount", "amount > 0")
@dp.expect_or_fail("critical_field", "timestamp IS NOT NULL")
def silver_validated():
    return spark.read.table("bronze_events")

expect_or_drop silently removes bad rows. expect_or_fail halts the pipeline — use it for invariants that indicate a broken source. The expressions are SQL predicates evaluated per row. Check pipeline event logs to see how many rows each expectation filtered.
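There is also a warn-only variant that keeps violating rows and only records them in the event log — useful while you are still profiling a source. A sketch, assuming @dp.expect carries over from the legacy dlt.expect:

```python
from pyspark import pipelines as dp

@dp.table(name="silver_monitored")
# Warn only: violating rows are kept, but the violation count shows up
# in the pipeline event log alongside the drop/fail expectations.
@dp.expect("reasonable_amount", "amount < 1000000")
def silver_monitored():
    return spark.read.table("bronze_events")
```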

“Set up SCD Type 2 change tracking on a customer CDC feed using the dp API in Python”

from pyspark.sql.functions import col

dp.create_streaming_table("customers_history")

dp.create_auto_cdc_flow(
    target="customers_history",
    source="customers_cdc",
    keys=["customer_id"],
    sequence_by=col("event_timestamp"),
    stored_as_scd_type="2",
    track_history_column_list=["*"],
)

create_auto_cdc_flow() replaces the legacy dlt.apply_changes(). The parameters are nearly identical, but sequence_by now takes a Column object (col("...")) instead of a plain string. The target streaming table must be created first with create_streaming_table().
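For comparison, SCD Type 1 (overwrite in place, no history rows) is the same call with stored_as_scd_type="1". A sketch — the target table name here is illustrative:

```python
from pyspark import pipelines as dp
from pyspark.sql.functions import col

dp.create_streaming_table("customers_current")

# SCD Type 1: each key keeps only its latest row; no history is retained,
# so there is no track_history_column_list to pass.
dp.create_auto_cdc_flow(
    target="customers_current",
    source="customers_cdc",
    keys=["customer_id"],
    sequence_by=col("event_timestamp"),
    stored_as_scd_type="1",
)
```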

“Create a Python bronze table with Liquid Clustering, auto-optimize, and auto-compact enabled”

@dp.table(
    name="bronze_events",
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true",
    },
    cluster_by=["event_type", "event_date"],
)
def bronze_events():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", f"{schema_location_base}/bronze_events")
        .load("/Volumes/catalog/schema/raw/events/")
    )

The table_properties dict maps directly to Delta table properties. cluster_by accepts a list of column names. For automatic clustering, pass cluster_by=["AUTO"] and Databricks will choose columns based on query patterns.
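Automatic clustering is the same decorator with the sentinel value. A minimal sketch — the table name is illustrative:

```python
from pyspark import pipelines as dp

# cluster_by=["AUTO"] delegates the choice of clustering columns to
# Databricks, which picks them from observed query patterns.
@dp.table(name="bronze_events_auto", cluster_by=["AUTO"])
def bronze_events_auto():
    return spark.read.table("bronze_events")
```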

“Show me the key differences between the legacy dlt API and the modern dp API so I can update my imports”

# === Import ===
# Legacy: import dlt
# Modern: from pyspark import pipelines as dp
# === Table decorator ===
# Legacy: @dlt.table(name="my_table")
# Modern: @dp.table(name="my_table")
# === Reading tables ===
# Legacy: dlt.read("source_table")
# Modern: spark.read.table("source_table")
# Legacy: dlt.read_stream("source_table")
# Modern: spark.readStream.table("source_table")
# === CDC ===
# Legacy: dlt.apply_changes(target=..., source=..., ...)
# Modern: dp.create_auto_cdc_flow(target=..., source=..., ...)
# === Clustering ===
# Legacy: @dlt.table(partition_cols=["date"])
# Modern: @dp.table(cluster_by=["date", "region"])

The mapping is mechanical — a find-and-replace covers most of it. The significant behavioral change is table reads: dlt.read() resolved names within an implicit LIVE schema, while spark.read.table() uses Unity Catalog paths directly. Unqualified names resolve to the pipeline’s configured target catalog and schema.

  • sequence_by type change — in the legacy API, sequence_by accepted a string. In the modern API, it requires a Column object: col("event_timestamp"), not "event_timestamp".
  • stored_as_scd_type is a string — SCD Type 2 in Python is "2" (string), not 2 (integer). SCD Type 1 is "1". In SQL, the syntax is STORED AS SCD TYPE 2 with no quotes.
  • Implicit LIVE schema is gone — dlt.read("my_table") used an implicit namespace. spark.read.table("my_table") resolves against the pipeline’s target catalog and schema. For cross-schema reads, use fully qualified names.
  • dp.read() and dp.read_stream() do not exist — some migration guides reference these. They were never part of the modern API. Use spark.read.table() and spark.readStream.table() directly.
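Since most of the mapping is mechanical, a hypothetical migration helper can cover the bulk of a source file with ordered rewrites; the read/read_stream cases need a regex because the table-name argument moves into a method call. Note what this deliberately does not touch: partition_cols → cluster_by and the sequence_by string → Column change are judgment calls that need manual review.

```python
import re

# Ordered legacy -> modern rewrites. apply_changes must be renamed
# before the generic "dlt." -> "dp." pass, and read_stream before read.
_RULES = [
    (re.compile(r"^import dlt\s*$", re.MULTILINE),
     "from pyspark import pipelines as dp"),
    (re.compile(r"\bdlt\.read_stream\(([^)]+)\)"), r"spark.readStream.table(\1)"),
    (re.compile(r"\bdlt\.read\(([^)]+)\)"), r"spark.read.table(\1)"),
    (re.compile(r"\bdlt\.apply_changes\("), "dp.create_auto_cdc_flow("),
    (re.compile(r"\bdlt\."), "dp."),
]

def migrate(source: str) -> str:
    """Apply the mechanical dlt -> dp rewrites to pipeline source text."""
    for pattern, replacement in _RULES:
        source = pattern.sub(replacement, source)
    return source
```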