Multi-Language Clients
Skill: databricks-zerobus-ingest
What You Can Build
A Zerobus producer in your language of choice. The SDK is available for Java, Go, TypeScript, and Rust, and every language follows the same lifecycle: initialize the SDK, create a stream, ingest records, handle acknowledgments, flush, and close. Your AI coding assistant generates the correct idioms for whichever language you specify in the prompt.
In Action
“Build a Java microservice that sends air quality sensor data to Databricks using Zerobus with Protobuf serialization.”
```java
import com.databricks.zerobus.*;
import com.example.proto.Record.AirQuality;

public class ZerobusProducer {
  public static void main(String[] args) throws Exception {
    String serverEndpoint = System.getenv("ZEROBUS_SERVER_ENDPOINT");
    String workspaceUrl = System.getenv("DATABRICKS_WORKSPACE_URL");
    String tableName = System.getenv("ZEROBUS_TABLE_NAME");
    String clientId = System.getenv("DATABRICKS_CLIENT_ID");
    String clientSecret = System.getenv("DATABRICKS_CLIENT_SECRET");

    ZerobusSdk sdk = new ZerobusSdk(serverEndpoint, workspaceUrl);
    TableProperties<AirQuality> tableProperties = new TableProperties<>(
        tableName, AirQuality.getDefaultInstance());

    ZerobusStream<AirQuality> stream = sdk.createStream(
        tableProperties, clientId, clientSecret).join();

    try {
      for (int i = 0; i < 100; i++) {
        AirQuality record = AirQuality.newBuilder()
            .setDeviceName("sensor-" + i)
            .setTemp(22)
            .setHumidity(55)
            .build();
        long offset = stream.ingestRecordOffset(record);
        stream.waitForOffset(offset);
      }
    } finally {
      stream.close();
    }
  }
}
```

Key decisions:
- Java defaults to Protobuf — pass `getDefaultInstance()` to `TableProperties` so the SDK knows the message shape
- `createStream` returns a `CompletableFuture`, so `.join()` blocks until the stream is ready
- The `try`/`finally` ensures the gRPC stream closes even if ingestion throws
- Generate your `.proto` from the Unity Catalog table schema (see the Protobuf Schema page) to keep field mappings in sync
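For reference, the Java example above assumes a message shaped roughly like the following. This is a hand-written sketch, not generated output: the field names and numbers must match your Unity Catalog table's columns, the `java_outer_classname = "Record"` option matches the `com.example.proto.Record.AirQuality` import used above, and proto2 with `optional` fields is shown here as an assumption — check the Protobuf Schema page for the exact syntax Zerobus expects.

```proto
// Sketch of the schema assumed by the Java example (hypothetical file:
// air_quality.proto). Field names must mirror the table's column names.
syntax = "proto2";

package com.example.proto;

option java_outer_classname = "Record";

message AirQuality {
  optional string device_name = 1;
  optional int32 temp = 2;
  optional int32 humidity = 3;
}
```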
More Patterns
Go producer with JSON serialization
“Write a Go service that sends JSON sensor data to Databricks through Zerobus.”
```go
package main

import (
	"fmt"
	"log"
	"os"

	zerobus "github.com/databricks/zerobus-go-sdk/sdk"
)

func main() {
	sdk, err := zerobus.NewZerobusSdk(
		os.Getenv("ZEROBUS_SERVER_ENDPOINT"),
		os.Getenv("DATABRICKS_WORKSPACE_URL"),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer sdk.Free()

	options := zerobus.DefaultStreamConfigurationOptions()
	options.RecordType = zerobus.RecordTypeJson

	stream, err := sdk.CreateStream(
		zerobus.TableProperties{TableName: os.Getenv("ZEROBUS_TABLE_NAME")},
		os.Getenv("DATABRICKS_CLIENT_ID"),
		os.Getenv("DATABRICKS_CLIENT_SECRET"),
		options,
	)
	if err != nil {
		log.Fatal(err)
	}
	defer stream.Close()

	for i := 0; i < 100; i++ {
		record := fmt.Sprintf(
			`{"device_name": "sensor-%d", "temp": 22, "humidity": 55}`,
			i,
		)
		offset, err := stream.IngestRecordOffset(record)
		if err != nil {
			log.Printf("Ingest failed for record %d: %v", i, err)
			continue
		}
		stream.WaitForOffset(offset)
	}
	stream.Flush()
}
```

Go’s SDK uses `defer` for cleanup, which pairs naturally with the stream lifecycle. Notice `sdk.Free()` releases the underlying native resources — skip it and you’ll leak memory in long-running services. JSON records are raw strings, not structs, so you format them directly.
TypeScript producer for Node.js
“Create a TypeScript Zerobus producer that ingests JSON events from a Node.js application.”
```typescript
import { ZerobusSdk, RecordType } from "@databricks/zerobus-ingest-sdk";

const sdk = new ZerobusSdk(
  process.env.ZEROBUS_SERVER_ENDPOINT!,
  process.env.DATABRICKS_WORKSPACE_URL!,
);

const stream = await sdk.createStream(
  { tableName: process.env.ZEROBUS_TABLE_NAME! },
  process.env.DATABRICKS_CLIENT_ID!,
  process.env.DATABRICKS_CLIENT_SECRET!,
  { recordType: RecordType.Json },
);

try {
  for (let i = 0; i < 100; i++) {
    const offset = await stream.ingestRecordOffset({
      device_name: `sensor-${i}`,
      temp: 22,
      humidity: 55,
    });
    await stream.waitForOffset(offset);
  }
  await stream.flush();
} finally {
  await stream.close();
}
```

TypeScript’s native async/await maps directly to the SDK’s Promise-based API. Every method that touches the network — `createStream`, `ingestRecordOffset`, `waitForOffset`, `flush`, `close` — returns a Promise, so you await each one in sequence.
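Awaiting every record serializes the pipeline. Since stream offsets increase monotonically, one common throughput pattern is to acknowledge in batches: ingest a run of records, keep only the latest offset, and wait once per batch. A minimal sketch under assumed types (the `OffsetStream` interface and `ingestBatched` helper are illustrations mirroring the calls above, not SDK exports):

```typescript
// Minimal shape of the stream methods used above (an assumption for
// illustration; the real types live in @databricks/zerobus-ingest-sdk).
interface OffsetStream {
  ingestRecordOffset(record: Record<string, unknown>): Promise<number>;
  waitForOffset(offset: number): Promise<void>;
}

// Ingest `records`, acknowledging once per `batchSize` instead of per record.
async function ingestBatched(
  stream: OffsetStream,
  records: Record<string, unknown>[],
  batchSize = 50,
): Promise<void> {
  let lastOffset = -1;
  for (let i = 0; i < records.length; i++) {
    lastOffset = await stream.ingestRecordOffset(records[i]);
    // Offsets are monotonic, so waiting on the latest one covers the
    // whole batch ingested before it.
    if ((i + 1) % batchSize === 0) {
      await stream.waitForOffset(lastOffset);
    }
  }
  // Acknowledge any trailing partial batch.
  if (lastOffset >= 0) {
    await stream.waitForOffset(lastOffset);
  }
}
```

The tradeoff: on failure you only know the last acknowledged batch landed, so pick a batch size you are willing to re-send.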
TypeScript producer with retry logic
“Add retry and exponential backoff to my TypeScript Zerobus producer.”
```typescript
async function ingestWithRetry(
  stream: any,
  record: Record<string, unknown>,
  maxRetries = 3,
): Promise<boolean> {
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
      const offset = await stream.ingestRecordOffset(record);
      await stream.waitForOffset(offset);
      return true;
    } catch (error) {
      console.warn(`Attempt ${attempt + 1}/${maxRetries} failed:`, error);
      if (attempt < maxRetries - 1) {
        await new Promise((r) => setTimeout(r, 2 ** attempt * 1000));
      }
    }
  }
  return false;
}
```

This wraps the ingest-and-wait cycle with exponential backoff. For production, you’d also reinitialize the stream on connection errors (the same pattern as the Python reusable client class on the Python Client page).
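That stream-reinitialization pattern translates directly to TypeScript: wrap stream creation in a factory, drop the stream reference on failure, and lazily reconnect on the next call. A sketch under assumed types (the `ZStream` interface, `ResilientProducer` class, and factory signature are illustrative, not SDK exports):

```typescript
// Shape of the stream methods this wrapper needs (an assumption for
// illustration, mirroring the SDK calls shown earlier on this page).
interface ZStream {
  ingestRecordOffset(record: Record<string, unknown>): Promise<number>;
  waitForOffset(offset: number): Promise<void>;
  close(): Promise<void>;
}

class ResilientProducer {
  private stream: ZStream | null = null;

  // `createStream` is a caller-supplied factory, e.g. a closure over
  // sdk.createStream(...) with your table and credentials.
  constructor(private createStream: () => Promise<ZStream>) {}

  private async getStream(): Promise<ZStream> {
    if (!this.stream) {
      this.stream = await this.createStream();
    }
    return this.stream;
  }

  // Ingest one record; on failure, discard the stream so the next call
  // reconnects, then rethrow for the caller's retry policy (such as
  // ingestWithRetry above).
  async ingest(record: Record<string, unknown>): Promise<void> {
    const stream = await this.getStream();
    try {
      const offset = await stream.ingestRecordOffset(record);
      await stream.waitForOffset(offset);
    } catch (err) {
      await stream.close().catch(() => {}); // best-effort cleanup
      this.stream = null;
      throw err;
    }
  }
}
```

Keeping reconnection inside the wrapper lets the retry loop stay a pure policy: it just calls `ingest` again and gets a fresh stream for free.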
Rust producer with Tokio async runtime
“Build a Rust application that ingests JSON data into Databricks via Zerobus.”
```rust
use databricks_zerobus_ingest_sdk::{
    RecordType, StreamConfigurationOptions, TableProperties, ZerobusSdk,
};
use std::env;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let sdk = ZerobusSdk::new(
        env::var("ZEROBUS_SERVER_ENDPOINT")?,
        env::var("DATABRICKS_WORKSPACE_URL")?,
    )?;

    let table_properties = TableProperties {
        table_name: env::var("ZEROBUS_TABLE_NAME")?,
        descriptor_proto: None,
    };
    let options = StreamConfigurationOptions {
        record_type: RecordType::Json,
        ..Default::default()
    };

    let mut stream = sdk
        .create_stream(
            table_properties,
            env::var("DATABRICKS_CLIENT_ID")?,
            env::var("DATABRICKS_CLIENT_SECRET")?,
            Some(options),
        )
        .await?;

    for i in 0..100 {
        let record = format!(
            r#"{{"device_name": "sensor-{}", "temp": 22, "humidity": 55}}"#,
            i
        );
        let offset = stream.ingest_record_offset(record.into_bytes()).await?;
        stream.wait_for_offset(offset).await?;
    }

    stream.close().await?;
    Ok(())
}
```

Rust’s SDK runs on Tokio. Records are byte vectors, so you serialize to JSON manually with `format!` and call `.into_bytes()`. For Protobuf, set `descriptor_proto: Some(proto_bytes)` in `TableProperties` and pass prost-encoded messages instead.
Watch Out For
- Java defaults to Protobuf, everything else defaults to JSON — if you forget to set `RecordType` in a non-Java client, you’ll get JSON. If you forget the proto descriptor in Java, you’ll get a confusing type error at stream creation.
- Go requires `sdk.Free()` to release native resources — the Go SDK wraps a native library via CGo. Missing the `defer sdk.Free()` call leaks memory in long-running processes.
- Rust records are byte vectors, not strings — `ingest_record_offset` expects `Vec<u8>`. Use `.into_bytes()` on your JSON string or `.encode_to_vec()` on your prost message.
- All languages share the same server-side limits — 100 MB/s and 15,000 rows/s per stream, 10 MB max message size, 2,000 column max. These are stream-level caps regardless of client language.
- Async APIs vary by language — Java uses `CompletableFuture`, Go uses goroutines, TypeScript uses native Promises, Rust uses Tokio futures. Match your concurrency model to your runtime.
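A producer that pushes past the per-stream row cap will be throttled or rejected server-side, so high-volume clients often pace themselves. A minimal fixed-window limiter sketch in TypeScript (the `RowRateLimiter` class and its injectable clock are illustrations, not part of any Zerobus SDK; the 15,000 rows/s figure is the server-side cap quoted above):

```typescript
// Client-side pacing to stay under a rows-per-second cap.
class RowRateLimiter {
  private windowStart = 0;
  private count = 0;

  // `now` is injectable for testing; defaults to wall-clock milliseconds.
  constructor(
    private maxRowsPerSecond: number,
    private now: () => number = Date.now,
  ) {}

  // Returns 0 and claims a slot if the current one-second window has
  // capacity; otherwise returns how many milliseconds to wait before
  // trying again (no slot is claimed in that case).
  delayBeforeNext(): number {
    const t = this.now();
    if (t - this.windowStart >= 1000) {
      this.windowStart = t;
      this.count = 0;
    }
    if (this.count < this.maxRowsPerSecond) {
      this.count++;
      return 0;
    }
    return this.windowStart + 1000 - t;
  }
}
```

In an ingest loop, call `delayBeforeNext()` before each send and, whenever it returns a positive delay, `await new Promise((r) => setTimeout(r, d))` and call it again until it returns 0.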