Merge Operations

Skill: databricks-spark-structured-streaming

A raw append is fine for bronze ingestion, but most silver and gold tables need upsert semantics — insert new records, update existing ones, and optionally delete. Delta MERGE inside foreachBatch gives you that. Combined with Liquid Clustering, Deletion Vectors, and Row-Level Concurrency, you get production-grade merge performance without manual OPTIMIZE pauses.

“Write a Python streaming pipeline that upserts incoming events into a Delta table using MERGE inside foreachBatch, with Liquid Clustering enabled on the target table.”

CREATE TABLE silver.orders (
  id STRING, name STRING, amount DECIMAL(10,2), updated_at TIMESTAMP
) USING DELTA
CLUSTER BY (id)
TBLPROPERTIES (
  'delta.enableDeletionVectors' = 'true',
  'delta.enableRowLevelConcurrency' = 'true'
);
def upsert_batch(batch_df, batch_id):
    # Each microbatch arrives here as a plain DataFrame.
    batch_df.createOrReplaceTempView("updates")
    spark.sql("""
        MERGE INTO silver.orders t
        USING updates s ON t.id = s.id
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)

# stream is the input streaming DataFrame
stream.writeStream \
    .foreachBatch(upsert_batch) \
    .option("checkpointLocation", "/Volumes/catalog/checkpoints/silver_orders") \
    .trigger(processingTime="30 seconds") \
    .start()

Key decisions:

  • Liquid Clustering replaces OPTIMIZE + ZORDER. The CLUSTER BY (id) clause tells Delta to incrementally co-locate data by the merge key. No scheduled OPTIMIZE jobs, no periodic ZORDER — the table self-organizes during writes.
  • Deletion Vectors avoid full file rewrites on updates. Instead of rewriting a 128MB file to change one row, Spark writes a small deletion vector. This drops merge P99 latency significantly.
  • Row-Level Concurrency allows concurrent MERGE operations on different rows of the same table without conflicts. Without it, two streams merging into the same table will throw ConcurrentAppendException.
  • foreachBatch is required because MERGE is not a native streaming sink. Each microbatch materializes as a DataFrame, creates a temp view, and runs a SQL MERGE against the target.
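One requirement the pattern above glosses over: MERGE fails with a "multiple source rows matched" error if a microbatch contains more than one row per key. A minimal sketch of deduplicating the source to the latest row per key before merging — the dedup_merge_sql helper is hypothetical, built around the silver.orders columns from the example; it assumes the "updates" temp view already exists:

```python
def dedup_merge_sql(target, key, order_col, cols):
    """Build a MERGE whose source keeps only the newest row per key.

    Assumes a temp view named 'updates' exists (as in upsert_batch).
    All names here are illustrative, not part of the original pipeline.
    """
    col_list = ", ".join(cols)
    return f"""
        MERGE INTO {target} t
        USING (
            SELECT {col_list} FROM (
                SELECT {col_list},
                       ROW_NUMBER() OVER (
                           PARTITION BY {key} ORDER BY {order_col} DESC
                       ) AS rn
                FROM updates
            ) WHERE rn = 1
        ) s
        ON t.{key} = s.{key}
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """

# Inside foreachBatch:
# spark.sql(dedup_merge_sql(
#     "silver.orders", "id", "updated_at",
#     ["id", "name", "amount", "updated_at"]))
```

Ordering by an event timestamp (updated_at here) means late-arriving duplicates lose to the newest record, which is usually what upsert semantics intend.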

“Write a Python foreachBatch that merges incoming data into three different silver tables in parallel.”

from delta.tables import DeltaTable
from concurrent.futures import ThreadPoolExecutor, as_completed

def parallel_merge(batch_df, batch_id):
    # Cache once so the three merges don't each recompute the batch.
    batch_df.cache()

    def merge_one(table_name, merge_key):
        target = DeltaTable.forName(spark, table_name)
        (target.alias("target")
            .merge(batch_df.alias("source"),
                   f"target.{merge_key} = source.{merge_key}")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute())

    tables = [
        ("silver.customers", "customer_id"),
        ("silver.orders", "order_id"),
        ("silver.products", "product_id"),
    ]

    # Leave roughly half the cluster's cores for the merge work itself.
    total_cores = spark.sparkContext.defaultParallelism
    max_workers = min(len(tables), max(2, total_cores // 2))
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(merge_one, t, k): t for t, k in tables}
        errors = []
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                errors.append((futures[future], str(e)))

    batch_df.unpersist()
    if errors:
        raise Exception(f"Merge failures: {errors}")

Cache the batch DataFrame before parallel merges to avoid recomputing it for each table. The thread count formula — min(tables, cores/2) — leaves half the cluster’s capacity for the actual merge work. Collect all errors before raising so you see every failure, not just the first.
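The fan-out-and-collect-errors pattern is worth seeing stripped of Spark. A sketch with dummy task functions standing in for the real Delta merges — run_all and its names are illustrative:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def run_all(tasks, max_workers=2):
    """Run (name, fn) tasks in parallel; collect every failure
    instead of stopping at the first, then raise once at the end."""
    errors = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(fn): name for name, fn in tasks}
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                errors.append((futures[future], str(e)))
    if errors:
        # One exception carrying every failed table, not just the first.
        raise RuntimeError(f"Merge failures: {errors}")
```

Raising once at the end is the design choice that matters: a failed merge on silver.orders shouldn't hide a simultaneous failure on silver.products.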

“Write a Python MERGE that includes the partition column in the join condition for faster execution.”

def partition_pruned_merge(batch_df, batch_id):
    batch_df.createOrReplaceTempView("updates")
    spark.sql("""
        MERGE INTO target_table t
        USING updates s
        ON t.id = s.id AND t.date = s.date
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)

Including the partition column (t.date = s.date) in the merge condition tells Spark to scan only the relevant partitions. On large tables, this is the difference between scanning 100 files and 100,000.
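When the batch's partition values are known up front, you can go further and inject them as literals, which lets Spark prune partitions at plan time rather than at runtime. A sketch following the example above — pruned_merge_sql, target_table, and the date column are illustrative:

```python
def pruned_merge_sql(dates):
    """Build the MERGE with an explicit IN-list of partition values
    so pruning can happen when the plan is compiled.

    Assumes the 'updates' temp view exists; all names illustrative.
    """
    in_list = ", ".join(f"'{d}'" for d in dates)
    return f"""
        MERGE INTO target_table t
        USING updates s
        ON t.id = s.id AND t.date = s.date AND t.date IN ({in_list})
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """

# Inside foreachBatch, collect the batch's distinct dates first:
# dates = [r["date"] for r in batch_df.select("date").distinct().collect()]
# spark.sql(pruned_merge_sql(dates))
```

The extra distinct-and-collect costs one small job per microbatch; on a heavily partitioned table, that trade is usually worth it.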

“Write a Python foreachBatch that applies CDC changes — inserts, updates, and deletes — to a target Delta table.”

from delta.tables import DeltaTable
from pyspark.sql.functions import col

def cdc_merge(batch_df, batch_id):
    # Cache once: the batch is scanned twice (upserts, then deletes).
    batch_df.cache()
    deletes = batch_df.filter(col("_op") == "DELETE")
    upserts = batch_df.filter(col("_op").isin(["INSERT", "UPDATE"]))
    target = DeltaTable.forName(spark, "silver.customers")
    if upserts.count() > 0:
        (target.alias("t")
            .merge(upserts.alias("s"), "t.customer_id = s.customer_id")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute())
    if deletes.count() > 0:
        (target.alias("t")
            .merge(deletes.alias("s"), "t.customer_id = s.customer_id")
            .whenMatchedDelete()
            .execute())
    batch_df.unpersist()

Separate the CDC batch into upserts and deletes before merging. Running a single MERGE with conditional logic for all three operations works but is harder to debug when something goes wrong.
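For reference, the single-MERGE alternative looks like this: one statement handles all three operations via clause conditions, where the first matching WHEN clause wins. A sketch against the same silver.customers table and _op column:

```python
def cdc_merge_single(batch_df, batch_id):
    """One MERGE for deletes, updates, and inserts. The DELETE clause
    must come first so flagged rows don't fall through to the update."""
    batch_df.createOrReplaceTempView("updates")
    # `spark` is the ambient session, as in the examples above.
    spark.sql("""
        MERGE INTO silver.customers t
        USING updates s ON t.customer_id = s.customer_id
        WHEN MATCHED AND s._op = 'DELETE' THEN DELETE
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED AND s._op != 'DELETE' THEN INSERT *
    """)
```

It scans the batch once instead of twice, at the cost of the debuggability the paragraph above describes.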

  • High P99 latency from periodic OPTIMIZE — if you’re running scheduled OPTIMIZE + ZORDER jobs, switch to Liquid Clustering. It eliminates the optimization pauses that cause latency spikes during merges.
  • ConcurrentAppendException on shared target tables — two streams merging into the same table will conflict unless Row-Level Concurrency is enabled. Set delta.enableRowLevelConcurrency = true on the target table.
  • Thread count too high in parallel merges — more threads than cluster_cores / 2 causes resource contention that slows every merge. Start with 2 threads and increase only if monitoring shows idle capacity.
  • Forgetting to unpersist after cache — cached DataFrames persist in memory across microbatches if you don’t explicitly unpersist. Over time this crowds out actual processing memory.
  • Slow merges on large unsorted tables — if Liquid Clustering isn’t an option, Z-ORDER on the merge key gives 5-10x lookup speedups. Run OPTIMIZE target_table ZORDER BY (id) on a schedule.
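The unpersist pitfall is easy to eliminate structurally: guarantee the unpersist with try/finally (or a context manager) so a failed merge can't leak the cached batch. A minimal sketch, with the `cached` helper being illustrative:

```python
from contextlib import contextmanager

@contextmanager
def cached(df):
    """Cache a DataFrame for the scope of a with-block and always
    unpersist it, even if the merge inside raises."""
    df.cache()
    try:
        yield df
    finally:
        df.unpersist()

# Usage inside foreachBatch:
# def upsert_batch(batch_df, batch_id):
#     with cached(batch_df):
#         ...  # run the merges
```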