Kafka Streaming Patterns
Skill: databricks-spark-structured-streaming
What You Can Build
Kafka is the backbone of most streaming architectures, but getting data from Kafka into Delta reliably — with proper schema parsing, exactly-once semantics, and cost-aware triggers — takes more than a `readStream`. These patterns cover the full spectrum: bronze-layer ingestion, Kafka-to-Kafka event enrichment, Real-Time Mode for sub-second latency, and schema validation with dead letter queues.
In Action
“Write a Python streaming pipeline that reads JSON events from a Kafka topic, parses the payload with a schema, preserves Kafka metadata columns, and writes to a Delta bronze table with a 30-second trigger.”
```python
from pyspark.sql.functions import col, from_json, current_timestamp

df = (spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
    .option("subscribe", "order_events")
    .option("startingOffsets", "earliest")
    .option("maxOffsetsPerTrigger", "50000")
    .option("minPartitions", "6")
    .load())

df_parsed = df.select(
    col("key").cast("string"),
    from_json(col("value").cast("string"), event_schema).alias("data"),
    col("topic"),
    col("partition"),
    col("offset"),
    col("timestamp").alias("kafka_timestamp"),
    current_timestamp().alias("ingestion_timestamp")
).select("key", "data.*", "topic", "partition", "offset",
         "kafka_timestamp", "ingestion_timestamp")

(df_parsed.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/Volumes/catalog/checkpoints/bronze_orders")
    .trigger(processingTime="30 seconds")
    .start("/delta/bronze_orders"))
```

Key decisions:
- `startingOffsets="earliest"` reads existing data on the first run. The default `latest` silently skips everything already in the topic — a common source of “where’s my data?” confusion.
- `maxOffsetsPerTrigger` caps batch size so a large backlog doesn’t overwhelm memory on first run. Dial this between 10,000 and 100,000 based on message size.
- `minPartitions` should match your Kafka partition count for optimal parallelism. Setting it lower underutilizes the cluster; higher creates unnecessary overhead.
- Preserving Kafka metadata (`topic`, `partition`, `offset`, `kafka_timestamp`) in the bronze layer gives you lineage back to the source and simplifies debugging consumer lag.
- `ingestion_timestamp` separates when the event happened from when you received it — essential for latency monitoring.
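The example above assumes an `event_schema` is already defined. One lightweight way to supply it is a DDL-formatted string, which `from_json` accepts in place of a `StructType`. The field names below are hypothetical; adjust them to your actual payload:

```python
# Hypothetical schema for the order_events payload — a sketch, not the
# canonical schema. from_json accepts either a StructType or a
# DDL-formatted string like this one.
event_schema = (
    "event_id STRING, "
    "user_id STRING, "
    "event_type STRING, "
    "amount DOUBLE, "
    "timestamp TIMESTAMP"
)
```

A DDL string is easier to keep in config or version control than a nested `StructType` literal, and parses to the same schema.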
More Patterns
Kafka-to-Kafka Event Enrichment
“Write a Python streaming pipeline that reads events from one Kafka topic, enriches them with a Delta dimension table, and writes the result to another Kafka topic.”
```python
from pyspark.sql.functions import col, from_json, to_json, struct

source_df = (spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092")
    .option("subscribe", "raw-clickstream")
    .option("startingOffsets", "latest")
    .load())

parsed_df = source_df.select(
    col("key").cast("string"),
    from_json(col("value").cast("string"), event_schema).alias("data"),
).select("key", "data.*")

user_dim = spark.table("users.dimension")

enriched_df = (parsed_df
    .join(user_dim, "user_id", "left")
    .withColumn("value", to_json(struct(
        "event_id", "user_id", "user_name", "user_segment",
        "event_type", "timestamp"
    ))))

(enriched_df.select("key", "value").writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092")
    .option("topic", "enriched-clickstream")
    .option("kafka.acks", "all")
    .option("kafka.compression.type", "lz4")
    .option("checkpointLocation", "/Volumes/catalog/checkpoints/enriched_clicks")
    .trigger(processingTime="30 seconds")
    .start())
```

The Kafka producer requires data in `key` and `value` string columns. The `to_json(struct(...))` call serializes your enriched fields back into a JSON string. Use `acks=all` for durability and `lz4` compression to reduce network overhead without adding significant CPU cost.
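To verify the enrichment stream keeps up with its input, you can inspect `query.lastProgress`, the per-batch progress dict that Structured Streaming exposes. The helper below is a sketch of our own (not a Spark API); the `numInputRows` and `durationMs.triggerExecution` fields it reads are part of the standard progress report:

```python
def batch_throughput(progress):
    """Rough rows/sec for one micro-batch from a StreamingQueryProgress dict.

    Sketch helper, not a Spark API: reads the standard numInputRows and
    durationMs.triggerExecution fields from query.lastProgress.
    """
    if not progress:
        return 0.0  # no batch has completed yet
    rows = progress.get("numInputRows", 0)
    duration_ms = progress.get("durationMs", {}).get("triggerExecution", 0)
    if duration_ms <= 0:
        return 0.0
    return rows / (duration_ms / 1000.0)

# Usage against a live query:
# rate = batch_throughput(query.lastProgress)
# print(f"processing ~{rate:.0f} rows/sec")
```

If this number trends below your topic’s produce rate, consumer lag will grow — see “Watch Out For” below.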
Sub-Second Latency with Real-Time Mode
“Configure a Python Kafka-to-Kafka pipeline using Real-Time Mode for sub-second end-to-end latency on DBR 16.4 LTS.”
```python
# Cluster Spark config: spark.databricks.streaming.realTimeMode.enabled = true

query = (enriched_df
    .select(col("key"), col("value"))
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", brokers)
    .option("topic", "low-latency-events")
    .outputMode("update")
    .trigger(realTime="5 minutes")
    .option("checkpointLocation", checkpoint_path)
    .start())
```

RTM achieves as low as 5ms end-to-end latency by eliminating microbatch overhead entirely. The `realTime="5 minutes"` parameter in PySpark sets the checkpoint interval, not the processing cadence. RTM requirements are strict: dedicated single-user cluster, no autoscaling, Photon disabled, `outputMode("update")` only, and no `foreachBatch`. Use RTM only when your SLA genuinely demands sub-800ms latency — microbatch is more cost-effective for everything else.
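Because an RTM pipeline with a missing prerequisite tends to fail in non-obvious ways, it can help to fail fast at startup. The guard below is our own sketch, not a Databricks API; it checks a plain dict of cluster settings against the constraints listed above, and the conf key names (beyond the documented `realTimeMode.enabled`) are illustrative:

```python
def check_rtm_prereqs(cluster_conf):
    """Return a list of RTM prerequisite violations (empty list = OK).

    Sketch helper — cluster_conf is a plain dict of cluster setting
    strings; key names other than realTimeMode.enabled are illustrative.
    """
    problems = []
    if cluster_conf.get("spark.databricks.streaming.realTimeMode.enabled") != "true":
        problems.append("realTimeMode.enabled must be true")
    if cluster_conf.get("spark.databricks.photon.enabled") == "true":
        problems.append("Photon must be disabled for RTM")
    if cluster_conf.get("autoscaling") == "true":
        problems.append("autoscaling must be off for RTM")
    return problems
```

Run it before `start()` and raise if the list is non-empty, rather than debugging a silently stalled stream later.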
Schema Validation with Dead Letter Queue
“Write a Python foreachBatch function that validates incoming Kafka messages against a schema and routes invalid records to a DLQ topic.”
```python
from pyspark.sql.functions import from_json, col, lit, to_json, struct, current_timestamp

def validate_and_route(batch_df, batch_id):
    # from_json yields null when the payload doesn't match the schema,
    # which is the split point between valid and invalid records.
    parsed = batch_df.withColumn(
        "parsed", from_json(col("value").cast("string"), validated_schema)
    )

    valid = parsed.filter(col("parsed").isNotNull()).select("key", "value")

    # Wrap failed records in a DLQ envelope. Cast the raw value to string
    # so it lands in the JSON as text rather than base64-encoded binary.
    invalid = parsed.filter(col("parsed").isNull()).select(
        col("key"),
        to_json(struct(
            col("value").cast("string"),
            lit("SCHEMA_VALIDATION_FAILED").alias("dlq_reason"),
            current_timestamp().alias("dlq_timestamp")
        )).alias("value")
    )

    # count() triggers a job per branch; acceptable at modest batch sizes.
    if valid.count() > 0:
        valid.write.format("kafka") \
            .option("kafka.bootstrap.servers", brokers) \
            .option("topic", "valid-events").save()
    if invalid.count() > 0:
        invalid.write.format("kafka") \
            .option("kafka.bootstrap.servers", brokers) \
            .option("topic", "dlq-events").save()

source_df.writeStream \
    .foreachBatch(validate_and_route) \
    .option("checkpointLocation", "/Volumes/catalog/checkpoints/validation") \
    .trigger(processingTime="30 seconds") \
    .start()
```

The `from_json` function returns null when the message doesn’t match the schema, which gives you a clean split between valid and invalid records. Stamp each DLQ record with the failure reason and timestamp so you can audit and replay later.
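Downstream replay tooling then only needs to unwrap the DLQ envelope. A minimal sketch — the field names match the struct built in the routing function, but the helper itself is hypothetical:

```python
import json

def parse_dlq_record(dlq_value):
    """Unwrap a DLQ envelope written by the routing function above.

    Sketch helper for replay/audit tooling — returns
    (original_payload, failure_reason, dlq_timestamp).
    """
    envelope = json.loads(dlq_value)
    return envelope["value"], envelope["dlq_reason"], envelope["dlq_timestamp"]
```

Pair it with a batch read of the `dlq-events` topic to re-submit records once the schema issue is fixed.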
SASL/SSL Authentication
“Configure Kafka SASL/SSL authentication in Python using Databricks secrets.”
```python
kafka_username = dbutils.secrets.get("kafka-scope", "username")
kafka_password = dbutils.secrets.get("kafka-scope", "password")

df.writeStream.format("kafka") \
    .option("kafka.bootstrap.servers", brokers) \
    .option("topic", target_topic) \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.sasl.jaas.config",
            f'org.apache.kafka.common.security.plain.PlainLoginModule '
            f'required username="{kafka_username}" password="{kafka_password}";') \
    .option("checkpointLocation", checkpoint_path) \
    .start()
```

Never hardcode credentials in notebooks. Databricks secrets integrate with your workspace’s secret scope — backed by Azure Key Vault or AWS Secrets Manager — so credentials stay out of version control and audit logs capture access.
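The JAAS string is easy to get subtly wrong (the trailing semicolon is required, and quoting matters). A small builder keeps it in one place; this is our own sketch, not part of any Kafka or Databricks API:

```python
def plain_jaas_config(username, password):
    """Build a SASL/PLAIN JAAS string for kafka.sasl.jaas.config.

    Sketch helper — note the required trailing semicolon.
    """
    return (
        'org.apache.kafka.common.security.plain.PlainLoginModule required '
        f'username="{username}" password="{password}";'
    )
```

Calling `plain_jaas_config(kafka_username, kafka_password)` in the `.option("kafka.sasl.jaas.config", ...)` line avoids repeating the format in every pipeline.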
Watch Out For
- No data appearing after starting the stream — the default `startingOffsets` is `latest`, which skips all existing messages. Use `earliest` when you need historical data, then switch to `latest` for subsequent restarts (checkpoints track position automatically).
- Consumer lag growing unboundedly — your processing rate is slower than your input rate. Scale the cluster, reduce `maxOffsetsPerTrigger` to process smaller batches faster, or increase the trigger interval if your SLA allows it.
- Duplicate messages downstream — Kafka provides at-least-once delivery by default. Use `kafka.acks=all` on the producer side and `txnVersion`/`txnAppId` on Delta writes for idempotent exactly-once semantics.
- RTM pipeline failing silently — RTM doesn’t support `foreachBatch`, Photon must be disabled (not enabled, despite what the trigger docs suggest), and autoscaling must be off. Check the cluster’s Spark config for `spark.databricks.streaming.realTimeMode.enabled=true`.
- Trigger interval set shorter than batch duration — if processing a batch takes 45 seconds but your trigger is 30 seconds, batches pile up and latency grows indefinitely. Use the SLA/3 rule: set your trigger to one-third of your business SLA to leave headroom for processing and recovery.
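The SLA/3 rule from the last bullet is simple enough to encode directly. This helper is a sketch of our own, not an established API:

```python
def trigger_interval_from_sla(sla_seconds):
    """Apply the SLA/3 rule: trigger at one-third of the business SLA.

    Sketch helper — returns a processingTime string for .trigger(),
    floored at 1 second.
    """
    interval = max(1, int(sla_seconds // 3))
    return f"{interval} seconds"

# A 90-second SLA yields trigger(processingTime="30 seconds"), matching
# the bronze ingestion example above.
```

Deriving the interval from the SLA, instead of hardcoding it, keeps the headroom rationale visible when someone tightens the SLA later.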