# Merge Operations
Skill: databricks-spark-structured-streaming
## What You Can Build
A raw append is fine for bronze ingestion, but most silver and gold tables need upsert semantics — insert new records, update existing ones, and optionally delete. Delta MERGE inside foreachBatch gives you that. Combined with Liquid Clustering, Deletion Vectors, and Row-Level Concurrency, you get production-grade merge performance without manual OPTIMIZE pauses.
## In Action
“Write a Python streaming pipeline that upserts incoming events into a Delta table using MERGE inside foreachBatch, with Liquid Clustering enabled on the target table.”
```sql
CREATE TABLE silver.orders (
  id STRING,
  name STRING,
  amount DECIMAL(10,2),
  updated_at TIMESTAMP
)
USING DELTA
CLUSTER BY (id)
TBLPROPERTIES (
  'delta.enableDeletionVectors' = true,
  'delta.enableRowLevelConcurrency' = true
);
```

```python
def upsert_batch(batch_df, batch_id):
    batch_df.createOrReplaceTempView("updates")
    # Use the batch's own session so the temp view is guaranteed visible
    batch_df.sparkSession.sql("""
        MERGE INTO silver.orders t
        USING updates s
        ON t.id = s.id
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)
```
```python
stream.writeStream \
    .foreachBatch(upsert_batch) \
    .option("checkpointLocation", "/Volumes/catalog/checkpoints/silver_orders") \
    .trigger(processingTime="30 seconds") \
    .start()
```

Key decisions:
- Liquid Clustering replaces OPTIMIZE + ZORDER. The `CLUSTER BY (id)` clause tells Delta to incrementally co-locate data by the merge key. No scheduled OPTIMIZE jobs, no periodic ZORDER — the table self-organizes during writes.
- Deletion Vectors avoid full file rewrites on updates. Instead of rewriting a 128 MB file to change one row, Spark writes a small deletion vector. This drops merge P99 latency significantly.
- Row-Level Concurrency allows concurrent MERGE operations on different rows of the same table without conflicts. Without it, two streams merging into the same table will throw `ConcurrentAppendException`.
- `foreachBatch` is required because MERGE is not a native streaming sink. Each microbatch materializes as a DataFrame, creates a temp view, and runs a SQL MERGE against the target.
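Both table properties can also be switched on after the fact. A minimal sketch, reusing the `silver.orders` table and the same property names as the CREATE TABLE above:

```sql
ALTER TABLE silver.orders SET TBLPROPERTIES (
  'delta.enableDeletionVectors' = true,
  'delta.enableRowLevelConcurrency' = true
);
```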
## More Patterns
### Parallel MERGE to Multiple Tables
“Write a Python foreachBatch that merges incoming data into three different silver tables in parallel.”
```python
from delta.tables import DeltaTable
from concurrent.futures import ThreadPoolExecutor, as_completed

def parallel_merge(batch_df, batch_id):
    batch_df.cache()

    def merge_one(table_name, merge_key):
        target = DeltaTable.forName(spark, table_name)
        (target.alias("target")
            .merge(batch_df.alias("source"),
                   f"target.{merge_key} = source.{merge_key}")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute())

    tables = [
        ("silver.customers", "customer_id"),
        ("silver.orders", "order_id"),
        ("silver.products", "product_id"),
    ]

    # Leave half the cluster's cores for the merge work itself
    total_cores = spark.sparkContext.defaultParallelism
    max_workers = min(len(tables), max(2, total_cores // 2))

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(merge_one, t, k): t for t, k in tables}
        errors = []
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                errors.append((futures[future], str(e)))

    batch_df.unpersist()
    if errors:
        raise Exception(f"Merge failures: {errors}")
```

Cache the batch DataFrame before parallel merges to avoid recomputing it for each table. The thread count formula — min(tables, cores/2) — leaves half the cluster’s capacity for the actual merge work. Collect all errors before raising so you see every failure, not just the first.
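The worker-count heuristic is plain Python, so it can be sanity-checked in isolation. A small sketch, where `merge_workers` is a hypothetical helper and `total_cores` stands in for the cluster core count:

```python
def merge_workers(num_tables: int, total_cores: int) -> int:
    # At most one thread per table, at most half the cores,
    # but always at least 2 so merges can overlap.
    return min(num_tables, max(2, total_cores // 2))

# Three target tables on clusters of different sizes:
print(merge_workers(3, 4))    # 2: capped at half of 4 cores
print(merge_workers(3, 16))   # 3: capped at the table count
print(merge_workers(10, 8))   # 4: capped at half of 8 cores
```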
### MERGE with Partition Pruning
“Write a Python MERGE that includes the partition column in the join condition for faster execution.”
```python
def partition_pruned_merge(batch_df, batch_id):
    batch_df.createOrReplaceTempView("updates")
    batch_df.sparkSession.sql("""
        MERGE INTO target_table t
        USING updates s
        ON t.id = s.id AND t.date = s.date
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)
```

Including the partition column (`t.date = s.date`) in the merge condition tells Spark to scan only the relevant partitions. On large tables, this is the difference between scanning 100 files and 100,000.
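Pruning can also be made explicit by collecting the partition values present in the batch and inlining them as literals in the ON clause. A sketch in plain Python; `pruned_merge_sql` is a hypothetical helper that only builds the SQL string (in a real pipeline the dates would come from a distinct select on the batch DataFrame):

```python
def pruned_merge_sql(dates):
    # Inline the partition values as a literal IN-list so the
    # optimizer can prune files without inspecting the source.
    date_list = ", ".join(f"'{d}'" for d in sorted(dates))
    return (
        "MERGE INTO target_table t USING updates s "
        f"ON t.id = s.id AND t.date IN ({date_list}) "
        "WHEN MATCHED THEN UPDATE SET * "
        "WHEN NOT MATCHED THEN INSERT *"
    )

print(pruned_merge_sql({"2024-01-02", "2024-01-01"}))
```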
### CDC with Deletes
“Write a Python foreachBatch that applies CDC changes — inserts, updates, and deletes — to a target Delta table.”
```python
from delta.tables import DeltaTable
from pyspark.sql.functions import col

def cdc_merge(batch_df, batch_id):
    batch_df.cache()
    deletes = batch_df.filter(col("_op") == "DELETE")
    upserts = batch_df.filter(col("_op").isin(["INSERT", "UPDATE"]))

    target = DeltaTable.forName(spark, "silver.customers")

    if upserts.count() > 0:
        (target.alias("t")
            .merge(upserts.alias("s"), "t.customer_id = s.customer_id")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute())

    if deletes.count() > 0:
        (target.alias("t")
            .merge(deletes.alias("s"), "t.customer_id = s.customer_id")
            .whenMatchedDelete()
            .execute())

    batch_df.unpersist()
```

Separate the CDC batch into upserts and deletes before merging. Running a single MERGE with conditional logic for all three operations works but is harder to debug when something goes wrong.
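For comparison, the single-statement alternative mentioned above can be sketched as one MERGE with conditional clauses; clause order matters, since the DELETE arm must precede the unconditional update:

```sql
MERGE INTO silver.customers t
USING updates s
ON t.customer_id = s.customer_id
WHEN MATCHED AND s._op = 'DELETE' THEN DELETE
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED AND s._op <> 'DELETE' THEN INSERT *
```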
## Watch Out For
- High P99 latency from periodic OPTIMIZE — if you’re running scheduled OPTIMIZE + ZORDER jobs, switch to Liquid Clustering. It eliminates the optimization pauses that cause latency spikes during merges.
- `ConcurrentAppendException` on shared target tables — two streams merging into the same table will conflict unless Row-Level Concurrency is enabled. Set `delta.enableRowLevelConcurrency = true` on the target table.
- Thread count too high in parallel merges — more threads than `cluster_cores / 2` causes resource contention that slows every merge. Start with 2 threads and increase only if monitoring shows idle capacity.
- Forgetting to unpersist after cache — cached DataFrames persist in memory across microbatches if you don’t explicitly unpersist. Over time this crowds out actual processing memory.
- Slow merges on large unsorted tables — if Liquid Clustering isn’t an option, Z-ORDER on the merge key gives 5-10x lookup speedups. Run `OPTIMIZE target_table ZORDER BY (id)` on a schedule.