
Operations & Limits

Skill: databricks-zerobus-ingest

A production Zerobus producer that handles connection drops, stays within throughput limits, and deals with the reality of at-least-once delivery. This page is less about writing ingestion code and more about the operational decisions that keep it running reliably — ACK strategy selection, retry with reconnection, scaling beyond a single stream, and building observability when Zerobus has no built-in dashboards.

“Build a Python Zerobus producer with retry logic and exponential backoff that handles connection drops during server maintenance.”

import time
import logging

logger = logging.getLogger(__name__)

def ingest_with_retry(stream_factory, record, max_retries=5):
    """Ingest a record, reinitializing the stream on connection failures."""
    stream = stream_factory()
    for attempt in range(max_retries):
        try:
            offset = stream.ingest_record_offset(record)
            stream.wait_for_offset(offset)
            return stream
        except Exception as e:
            err = str(e).lower()
            logger.warning("Attempt %d/%d failed: %s", attempt + 1, max_retries, e)
            if attempt == max_retries - 1:
                raise
            if "closed" in err or "connection" in err or "unavailable" in err:
                # Connection-level failure: discard the dead stream and reconnect.
                try:
                    stream.close()
                except Exception:
                    pass
                time.sleep(min(2 ** attempt, 30))
                stream = stream_factory()
            else:
                # Data-level failure: back off and retry on the same stream.
                time.sleep(2 ** attempt)

Key decisions:

  • Always reinitialize the stream on connection errors — retrying on a closed stream never works
  • Cap exponential backoff at 30 seconds to avoid long pauses during extended outages
  • The stream_factory callable creates a fresh stream on each reconnect, with its own authentication handshake
  • Log enough context (endpoint, table, error message) to diagnose failures without reproducing them
  • Distinguish connection errors (reinitialize) from data errors (retry or raise) — different failure modes need different responses
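The `stream_factory` callable can be a simple closure over the SDK handle. A minimal sketch, assuming the same `sdk.create_stream(client_id, client_secret, table_props, options)` call used elsewhere on this page (the helper name `make_stream_factory` is illustrative, not part of the SDK):

```python
import functools

def make_stream_factory(sdk, client_id, client_secret, table_props, options):
    """Return a zero-argument callable that opens a fresh stream.

    Each call performs a new authentication handshake, which is exactly
    what ingest_with_retry needs after a connection-level failure.
    """
    return functools.partial(
        sdk.create_stream, client_id, client_secret, table_props, options
    )

# stream_factory = make_stream_factory(sdk, client_id, client_secret, table_props, options)
# stream = ingest_with_retry(stream_factory, record)
```

Using `functools.partial` keeps the credentials in one place instead of threading five arguments through every retry call site.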

“Help me decide between blocking ACKs, callbacks, and flush-based ACKs for my Zerobus producer.”

The ACK strategy determines your throughput ceiling and error-handling complexity.

Blocking (offset + wait): Call ingest_record_offset, then wait_for_offset. You know exactly when each record is durable, but throughput tops out at a few hundred records/s because you’re round-tripping per record.

offset = stream.ingest_record_offset(record)
stream.wait_for_offset(offset)

Callback-based: Register an AckCallback and use ingest_record_nowait. Records flow at full speed while the callback fires asynchronously as batches are confirmed. Higher throughput, but you need to track which offsets have been acknowledged.

from zerobus.sdk.shared import AckCallback
class OffsetTracker(AckCallback):
    def __init__(self):
        self.last_acked = 0

    def on_ack(self, offset: int) -> None:
        self.last_acked = offset

    def on_error(self, offset: int, message: str) -> None:
        logger.error("ACK error at offset %d: %s", offset, message)

Flush-based: Use ingest_record_nowait for a batch of records, then call flush() once. This gives you the highest throughput for batch-oriented workloads where you don’t need per-record confirmation.

for record in batch:
    stream.ingest_record_nowait(record)
stream.flush()

Start with blocking ACKs for correctness, move to callback or flush-based when you need more throughput.
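For long-running producers, a common middle ground is flushing every N records instead of once per batch, bounding how much unconfirmed data is in flight. A sketch, assuming only the `ingest_record_nowait`/`flush` calls shown above (the helper name and `flush_every` parameter are illustrative):

```python
def ingest_batched(stream, records, flush_every=1000):
    """Send records without per-record ACKs, flushing every flush_every records."""
    sent = 0
    for record in records:
        stream.ingest_record_nowait(record)
        sent += 1
        if sent % flush_every == 0:
            stream.flush()
    # Final flush covers the partial tail batch.
    stream.flush()
    return sent
```

Tune `flush_every` against your latency tolerance: a larger value means fewer round trips but more records to resend after a failure.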

“My Zerobus producer is hitting the 15,000 rows/s per-stream limit. How do I scale?”

import threading

def stream_worker(sdk, records, client_id, client_secret, table_props, options):
    stream = sdk.create_stream(client_id, client_secret, table_props, options)
    try:
        for record in records:
            stream.ingest_record_nowait(record)
        stream.flush()
    finally:
        stream.close()

# Partition records across multiple streams
partitions = partition_by_key(all_records, num_streams=4)
threads = []
for partition in partitions:
    t = threading.Thread(
        target=stream_worker,
        args=(sdk, partition, client_id, client_secret, table_props, options),
    )
    t.start()
    threads.append(t)
for t in threads:
    t.join()

Each stream gets its own 100 MB/s and 15,000 rows/s budget. Zerobus supports thousands of concurrent streams to the same table, so horizontal scaling is straightforward. Partition by a key (device ID, region, tenant) to distribute load evenly.
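The `partition_by_key` helper above is left to you; a minimal hash-based sketch (the `key_fn` parameter and the `device_id` field are assumptions for illustration):

```python
import zlib

def partition_by_key(records, num_streams, key_fn=lambda r: r["device_id"]):
    """Assign each record to a stream bucket by hashing its partition key.

    crc32 is stable across processes and runs (unlike Python's built-in
    hash of strings), so the same key always maps to the same stream and
    per-key ordering is preserved.
    """
    partitions = [[] for _ in range(num_streams)]
    for record in records:
        key = str(key_fn(record)).encode("utf-8")
        partitions[zlib.crc32(key) % num_streams].append(record)
    return partitions
```

Hash partitioning spreads load roughly evenly as long as no single key dominates the traffic; a hot key pins its whole volume to one stream.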

Build a health check without built-in dashboards


“Write a health check for my Zerobus connection — there’s no built-in monitoring dashboard.”

def check_zerobus_health(sdk, client_id, client_secret, table_props, options):
    """Verify Zerobus connectivity by opening and closing a stream."""
    try:
        stream = sdk.create_stream(client_id, client_secret, table_props, options)
        stream.close()
        return True
    except Exception as e:
        logger.error("Zerobus health check failed: %s", e)
        return False

Zerobus has no metrics dashboards or status endpoints. You monitor it from the producer side: track ACK offsets and retry counts in your application logs, run periodic health checks like the one above, and query the target table’s row count to confirm data is arriving. If the health check fails, your retry logic handles reconnection automatically.
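Producer-side lag falls out of the `OffsetTracker` pattern shown earlier: compare the last offset you sent with the last offset acknowledged. A minimal sketch (the class and attribute names are assumptions, not SDK API):

```python
import time

class ProducerMetrics:
    """Minimal producer-side metrics: offsets, retries, last healthy check."""

    def __init__(self):
        self.last_sent_offset = 0
        self.last_acked_offset = 0
        self.retry_count = 0
        self.last_healthy_at = 0.0

    def ack_lag(self):
        """Unacknowledged records in flight; a steadily growing lag
        means ACKs have stalled even though sends still succeed."""
        return self.last_sent_offset - self.last_acked_offset

    def record_health(self, healthy):
        if healthy:
            self.last_healthy_at = time.time()
```

Export these counters to whatever metrics system you already run (logs, StatsD, Prometheus); alerting on `ack_lag` catches stalls that a connect-only health check misses.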

Handle at-least-once delivery with deduplication


“My downstream consumers are seeing duplicate records from Zerobus. How do I handle this?”

Zerobus provides at-least-once delivery. A retry after a timeout can result in the same record being persisted twice — the original write succeeded, but the ACK was lost. Design your consumers to handle this.

-- Deduplicate on read using a unique key. Filtering to rn = 1 inside the
-- subquery keeps duplicate event_ids out of the MERGE source, and
-- EXCEPT (rn) keeps INSERT * aligned with the target schema.
MERGE INTO catalog.schema.clean_events AS target
USING (
  SELECT * EXCEPT (rn) FROM (
    SELECT *, ROW_NUMBER() OVER (
      PARTITION BY event_id ORDER BY event_time DESC
    ) AS rn
    FROM catalog.schema.raw_events
  ) WHERE rn = 1
) AS source
ON target.event_id = source.event_id
WHEN NOT MATCHED THEN INSERT *;

The deduplication happens downstream, not in Zerobus itself. Include a unique identifier (UUID, composite key) in every record so you can detect and discard duplicates in your consumers or downstream pipelines.
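Generate the identifier at the producer, once per event and before the first send attempt, so that retries carry the same ID. A sketch (the field names `event_id` and `event_time` match the MERGE example above; the helper name is illustrative):

```python
import time
import uuid

def make_event(payload):
    """Stamp the event exactly once, before any send attempt.

    A retried send of the same dict keeps its event_id, so the
    downstream MERGE can collapse the duplicates. Generating a new
    UUID per attempt would defeat deduplication entirely.
    """
    return {
        "event_id": str(uuid.uuid4()),
        "event_time": time.time(),
        **payload,
    }
```

Build the event, then pass the same object through `ingest_with_retry`; never regenerate the ID inside the retry loop.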

  • 15,000 rows/s and 100 MB/s per stream are hard caps — you won’t get a clear error when you exceed them. Throughput silently degrades as the server throttles. If your producer seems slow, check whether you’ve hit the per-stream ceiling and split into multiple streams.
  • 10 MB max message size — a single record larger than 10 MB (10,485,760 bytes) is rejected. This usually happens with wide schemas containing large binary or nested struct columns. Flatten or split the data before ingestion.
  • Single-AZ deployment means zone outages affect you — Zerobus runs in one availability zone. A zone failure takes down the service until the zone recovers. Your retry logic must handle outages lasting minutes, not just seconds. Cap backoff at 30s and keep retrying.
  • Server closes streams during maintenance — you’ll see a “stream closed” error with no warning. This is expected behavior. Always implement stream reinitialization in your retry loop, not just record-level retries.
  • At-least-once delivery means duplicates are inevitable — any retry that succeeds after a timeout can produce duplicates. Build deduplication into your downstream consumers from day one, not after you discover the duplicates in production.
  • No automatic schema evolution — adding a column to the table without regenerating the .proto causes a silent mismatch. The new column gets null values. Removing a column from the table while the old .proto still references it causes ingestion failures.
  • Managed Delta tables only — external tables, views, and non-Delta formats are not supported. The table must be managed within Unity Catalog in a supported region.
  • Workspace must be in a supported region — AWS supports 9 regions, Azure supports 11. Check the Setup & Authentication page for the full list. Attempting to connect from an unsupported region gives a generic connection error.
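The 10 MB cap above can be enforced client-side, where the error message is under your control. A sketch that checks the serialized bytes before sending (the constant restates the limit quoted above; the function name is illustrative):

```python
MAX_MESSAGE_BYTES = 10 * 1024 * 1024  # 10,485,760 — the per-record cap

def check_record_size(serialized: bytes) -> None:
    """Reject oversized records before the server does, with a clearer error."""
    if len(serialized) > MAX_MESSAGE_BYTES:
        raise ValueError(
            f"Record is {len(serialized)} bytes; limit is {MAX_MESSAGE_BYTES}. "
            "Flatten or split large binary/struct columns before ingestion."
        )

# With a protobuf record:
# check_record_size(record.SerializeToString())
```

Checking size locally turns a rejected ingest into a debuggable exception that names the offending record, rather than a server-side failure discovered in the retry loop.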