# Trace Analysis

Skill: databricks-mlflow-evaluation

## What You Can Build

You can diagnose exactly why an agent call was slow, wrong, or broken by inspecting its trace — the full execution graph with timing, inputs, and outputs at every step. These patterns work across any agent architecture: DSPy multi-agent, LangGraph, RAG pipelines, or simple tool-calling agents. Every trace is a tree of spans that tells you what happened.
## In Action

“Get a trace by ID and break down its latency by span type. Use Python.”
```python
from collections import defaultdict

from mlflow.entities import Trace, SpanType


def latency_by_span_type(trace: Trace) -> dict:
    """Break down latency by span type (LLM, TOOL, RETRIEVER, etc.)."""
    spans = trace.data.spans if hasattr(trace, 'data') else trace.search_spans()
    type_latencies = defaultdict(list)

    for span in spans:
        duration_ms = (span.end_time_ns - span.start_time_ns) / 1e6
        span_type = str(span.span_type) if span.span_type else "UNKNOWN"
        type_latencies[span_type].append({
            "name": span.name,
            "duration_ms": duration_ms
        })

    results = {}
    for span_type, items in type_latencies.items():
        durations = [i["duration_ms"] for i in items]
        results[span_type] = {
            "count": len(items),
            "total_ms": round(sum(durations), 2),
            "avg_ms": round(sum(durations) / len(durations), 2),
            "max_ms": round(max(durations), 2),
        }

    return results


# Usage
stats = latency_by_span_type(trace)
for span_type, s in sorted(stats.items(), key=lambda x: -x[1]["total_ms"]):
    print(f"{span_type}: {s['total_ms']}ms total ({s['count']} spans)")
```

Key decisions:

- Profile by span type first — this tells you whether LLM calls, tool calls, or retrieval is the bottleneck
- Use `trace.search_spans(span_type=...)` for filtering — more reliable than name matching
- Duration is in nanoseconds — always convert: `(span.end_time_ns - span.start_time_ns) / 1e6` for milliseconds
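The nanosecond conversion is easy to get wrong when a span never closed and `end_time_ns` is still `None`. A small helper (illustrative, not part of MLflow's API) keeps the guard in one place:

```python
def span_duration_ms(start_ns, end_ns):
    """Convert nanosecond span timestamps to milliseconds,
    returning 0.0 for spans with missing timestamps."""
    if start_ns is None or end_ns is None:
        return 0.0
    return (end_ns - start_ns) / 1e6
```

Call it as `span_duration_ms(span.start_time_ns, span.end_time_ns)` wherever the patterns below compute a duration.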
## More Patterns

### Search and Filter Traces

“Find recent traces matching specific criteria using the search API. Use Python.”
```python
import time

import mlflow
from mlflow import MlflowClient

client = MlflowClient()

# By experiment
traces = client.search_traces(
    experiment_ids=["your_experiment_id"],
    max_results=100
)

# By time range
yesterday = int((time.time() - 86400) * 1000)
recent = client.search_traces(
    experiment_ids=["your_experiment_id"],
    filter_string=f"timestamp_ms > {yesterday}"
)

# By status and environment
filtered = mlflow.search_traces(
    filter_string="""
        attributes.status = 'OK'
        AND tags.environment = 'production'
        AND attributes.execution_time_ms > 5000
    """
)

# By trace name (note backticks for dotted names)
specific = mlflow.search_traces(
    filter_string="tags.`mlflow.traceName` = 'my_app_function'"
)
```

Filters require the `attributes.` prefix for built-in fields. Tag names with dots need backticks. Only `AND` is supported — no `OR`. Values use single quotes.
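Those grammar rules are easy to forget, so a small helper that backtick-quotes dotted tag names and joins clauses with `AND` can keep filter strings well-formed. The function names below are illustrative, not part of the MLflow API:

```python
def tag_clause(key: str, value: str) -> str:
    """Build a tag equality clause, backtick-quoting dotted names
    and single-quoting the value as the filter grammar requires."""
    name = f"`{key}`" if "." in key else key
    return f"tags.{name} = '{value}'"


def combine(clauses: list[str]) -> str:
    """Join clauses with AND, the only boolean operator the grammar supports."""
    return " AND ".join(clauses)
```

For example, `combine([tag_clause("mlflow.traceName", "my_app_function"), "attributes.status = 'OK'"])` yields a filter string ready to pass to `search_traces`.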
### Analyze Span Hierarchy

“Map the parent-child relationships in a trace to understand the execution flow. Use Python.”
```python
from mlflow.entities import Trace


def analyze_span_hierarchy(trace: Trace) -> dict:
    """Build a tree representation of span hierarchy."""
    spans = trace.data.spans if hasattr(trace, 'data') else trace.search_spans()

    children = {}
    root_spans = []

    for span in spans:
        if span.parent_id is None:
            root_spans.append(span)
        else:
            children.setdefault(span.parent_id, []).append(span)

    def build_tree(span, depth=0):
        duration_ms = (span.end_time_ns - span.start_time_ns) / 1e6
        return {
            "name": span.name,
            "span_type": str(span.span_type) if span.span_type else "UNKNOWN",
            "duration_ms": round(duration_ms, 2),
            "depth": depth,
            "children": [build_tree(c, depth + 1) for c in children.get(span.span_id, [])]
        }

    return {
        "root_count": len(root_spans),
        "total_spans": len(spans),
        "hierarchy": [build_tree(root) for root in root_spans]
    }
```

Hierarchy analysis reveals the execution flow regardless of framework. A DSPy pipeline, a LangGraph graph, or custom orchestration all produce span trees that this function can decompose.
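To eyeball the result, a small renderer over the node dicts that `build_tree` produces turns each subtree into indented lines. `render_tree` is a hypothetical helper, shown for illustration:

```python
def render_tree(node: dict, indent: str = "") -> list[str]:
    """Render one hierarchy node (as built by build_tree) as indented text lines."""
    lines = [f"{indent}{node['name']} [{node['span_type']}] {node['duration_ms']}ms"]
    for child in node.get("children", []):
        lines.extend(render_tree(child, indent + "  "))
    return lines


# Usage sketch:
#   for root in analyze_span_hierarchy(trace)["hierarchy"]:
#       print("\n".join(render_tree(root)))
```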
### Find Bottleneck Spans

“Identify the slowest spans in a trace, excluding wrapper spans. Use Python.”
```python
from mlflow.entities import Trace


def find_bottlenecks(trace: Trace, top_n: int = 5) -> list:
    """Find the slowest spans in a trace."""
    spans = trace.data.spans if hasattr(trace, 'data') else trace.search_spans()
    exclude = ["forward", "predict", "__init__", "root"]

    span_timings = []
    for span in spans:
        if any(p in span.name.lower() for p in exclude):
            continue

        duration_ms = (span.end_time_ns - span.start_time_ns) / 1e6
        span_timings.append({
            "name": span.name,
            "span_type": str(span.span_type) if span.span_type else "UNKNOWN",
            "duration_ms": round(duration_ms, 2),
        })

    span_timings.sort(key=lambda x: -x["duration_ms"])
    return span_timings[:top_n]


# Usage
bottlenecks = find_bottlenecks(trace, top_n=5)
for i, b in enumerate(bottlenecks, 1):
    print(f"  {i}. {b['name']} ({b['span_type']}): {b['duration_ms']}ms")
```

Exclude wrapper spans like `forward`, `predict`, and `root` — they encompass child spans and show inflated durations that mask the real bottleneck.
### Detect Errors in a Trace

“Find all error spans, exceptions, and empty outputs in a trace. Use Python.”
```python
from mlflow.entities import Trace, SpanStatusCode


def detect_errors(trace: Trace) -> dict:
    """Detect error patterns in a trace."""
    spans = trace.data.spans if hasattr(trace, 'data') else trace.search_spans()

    errors = {"failed_spans": [], "exceptions": [], "empty_outputs": []}

    for span in spans:
        if span.status and span.status.status_code == SpanStatusCode.ERROR:
            errors["failed_spans"].append({
                "name": span.name,
                "span_type": str(span.span_type),
                "error_message": span.status.description or "Unknown error"
            })

        if span.events:
            for event in span.events:
                if "exception" in event.name.lower():
                    errors["exceptions"].append({
                        "span_name": span.name,
                        "event": event.name,
                    })

        if span.outputs is None or span.outputs == {} or span.outputs == []:
            errors["empty_outputs"].append({
                "name": span.name,
                "span_type": str(span.span_type)
            })

    return errors
```

Empty outputs are a warning sign, not always an error. A span might legitimately return nothing. But combined with an `ERROR` status, it points to a failure that needs investigation.
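A one-line verdict over the dict that `detect_errors` returns is often all a log line needs. This summarizer is a sketch layered on top of the pattern above, not part of it:

```python
def error_summary(errors: dict) -> str:
    """Condense detect_errors output into a single log-friendly line."""
    failed = len(errors["failed_spans"])
    excs = len(errors["exceptions"])
    empty = len(errors["empty_outputs"])
    if failed == 0 and excs == 0:
        # Empty outputs alone are only a warning, so report them separately.
        return f"clean ({empty} empty outputs)"
    return f"{failed} failed spans, {excs} exceptions, {empty} empty outputs"
```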
### Analyze LLM Token Usage

“Extract model info, token counts, and latency from all LLM calls in a trace. Use Python.”
```python
from mlflow.entities import Trace, SpanType


def analyze_llm_calls(trace: Trace) -> dict:
    """Extract LLM call details including token usage."""
    spans = trace.data.spans if hasattr(trace, 'data') else trace.search_spans()

    llm_spans = [s for s in spans if s.span_type in [SpanType.LLM, SpanType.CHAT_MODEL]]

    llm_calls = []
    for span in llm_spans:
        duration_ms = (span.end_time_ns - span.start_time_ns) / 1e6
        attrs = span.attributes or {}

        llm_calls.append({
            "name": span.name,
            "duration_ms": round(duration_ms, 2),
            "model": attrs.get("mlflow.chat_model.model"),
            "input_tokens": attrs.get("mlflow.chat_model.input_tokens"),
            "output_tokens": attrs.get("mlflow.chat_model.output_tokens"),
        })

    total_input = sum(c["input_tokens"] or 0 for c in llm_calls)
    total_output = sum(c["output_tokens"] or 0 for c in llm_calls)

    return {
        "total_llm_calls": len(llm_calls),
        "total_latency_ms": round(sum(c["duration_ms"] for c in llm_calls), 2),
        "total_input_tokens": total_input,
        "total_output_tokens": total_output,
        "calls": llm_calls
    }
```

Token usage is stored in span attributes under `mlflow.chat_model.*` keys. This works for auto-traced OpenAI calls and manually traced LLM calls. High input token counts usually point to context bloat.
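Token totals translate directly into spend. The sketch below consumes the dict `analyze_llm_calls` returns; the per-1K rates are placeholders, so substitute your model's actual pricing:

```python
def estimate_cost(stats: dict, input_per_1k: float, output_per_1k: float) -> float:
    """Rough spend estimate from the token totals returned by analyze_llm_calls.
    Rates are per 1K tokens and must come from your provider's price sheet."""
    return round(
        stats["total_input_tokens"] / 1000 * input_per_1k
        + stats["total_output_tokens"] / 1000 * output_per_1k,
        4,
    )
```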
### Compare Multiple Traces

“Compare latency, span count, and success rate across a batch of traces. Use Python.”
```python
from mlflow.entities import Trace


def compare_traces(traces: list[Trace]) -> dict:
    """Compare metrics across multiple traces."""
    trace_stats = []

    for trace in traces:
        spans = trace.data.spans if hasattr(trace, 'data') else trace.search_spans()
        root_spans = [s for s in spans if s.parent_id is None]
        total_ms = 0
        if root_spans:
            total_ms = (root_spans[0].end_time_ns - root_spans[0].start_time_ns) / 1e6

        trace_stats.append({
            "trace_id": trace.info.trace_id,
            "total_ms": round(total_ms, 2),
            "span_count": len(spans),
            "status": str(trace.info.status)
        })

    latencies = [t["total_ms"] for t in trace_stats]
    return {
        "trace_count": len(traces),
        "avg_latency_ms": round(sum(latencies) / len(latencies), 2) if latencies else 0,
        "min_latency_ms": round(min(latencies), 2) if latencies else 0,
        "max_latency_ms": round(max(latencies), 2) if latencies else 0,
        "success_rate": sum(1 for t in trace_stats if "OK" in t["status"]) / len(trace_stats) if trace_stats else 0,
    }
```

Batch comparison reveals patterns that single-trace analysis misses. If latency variance is high, the bottleneck is likely a non-deterministic step like retrieval or a rate-limited API call.
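To quantify that variance, the coefficient of variation over the per-trace latencies is a quick signal. A sketch using only the standard library; the threshold in the comment is a rule of thumb, not a documented cutoff:

```python
from statistics import mean, pstdev


def latency_variability(latencies: list[float]) -> dict:
    """Mean, population stdev, and coefficient of variation for trace latencies.
    A CV well above ~0.5 suggests a non-deterministic bottleneck."""
    if not latencies:
        return {"mean_ms": 0.0, "stdev_ms": 0.0, "cv": 0.0}
    m = mean(latencies)
    sd = pstdev(latencies)
    return {"mean_ms": round(m, 2), "stdev_ms": round(sd, 2),
            "cv": round(sd / m, 2) if m else 0.0}
```

Feed it the `total_ms` values collected by `compare_traces` to decide whether averages are hiding outliers.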
## Watch Out For

- Incomplete span data — always guard against `None` timestamps: `duration = (span.end_time_ns - span.start_time_ns) / 1e6 if span.end_time_ns else 0`
- Fully qualified span names — UC function calls produce names like `catalog.schema.function`. Normalize with `name.split(".")[-1]` when matching by name.
- Wrapper spans inflating bottleneck detection — exclude patterns like `forward`, `predict`, and `root`, which encompass child spans and show misleading durations.
- Caching trace analysis — if you call `get_trace()` repeatedly for the same trace ID, use `functools.lru_cache` to avoid redundant API calls.
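A generic wrapper makes the caching advice concrete without tying it to one client. `make_cached_fetcher` is an illustrative name; in practice you would pass it `mlflow.get_trace` or a client method:

```python
from functools import lru_cache


def make_cached_fetcher(fetch, maxsize: int = 128):
    """Wrap any trace fetcher so repeated lookups of the same ID skip the API."""
    return lru_cache(maxsize=maxsize)(fetch)


# Usage sketch: get_trace = make_cached_fetcher(mlflow.get_trace)
```

Note that `lru_cache` requires hashable arguments, which trace IDs (strings) satisfy.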