
Multi-Language Clients

Skill: databricks-zerobus-ingest

A Zerobus producer in your language of choice. The SDK is available for Java, Go, TypeScript, and Rust, and every language follows the same lifecycle: initialize the SDK, create a stream, ingest records, handle acknowledgments, flush, and close. Your AI coding assistant generates the correct idioms for whichever language you specify in the prompt.

“Build a Java microservice that sends air quality sensor data to Databricks using Zerobus with Protobuf serialization.”

import com.databricks.zerobus.*;
import com.example.proto.Record.AirQuality;

public class ZerobusProducer {
    public static void main(String[] args) throws Exception {
        String serverEndpoint = System.getenv("ZEROBUS_SERVER_ENDPOINT");
        String workspaceUrl = System.getenv("DATABRICKS_WORKSPACE_URL");
        String tableName = System.getenv("ZEROBUS_TABLE_NAME");
        String clientId = System.getenv("DATABRICKS_CLIENT_ID");
        String clientSecret = System.getenv("DATABRICKS_CLIENT_SECRET");

        ZerobusSdk sdk = new ZerobusSdk(serverEndpoint, workspaceUrl);
        TableProperties<AirQuality> tableProperties = new TableProperties<>(
            tableName, AirQuality.getDefaultInstance());
        ZerobusStream<AirQuality> stream = sdk.createStream(
            tableProperties, clientId, clientSecret).join();

        try {
            for (int i = 0; i < 100; i++) {
                AirQuality record = AirQuality.newBuilder()
                    .setDeviceName("sensor-" + i)
                    .setTemp(22)
                    .setHumidity(55)
                    .build();
                long offset = stream.ingestRecordOffset(record);
                stream.waitForOffset(offset);
            }
        } finally {
            stream.close();
        }
    }
}

Key decisions:

  • Java defaults to Protobuf — pass getDefaultInstance() to TableProperties so the SDK knows the message shape
  • createStream returns a CompletableFuture, so .join() blocks until the stream is ready
  • The try/finally ensures the gRPC stream closes even if ingestion throws
  • Generate your .proto from the Unity Catalog table schema (see the Protobuf Schema page) to keep field mappings in sync

“Write a Go service that sends JSON sensor data to Databricks through Zerobus.”

package main

import (
	"fmt"
	"log"
	"os"

	zerobus "github.com/databricks/zerobus-go-sdk/sdk"
)

func main() {
	sdk, err := zerobus.NewZerobusSdk(
		os.Getenv("ZEROBUS_SERVER_ENDPOINT"),
		os.Getenv("DATABRICKS_WORKSPACE_URL"),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer sdk.Free()

	options := zerobus.DefaultStreamConfigurationOptions()
	options.RecordType = zerobus.RecordTypeJson

	stream, err := sdk.CreateStream(
		zerobus.TableProperties{TableName: os.Getenv("ZEROBUS_TABLE_NAME")},
		os.Getenv("DATABRICKS_CLIENT_ID"),
		os.Getenv("DATABRICKS_CLIENT_SECRET"),
		options,
	)
	if err != nil {
		log.Fatal(err)
	}
	defer stream.Close()

	for i := 0; i < 100; i++ {
		record := fmt.Sprintf(
			`{"device_name": "sensor-%d", "temp": 22, "humidity": 55}`, i,
		)
		offset, err := stream.IngestRecordOffset(record)
		if err != nil {
			log.Printf("Ingest failed for record %d: %v", i, err)
			continue
		}
		stream.WaitForOffset(offset)
	}
	stream.Flush()
}

Go’s SDK uses defer for cleanup, which pairs naturally with the stream lifecycle. Notice sdk.Free() releases the underlying native resources — skip it and you’ll leak memory in long-running services. JSON records are raw strings, not structs, so you format them directly.

“Create a TypeScript Zerobus producer that ingests JSON events from a Node.js application.”

import { ZerobusSdk, RecordType } from "@databricks/zerobus-ingest-sdk";

const sdk = new ZerobusSdk(
  process.env.ZEROBUS_SERVER_ENDPOINT!,
  process.env.DATABRICKS_WORKSPACE_URL!,
);

const stream = await sdk.createStream(
  { tableName: process.env.ZEROBUS_TABLE_NAME! },
  process.env.DATABRICKS_CLIENT_ID!,
  process.env.DATABRICKS_CLIENT_SECRET!,
  { recordType: RecordType.Json },
);

try {
  for (let i = 0; i < 100; i++) {
    const offset = await stream.ingestRecordOffset({
      device_name: `sensor-${i}`,
      temp: 22,
      humidity: 55,
    });
    await stream.waitForOffset(offset);
  }
  await stream.flush();
} finally {
  await stream.close();
}

TypeScript’s native async/await maps directly to the SDK’s Promise-based API. Every method that touches the network — createStream, ingestRecordOffset, waitForOffset, flush, close — returns a Promise, so you await each one in sequence.
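Awaiting waitForOffset after every single record serializes the round trips. If the stream's acknowledgments are cumulative — an assumption to verify against the SDK docs: waiting on offset N confirms every offset up to N — you can ingest a whole batch and wait once on the last offset. The ZerobusStream interface below is a minimal hypothetical stand-in for the SDK's stream, not its actual type:

```typescript
// Hypothetical minimal view of the SDK stream, for illustration only.
interface ZerobusStream {
  ingestRecordOffset(record: Record<string, unknown>): Promise<number>;
  waitForOffset(offset: number): Promise<void>;
}

// Ingest a batch without a per-record wait, then wait once on the highest
// offset. Assumes acknowledging offset N covers all offsets <= N.
async function ingestBatch(
  stream: ZerobusStream,
  records: Record<string, unknown>[],
): Promise<number> {
  let lastOffset = -1;
  for (const record of records) {
    lastOffset = await stream.ingestRecordOffset(record);
  }
  if (lastOffset >= 0) {
    await stream.waitForOffset(lastOffset);
  }
  return lastOffset;
}
```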

“Add retry and exponential backoff to my TypeScript Zerobus producer.”

async function ingestWithRetry(
  stream: any,
  record: Record<string, unknown>,
  maxRetries = 3,
): Promise<boolean> {
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
      const offset = await stream.ingestRecordOffset(record);
      await stream.waitForOffset(offset);
      return true;
    } catch (error) {
      console.warn(`Attempt ${attempt + 1}/${maxRetries} failed:`, error);
      if (attempt < maxRetries - 1) {
        await new Promise((r) => setTimeout(r, 2 ** attempt * 1000));
      }
    }
  }
  return false;
}

This wraps the ingest-and-wait cycle with exponential backoff. For production, you’d also reinitialize the stream on connection errors (the same pattern as the Python reusable client class on the Python Client page).
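Stream reinitialization can be layered on top by taking a stream factory instead of a live stream. This is a sketch, not the SDK's API: the Stream interface, ingestWithReconnect name, and baseDelayMs parameter are all illustrative, and in a real service the factory would call sdk.createStream with your credentials:

```typescript
// Hypothetical stream shape; the real SDK stream has more methods.
interface Stream {
  ingestRecordOffset(record: Record<string, unknown>): Promise<number>;
  waitForOffset(offset: number): Promise<void>;
  close(): Promise<void>;
}

// Retry with backoff, recreating the stream after each failure: close the
// possibly-broken stream, sleep, then ask the factory for a fresh one.
async function ingestWithReconnect(
  createStream: () => Promise<Stream>,
  record: Record<string, unknown>,
  maxRetries = 3,
  baseDelayMs = 1000,
): Promise<boolean> {
  let stream = await createStream();
  try {
    for (let attempt = 0; attempt < maxRetries; attempt++) {
      try {
        const offset = await stream.ingestRecordOffset(record);
        await stream.waitForOffset(offset);
        return true;
      } catch (error) {
        console.warn(`Attempt ${attempt + 1}/${maxRetries} failed:`, error);
        if (attempt < maxRetries - 1) {
          await stream.close().catch(() => {}); // best-effort close of the broken stream
          await new Promise((r) => setTimeout(r, 2 ** attempt * baseDelayMs));
          stream = await createStream();
        }
      }
    }
    return false;
  } finally {
    await stream.close().catch(() => {});
  }
}
```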

“Build a Rust application that ingests JSON data into Databricks via Zerobus.”

use databricks_zerobus_ingest_sdk::{
    RecordType, StreamConfigurationOptions, TableProperties, ZerobusSdk,
};
use std::env;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let sdk = ZerobusSdk::new(
        env::var("ZEROBUS_SERVER_ENDPOINT")?,
        env::var("DATABRICKS_WORKSPACE_URL")?,
    )?;

    let table_properties = TableProperties {
        table_name: env::var("ZEROBUS_TABLE_NAME")?,
        descriptor_proto: None,
    };
    let options = StreamConfigurationOptions {
        record_type: RecordType::Json,
        ..Default::default()
    };

    let mut stream = sdk
        .create_stream(
            table_properties,
            env::var("DATABRICKS_CLIENT_ID")?,
            env::var("DATABRICKS_CLIENT_SECRET")?,
            Some(options),
        )
        .await?;

    for i in 0..100 {
        let record = format!(
            r#"{{"device_name": "sensor-{}", "temp": 22, "humidity": 55}}"#, i
        );
        let offset = stream.ingest_record_offset(record.into_bytes()).await?;
        stream.wait_for_offset(offset).await?;
    }

    stream.close().await?;
    Ok(())
}

Rust’s SDK runs on Tokio. Records are byte vectors, so you serialize to JSON manually with format! and call .into_bytes(). For Protobuf, set descriptor_proto: Some(proto_bytes) in TableProperties and pass prost-encoded messages instead.

  • Java defaults to Protobuf, everything else defaults to JSON — if you forget to set RecordType in a non-Java client, you’ll get JSON. If you forget the proto descriptor in Java, you’ll get a confusing type error at stream creation.
  • Go requires sdk.Free() to release native resources — the Go SDK wraps a native library via CGo. Missing the defer sdk.Free() call leaks memory in long-running processes.
  • Rust records are byte vectors, not strings — ingest_record_offset expects Vec<u8>. Use .into_bytes() on your JSON string or .encode_to_vec() on your prost message.
  • All languages share the same server-side limits — 100 MB/s and 15,000 rows/s per stream, 10 MB max message size, 2,000 column max. These are stream-level caps regardless of client language.
  • Async APIs vary by language — Java uses CompletableFuture, Go uses goroutines, TypeScript uses native Promises, Rust uses Tokio futures. Match your concurrency model to your runtime.
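The 10 MB per-message cap is the limit most easily tripped by application code. A minimal client-side guard is to measure the serialized record's UTF-8 size before handing it to the SDK — the helper below is illustrative, not part of any Zerobus SDK:

```typescript
// Server-side cap from the Zerobus limits above: 10 MB per message.
const MAX_MESSAGE_BYTES = 10 * 1024 * 1024;

// Measure the record's serialized UTF-8 size; reject oversized records
// client-side instead of letting the server refuse them.
function fitsMessageLimit(record: Record<string, unknown>): boolean {
  const bytes = new TextEncoder().encode(JSON.stringify(record)).length;
  return bytes <= MAX_MESSAGE_BYTES;
}
```

Call it before ingestRecordOffset and route oversized records to a dead-letter path rather than retrying them, since a too-large message fails deterministically.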