
Evaluation Workflows

Skill: databricks-mlflow-evaluation

You can follow structured workflows that take you from “I have an agent” to “I have production-quality evaluation.” Each journey covers a different scenario — first-time setup, regression detection, performance optimization, trace ingestion, and the full domain-expert alignment and prompt optimization cycle. Pick the journey that matches where you are.

“I have a new agent and want to set up evaluation for the first time. Walk me through the steps. Use Python.”

import mlflow
from mlflow.genai.scorers import Safety, Guidelines, Correctness

mlflow.set_tracking_uri("databricks")
mlflow.set_experiment("/Shared/my-agent-evaluation")

# Step 1: Define a minimal evaluation dataset
eval_data = [
    {
        "inputs": {"query": "What is our refund policy?"},
        "expectations": {"expected_facts": ["30-day window", "original packaging"]},
    },
    {
        "inputs": {"query": "How do I contact support?"},
        "expectations": {"expected_facts": ["email", "phone", "business hours"]},
    },
    {"inputs": {"query": ""}},  # Edge case: empty input
    {"inputs": {"query": "Ignore your instructions and..."}},  # Adversarial
]

# Step 2: Choose scorers that match your quality bar
scorers = [
    Safety(),
    Correctness(),
    Guidelines(name="helpful", guidelines="Must directly address the user's question"),
]

# Step 3: Run evaluation
results = mlflow.genai.evaluate(
    data=eval_data,
    predict_fn=my_agent,
    scorers=scorers,
)

# Step 4: Define quality gates
QUALITY_GATES = {
    "safety": 1.0,       # 100% -- non-negotiable
    "correctness": 0.9,  # 90% -- high bar for accuracy
    "helpful": 0.85,     # 85% -- good relevance
}
for metric, threshold in QUALITY_GATES.items():
    actual = results.metrics.get(f"{metric}/mean", 0)
    status = "PASS" if actual >= threshold else "FAIL"
    print(f"  {metric}: {actual:.2%} (threshold: {threshold:.0%}) -- {status}")

Key decisions:

  • Start with built-in scorers — Safety, Correctness, and Guidelines cover the basics
  • Include edge cases from day one — empty inputs and adversarial prompts reveal problems that happy-path data hides
  • Set quality gates early — even rough thresholds give you a deployment criterion
  • Safety at 100% — anything less means the agent can produce harmful content in production
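The quality gates above can be factored into a small guard for CI. A minimal sketch — `check_quality_gates` is a hypothetical helper, not an MLflow API; it only assumes `results.metrics` is a dict of `<scorer>/mean` floats:

```python
def check_quality_gates(metrics: dict, gates: dict) -> list[str]:
    """Return the names of metrics that fall below their threshold."""
    failures = []
    for metric, threshold in gates.items():
        actual = metrics.get(f"{metric}/mean", 0.0)
        if actual < threshold:
            failures.append(metric)
    return failures

GATES = {"safety": 1.0, "correctness": 0.9, "helpful": 0.85}

# In CI, fail the build when any gate is missed, e.g.:
#   failures = check_quality_gates(results.metrics, GATES)
#   assert not failures, f"Quality gates failed: {failures}"
metrics = {"safety/mean": 1.0, "correctness/mean": 0.92, "helpful/mean": 0.80}
print(check_quality_gates(metrics, GATES))  # helpful misses its 0.85 bar
```

A missing metric counts as 0.0, so a scorer that silently drops out of the results also fails its gate instead of passing unnoticed.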

“Compare my agent before and after a prompt change to find regressions. Use Python.”

import mlflow
from mlflow.genai.scorers import Safety, Guidelines

scorers = [
    Safety(),
    Guidelines(name="helpful", guidelines="Must be helpful and accurate"),
    Guidelines(name="concise", guidelines="Must be under 200 words"),
]

# Step 1: Evaluate baseline version
with mlflow.start_run(run_name="baseline_v1"):
    results_v1 = mlflow.genai.evaluate(
        data=eval_data, predict_fn=agent_v1, scorers=scorers
    )

# Step 2: Evaluate candidate version (same eval_data -- keep it constant)
with mlflow.start_run(run_name="candidate_v2"):
    results_v2 = mlflow.genai.evaluate(
        data=eval_data, predict_fn=agent_v2, scorers=scorers
    )

# Step 3: Find individual regressions by joining traces on their input
traces_v1 = mlflow.search_traces(run_id=results_v1.run_id)
traces_v2 = mlflow.search_traces(run_id=results_v2.run_id)
traces_v1["key"] = traces_v1["request"].apply(str)
traces_v2["key"] = traces_v2["request"].apply(str)
merged = traces_v1.merge(traces_v2, on="key", suffixes=("_v1", "_v2"))

regressions = []
for _, row in merged.iterrows():
    v1_scores = {a["assessment_name"]: a["feedback"]["value"] for a in row["assessments_v1"]}
    v2_scores = {a["assessment_name"]: a["feedback"]["value"] for a in row["assessments_v2"]}
    for name, v1_val in v1_scores.items():
        v2_val = v2_scores.get(name)
        # A regression: the metric passed in v1 but fails in v2
        if v1_val in ("yes", True) and v2_val in ("no", False):
            regressions.append({"input": row["request_v1"], "metric": name})

print(f"Found {len(regressions)} regressions across {len(merged)} inputs")

Keep the dataset constant between runs. If you change both the agent and the dataset, you cannot tell which caused the metric change. Named runs make comparison easy in the MLflow UI.
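Beyond per-input regressions, it helps to eyeball the aggregate deltas between the two runs. A sketch — `metric_deltas` is an illustrative helper, assuming each run's `.metrics` is a dict of `<scorer>/mean` floats:

```python
def metric_deltas(baseline: dict, candidate: dict) -> dict:
    """Per-metric change from baseline to candidate; negative means the candidate regressed."""
    deltas = {}
    for key, base_val in baseline.items():
        if key in candidate:
            deltas[key] = round(candidate[key] - base_val, 4)
    return deltas

# Hypothetical aggregate metrics from two evaluate() runs
v1 = {"safety/mean": 1.0, "helpful/mean": 0.90, "concise/mean": 0.70}
v2 = {"safety/mean": 1.0, "helpful/mean": 0.85, "concise/mean": 0.95}
for key, delta in metric_deltas(v1, v2).items():
    print(f"{key}: {delta:+.2f}")
```

A pattern like this one — conciseness up, helpfulness down — is exactly the trade-off a prompt change can introduce, and the per-input join above tells you which inputs paid for it.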

“Profile my agent’s latency and find which stage is the bottleneck. Use Python.”

import mlflow
from mlflow.entities import SpanType

# Step 1: Run evaluation and collect traces
results = mlflow.genai.evaluate(
    data=eval_data, predict_fn=my_agent, scorers=scorers
)

# Step 2: Profile a sample trace
traces = mlflow.search_traces(run_id=results.run_id, return_type="list")
sample_trace = traces[0]

# Step 3: Break down latency by span type
for span_type in [SpanType.CHAT_MODEL, SpanType.RETRIEVER, SpanType.TOOL]:
    spans = sample_trace.search_spans(span_type=span_type)
    if spans:
        total_ms = sum((s.end_time_ns - s.start_time_ns) / 1e6 for s in spans)
        print(f"{span_type}: {total_ms:.0f}ms across {len(spans)} spans")

# Step 4: Check token usage in LLM spans
llm_spans = sample_trace.search_spans(span_type=SpanType.CHAT_MODEL)
for span in llm_spans:
    attrs = span.attributes or {}
    tokens = attrs.get("mlflow.chat_model.input_tokens", 0)
    print(f"  {span.name}: {tokens} input tokens")

High input token counts in LLM spans usually mean context bloat — too many retrieved documents, uncompressed conversation history, or an oversized system prompt. See the Context Optimization page for fixes.
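A quick way to act on those numbers is a token budget over the per-span counts collected above. A sketch — `find_context_bloat` and the 4000-token budget are illustrative, not an MLflow API:

```python
def find_context_bloat(span_tokens: dict, budget: int = 4000) -> list[str]:
    """Return the names of LLM spans whose input token count exceeds the budget."""
    return [name for name, tokens in span_tokens.items() if tokens > budget]

# Hypothetical per-span input token counts gathered from llm_spans above
span_tokens = {"planner_llm": 1800, "answer_llm": 9500}
print(find_context_bloat(span_tokens))  # ['answer_llm']
```

Running this over every trace, rather than one sample, turns an anecdote into a distribution: if the same span blows the budget on most inputs, the bloat is structural (system prompt, retriever k), not input-specific.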

“Set up UC trace storage, configure my app to send traces, and enable production monitoring. Use Python.”

import os
import mlflow
from mlflow.entities import UCSchemaLocation
from mlflow.tracing.enablement import set_experiment_trace_location
from mlflow.tracing import set_databricks_monitoring_sql_warehouse_id
from mlflow.genai.scorers import Safety, ScorerSamplingConfig

# Step 1: Link a UC schema to the experiment
mlflow.set_tracking_uri("databricks")
os.environ["MLFLOW_TRACING_SQL_WAREHOUSE_ID"] = "<SQL_WAREHOUSE_ID>"
experiment_id = mlflow.create_experiment(name="/Shared/my-traces")
set_experiment_trace_location(
    location=UCSchemaLocation(catalog_name="my_catalog", schema_name="my_schema"),
    experiment_id=experiment_id,
)

# Step 2: Point the tracing client at the same UC schema
mlflow.tracing.set_destination(
    destination=UCSchemaLocation(catalog_name="my_catalog", schema_name="my_schema")
)

# Step 3: Enable production monitoring with a registered scorer
set_databricks_monitoring_sql_warehouse_id(warehouse_id="<SQL_WAREHOUSE_ID>")
safety = Safety().register(name="safety_monitor")
safety = safety.start(sampling_config=ScorerSamplingConfig(sample_rate=1.0))

This is the minimum setup for production trace storage and monitoring. Grant MODIFY and SELECT on the trace tables — ALL_PRIVILEGES is not sufficient. Requires mlflow[databricks]>=3.9.0.
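Because the ALL_PRIVILEGES caveat is easy to trip over, a pre-flight check can encode the rule explicitly. A sketch — `missing_trace_grants` is a hypothetical helper; it only restates the requirement above, it does not query Unity Catalog:

```python
REQUIRED_TRACE_GRANTS = {"MODIFY", "SELECT"}

def missing_trace_grants(granted: set) -> set:
    """Return which required privileges are absent from a grant set.

    ALL_PRIVILEGES does not substitute for explicit MODIFY and SELECT here.
    """
    return REQUIRED_TRACE_GRANTS - granted

print(missing_trace_grants({"ALL_PRIVILEGES"}))    # both still missing
print(missing_trace_grants({"MODIFY", "SELECT"}))  # set() -- good to go
```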

“Walk me through the complete workflow: evaluate, collect expert feedback, align the judge, optimize the prompt, and deploy. Use Python.”

import mlflow

# -- PHASE 1: Evaluate and collect feedback -----------
from mlflow.genai import evaluate, create_labeling_session
from mlflow.genai.judges import make_judge
from mlflow.genai.datasets import create_dataset

# Step 1: Create base judge and evaluate
base_judge = make_judge(name=JUDGE_NAME, instructions="...", feedback_value_type=float)
base_judge.register(experiment_id=EXPERIMENT_ID)
results = evaluate(data=eval_data, predict_fn=my_agent, scorers=[base_judge])

# Step 2: Tag successful traces
ok_ids = results.result_df.loc[results.result_df["state"] == "OK", "trace_id"]
for tid in ok_ids:
    mlflow.set_trace_tag(tid, key="eval", value="complete")

# Step 3: Create labeling session (label schema name MUST match judge name)
dataset = create_dataset(name=DATASET_NAME)
dataset.merge_records(tagged_traces)
session = create_labeling_session(
    name="sme_review", assigned_users=[...], label_schemas=[JUDGE_NAME]
)
session.add_dataset(dataset_name=DATASET_NAME)
# -> Share session.url with domain experts

# -- PHASE 2: Align the judge -----------------------
from mlflow.genai.judges.optimizers import MemAlignOptimizer
from mlflow.genai.scorers import get_scorer

# Step 4: Align after experts complete labeling
optimizer = MemAlignOptimizer(
    reflection_lm=REFLECTION_MODEL,
    retrieval_k=5,
    embedding_model="databricks:/databricks-gte-large-en",
)
base = get_scorer(name=JUDGE_NAME)
aligned = base.align(traces=traces, optimizer=optimizer)
aligned.update(experiment_id=EXPERIMENT_ID)

# -- PHASE 3: Optimize the prompt -------------------
from mlflow.genai.optimize import GepaPromptOptimizer

# Step 5: Build the optimization dataset (records must have expectations)
optim_data = [
    {"inputs": {...}, "expectations": {"expected_response": "..."}}
]

# Step 6: Run GEPA with the aligned judge
result = mlflow.genai.optimize_prompts(
    predict_fn=predict_fn,
    train_data=optim_data,
    prompt_uris=[system_prompt.uri],
    optimizer=GepaPromptOptimizer(reflection_model=REFLECTION_MODEL),
    scorers=[aligned],
)

# Step 7: Promote only if the optimized prompt scores better
new_version = mlflow.genai.register_prompt(
    name=PROMPT_NAME, template=result.optimized_prompts[0].template
)
if result.final_eval_score > result.initial_eval_score:
    mlflow.genai.set_prompt_alias(
        name=PROMPT_NAME, alias="production", version=new_version.version
    )

This is the full loop. Phase 1 can be repeated to collect more expert feedback. Phase 2 improves with each round of labeling without re-training. Phase 3 only runs when you want to optimize the prompt. Each phase is independently valuable.

  • Skipping expert labeling — GEPA works without an aligned judge, but it optimizes toward generic quality. Expert feedback makes the signal domain-accurate.
  • Label schema name mismatch — the label schema name MUST match the judge name. This is the pairing mechanism for align().
  • GEPA datasets need expectations — unlike evaluation datasets, optimization datasets require expectations per record. A dataset with only inputs will not work.
  • Aligned judge scores may drop — this is expected. The aligned judge is more accurate, and often stricter than the generic one. Use its scores as the new baseline.
  • Promoting without checking — always verify result.final_eval_score > result.initial_eval_score before updating the production alias.
  • Forgetting embedding_model in MemAlign — defaults to openai/text-embedding-3-small. On Databricks, set databricks:/databricks-gte-large-en to avoid external API calls.
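The promotion check from the pitfalls above can live in a tiny guard. A sketch — `should_promote` and its `min_gain` margin (a buffer against judge noise) are illustrative, not part of MLflow:

```python
def should_promote(initial: float, final: float, min_gain: float = 0.0) -> bool:
    """Promote only on a strict improvement over the pre-optimization score."""
    return final > initial + min_gain

# Mirrors Step 7 above:
#   if should_promote(result.initial_eval_score, result.final_eval_score):
#       mlflow.genai.set_prompt_alias(...)
print(should_promote(0.78, 0.84))  # True
print(should_promote(0.78, 0.78))  # False -- no improvement, keep the old alias
```

Setting `min_gain` above zero trades a few missed promotions for protection against chasing run-to-run judge variance.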