# Multi-Sink Writes

Skill: `databricks-spark-structured-streaming`
## What You Can Build

You have one stream of events, but three different tables need them — raw bronze, cleansed silver, and aggregated gold. Reading the source three times wastes resources and triples your Kafka consumer lag. `foreachBatch` lets you read once and write many times, with all sinks sharing a single checkpoint and seeing the same data per batch.
## In Action

“Write a Python streaming pipeline that reads events from Kafka, then writes raw data to a bronze table, deduplicates to a silver table, and aggregates to a gold table — all from a single stream.”
```python
from pyspark.sql.functions import col, current_timestamp, count, sum, window

def medallion_write(batch_df, batch_id):
    batch_df.cache()

    # Bronze: raw append, idempotent per batch
    (batch_df.write.format("delta").mode("append")
        .option("txnVersion", batch_id)
        .option("txnAppId", "medallion_bronze")
        .saveAsTable("bronze.events"))

    # Silver: deduplicate, filter, stamp processing time
    silver_df = (batch_df
        .dropDuplicates(["event_id"])
        .filter(col("status").isin(["active", "pending"]))
        .withColumn("processed_at", current_timestamp()))
    (silver_df.write.format("delta").mode("append")
        .option("txnVersion", batch_id)
        .option("txnAppId", "medallion_silver")
        .saveAsTable("silver.events"))

    # Gold: windowed aggregates built on the cleansed silver output
    gold_df = (silver_df
        .groupBy(window(col("timestamp"), "5 minutes"), "category")
        .agg(count("*").alias("event_count"), sum("amount").alias("total_amount")))
    (gold_df.write.format("delta").mode("append")
        .option("txnVersion", batch_id)
        .option("txnAppId", "medallion_gold")
        .saveAsTable("gold.category_metrics"))

    batch_df.unpersist()
```
```python
(stream.writeStream
    .foreachBatch(medallion_write)
    .option("checkpointLocation", "/Volumes/catalog/checkpoints/medallion")
    .trigger(processingTime="30 seconds")
    .start())
```

Key decisions:
- One checkpoint for the entire multi-sink stream. All writes within a single `foreachBatch` call see the same input data and share the same `batch_id`. Don’t create separate streams per sink — that re-reads the source for each one.
- `txnVersion` + `txnAppId` per sink makes each write idempotent. The `txnAppId` must be unique per target table so Spark can detect and skip duplicate batches independently for each sink.
- Cache before multi-write avoids recomputing the batch for each sink. Unpersist after all writes complete to free memory before the next microbatch.
- Gold aggregation on silver output, not raw bronze — the gold table benefits from the deduplication and filtering already done for silver, avoiding double-counting.
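The `txnAppId`/`txnVersion` skip semantics can be sketched as a toy model in plain Python (an illustration of the behavior, not Delta's actual implementation):

```python
# Toy model: a sink remembers the highest txnVersion committed per txnAppId
# and silently skips any write it has already seen at that version or lower.
class IdempotentSink:
    def __init__(self):
        self.last_version = {}  # txnAppId -> highest committed txnVersion
        self.rows = []

    def write(self, app_id, version, batch):
        if self.last_version.get(app_id, -1) >= version:
            return "skipped"  # duplicate batch replayed after a failure
        self.rows.extend(batch)
        self.last_version[app_id] = version
        return "committed"

sink = IdempotentSink()
print(sink.write("medallion_bronze", 7, ["a", "b"]))  # committed
print(sink.write("medallion_bronze", 7, ["a", "b"]))  # skipped (retry of batch 7)
print(sink.write("medallion_silver", 7, ["a"]))       # committed (different app_id)
```

Because each sink keys its bookkeeping by its own `txnAppId`, a retried batch is skipped only by the sinks that already committed it.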
## More Patterns

### Conditional Routing by Event Type

“Write a Python foreachBatch that routes events to different Delta tables based on event type.”
```python
def route_by_type(batch_df, batch_id):
    batch_df.cache()

    orders = batch_df.filter(col("event_type") == "order")
    refunds = batch_df.filter(col("event_type") == "refund")
    reviews = batch_df.filter(col("event_type") == "review")

    for df, table, app_id in [
        (orders, "silver.orders", "router_orders"),
        (refunds, "silver.refunds", "router_refunds"),
        (reviews, "silver.reviews", "router_reviews"),
    ]:
        if df.count() > 0:
            (df.write.format("delta").mode("append")
                .option("txnVersion", batch_id)
                .option("txnAppId", app_id)
                .saveAsTable(table))

    batch_df.unpersist()
```

Filtering in `foreachBatch` is more efficient than running three separate streams with source-side filters. Spark evaluates the filter predicates against the cached DataFrame without re-reading from Kafka.
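The read-amplification argument can be made concrete with a plain-Python stand-in for the source (`read_source` here is just a counter, not a real Kafka reader):

```python
reads = {"count": 0}

def read_source():
    # Stand-in for a Kafka read; counts how many times the source is consumed.
    reads["count"] += 1
    return [{"event_type": "order"}, {"event_type": "refund"}, {"event_type": "review"}]

# Three separate filtered streams: the source is consumed once per stream.
for kind in ("order", "refund", "review"):
    _ = [r for r in read_source() if r["event_type"] == kind]
print(reads["count"])  # 3

# One foreachBatch: read once, then filter the in-memory batch per sink.
reads["count"] = 0
batch = read_source()
routed = {kind: [r for r in batch if r["event_type"] == kind]
          for kind in ("order", "refund", "review")}
print(reads["count"])  # 1
```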
### Parallel Fan-Out with ThreadPoolExecutor

“Write a Python foreachBatch that writes to four Delta tables in parallel.”
```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def parallel_write(batch_df, batch_id):
    batch_df.cache()

    def write_table(table_name, filter_expr=None):
        # `is not None` matters here: truth-testing a Column raises an error in PySpark
        df = batch_df.filter(filter_expr) if filter_expr is not None else batch_df
        (df.write.format("delta").mode("append")
            .option("txnVersion", batch_id)
            .option("txnAppId", f"parallel_{table_name}")
            .saveAsTable(table_name))

    tables = [
        ("bronze.all_events", None),
        ("silver.errors", col("level") == "ERROR"),
        ("silver.warnings", col("level") == "WARN"),
        ("gold.metrics", col("type") == "metric"),
    ]

    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = {executor.submit(write_table, t, f): t for t, f in tables}
        errors = []
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                errors.append((futures[future], str(e)))

    batch_df.unpersist()
    if errors:
        raise Exception(f"Write failures: {errors}")
```

Sequential writes to four tables mean the microbatch takes four times as long as a single write. Parallel writes cut that to roughly the duration of the slowest sink. Keep `max_workers` at 4 or fewer — more threads create resource contention that slows everything down.
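The latency math can be demonstrated with a plain-Python simulation, where `time.sleep` stands in for per-table write latency (the latencies themselves are made up):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def simulated_write(latency_s):
    time.sleep(latency_s)  # stand-in for a Delta table write
    return latency_s

latencies = [0.05, 0.10, 0.20, 0.15]

start = time.monotonic()
for l in latencies:  # sequential: total is roughly sum(latencies), ~0.50s
    simulated_write(l)
sequential_s = time.monotonic() - start

start = time.monotonic()
with ThreadPoolExecutor(max_workers=4) as ex:  # parallel: roughly max(latencies), ~0.20s
    list(ex.map(simulated_write, latencies))
parallel_s = time.monotonic() - start

print(parallel_s < sequential_s)  # True
```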
### Error Handling with Dead Letter Queue

“Write a Python foreachBatch that validates records and routes invalid ones to a DLQ table.”
```python
from pyspark.sql.functions import col, lit

def write_with_dlq(batch_df, batch_id):
    batch_df.cache()  # the batch is read up to four times below (two counts, two writes)

    valid = batch_df.filter(col("required_field").isNotNull())
    invalid = batch_df.filter(col("required_field").isNull())

    if valid.count() > 0:
        (valid.write.format("delta").mode("append")
            .option("txnVersion", batch_id)
            .option("txnAppId", "multi_sink_valid")
            .saveAsTable("silver.valid_events"))

    if invalid.count() > 0:
        (invalid
            .withColumn("_error_reason", lit("missing_required_field"))
            .withColumn("_batch_id", lit(batch_id))
            .write.format("delta").mode("append")
            .option("txnVersion", batch_id)        # keep the DLQ write idempotent too,
            .option("txnAppId", "multi_sink_dlq")  # so retries don't duplicate DLQ rows
            .saveAsTable("errors.dead_letter_queue"))

    batch_df.unpersist()
```

Stamp DLQ records with the error reason and batch ID. This gives you enough context to replay or investigate without going back to the source.
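The split-and-stamp logic, reduced to plain Python dicts for illustration (field names mirror the Spark example above):

```python
batch = [
    {"event_id": 1, "required_field": "x"},
    {"event_id": 2, "required_field": None},
]
batch_id = 42

# Valid records pass through; invalid ones get the error reason and batch ID stamped on.
valid = [r for r in batch if r["required_field"] is not None]
invalid = [
    {**r, "_error_reason": "missing_required_field", "_batch_id": batch_id}
    for r in batch
    if r["required_field"] is None
]

print(len(valid), len(invalid))     # 1 1
print(invalid[0]["_error_reason"])  # missing_required_field
```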
## Watch Out For

- Slow writes from sequential processing — if your `foreachBatch` writes to three or more tables sequentially, switch to `ThreadPoolExecutor`. The batch duration should be comfortably under your trigger interval.
- Recomputing the batch for each sink — every `.write` action re-evaluates the DataFrame lineage. Cache at the top of `foreachBatch` and unpersist at the bottom. Forgetting to cache is the single most common performance issue in multi-sink writes.
- Partial failures leave data inconsistent — if the second of three writes fails, Spark retries the entire batch. Use `txnVersion`/`txnAppId` on every write so successful sinks skip the duplicate and only the failed sink retries.
- Expensive aggregations inside foreachBatch — `groupBy`/`agg` operations in the batch function block the entire microbatch. Move heavy aggregations to the stream definition if possible, or accept the latency cost.
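To check batch duration against the trigger interval, look at `durationMs.triggerExecution` in `StreamingQuery.lastProgress`. The payload below is a hand-made stand-in using the real field names (the numbers are made up):

```python
# Hypothetical lastProgress-style payload; durationMs.triggerExecution is the
# total time Spark spent executing the microbatch, in milliseconds.
progress = {"batchId": 42, "durationMs": {"triggerExecution": 18500}}

trigger_interval_ms = 30_000  # matches trigger(processingTime="30 seconds")
batch_ms = progress["durationMs"]["triggerExecution"]
headroom_ms = trigger_interval_ms - batch_ms

print(headroom_ms)  # 11500: the batch fits comfortably inside the trigger interval
```

If `headroom_ms` trends toward zero, batches start queuing behind the trigger and the stream falls behind its source.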