Python SDK
What You Can Build
The databricks-sdk package gives you programmatic access to every Databricks workspace API from any Python environment. You can automate cluster management, create and trigger jobs, execute SQL against warehouses, manage Unity Catalog objects, query model serving endpoints, and build applications that integrate with the platform. Your AI coding assistant uses this skill to generate SDK code that handles authentication, pagination, long-running operations, and error handling correctly.
In Action
“Write a Python script that authenticates with a named profile, lists all running clusters, and stops any that have been idle for more than 2 hours.”
```python
from datetime import datetime, timezone

from databricks.sdk import WorkspaceClient

w = WorkspaceClient(profile="my-workspace")

# Verify connection
me = w.current_user.me()
print(f"Authenticated as: {me.user_name}")

# Find and stop idle clusters
for cluster in w.clusters.list():
    if cluster.state.value != "RUNNING":
        continue
    # last_activity_time is reported as epoch milliseconds
    last_activity = datetime.fromtimestamp(
        cluster.last_activity_time / 1000, tz=timezone.utc
    )
    idle_minutes = (datetime.now(timezone.utc) - last_activity).total_seconds() / 60
    if idle_minutes > 120:
        print(f"Stopping {cluster.cluster_name} (idle {idle_minutes:.0f}m)")
        w.clusters.stop(cluster_id=cluster.cluster_id)
```

Key decisions:
- Named profile authentication (`profile="my-workspace"`) reads credentials from `~/.databrickscfg`, which is cleaner than environment variables when working with multiple workspaces.
- `w.clusters.list()` returns an iterator that handles pagination automatically, so you get all clusters without managing page tokens.
- `w.clusters.stop()` returns a `Wait` object; the stop is initiated but not blocking. Call `.result()` if you need to wait for the cluster to fully terminate.
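Since the clusters API reports `last_activity_time` as epoch milliseconds, the idle check above can be factored into a small pure helper that is easy to unit test. A minimal sketch; the `idle_minutes` name is ours, not part of the SDK:

```python
from datetime import datetime, timezone

def idle_minutes(last_activity_ms: int, now: datetime) -> float:
    """Minutes elapsed since a cluster's last activity (epoch milliseconds)."""
    last = datetime.fromtimestamp(last_activity_ms / 1000, tz=timezone.utc)
    return (now - last).total_seconds() / 60

now = datetime(2024, 1, 15, 12, 0, tzinfo=timezone.utc)
three_hours_ago = datetime(2024, 1, 15, 9, 0, tzinfo=timezone.utc)
print(idle_minutes(int(three_hours_ago.timestamp() * 1000), now))  # 180.0
```

In the loop above, `idle_minutes(cluster.last_activity_time, datetime.now(timezone.utc)) > 120` then replaces the inline arithmetic.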
More Patterns
Execute Parameterized SQL
“Run a parameterized SQL query against a warehouse and process the results, in Python.”
```python
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.sql import (
    StatementState,
    StatementParameterListItem,
)

w = WorkspaceClient()

response = w.statement_execution.execute_statement(
    warehouse_id="abc123",
    statement="""
        SELECT order_id, customer_id, total
        FROM main.sales.orders
        WHERE order_date >= :start_date AND total > :min_total
    """,
    parameters=[
        StatementParameterListItem(
            name="start_date", value="2024-01-01", type="DATE"
        ),
        StatementParameterListItem(
            name="min_total", value="100.0", type="DECIMAL"
        ),
    ],
    wait_timeout="30s",
)

if response.status.state == StatementState.SUCCEEDED:
    columns = [col.name for col in response.manifest.schema.columns]
    print(f"Columns: {columns}")
    for row in response.result.data_array:
        print(row)
```

Parameterized queries with `:name` placeholders prevent SQL injection and handle type casting for you. Always set `wait_timeout`; without it, the call returns immediately and you have to poll for results.
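Result rows come back as positional arrays, so a common next step is zipping them with the manifest's column names. A minimal sketch; `rows_to_dicts` is our helper, not part of the SDK:

```python
def rows_to_dicts(columns: list[str], data_array: list[list]) -> list[dict]:
    """Pair positional result rows with their column names."""
    return [dict(zip(columns, row)) for row in data_array]

columns = ["order_id", "customer_id", "total"]
data_array = [["1001", "42", "250.00"], ["1002", "7", "119.50"]]
print(rows_to_dicts(columns, data_array)[0])
# {'order_id': '1001', 'customer_id': '42', 'total': '250.00'}
```

With the response above, `columns` comes from `response.manifest.schema.columns` and `data_array` from `response.result.data_array`; in the default JSON result format every value arrives as a string, so cast numerics as needed.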
Create and Run a Job
“Create a job with a notebook task on an ephemeral cluster and wait for it to finish, in Python.”
```python
from datetime import timedelta

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.compute import AutoScale, ClusterSpec
from databricks.sdk.service.jobs import (
    Task,
    NotebookTask,
    JobCluster,
)

w = WorkspaceClient()

spark_version = w.clusters.select_spark_version(
    latest=True, long_term_support=True
)
node_type = w.clusters.select_node_type(
    local_disk=True, min_memory_gb=16
)

job = w.jobs.create(
    name="one-time-etl",
    job_clusters=[
        JobCluster(
            job_cluster_key="etl",
            new_cluster=ClusterSpec(
                spark_version=spark_version,
                node_type_id=node_type,
                autoscale=AutoScale(min_workers=1, max_workers=4),
            ),
        ),
    ],
    tasks=[
        Task(
            task_key="main",
            job_cluster_key="etl",
            notebook_task=NotebookTask(
                notebook_path="/Workspace/etl/transform",
                base_parameters={"date": "2024-01-15"},
            ),
        ),
    ],
)

run = w.jobs.run_now_and_wait(
    job_id=job.job_id,
    timeout=timedelta(hours=1),
)
print(f"Run {run.run_id}: {run.state.result_state}")
```

`select_spark_version()` and `select_node_type()` pick the best runtime and instance type programmatically, so you don't need to hardcode version strings that go stale.
Manage Unity Catalog Objects
“List all tables in a schema and print their column details, in Python.”
```python
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

for table in w.tables.list(
    catalog_name="main", schema_name="sales"
):
    print(f"\n{table.full_name} ({table.table_type})")
    detail = w.tables.get(full_name=table.full_name)
    for col in detail.columns:
        print(f"  {col.name}: {col.type_name} (nullable: {col.nullable})")
```

The `list()` and `get()` pattern works across all catalog objects: catalogs, schemas, tables, volumes. Iterators handle pagination transparently.
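Because every catalog object exposes the same `list()` surface, a traversal helper can stay agnostic of the client, which also makes it testable offline. A sketch; `walk_tables` is our name, and the stub below stands in for a real `WorkspaceClient`:

```python
from types import SimpleNamespace

def walk_tables(w, catalog_name: str):
    """Yield the full name of every table in every schema of a catalog."""
    for schema in w.schemas.list(catalog_name=catalog_name):
        for table in w.tables.list(
            catalog_name=catalog_name, schema_name=schema.name
        ):
            yield table.full_name

# Stub mimicking the two list() calls, so the traversal runs without a workspace
stub = SimpleNamespace(
    schemas=SimpleNamespace(
        list=lambda catalog_name: [SimpleNamespace(name="sales")]
    ),
    tables=SimpleNamespace(
        list=lambda catalog_name, schema_name: [
            SimpleNamespace(full_name=f"{catalog_name}.{schema_name}.orders"),
            SimpleNamespace(full_name=f"{catalog_name}.{schema_name}.refunds"),
        ]
    ),
)
print(list(walk_tables(stub, "main")))
# ['main.sales.orders', 'main.sales.refunds']
```

Pass a real `WorkspaceClient` in place of the stub to walk a live catalog.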
Query a Serving Endpoint
“Send a chat completion request to an LLM serving endpoint and print the response, in Python.”
```python
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import ChatMessage, ChatMessageRole

w = WorkspaceClient()

response = w.serving_endpoints.query(
    name="my-llm-endpoint",
    messages=[
        ChatMessage(
            role=ChatMessageRole.SYSTEM,
            content="You are a helpful data analyst.",
        ),
        ChatMessage(
            role=ChatMessageRole.USER,
            content="Summarize Q4 revenue trends.",
        ),
    ],
    max_tokens=200,
)
print(response.choices[0].message.content)
```

LLM endpoints accept OpenAI-compatible chat messages. For tighter integration, use `w.serving_endpoints.get_open_ai_client()` to get an OpenAI client pre-configured with your workspace credentials.
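A sketch of the `get_open_ai_client()` route. The endpoint name is hypothetical, and the live calls are commented out because they need workspace credentials; the message-building helper is ours:

```python
def chat_messages(system: str, user: str) -> list[dict]:
    """Build an OpenAI-style chat message list."""
    return [
        {"role": "system", "content": system},
        {"role": "user", "content": user},
    ]

# With credentials configured, the pre-wired OpenAI client is used like this:
# from databricks.sdk import WorkspaceClient
# w = WorkspaceClient()
# client = w.serving_endpoints.get_open_ai_client()
# resp = client.chat.completions.create(
#     model="my-llm-endpoint",  # the serving endpoint name doubles as the model id
#     messages=chat_messages("You are a helpful data analyst.",
#                            "Summarize Q4 revenue trends."),
# )
# print(resp.choices[0].message.content)

print(chat_messages("You are a helpful data analyst.", "Summarize Q4 revenue trends."))
```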
Upload Files to Volumes
“Upload a local CSV file to a Unity Catalog volume and verify it landed, in Python.”
```python
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Upload
with open("local_data.csv", "rb") as f:
    w.files.upload(
        file_path="/Volumes/main/raw/uploads/data.csv",
        contents=f,
        overwrite=True,
    )

# Verify
for entry in w.files.list_directory_contents(
    "/Volumes/main/raw/uploads/"
):
    print(f"{entry.name}: {'dir' if entry.is_directory else 'file'}")
```

`w.files.upload()` writes directly to Unity Catalog volumes. Use `overwrite=True` for idempotent uploads in automated workflows.
Async Safety in Web Applications
“Use the SDK safely inside a FastAPI endpoint without blocking the event loop, in Python.”
```python
import asyncio

from databricks.sdk import WorkspaceClient
from fastapi import FastAPI

app = FastAPI()
w = WorkspaceClient()

@app.get("/tables")
async def list_tables():
    tables = await asyncio.to_thread(
        lambda: list(w.tables.list(
            catalog_name="main", schema_name="default"
        ))
    )
    return [
        {"name": t.full_name, "type": t.table_type.value}
        for t in tables
    ]
```

The Databricks SDK is synchronous. In async frameworks like FastAPI, wrap every SDK call in `asyncio.to_thread()` to prevent blocking the event loop. Property access like `w.config.host` is safe without wrapping.
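The same `to_thread` pattern applies to any blocking call. A self-contained sketch with a stand-in for the SDK call, runnable without a workspace:

```python
import asyncio
import time

def blocking_list_tables() -> list[str]:
    """Stand-in for a synchronous SDK call that performs network I/O."""
    time.sleep(0.1)  # simulates the HTTP round trip
    return ["main.default.orders", "main.default.customers"]

async def handler() -> list[str]:
    # Runs the blocking function in a worker thread, keeping the
    # event loop free to serve other requests in the meantime
    return await asyncio.to_thread(blocking_list_tables)

print(asyncio.run(handler()))  # ['main.default.orders', 'main.default.customers']
```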
Watch Out For
- Blocking the event loop in async apps: every SDK method is synchronous and makes HTTP calls. In FastAPI or asyncio contexts, a single `w.tables.list()` call blocks the entire event loop. Always use `asyncio.to_thread()`.
- Ignoring `Wait` objects on long-running operations: `w.clusters.create()` returns a `Wait`, not a ready cluster. Either use `create_and_wait()` for blocking calls or call `.result()` on the `Wait` object with a timeout.
- Catching generic exceptions instead of SDK-specific ones: import `NotFound`, `PermissionDenied`, and `ResourceAlreadyExists` from `databricks.sdk.errors` for precise error handling. A bare `except Exception` swallows useful error context.
- Hardcoding Spark versions and node types: Spark version strings change with every runtime release. Use `w.clusters.select_spark_version()` and `w.clusters.select_node_type()` to resolve them dynamically.
- Using `DATABRICKS_PROFILE` instead of `DATABRICKS_CONFIG_PROFILE`: the SDK expects `DATABRICKS_CONFIG_PROFILE` for environment-based profile selection. The wrong variable name silently falls through to default credentials.