Zerobus Ingest

Skill: databricks-zerobus-ingest

Direct, low-latency data ingestion into Delta tables without standing up Kafka, Kinesis, or Event Hubs. Zerobus Ingest is a serverless gRPC connector that accepts records from any language SDK, validates them against the target table schema, and sends durability acknowledgments back to your client. Your AI coding assistant can generate the full producer — SDK setup, stream creation, record serialization, ACK handling, and retry logic — from a description of what you need to ingest and where it should land.

“Build a Python producer that streams IoT sensor readings into my catalog.iot.sensor_events Delta table using JSON serialization.”

import json
import time

from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties

SERVER_ENDPOINT = "zerobus.us-west-2.cloud.databricks.com:443"
WORKSPACE_URL = "https://my-workspace.cloud.databricks.com"
TABLE = "catalog.iot.sensor_events"

# OAuth credentials for a service principal with MODIFY and SELECT on the table
client_id = "<service-principal-client-id>"
client_secret = "<service-principal-client-secret>"

sdk = ZerobusSdk(SERVER_ENDPOINT, WORKSPACE_URL)
options = StreamConfigurationOptions(record_type=RecordType.JSON)
table_props = TableProperties(TABLE)
stream = sdk.create_stream(client_id, client_secret, table_props, options)

try:
    for i in range(1000):
        record = {
            "device_id": f"sensor-{i % 10}",
            "temperature": 20.0 + (i % 15),
            "humidity": 40 + (i % 30),
            "event_ts": int(time.time() * 1_000_000),  # microseconds
        }
        stream.ingest_record(json.dumps(record))
        if i % 100 == 0:
            stream.flush()  # durability checkpoint every 100 records
    stream.flush()  # final flush for the tail of the batch
finally:
    stream.close()

Key decisions:

  • JSON serialization for prototyping — gets you ingesting in minutes. Switch to Protobuf for production workloads where type safety and throughput matter.
  • Timestamps as Unix microseconds — Zerobus requires integer timestamps, not ISO strings. Databricks Delta stores timestamps at microsecond precision, so int(time.time() * 1_000_000) is the correct format.
  • Periodic flush every 100 records — flush() blocks until the server confirms durability. Flushing too often kills throughput; never flushing risks losing a large batch on failure. 100 records is a reasonable starting point for most workloads.
  • try/finally for stream cleanup — a stream holds server-side resources. Always close it, even on error, to avoid resource leaks and connection exhaustion.
  • Service principal auth — Zerobus uses OAuth via client_id/client_secret. The service principal needs explicit MODIFY and SELECT grants on the target table; schema-level inherited permissions are not sufficient.
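Since integer timestamps trip up most integrations, a small conversion helper keeps the rule in one place. This is a sketch using only the standard library; the function name `to_zerobus_ts` and the naive-means-UTC assumption are illustrative choices, not part of the SDK:

```python
from datetime import datetime, timezone

def to_zerobus_ts(dt: datetime) -> int:
    """Convert a datetime to the Unix-microsecond integer Zerobus expects."""
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)  # assumption: treat naive datetimes as UTC
    return int(dt.timestamp() * 1_000_000)

# ISO strings from upstream systems must be converted before ingestion
ts = to_zerobus_ts(datetime.fromisoformat("2024-06-01T12:00:00+00:00"))
```

Route every timestamp field through a helper like this so an ISO string can never reach `ingest_record`.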

“Generate a Protobuf schema from my Unity Catalog table and build a type-safe producer.”

# Step 1: Generate a .proto file from the UC table schema
from databricks.sdk import WorkspaceClient

TYPE_MAP = {
    "STRING": "string", "LONG": "int64", "INT": "int32",
    "DOUBLE": "double", "BOOLEAN": "bool", "TIMESTAMP": "int64",
}

w = WorkspaceClient()
table = w.tables.get("catalog.iot.sensor_events")

proto_lines = ['syntax = "proto3";', "", "message SensorEvent {"]
for i, col in enumerate(table.columns, start=1):
    proto_type = TYPE_MAP.get(col.type_text.upper(), "string")
    proto_lines.append(f"  {proto_type} {col.name} = {i};")
proto_lines.append("}")

with open("sensor_event.proto", "w") as f:
    f.write("\n".join(proto_lines))
# Step 2: Compile and use in the producer
# Run: python -m grpc_tools.protoc -I. --python_out=. sensor_event.proto
import time

from sensor_event_pb2 import SensorEvent
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties

# SERVER_ENDPOINT, WORKSPACE_URL, TABLE, client_id, client_secret as in the JSON example
sdk = ZerobusSdk(SERVER_ENDPOINT, WORKSPACE_URL)
options = StreamConfigurationOptions(record_type=RecordType.PROTOBUF)
table_props = TableProperties(TABLE)
stream = sdk.create_stream(client_id, client_secret, table_props, options)

try:
    event = SensorEvent(
        device_id="sensor-1",
        temperature=22.5,
        humidity=55,
        event_ts=int(time.time() * 1_000_000),
    )
    stream.ingest_record(event.SerializeToString())
    stream.flush()
finally:
    stream.close()

Protobuf gives you compile-time schema validation, smaller wire payloads, and forward compatibility when the table schema evolves. Generate the .proto from your UC table definition so the two never drift apart.
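To catch drift early, you can compare the generated .proto against the live table columns in CI. A minimal sketch, assuming a simple proto3 message with scalar fields; `proto_field_names` and `check_drift` are hypothetical helpers, not SDK functions:

```python
import re

def proto_field_names(proto_text: str) -> list[str]:
    """Extract field names from a flat proto3 message body."""
    return re.findall(r"^\s*\w+\s+(\w+)\s*=\s*\d+;", proto_text, re.MULTILINE)

def check_drift(proto_text: str, table_columns: list[str]) -> set[str]:
    """Return names present on one side but not the other; empty set means in sync."""
    return set(proto_field_names(proto_text)) ^ set(table_columns)

proto = 'syntax = "proto3";\n\nmessage SensorEvent {\n  string device_id = 1;\n  double temperature = 2;\n}\n'
mismatches = check_drift(proto, ["device_id", "temperature"])
```

Fail the build when the set is non-empty, forcing a .proto regeneration whenever the table schema changes.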

“Make my producer resilient to transient gRPC failures and stream disconnections.”

import json
import time

from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties

def create_resilient_stream(sdk, client_id, client_secret, table_props, options):
    max_retries = 5
    for attempt in range(max_retries):
        try:
            return sdk.create_stream(client_id, client_secret, table_props, options)
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            wait = min(2 ** attempt, 30)  # exponential backoff, capped at 30s
            print(f"Stream creation failed: {e}. Retrying in {wait}s...")
            time.sleep(wait)

def ingest_with_retry(records, sdk, client_id, client_secret, table_props, options):
    stream = create_resilient_stream(sdk, client_id, client_secret, table_props, options)
    pending = list(records)
    while pending:
        try:
            for record in pending:
                stream.ingest_record(json.dumps(record))
            stream.flush()
            pending = []  # all records acknowledged as durable
        except Exception as e:
            print(f"Ingest failed: {e}. Reconnecting...")
            try:
                stream.close()
            except Exception:
                pass  # the old stream may already be dead
            stream = create_resilient_stream(
                sdk, client_id, client_secret, table_props, options
            )
    stream.close()

Zerobus provides at-least-once delivery, so your retry logic can safely re-send records after a failure. Design downstream consumers to handle duplicates (idempotent merges or deduplication on a unique key).
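For consumers that cannot run an idempotent merge, a lightweight in-process deduplication on a unique key is one option. A sketch in plain Python; the composite key `(device_id, event_ts)` is an assumption for this example, and at table scale you would do the equivalent with a Delta MERGE or a dedup step in your pipeline:

```python
def dedupe(records, key=("device_id", "event_ts")):
    """Drop duplicate records, keeping the first occurrence of each key."""
    seen = set()
    unique = []
    for r in records:
        k = tuple(r[f] for f in key)
        if k not in seen:
            seen.add(k)
            unique.append(r)
    return unique
```

The key must uniquely identify an event at the source; if two legitimate readings can share a key, add a sequence number or UUID at the producer.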

“Build a Go microservice that ingests events into Zerobus.”

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"time"

	zerobus "github.com/databricks/zerobus-ingest-sdk-go"
)

const (
	serverEndpoint = "zerobus.us-west-2.cloud.databricks.com:443"
	workspaceURL   = "https://my-workspace.cloud.databricks.com"
	tableName      = "catalog.iot.sensor_events"
	clientID       = "<service-principal-client-id>"
	clientSecret   = "<service-principal-client-secret>"
)

func main() {
	sdk, err := zerobus.NewSdk(serverEndpoint, workspaceURL)
	if err != nil {
		log.Fatal(err)
	}
	stream, err := sdk.CreateStream(clientID, clientSecret, tableName, zerobus.JSON)
	if err != nil {
		log.Fatal(err)
	}
	defer stream.Close()

	for i := 0; i < 1000; i++ {
		record := map[string]interface{}{
			"device_id":   fmt.Sprintf("sensor-%d", i%10),
			"temperature": 20.0 + float64(i%15),
			"event_ts":    time.Now().UnixMicro(),
		}
		data, err := json.Marshal(record)
		if err != nil {
			log.Printf("marshal error: %v", err)
			continue
		}
		if err := stream.IngestRecord(data); err != nil {
			log.Printf("ingest error: %v", err)
		}
	}
	if err := stream.Flush(); err != nil {
		log.Printf("flush error: %v", err)
	}
}

Same pattern in any language: init SDK, create stream, ingest records, flush, close. The gRPC transport is language-agnostic, so pick whatever fits your service architecture.

Common pitfalls:

  • String timestamps instead of integers — Zerobus rejects ISO-format timestamps. All timestamp fields must be Unix integer timestamps in microseconds (int(time.time() * 1_000_000) in Python). This is the single most common integration failure.
  • SDK on serverless compute — The Zerobus Ingest SDK cannot be pip-installed on serverless compute. Use classic compute clusters, or use the Zerobus REST API (Beta) for notebook-based ingestion without the SDK.
  • Schema-level inherited grants — Service principals need explicit MODIFY and SELECT grants directly on the target table. Schema-level inherited permissions trigger error 4024 (authorization_details failure) even though they appear sufficient in the Unity Catalog UI.
  • No table auto-creation — Zerobus does not create or alter tables. The target must be a pre-existing managed Delta table in Unity Catalog with a schema that exactly matches your records. A field name mismatch or missing column causes silent record rejection.
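Because mismatched records are rejected silently, a preflight check before the first ingest saves debugging time. A minimal sketch; `preflight_check` is a hypothetical helper, and the `_ts`-suffix heuristic for spotting timestamp fields is an assumption of this example:

```python
def preflight_check(record: dict, table_columns: list[str]) -> list[str]:
    """Return a list of problems before ingesting; an empty list means safe to send."""
    problems = []
    cols = set(table_columns)
    for field in record:
        if field not in cols:
            problems.append(f"field {field!r} not in table schema")
    for col in cols - record.keys():
        problems.append(f"table column {col!r} missing from record")
    for field, value in record.items():
        # assumption: timestamp columns follow a *_ts naming convention
        if field.endswith("_ts") and not isinstance(value, int):
            problems.append(f"timestamp field {field!r} must be a Unix-microsecond int")
    return problems
```

Run it against one sample record per deployment (fetching the column list via `WorkspaceClient.tables.get`, as in the .proto example above) and refuse to start the producer if any problems come back.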