Python Client
Skill: databricks-zerobus-ingest
What You Can Build
A Python application that pushes records directly into Delta tables over gRPC — no message bus in between. The Zerobus Python SDK gives you sync and async APIs, JSON and Protobuf serialization, and batch methods that can sustain 15,000 rows/s per stream. You’ll use this when your IoT gateway, web backend, or ETL script needs to write to the lakehouse in near real-time.
In Action
“Send IoT sensor readings to Databricks from a Python application using Zerobus. Use Protobuf serialization for production.”
```python
import os

from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties

import record_pb2

sdk = ZerobusSdk(
    os.environ["ZEROBUS_SERVER_ENDPOINT"],
    os.environ["DATABRICKS_WORKSPACE_URL"],
)
options = StreamConfigurationOptions(record_type=RecordType.PROTO)
table_props = TableProperties(
    os.environ["ZEROBUS_TABLE_NAME"],
    record_pb2.AirQuality.DESCRIPTOR,
)

stream = sdk.create_stream(
    os.environ["DATABRICKS_CLIENT_ID"],
    os.environ["DATABRICKS_CLIENT_SECRET"],
    table_props,
    options,
)

try:
    for i in range(100):
        record = record_pb2.AirQuality(
            device_name=f"sensor-{i}", temp=22, humidity=55
        )
        offset = stream.ingest_record_offset(record)
        stream.wait_for_offset(offset)
finally:
    stream.close()
```

Key decisions:
- Protobuf gives you compile-time type safety and smaller payloads — use it for anything beyond a quick prototype
- `ingest_record_offset` + `wait_for_offset` blocks until the record is durably written, which is the safest starting point
- The `DESCRIPTOR` from your compiled `record_pb2` module tells Zerobus how to deserialize the wire format
- Always wrap ingestion in `try`/`finally` to ensure the stream closes and releases its gRPC connection
- Environment variables keep credentials out of code — your AI coding assistant reads these automatically
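The example imports a compiled `record_pb2` module. A minimal `record.proto` that would generate a matching `AirQuality` message might look like the following sketch; the field names are taken from the example above, but the types are inferred and must match your Delta table's columns:

```proto
syntax = "proto3";

// Field numbers are stable identifiers for the wire format; don't reuse them.
message AirQuality {
  string device_name = 1;
  int32 temp = 2;
  int32 humidity = 3;
}
```

Compiling it with `protoc --python_out=. record.proto` produces the `record_pb2.py` module the example imports.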
More Patterns
High-throughput fire-and-forget with ACK callbacks
“Build a Python producer that pushes thousands of records per second using Zerobus, with background acknowledgment tracking.”
```python
from zerobus.sdk.shared import AckCallback, StreamConfigurationOptions, RecordType

class MyAckHandler(AckCallback):
    def on_ack(self, offset: int) -> None:
        print(f"Durable up to offset: {offset}")

    def on_error(self, offset: int, message: str) -> None:
        print(f"Error at offset {offset}: {message}")

options = StreamConfigurationOptions(
    record_type=RecordType.JSON,
    ack_callback=MyAckHandler(),
)
stream = sdk.create_stream(client_id, client_secret, table_props, options)

try:
    for i in range(10_000):
        stream.ingest_record_nowait(
            {"device_name": f"sensor-{i}", "temp": 22, "humidity": 55}
        )
    stream.flush()
finally:
    stream.close()
```

The `AckCallback` pattern decouples ingestion from durability confirmation. `ingest_record_nowait` enqueues records without blocking, while the callback fires asynchronously as batches are persisted. Call `flush()` at the end to ensure everything in the buffer gets sent.
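Because the callback fires asynchronously, a handler that aggregates state should guard it against concurrent invocation; the assumption here is that acks may arrive from a background thread and possibly out of order. A sketch of a lock-protected handler (the `on_ack`/`on_error` method shapes match the `AckCallback` interface above, but `CountingAckHandler` itself is illustrative):

```python
import threading

class CountingAckHandler:
    # Same hook shape as AckCallback; tracks the highest durable offset.
    def __init__(self):
        self._lock = threading.Lock()
        self.highest_acked = -1
        self.errors = 0

    def on_ack(self, offset: int) -> None:
        with self._lock:
            if offset > self.highest_acked:
                self.highest_acked = offset

    def on_error(self, offset: int, message: str) -> None:
        with self._lock:
            self.errors += 1

handler = CountingAckHandler()
for offset in (10, 5, 20):  # acks may be delivered out of order
    handler.on_ack(offset)
print(handler.highest_acked)  # 20
```

Reading `highest_acked` after `flush()` tells you how far the stream is confirmed durable without blocking the ingest loop.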
Batch ingestion for bulk loads
“Load 10,000 sensor records into Zerobus in a single batch from Python.”
```python
records = [
    {"device_name": f"sensor-{i}", "temp": 22, "humidity": 55}
    for i in range(10_000)
]

offset = stream.ingest_records_offset(records)
stream.wait_for_offset(offset)
```

The batch methods (`ingest_records_offset`, `ingest_records_nowait`) accept a list and submit everything in one call. `ingest_records_offset` returns the offset of the last record, so a single `wait_for_offset` confirms the entire batch.
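For datasets too large to hold in one call, a small chunking helper keeps each batch bounded; each chunk would then go through one `ingest_records_offset` plus one `wait_for_offset`. The helper is a plain-Python sketch, not part of the SDK:

```python
def chunked(records, batch_size):
    # Yield successive fixed-size slices of the record list.
    for start in range(0, len(records), batch_size):
        yield records[start:start + batch_size]

records = [
    {"device_name": f"sensor-{i}", "temp": 22, "humidity": 55}
    for i in range(25_000)
]
batch_sizes = [len(batch) for batch in chunked(records, 10_000)]
print(batch_sizes)  # [10000, 10000, 5000]
```

Waiting once per chunk rather than once per record preserves most of the batch throughput while bounding how much data is in flight.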
Async producer for web frameworks
“Write an async Python Zerobus producer that I can call from a FastAPI endpoint.”
```python
import asyncio

from zerobus.sdk.aio import ZerobusSdk as AsyncZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties

async def ingest_async(records: list[dict]) -> None:
    sdk = AsyncZerobusSdk(server_endpoint, workspace_url)
    options = StreamConfigurationOptions(record_type=RecordType.JSON)
    table_props = TableProperties(table_name)

    stream = await sdk.create_stream(
        client_id, client_secret, table_props, options
    )
    try:
        for record in records:
            offset = await stream.ingest_record_offset(record)
            await stream.wait_for_offset(offset)
    finally:
        await stream.close()
```

The async SDK (`zerobus.sdk.aio`) has the same API surface as the sync version. Use it when your application already runs on an async event loop — FastAPI, aiohttp, or any asyncio-based service.
Reusable client class with retry logic
“Build a reusable Zerobus client class in Python with automatic retry and reconnection.”
```python
import time
import logging
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import (
    RecordType,
    AckCallback,
    StreamConfigurationOptions,
    TableProperties,
)

logger = logging.getLogger(__name__)

class ZerobusClient:
    def __init__(self, server_endpoint, workspace_url, table_name,
                 client_id, client_secret, record_type=RecordType.JSON,
                 ack_callback=None, proto_descriptor=None):
        self.sdk = ZerobusSdk(server_endpoint, workspace_url)
        self.table_name = table_name
        self.client_id = client_id
        self.client_secret = client_secret
        self.record_type = record_type
        self.ack_callback = ack_callback
        self.proto_descriptor = proto_descriptor
        self.stream = None

    def init_stream(self):
        options = StreamConfigurationOptions(
            record_type=self.record_type, ack_callback=self.ack_callback
        )
        if self.record_type == RecordType.PROTO and self.proto_descriptor:
            table_props = TableProperties(self.table_name, self.proto_descriptor)
        else:
            table_props = TableProperties(self.table_name)
        self.stream = self.sdk.create_stream(
            self.client_id, self.client_secret, table_props, options
        )

    def ingest(self, payload, max_retries=3):
        for attempt in range(max_retries):
            try:
                if self.stream is None:
                    self.init_stream()
                offset = self.stream.ingest_record_offset(payload)
                self.stream.wait_for_offset(offset)
                return True
            except Exception as e:
                logger.warning("Attempt %d/%d failed: %s",
                               attempt + 1, max_retries, e)
                if "closed" in str(e).lower() or "connection" in str(e).lower():
                    self.close()
                    self.init_stream()
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)
        return False

    def close(self):
        if self.stream:
            self.stream.close()
            self.stream = None

    def __enter__(self):
        self.init_stream()
        return self

    def __exit__(self, *args):
        if self.stream:
            self.stream.flush()
        self.close()
```

This class handles the two failure modes you’ll hit in production: transient connection drops (retry with backoff) and stream closures during server maintenance (reinitialize the stream). The context manager ensures `flush()` and `close()` happen even if your code raises.
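The retry delay in `ingest` grows as `2 ** attempt` seconds, and the sleep runs only when another attempt remains, so the final failure returns `False` immediately. For the default `max_retries=3` the schedule works out to:

```python
max_retries = 3
# One sleep after each failed attempt except the last.
delays = [2 ** attempt for attempt in range(max_retries - 1)]
print(delays)  # [1, 2] -> 1s after the first failure, 2s after the second
```

Raising `max_retries` extends the schedule geometrically (4s, 8s, ...), which is usually enough to ride out brief server-side maintenance windows.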
Watch Out For
- Timestamps must be epoch microseconds, not strings — Zerobus expects `int64` for `TIMESTAMP` columns. Pass `int(time.time() * 1_000_000)`, not `datetime.isoformat()`. This is the single most common ingestion failure.
- `wait_for_offset` on every record kills throughput — blocking per record caps you well below the 15,000 rows/s limit. For high-volume producers, switch to `ingest_record_nowait` with an `AckCallback` or periodic `flush()`.
- Stream objects are not thread-safe — create one stream per thread, or serialize access. Sharing a stream across threads causes intermittent gRPC errors that are difficult to diagnose.
- JSON serialization requires `json.dumps` for the raw SDK — the `ZerobusClient` wrapper accepts dicts, but if you call `stream.ingest_record()` directly, you must pass the JSON-encoded string, not a Python dict.
- Forgetting `flush()` before `close()` — records buffered in the SDK are lost if you close without flushing. The reusable client class handles this, but raw stream usage requires explicit `flush()` calls.
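The timestamp and JSON pitfalls above can be sketched concretely; `event_time` is an illustrative column name, not one from the examples:

```python
import json
import time
from datetime import datetime, timezone

record = {
    "device_name": "sensor-1",
    "temp": 22,
    "humidity": 55,
    # TIMESTAMP column: epoch microseconds as an integer, never an ISO string.
    "event_time": int(time.time() * 1_000_000),
}

# Raw stream.ingest_record() takes the encoded string, not the dict.
payload = json.dumps(record)

# Converting an existing timezone-aware datetime instead of "now":
dt = datetime(2024, 1, 1, tzinfo=timezone.utc)
micros = int(dt.timestamp() * 1_000_000)
print(micros)  # 1704067200000000
```

If a naive `datetime` sneaks in, `timestamp()` interprets it in local time, so keep datetimes timezone-aware before converting.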