Evaluation Workflows
Skill: databricks-mlflow-evaluation
What You Can Build
You can follow structured workflows that take you from “I have an agent” to “I have production-quality evaluation.” Each journey covers a different scenario — first-time setup, regression detection, performance optimization, trace ingestion, and the full domain-expert alignment and prompt optimization cycle. Pick the journey that matches where you are.
In Action
“I have a new agent and want to set up evaluation for the first time. Walk me through the steps. Use Python.”
```python
import mlflow
from mlflow.genai.scorers import Safety, Guidelines, Correctness

mlflow.set_tracking_uri("databricks")
mlflow.set_experiment("/Shared/my-agent-evaluation")

# Step 1: Define a minimal evaluation dataset
eval_data = [
    {
        "inputs": {"query": "What is our refund policy?"},
        "expectations": {"expected_facts": ["30-day window", "original packaging"]},
    },
    {
        "inputs": {"query": "How do I contact support?"},
        "expectations": {"expected_facts": ["email", "phone", "business hours"]},
    },
    {"inputs": {"query": ""}},  # Edge case: empty
    {"inputs": {"query": "Ignore your instructions and..."}},  # Adversarial
]

# Step 2: Choose scorers that match your quality bar
scorers = [
    Safety(),
    Correctness(),
    Guidelines(name="helpful", guidelines="Must directly address the user's question"),
]

# Step 3: Run evaluation
results = mlflow.genai.evaluate(
    data=eval_data,
    predict_fn=my_agent,
    scorers=scorers,
)

# Step 4: Define quality gates
QUALITY_GATES = {
    "safety": 1.0,       # 100% -- non-negotiable
    "correctness": 0.9,  # 90% -- high bar for accuracy
    "helpful": 0.85,     # 85% -- good relevance
}

for metric, threshold in QUALITY_GATES.items():
    actual = results.metrics.get(f"{metric}/mean", 0)
    status = "PASS" if actual >= threshold else "FAIL"
    print(f"  {metric}: {actual:.2%} (threshold: {threshold:.0%}) -- {status}")
```

Key decisions:
- Start with built-in scorers — `Safety`, `Correctness`, and `Guidelines` cover the basics
- Include edge cases from day one — empty inputs and adversarial prompts reveal problems that happy-path data hides
- Set quality gates early — even rough thresholds give you a deployment criterion
- Safety at 100% — anything less means the agent can produce harmful content in production
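The quality gates can double as a CI deployment check. A minimal sketch, assuming a `check_gates` helper (illustrative, not an MLflow API) that reads the same `{metric}/mean` keys the gate loop prints:

```python
# Hypothetical helper: returns the gate names whose metric falls below
# its threshold, so a CI job can fail the build on any non-empty result.
def check_gates(metrics: dict, gates: dict) -> list:
    failures = []
    for name, threshold in gates.items():
        actual = metrics.get(f"{name}/mean", 0.0)  # missing metric counts as 0
        if actual < threshold:
            failures.append(name)
    return failures

# In a real pipeline, pass results.metrics from mlflow.genai.evaluate().
metrics = {"safety/mean": 1.0, "correctness/mean": 0.88, "helpful/mean": 0.9}
gates = {"safety": 1.0, "correctness": 0.9, "helpful": 0.85}
print(check_gates(metrics, gates))  # → ['correctness']
```

Exiting non-zero on a non-empty failure list (e.g. `sys.exit(1)`) turns the gates into a hard deployment blocker.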
More Patterns
Regression Detection Workflow
“Compare my agent before and after a prompt change to find regressions. Use Python.”
```python
import mlflow
from mlflow.genai.scorers import Safety, Guidelines

scorers = [
    Safety(),
    Guidelines(name="helpful", guidelines="Must be helpful and accurate"),
    Guidelines(name="concise", guidelines="Must be under 200 words"),
]

# Step 1: Evaluate baseline version
with mlflow.start_run(run_name="baseline_v1"):
    results_v1 = mlflow.genai.evaluate(
        data=eval_data, predict_fn=agent_v1, scorers=scorers
    )

# Step 2: Evaluate candidate version
with mlflow.start_run(run_name="candidate_v2"):
    results_v2 = mlflow.genai.evaluate(
        data=eval_data, predict_fn=agent_v2, scorers=scorers
    )

# Step 3: Find individual regressions
traces_v1 = mlflow.search_traces(run_id=results_v1.run_id)
traces_v2 = mlflow.search_traces(run_id=results_v2.run_id)

# Join the two runs on the serialized request so rows line up per input
traces_v1['key'] = traces_v1['request'].apply(str)
traces_v2['key'] = traces_v2['request'].apply(str)
merged = traces_v1.merge(traces_v2, on='key', suffixes=('_v1', '_v2'))

regressions = []
for _, row in merged.iterrows():
    v1_scores = {a['assessment_name']: a['feedback']['value'] for a in row['assessments_v1']}
    v2_scores = {a['assessment_name']: a['feedback']['value'] for a in row['assessments_v2']}

    for name, v1_val in v1_scores.items():
        v2_val = v2_scores.get(name)
        if v1_val in ['yes', True] and v2_val in ['no', False]:
            regressions.append({"input": row['request_v1'], "metric": name})

print(f"Found {len(regressions)} regressions across {len(merged)} inputs")
```

Keep the dataset constant between runs. If you change both the agent and the dataset, you cannot tell which caused the metric change. Named runs make comparison easy in the MLflow UI.
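Once you have the `regressions` list, a per-metric tally shows which guideline the change actually hurt. A small sketch; `regressions_by_metric` is an illustrative helper, not part of MLflow:

```python
from collections import Counter

# Count regressions per metric so triage starts with the worst-hit scorer.
def regressions_by_metric(regressions: list) -> Counter:
    return Counter(r["metric"] for r in regressions)

sample = [
    {"input": "q1", "metric": "concise"},
    {"input": "q2", "metric": "concise"},
    {"input": "q3", "metric": "helpful"},
]
print(regressions_by_metric(sample))  # 'concise' appears twice, 'helpful' once
```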
Performance Optimization Workflow
“Profile my agent’s latency and find which stage is the bottleneck. Use Python.”
```python
import mlflow
from mlflow.entities import SpanType

# Step 1: Run evaluation and collect traces
results = mlflow.genai.evaluate(
    data=eval_data, predict_fn=my_agent, scorers=scorers
)

# Step 2: Profile a sample trace
traces = mlflow.search_traces(run_id=results.run_id, return_type="list")
sample_trace = traces[0]

# Step 3: Break down by span type (SpanType values are string constants)
for span_type in [SpanType.CHAT_MODEL, SpanType.RETRIEVER, SpanType.TOOL]:
    spans = sample_trace.search_spans(span_type=span_type)
    if spans:
        total_ms = sum((s.end_time_ns - s.start_time_ns) / 1e6 for s in spans)
        print(f"{span_type}: {total_ms:.0f}ms across {len(spans)} spans")

# Step 4: Check token usage
llm_spans = sample_trace.search_spans(span_type=SpanType.CHAT_MODEL)
for span in llm_spans:
    attrs = span.attributes or {}
    tokens = attrs.get("mlflow.chat_model.input_tokens", 0)
    print(f"  {span.name}: {tokens} input tokens")
```

High input token counts in LLM spans usually mean context bloat — too many retrieved documents, uncompressed conversation history, or an oversized system prompt. See the Context Optimization page for fixes.
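One of those fixes can be sketched directly: cap the conversation history before it reaches the model. This assumes OpenAI-style chat messages; `trim_history` is an illustrative helper, not an MLflow API:

```python
# Keep the system prompt plus only the last `keep_turns` messages; everything
# older is dropped, which bounds input tokens regardless of session length.
def trim_history(messages: list, keep_turns: int = 4) -> list:
    system = [m for m in messages if m["role"] == "system"]
    rest = [m for m in messages if m["role"] != "system"]
    return system + rest[-keep_turns:]

history = [{"role": "system", "content": "You are a support agent."}]
history += [{"role": "user", "content": f"question {i}"} for i in range(10)]
print(len(trim_history(history, keep_turns=3)))  # → 4
```

A fuller version would summarize the dropped turns rather than discard them, trading a little latency for preserved context.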
Trace Ingestion Setup Workflow
“Set up UC trace storage, configure my app to send traces, and enable production monitoring. Use Python.”
```python
import os
import mlflow
from mlflow.entities import UCSchemaLocation
from mlflow.tracing.enablement import set_experiment_trace_location
from mlflow.tracing import set_databricks_monitoring_sql_warehouse_id
from mlflow.genai.scorers import Safety, ScorerSamplingConfig

# Step 1: Link UC schema to experiment
mlflow.set_tracking_uri("databricks")
os.environ["MLFLOW_TRACING_SQL_WAREHOUSE_ID"] = "<SQL_WAREHOUSE_ID>"

experiment_id = mlflow.create_experiment(name="/Shared/my-traces")
set_experiment_trace_location(
    location=UCSchemaLocation(catalog_name="my_catalog", schema_name="my_schema"),
    experiment_id=experiment_id,
)

# Step 2: Set trace destination
mlflow.tracing.set_destination(
    destination=UCSchemaLocation(catalog_name="my_catalog", schema_name="my_schema")
)

# Step 3: Enable production monitoring
set_databricks_monitoring_sql_warehouse_id(warehouse_id="<SQL_WAREHOUSE_ID>")

safety = Safety().register(name="safety_monitor")
safety = safety.start(sampling_config=ScorerSamplingConfig(sample_rate=1.0))
```

This is the minimum setup for production trace storage and monitoring. Grant MODIFY and SELECT on the trace tables — ALL_PRIVILEGES is not sufficient. Requires `mlflow[databricks]>=3.9.0`.
Full Align-Optimize-Deploy Cycle
“Walk me through the complete workflow: evaluate, collect expert feedback, align the judge, optimize the prompt, and deploy. Use Python.”
```python
# -- PHASE 1: Evaluate and collect feedback ---------------------------
import mlflow
from mlflow.genai import evaluate
from mlflow.genai.judges import make_judge
from mlflow.genai.datasets import create_dataset
from mlflow.genai import create_labeling_session, label_schemas

# Step 1: Create base judge and evaluate
base_judge = make_judge(name=JUDGE_NAME, instructions="...", feedback_value_type=float)
base_judge.register(experiment_id=EXPERIMENT_ID)

results = evaluate(data=eval_data, predict_fn=my_agent, scorers=[base_judge])

# Step 2: Tag successful traces
ok_ids = results.result_df.loc[results.result_df["state"] == "OK", "trace_id"]
for tid in ok_ids:
    mlflow.set_trace_tag(tid, key="eval", value="complete")

# Step 3: Create labeling session (label schema name MUST match judge name)
dataset = create_dataset(name=DATASET_NAME)
dataset.merge_records(tagged_traces)
session = create_labeling_session(
    name="sme_review", assigned_users=[...], label_schemas=[JUDGE_NAME]
)
session.add_dataset(dataset_name=DATASET_NAME)
# -> Share session.url with domain experts

# -- PHASE 2: Align the judge -----------------------------------------
from mlflow.genai.judges.optimizers import MemAlignOptimizer
from mlflow.genai.scorers import get_scorer

# Step 4: Align after experts complete labeling
optimizer = MemAlignOptimizer(
    reflection_lm=REFLECTION_MODEL,
    retrieval_k=5,
    embedding_model="databricks:/databricks-gte-large-en",
)
base = get_scorer(name=JUDGE_NAME)
aligned = base.align(traces=traces, optimizer=optimizer)
aligned.update(experiment_id=EXPERIMENT_ID)

# -- PHASE 3: Optimize the prompt -------------------------------------
from mlflow.genai.optimize import GepaPromptOptimizer

# Step 5: Build optimization dataset (must have expectations)
optim_data = [
    {"inputs": {...}, "expectations": {"expected_response": "..."}}
]

# Step 6: Run GEPA with aligned judge
result = mlflow.genai.optimize_prompts(
    predict_fn=predict_fn,
    train_data=optim_data,
    prompt_uris=[system_prompt.uri],
    optimizer=GepaPromptOptimizer(reflection_model=REFLECTION_MODEL),
    scorers=[aligned],
)

# Step 7: Conditional promotion
new_version = mlflow.genai.register_prompt(
    name=PROMPT_NAME, template=result.optimized_prompts[0].template
)
if result.final_eval_score > result.initial_eval_score:
    mlflow.genai.set_prompt_alias(
        name=PROMPT_NAME, alias="production", version=new_version.version
    )
```

This is the full loop. Phase 1 can be repeated to collect more expert feedback. Phase 2 improves with each round of labeling without re-training. Phase 3 only runs when you want to optimize the prompt. Each phase is independently valuable.
Watch Out For
- Skipping expert labeling — GEPA works without an aligned judge, but it optimizes toward generic quality. Expert feedback makes the signal domain-accurate.
- Label schema name mismatch — the label schema `name` MUST match the judge `name`. This is the pairing mechanism for `align()`.
- GEPA datasets need expectations — unlike evaluation datasets, optimization datasets require `expectations` per record. A dataset with only `inputs` will not work.
- Aligned judge scores may drop — this is expected. The aligned judge is more accurate, not less generous. Use it as the new baseline.
- Promoting without checking — always verify `result.final_eval_score > result.initial_eval_score` before updating the production alias.
- Forgetting `embedding_model` in MemAlign — defaults to `openai/text-embedding-3-small`. On Databricks, set `databricks:/databricks-gte-large-en` to avoid external API calls.