
Context Optimization

Skill: databricks-mlflow-evaluation

You can cut agent latency and cost by managing how much context flows through each step of your pipeline. Most agent performance problems are context problems — bloated conversation histories, over-retrieved documents, system prompts that grow without bounds. These patterns help you detect context waste with MLflow traces and fix it with targeted strategies.

“Write a scorer that detects when my agent’s context window is being wasted on long tool outputs. Use Python.”

from mlflow.genai.scorers import scorer
from mlflow.entities import Feedback, Trace, SpanType

@scorer
def context_efficiency(trace: Trace) -> list[Feedback]:
    """Measure context usage across LLM calls in a trace."""
    feedbacks = []
    llm_spans = trace.search_spans(span_type=SpanType.CHAT_MODEL)
    if not llm_spans:
        return [Feedback(name="context_efficiency", value="skip",
                         rationale="No LLM spans found")]
    total_input_tokens = 0
    for span in llm_spans:
        attrs = span.attributes or {}
        tokens = attrs.get("mlflow.chat_model.input_tokens", 0)
        total_input_tokens += tokens or 0
    feedbacks.append(Feedback(
        name="total_input_tokens",
        value=total_input_tokens,
        rationale=f"Total input tokens across {len(llm_spans)} LLM calls"
    ))
    # Flag if any single call exceeds a threshold
    max_tokens = max(
        (span.attributes or {}).get("mlflow.chat_model.input_tokens", 0) or 0
        for span in llm_spans
    )
    feedbacks.append(Feedback(
        name="max_single_call_tokens",
        value=max_tokens,
        rationale="Largest single LLM call input" if max_tokens < 8000
        else "Single LLM call exceeds 8k tokens -- review context"
    ))
    return feedbacks

Key decisions:

  • Measure at the span level, not the trace level — total trace tokens hide which step is the problem
  • Set thresholds per use case — a RAG summarizer legitimately uses more tokens than a classifier
  • Track token growth over time — context bloat is gradual and shows up in trends, not single traces
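The trend-tracking point can be sketched as a small helper that compares a recent window of per-trace token totals against the preceding window. The window size and growth threshold here are illustrative assumptions, not MLflow APIs; feed it the `total_input_tokens` values the scorer above emits, ordered by trace time.

```python
def detect_token_growth(token_totals: list[int], window: int = 10,
                        growth_threshold: float = 1.5) -> dict:
    """Compare the mean of the most recent `window` traces against the
    window before it; flag when recent usage grew past the threshold."""
    if len(token_totals) < 2 * window:
        return {"status": "insufficient_data"}
    baseline = sum(token_totals[-2 * window:-window]) / window
    recent = sum(token_totals[-window:]) / window
    ratio = recent / baseline if baseline else float("inf")
    return {
        "status": "bloating" if ratio > growth_threshold else "stable",
        "baseline_mean": baseline,
        "recent_mean": recent,
        "ratio": round(ratio, 2),
    }
```

Because bloat is gradual, a ratio check over windows catches drift that any single-trace threshold would miss.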

“Analyze how conversation history size grows across multi-turn traces and flag when it gets excessive. Use Python.”

from mlflow.genai.scorers import scorer
from mlflow.entities import Feedback, Trace, SpanType

@scorer
def conversation_bloat_check(trace: Trace) -> Feedback:
    """Flag traces where conversation history dominates the context."""
    llm_spans = trace.search_spans(span_type=SpanType.CHAT_MODEL)
    if not llm_spans:
        return Feedback(name="conversation_bloat", value="skip",
                        rationale="No LLM calls found")
    # Check the last LLM call -- it carries the full conversation
    last_span = sorted(llm_spans, key=lambda s: s.start_time_ns)[-1]
    messages = last_span.inputs.get("messages", []) if last_span.inputs else []
    total_chars = sum(len(str(m.get("content", ""))) for m in messages)
    user_turns = sum(1 for m in messages if m.get("role") == "user")
    if total_chars > 20000:
        return Feedback(
            name="conversation_bloat",
            value="no",
            rationale=f"{total_chars} chars across {user_turns} user turns -- consider summarization"
        )
    return Feedback(
        name="conversation_bloat",
        value="yes",
        rationale=f"{total_chars} chars across {user_turns} turns -- within bounds"
    )

Multi-turn agents accumulate context linearly. After 5-10 turns, the conversation history alone can consume most of the context window. Summarize older turns or use a sliding window.
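A sliding window is straightforward to sketch. The message shape below (role/content dicts) follows the OpenAI-style chat convention, and the turn limit is an illustrative assumption to tune per use case.

```python
def trim_history(messages: list[dict], max_turns: int = 5) -> list[dict]:
    """Keep the system prompt plus only the most recent conversation turns.

    A "turn" is one user message plus everything that follows it up to
    the next user message (assistant replies, tool results, etc.).
    """
    system = [m for m in messages if m.get("role") == "system"]
    rest = [m for m in messages if m.get("role") != "system"]
    # User messages mark turn boundaries
    user_idx = [i for i, m in enumerate(rest) if m.get("role") == "user"]
    if len(user_idx) <= max_turns:
        return system + rest
    cutoff = user_idx[-max_turns]  # start of the oldest turn we keep
    return system + rest[cutoff:]
```

Counting turns rather than raw messages keeps user/assistant pairs intact, so the model never sees an answer without its question.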

Profile Token Usage Across Pipeline Stages


“Break down token consumption by pipeline stage to find where context is wasted. Use Python.”

import mlflow
from mlflow.entities import SpanType

def profile_token_usage(trace_id: str) -> dict:
    """Profile token usage by stage in a single trace."""
    # Fetch the trace directly by id rather than filtering search results
    trace = mlflow.get_trace(trace_id)
    if trace is None:
        return {"error": "Trace not found"}
    llm_spans = trace.search_spans(span_type=SpanType.CHAT_MODEL)
    stage_usage = {}
    for span in llm_spans:
        attrs = span.attributes or {}
        stage_usage[span.name] = {
            "input_tokens": attrs.get("mlflow.chat_model.input_tokens", 0),
            "output_tokens": attrs.get("mlflow.chat_model.output_tokens", 0),
            "duration_ms": (span.end_time_ns - span.start_time_ns) / 1e6,
        }
    # Identify the most expensive stage
    if stage_usage:
        worst = max(stage_usage, key=lambda k: stage_usage[k]["input_tokens"] or 0)
        stage_usage["_worst_stage"] = worst
    return stage_usage

Run this across a sample of production traces. If one stage consistently dominates token usage, that is where you focus optimization — not the whole pipeline.
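Aggregating the per-trace profiles into a fleet-level view might look like this sketch; it consumes the `stage_usage` dicts the function above returns, skipping the `_worst_stage` marker entry.

```python
from collections import defaultdict

def aggregate_stage_usage(profiles: list[dict]) -> dict:
    """Average input tokens per stage across many trace profiles."""
    totals = defaultdict(lambda: {"tokens": 0, "count": 0})
    for profile in profiles:
        for stage, usage in profile.items():
            if stage.startswith("_"):  # skip markers like _worst_stage
                continue
            totals[stage]["tokens"] += usage.get("input_tokens", 0) or 0
            totals[stage]["count"] += 1
    means = {s: t["tokens"] / t["count"] for s, t in totals.items() if t["count"]}
    dominant = max(means, key=means.get) if means else None
    return {"mean_input_tokens": means, "dominant_stage": dominant}
```

A stage that dominates the mean across hundreds of traces is a real bottleneck; one that dominates a single trace may just be an outlier query.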

“Show how to structure system prompts so they scale with complexity instead of growing unbounded. Use Python.”

# BEFORE: Monolithic system prompt that grows with every feature
SYSTEM_PROMPT_BAD = """
You are a support agent. Always be professional. Handle refunds by...
[500 words of refund policy]
Handle shipping by...
[500 words of shipping policy]
Handle technical issues by...
[500 words of troubleshooting steps]
"""

# AFTER: Layered prompt with core rules and conditional sections
CORE_RULES = """You are a support agent. Be professional and concise.
Classify the user's intent first, then apply the relevant policy."""

POLICY_SECTIONS = {
    "refund": "Refund policy: 30-day window, original packaging required...",
    "shipping": "Shipping policy: 3-5 business days standard...",
    "technical": "Troubleshooting: Check connection first, then restart..."
}

def build_context(query: str, classified_intent: str) -> str:
    """Build minimal context based on classified intent."""
    relevant_policy = POLICY_SECTIONS.get(classified_intent, "")
    return f"{CORE_RULES}\n\nRelevant policy:\n{relevant_policy}"

A classifier-first architecture keeps context small. Instead of stuffing every policy into every call, load only what the current query needs. This is the single highest-impact optimization for most agents.
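One hedged way to produce the `classified_intent` argument is a cheap keyword pass before any LLM call. The keyword lists below are illustrative assumptions; a production system would more likely use a small, fast classifier model, with keywords as a fallback.

```python
# Illustrative keyword routing -- swap in a small classifier model in production
INTENT_KEYWORDS = {
    "refund": ["refund", "money back", "return"],
    "shipping": ["shipping", "delivery", "track"],
    "technical": ["error", "crash", "not working"],
}

def classify_intent(query: str) -> str:
    """Return the first intent whose keywords match, else a safe default."""
    q = query.lower()
    for intent, keywords in INTENT_KEYWORDS.items():
        if any(kw in q for kw in keywords):
            return intent
    return "general"  # falls through to core rules only
```

The default branch matters: an unrecognized query should get the compact core rules, not every policy section.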

“Score whether retrieved documents are actually being used in the agent’s response. Use Python.”

from mlflow.genai.scorers import scorer
from mlflow.entities import Feedback, Trace, SpanType

@scorer
def retrieval_utilization(trace: Trace) -> Feedback:
    """Check if retrieved documents are actually referenced in the response."""
    retriever_spans = trace.search_spans(span_type=SpanType.RETRIEVER)
    if not retriever_spans:
        return Feedback(name="retrieval_utilization", value="skip",
                        rationale="No retriever spans found")
    # Count retrieved docs
    total_docs = 0
    for span in retriever_spans:
        outputs = span.outputs or []
        if isinstance(outputs, list):
            total_docs += len(outputs)
    # Get the final response
    llm_spans = trace.search_spans(span_type=SpanType.CHAT_MODEL)
    if not llm_spans:
        return Feedback(name="retrieval_utilization", value="skip",
                        rationale="No LLM response to compare")
    last_llm = sorted(llm_spans, key=lambda s: s.start_time_ns)[-1]
    response = str(last_llm.outputs) if last_llm.outputs else ""
    if total_docs > 5 and len(response) < 200:
        return Feedback(
            name="retrieval_utilization",
            value="no",
            rationale=f"Retrieved {total_docs} docs but response is only "
                      f"{len(response)} chars -- likely over-retrieving"
        )
    return Feedback(
        name="retrieval_utilization",
        value="yes",
        rationale=f"Retrieved {total_docs} docs, response is {len(response)} chars"
    )

Over-retrieval is the most common RAG context problem. Retrieving 20 documents when the answer is in one wastes tokens and can confuse the model. Start with top_k=3 and increase only if recall suffers.

  • Optimizing the wrong stage — always profile token usage per span before optimizing. The bottleneck is rarely where you think it is.
  • Summarization losing critical details — when compressing conversation history, keep entity names, numbers, and user preferences. Generic summaries lose the information the agent needs most.
  • Context reduction hurting quality — always re-evaluate after optimization. Run the same scorers before and after to confirm quality did not degrade. A faster agent that gives wrong answers is worse.
  • Ignoring output tokens — input tokens get all the attention, but verbose responses also cost money and slow down downstream processing. Set output length guidelines in your system prompt.
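The re-evaluation point above can be sketched as a simple diff between two evaluation runs, keyed by scorer name. The score dicts are assumed to hold mean numeric scores per scorer, and the 0.02 regression budget is an illustrative assumption.

```python
def compare_eval_runs(before: dict[str, float], after: dict[str, float],
                      max_regression: float = 0.02) -> dict:
    """Flag scorers whose mean score dropped more than max_regression
    after a context optimization. A scorer missing from the after run
    counts as a full drop."""
    regressions = {}
    for name, before_score in before.items():
        drop = before_score - after.get(name, 0.0)
        if drop > max_regression:
            regressions[name] = round(drop, 3)
    return {"safe": not regressions, "regressions": regressions}
```

Ship the optimization only when `safe` is true; otherwise the flagged scorers tell you exactly which quality dimension the context cut broke.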