
Multi-Sink Writes

Skill: databricks-spark-structured-streaming

You have one stream of events, but three different tables need them — raw bronze, cleansed silver, and aggregated gold. Reading the source three times wastes resources and triples your Kafka consumer traffic. foreachBatch lets you read once and write many, with all sinks sharing a single checkpoint and seeing the same data per batch.

“Write a Python streaming pipeline that reads events from Kafka, then writes raw data to a bronze table, deduplicates to a silver table, and aggregates to a gold table — all from a single stream.”

from pyspark.sql.functions import col, current_timestamp, count, sum, window

def medallion_write(batch_df, batch_id):
    # Cache once so the three writes below don't recompute the batch from Kafka
    batch_df.cache()

    # Bronze: raw append, idempotent via txnAppId/txnVersion
    batch_df.write.format("delta").mode("append") \
        .option("txnVersion", batch_id) \
        .option("txnAppId", "medallion_bronze") \
        .saveAsTable("bronze.events")

    # Silver: deduplicate, filter, and stamp processing time
    silver_df = (batch_df
        .dropDuplicates(["event_id"])
        .filter(col("status").isin(["active", "pending"]))
        .withColumn("processed_at", current_timestamp()))
    silver_df.write.format("delta").mode("append") \
        .option("txnVersion", batch_id) \
        .option("txnAppId", "medallion_silver") \
        .saveAsTable("silver.events")

    # Gold: aggregate the cleansed silver output, not the raw batch
    gold_df = (silver_df
        .groupBy(window(col("timestamp"), "5 minutes"), "category")
        .agg(count("*").alias("event_count"), sum("amount").alias("total_amount")))
    gold_df.write.format("delta").mode("append") \
        .option("txnVersion", batch_id) \
        .option("txnAppId", "medallion_gold") \
        .saveAsTable("gold.category_metrics")

    batch_df.unpersist()

# 'stream' is the streaming DataFrame returned by spark.readStream on the Kafka source
stream.writeStream \
    .foreachBatch(medallion_write) \
    .option("checkpointLocation", "/Volumes/catalog/checkpoints/medallion") \
    .trigger(processingTime="30 seconds") \
    .start()

Key decisions:

  • One checkpoint for the entire multi-sink stream. All writes within a single foreachBatch call see the same input data and share the same batch_id. Don’t create separate streams per sink — that re-reads the source for each one.
  • txnVersion + txnAppId per sink makes each write idempotent. The txnAppId must be unique per target table so Spark can detect and skip duplicate batches independently for each sink.
  • Cache before multi-write avoids recomputing the batch for each sink. Unpersist after all writes complete to free memory before the next microbatch.
  • Gold aggregation on silver output, not raw bronze — the gold table benefits from the deduplication and filtering already done for silver, avoiding double-counting.
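The idempotency mechanism in the second bullet can be modeled in plain Python. This is a conceptual sketch, not the Delta implementation: the idea is that each table's transaction log records the highest txnVersion committed per txnAppId, and a write whose version is not greater than the recorded one is skipped. FakeDeltaTable and all names below are invented for illustration.

```python
# Conceptual model of Delta's idempotent writes (illustration only):
# each table tracks the highest txnVersion seen per txnAppId, and an
# incoming write is skipped unless its version is strictly greater.

class FakeDeltaTable:
    def __init__(self):
        self.rows = []
        self._txn_versions = {}  # txnAppId -> highest txnVersion committed

    def idempotent_append(self, rows, txn_app_id, txn_version):
        last = self._txn_versions.get(txn_app_id, -1)
        if txn_version <= last:
            return False  # duplicate batch: skip silently
        self.rows.extend(rows)
        self._txn_versions[txn_app_id] = txn_version
        return True

bronze = FakeDeltaTable()
bronze.idempotent_append(["a", "b"], "medallion_bronze", txn_version=7)
# A retried batch (same appId, same version) is a no-op:
bronze.idempotent_append(["a", "b"], "medallion_bronze", txn_version=7)
assert bronze.rows == ["a", "b"]
```

This is why a partial failure is safe to retry: sinks that already committed batch N skip the replay, and only the failed sink applies it.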

“Write a Python foreachBatch that routes events to different Delta tables based on event type.”

def route_by_type(batch_df, batch_id):
    batch_df.cache()
    orders = batch_df.filter(col("event_type") == "order")
    refunds = batch_df.filter(col("event_type") == "refund")
    reviews = batch_df.filter(col("event_type") == "review")
    # One txnAppId per sink so idempotency is tracked per target table
    for df, table, app_id in [
        (orders, "silver.orders", "router_orders"),
        (refunds, "silver.refunds", "router_refunds"),
        (reviews, "silver.reviews", "router_reviews"),
    ]:
        if not df.isEmpty():  # cheaper than count() > 0 for an emptiness check
            df.write.format("delta").mode("append") \
                .option("txnVersion", batch_id) \
                .option("txnAppId", app_id) \
                .saveAsTable(table)
    batch_df.unpersist()

Filtering in foreachBatch is more efficient than running three separate streams with source-side filters. Spark evaluates the filter predicates against the cached DataFrame without re-reading from Kafka.
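The "read once, filter many" claim can be illustrated with a toy source that counts how often it is read. This is a pure-Python sketch; CountingSource and everything below are invented for illustration and stand in for the Kafka source and the cached batch.

```python
# Toy comparison: one stream per sink (re-reads the source per sink)
# vs. foreachBatch-style routing (read once, filter the cached batch).

class CountingSource:
    def __init__(self, events):
        self._events = events
        self.reads = 0

    def read(self):
        self.reads += 1
        return list(self._events)

events = [{"type": "order"}, {"type": "refund"}, {"type": "order"}]
sinks = ("order", "refund", "review")

# Three separate streams: each sink triggers its own source read.
src = CountingSource(events)
per_sink = {t: [e for e in src.read() if e["type"] == t] for t in sinks}
assert src.reads == 3

# foreachBatch style: one read, three filters over the cached batch.
src = CountingSource(events)
batch = src.read()  # analogous to the cached batch_df
routed = {t: [e for e in batch if e["type"] == t] for t in sinks}
assert src.reads == 1
assert len(routed["order"]) == 2
```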

“Write a Python foreachBatch that writes to four Delta tables in parallel.”

from concurrent.futures import ThreadPoolExecutor, as_completed

def parallel_write(batch_df, batch_id):
    batch_df.cache()

    def write_table(table_name, filter_expr=None):
        # Explicit None check: truth-testing a Column raises an exception
        df = batch_df.filter(filter_expr) if filter_expr is not None else batch_df
        df.write.format("delta").mode("append") \
            .option("txnVersion", batch_id) \
            .option("txnAppId", f"parallel_{table_name}") \
            .saveAsTable(table_name)

    tables = [
        ("bronze.all_events", None),
        ("silver.errors", col("level") == "ERROR"),
        ("silver.warnings", col("level") == "WARN"),
        ("gold.metrics", col("type") == "metric"),
    ]
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = {executor.submit(write_table, t, f): t for t, f in tables}
        errors = []
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                errors.append((futures[future], str(e)))
    batch_df.unpersist()
    if errors:
        # Fail the batch so Spark retries; idempotent sinks skip the replay
        raise Exception(f"Write failures: {errors}")

Sequential writes to four tables mean the microbatch takes four times as long as a single write. Parallel writes cut that to roughly the duration of the slowest sink. Keep max_workers at 4 or fewer — more threads create resource contention that slows everything down.
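The timing argument can be sketched without Spark. In this pure-Python illustration, a sleep stands in for a Delta write; the durations are invented, but the shape of the result is the point: sequential time is the sum of the sinks, parallel time is roughly the slowest one.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_write(duration):
    time.sleep(duration)  # stand-in for a Delta table write

durations = [0.2, 0.2, 0.2, 0.2]  # four sinks of similar cost

# Sequential: total is the sum of all sink durations (~0.8 s here)
start = time.monotonic()
for d in durations:
    fake_write(d)
sequential = time.monotonic() - start

# Parallel: total is roughly the slowest sink (~0.2 s here)
start = time.monotonic()
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(fake_write, durations))
parallel = time.monotonic() - start

assert parallel < sequential
```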

“Write a Python foreachBatch that validates records and routes invalid ones to a DLQ table.”

from pyspark.sql.functions import lit

def write_with_dlq(batch_df, batch_id):
    # Cache once: both the valid and invalid branches read the same batch
    batch_df.cache()
    valid = batch_df.filter(col("required_field").isNotNull())
    invalid = batch_df.filter(col("required_field").isNull())
    if not valid.isEmpty():
        valid.write.format("delta").mode("append") \
            .option("txnVersion", batch_id) \
            .option("txnAppId", "multi_sink_valid") \
            .saveAsTable("silver.valid_events")
    if not invalid.isEmpty():
        (invalid
            .withColumn("_error_reason", lit("missing_required_field"))
            .withColumn("_batch_id", lit(batch_id))
            .write.format("delta").mode("append")
            .saveAsTable("errors.dead_letter_queue"))
    batch_df.unpersist()

Stamp DLQ records with the error reason and batch ID. This gives you enough context to replay or investigate without going back to the source.

  • Slow writes from sequential processing — if your foreachBatch writes to three or more tables sequentially, switch to ThreadPoolExecutor. The batch duration should be comfortably under your trigger interval.
  • Recomputing the batch for each sink — every .write action re-evaluates the DataFrame lineage. Cache at the top of foreachBatch and unpersist at the bottom. Forgetting to cache is the single most common performance issue in multi-sink writes.
  • Partial failures leave data inconsistent — if the second of three writes fails, Spark retries the entire batch. Use txnVersion/txnAppId on every write so successful sinks skip the duplicate and only the failed sink retries.
  • Expensive aggregations inside foreachBatch — groupBy/agg operations in the batch function block the entire microbatch. Move heavy aggregations to the stream definition if possible, or accept the latency cost.