Sinks
Skill: databricks-spark-declarative-pipelines
What You Can Build
Sinks let you push data from your pipeline to destinations that aren’t Databricks-managed Delta tables — Kafka topics, external Delta paths, JDBC databases, or any system you can write to from a Spark DataFrame. You define the sink, then attach an append flow that feeds it from any streaming table in your pipeline. Python only.
In Action
“Write silver order events to an external Delta table at a mounted storage path using Python”
```python
dp.create_sink(
    name="external_orders",
    format="delta",
    options={"path": "/mnt/external/orders"},
)

@dp.append_flow(target="external_orders")
def external_orders_flow():
    return spark.readStream.table("silver.orders")
```

Key decisions:
- Sink instead of a pipeline table — the target lives outside Unity Catalog, so it can’t be a managed streaming table. Sinks handle the write mechanics for external paths.
- Append flow as the delivery mechanism — sinks only accept data through append flows. You can attach multiple flows to the same sink if you need to fan in from several sources.
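Fan-in from several sources looks like attaching one append flow per source to the same sink. A sketch — the sink and table names here are illustrative, not from the example above:

```python
dp.create_sink(
    name="combined_events",
    format="delta",
    options={"path": "/mnt/external/combined"},
)

# Each flow delivers one source's stream into the shared sink.
@dp.append_flow(target="combined_events")
def web_events_flow():
    return spark.readStream.table("silver.web_events")

@dp.append_flow(target="combined_events")
def mobile_events_flow():
    return spark.readStream.table("silver.mobile_events")
```

Each flow keeps its own checkpoint, so the sources progress independently while landing in one destination.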
More Patterns
Publish events to a Kafka topic
“Serialize silver events as JSON and write them to a Kafka topic using Python”
```python
from pyspark.sql.functions import to_json, struct

dp.create_sink(
    name="kafka_events",
    format="kafka",
    options={
        "kafka.bootstrap.servers": "broker:9092",
        "topic": "output-events",
    },
)

@dp.append_flow(target="kafka_events")
def kafka_flow():
    return (
        spark.readStream.table("silver.events")
        .select(to_json(struct("*")).alias("value"))
    )
```

Kafka sinks require a `value` column — that’s the message payload. The `to_json(struct("*"))` pattern serializes the entire row into a single JSON string. If you also need a message key, add a `key` column to the select.
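Adding a message key could look like the following sketch, assuming `user_id` is the column you want to key on (that column name is illustrative):

```python
from pyspark.sql.functions import col, to_json, struct

@dp.append_flow(target="kafka_events")
def keyed_kafka_flow():
    return (
        spark.readStream.table("silver.events")
        .select(
            col("user_id").cast("string").alias("key"),  # message key (illustrative column)
            to_json(struct("*")).alias("value"),         # message payload
        )
    )
```

Keying by a stable identifier sends all of that entity’s events to the same Kafka partition, which preserves per-entity ordering for downstream consumers.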
Write to an external database with custom logic
“Push each microbatch of processed data to a PostgreSQL table using forEachBatch in Python”
```python
@dp.foreach_batch_sink(name="postgres_sink")
def write_to_postgres(batch_df, batch_id):
    (batch_df.write.format("jdbc")
        .option("url", "jdbc:postgresql://host/db")
        .option("dbtable", "target_table")
        .mode("append")
        .save())

@dp.append_flow(target="postgres_sink")
def postgres_flow():
    return spark.readStream.table("silver.data")
```

`foreach_batch_sink` gives you a plain DataFrame and a batch ID for each microbatch. You write whatever custom logic you need — JDBC, REST API calls, file writes. The pipeline still manages checkpointing and exactly-once delivery to the sink boundary.
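Because a plain JDBC append is not idempotent on its own, a common Structured Streaming pattern is to record the batch ID alongside each row so a replayed microbatch can be detected or deduplicated downstream. A sketch — the sink name and `_batch_id` column are illustrative:

```python
from pyspark.sql.functions import lit

@dp.foreach_batch_sink(name="postgres_sink_idempotent")
def write_with_batch_id(batch_df, batch_id):
    (batch_df
        .withColumn("_batch_id", lit(batch_id))  # tag every row with its microbatch ID
        .write.format("jdbc")
        .option("url", "jdbc:postgresql://host/db")
        .option("dbtable", "target_table")
        .mode("append")
        .save())
```

On a retry the same `batch_id` is delivered again, so a downstream job (or a unique constraint on the target table) can discard the duplicate rows.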
Watch Out For
Section titled “Watch Out For”- Sinks are Python only — there’s no SQL equivalent. If your pipeline is SQL-based and needs an external sink, you’ll need a Python file for just the sink definition.
- Sinks only work with streaming append flows — you can’t use a batch read or a materialized view as a source. The data must flow through `spark.readStream`.
- Kafka requires a `value` column — if your select doesn’t produce a column named `value`, the write fails. Use `to_json(struct("*")).alias("value")` as the standard pattern.
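For the SQL-based pipeline case, the extra Python file can be as small as the sink definition plus its flow. A sketch — the file name is illustrative, and the import line is an assumption about your runtime (older runtimes expose the same API as `import dlt`):

```python
# sinks.py — the only Python source file in an otherwise SQL pipeline
from pyspark import pipelines as dp  # import path is an assumption; adjust for your runtime

dp.create_sink(
    name="external_orders",
    format="delta",
    options={"path": "/mnt/external/orders"},
)

@dp.append_flow(target="external_orders")
def external_orders_flow():
    return spark.readStream.table("silver.orders")
```

The SQL files continue to define the tables; this file only declares the sink and the flow that feeds it.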