
Python Client

Skill: databricks-zerobus-ingest

A Python application that pushes records directly into Delta tables over gRPC — no message bus in between. The Zerobus Python SDK gives you sync and async APIs, JSON and Protobuf serialization, and batch methods that can sustain 15,000 rows/s per stream. You’ll use this when your IoT gateway, web backend, or ETL script needs to write to the lakehouse in near real-time.

“Send IoT sensor readings to Databricks from a Python application using Zerobus. Use Protobuf serialization for production.”

import os

from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties

import record_pb2

sdk = ZerobusSdk(
    os.environ["ZEROBUS_SERVER_ENDPOINT"],
    os.environ["DATABRICKS_WORKSPACE_URL"],
)

options = StreamConfigurationOptions(record_type=RecordType.PROTO)

table_props = TableProperties(
    os.environ["ZEROBUS_TABLE_NAME"],
    record_pb2.AirQuality.DESCRIPTOR,
)

stream = sdk.create_stream(
    os.environ["DATABRICKS_CLIENT_ID"],
    os.environ["DATABRICKS_CLIENT_SECRET"],
    table_props,
    options,
)

try:
    for i in range(100):
        record = record_pb2.AirQuality(
            device_name=f"sensor-{i}", temp=22, humidity=55
        )
        offset = stream.ingest_record_offset(record)
        stream.wait_for_offset(offset)
finally:
    stream.close()

Key decisions:

  • Protobuf gives you compile-time type safety and smaller payloads — use it for anything beyond a quick prototype
  • ingest_record_offset + wait_for_offset blocks until the record is durably written, which is the safest starting point
  • The DESCRIPTOR from your compiled record_pb2 module tells Zerobus how to deserialize the wire format
  • Always wrap ingestion in try/finally to ensure the stream closes and releases its gRPC connection
  • Environment variables keep credentials out of code — your AI coding assistant reads these automatically
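Since every credential above comes from the environment, a small helper (an illustration, not part of the SDK) can validate all of them up front and report every missing variable at once, instead of failing with a bare KeyError partway through setup:

```python
import os

# The variable names used in the example above.
REQUIRED_VARS = [
    "ZEROBUS_SERVER_ENDPOINT",
    "DATABRICKS_WORKSPACE_URL",
    "ZEROBUS_TABLE_NAME",
    "DATABRICKS_CLIENT_ID",
    "DATABRICKS_CLIENT_SECRET",
]

def load_config() -> dict[str, str]:
    """Read all required settings, listing every missing variable in one error."""
    missing = [name for name in REQUIRED_VARS if name not in os.environ]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: os.environ[name] for name in REQUIRED_VARS}
```

Call `load_config()` once at startup and pass the resulting dict around, so misconfiguration surfaces before any stream is opened.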

High-throughput fire-and-forget with ACK callbacks


“Build a Python producer that pushes thousands of records per second using Zerobus, with background acknowledgment tracking.”

from zerobus.sdk.shared import AckCallback, StreamConfigurationOptions, RecordType

class MyAckHandler(AckCallback):
    def on_ack(self, offset: int) -> None:
        print(f"Durable up to offset: {offset}")

    def on_error(self, offset: int, message: str) -> None:
        print(f"Error at offset {offset}: {message}")

options = StreamConfigurationOptions(
    record_type=RecordType.JSON,
    ack_callback=MyAckHandler(),
)

stream = sdk.create_stream(client_id, client_secret, table_props, options)

try:
    for i in range(10_000):
        stream.ingest_record_nowait(
            {"device_name": f"sensor-{i}", "temp": 22, "humidity": 55}
        )
    stream.flush()
finally:
    stream.close()

The AckCallback pattern decouples ingestion from durability confirmation. ingest_record_nowait enqueues records without blocking, while the callback fires asynchronously as batches are persisted. Call flush() at the end to ensure everything in the buffer gets sent.

“Load 10,000 sensor records into Zerobus in a single batch from Python.”

records = [
    {"device_name": f"sensor-{i}", "temp": 22, "humidity": 55}
    for i in range(10_000)
]

offset = stream.ingest_records_offset(records)
stream.wait_for_offset(offset)

The batch methods (ingest_records_offset, ingest_records_nowait) accept a list and submit everything in one call. ingest_records_offset returns the offset of the last record, so a single wait_for_offset confirms the entire batch.
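If your dataset is larger than you want to hand over in one call, a generic chunking helper (plain Python, not an SDK feature) splits the list into fixed-size batches before each `ingest_records_offset` call:

```python
from typing import Iterator

def chunked(records: list[dict], batch_size: int = 10_000) -> Iterator[list[dict]]:
    """Yield successive slices of at most batch_size records."""
    for start in range(0, len(records), batch_size):
        yield records[start:start + batch_size]

# Hypothetical usage with an already-open stream:
# for batch in chunked(records, batch_size=5_000):
#     offset = stream.ingest_records_offset(batch)
#     stream.wait_for_offset(offset)
```

Waiting once per batch rather than once per record keeps the durability guarantee while amortizing the blocking cost.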

“Write an async Python Zerobus producer that I can call from a FastAPI endpoint.”

import asyncio

from zerobus.sdk.aio import ZerobusSdk as AsyncZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties

async def ingest_async(records: list[dict]) -> None:
    sdk = AsyncZerobusSdk(server_endpoint, workspace_url)
    options = StreamConfigurationOptions(record_type=RecordType.JSON)
    table_props = TableProperties(table_name)

    stream = await sdk.create_stream(
        client_id, client_secret, table_props, options
    )
    try:
        for record in records:
            offset = await stream.ingest_record_offset(record)
            await stream.wait_for_offset(offset)
    finally:
        await stream.close()

The async SDK (zerobus.sdk.aio) has the same API surface as the sync version. Use it when your application already runs on an async event loop — FastAPI, aiohttp, or any asyncio-based service.

“Build a reusable Zerobus client class in Python with automatic retry and reconnection.”

import time
import logging
from typing import Optional

from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import (
    RecordType, AckCallback, StreamConfigurationOptions, TableProperties,
)

logger = logging.getLogger(__name__)

class ZerobusClient:
    def __init__(self, server_endpoint, workspace_url, table_name,
                 client_id, client_secret, record_type=RecordType.JSON,
                 ack_callback=None, proto_descriptor=None):
        self.sdk = ZerobusSdk(server_endpoint, workspace_url)
        self.table_name = table_name
        self.client_id = client_id
        self.client_secret = client_secret
        self.record_type = record_type
        self.ack_callback = ack_callback
        self.proto_descriptor = proto_descriptor
        self.stream = None

    def init_stream(self):
        options = StreamConfigurationOptions(
            record_type=self.record_type, ack_callback=self.ack_callback
        )
        if self.record_type == RecordType.PROTO and self.proto_descriptor:
            table_props = TableProperties(self.table_name, self.proto_descriptor)
        else:
            table_props = TableProperties(self.table_name)
        self.stream = self.sdk.create_stream(
            self.client_id, self.client_secret, table_props, options
        )

    def ingest(self, payload, max_retries=3):
        for attempt in range(max_retries):
            try:
                if self.stream is None:
                    self.init_stream()
                offset = self.stream.ingest_record_offset(payload)
                self.stream.wait_for_offset(offset)
                return True
            except Exception as e:
                logger.warning("Attempt %d/%d failed: %s", attempt + 1, max_retries, e)
                if "closed" in str(e).lower() or "connection" in str(e).lower():
                    self.close()
                    self.init_stream()
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)
        return False

    def close(self):
        if self.stream:
            self.stream.close()
        self.stream = None

    def __enter__(self):
        self.init_stream()
        return self

    def __exit__(self, *args):
        if self.stream:
            self.stream.flush()
        self.close()

This class handles the two failure modes you’ll hit in production: transient connection drops (retry with backoff) and stream closures during server maintenance (reinitialize the stream). The context manager ensures flush() and close() happen even if your code raises.
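The backoff schedule inside ingest() is plain exponential doubling. Pulled out as a standalone function (a sketch with no SDK dependency, using an injectable sleep so it can be tested without real delays), the same pattern looks like this:

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

def with_backoff(fn: Callable[[], T], max_retries: int = 3, base_delay: float = 1.0,
                 sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fn, retrying on any exception with delays of base_delay * 2**attempt.

    Re-raises the last exception once max_retries attempts are exhausted.
    """
    for attempt in range(max_retries):
        try:
            return fn()
        except Exception:
            if attempt == max_retries - 1:
                raise
            sleep(base_delay * (2 ** attempt))
```

Raising on exhaustion (rather than returning False as the class does) is a design choice: it forces the caller to decide what a permanently failed record means, instead of silently dropping it.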

  • Timestamps must be epoch microseconds, not strings — Zerobus expects int64 for TIMESTAMP columns. Pass int(time.time() * 1_000_000), not datetime.isoformat(). This is the single most common ingestion failure.
  • wait_for_offset on every record kills throughput — blocking per record caps you well below the 15,000 rows/s limit. For high-volume producers, switch to ingest_record_nowait with an AckCallback or periodic flush().
  • Stream objects are not thread-safe — create one stream per thread, or serialize access. Sharing a stream across threads causes intermittent gRPC errors that are difficult to diagnose.
  • JSON serialization requires json.dumps for the raw SDK — the ZerobusClient wrapper accepts dicts, but if you call stream.ingest_record() directly, you must pass the JSON-encoded string, not a Python dict.
  • Forgetting flush() before close() — records buffered in the SDK are lost if you close without flushing. The reusable client class handles this, but raw stream usage requires explicit flush() calls.
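The first and fourth pitfalls above are both one-liners to get right. A small conversion helper (plain Python, no SDK dependency) produces the int64 epoch-microsecond value Zerobus expects, and `json.dumps` handles the raw-SDK JSON encoding; the field names below are illustrative:

```python
import json
from datetime import datetime, timezone

def to_epoch_micros(dt: datetime) -> int:
    """Convert a timezone-aware datetime to int64 epoch microseconds."""
    return int(dt.timestamp() * 1_000_000)

# For the raw SDK's JSON mode, pass the encoded string, not the dict:
payload = json.dumps({
    "device_name": "sensor-1",
    "recorded_at": to_epoch_micros(datetime(2024, 1, 1, tzinfo=timezone.utc)),
})
```

Note the timezone-aware datetime: calling .timestamp() on a naive datetime silently uses the local timezone, which shifts every ingested timestamp by your UTC offset.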