
Datasets & Trace Analysis

Skill: databricks-mlflow-evaluation

You can close the loop between production and evaluation by mining real traffic for test cases. Search traces for interesting patterns — errors, slow responses, edge cases — tag them, and convert them into versioned evaluation datasets stored in Unity Catalog. Then profile those traces to find latency bottlenecks, token-usage hotspots, and systematic failures.

“Build an evaluation dataset from the last week of production traces, filtering for successful runs. Use Python.”

```python
import time

import mlflow

one_week_ago = int((time.time() - 7 * 86400) * 1000)

prod_traces = mlflow.search_traces(
    filter_string=f"""
        attributes.status = 'OK' AND
        attributes.timestamp_ms > {one_week_ago} AND
        tags.environment = 'production'
    """,
    order_by=["attributes.timestamp_ms DESC"],
    max_results=100,
)

# Convert to evaluation format with pre-computed outputs
eval_data = []
for _, trace in prod_traces.iterrows():
    eval_data.append({
        "inputs": trace["request"],
        "outputs": trace["response"],
    })

# Evaluate existing responses without re-running the agent
from mlflow.genai.scorers import Guidelines, Safety

results = mlflow.genai.evaluate(
    data=eval_data,
    scorers=[
        Safety(),
        Guidelines(name="quality", guidelines="Must be helpful and accurate"),
    ],
)
```

Key decisions:

  • Pre-computed outputs skip the predict_fn entirely — you’re scoring what the agent already produced in production
  • attributes.status = 'OK' filters to successful traces; use 'ERROR' to build regression test datasets instead
  • tags.environment = 'production' separates real traffic from dev/test traces
  • Timestamp filtering keeps the dataset fresh and avoids scoring stale behavior
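Since every variant of this search differs only in the clause values, a small helper (hypothetical, not part of MLflow) can build the filter strings consistently — flip `status` to `'ERROR'` and the same search yields a regression dataset:

```python
import time


def trace_filter(status="OK", environment="production", days=7, now=None):
    """Build a search_traces filter string for a status/environment/time window.

    Illustrative helper, not an MLflow API. Clauses are joined with AND
    because the trace filter grammar has no OR operator.
    """
    cutoff_ms = int(((now if now is not None else time.time()) - days * 86400) * 1000)
    clauses = [
        f"attributes.status = '{status}'",
        f"attributes.timestamp_ms > {cutoff_ms}",
        f"tags.environment = '{environment}'",
    ]
    return " AND ".join(clauses)


# Successful production traffic from the last week:
ok_filter = trace_filter()
# Failed traces for a regression test dataset:
err_filter = trace_filter(status="ERROR")
```

Either string can be passed straight to `mlflow.search_traces(filter_string=...)`.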

“Create a versioned evaluation dataset in Unity Catalog that I can reuse across evaluation runs. Use Python.”

```python
import mlflow.genai.datasets
from mlflow.genai.scorers import Correctness
from databricks.connect import DatabricksSession

spark = DatabricksSession.builder.remote(serverless=True).getOrCreate()

# Create a persistent, version-controlled dataset
eval_dataset = mlflow.genai.datasets.create_dataset(
    uc_table_name="catalog.schema.support_agent_eval_v1"
)

# Add records with expectations for ground-truth checking
records = [
    {
        "inputs": {"query": "What is our return policy?"},
        "expectations": {"expected_facts": ["30-day returns", "original packaging required"]},
    },
    {
        "inputs": {"query": "How do I reset my password?"},
        "expectations": {"expected_response": "Go to Settings > Security > Reset Password"},
    },
]
eval_dataset.merge_records(records)

# Use the dataset in evaluation (my_agent is your predict function)
results = mlflow.genai.evaluate(
    data=eval_dataset,
    predict_fn=my_agent,
    scorers=[Correctness()],
)
```

merge_records() upserts into the dataset, so you can add new examples over time without losing existing ones. The UC table gives you full lineage tracking and sharing across teams.
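The upsert semantics can be pictured as merging keyed on the record's inputs — a sketch under that assumption (the real `merge_records()` writes to a Unity Catalog table; this pure-Python model only illustrates the behavior):

```python
import json


def merge_records_sketch(existing, new):
    """Model of upsert-by-inputs: a new record whose inputs match an
    existing one replaces it; everything else is appended. Illustrative
    only -- the real mlflow.genai dataset merges into a UC table."""
    by_key = {json.dumps(r["inputs"], sort_keys=True): r for r in existing}
    for r in new:
        by_key[json.dumps(r["inputs"], sort_keys=True)] = r
    return list(by_key.values())


v1 = [{"inputs": {"query": "What is our return policy?"},
       "expectations": {"expected_facts": ["30-day returns"]}}]
v2 = [
    # Same inputs: replaces the old expectations
    {"inputs": {"query": "What is our return policy?"},
     "expectations": {"expected_facts": ["30-day returns", "original packaging required"]}},
    # New inputs: appended
    {"inputs": {"query": "How do I reset my password?"},
     "expectations": {"expected_response": "Go to Settings > Security > Reset Password"}},
]
merged = merge_records_sketch(v1, v2)  # 2 records, no duplicates
```

This is why repeated calls are safe: re-merging an updated batch refreshes expectations in place instead of duplicating rows.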

“Tag interesting traces during analysis, then build an evaluation dataset from all tagged traces. Use Python.”

```python
import mlflow

# Step 1: Tag traces during analysis (or via MCP tools)
mlflow.set_trace_tag(trace_id="tr-abc123", key="eval_candidate", value="error_case")
mlflow.set_trace_tag(trace_id="tr-def456", key="eval_candidate", value="slow_response")

# Step 2: Build dataset from tagged traces
tagged = mlflow.search_traces(
    filter_string="tags.eval_candidate IS NOT NULL",
    max_results=100,
)
eval_data = [
    {"inputs": row["request"], "outputs": row["response"]}
    for _, row in tagged.iterrows()
]
```

Tagging creates a lightweight curation workflow. Analysts tag traces during investigation, and the evaluation dataset is built from those tags later. Use different tag values (error_case, slow_response, edge_case) to create category-specific test sets.
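Splitting the tagged rows into those category-specific sets is a one-pass grouping. A sketch, assuming rows shaped like the dicts built above plus a `tags` mapping (the exact column shape from `search_traces` may differ):

```python
from collections import defaultdict


def split_by_tag(rows, tag_key="eval_candidate"):
    """Group trace rows into category-specific eval sets by tag value.
    Row shape is illustrative: dicts with 'tags', 'request', 'response'."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[row["tags"][tag_key]].append(
            {"inputs": row["request"], "outputs": row["response"]}
        )
    return dict(buckets)


rows = [
    {"tags": {"eval_candidate": "error_case"}, "request": "q1", "response": "r1"},
    {"tags": {"eval_candidate": "slow_response"}, "request": "q2", "response": "r2"},
    {"tags": {"eval_candidate": "error_case"}, "request": "q3", "response": "r3"},
]
category_sets = split_by_tag(rows)
```

Each bucket can then be evaluated separately, so a regression in slow-response handling doesn't hide inside an aggregate score.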

“Analyze a trace to find which span types (LLM calls, retrieval, tools) are consuming the most time. Use Python.”

```python
import mlflow
from mlflow.entities import SpanType


def profile_trace(trace_id: str):
    traces = mlflow.search_traces(
        filter_string=f"tags.`mlflow.traceId` = '{trace_id}'",
        return_type="list",
    )
    trace = traces[0]
    for span_type in [SpanType.CHAT_MODEL, SpanType.RETRIEVER, SpanType.TOOL]:
        spans = trace.search_spans(span_type=span_type)
        if spans:
            durations = [(s.end_time_ns - s.start_time_ns) / 1e9 for s in spans]
            # SpanType members are plain strings, so format them directly
            print(f"{span_type}: {len(spans)} calls, "
                  f"total={sum(durations):.2f}s, avg={sum(durations)/len(durations):.2f}s")


profile_trace("tr-abc123")
```

Most agent latency hides in LLM calls and retrieval spans. Profiling by span type tells you exactly where to optimize — reduce retrieval num_results, shorten the system prompt, or switch to a faster model endpoint.
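To decide where to optimize, it helps to see each span type's share of total span time, not just absolute seconds. A pure-Python sketch of that aggregation, operating on `(span_type, start_ns, end_ns)` tuples like those `profile_trace()` reads off the spans (tuple shape is illustrative):

```python
def latency_breakdown(spans):
    """Aggregate (span_type, start_ns, end_ns) tuples into per-type
    (total_seconds, share_of_span_time) pairs."""
    totals = {}
    for span_type, start_ns, end_ns in spans:
        totals[span_type] = totals.get(span_type, 0.0) + (end_ns - start_ns) / 1e9
    grand = sum(totals.values()) or 1.0  # avoid division by zero on empty input
    return {t: (secs, secs / grand) for t, secs in totals.items()}


spans = [
    ("CHAT_MODEL", 0, 2_000_000_000),  # 2.0s LLM call
    ("RETRIEVER", 0, 1_000_000_000),   # 1.0s retrieval
    ("TOOL", 0, 1_000_000_000),        # 1.0s tool call
]
report = latency_breakdown(spans)
# Here CHAT_MODEL accounts for half the span time -- optimize the model first
```

If one type dominates the share, that's the lever to pull; if no type does, the latency is spread out and a faster endpoint helps more than targeted tuning.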

“Analyze the last 24 hours of traces to find systematic failures and calculate error rates. Use Python.”

```python
import time

import mlflow

now = int(time.time() * 1000)
yesterday = now - (24 * 60 * 60 * 1000)

traces = mlflow.search_traces(
    filter_string=f"attributes.timestamp_ms >= {yesterday}"
)

total = len(traces)
errors = traces[traces["status"] == "ERROR"]
success_rate = (total - len(errors)) / total if total > 0 else 0

print(f"Total: {total}, Errors: {len(errors)}, Success rate: {success_rate:.1%}")
print(f"P50 latency: {traces['execution_time_ms'].median():.0f}ms")
print(f"P95 latency: {traces['execution_time_ms'].quantile(0.95):.0f}ms")

if len(errors) > 0:
    print("\nSample error inputs:")
    for _, row in errors.head(5).iterrows():
        print(f" - {row['request']}")
```

Run this daily to catch regressions early. A sudden spike in error rate or P95 latency usually points to a model endpoint issue, a schema change in the source data, or a retrieval index that needs syncing.
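"Sudden spike" is easiest to catch by comparing today's numbers against a rolling baseline. A minimal sketch — the metric dict shape and the 2x/1.5x thresholds are illustrative defaults, not MLflow APIs:

```python
def detect_regressions(today, baseline, error_rate_factor=2.0, p95_factor=1.5):
    """Flag regressions when today's metrics exceed the baseline by a factor.
    Metric dicts carry 'error_rate' (fraction) and 'p95_ms' (milliseconds)."""
    alerts = []
    # max(..., 1e-9) makes any nonzero error rate a spike over a zero baseline
    if today["error_rate"] > max(baseline["error_rate"], 1e-9) * error_rate_factor:
        alerts.append("error_rate_spike")
    if today["p95_ms"] > baseline["p95_ms"] * p95_factor:
        alerts.append("p95_latency_spike")
    return alerts


baseline = {"error_rate": 0.01, "p95_ms": 800}
alerts = detect_regressions({"error_rate": 0.05, "p95_ms": 900}, baseline)
# error rate quintupled while P95 stayed within bounds -> one alert
```

Feed it the `success_rate` and percentile numbers computed above and wire the alert list into whatever paging or Slack hook your team already uses.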

  • mlflow.search_traces() returns a DataFrame by default — use return_type="list" when you need Trace objects with span-level access for profiling.
  • Trace filter syntax only supports AND — there’s no OR operator. If you need to filter by multiple tag values, run separate queries and merge the results.
  • Forgetting to set experiment_ids — without it, search_traces() searches the current active experiment. If your traces are in a different experiment, you’ll get empty results.
  • Building datasets without diversity — if you only pull successful traces, your evaluation dataset won’t catch edge cases. Sample across success/error, fast/slow, and different query types for a representative test set.
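For the missing-OR workaround, the merge step is just a de-duplication keyed on trace ID. A sketch, assuming rows are dicts with a `trace_id` key (the actual column shape from `search_traces` may differ):

```python
def union_traces(*result_sets):
    """Union trace rows from several search_traces calls, de-duplicating
    by trace_id -- the workaround for the filter grammar's missing OR."""
    seen = {}
    for rows in result_sets:
        for row in rows:
            seen.setdefault(row["trace_id"], row)  # first occurrence wins
    return list(seen.values())


error_rows = [{"trace_id": "tr-1"}, {"trace_id": "tr-2"}]
slow_rows = [{"trace_id": "tr-2"}, {"trace_id": "tr-3"}]
merged = union_traces(error_rows, slow_rows)  # 3 unique traces
```

Run one query per tag value or status, then union the results before building the dataset so a trace matching two criteria appears only once.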