
Python SDK

The databricks-sdk package gives you programmatic access to every Databricks workspace API from any Python environment. You can automate cluster management, create and trigger jobs, execute SQL against warehouses, manage Unity Catalog objects, query model serving endpoints, and build applications that integrate with the platform. Your AI coding assistant uses this skill to generate SDK code that handles authentication, pagination, long-running operations, and error handling correctly.

“Write a Python script that authenticates with a named profile, lists all running clusters, and stops any that have been idle for more than 2 hours.”

from datetime import datetime, timezone

from databricks.sdk import WorkspaceClient

w = WorkspaceClient(profile="my-workspace")

# Verify connection
me = w.current_user.me()
print(f"Authenticated as: {me.user_name}")

# Find and stop idle clusters
for cluster in w.clusters.list():
    if cluster.state.value != "RUNNING":
        continue
    if not cluster.last_activity_time:
        continue
    # last_activity_time is an epoch timestamp in milliseconds
    last_active = datetime.fromtimestamp(
        cluster.last_activity_time / 1000, tz=timezone.utc
    )
    idle_minutes = (datetime.now(timezone.utc) - last_active).total_seconds() / 60
    if idle_minutes > 120:
        print(f"Stopping {cluster.cluster_name} (idle {idle_minutes:.0f}m)")
        w.clusters.stop(cluster_id=cluster.cluster_id)

Key decisions:

  • Named profile authentication (profile="my-workspace") reads credentials from ~/.databrickscfg — cleaner than environment variables when working with multiple workspaces
  • w.clusters.list() returns an iterator that handles pagination automatically, so you get all clusters without managing page tokens
  • w.clusters.stop() returns a Wait object; the stop is initiated but not blocking. Call .result() if you need to wait for the cluster to fully terminate (see the sketch below)
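
If the script must not exit until a cluster has actually stopped, you can block on the Wait object. A minimal sketch, assuming a placeholder cluster ID; .result() takes a timedelta timeout and raises TimeoutError if it expires:

from datetime import timedelta

from databricks.sdk import WorkspaceClient

w = WorkspaceClient(profile="my-workspace")

# stop() returns a Wait immediately; .result() blocks until the cluster
# reaches a terminated state or the timeout elapses.
waiter = w.clusters.stop(cluster_id="0123-456789-abcdefgh")
cluster = waiter.result(timeout=timedelta(minutes=20))
print(f"{cluster.cluster_name} is now {cluster.state}")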

“Run a parameterized SQL query against a warehouse and process the results, in Python.”

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.sql import (
    StatementParameterListItem,
    StatementState,
)

w = WorkspaceClient()

response = w.statement_execution.execute_statement(
    warehouse_id="abc123",
    statement="""
        SELECT order_id, customer_id, total
        FROM main.sales.orders
        WHERE order_date >= :start_date
          AND total > :min_total
    """,
    parameters=[
        StatementParameterListItem(
            name="start_date", value="2024-01-01", type="DATE"
        ),
        StatementParameterListItem(
            name="min_total", value="100.0", type="DECIMAL"
        ),
    ],
    wait_timeout="30s",
)

if response.status.state == StatementState.SUCCEEDED:
    columns = [col.name for col in response.manifest.schema.columns]
    print(f"Columns: {columns}")
    for row in response.result.data_array:
        print(row)

Parameterized queries with :name placeholders prevent SQL injection and handle type casting for you. Set wait_timeout so the call blocks until the statement finishes (up to 50 seconds); if the statement is still running when the timeout expires, the response carries only a statement ID and you have to poll for the results.
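
When a statement outlives wait_timeout, you can poll it by ID. A sketch of that pattern, reusing the response and client from the example above (the 5-second poll interval is arbitrary):

import time

from databricks.sdk.service.sql import StatementState

statement_id = response.statement_id
while True:
    status = w.statement_execution.get_statement(statement_id)
    # PENDING and RUNNING mean the warehouse is still working
    if status.status.state not in (StatementState.PENDING, StatementState.RUNNING):
        break
    time.sleep(5)

if status.status.state == StatementState.SUCCEEDED:
    for row in status.result.data_array:
        print(row)
else:
    print(f"Statement ended in state {status.status.state}")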

“Create a job with a notebook task on an ephemeral cluster and wait for it to finish, in Python.”

from datetime import timedelta

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.compute import AutoScale, ClusterSpec
from databricks.sdk.service.jobs import JobCluster, NotebookTask, Task

w = WorkspaceClient()

spark_version = w.clusters.select_spark_version(
    latest=True, long_term_support=True
)
node_type = w.clusters.select_node_type(
    local_disk=True, min_memory_gb=16
)

job = w.jobs.create(
    name="one-time-etl",
    job_clusters=[
        JobCluster(
            job_cluster_key="etl",
            new_cluster=ClusterSpec(
                spark_version=spark_version,
                node_type_id=node_type,
                autoscale=AutoScale(min_workers=1, max_workers=4),
            ),
        ),
    ],
    tasks=[
        Task(
            task_key="main",
            job_cluster_key="etl",
            notebook_task=NotebookTask(
                notebook_path="/Workspace/etl/transform",
                base_parameters={"date": "2024-01-15"},
            ),
        ),
    ],
)

run = w.jobs.run_now_and_wait(
    job_id=job.job_id,
    timeout=timedelta(hours=1),
)
print(f"Run {run.run_id}: {run.state.result_state}")

select_spark_version() and select_node_type() pick the best runtime and instance type programmatically — you don’t need to hardcode version strings that go stale.

“List all tables in a schema and print their column details, in Python.”

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

for table in w.tables.list(catalog_name="main", schema_name="sales"):
    print(f"\n{table.full_name} ({table.table_type})")
    detail = w.tables.get(full_name=table.full_name)
    for col in detail.columns:
        print(f"  {col.name}: {col.type_name} (nullable: {col.nullable})")

The list() and get() pattern works across all catalog objects — catalogs, schemas, tables, volumes. Iterators handle pagination transparently.
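
A sketch of the same pattern one level up the hierarchy; the catalog and schema names passed to volumes.list() are placeholders:

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# The same list() iterator works for catalogs, schemas, and volumes
for catalog in w.catalogs.list():
    print(catalog.name)
    for schema in w.schemas.list(catalog_name=catalog.name):
        print(f"  {schema.name}")

for volume in w.volumes.list(catalog_name="main", schema_name="raw"):
    print(f"{volume.full_name} ({volume.volume_type})")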

“Send a chat completion request to an LLM serving endpoint and print the response, in Python.”

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import ChatMessage, ChatMessageRole

w = WorkspaceClient()

response = w.serving_endpoints.query(
    name="my-llm-endpoint",
    messages=[
        ChatMessage(
            role=ChatMessageRole.SYSTEM,
            content="You are a helpful data analyst.",
        ),
        ChatMessage(
            role=ChatMessageRole.USER,
            content="Summarize Q4 revenue trends.",
        ),
    ],
    max_tokens=200,
)
print(response.choices[0].message.content)

serving_endpoints.query() takes OpenAI-style role/content chat messages, passed as ChatMessage objects. For tighter integration, use w.serving_endpoints.get_open_ai_client() to get an OpenAI client pre-configured with your workspace credentials.
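
If you already have code written against the openai package, a sketch of routing it through the workspace instead; the endpoint name is a placeholder and stands in for the model parameter:

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Returns an openai.OpenAI client pointed at the workspace's serving
# endpoints and authenticated with your Databricks credentials.
client = w.serving_endpoints.get_open_ai_client()

completion = client.chat.completions.create(
    model="my-llm-endpoint",  # serving endpoint name, not an OpenAI model id
    messages=[{"role": "user", "content": "Summarize Q4 revenue trends."}],
    max_tokens=200,
)
print(completion.choices[0].message.content)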

“Upload a local CSV file to a Unity Catalog volume and verify it landed, in Python.”

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Upload (open in binary mode; the SDK streams the file handle)
with open("local_data.csv", "rb") as f:
    w.files.upload(
        file_path="/Volumes/main/raw/uploads/data.csv",
        contents=f,
        overwrite=True,
    )

# Verify
for entry in w.files.list_directory_contents("/Volumes/main/raw/uploads/"):
    print(f"{entry.name}: {'dir' if entry.is_directory else 'file'}")

w.files.upload() writes directly to Unity Catalog volumes. Use overwrite=True for idempotent uploads in automated workflows.
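
To verify the contents rather than just the directory listing, you can read the file back with w.files.download(); a short sketch using the same path as the upload above:

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# download() returns a response whose .contents is a readable binary stream
resp = w.files.download("/Volumes/main/raw/uploads/data.csv")
data = resp.contents.read()
print(f"Uploaded file is {len(data)} bytes")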

“Use the SDK safely inside a FastAPI endpoint without blocking the event loop, in Python.”

import asyncio

from databricks.sdk import WorkspaceClient
from fastapi import FastAPI

app = FastAPI()
w = WorkspaceClient()

@app.get("/tables")
async def list_tables():
    tables = await asyncio.to_thread(
        lambda: list(w.tables.list(catalog_name="main", schema_name="default"))
    )
    return [
        {"name": t.full_name, "type": t.table_type.value}
        for t in tables
    ]

The Databricks SDK is synchronous. In async frameworks like FastAPI, wrap every SDK call in asyncio.to_thread() to prevent blocking the event loop. Property access like w.config.host is safe without wrapping.

  • Blocking the event loop in async apps — Every SDK method is synchronous and makes HTTP calls. In FastAPI or asyncio contexts, a single w.tables.list() call blocks the entire event loop. Always use asyncio.to_thread().
  • Ignoring Wait objects on long-running operations — w.clusters.create() returns a Wait, not a ready cluster. Either use create_and_wait() for blocking calls or call .result() on the Wait object with a timeout.
  • Catching generic exceptions instead of SDK-specific ones — Import NotFound, PermissionDenied, and ResourceAlreadyExists from databricks.sdk.errors for precise error handling. A bare except Exception swallows useful error context (see the sketch after this list).
  • Hardcoding Spark versions and node types — Spark version strings change with every runtime release. Use w.clusters.select_spark_version() and w.clusters.select_node_type() to resolve them dynamically.
  • Using DATABRICKS_PROFILE instead of DATABRICKS_CONFIG_PROFILE — The SDK expects DATABRICKS_CONFIG_PROFILE for environment-based profile selection. The wrong variable name silently falls through to default credentials.
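
A minimal sketch of the exception-handling point above, using a placeholder table name:

from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import NotFound, PermissionDenied

w = WorkspaceClient()

try:
    table = w.tables.get(full_name="main.sales.orders")
    print(table.table_type)
except NotFound:
    print("Table does not exist")
except PermissionDenied:
    print("No privileges to read this table's metadata")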