Lakebase Connectivity

Skill: databricks-app-python

You can add low-latency transactional storage to your Databricks App using Lakebase, a managed PostgreSQL service. While SQL warehouses handle analytical queries on Delta tables, Lakebase handles the CRUD workloads that apps need — user sessions, form submissions, app configuration, and any data your app writes and reads at sub-second latency.

“Using Python and psycopg2, connect a Databricks App to Lakebase using the auto-injected environment variables.”

import os
import psycopg2

conn = psycopg2.connect(
    host=os.getenv("PGHOST"),
    database=os.getenv("PGDATABASE"),
    user=os.getenv("PGUSER"),
    password=os.getenv("PGPASSWORD"),
    port=os.getenv("PGPORT", "5432"),
)

with conn.cursor() as cur:
    cur.execute("SELECT * FROM app_data.user_sessions LIMIT 10")
    rows = cur.fetchall()

conn.close()

Key decisions:

  • Lakebase auto-injects PGHOST, PGDATABASE, PGUSER, PGPASSWORD, and PGPORT when you add it as an app resource
  • You connect with standard PostgreSQL drivers — no Databricks-specific library needed
  • Use Lakebase for transactional CRUD (sessions, config, user-generated content); use a SQL warehouse for analytical queries on Delta tables
  • psycopg2-binary and asyncpg are NOT pre-installed — you must add them to requirements.txt
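Since the driver is not pre-installed, a minimal requirements.txt for the psycopg2 example above looks like this (uncomment asyncpg if you use the async examples instead):

```
# PostgreSQL driver -- not pre-installed in the Apps runtime
psycopg2-binary
# asyncpg
```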

“Using Python and asyncpg, connect to Lakebase for async request handling.”

import os
import asyncpg

async def get_data():
    conn = await asyncpg.connect(
        host=os.getenv("PGHOST"),
        database=os.getenv("PGDATABASE"),
        user=os.getenv("PGUSER"),
        password=os.getenv("PGPASSWORD"),
        port=int(os.getenv("PGPORT", "5432")),
    )
    rows = await conn.fetch("SELECT * FROM app_data.events LIMIT 10")
    await conn.close()
    return rows

Use asyncpg when your app framework supports async (FastAPI, Gradio). It provides higher throughput under concurrent load than psycopg2. Add asyncpg to requirements.txt.

“Using Python and SQLAlchemy, set up a Lakebase connection engine.”

import os
from sqlalchemy import create_engine

DATABASE_URL = (
    f"postgresql://{os.getenv('PGUSER')}:{os.getenv('PGPASSWORD')}"
    f"@{os.getenv('PGHOST')}:{os.getenv('PGPORT', '5432')}"
    f"/{os.getenv('PGDATABASE')}"
)
engine = create_engine(DATABASE_URL)

SQLAlchemy works when you want an ORM layer or need to integrate with frameworks that expect an engine object (Alembic migrations, Flask-SQLAlchemy).
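One caveat with the URL-string approach: an injected password containing characters like `@`, `:`, or `/` corrupts the DSN. A stdlib-only sketch that percent-encodes the credentials first (`lakebase_url` is a hypothetical helper name):

```python
import os
from urllib.parse import quote_plus

def lakebase_url() -> str:
    # Percent-encode user and password so characters like @ : / in the
    # injected credentials cannot break the URL structure.
    user = quote_plus(os.getenv("PGUSER", ""))
    password = quote_plus(os.getenv("PGPASSWORD", ""))
    host = os.getenv("PGHOST", "")
    port = os.getenv("PGPORT", "5432")
    database = os.getenv("PGDATABASE", "")
    return f"postgresql://{user}:{password}@{host}:{port}/{database}"
```

Pass the result to `create_engine()` in place of the plain f-string.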

“Using Python and Streamlit, cache a Lakebase connection to avoid exhausting the pool.”

import os
import streamlit as st
import psycopg2

@st.cache_resource
def get_db_connection():
    return psycopg2.connect(
        host=os.getenv("PGHOST"),
        database=os.getenv("PGDATABASE"),
        user=os.getenv("PGUSER"),
        password=os.getenv("PGPASSWORD"),
    )

conn = get_db_connection()

Without @st.cache_resource, Streamlit creates a new connection on every script rerun. That exhausts the Lakebase connection pool within minutes under normal usage.
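A cached connection can still go stale if the server closes it between reruns. One defensive pattern, as a sketch: `ensure_live` is a hypothetical helper (not a Streamlit or psycopg2 API), and `.closed` is psycopg2's connection-state attribute, nonzero once the connection is closed or broken.

```python
def ensure_live(conn, reconnect):
    """Return conn if it is still usable, otherwise a fresh connection.

    reconnect() should build and return a new connection, e.g. by clearing
    the st.cache_resource entry and calling the cached factory again.
    """
    if conn is None or getattr(conn, "closed", 0):
        return reconnect()
    return conn
```

Usage sketch with the cached factory above: `conn = ensure_live(get_db_connection(), lambda: (get_db_connection.clear(), get_db_connection())[1])`.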

  • Missing psycopg2-binary in requirements.txt — this is the most common cause of Lakebase app crashes. Neither psycopg2 nor asyncpg is pre-installed in the Apps runtime. Your app will fail at startup with an ImportError if you forget to add the driver.
  • Using Lakebase for analytical queries — Lakebase is a PostgreSQL transactional database, not a columnar analytics engine. For aggregations over large Delta tables, use a SQL warehouse. Lakebase is for the app layer: sessions, form data, CRUD operations.
  • Creating connections inside request handlers — opening a new connection per request adds latency and risks hitting connection limits. Use connection pooling (SQLAlchemy engine) or framework-level caching (@st.cache_resource for Streamlit).
  • Assuming the public schema works — the Lakebase service principal often cannot access the default public schema. Create a custom schema (e.g., app_data) at startup and use it for all tables.
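An idempotent startup routine for that custom schema might look like this sketch — the `user_sessions` columns are illustrative, and `init_schema` accepts any DB-API connection (e.g. the psycopg2 connection from the first example):

```python
# DDL run once at app startup; IF NOT EXISTS makes it safe to re-run.
SCHEMA_DDL = [
    "CREATE SCHEMA IF NOT EXISTS app_data",
    """
    CREATE TABLE IF NOT EXISTS app_data.user_sessions (
        session_id TEXT PRIMARY KEY,
        user_name  TEXT NOT NULL,
        created_at TIMESTAMPTZ DEFAULT now()
    )
    """,
]

def init_schema(conn) -> None:
    # Works with any DB-API connection; commit so the DDL persists.
    with conn.cursor() as cur:
        for stmt in SCHEMA_DDL:
            cur.execute(stmt)
    conn.commit()
```

Call `init_schema(conn)` before serving requests, then qualify every table reference with the schema name (`app_data.user_sessions`).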