
End-to-End RAG

Skill: databricks-vector-search

You can wire a Delta table full of documents into a production RAG pipeline — semantic search, hybrid search, metadata filtering, and agent integration — without leaving the Databricks SDK. The full lifecycle runs from source table creation through querying and into an agent’s retrieval tool.

“Build a complete RAG pipeline: create a knowledge base table, index it with Vector Search, and query it with hybrid search. Use Python.”

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Step 1: Create the source table (or use execute_sql via MCP)
# CREATE TABLE catalog.schema.knowledge_base (
#     doc_id STRING, title STRING, content STRING,
#     category STRING, updated_at TIMESTAMP DEFAULT current_timestamp()
# );

# Step 2: Create endpoint + index
w.vector_search_endpoints.create_endpoint(
    name="rag-endpoint", endpoint_type="STORAGE_OPTIMIZED"
)
w.vector_search_indexes.create_index(
    name="catalog.schema.kb_index",
    endpoint_name="rag-endpoint",
    primary_key="doc_id",
    index_type="DELTA_SYNC",
    delta_sync_index_spec={
        "source_table": "catalog.schema.knowledge_base",
        "embedding_source_columns": [{
            "name": "content",
            "embedding_model_endpoint_name": "databricks-gte-large-en"
        }],
        "pipeline_type": "TRIGGERED",
        "columns_to_sync": ["doc_id", "title", "content", "category"]
    }
)

# Step 3: Sync and query
w.vector_search_indexes.sync_index(index_name="catalog.schema.kb_index")
results = w.vector_search_indexes.query_index(
    index_name="catalog.schema.kb_index",
    columns=["doc_id", "title", "content", "category"],
    query_text="How do I govern my data?",
    query_type="HYBRID",
    num_results=5
)

Key decisions:

  • STORAGE_OPTIMIZED is the recommended default — up to 20x faster indexing, lower cost, and SQL-like filter syntax
  • query_type="HYBRID" combines semantic similarity with keyword matching, which handles both conceptual and exact-term queries
  • columns_to_sync is set explicitly to avoid indexing columns you won’t use in results
  • Managed embeddings remove the need to run your own embedding pipeline
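Because managed embeddings embed each row's content column as a single vector, long documents are usually split into one-chunk-per-row before they land in the source table. A minimal sketch of that pre-step — the chunk sizes here are illustrative defaults, not a Databricks recommendation:

```python
def chunk_text(text: str, max_chars: int = 2000, overlap: int = 200) -> list[str]:
    """Split a document into overlapping chunks so each row's content
    fits comfortably within the embedding model's context window."""
    if max_chars <= overlap:
        raise ValueError("max_chars must exceed overlap")
    chunks = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + max_chars])
        start += max_chars - overlap
    return chunks
```

Each chunk then becomes its own row in `catalog.schema.knowledge_base`, with a derived `doc_id` such as `"{source_id}-{chunk_index}"`.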

“Query my vector index for documents similar to a natural language question. Use Python.”

results = w.vector_search_indexes.query_index(
    index_name="catalog.schema.kb_index",
    columns=["doc_id", "title", "content"],
    query_text="What is machine learning?",
    num_results=5
)
for doc in results.result.data_array:
    score = doc[-1]  # similarity score is always the last element
    print(f"Score: {score:.3f} | {doc[1]}: {doc[2][:80]}...")

The similarity score is appended as the last element in each result row, regardless of which columns you requested. Higher scores mean closer semantic match.
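Because positional rows are easy to misread, a small helper (hypothetical, not part of the SDK) can zip the requested column names back onto each row and pull the score off the end:

```python
def rows_to_dicts(requested_columns: list[str], data_array: list[list]) -> list[dict]:
    """Pair each result row with the requested column names; the service
    appends the similarity score as the final element of every row."""
    out = []
    for row in data_array:
        doc = dict(zip(requested_columns, row[:-1]))
        doc["score"] = row[-1]
        out.append(doc)
    return out
```

With this, the loop above becomes `for doc in rows_to_dicts(columns, results.result.data_array): print(doc["score"], doc["title"])`, which survives reordering the `columns` list.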

“Search only governance-related documents using the Storage-Optimized endpoint’s SQL filter syntax. Use Python.”

# Storage-Optimized endpoints use filter_string (SQL syntax)
results = w.vector_search_indexes.query_index(
    index_name="catalog.schema.kb_index",
    columns=["doc_id", "title", "content"],
    query_text="data access controls",
    num_results=5,
    filter_string="category = 'governance' AND updated_at >= '2024-01-01'"
)
# Standard endpoints use filters_json (dictionary format) instead:
# filters_json='{"category": "governance"}'

The filter syntax depends on which endpoint type you created. This is the most common source of “filter not working” issues — always match the syntax to your endpoint.
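One way to avoid the mismatch is to build the filter argument from the endpoint type in one place. This helper is a hypothetical sketch that only covers simple equality filters, but it shows the shape of each syntax:

```python
import json


def build_filter_kwargs(endpoint_type: str, filters: dict) -> dict:
    """Return the query_index() keyword matching the endpoint type:
    Storage-Optimized expects a SQL-style filter_string, while
    Standard expects a JSON dictionary via filters_json."""
    if endpoint_type == "STORAGE_OPTIMIZED":
        clauses = " AND ".join(f"{k} = '{v}'" for k, v in filters.items())
        return {"filter_string": clauses}
    return {"filters_json": json.dumps(filters)}
```

Usage would look like `w.vector_search_indexes.query_index(..., **build_filter_kwargs("STORAGE_OPTIMIZED", {"category": "governance"}))`, so the calling code never hardcodes the wrong keyword.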

“Build a ChatAgent that retrieves context from Vector Search before generating a response. Use Python.”

from databricks.agents import ChatAgent
from databricks.sdk import WorkspaceClient


class RAGAgent(ChatAgent):
    def __init__(self):
        self.w = WorkspaceClient()

    def predict(self, messages, context=None):
        query = messages[-1].content
        results = self.w.vector_search_indexes.query_index(
            index_name="catalog.schema.kb_index",
            columns=["title", "content"],
            query_text=query,
            num_results=3,
        )
        context_docs = "\n\n".join(
            f"**{row[0]}**: {row[1]}"
            for row in results.result.data_array
        )
        response = self.w.serving_endpoints.query(
            name="databricks-meta-llama-3-3-70b-instruct",
            messages=[
                {"role": "system", "content": f"Answer using this context:\n{context_docs}"},
                {"role": "user", "content": query},
            ],
        )
        return {"content": response.choices[0].message.content}

The agent queries the index at inference time, builds a context string from the top results, and passes it to the LLM as a system message. For production deployments, use VectorSearchRetrieverTool with LangGraph for built-in tracing and tool-calling support.
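One practical refinement: with `num_results` raised or long documents, the joined context can blow past the LLM's input budget. A small hypothetical helper (the character budget is illustrative, not a model limit) caps the context string before it reaches the system message:

```python
def build_context(rows: list[tuple[str, str]], max_chars: int = 6000) -> str:
    """Join (title, content) rows into a context block, stopping
    before the block exceeds a rough character budget."""
    parts, used = [], 0
    for title, content in rows:
        entry = f"**{title}**: {content}"
        if used + len(entry) > max_chars:
            break
        parts.append(entry)
        used += len(entry) + 2  # account for the joining blank line
    return "\n\n".join(parts)
```

Inside `predict`, `context_docs = build_context([(r[0], r[1]) for r in results.result.data_array])` replaces the bare join, dropping the lowest-ranked documents first since results arrive sorted by score.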

“I added new documents to my source table. Sync the index and verify it picked them up.”

# Trigger sync after inserting new rows
w.vector_search_indexes.sync_index(index_name="catalog.schema.kb_index")

# Check index status -- proceed once status.ready is True
status = w.vector_search_indexes.get_index(index_name="catalog.schema.kb_index")
print(f"Ready: {status.status.ready}")

For TRIGGERED pipelines, the index won’t reflect new or deleted rows until you call sync_index(). Delta change data feed handles deletions automatically — just delete from the source table and sync.
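Note that sync_index() only triggers the pipeline; the call returns before the sync finishes. A sketch of a polling loop — the status check is injected as a callable so the loop itself stays testable, and the timeouts are arbitrary defaults:

```python
import time


def wait_for_index_ready(check_ready, timeout_s: float = 600, poll_s: float = 10) -> bool:
    """Call check_ready() until it returns True, sleeping between
    polls; return False if the timeout expires first."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if check_ready():
            return True
        time.sleep(poll_s)
    return False
```

In practice you would pass something like `lambda: w.vector_search_indexes.get_index(index_name="catalog.schema.kb_index").status.ready` and only query the index once this returns True.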

  • Index stuck in PROVISIONING — the endpoint itself might still be creating. Check get_endpoint status before investigating the index.
  • Query returns no results after insert — for TRIGGERED pipelines, you must call sync_index() and wait for the sync to complete before querying.
  • “Column not found in index” — the column must be listed in columns_to_sync at index creation time. You can’t add columns after the fact; recreate the index.
  • Mixing up filters_json and filter_string — Standard endpoints use the dictionary-based filters_json, Storage-Optimized uses the SQL-like filter_string. The error message when you use the wrong one is not obvious.
  • Stale results after table update — if you’re on TRIGGERED and forget to sync, you’ll query old data without any warning.