Kafka Streaming Patterns

Skill: databricks-spark-structured-streaming

Kafka is the backbone of most streaming architectures, but getting data from Kafka into Delta reliably — with proper schema parsing, exactly-once semantics, and cost-aware triggers — takes more than a readStream. These patterns cover the full spectrum: bronze-layer ingestion, Kafka-to-Kafka event enrichment, Real-Time Mode for sub-second latency, and schema validation with dead letter queues.

“Write a Python streaming pipeline that reads JSON events from a Kafka topic, parses the payload with a schema, preserves Kafka metadata columns, and writes to a Delta bronze table with a 30-second trigger.”

from pyspark.sql.functions import col, from_json, current_timestamp

df = (spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
    .option("subscribe", "order_events")
    .option("startingOffsets", "earliest")
    .option("maxOffsetsPerTrigger", "50000")
    .option("minPartitions", "6")
    .load()
)

df_parsed = df.select(
    col("key").cast("string"),
    # event_schema: a StructType (or DDL string) describing the JSON payload
    from_json(col("value").cast("string"), event_schema).alias("data"),
    col("topic"), col("partition"), col("offset"),
    col("timestamp").alias("kafka_timestamp"),
    current_timestamp().alias("ingestion_timestamp")
).select("key", "data.*", "topic", "partition", "offset",
         "kafka_timestamp", "ingestion_timestamp")

df_parsed.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/Volumes/catalog/checkpoints/bronze_orders") \
    .trigger(processingTime="30 seconds") \
    .start("/delta/bronze_orders")

Key decisions:

  • startingOffsets="earliest" reads existing data on first run. The default latest silently skips everything already in the topic — a common source of “where’s my data?” confusion.
  • maxOffsetsPerTrigger caps batch size so a large backlog doesn’t overwhelm memory on first run. Dial this between 10,000 and 100,000 based on message size.
  • minPartitions should match your Kafka partition count for optimal parallelism. Setting it lower underutilizes the cluster; higher creates unnecessary overhead.
  • Preserving Kafka metadata (topic, partition, offset, kafka_timestamp) in the bronze layer gives you lineage back to the source and simplifies debugging consumer lag.
  • ingestion_timestamp separates when the event happened from when you received it — essential for latency monitoring.
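The pipeline above assumes event_schema is already defined. from_json accepts either a StructType or a DDL-formatted string; a minimal sketch using the DDL form, with illustrative field names (order_id, customer_id, amount, event_time are assumptions, not from the source):

```python
# Hypothetical payload schema for from_json; field names are illustrative.
# from_json accepts a DDL-formatted string like this as well as a StructType.
event_schema = (
    "order_id STRING, "
    "customer_id STRING, "
    "amount DOUBLE, "
    "event_time TIMESTAMP"
)
```

The DDL string form keeps the schema readable in notebooks; the equivalent StructType built from pyspark.sql.types works identically if you prefer programmatic construction.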

“Write a Python streaming pipeline that reads events from one Kafka topic, enriches them with a Delta dimension table, and writes the result to another Kafka topic.”

from pyspark.sql.functions import col, from_json, to_json, struct

source_df = (spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092")
    .option("subscribe", "raw-clickstream")
    .option("startingOffsets", "latest")
    .load()
)

parsed_df = source_df.select(
    col("key").cast("string"),
    from_json(col("value").cast("string"), event_schema).alias("data"),
).select("key", "data.*")

user_dim = spark.table("users.dimension")

enriched_df = (parsed_df
    .join(user_dim, "user_id", "left")
    .withColumn("value", to_json(struct(
        "event_id", "user_id", "user_name",
        "user_segment", "event_type", "timestamp"
    )))
)

enriched_df.select("key", "value").writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker1:9092") \
    .option("topic", "enriched-clickstream") \
    .option("kafka.acks", "all") \
    .option("kafka.compression.type", "lz4") \
    .option("checkpointLocation", "/Volumes/catalog/checkpoints/enriched_clicks") \
    .trigger(processingTime="30 seconds") \
    .start()

The Kafka producer requires data in key and value string columns. The to_json(struct(...)) call serializes your enriched fields back into a JSON string. Use acks=all for durability and lz4 compression to reduce network overhead without adding significant CPU cost.
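The value column is just a JSON string. A stdlib sketch of the record shape that to_json(struct(...)) emits for the fields above (the field values here are invented for illustration; only the field names come from the pipeline):

```python
import json

# Illustrative enriched record matching the struct() fields in the pipeline;
# every value below is made up for demonstration.
enriched = {
    "event_id": "evt-123",
    "user_id": "u-42",
    "user_name": "Ada",
    "user_segment": "premium",
    "event_type": "click",
    "timestamp": "2024-01-15T10:30:00Z",
}

# to_json(struct(...)) produces one compact JSON string per row, like this:
value = json.dumps(enriched)
```

Downstream consumers of the enriched topic parse this string back with whatever schema matches the struct fields.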

“Configure a Python Kafka-to-Kafka pipeline using Real-Time Mode for sub-second end-to-end latency on DBR 16.4 LTS.”

# Cluster Spark config: spark.databricks.streaming.realTimeMode.enabled = true
query = (enriched_df
    .select(col("key"), col("value"))
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", brokers)
    .option("topic", "low-latency-events")
    .outputMode("update")
    .trigger(realTime="5 minutes")
    .option("checkpointLocation", checkpoint_path)
    .start()
)

RTM achieves as low as 5ms end-to-end latency by eliminating microbatch overhead entirely. The realTime="5 minutes" parameter in PySpark sets the checkpoint interval, not the processing cadence. RTM requirements are strict: dedicated single-user cluster, no autoscaling, Photon disabled, outputMode("update") only, and no foreachBatch. Use RTM only when your SLA genuinely demands sub-800ms latency — microbatch is more cost-effective for everything else.

“Write a Python foreachBatch function that validates incoming Kafka messages against a schema and routes invalid records to a DLQ topic.”

from pyspark.sql.functions import from_json, col, lit, to_json, struct, current_timestamp

def validate_and_route(batch_df, batch_id):
    parsed = batch_df.withColumn(
        "parsed", from_json(col("value").cast("string"), validated_schema)
    ).persist()  # cache once: both filters and both count() calls reuse this batch
    valid = parsed.filter(col("parsed").isNotNull()).select("key", "value")
    invalid = parsed.filter(col("parsed").isNull()).select(
        col("key"),
        to_json(struct(
            col("value").cast("string"),  # cast so the DLQ payload carries readable text, not base64
            lit("SCHEMA_VALIDATION_FAILED").alias("dlq_reason"),
            current_timestamp().alias("dlq_timestamp")
        )).alias("value")
    )
    if valid.count() > 0:
        valid.write.format("kafka") \
            .option("kafka.bootstrap.servers", brokers) \
            .option("topic", "valid-events").save()
    if invalid.count() > 0:
        invalid.write.format("kafka") \
            .option("kafka.bootstrap.servers", brokers) \
            .option("topic", "dlq-events").save()
    parsed.unpersist()

source_df.writeStream \
    .foreachBatch(validate_and_route) \
    .option("checkpointLocation", "/Volumes/catalog/checkpoints/validation") \
    .trigger(processingTime="30 seconds") \
    .start()

The from_json function returns null when the message doesn’t match the schema, which gives you a clean split between valid and invalid records. Stamp each DLQ record with the failure reason and timestamp so you can audit and replay later.
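The routing logic itself can be sketched outside Spark with stdlib json, a try/except per message standing in for from_json returning null. The topic names and dlq_reason string follow the pipeline above; everything else here is illustrative:

```python
import json
from datetime import datetime, timezone

def route_message(raw_value: str):
    """Return (topic, payload): valid messages pass through untouched,
    unparseable ones get stamped with a reason and timestamp for the DLQ."""
    try:
        json.loads(raw_value)  # stands in for from_json: parse failure -> DLQ
        return ("valid-events", raw_value)
    except json.JSONDecodeError:
        stamped = json.dumps({
            "value": raw_value,
            "dlq_reason": "SCHEMA_VALIDATION_FAILED",
            "dlq_timestamp": datetime.now(timezone.utc).isoformat(),
        })
        return ("dlq-events", stamped)
```

Note this sketch only checks that the payload is valid JSON; the Spark version is stricter, since from_json also nulls out messages that parse but don't match the declared schema.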

“Configure Kafka SASL/SSL authentication in Python using Databricks secrets.”

kafka_username = dbutils.secrets.get("kafka-scope", "username")
kafka_password = dbutils.secrets.get("kafka-scope", "password")

df.writeStream.format("kafka") \
    .option("kafka.bootstrap.servers", brokers) \
    .option("topic", target_topic) \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.sasl.jaas.config",
            f'org.apache.kafka.common.security.plain.PlainLoginModule '
            f'required username="{kafka_username}" password="{kafka_password}";') \
    .option("checkpointLocation", checkpoint_path) \
    .start()

Never hardcode credentials in notebooks. Databricks secrets integrate with your workspace’s secret scope — backed by Azure Key Vault or AWS Secrets Manager — so credentials stay out of version control and audit logs capture access.

Common troubleshooting issues:

  • No data appearing after starting the stream — the default startingOffsets is latest, which skips all existing messages. Use earliest when you need historical data; the option only applies on the first run, since checkpoints track position on every restart afterward.
  • Consumer lag growing unboundedly — your processing rate is slower than your input rate. Scale the cluster, reduce maxOffsetsPerTrigger to process smaller batches faster, or increase the trigger interval if your SLA allows it.
  • Duplicate messages downstream — Kafka provides at-least-once delivery by default. Use kafka.acks=all on the producer side and txnVersion/txnAppId on Delta writes for idempotent exactly-once semantics.
  • RTM pipeline failing silently — RTM doesn’t support foreachBatch, Photon must be disabled (not enabled, despite what the trigger docs suggest), and autoscaling must be off. Check the cluster’s Spark config for spark.databricks.streaming.realTimeMode.enabled=true.
  • Trigger interval set shorter than batch duration — if processing a batch takes 45 seconds but your trigger is 30 seconds, batches pile up and latency grows indefinitely. Use the SLA/3 rule: set your trigger to one-third of your business SLA to leave headroom for processing and recovery.
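The SLA/3 rule from the last bullet is simple arithmetic; a small helper makes the relationship explicit (the function name and seconds-based interface are my own, not from the source):

```python
def trigger_interval_seconds(sla_seconds: float) -> float:
    """Set the trigger to one-third of the business SLA, leaving
    two-thirds as headroom for processing and recovery."""
    return sla_seconds / 3

# A 90-second freshness SLA yields a 30-second processingTime trigger,
# matching the trigger(processingTime="30 seconds") used in the pipelines above.
```

If a batch regularly takes longer than this interval, scale the cluster or shrink the batch rather than stretching the trigger past SLA/3.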