DLT to SDP Migration
Skill: databricks-spark-declarative-pipelines
What You Can Build
If you have existing Delta Live Tables (DLT) pipelines, you need a clear path to Spark Declarative Pipelines (SDP). Ask your AI coding assistant to migrate a DLT pipeline and it will convert the Python dlt API to SQL or the modern dp API — handling the read syntax changes, CDC method renames, and the shift from partitioning to Liquid Clustering.
In Action
“Migrate this DLT Python bronze ingestion to SDP SQL with Auto Loader and ingestion metadata”
Before (DLT Python):
@dlt.table(name="bronze_sales", comment="Raw sales")def bronze_sales(): return ( spark.readStream.format("cloudFiles") .option("cloudFiles.format", "json") .load("/mnt/raw/sales") .withColumn("_ingested_at", F.current_timestamp()) )After (SDP SQL):
CREATE OR REFRESH STREAMING TABLE bronze_sales
CLUSTER BY (sale_date)
COMMENT 'Raw sales'
AS SELECT
  *,
  current_timestamp() AS _ingested_at
FROM STREAM read_files(
  '/Volumes/catalog/schema/raw/sales/',
  format => 'json',
  schemaHints => 'sale_id STRING, amount DECIMAL(10,2), sale_date DATE'
);

Key decisions:
- Python to SQL — most DLT pipelines used Python because it was the only option early on. SQL is simpler for transformations that do not need UDFs or complex logic. Migrate to SQL when you can; when a table has to stay in Python, the dp decorator form is sketched after this list.
- /mnt/ to /Volumes/ — mount points are legacy. Unity Catalog Volumes are the modern path for file access with proper governance.
- Added CLUSTER BY and schemaHints — the DLT original had neither. Migration is a good time to add Liquid Clustering and schema hints that should have been there from the start.
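If a table has to stay in Python, the same bronze definition maps onto the dp decorator rather than SQL. A minimal sketch, assuming dp is imported from pyspark.pipelines and that dp.table accepts the same name, comment, and cluster_by keywords as dlt.table:

from pyspark import pipelines as dp   # assumed import path for the modern dp API
from pyspark.sql import functions as F

# Same bronze table as above, expressed with the dp decorator instead of dlt
@dp.table(name="bronze_sales", comment="Raw sales", cluster_by=["sale_date"])
def bronze_sales():
    return (
        spark.readStream.format("cloudFiles")          # Auto Loader, as in the DLT original
        .option("cloudFiles.format", "json")
        .load("/Volumes/catalog/schema/raw/sales/")    # Unity Catalog Volume replaces /mnt/raw/sales
        .withColumn("_ingested_at", F.current_timestamp())
    )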
More Patterns
Silver validation layer
“Migrate a DLT silver table with expect_or_drop quality checks to SDP SQL”
Before (DLT Python):
@dlt.table(name="silver_sales")@dlt.expect_or_drop("valid_amount", "amount > 0")@dlt.expect_or_drop("valid_sale_id", "sale_id IS NOT NULL")def silver_sales(): return ( dlt.read_stream("bronze_sales") .withColumn("sale_date", F.to_date("sale_date")) .withColumn("amount", F.col("amount").cast("decimal(10,2)")) .select("sale_id", "customer_id", "amount", "sale_date") )After (SDP SQL):
CREATE OR REFRESH STREAMING TABLE silver_sales
CLUSTER BY (sale_date)
AS SELECT
  sale_id,
  customer_id,
  CAST(amount AS DECIMAL(10,2)) AS amount,
  CAST(sale_date AS DATE) AS sale_date
FROM STREAM bronze_sales
WHERE amount > 0 AND sale_id IS NOT NULL;

The @dlt.expect_or_drop decorators become a WHERE clause. If you need to track how many rows fail validation, split into a flag-and-route pattern: add a boolean _is_valid column, then create separate tables for clean and quarantined records.
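A minimal sketch of that flag-and-route split in SDP SQL, using hypothetical table names (silver_sales_flagged, silver_sales_clean, silver_sales_quarantine):

-- Flag every row instead of silently dropping failures
CREATE OR REFRESH STREAMING TABLE silver_sales_flagged AS
SELECT
  *,
  -- COALESCE so rows with a NULL amount count as invalid rather than vanishing from both outputs
  COALESCE(amount > 0 AND sale_id IS NOT NULL, FALSE) AS _is_valid
FROM STREAM bronze_sales;

-- Clean rows continue downstream
CREATE OR REFRESH STREAMING TABLE silver_sales_clean AS
SELECT * EXCEPT (_is_valid)
FROM STREAM silver_sales_flagged
WHERE _is_valid;

-- Failed rows are kept for counting and inspection
CREATE OR REFRESH STREAMING TABLE silver_sales_quarantine AS
SELECT * EXCEPT (_is_valid)
FROM STREAM silver_sales_flagged
WHERE NOT _is_valid;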
CDC with apply_changes to Auto CDC
“Convert a DLT apply_changes SCD Type 2 pipeline to SDP SQL Auto CDC”
Before (DLT Python):
dlt.create_streaming_table("customers_history")
dlt.apply_changes(
    target="customers_history",
    source="customers_cdc_clean",
    keys=["customer_id"],
    sequence_by="event_timestamp",
    stored_as_scd_type="2",
    track_history_column_list=["*"]
)

After (SDP SQL):
CREATE OR REFRESH STREAMING TABLE customers_history;
CREATE FLOW customers_scd2_flow AS
AUTO CDC INTO customers_history
FROM stream(customers_cdc_clean)
KEYS (customer_id)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY event_timestamp
COLUMNS * EXCEPT (operation, _ingested_at, _source_file)
STORED AS SCD TYPE 2;

dlt.apply_changes() becomes AUTO CDC in SQL or dp.create_auto_cdc_flow() in modern Python. The SQL version adds APPLY AS DELETE WHEN and COLUMNS * EXCEPT — clauses that were often missing in DLT pipelines and caused unnecessary columns in the history table.
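If the CDC flow has to stay in Python, the same mapping goes through the dp API. A rough sketch, assuming dp.create_streaming_table and dp.create_auto_cdc_flow accept the same keyword arguments as dlt.create_streaming_table and dlt.apply_changes:

from pyspark import pipelines as dp   # assumed import path for the modern dp API
from pyspark.sql.functions import expr

dp.create_streaming_table("customers_history")

dp.create_auto_cdc_flow(
    target="customers_history",
    source="customers_cdc_clean",
    keys=["customer_id"],
    sequence_by="event_timestamp",
    apply_as_deletes=expr("operation = 'DELETE'"),                      # maps to APPLY AS DELETE WHEN
    except_column_list=["operation", "_ingested_at", "_source_file"],  # maps to COLUMNS * EXCEPT
    stored_as_scd_type="2",
)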
Stream-to-static join enrichment
“Migrate a DLT join between a streaming and static table to SDP SQL”
Before (DLT Python):
@dlt.table(name="silver_sales_enriched")def silver_sales_enriched(): sales = dlt.read_stream("silver_sales") products = dlt.read("dim_products") return ( sales.join(products, "product_id", "left") .select(sales["*"], products["product_name"], products["category"]) )After (SDP SQL):
CREATE OR REFRESH STREAMING TABLE silver_sales_enriched AS
SELECT
  s.*,
  p.product_name,
  p.category
FROM STREAM silver_sales s
LEFT JOIN dim_products p ON s.product_id = p.product_id;

dlt.read_stream() becomes STREAM table_name and dlt.read() becomes a direct table reference. The SQL version is shorter and clearer. The STREAM keyword on the left side makes it a stream-to-static join where new sales rows trigger lookups against the latest product data.
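The Python side of the same migration swaps dlt.read_stream() and dlt.read() for plain Spark readers, which is also the fix for the implicit LIVE namespace noted under Watch Out For. A sketch, assuming dp.table behaves like dlt.table:

from pyspark import pipelines as dp   # assumed import path for the modern dp API

@dp.table(name="silver_sales_enriched")
def silver_sales_enriched():
    sales = spark.readStream.table("silver_sales")   # replaces dlt.read_stream("silver_sales")
    products = spark.read.table("dim_products")      # replaces dlt.read("dim_products")
    return (
        sales.join(products, "product_id", "left")
        .select(sales["*"], products["product_name"], products["category"])
    )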
UDF logic to SQL CASE expressions
“Replace a Python UDF that categorizes amounts with an equivalent SQL CASE expression”
Before (DLT Python):
@F.udf(returnType=StringType())
def categorize_amount(amount):
    if amount > 1000:
        return "High"
    elif amount > 100:
        return "Medium"
    else:
        return "Low"
@dlt.table(name="sales_categorized")def sales_categorized(): return ( dlt.read("sales") .withColumn("category", categorize_amount(F.col("amount"))) )After (SDP SQL):
CREATE OR REFRESH MATERIALIZED VIEW sales_categorized AS
SELECT
  *,
  CASE
    WHEN amount > 1000 THEN 'High'
    WHEN amount > 100 THEN 'Medium'
    ELSE 'Low'
  END AS category
FROM sales;

Python UDFs have serialization overhead and cannot be optimized by the query planner. If the logic fits in a SQL CASE expression, use it. Reserve Python UDFs for logic that genuinely cannot be expressed in SQL — external API calls, ML inference, complex string parsing.
Watch Out For
- dlt.read() implicit namespace — DLT resolved table names within an implicit LIVE schema. SDP uses Unity Catalog paths. Unqualified names in SQL resolve to the pipeline’s target catalog and schema. In Python, use spark.read.table("table_name") for the same behavior.
- Mount paths — /mnt/raw/... paths are legacy DBFS mounts. Replace with /Volumes/catalog/schema/volume_name/... during migration. This requires creating the corresponding Unity Catalog Volume first.
- Missing quality checks after migration — @dlt.expect_or_drop decorators are easy to forget when converting to SQL. Audit every DLT file for expectations and convert them to WHERE clauses or flag-and-route patterns.
- mode => 'PERMISSIVE' for schema flexibility — if the DLT pipeline relied on Auto Loader’s permissive mode for handling schema changes, add mode => 'PERMISSIVE' to the read_files() call in SQL (see the sketch after this list). Without it, schema mismatches cause failures instead of routing to _rescued_data.
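A minimal sketch of that permissive read_files() call, reusing the bronze ingestion example from above:

CREATE OR REFRESH STREAMING TABLE bronze_sales AS
SELECT *, current_timestamp() AS _ingested_at
FROM STREAM read_files(
  '/Volumes/catalog/schema/raw/sales/',
  format => 'json',
  mode => 'PERMISSIVE',   -- mismatched records go to _rescued_data instead of failing the stream
  schemaHints => 'sale_id STRING, amount DECIMAL(10,2), sale_date DATE'
);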