Sinks

Skill: databricks-spark-declarative-pipelines

Sinks let you push data from your pipeline to destinations that aren’t Databricks-managed Delta tables — Kafka topics, external Delta paths, JDBC databases, or any system you can write to from a Spark DataFrame. You define the sink, then attach an append flow that feeds it from any streaming table in your pipeline. Python only.

“Write silver order events to an external Delta table at a mounted storage path using Python”

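from pyspark import pipelines as dp

# Define the sink: an external Delta table at a mounted storage path.
dp.create_sink(
    name="external_orders",
    format="delta",
    options={"path": "/mnt/external/orders"}
)

# Feed the sink from a streaming table with an append flow.
@dp.append_flow(target="external_orders")
def external_orders_flow():
    return spark.readStream.table("silver.orders")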

Key decisions:

  • Sink instead of a pipeline table — the target lives outside Unity Catalog, so it can’t be a managed streaming table. Sinks handle the write mechanics for external paths.
  • Append flow as the delivery mechanism — sinks only accept data through append flows. You can attach multiple flows to the same sink if you need to fan in from several sources.
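The fan-in case mentioned above can be sketched as a small helper that registers one append flow per source table against the same sink. This is a minimal sketch, not a prescribed pattern: `register_fan_in_flows` is a hypothetical helper, `silver.returns` in the usage note is a hypothetical table, and `dp` and `spark` are passed in as arguments only so the sketch stays independent of the pipeline runtime. In a pipeline source file they would be the imported `pyspark.pipelines` module and the ambient session.

```python
def register_fan_in_flows(dp, spark, sink_name, source_tables):
    """Attach one append flow per source table to an already-defined sink.

    `dp` is the pipelines module and `spark` the active session; both are
    passed in so this sketch does not depend on the pipeline runtime.
    """
    flow_names = []
    for table in source_tables:
        # Each flow needs its own name; derive one from the source table.
        flow_name = f"{sink_name}_from_{table.replace('.', '_')}"

        # Bind `table` as a default argument so every flow reads its own
        # source rather than the last value of the loop variable.
        @dp.append_flow(target=sink_name, name=flow_name)
        def flow(table=table):
            return spark.readStream.table(table)

        flow_names.append(flow_name)
    return flow_names
```

Calling `register_fan_in_flows(dp, spark, "external_orders", ["silver.orders", "silver.returns"])` after the `dp.create_sink(...)` call would register two flows that both append to `external_orders`.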

“Serialize silver events as JSON and write them to a Kafka topic using Python”

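from pyspark import pipelines as dp
from pyspark.sql.functions import struct, to_json

# Define the sink: a Kafka topic reached through the given bootstrap servers.
dp.create_sink(
    name="kafka_events",
    format="kafka",
    options={
        "kafka.bootstrap.servers": "broker:9092",
        "topic": "output-events"
    }
)

# Serialize each row to JSON and deliver it as the Kafka message value.
@dp.append_flow(target="kafka_events")
def kafka_flow():
    return (
        spark.readStream.table("silver.events")
        .select(to_json(struct("*")).alias("value"))
    )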

Kafka sinks require a value column — that’s the message payload. The to_json(struct("*")) pattern serializes the entire row into a single JSON string. If you also need a message key, add a key column to the select.
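To illustrate adding a message key, here is a minimal sketch that projects a DataFrame into the `key`/`value` shape using SQL expressions. The helper name `to_kafka_records` and the `event_id` column in the usage note are assumptions made for illustration; the cast reflects the fact that Kafka keys must be string or binary.

```python
def to_kafka_records(df, key_column):
    """Project a DataFrame into the key/value shape a Kafka sink expects."""
    return df.selectExpr(
        # Kafka keys must be string or binary, so cast explicitly.
        f"CAST(`{key_column}` AS STRING) AS key",
        # Serialize every column of the row into one JSON string.
        "to_json(struct(*)) AS value",
    )
```

Inside the flow function, this would be used as `return to_kafka_records(spark.readStream.table("silver.events"), "event_id")`, assuming the table has an `event_id` column.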

Write to an external database with custom logic

“Push each microbatch of processed data to a PostgreSQL table using foreachBatch in Python”

from pyspark import pipelines as dp

# Runs once per microbatch; batch_df is a regular (non-streaming) DataFrame.
@dp.foreach_batch_sink(name="postgres_sink")
def write_to_postgres(batch_df, batch_id):
    (
        batch_df.write.format("jdbc")
        .option("url", "jdbc:postgresql://host/db")
        .option("dbtable", "target_table")
        .mode("append")
        .save()
    )

@dp.append_flow(target="postgres_sink")
def postgres_flow():
    return spark.readStream.table("silver.data")

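foreach_batch_sink passes your function a plain DataFrame and a batch ID for each microbatch. You write whatever custom logic you need: JDBC, REST API calls, file writes. The pipeline still manages checkpointing and delivers each microbatch to the function, but a batch can be delivered again after a failure and retry, so exactly-once results in the external system depend on the write itself being idempotent.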
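Because a microbatch can reach the function more than once after a retry, one option is to tag every row with its batch ID so the target system can recognize a replayed batch. The following is a minimal sketch under that assumption: `write_batch_to_jdbc` and the `pipeline_batch_id` column are hypothetical names, and the deduplication itself is left to the target database.

```python
def write_batch_to_jdbc(batch_df, batch_id, jdbc_options):
    """Append one microbatch to a JDBC table, tagging rows with the batch ID."""
    # Add the batch ID as a column (hypothetical name: pipeline_batch_id) so
    # the target can detect rows that belong to a replayed microbatch.
    tagged = batch_df.selectExpr(
        "*", f"CAST({int(batch_id)} AS BIGINT) AS pipeline_batch_id"
    )
    (
        tagged.write.format("jdbc")
        .options(**jdbc_options)  # e.g. url, dbtable, user, password
        .mode("append")
        .save()
    )
```

The body of a `@dp.foreach_batch_sink` function could then be a single call such as `write_batch_to_jdbc(batch_df, batch_id, {"url": "jdbc:postgresql://host/db", "dbtable": "target_table"})`.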

  • Sinks are Python only — there’s no SQL equivalent. If your pipeline is SQL-based and needs an external sink, you’ll need a Python file for just the sink definition.
  • Sinks only work with streaming append flows — you can’t use a batch read or a materialized view as a source. The data must flow through spark.readStream.
  • Kafka requires a value column — if your select doesn’t produce a column named value, the write fails. Use to_json(struct("*")).alias("value") as the standard pattern.
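The last two constraints can be checked before the DataFrame is returned from a flow. This is a minimal sketch of a fail-fast guard, assuming you want the error raised in your own code rather than at write time; `check_kafka_flow_output` is a hypothetical helper that relies only on the standard `isStreaming` and `columns` attributes of a DataFrame.

```python
def check_kafka_flow_output(df):
    """Fail fast if a DataFrame cannot be delivered to a Kafka sink."""
    # Sinks accept streaming append flows only, never batch reads.
    if not df.isStreaming:
        raise ValueError(
            "sink flows must return a streaming DataFrame (use spark.readStream)"
        )
    # The Kafka writer takes the message payload from a column named "value".
    if "value" not in df.columns:
        raise ValueError("Kafka sinks require a column named 'value'")
    return df
```

A flow function can wrap its result, for example `return check_kafka_flow_output(events_df)`, where `events_df` stands for whatever DataFrame the flow builds.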