Zerobus Ingest
Skill: databricks-zerobus-ingest
What You Can Build
Direct, low-latency data ingestion into Delta tables without standing up Kafka, Kinesis, or Event Hubs. Zerobus Ingest is a serverless gRPC connector that accepts records from any language SDK, validates them against the target table schema, and sends durability acknowledgments back to your client. Your AI coding assistant can generate the full producer — SDK setup, stream creation, record serialization, ACK handling, and retry logic — from a description of what you need to ingest and where it should land.
In Action
“Build a Python producer that streams IoT sensor readings into my catalog.iot.sensor_events Delta table using JSON serialization.”
```python
import json
import os
import time

from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties

SERVER_ENDPOINT = "zerobus.us-west-2.cloud.databricks.com:443"
WORKSPACE_URL = "https://my-workspace.cloud.databricks.com"
TABLE = "catalog.iot.sensor_events"

# Service principal OAuth credentials
client_id = os.environ["DATABRICKS_CLIENT_ID"]
client_secret = os.environ["DATABRICKS_CLIENT_SECRET"]

sdk = ZerobusSdk(SERVER_ENDPOINT, WORKSPACE_URL)
options = StreamConfigurationOptions(record_type=RecordType.JSON)
table_props = TableProperties(TABLE)

stream = sdk.create_stream(client_id, client_secret, table_props, options)
try:
    for i in range(1000):
        record = {
            "device_id": f"sensor-{i % 10}",
            "temperature": 20.0 + (i % 15),
            "humidity": 40 + (i % 30),
            "event_ts": int(time.time() * 1_000_000),  # microseconds
        }
        stream.ingest_record(json.dumps(record))

        if i % 100 == 0:
            stream.flush()  # durability checkpoint every 100 records
    stream.flush()
finally:
    stream.close()
```

Key decisions:
- JSON serialization for prototyping — gets you ingesting in minutes. Switch to Protobuf for production workloads where type safety and throughput matter.
- Timestamps as Unix microseconds — Zerobus requires integer timestamps, not ISO strings. Databricks Delta stores timestamps at microsecond precision, so `int(time.time() * 1_000_000)` is the correct format.
- Periodic flush every 100 records — `flush()` blocks until the server confirms durability. Flushing too often kills throughput; never flushing risks losing a large batch on failure. 100 records is a reasonable starting point for most workloads.
- `try`/`finally` for stream cleanup — a stream holds server-side resources. Always close it, even on error, to avoid resource leaks and connection exhaustion.
- Service principal auth — Zerobus uses OAuth via `client_id`/`client_secret`. The service principal needs explicit `MODIFY` and `SELECT` grants on the target table; schema-level inherited permissions are not sufficient.
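Because the integer-microsecond requirement trips up so many integrations, it can help to normalize timestamps in one place. A minimal sketch — `to_unix_micros` is a hypothetical helper, not part of the Zerobus SDK — assuming naive datetimes are UTC:

```python
from datetime import datetime, timezone

def to_unix_micros(dt: datetime) -> int:
    """Convert a datetime to the integer microsecond timestamp Zerobus expects.

    Naive datetimes are assumed to be UTC; tz-aware values are converted.
    """
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return int(dt.timestamp() * 1_000_000)

# A fixed UTC instant, for illustration
ts = to_unix_micros(datetime(2024, 1, 1, tzinfo=timezone.utc))
print(ts)  # 1704067200000000
```

Routing every timestamp field through one helper like this keeps ISO strings out of your records entirely.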
More Patterns
Production Protobuf producer
“Generate a Protobuf schema from my Unity Catalog table and build a type-safe producer.”
```python
# Step 1: Generate .proto from UC table schema
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
table = w.tables.get("catalog.iot.sensor_events")

proto_lines = ['syntax = "proto3";', "", "message SensorEvent {"]
for i, col in enumerate(table.columns, start=1):
    type_map = {
        "STRING": "string",
        "LONG": "int64",
        "INT": "int32",
        "DOUBLE": "double",
        "BOOLEAN": "bool",
        "TIMESTAMP": "int64",
    }
    proto_type = type_map.get(col.type_text.upper(), "string")
    proto_lines.append(f"  {proto_type} {col.name} = {i};")
proto_lines.append("}")

with open("sensor_event.proto", "w") as f:
    f.write("\n".join(proto_lines))

# Step 2: Compile and use in producer
# Run: python -m grpc_tools.protoc -I. --python_out=. sensor_event.proto

import time

from sensor_event_pb2 import SensorEvent
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties

# SERVER_ENDPOINT, WORKSPACE_URL, TABLE, client_id, client_secret
# as in the JSON example above
sdk = ZerobusSdk(SERVER_ENDPOINT, WORKSPACE_URL)
options = StreamConfigurationOptions(record_type=RecordType.PROTOBUF)
table_props = TableProperties(TABLE)

stream = sdk.create_stream(client_id, client_secret, table_props, options)
try:
    event = SensorEvent(
        device_id="sensor-1",
        temperature=22.5,
        humidity=55,
        event_ts=int(time.time() * 1_000_000),
    )
    stream.ingest_record(event.SerializeToString())
    stream.flush()
finally:
    stream.close()
```

Protobuf gives you compile-time schema validation, smaller wire payloads, and forward compatibility when the table schema evolves. Generate the .proto from your UC table definition so the two never drift apart.
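You can sanity-check the schema generator without a workspace connection by exercising the same type mapping against a hardcoded column list. A sketch, where `Col` is a stand-in for the Unity Catalog column objects used above:

```python
from dataclasses import dataclass

# Stand-in for the Unity Catalog column metadata
@dataclass
class Col:
    name: str
    type_text: str

TYPE_MAP = {
    "STRING": "string", "LONG": "int64", "INT": "int32",
    "DOUBLE": "double", "BOOLEAN": "bool", "TIMESTAMP": "int64",
}

def build_proto(message: str, columns: list) -> str:
    """Render a proto3 message from a list of (name, type) columns."""
    lines = ['syntax = "proto3";', "", f"message {message} {{"]
    for i, col in enumerate(columns, start=1):
        proto_type = TYPE_MAP.get(col.type_text.upper(), "string")
        lines.append(f"  {proto_type} {col.name} = {i};")
    lines.append("}")
    return "\n".join(lines)

cols = [Col("device_id", "STRING"), Col("temperature", "DOUBLE"), Col("event_ts", "TIMESTAMP")]
print(build_proto("SensorEvent", cols))
```

One caveat worth noting: proto3 field numbers must stay stable across schema versions, so if the table gains a column in the middle of its column list, regenerate with new fields appended rather than renumbering existing ones.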
Retry with exponential backoff
“Make my producer resilient to transient gRPC failures and stream disconnections.”
```python
import json
import time

from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties

def create_resilient_stream(sdk, client_id, client_secret, table_props, options):
    max_retries = 5
    for attempt in range(max_retries):
        try:
            return sdk.create_stream(client_id, client_secret, table_props, options)
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            wait = min(2 ** attempt, 30)
            print(f"Stream creation failed: {e}. Retrying in {wait}s...")
            time.sleep(wait)

def ingest_with_retry(records, sdk, client_id, client_secret, table_props, options):
    stream = create_resilient_stream(sdk, client_id, client_secret, table_props, options)
    pending = list(records)

    while pending:
        try:
            for record in pending:
                stream.ingest_record(json.dumps(record))
            stream.flush()
            pending = []  # all acknowledged
        except Exception as e:
            print(f"Ingest failed: {e}. Reconnecting...")
            stream.close()
            stream = create_resilient_stream(
                sdk, client_id, client_secret, table_props, options
            )
    stream.close()
```

Zerobus provides at-least-once delivery, so your retry logic can safely re-send records after a failure. Design downstream consumers to handle duplicates (idempotent merges or deduplication on a unique key).
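Since a retry can re-send records that were already durable, the consuming side needs to deduplicate. A minimal in-memory sketch, assuming each record carries a unique `event_id` key (a hypothetical field name for illustration):

```python
def dedupe(records, key="event_id"):
    """Keep the first occurrence of each key, preserving arrival order."""
    seen = set()
    out = []
    for r in records:
        k = r[key]
        if k not in seen:
            seen.add(k)
            out.append(r)
    return out

batch = [
    {"event_id": "a", "temperature": 21.0},
    {"event_id": "b", "temperature": 22.0},
    {"event_id": "a", "temperature": 21.0},  # duplicate from a retry
]
print(len(dedupe(batch)))  # 2
```

In Delta, the same idea is usually expressed as a `MERGE` on the unique key (insert only when not matched); the function above just illustrates the dedup-on-key pattern.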
Multi-language: Go client
“Build a Go microservice that ingests events into Zerobus.”
```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"time"

	zerobus "github.com/databricks/zerobus-ingest-sdk-go"
)

// serverEndpoint, workspaceURL, clientID, clientSecret, and tableName
// come from your configuration (env vars, flags, etc.).

func main() {
	sdk, err := zerobus.NewSdk(serverEndpoint, workspaceURL)
	if err != nil {
		log.Fatal(err)
	}

	stream, err := sdk.CreateStream(clientID, clientSecret, tableName, zerobus.JSON)
	if err != nil {
		log.Fatal(err)
	}
	defer stream.Close()

	for i := 0; i < 1000; i++ {
		record := map[string]interface{}{
			"device_id":   fmt.Sprintf("sensor-%d", i%10),
			"temperature": 20.0 + float64(i%15),
			"event_ts":    time.Now().UnixMicro(),
		}
		data, _ := json.Marshal(record)
		if err := stream.IngestRecord(data); err != nil {
			log.Printf("ingest error: %v", err)
		}
	}
	stream.Flush()
}
```

Same pattern in any language: init SDK, create stream, ingest records, flush, close. The gRPC transport is language-agnostic, so pick whatever fits your service architecture.
Watch Out For
- String timestamps instead of integers — Zerobus rejects ISO-format timestamps. All timestamp fields must be Unix integer timestamps in microseconds (`int(time.time() * 1_000_000)` in Python). This is the single most common integration failure.
- SDK on serverless compute — The Zerobus Ingest SDK cannot be pip-installed on serverless compute. Use classic compute clusters, or use the Zerobus REST API (Beta) for notebook-based ingestion without the SDK.
- Schema-level inherited grants — Service principals need explicit `MODIFY` and `SELECT` grants directly on the target table. Schema-level inherited permissions trigger error 4024 (an `authorization_details` failure) even though they appear sufficient in the Unity Catalog UI.
- No table auto-creation — Zerobus does not create or alter tables. The target must be a pre-existing managed Delta table in Unity Catalog with a schema that exactly matches your records. A field name mismatch or missing column causes silent record rejection.
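Because a field-name mismatch is rejected silently, a cheap client-side pre-check against the table's column names can catch drift before any records are sent. A sketch — `check_record_fields` is a hypothetical helper, and in practice the column set would come from `w.tables.get(...)` as in the Protobuf example:

```python
def check_record_fields(record: dict, table_columns: set) -> None:
    """Raise if the record contains fields the target table does not have."""
    extra = set(record) - table_columns
    if extra:
        raise ValueError(f"Fields not in target table schema: {sorted(extra)}")

cols = {"device_id", "temperature", "humidity", "event_ts"}

check_record_fields({"device_id": "sensor-1", "event_ts": 1}, cols)  # ok

try:
    # typo: "temprature" is not a table column
    check_record_fields({"device_id": "sensor-1", "temprature": 22.5}, cols)
except ValueError as e:
    print(e)
```

Running this once per record shape (not per record) at producer startup is usually enough to turn a silent rejection into a loud, debuggable error.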