Trace Analysis

Skill: databricks-mlflow-evaluation

You can diagnose exactly why an agent call was slow, wrong, or broken by inspecting its trace — the full execution graph with timing, inputs, and outputs at every step. These patterns work across any agent architecture: DSPy multi-agent, LangGraph, RAG pipelines, or simple tool-calling agents. Every trace is a tree of spans that tells you what happened.

“Get a trace by ID and break down its latency by span type. Use Python.”

from collections import defaultdict

from mlflow.entities import Trace

def latency_by_span_type(trace: Trace) -> dict:
    """Break down latency by span type (LLM, TOOL, RETRIEVER, etc.)."""
    spans = trace.data.spans if hasattr(trace, 'data') else trace.search_spans()
    type_latencies = defaultdict(list)
    for span in spans:
        duration_ms = (span.end_time_ns - span.start_time_ns) / 1e6
        span_type = str(span.span_type) if span.span_type else "UNKNOWN"
        type_latencies[span_type].append({
            "name": span.name,
            "duration_ms": duration_ms,
        })
    results = {}
    for span_type, items in type_latencies.items():
        durations = [i["duration_ms"] for i in items]
        results[span_type] = {
            "count": len(items),
            "total_ms": round(sum(durations), 2),
            "avg_ms": round(sum(durations) / len(durations), 2),
            "max_ms": round(max(durations), 2),
        }
    return results

# Usage
stats = latency_by_span_type(trace)
for span_type, s in sorted(stats.items(), key=lambda x: -x[1]["total_ms"]):
    print(f"{span_type}: {s['total_ms']}ms total ({s['count']} spans)")

Key decisions:

  • Profile by span type first — this tells you whether LLM calls, tool calls, or retrieval is the bottleneck
  • Use trace.search_spans(span_type=...) for filtering — more reliable than name matching
  • Duration is in nanoseconds — always convert: (span.end_time_ns - span.start_time_ns) / 1e6 for milliseconds
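The nanosecond math in the last bullet also deserves a None guard, since spans that never finished (crashes, timeouts, in-flight traces) can be missing an end timestamp. A minimal sketch (`span_duration_ms` is a hypothetical helper, not an MLflow API):

```python
def span_duration_ms(start_ns, end_ns):
    """Convert span timestamps from nanoseconds to milliseconds.

    Returns 0.0 when either timestamp is missing, which happens for
    spans that never completed.
    """
    if start_ns is None or end_ns is None:
        return 0.0
    return (end_ns - start_ns) / 1e6
```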

“Find recent traces matching specific criteria using the search API. Use Python.”

import time

import mlflow
from mlflow import MlflowClient

client = MlflowClient()

# By experiment
traces = client.search_traces(
    experiment_ids=["your_experiment_id"],
    max_results=100,
)

# By time range
yesterday = int((time.time() - 86400) * 1000)
recent = client.search_traces(
    experiment_ids=["your_experiment_id"],
    filter_string=f"timestamp_ms > {yesterday}",
)

# By status and environment
filtered = mlflow.search_traces(
    filter_string="""
        attributes.status = 'OK' AND
        tags.environment = 'production' AND
        attributes.execution_time_ms > 5000
    """
)

# By trace name (note backticks for dotted names)
specific = mlflow.search_traces(
    filter_string="tags.`mlflow.traceName` = 'my_app_function'"
)

Filters require the attributes. prefix for built-in fields. Tag names with dots need backticks. Only AND is supported — no OR. Values use single quotes.
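Those syntax rules are easy to get wrong by hand. A minimal sketch of a filter-string builder that applies them (`tag_clause` and `build_filter` are hypothetical helpers, not part of MLflow):

```python
def tag_clause(tag: str, value: str) -> str:
    """Build a tag clause, backtick-quoting tag names that contain dots."""
    key = f"`{tag}`" if "." in tag else tag
    return f"tags.{key} = '{value}'"

def build_filter(clauses: list) -> str:
    """Join clauses with AND -- the only boolean operator the search API supports."""
    return " AND ".join(clauses)

# Usage
filter_string = build_filter([
    "attributes.status = 'OK'",
    tag_clause("mlflow.traceName", "my_app_function"),
])
```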

“Map the parent-child relationships in a trace to understand the execution flow. Use Python.”

from mlflow.entities import Trace

def analyze_span_hierarchy(trace: Trace) -> dict:
    """Build a tree representation of span hierarchy."""
    spans = trace.data.spans if hasattr(trace, 'data') else trace.search_spans()
    children = {}
    root_spans = []
    for span in spans:
        if span.parent_id is None:
            root_spans.append(span)
        else:
            children.setdefault(span.parent_id, []).append(span)

    def build_tree(span, depth=0):
        duration_ms = (span.end_time_ns - span.start_time_ns) / 1e6
        return {
            "name": span.name,
            "span_type": str(span.span_type) if span.span_type else "UNKNOWN",
            "duration_ms": round(duration_ms, 2),
            "depth": depth,
            "children": [build_tree(c, depth + 1) for c in children.get(span.span_id, [])],
        }

    return {
        "root_count": len(root_spans),
        "total_spans": len(spans),
        "hierarchy": [build_tree(root) for root in root_spans],
    }

Hierarchy analysis reveals the execution flow regardless of framework: a DSPy pipeline, a LangGraph graph, and custom orchestration code all produce span trees that this function can decompose.
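To eyeball that flow, the nested dict returned by analyze_span_hierarchy can be rendered as an indented outline. A small sketch (`render_hierarchy` is a hypothetical helper operating on the dict shape above):

```python
def render_hierarchy(node: dict, indent: str = "") -> list:
    """Render one span-tree node (and its children) as indented text lines."""
    lines = [f"{indent}{node['name']} [{node['span_type']}] {node['duration_ms']}ms"]
    for child in node["children"]:
        lines.extend(render_hierarchy(child, indent + "  "))
    return lines

# Usage: print("\n".join(render_hierarchy(result["hierarchy"][0])))
```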

“Identify the slowest spans in a trace, excluding wrapper spans. Use Python.”

from mlflow.entities import Trace

def find_bottlenecks(trace: Trace, top_n: int = 5) -> list:
    """Find the slowest spans in a trace."""
    spans = trace.data.spans if hasattr(trace, 'data') else trace.search_spans()
    exclude = ["forward", "predict", "__init__", "root"]
    span_timings = []
    for span in spans:
        if any(p in span.name.lower() for p in exclude):
            continue
        duration_ms = (span.end_time_ns - span.start_time_ns) / 1e6
        span_timings.append({
            "name": span.name,
            "span_type": str(span.span_type) if span.span_type else "UNKNOWN",
            "duration_ms": round(duration_ms, 2),
        })
    span_timings.sort(key=lambda x: -x["duration_ms"])
    return span_timings[:top_n]

# Usage
bottlenecks = find_bottlenecks(trace, top_n=5)
for i, b in enumerate(bottlenecks, 1):
    print(f" {i}. {b['name']} ({b['span_type']}): {b['duration_ms']}ms")

Exclude wrapper spans like forward, predict, and root — they encompass child spans and show inflated durations that mask the real bottleneck.
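An alternative to name-based exclusion is computing each span's self-time: its duration minus the time covered by its direct children, so wrappers stop dominating the ranking. A minimal sketch using a hypothetical `SpanTiming` stand-in for real span objects:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class SpanTiming:
    """Hypothetical stand-in carrying only the span fields this sketch needs."""
    span_id: str
    parent_id: Optional[str]
    start_time_ns: int
    end_time_ns: int

def self_time_ms(spans: list) -> dict:
    """Per-span duration minus the summed durations of its direct children."""
    child_total_ns = {}
    for s in spans:
        if s.parent_id is not None:
            child_total_ns[s.parent_id] = (
                child_total_ns.get(s.parent_id, 0)
                + (s.end_time_ns - s.start_time_ns)
            )
    return {
        s.span_id: round(
            ((s.end_time_ns - s.start_time_ns) - child_total_ns.get(s.span_id, 0)) / 1e6,
            2,
        )
        for s in spans
    }
```

A wrapper whose children account for nearly all of its duration shows near-zero self-time, while a genuinely slow leaf span keeps its full duration.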

“Find all error spans, exceptions, and empty outputs in a trace. Use Python.”

from mlflow.entities import Trace, SpanStatusCode

def detect_errors(trace: Trace) -> dict:
    """Detect error patterns in a trace."""
    spans = trace.data.spans if hasattr(trace, 'data') else trace.search_spans()
    errors = {"failed_spans": [], "exceptions": [], "empty_outputs": []}
    for span in spans:
        if span.status and span.status.status_code == SpanStatusCode.ERROR:
            errors["failed_spans"].append({
                "name": span.name,
                "span_type": str(span.span_type),
                "error_message": span.status.description or "Unknown error",
            })
        if span.events:
            for event in span.events:
                if "exception" in event.name.lower():
                    errors["exceptions"].append({
                        "span_name": span.name,
                        "event": event.name,
                    })
        if span.outputs is None or span.outputs == {} or span.outputs == []:
            errors["empty_outputs"].append({
                "name": span.name,
                "span_type": str(span.span_type),
            })
    return errors

Empty outputs are a warning sign, not always an error. A span might legitimately return nothing. But combined with an ERROR status, it points to a failure that needs investigation.
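One way to act on that is to intersect the buckets that detect_errors returns: spans that both carry ERROR status and produced nothing are the strongest candidates for investigation. A small sketch over that dict shape (`true_failures` is a hypothetical helper):

```python
def true_failures(errors: dict) -> list:
    """Names of spans that failed AND returned no output."""
    failed = {e["name"] for e in errors["failed_spans"]}
    empty = {e["name"] for e in errors["empty_outputs"]}
    return sorted(failed & empty)
```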

“Extract model info, token counts, and latency from all LLM calls in a trace. Use Python.”

from mlflow.entities import Trace, SpanType

def analyze_llm_calls(trace: Trace) -> dict:
    """Extract LLM call details including token usage."""
    spans = trace.data.spans if hasattr(trace, 'data') else trace.search_spans()
    llm_spans = [
        s for s in spans
        if s.span_type in [SpanType.LLM, SpanType.CHAT_MODEL]
    ]
    llm_calls = []
    for span in llm_spans:
        duration_ms = (span.end_time_ns - span.start_time_ns) / 1e6
        attrs = span.attributes or {}
        llm_calls.append({
            "name": span.name,
            "duration_ms": round(duration_ms, 2),
            "model": attrs.get("mlflow.chat_model.model"),
            "input_tokens": attrs.get("mlflow.chat_model.input_tokens"),
            "output_tokens": attrs.get("mlflow.chat_model.output_tokens"),
        })
    total_input = sum(c["input_tokens"] or 0 for c in llm_calls)
    total_output = sum(c["output_tokens"] or 0 for c in llm_calls)
    return {
        "total_llm_calls": len(llm_calls),
        "total_latency_ms": round(sum(c["duration_ms"] for c in llm_calls), 2),
        "total_input_tokens": total_input,
        "total_output_tokens": total_output,
        "calls": llm_calls,
    }

Token usage is stored in span attributes under mlflow.chat_model.* keys. This works for auto-traced OpenAI calls and manually traced LLM calls. High input token counts usually point to context bloat.
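Building on that, the per-call dicts from analyze_llm_calls can be screened for suspiciously large prompts. A minimal sketch (`flag_context_bloat` and the 4000-token threshold are illustrative assumptions, not MLflow defaults):

```python
def flag_context_bloat(calls: list, max_input_tokens: int = 4000) -> list:
    """Return the LLM calls whose input token count exceeds the threshold.

    Missing token counts (None) are treated as zero rather than flagged.
    """
    return [c for c in calls if (c.get("input_tokens") or 0) > max_input_tokens]
```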

“Compare latency, span count, and success rate across a batch of traces. Use Python.”

from mlflow.entities import Trace

def compare_traces(traces: list[Trace]) -> dict:
    """Compare metrics across multiple traces."""
    trace_stats = []
    for trace in traces:
        spans = trace.data.spans if hasattr(trace, 'data') else trace.search_spans()
        root_spans = [s for s in spans if s.parent_id is None]
        total_ms = 0
        if root_spans:
            total_ms = (root_spans[0].end_time_ns - root_spans[0].start_time_ns) / 1e6
        trace_stats.append({
            "trace_id": trace.info.trace_id,
            "total_ms": round(total_ms, 2),
            "span_count": len(spans),
            "status": str(trace.info.status),
        })
    latencies = [t["total_ms"] for t in trace_stats]
    return {
        "trace_count": len(traces),
        "avg_latency_ms": round(sum(latencies) / len(latencies), 2) if latencies else 0,
        "min_latency_ms": round(min(latencies), 2) if latencies else 0,
        "max_latency_ms": round(max(latencies), 2) if latencies else 0,
        "success_rate": sum(1 for t in trace_stats if "OK" in t["status"]) / len(trace_stats) if trace_stats else 0,
    }

Batch comparison reveals patterns that single-trace analysis misses. If latency variance is high, the bottleneck is likely a non-deterministic step like retrieval or a rate-limited API call.
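To quantify that variance, the latencies collected by compare_traces can be summarized with the standard library. A small sketch (`latency_variation` is a hypothetical helper; treating a coefficient of variation above ~0.5 as suspicious is a rule of thumb, not an MLflow convention):

```python
import statistics

def latency_variation(latencies: list) -> dict:
    """Mean, population stdev, and coefficient of variation for trace latencies."""
    if not latencies:
        return {"mean_ms": 0.0, "stdev_ms": 0.0, "cv": 0.0}
    mean = statistics.mean(latencies)
    stdev = statistics.pstdev(latencies)
    return {
        "mean_ms": round(mean, 2),
        "stdev_ms": round(stdev, 2),
        # Coefficient of variation: spread relative to the mean.
        "cv": round(stdev / mean, 2) if mean else 0.0,
    }
```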

Common pitfalls:

  • Incomplete span data — always guard against None timestamps: duration = (span.end_time_ns - span.start_time_ns) / 1e6 if span.end_time_ns else 0
  • Fully qualified span names — UC function calls produce names like catalog.schema.function. Normalize with name.split(".")[-1] when matching by name.
  • Wrapper spans inflating bottleneck detection — exclude patterns like forward, predict, and root which encompass child spans and show misleading durations.
  • Caching trace analysis — if you call get_trace() repeatedly for the same trace ID, use functools.lru_cache to avoid redundant API calls.
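The last bullet can be sketched as a small factory so the cache wraps whatever fetch function you use (e.g. mlflow.get_trace); `make_cached_getter` is a hypothetical helper:

```python
from functools import lru_cache

def make_cached_getter(fetch, maxsize: int = 256):
    """Wrap a trace-fetching callable with an in-process LRU cache.

    Repeated lookups of the same trace ID hit the cache instead of
    issuing another API call.
    """
    @lru_cache(maxsize=maxsize)
    def get(trace_id: str):
        return fetch(trace_id)
    return get

# Usage (assumes an MLflow tracking server is configured):
# import mlflow
# get_trace = make_cached_getter(mlflow.get_trace)
# trace = get_trace("tr-123")
```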