
DLT to SDP Migration

Skill: databricks-spark-declarative-pipelines

If you have existing Delta Live Tables (DLT) pipelines, you need a clear path to Spark Declarative Pipelines (SDP). Ask your AI coding assistant to migrate a DLT pipeline and it will convert the Python dlt API to SQL or the modern dp API — handling the read syntax changes, CDC method renames, and the shift from partitioning to Liquid Clustering.

“Migrate this DLT Python bronze ingestion to SDP SQL with Auto Loader and ingestion metadata”

Before (DLT Python):

import dlt
from pyspark.sql import functions as F

@dlt.table(name="bronze_sales", comment="Raw sales")
def bronze_sales():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/mnt/raw/sales")
        .withColumn("_ingested_at", F.current_timestamp())
    )

After (SDP SQL):

CREATE OR REFRESH STREAMING TABLE bronze_sales
CLUSTER BY (sale_date)
COMMENT 'Raw sales'
AS
SELECT *, current_timestamp() AS _ingested_at
FROM STREAM read_files(
  '/Volumes/catalog/schema/raw/sales/',
  format => 'json',
  schemaHints => 'sale_id STRING, amount DECIMAL(10,2), sale_date DATE'
);

Key decisions:

  • Python to SQL — most DLT pipelines used Python because it was the only option early on. SQL is simpler for transformations that do not need UDFs or complex logic. Migrate to SQL when you can.
  • /mnt/ to /Volumes/ — mount points are legacy. Unity Catalog Volumes are the modern path for file access with proper governance.
  • Added CLUSTER BY and schemaHints — the DLT original had neither. Migration is a good time to add Liquid Clustering and schema hints that should have been there from the start.

“Migrate a DLT silver table with expect_or_drop quality checks to SDP SQL”

Before (DLT Python):

@dlt.table(name="silver_sales")
@dlt.expect_or_drop("valid_amount", "amount > 0")
@dlt.expect_or_drop("valid_sale_id", "sale_id IS NOT NULL")
def silver_sales():
    return (
        dlt.read_stream("bronze_sales")
        .withColumn("sale_date", F.to_date("sale_date"))
        .withColumn("amount", F.col("amount").cast("decimal(10,2)"))
        .select("sale_id", "customer_id", "amount", "sale_date")
    )

After (SDP SQL):

CREATE OR REFRESH STREAMING TABLE silver_sales
CLUSTER BY (sale_date)
AS
SELECT
  sale_id,
  customer_id,
  CAST(amount AS DECIMAL(10,2)) AS amount,
  CAST(sale_date AS DATE) AS sale_date
FROM STREAM bronze_sales
WHERE amount > 0 AND sale_id IS NOT NULL;

The @dlt.expect_or_drop decorators become a WHERE clause. If you need to track how many rows fail validation, split into a flag-and-route pattern: add a boolean _is_valid column, then create separate tables for clean and quarantined records.
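A minimal sketch of that flag-and-route pattern — the table names and the clean/quarantine split are illustrative, not part of the original pipeline:

```sql
-- Flag every row instead of silently dropping failures
CREATE OR REFRESH STREAMING TABLE sales_flagged AS
SELECT *,
  amount > 0 AND sale_id IS NOT NULL AS _is_valid
FROM STREAM bronze_sales;

-- Clean rows continue downstream
CREATE OR REFRESH STREAMING TABLE silver_sales_clean AS
SELECT * EXCEPT (_is_valid)
FROM STREAM sales_flagged
WHERE _is_valid;

-- Failed rows are kept for inspection and counting
CREATE OR REFRESH STREAMING TABLE silver_sales_quarantine AS
SELECT *
FROM STREAM sales_flagged
WHERE NOT _is_valid;
```

Counting rows in the quarantine table then gives the failure metric that the dropped-row WHERE clause loses.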

“Convert a DLT apply_changes SCD Type 2 pipeline to SDP SQL Auto CDC”

Before (DLT Python):

dlt.create_streaming_table("customers_history")
dlt.apply_changes(
    target="customers_history",
    source="customers_cdc_clean",
    keys=["customer_id"],
    sequence_by="event_timestamp",
    stored_as_scd_type="2",
    track_history_column_list=["*"]
)

After (SDP SQL):

CREATE OR REFRESH STREAMING TABLE customers_history;

CREATE FLOW customers_scd2_flow AS
AUTO CDC INTO customers_history
FROM STREAM(customers_cdc_clean)
KEYS (customer_id)
APPLY AS DELETE WHEN operation = 'DELETE'
SEQUENCE BY event_timestamp
COLUMNS * EXCEPT (operation, _ingested_at, _source_file)
STORED AS SCD TYPE 2;

dlt.apply_changes() becomes AUTO CDC in SQL or dp.create_auto_cdc_flow() in modern Python. The SQL version adds APPLY AS DELETE WHEN and COLUMNS * EXCEPT — clauses that were often missing in DLT pipelines and caused unnecessary columns in the history table.

“Migrate a DLT join between a streaming and static table to SDP SQL”

Before (DLT Python):

@dlt.table(name="silver_sales_enriched")
def silver_sales_enriched():
    sales = dlt.read_stream("silver_sales")
    products = dlt.read("dim_products")
    return (
        sales.join(products, "product_id", "left")
        .select(sales["*"], products["product_name"], products["category"])
    )

After (SDP SQL):

CREATE OR REFRESH STREAMING TABLE silver_sales_enriched AS
SELECT
  s.*,
  p.product_name,
  p.category
FROM STREAM silver_sales s
LEFT JOIN dim_products p ON s.product_id = p.product_id;

dlt.read_stream() becomes STREAM table_name and dlt.read() becomes a direct table reference. The SQL version is shorter and clearer. The STREAM keyword on the left side makes it a stream-to-static join where new sales rows trigger lookups against the latest product data.

“Replace a Python UDF that categorizes amounts with an equivalent SQL CASE expression”

Before (DLT Python):

from pyspark.sql.types import StringType

@F.udf(returnType=StringType())
def categorize_amount(amount):
    if amount > 1000:
        return "High"
    elif amount > 100:
        return "Medium"
    else:
        return "Low"

@dlt.table(name="sales_categorized")
def sales_categorized():
    return (
        dlt.read("sales")
        .withColumn("category", categorize_amount(F.col("amount")))
    )

After (SDP SQL):

CREATE OR REFRESH MATERIALIZED VIEW sales_categorized AS
SELECT
  *,
  CASE
    WHEN amount > 1000 THEN 'High'
    WHEN amount > 100 THEN 'Medium'
    ELSE 'Low'
  END AS category
FROM sales;

Python UDFs have serialization overhead and cannot be optimized by the query planner. If the logic fits in a SQL CASE expression, use it. Reserve Python UDFs for logic that genuinely cannot be expressed in SQL — external API calls, ML inference, complex string parsing.

Migration gotchas:

  • dlt.read() implicit namespace — DLT resolved table names within an implicit LIVE schema. SDP uses Unity Catalog paths. Unqualified names in SQL resolve to the pipeline’s target catalog and schema. In Python, use spark.read.table("table_name") for the same behavior.
  • Mount paths — /mnt/raw/... paths are legacy DBFS mounts. Replace them with /Volumes/catalog/schema/volume_name/... during migration. This requires creating the corresponding Unity Catalog Volume first.
  • Missing quality checks after migration — @dlt.expect_or_drop decorators are easy to forget when converting to SQL. Audit every DLT file for expectations and convert them to WHERE clauses or flag-and-route patterns.
  • mode => 'PERMISSIVE' for schema flexibility — if the DLT pipeline relied on Auto Loader’s permissive mode for handling schema changes, add mode => 'PERMISSIVE' to the read_files() call in SQL. Without it, schema mismatches cause failures instead of routing to _rescued_data.
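Putting the last two points together, a hedged sketch of the Volume setup plus a permissive read — catalog, schema, volume, and table names here are placeholders, not from the original pipeline:

```sql
-- One-time setup: the Volume must exist before the pipeline can read from it
CREATE VOLUME IF NOT EXISTS catalog.schema.raw;

-- Permissive read: malformed records route to _rescued_data instead of failing the stream
CREATE OR REFRESH STREAMING TABLE bronze_events AS
SELECT *, current_timestamp() AS _ingested_at
FROM STREAM read_files(
  '/Volumes/catalog/schema/raw/events/',
  format => 'json',
  mode => 'PERMISSIVE'
);
```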