LLM Integration for AI Agents: A Complete Engineering FAQ
Building AI agents that use LLMs as reasoning engines is deceptively simple in demos and brutally complex in production. This guide answers the questions every engineer hits when moving from prototype to production: Which model? How do I handle failures? How do I test this? How much will it cost?
Model Selection: What Actually Matters
Capability vs. Latency vs. Cost
The triangle every agent builder navigates. Here is how current models map to it:
| Model | Strength | Latency (TTFT) | Input $/1M | Output $/1M | Best For |
|---|---|---|---|---|---|
| Axon 2.5 Pro | Frontier reasoning, coding | ~350ms | $2.00 | $8.00 | Complex agents, coding |
| Axon 2.5 Mini | Fast, 1M context, capable | ~120ms | $0.50 | $2.00 | Daily work, RAG, tool calling |
| GPT-4o | General reasoning, vision | ~300ms | $2.50 | $10.00 | Complex multi-step agents |
| GPT-4o-mini | Fast, cheap, capable | ~150ms | $0.15 | $0.60 | High-volume tool calling |
| Claude 3.5 Sonnet | Long context, coding | ~400ms | $3.00 | $15.00 | Code agents, 100K+ context |
| Claude 3 Haiku | Speed, cost | ~200ms | $0.25 | $1.25 | Simple classification, routing |
| Gemini 1.5 Pro | 1M+ context, multimodal | ~500ms | $3.50 | $10.50 | Document analysis agents |
| Gemini 1.5 Flash | Speed, cost | ~200ms | $0.35 | $1.05 | Streaming chat agents |
| Llama 3.1 70B | Self-hosted, no API limits | Depends on hardware | $0 | $0 | On-premise, compliance |
| Llama 3.1 8B | Edge deployment | ~50ms (local) | $0 | $0 | Client-side agents |
Why Axon matters in 2026: MatterAI positions its Axon models as frontier-level performance at a fraction of the cost. Axon 2.5 Mini handles most agent tasks (tool calling, RAG, classification) with a 1M-token context and roughly 120 ms time to first token. Axon 2.5 Pro is pitched at Opus/GPT-5.5-class reasoning for complex code review and multi-step planning. MatterAI describes both as roughly 5-10x cheaper than equivalent capability from other providers; the per-token prices in the table above are the figures to compare for your own workload.
Rule of thumb: Start with Axon 2.5 Mini. It covers 80% of agent use cases. Upgrade to Axon 2.5 Pro only when you need frontier reasoning, or fall back to GPT-4o/Claude for vision or provider-specific features.
When to Use Which Model Class
Small models (8B-13B parameters):
- Intent classification and routing
- Simple extraction with structured schemas
- Guardrails and safety checks
- Embedding generation (if not using dedicated embedding models)
Medium models (70B parameters):
- General tool calling with 5-10 tools
- Multi-turn conversation with moderate context
- Code review and simple generation
- RAG with standard retrieval
Large models (400B+ parameters / frontier):
- Complex reasoning with 10+ tool calls
- Long-context synthesis (100K+ tokens)
- Ambiguous user intent resolution
- Agent orchestration (deciding what other agents to call)
The Multi-Model Pattern
Production agents rarely use one model. They use a router:
# AxonClient, OpenAIClient, AgentContext, and LLMResponse are assumed wrappers defined elsewhere
class ModelRouter:
    def __init__(self):
        self.classifier = AxonClient("axon-2-5-mini")  # Cheap, fast routing
        self.reasoner = AxonClient("axon-2-5-pro")     # Complex reasoning
        self.coder = AxonClient("axon-2-5-pro")        # Code tasks
        self.fast = AxonClient("axon-2-5-mini")        # Daily tasks, RAG
        self.vision = OpenAIClient("gpt-4o")           # Vision fallback

    async def route(self, query: str, context: AgentContext) -> LLMResponse:
        # Classify intent with Mini (cheap, 1M context for large inputs)
        intent = await self.classifier.classify(
            query,
            categories=["code", "reasoning", "simple", "creative", "vision"]
        )
        # Route to appropriate model
        if intent == "vision":
            return await self.vision.complete(query, context)
        elif intent == "code":
            return await self.coder.complete(query, context)
        elif intent == "reasoning":
            return await self.reasoner.complete(query, context)
        else:
            return await self.fast.complete(query, context)
Cost impact: A well-tuned router using Axon models as the default can reduce API costs by 80-90% compared to using frontier models for every call. Axon 2.5 Mini handles the bulk of routing and simple tasks; Pro handles the 20% that actually need frontier reasoning.
Context Window Management
How Much Context Do You Actually Have?
The advertised context window (128K, 200K, 1M) is not what you can use. You must account for:
usable_context = model_context_window
    - system_prompt_tokens
    - tool_definitions_tokens
    - conversation_history_tokens
    - output_token_budget
    - safety_margin (10%)
Example for GPT-4o with 128K window:
- System prompt: 500 tokens
- 10 tool definitions: 2,000 tokens
- 10-turn conversation: 8,000 tokens
- Output budget: 4,000 tokens
- Safety margin: 12,800 tokens (10%)
- Available for retrieval: ~100,700 tokens
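The arithmetic above can be wrapped in a small helper so the budget is recomputed whenever the prompt or tool list changes. This is a minimal sketch; the `ContextBudget` name and its fields are illustrative and not part of any SDK.

```python
from dataclasses import dataclass


@dataclass
class ContextBudget:
    """Token budget for one LLM call (all values in tokens)."""
    context_window: int
    system_prompt: int
    tool_definitions: int
    conversation_history: int
    output_budget: int
    safety_margin_percent: int = 10  # Assumed default, matching the 10% margin above

    def usable_tokens(self) -> int:
        # Integer math avoids floating-point rounding on the margin
        margin = self.context_window * self.safety_margin_percent // 100
        reserved = (
            self.system_prompt
            + self.tool_definitions
            + self.conversation_history
            + self.output_budget
            + margin
        )
        # Never report a negative budget
        return max(self.context_window - reserved, 0)


# The GPT-4o example from the text
budget = ContextBudget(
    context_window=128_000,
    system_prompt=500,
    tool_definitions=2_000,
    conversation_history=8_000,
    output_budget=4_000,
)
print(budget.usable_tokens())  # 100700
```

Calling `usable_tokens()` before retrieval gives the retriever a hard ceiling instead of discovering the overflow as a 400 error.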
Conversation History Truncation Strategies
Agents accumulate conversation history. You need a truncation strategy:
# Message and call_cheap_llm are assumed to be defined elsewhere
class ConversationManager:
    def __init__(self, max_tokens: int = 100000):
        self.max_tokens = max_tokens
        self.messages: list[Message] = []

    def add_message(self, message: Message):
        self.messages.append(message)
        self._truncate_if_needed()

    def _estimate_tokens(self, message: Message) -> int:
        # Rough heuristic (about 4 characters per token); swap in a real tokenizer for accuracy
        return len(message.content) // 4 + 1

    def _truncate_if_needed(self):
        total = sum(self._estimate_tokens(m) for m in self.messages)
        while total > self.max_tokens and len(self.messages) > 2:
            # Strategy 1 (used here): drop the oldest non-system message
            # Strategy 2: summarize old turns (see _summarize_old_turns)
            # Strategy 3: keep system + last N turns
            removed = self.messages.pop(1)  # Index 0 is the system prompt
            total -= self._estimate_tokens(removed)

    def _format_turns(self, turns: list[Message]) -> str:
        return "\n".join(f"{m.role}: {m.content}" for m in turns)

    def _summarize_old_turns(self, turns_to_summarize: list[Message]) -> Message:
        """Compress old conversation into a summary."""
        summary_prompt = f"""Summarize the following conversation turns
into key facts the assistant should remember:
{self._format_turns(turns_to_summarize)}
Summary:"""
        # Use a cheap model for summarization
        return Message(
            role="system",
            content=call_cheap_llm(summary_prompt)
        )
The "Lost in the Middle" Problem
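Research on long-context retrieval (Liu et al., 2023, "Lost in the Middle") shows a U-shaped pattern: LLMs use information at the start and end of a long context more reliably than information in the middle. For RAG agents, that means chunk order matters: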
def optimize_context_order(chunks: list[RetrievedChunk], query: str) -> list[RetrievedChunk]:
    """
    Reorder chunks so the most relevant sit at the beginning AND end.
    Middle positions get less attention, so the weakest chunks go there.
    """
    # Sort by relevance score, best first
    sorted_chunks = sorted(chunks, key=lambda c: c.score, reverse=True)
    # Deal chunks alternately to the front and the back of the prompt
    front, back = [], []
    for rank, chunk in enumerate(sorted_chunks):
        if rank % 2 == 0:
            front.append(chunk)  # Ranks 0, 2, 4, ... fill from the start
        else:
            back.append(chunk)   # Ranks 1, 3, 5, ... fill from the end
    # Reverse the back half so rank 1 lands in the final position
    return front + back[::-1]
Tool Calling and Function Calling
Defining Tools That Actually Work
Tool schemas must be precise. Vague descriptions cause hallucinated arguments.
Bad tool definition:
{
  "name": "search",
  "description": "Search for information",
  "parameters": {
    "query": { "type": "string" }
  }
}
Good tool definition:
{
  "name": "search_documentation",
  "description": "Search internal API documentation. Use this when the user asks about specific endpoints, parameters, or error codes. Do NOT use for general questions.",
  "parameters": {
    "type": "object",
    "properties": {
      "query": {
        "type": "string",
        "description": "Specific search terms. Include the API endpoint path or error code if mentioned by the user."
      },
      "max_results": {
        "type": "integer",
        "description": "Number of results to return. Use 3 for overview, 1 for specific lookup.",
        "default": 3
      }
    },
    "required": ["query"]
  }
}
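Even with a precise schema, it helps to check the model's arguments before running the tool. The following is a minimal standard-library sketch of such a check; `SEARCH_DOCS_PARAMS` and `validate_arguments` are hypothetical names, and a full JSON Schema validator would cover far more cases than this does.

```python
from typing import Any

# Hypothetical parameter schema, mirroring the search_documentation tool above
SEARCH_DOCS_PARAMS = {
    "properties": {
        "query": {"type": "string"},
        "max_results": {"type": "integer", "default": 3},
    },
    "required": ["query"],
}

# Only the JSON types this sketch understands
_JSON_TYPES = {"string": str, "integer": int, "number": (int, float), "boolean": bool}


def validate_arguments(schema: dict, args: dict[str, Any]) -> tuple[dict, list[str]]:
    """Return (arguments with defaults applied, list of problems found)."""
    errors: list[str] = []
    props = schema.get("properties", {})
    for name in schema.get("required", []):
        if name not in args:
            errors.append(f"missing required argument: {name}")
    for name, value in args.items():
        if name not in props:
            errors.append(f"unknown argument: {name}")
            continue
        type_name = props[name].get("type")
        expected = _JSON_TYPES.get(type_name)
        # bool is a subclass of int in Python, so reject it unless a boolean is expected
        is_stray_bool = isinstance(value, bool) and type_name != "boolean"
        if expected and (not isinstance(value, expected) or is_stray_bool):
            errors.append(f"argument {name} should be of type {type_name}")
    # Fill in defaults first, then let the supplied arguments override them
    cleaned = {name: spec["default"] for name, spec in props.items() if "default" in spec}
    cleaned.update(args)
    return cleaned, errors
```

Returning the error list, instead of raising on the first problem, lets the agent send every issue back to the model in a single correction request.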
Handling Tool Call Failures
Tools fail. Your agent must handle it gracefully:
# ToolCall, ToolResult, the Tool* exceptions, get_tool, _request_correction,
# and logger are assumed to be defined elsewhere
class ToolExecutor:
    def __init__(self, max_retries: int = 2):
        self.max_retries = max_retries

    async def execute_with_recovery(
        self,
        tool_call: ToolCall,
        context: AgentContext
    ) -> ToolResult:
        tool = self.get_tool(tool_call.name)
        for attempt in range(self.max_retries + 1):
            try:
                result = await tool.run(**tool_call.arguments)
                return ToolResult(success=True, data=result)
            except ToolArgumentError as e:
                # LLM sent bad arguments - ask it to fix
                if attempt < self.max_retries:
                    correction = await self._request_correction(
                        tool_call, str(e), context
                    )
                    tool_call = correction
                else:
                    return ToolResult(
                        success=False,
                        error=f"Invalid arguments after {self.max_retries} retries: {e}"
                    )
            except ToolTimeoutError:
                # Tool is slow - report it so the model can narrow the request
                return ToolResult(
                    success=False,
                    error="Tool timed out. Try a more specific query."
                )
            except Exception as e:
                # Unexpected error - don't expose internals to LLM
                logger.exception(f"Tool {tool_call.name} failed")
                return ToolResult(
                    success=False,
                    error=f"Tool execution failed. Error type: {type(e).__name__}"
                )
Parallel vs. Sequential Tool Calls
OpenAI and Anthropic support parallel tool calls. Use them:
# Sequential (slow)
results = []
for tool_call in tool_calls:
    results.append(await execute_tool(tool_call))  # Wait for each

# Parallel (fast)
results = await asyncio.gather(*[
    execute_tool(tc) for tc in tool_calls
])
When to force sequential:
- Tool B depends on output of Tool A
- Tools have side effects that could conflict
- Rate limits on external APIs
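For the rate-limit case there is a middle ground between fully sequential and fully parallel: cap how many calls are in flight at once. A minimal sketch using `asyncio.Semaphore` follows; `gather_bounded` and `fake_tool` are illustrative names, not part of any SDK.

```python
import asyncio
from typing import Any, Awaitable, Callable, Iterable


async def gather_bounded(
    items: Iterable[Any],
    worker: Callable[[Any], Awaitable[Any]],
    limit: int = 3,
) -> list[Any]:
    """Run worker(item) for every item, with at most `limit` running at once."""
    semaphore = asyncio.Semaphore(limit)

    async def run_one(item: Any) -> Any:
        # Each task waits here until one of the `limit` slots is free
        async with semaphore:
            return await worker(item)

    # gather preserves input order in its result list
    return await asyncio.gather(*(run_one(item) for item in items))


async def fake_tool(n: int) -> int:
    """Stand-in for a real tool call."""
    await asyncio.sleep(0.01)
    return n * 2


if __name__ == "__main__":
    print(asyncio.run(gather_bounded(range(5), fake_tool, limit=2)))
```

Setting `limit=1` degenerates to sequential execution, so the same helper covers both cases.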
Structured Outputs and JSON Mode
JSON Mode vs. Function Calling vs. Response Format
Four mechanisms for structured output:
| Mechanism | Use Case | Reliability | Notes |
|---|---|---|---|
| response_format={"type": "json_object"} | Simple JSON output | Medium | Must mention JSON in prompt |
| Function calling | Tool use | High | Most reliable structured output |
| response_format={"type": "json_schema", "json_schema": {...}} | Validated JSON | High | GPT-4o and later on OpenAI; other providers expose their own equivalents |
| Pydantic parsing + retry | Post-hoc validation | Medium | Fallback for all models |
Production-Grade Structured Output
from pydantic import BaseModel, ValidationError
import json

class ExtractionError(Exception):
    """Raised when no valid structured output is produced after all retries."""

class ExtractedEntities(BaseModel):
    person_names: list[str]
    organizations: list[str]
    dates: list[str]
    confidence: float

class StructuredOutputHandler:
    def __init__(self, client, max_retries: int = 3):
        self.client = client
        self.max_retries = max_retries

    async def extract(
        self,
        text: str,
        schema: type[BaseModel],
        model: str = "gpt-4o"
    ) -> BaseModel:
        """Extract structured data with validation and retry."""
        prompt = f"Extract entities from this text as JSON:\n\n{text}"
        for attempt in range(self.max_retries):
            try:
                # Use JSON schema mode if available
                response = await self.client.chat.completions.create(
                    model=model,
                    messages=[{"role": "user", "content": prompt}],
                    response_format={
                        "type": "json_schema",
                        "json_schema": {
                            "name": "extraction",
                            "schema": schema.model_json_schema()
                        }
                    }
                )
                raw_json = response.choices[0].message.content
                parsed = json.loads(raw_json)
                return schema.model_validate(parsed)
            except (ValidationError, json.JSONDecodeError) as e:
                if attempt == self.max_retries - 1:
                    raise ExtractionError(f"Failed after {self.max_retries} attempts: {e}")
                # Rebuild the prompt from the original text so corrections don't nest
                prompt = (
                    f"Your previous response failed validation: {e}\n"
                    f"Original text: {text}\n"
                    "Please respond with valid JSON matching this schema:\n"
                    f"{json.dumps(schema.model_json_schema())}"
                )
        raise ExtractionError("max_retries must be at least 1")
Handling Partial JSON (Streaming)
When streaming structured output, you may receive incomplete JSON:
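import json

class StreamingJSONParser:
    def __init__(self):
        self.buffer = ""
        self._decoder = json.JSONDecoder()

    def feed(self, chunk: str) -> dict | None:
        """Add a chunk; return the first complete JSON object, if one is available."""
        self.buffer += chunk
        candidate = self.buffer.lstrip()
        if not candidate.startswith("{"):
            return None  # Only top-level objects are handled here
        try:
            # raw_decode parses one value from the start and reports where it ended
            obj, end = self._decoder.raw_decode(candidate)
        except json.JSONDecodeError:
            return None  # Incomplete, wait for more chunks
        self.buffer = candidate[end:]  # Keep any trailing data for the next call
        return obj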
Error Handling and Resilience
LLM Failure Modes You Will Hit
| Failure | Cause | Mitigation |
|---|---|---|
| Rate limit (429) | Too many requests | Exponential backoff, request queueing |
| Context length (400) | Input too long | Truncate, summarize, chunk |
| Content filter (400) | Safety trigger | Retry with sanitized input, fallback model |
| Timeout | Slow generation | Reduce max_tokens, use faster model |
| Invalid JSON | Model drift | Schema validation, retry with correction |
| Hallucinated tool args | Poor tool description | Better schemas, argument validation |
| Empty response | Over-filtering | Temperature adjustment, prompt revision |
| Repeated loops | No progress detection | Max turn limits, state hashing |
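The last row of the table (max turn limits plus state hashing) can be sketched as follows. This is a minimal illustration under assumed names; `LoopDetector` and its thresholds are hypothetical, and a real agent may want to hash more of its state than the tool name and arguments.

```python
import hashlib
import json
from typing import Optional


class LoopDetector:
    """Stop an agent that exceeds a turn limit or keeps repeating the same step."""

    def __init__(self, max_turns: int = 10, max_repeats: int = 2):
        self.max_turns = max_turns
        self.max_repeats = max_repeats
        self.turns = 0
        self.seen: dict[str, int] = {}

    def record(self, tool_name: str, arguments: dict) -> Optional[str]:
        """Record one agent step; return a stop reason, or None to continue."""
        self.turns += 1
        if self.turns > self.max_turns:
            return "max_turns_exceeded"
        # sort_keys makes the hash independent of argument order
        payload = json.dumps([tool_name, arguments], sort_keys=True)
        key = hashlib.sha256(payload.encode()).hexdigest()
        self.seen[key] = self.seen.get(key, 0) + 1
        if self.seen[key] > self.max_repeats:
            return "repeated_state"
        return None
```

Calling `record()` before each tool execution gives the agent loop a single place to decide whether to stop and report back to the user.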
Exponential Backoff with Jitter
import asyncio
import logging
import random

logger = logging.getLogger(__name__)

# RateLimitError and APIError are assumed to come from your provider SDK
async def call_with_backoff(
    func,
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0
):
    for attempt in range(max_retries):
        try:
            return await func()
        except RateLimitError as e:
            error, reason = e, "Rate limited"
        except APIError as e:
            # Don't retry on client errors (4xx except 429)
            status = getattr(e, "status_code", None)
            if status is not None and status < 500:
                raise
            error, reason = e, f"Server error ({status})"
        if attempt == max_retries - 1:
            raise error
        # Exponential backoff with full jitter: sleep a random time in [0, cap]
        cap = min(base_delay * (2 ** attempt), max_delay)
        wait = random.uniform(0, cap)
        logger.warning(f"{reason}. Waiting {wait:.1f}s (attempt {attempt + 1})")
        await asyncio.sleep(wait)
Circuit Breaker Pattern
Prevent cascading failures when an LLM provider is down:
import time
from typing import Any

class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.last_failure_time: float | None = None
        self.state = "closed"  # closed, open, half-open

    async def call(self, func) -> Any:
        if self.state == "open":
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "half-open"  # Allow one trial call through
            else:
                raise CircuitOpenError("Circuit breaker is open")
        try:
            result = await func()
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise

    def _on_failure(self):
        self.failures += 1
        self.last_failure_time = time.time()
        # A failed half-open trial still has failures >= threshold, so it reopens
        if self.failures >= self.failure_threshold:
            self.state = "open"
            logger.error(f"Circuit opened after {self.failures} failures")

    def _on_success(self):
        if self.state == "half-open":
            self.state = "closed"
        self.failures = 0
Token Management and Cost Optimization
Estimating Tokens Before the Call
import tiktoken

def estimate_tokens(messages: list[dict], model: str = "gpt-4") -> int:
    """Estimate token count for a message list."""
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        # Unknown (e.g. non-OpenAI) model: fall back to a general-purpose encoding
        encoding = tiktoken.get_encoding("cl100k_base")
    tokens_per_message = 3  # Every message follows <|start|>{role/name}\n{content}<|end|>\n
    tokens_per_name = 1
    num_tokens = 0
    for message in messages:
        num_tokens += tokens_per_message
        for key, value in message.items():
            if not isinstance(value, str):
                continue  # Skip None content and structured fields such as tool_calls
            num_tokens += len(encoding.encode(value))
            if key == "name":
                num_tokens += tokens_per_name
    num_tokens += 3  # Every reply is primed with <|start|>assistant<|message|>
    return num_tokens
Cost-Aware Agent Design
import time

class CostTracker:
    def __init__(self, budget_usd: float = 10.0):
        self.budget = budget_usd
        self.spent = 0.0
        self.calls = []

    def log_call(self, model: str, input_tokens: int, output_tokens: int):
        cost = self._calculate_cost(model, input_tokens, output_tokens)
        self.spent += cost
        self.calls.append({
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "cost": cost,
            "timestamp": time.time()
        })

    def can_afford(self, estimated_tokens: int, model: str) -> bool:
        # Assume the output is about half the size of the input
        estimated_cost = self._calculate_cost(model, estimated_tokens, estimated_tokens // 2)
        return (self.spent + estimated_cost) < self.budget

    def _calculate_cost(self, model: str, input_t: int, output_t: int) -> float:
        # USD per 1M tokens as (input, output), matching the model table above
        pricing = {
            "axon-2-5-pro": (2.00, 8.00),
            "axon-2-5-mini": (0.50, 2.00),
            "gpt-4o": (2.50, 10.00),
            "gpt-4o-mini": (0.15, 0.60),
            "claude-3-5-sonnet": (3.00, 15.00),
        }
        # Note: unknown models are priced at zero, so add them here before use
        input_price, output_price = pricing.get(model, (0, 0))
        return (input_t * input_price + output_t * output_price) / 1_000_000
Caching Strategies
import hashlib
import json
from typing import Callable

class LLMCache:
    def __init__(self, redis_client, ttl_seconds: int = 3600):
        self.redis = redis_client
        self.ttl = ttl_seconds

    def _hash_request(self, messages: list, model: str, **kwargs) -> str:
        """Deterministic hash for cache key."""
        content = json.dumps({
            "messages": messages,
            "model": model,
            **kwargs
        }, sort_keys=True)
        return hashlib.sha256(content.encode()).hexdigest()

    async def get_or_call(
        self,
        messages: list,
        model: str,
        call_func: Callable,
        temperature: float = 0.0,  # Only temperature-0 calls are cached
        **kwargs
    ):
        # Sampling at temperature > 0 is meant to vary, so bypass the cache
        if temperature > 0:
            return await call_func(messages, model, temperature=temperature, **kwargs)
        cache_key = self._hash_request(messages, model, **kwargs)
        cached = await self.redis.get(cache_key)
        if cached is not None:
            return json.loads(cached)
        # Forward temperature so the provider call matches what was requested
        result = await call_func(messages, model, temperature=temperature, **kwargs)
        await self.redis.setex(cache_key, self.ttl, json.dumps(result))
        return result
Streaming vs. Batch Processing
When to Stream
| Scenario | Stream? | Why |
|---|---|---|
| Chat UI | Yes | Perceived latency matters |
| Background job | No | Simpler code, easier retry |
| Tool calling agent | No | Need complete response to parse tools |
| Code generation | Yes | Progressive display improves UX |
| Classification | No | Tiny output, no benefit |
Implementing Streaming with Tool Call Detection
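from typing import AsyncGenerator

# execute_tool_calls is assumed to return a list of {"role": "tool", ...} messages
async def stream_with_tool_detection(
    client,
    messages: list,
    tools: list
) -> AsyncGenerator[str, None]:
    """
    Stream to user while detecting if model wants to call tools.
    If tool calls detected, buffer and handle them.
    """
    # Tool calls arrive as fragments keyed by index; merge them as they stream in
    tool_calls: dict[int, dict] = {}
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        tools=tools,
        stream=True
    )
    async for chunk in stream:
        if not chunk.choices:
            continue
        delta = chunk.choices[0].delta
        # Accumulate tool call fragments; don't yield them to the user
        for fragment in delta.tool_calls or []:
            call = tool_calls.setdefault(fragment.index, {
                "id": "",
                "type": "function",
                "function": {"name": "", "arguments": ""}
            })
            if fragment.id:
                call["id"] = fragment.id
            if fragment.function:
                call["function"]["name"] += fragment.function.name or ""
                call["function"]["arguments"] += fragment.function.arguments or ""
        # Regular content
        if delta.content:
            yield delta.content
    # If we collected tool calls, execute them
    if tool_calls:
        completed = [tool_calls[i] for i in sorted(tool_calls)]
        tool_results = await execute_tool_calls(completed)
        # Recursively continue with tool results (cap the depth in production)
        async for token in stream_with_tool_detection(
            client,
            messages + [
                {"role": "assistant", "tool_calls": completed},
                *tool_results
            ],
            tools
        ):
            yield token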
Testing LLM Agents
Unit Testing Tool Logic
Test your tools independently from the LLM:
import pytest

# Requires the pytest-asyncio plugin; IntentRouter is assumed to be your own router
@pytest.mark.asyncio
@pytest.mark.parametrize("query,expected_tool", [
    ("What is the weather in Paris?", "get_weather"),
    ("Search for Python tutorials", "web_search"),
    ("Create a JIRA ticket", "create_ticket"),
])
async def test_intent_classification(query, expected_tool):
    router = IntentRouter()
    result = await router.classify(query)
    assert result.tool_name == expected_tool
Evaluating Agent Trajectories
import time

class AgentEvaluator:
    def __init__(self, agent):
        self.agent = agent  # The agent under test
        self.metrics = {
            "success_rate": [],
            "tool_accuracy": [],
            "token_efficiency": [],
            "latency": []
        }

    async def evaluate_dataset(self, test_cases: list[TestCase]):
        for case in test_cases:
            start = time.time()
            result = await self.agent.run(case.input)
            latency = time.time() - start
            # Check if final answer is correct
            success = self._check_answer(result, case.expected)
            # Check if right tools were called
            tool_accuracy = self._check_tool_calls(
                result.tool_calls,
                case.expected_tools
            )
            self.metrics["success_rate"].append(success)
            self.metrics["tool_accuracy"].append(tool_accuracy)
            # Tokens spent per character of expected answer
            self.metrics["token_efficiency"].append(
                result.total_tokens / max(len(case.expected), 1)
            )
            self.metrics["latency"].append(latency)

    def report(self) -> dict:
        latencies = sorted(self.metrics["latency"])
        if not latencies:
            return {}  # Nothing evaluated yet
        n = len(latencies)
        return {
            "success_rate": sum(self.metrics["success_rate"]) / n,
            "tool_accuracy": sum(self.metrics["tool_accuracy"]) / n,
            "avg_latency": sum(latencies) / n,
            "p95_latency": latencies[min(int(n * 0.95), n - 1)]
        }
Mocking LLM Responses for Tests
import pytest
from types import SimpleNamespace

# MockResponse and Agent are assumed to be defined in your test helpers
class MockLLMClient:
    """Deterministic mock for testing agent logic without API calls."""
    def __init__(self, responses: list[str] | None = None):
        self.responses = responses or []
        self.call_count = 0
        # Mirror the client.chat.completions.create(...) call path used elsewhere
        self.chat = SimpleNamespace(
            completions=SimpleNamespace(create=self.chat_completions_create)
        )

    async def chat_completions_create(self, **kwargs):
        if self.call_count < len(self.responses):
            response = self.responses[self.call_count]
            self.call_count += 1
            return MockResponse(response)
        # Default response for unexpected calls
        return MockResponse("I don't know how to help with that.")

# Usage in tests
@pytest.fixture
def mock_agent():
    client = MockLLMClient(responses=[
        # First call: classify intent
        '{"intent": "search", "query": "Python asyncio"}',
        # Second call: synthesize results
        "Here are the top Python asyncio resources..."
    ])
    return Agent(llm_client=client)
Regression Testing with Snapshots
@pytest.mark.asyncio
async def test_agent_regression():
    """Ensure agent behavior doesn't drift."""
    agent = Agent()
    test_queries = load_test_queries("regression_tests.json")
    for query in test_queries:
        result = await agent.run(query)  # run() is async elsewhere in this guide
        # Compare against stored snapshot
        assert_matches_snapshot(result, snapshot_path=f"snapshots/{query.id}.json")
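The `assert_matches_snapshot` helper is not defined in this guide. One possible shape for it is sketched below, assuming the result is a JSON-serializable dict; the `update` flag is an addition for illustration. Because free-text LLM output varies between runs, snapshots tend to be more stable when they cover structural fields (tools called, intent, citation IDs) and leave out the full answer text.

```python
import json
from pathlib import Path


def assert_matches_snapshot(result: dict, snapshot_path: str, update: bool = False) -> None:
    """Compare a JSON-serializable result with a stored snapshot file."""
    path = Path(snapshot_path)
    # Sorted keys and fixed indentation keep the file stable across runs
    normalized = json.dumps(result, sort_keys=True, indent=2)
    if update or not path.exists():
        # First run (or explicit update): record the snapshot instead of comparing
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(normalized)
        return
    expected = path.read_text()
    assert normalized == expected, f"Snapshot mismatch for {path}"
```

The first run writes the snapshot and later runs compare against it, so new snapshot files should be reviewed before they are committed.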
Observability and Monitoring
What to Log
from dataclasses import dataclass

@dataclass
class LLMCallEvent:
    timestamp: float
    trace_id: str
    model: str
    input_tokens: int
    output_tokens: int
    latency_ms: float
    tool_calls: list[str]
    finish_reason: str  # "stop", "length", "tool_calls"
    error: str | None
    cost_usd: float
Structured Logging
import time
import structlog

logger = structlog.get_logger()

# generate_trace_id and calculate_cost are assumed helpers defined elsewhere
async def logged_llm_call(client, messages, model, **kwargs):
    trace_id = generate_trace_id()
    start = time.time()
    logger.info(
        "llm_request_started",
        trace_id=trace_id,
        model=model,
        message_count=len(messages),
        estimated_tokens=estimate_tokens(messages, model)
    )
    try:
        response = await client.chat.completions.create(
            model=model,
            messages=messages,
            **kwargs
        )
        latency = (time.time() - start) * 1000
        usage = response.usage
        logger.info(
            "llm_request_completed",
            trace_id=trace_id,
            model=model,
            input_tokens=usage.prompt_tokens,
            output_tokens=usage.completion_tokens,
            latency_ms=latency,
            finish_reason=response.choices[0].finish_reason,
            cost_usd=calculate_cost(model, usage)
        )
        return response
    except Exception as e:
        logger.error(
            "llm_request_failed",
            trace_id=trace_id,
            model=model,
            error_type=type(e).__name__,
            error_message=str(e),
            latency_ms=(time.time() - start) * 1000
        )
        raise
Key Metrics Dashboard
Track these in production:
| Metric | Target | Alert If |
|---|---|---|
| Requests per minute | Baseline | > 2x or < 0.5x baseline |
| P95 latency | < 2s | > 5s |
| Error rate | < 1% | > 5% |
| Cost per request | Baseline | > 2x baseline |
| Token utilization | 50-80% | > 95% (context limit risk) |
| Tool call success rate | > 95% | < 90% |
| User satisfaction (implicit) | > 80% | < 70% |
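The "Alert If" column maps directly onto a threshold check. A minimal sketch follows, covering only the rows with absolute thresholds plus the cost baseline; `MetricsSnapshot` and `check_alerts` are hypothetical names, and the metric values are assumed to be computed elsewhere over a recent time window.

```python
from dataclasses import dataclass


@dataclass
class MetricsSnapshot:
    """Aggregated metrics for one monitoring window (rates are fractions, 0-1)."""
    p95_latency_s: float
    error_rate: float
    tool_success_rate: float
    token_utilization: float
    cost_per_request: float
    baseline_cost_per_request: float


def check_alerts(m: MetricsSnapshot) -> list[str]:
    """Return the names of metrics that crossed the alert thresholds in the table."""
    alerts = []
    if m.p95_latency_s > 5:
        alerts.append("p95_latency")
    if m.error_rate > 0.05:
        alerts.append("error_rate")
    if m.cost_per_request > 2 * m.baseline_cost_per_request:
        alerts.append("cost_per_request")
    if m.token_utilization > 0.95:
        alerts.append("token_utilization")
    if m.tool_success_rate < 0.90:
        alerts.append("tool_call_success_rate")
    return alerts
```

In practice these thresholds usually live in the monitoring system's alert rules; the function form is mainly useful for unit-testing the rules themselves.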
Hallucination Mitigation
Techniques That Actually Work
- Retrieval-Augmented Generation (RAG): Ground responses in retrieved documents
- Self-consistency: Generate multiple answers, vote on most common
- Chain-of-verification: Ask model to verify its own claims
- Constrained decoding: Force output to match known entities
- Human-in-the-loop: Flag uncertain responses for review
Self-Consistency Implementation
import asyncio

# generate_answer and extract_claims are assumed helpers defined elsewhere
async def self_consistent_answer(
    client,
    question: str,
    num_samples: int = 5,
    temperature: float = 0.7
) -> str:
    """Generate multiple answers and return the most consistent one."""
    # Generate N answers
    answers = await asyncio.gather(*[
        generate_answer(client, question, temperature)
        for _ in range(num_samples)
    ])
    # Extract key claims from each answer
    claims_per_answer = [extract_claims(a) for a in answers]
    # Score each answer by how many of its claims appear in other answers
    scores = []
    for i, claims in enumerate(claims_per_answer):
        score = 0
        for claim in claims:
            # Count how many other answers support this claim (exact match)
            support = sum(
                1 for j, other_claims in enumerate(claims_per_answer)
                if i != j and claim in other_claims
            )
            score += support
        scores.append(score)
    # Return highest-scoring answer
    best_idx = scores.index(max(scores))
    return answers[best_idx]
Grounding with Citations
# GroundedAnswer and extract_citations are assumed helpers defined elsewhere
class CitationGroundedRAG:
    def __init__(self, retriever, llm_client):
        self.retriever = retriever
        self.llm = llm_client

    async def answer(self, query: str) -> GroundedAnswer:
        # Retrieve documents
        docs = await self.retriever.search(query, top_k=5)
        # Build prompt with citations
        context = "\n\n".join([
            f"[{i+1}] {doc.content}\nSource: {doc.source}"
            for i, doc in enumerate(docs)
        ])
        prompt = f"""Answer the question using only the provided documents.
Cite sources using [1], [2], etc. If the answer isn't in the documents, say "I don't know".
Documents:
{context}
Question: {query}
Answer:"""
        response = await self.llm.complete(prompt)
        # Verify citations exist in retrieved docs (citation numbers are 1-based)
        citations = extract_citations(response)
        valid_citations = [c for c in citations if 1 <= c <= len(docs)]
        return GroundedAnswer(
            text=response,
            citations=valid_citations,
            sources=[docs[c-1] for c in valid_citations]
        )
Memory and State Management
Short-Term vs. Long-Term Memory
# Message, Fact, and FactStore are assumed to be defined elsewhere
class AgentMemory:
    def __init__(self, system_prompt: Message):
        self.system_prompt = system_prompt
        # Short-term: In-conversation context
        self.conversation_history: list[Message] = []
        # Long-term: Persistent facts about user/domain
        self.fact_store = FactStore()  # Vector DB or key-value store

    async def recall(self, query: str, k: int = 5) -> list[Fact]:
        """Retrieve relevant facts from long-term memory."""
        return await self.fact_store.similarity_search(query, k=k)

    async def memorize(self, fact: str, importance: float = 0.5):
        """Store a new fact in long-term memory."""
        # Only store important facts
        if importance > 0.3:
            await self.fact_store.add(fact)

    async def get_context_window(self, query: str, max_tokens: int = 8000) -> list[Message]:
        """Build messages for LLM call within token budget."""
        # Start with system prompt + relevant memories
        messages = [self.system_prompt]
        # Add recalled facts as context (recall is async, so it must be awaited)
        facts = await self.recall(query)
        if facts:
            messages.append(Message(
                role="system",
                content=f"Relevant context: {'; '.join(str(f) for f in facts)}"
            ))
        # Add recent conversation (truncated to fit)
        messages.extend(self._truncate_history(max_tokens))
        return messages
Episodic Memory for Multi-Session Agents
import time

class EpisodicMemory:
    """Store and retrieve past conversation episodes."""
    def __init__(self, vector_store):
        self.store = vector_store

    async def save_episode(self, session_id: str, summary: str, outcome: str):
        """Save a summary of a completed interaction."""
        await self.store.add_document(
            content=summary,
            metadata={
                "session_id": session_id,
                "outcome": outcome,
                "timestamp": time.time()
            }
        )

    async def find_similar_episodes(self, current_query: str, k: int = 3):
        """Find past episodes similar to current situation."""
        return await self.store.similarity_search(current_query, k=k)
Production Deployment Patterns
Request Batching
import asyncio

# LLMRequest and client.complete are assumed to be defined elsewhere
class BatchProcessor:
    def __init__(self, client, max_batch_size: int = 10, max_wait_ms: float = 50):
        self.client = client
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms
        self.queue: asyncio.Queue = asyncio.Queue()
        self._task: asyncio.Task | None = None

    async def submit(self, request: LLMRequest) -> asyncio.Future:
        if self._task is None:
            # Start lazily so an event loop is guaranteed to be running
            self._task = asyncio.create_task(self._process_loop())
        future = asyncio.get_running_loop().create_future()
        await self.queue.put((request, future))
        return future

    async def _process_loop(self):
        loop = asyncio.get_running_loop()
        while True:
            # Block until the first request arrives, then open the batching window
            batch = [await self.queue.get()]
            deadline = loop.time() + (self.max_wait_ms / 1000)
            # Collect batch
            while len(batch) < self.max_batch_size:
                timeout = deadline - loop.time()
                if timeout <= 0:
                    break
                try:
                    batch.append(await asyncio.wait_for(self.queue.get(), timeout=timeout))
                except asyncio.TimeoutError:
                    break
            await self._process_batch(batch)

    async def _process_batch(self, batch: list[tuple]):
        # Send as single batch request if API supports it
        # Or parallelize individual requests
        requests, futures = zip(*batch)
        responses = await asyncio.gather(
            *[self.client.complete(req) for req in requests],
            return_exceptions=True  # One failure must not strand the other futures
        )
        for future, response in zip(futures, responses):
            if isinstance(response, Exception):
                future.set_exception(response)
            else:
                future.set_result(response)
Multi-Provider Fallback
class ResilientLLMClient:
    def __init__(self, providers: list[LLMProvider]):
        self.providers = providers
        self.circuit_breakers = {
            p.name: CircuitBreaker() for p in providers
        }

    async def complete(self, messages, **kwargs) -> str:
        for provider in self._rank_providers():
            cb = self.circuit_breakers[provider.name]
            try:
                return await cb.call(lambda: provider.complete(messages, **kwargs))
            except (CircuitOpenError, APIError) as e:
                logger.warning(f"Provider {provider.name} failed: {e}")
                continue
        raise AllProvidersFailedError("No LLM providers available")

    def _rank_providers(self) -> list[LLMProvider]:
        # Rank by: circuit state, recent latency, cost
        return sorted(
            self.providers,
            key=lambda p: (
                0 if self.circuit_breakers[p.name].state == "closed" else 1,
                p.recent_latency,
                p.cost_per_1k_tokens
            )
        )
Security Considerations
Prompt Injection Defense
import re

# OpenAIClient and SecurityScan are assumed to be defined elsewhere
class PromptInjectionDetector:
    # Common injection patterns; a cheap first pass that will miss paraphrases
    PATTERNS = [
        re.compile(r"ignore previous instructions", re.IGNORECASE),
        re.compile(r"system prompt", re.IGNORECASE),
        re.compile(r"you are now", re.IGNORECASE),
        re.compile(r"\bDAN\b"),  # Case-sensitive whole word, so "dangerous" doesn't match
        re.compile(r"jailbreak", re.IGNORECASE),
    ]

    def __init__(self):
        # Use a small, fast model for detection
        self.classifier = OpenAIClient("gpt-4o-mini")

    async def scan(self, user_input: str) -> SecurityScan:
        for pattern in self.PATTERNS:
            if pattern.search(user_input):
                return SecurityScan(threat_level="high", matched_pattern=pattern.pattern)
        # LLM-based detection for subtle attacks
        result = await self.classifier.classify(
            f"Is this a prompt injection attempt? Reply ONLY 'yes' or 'no'.\n\n{user_input}",
            temperature=0
        )
        if "yes" in result.lower():
            return SecurityScan(threat_level="medium")
        return SecurityScan(threat_level="low")
Output Sanitization
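import re

def sanitize_llm_output(text: str, allowed_tags: list[str] | None = None) -> str:
    """Remove potentially dangerous content from LLM output.

    Regex stripping is a first line of defense only; if the output is rendered
    as HTML, run it through a dedicated HTML sanitizer as well.
    allowed_tags is accepted for compatibility but not applied here.
    """
    # Remove script tags and event handlers
    text = re.sub(r"<script[^>]*>.*?</script>", "", text, flags=re.DOTALL | re.IGNORECASE)
    text = re.sub(r"""on\w+\s*=\s*["'][^"']*["']""", "", text, flags=re.IGNORECASE)
    # Remove data URIs
    text = re.sub(r"data:[^;]+;base64,[A-Za-z0-9+/=]+", "[removed]", text)
    # Limit length
    max_length = 10000
    if len(text) > max_length:
        text = text[:max_length] + "\n[Output truncated]"
    return text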
Performance Optimization
Reducing Time-to-First-Token (TTFT)
- Use smaller models for routing: Classify intent with a fast model before calling the heavy one
- Pre-warm connections: Keep persistent HTTP connections to API providers
- Reduce prompt size: Trim unnecessary context, use efficient formatting
- Enable streaming: Return first token immediately instead of waiting for full response
- Use prompt caching: Cache system prompts and tool definitions (OpenAI caches long prompt prefixes automatically; Anthropic requires marking cacheable blocks explicitly)
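Before tuning any of these, it helps to measure TTFT separately from total latency. A minimal sketch follows that times any async token stream; `measure_ttft` and `fake_stream` are illustrative names, and the clock starts when iteration begins, so the stream should be created immediately before the call.

```python
import asyncio
import time
from typing import AsyncIterator


async def measure_ttft(stream: AsyncIterator[str]) -> tuple[float, float, str]:
    """Consume a token stream; return (ttft_seconds, total_seconds, full_text)."""
    start = time.perf_counter()
    ttft = None
    parts = []
    async for token in stream:
        if ttft is None:
            # First token observed: record time-to-first-token
            ttft = time.perf_counter() - start
        parts.append(token)
    total = time.perf_counter() - start
    # An empty stream has no first token; report the total time instead
    return (ttft if ttft is not None else total), total, "".join(parts)


async def fake_stream() -> AsyncIterator[str]:
    """Stand-in for a provider stream, with artificial delays."""
    await asyncio.sleep(0.05)
    yield "Hello"
    await asyncio.sleep(0.05)
    yield ", world"


if __name__ == "__main__":
    ttft, total, text = asyncio.run(measure_ttft(fake_stream()))
    print(f"TTFT {ttft * 1000:.0f} ms, total {total * 1000:.0f} ms, text={text!r}")
```

Logging both numbers per request shows whether a slow response comes from queueing and prompt processing (high TTFT) or from long generation (high total with low TTFT).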
Prompt Caching
class PromptCache:
    """Keep a stable prompt prefix so the provider can reuse its cached work."""
    def __init__(self, client):
        self.client = client

    async def cached_completion(
        self,
        system_prompt: str,  # Stable prefix: keep it byte-identical across calls
        user_prompt: str,
        model: str = "gpt-4o"
    ):
        # OpenAI caches long, repeated prompt prefixes automatically.
        # Anthropic requires marking cacheable blocks explicitly (cache_control).
        # Either way, put static content first and variable content last.
        return await self.client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ]
        )
Implementation Checklist
Before shipping your LLM agent to production:
- Model selection: Benchmarked 2+ models on your task, documented tradeoffs
- Error handling: All API errors caught with appropriate retry logic
- Circuit breakers: Failover to backup providers configured
- Rate limiting: Request queuing prevents 429 storms
- Token management: Context window never exceeded, truncation strategy tested
- Cost controls: Budget limits, per-request cost tracking, alerting
- Structured output: JSON schemas validated, fallback parsing implemented
- Tool definitions: Descriptions are specific, examples included, edge cases handled
- Testing: Unit tests for tools, integration tests for full agent, regression suite
- Observability: All LLM calls logged with trace IDs, metrics dashboard configured
- Security: Prompt injection detection, output sanitization, no PII in logs
- Hallucination: RAG grounding, self-consistency, or human review for critical outputs
- Memory: Conversation history managed, long-term memory if multi-session
- Performance: P95 latency measured, streaming for interactive use cases
- Monitoring: Alerts for error rate, latency, cost, and token utilization
Quick Reference: Common Configurations
High-Reliability Agent (Customer Support)
config = {
    "model": "axon-2-5-mini",  # 1M context for long support threads
    "temperature": 0.3,
    "max_tokens": 500,
    "tools": [search_kb, create_ticket, escalate],
    "max_turns": 5,
    "timeout": 10,
    "retries": 3
}
Creative Agent (Content Generation)
config = {
    "model": "axon-2-5-pro",
    "temperature": 0.8,
    "max_tokens": 2000,
    "top_p": 0.95,
    "presence_penalty": 0.3
}
Code Agent (Developer Tool)
config = {
    "model": "axon-2-5-pro",  # Matches Opus-class reasoning
    "temperature": 0.2,
    "max_tokens": 4000,
    "tools": [read_file, write_file, run_tests, git_commit],
    "response_format": {"type": "json_object"}  # Same shape as the JSON mode table above
}