LLM Integration for AI Agents: A Complete Engineering FAQ

MatterAI

Building AI agents that use LLMs as reasoning engines is deceptively simple in demos and brutally complex in production. This guide answers the questions every engineer hits when moving from prototype to production: Which model? How do I handle failures? How do I test this? How much will it cost?

Model Selection: What Actually Matters

Capability vs. Latency vs. Cost

The triangle every agent builder navigates. Here is how current models map to it:

| Model | Strength | Latency (TTFT) | Input $/1M | Output $/1M | Best For |
|---|---|---|---|---|---|
| Axon 2.5 Pro | Frontier reasoning, coding | ~350ms | $2.00 | $8.00 | Complex agents, coding |
| Axon 2.5 Mini | Fast, 1M context, capable | ~120ms | $0.50 | $2.00 | Daily work, RAG, tool calling |
| GPT-4o | General reasoning, vision | ~300ms | $2.50 | $10.00 | Complex multi-step agents |
| GPT-4o-mini | Fast, cheap, capable | ~150ms | $0.15 | $0.60 | High-volume tool calling |
| Claude 3.5 Sonnet | Long context, coding | ~400ms | $3.00 | $15.00 | Code agents, 100K+ context |
| Claude 3 Haiku | Speed, cost | ~200ms | $0.25 | $1.25 | Simple classification, routing |
| Gemini 1.5 Pro | 1M+ context, multimodal | ~500ms | $3.50 | $10.50 | Document analysis agents |
| Gemini 1.5 Flash | Speed, cost | ~200ms | $0.35 | $1.05 | Streaming chat agents |
| Llama 3.1 70B | Self-hosted, no API limits | Depends on hardware | $0 | $0 | On-premise, compliance |
| Llama 3.1 8B | Edge deployment | ~50ms (local) | $0 | $0 | Client-side agents |

Why Axon matters in 2026: MatterAI's Axon models deliver frontier-level performance at a fraction of the cost. Axon 2.5 Mini handles most agent tasks (tool calling, RAG, classification) with a 1M-token context and roughly 120ms time to first token. Axon 2.5 Pro matches Opus/GPT-5.5 class reasoning for complex code review and multi-step planning. Both are priced aggressively, roughly 5-10x cheaper than equivalent capability from other providers.

Rule of thumb: Start with Axon 2.5 Mini. It covers 80% of agent use cases. Upgrade to Axon 2.5 Pro only when you need frontier reasoning, or fall back to GPT-4o/Claude for vision or provider-specific features.

When to Use Which Model Class

Small models (8B-13B parameters):

  • Intent classification and routing
  • Simple extraction with structured schemas
  • Guardrails and safety checks
  • Embedding generation (if not using dedicated embedding models)

Medium models (70B parameters):

  • General tool calling with 5-10 tools
  • Multi-turn conversation with moderate context
  • Code review and simple generation
  • RAG with standard retrieval

Large models (400B+ parameters / frontier):

  • Complex reasoning with 10+ tool calls
  • Long-context synthesis (100K+ tokens)
  • Ambiguous user intent resolution
  • Agent orchestration (deciding what other agents to call)

The Multi-Model Pattern

Production agents rarely use one model. They use a router:

class ModelRouter:
    def __init__(self):
        self.classifier = AxonClient("axon-2-5-mini")   # Cheap, fast routing
        self.reasoner = AxonClient("axon-2-5-pro")      # Complex reasoning
        self.coder = AxonClient("axon-2-5-pro")         # Code tasks
        self.fast = AxonClient("axon-2-5-mini")         # Daily tasks, RAG
        self.vision = OpenAIClient("gpt-4o")            # Vision fallback

    async def route(self, query: str, context: AgentContext) -> LLMResponse:
        # Classify intent with Mini (cheap, 1M context for large inputs)
        intent = await self.classifier.classify(
            query,
            categories=["code", "reasoning", "simple", "creative", "vision"]
        )

        # Route to appropriate model
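        if intent == "vision":
            return await self.vision.complete(query, context)
        elif intent == "code":
            return await self.coder.complete(query, context)
        elif intent == "reasoning":
            return await self.reasoner.complete(query, context)
        else:
            # "simple", "creative", and anything unrecognized
            return await self.fast.complete(query, context)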

Cost impact: A well-tuned router using Axon models as the default can reduce API costs by 80-90% compared to using frontier models for every call. Axon 2.5 Mini handles the bulk of routing and simple tasks; Pro handles the 20% that actually need frontier reasoning.

Context Window Management

How Much Context Do You Actually Have?

The advertised context window (128K, 200K, 1M) is not what you can use. You must account for:

usable_context = model_context_window
    - system_prompt_tokens
    - tool_definitions_tokens
    - conversation_history_tokens
    - output_token_budget
    - safety_margin (10%)

Example for GPT-4o with 128K window:

  • System prompt: 500 tokens
  • 10 tool definitions: 2,000 tokens
  • 10-turn conversation: 8,000 tokens
  • Output budget: 4,000 tokens
  • Safety margin: 12,800 tokens (10%)
  • Available for retrieval: ~100,700 tokens
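The budget arithmetic above can be sketched as a small helper. The function name and parameter names are illustrative rather than part of any SDK, and the token counts are the ones from the GPT-4o example.

```python
def usable_context(
    window: int,
    system_prompt: int,
    tool_definitions: int,
    history: int,
    output_budget: int,
    safety_margin_pct: int = 10,
) -> int:
    """Tokens left for retrieved content after fixed costs are subtracted."""
    # Integer math keeps the margin exact (no floating-point drift).
    safety_margin = window * safety_margin_pct // 100
    reserved = system_prompt + tool_definitions + history + output_budget + safety_margin
    return max(window - reserved, 0)


# The GPT-4o example: 128K window, 10 tools, 10 turns of history.
available = usable_context(
    window=128_000,
    system_prompt=500,
    tool_definitions=2_000,
    history=8_000,
    output_budget=4_000,
)
print(available)  # 100700
```

Recomputing this before every call, rather than once at startup, keeps the retrieval budget honest as conversation history grows.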

Conversation History Truncation Strategies

Agents accumulate conversation history. You need a truncation strategy:

class ConversationManager:
    def __init__(self, max_tokens: int = 100000):
        self.max_tokens = max_tokens
        self.messages: list[Message] = []

    def add_message(self, message: Message):
        self.messages.append(message)
        self._truncate_if_needed()

    def _truncate_if_needed(self):
        total = sum(self._estimate_tokens(m) for m in self.messages)

        while total > self.max_tokens and len(self.messages) > 2:
            # Simplest strategy: drop the oldest non-system message.
            # Alternatives: summarize old turns (see _summarize_old_turns)
            # or keep only the system prompt plus the last N turns.
            removed = self.messages.pop(1)  # Index 0 is the system prompt
            total -= self._estimate_tokens(removed)

    def _summarize_old_turns(self, turns_to_summarize: list[Message]) -> Message:
        """Compress old conversation into a summary."""
        summary_prompt = f"""Summarize the following conversation turns
        into key facts the assistant should remember:

        {self._format_turns(turns_to_summarize)}

        Summary:"""

        # Use a cheap model for summarization
        return Message(
            role="system",
            content=call_cheap_llm(summary_prompt)
        )
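The "keep system + last N turns" strategy mentioned in the comments is not implemented above. A minimal sketch might look like the following; the `Message` dataclass here is a stand-in for whatever message type your agent uses, and `keep_last_turns` is a hypothetical helper name.

```python
from dataclasses import dataclass


@dataclass
class Message:
    role: str      # "system", "user", or "assistant"
    content: str


def keep_last_turns(messages: list[Message], max_turns: int) -> list[Message]:
    """Keep system messages plus the most recent `max_turns` user/assistant exchanges."""
    system = [m for m in messages if m.role == "system"]
    if max_turns <= 0:
        return system

    dialogue = [m for m in messages if m.role != "system"]

    # Walk backwards; each user message marks the start of one turn.
    kept: list[Message] = []
    turns = 0
    for message in reversed(dialogue):
        kept.append(message)
        if message.role == "user":
            turns += 1
            if turns == max_turns:
                break

    return system + kept[::-1]
```

Counting turns instead of individual messages avoids cutting an exchange in half, which would leave an assistant reply without the user message it answers.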

The "Lost in the Middle" Problem

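Liu et al. (2023, "Lost in the Middle") found that LLMs use information placed in the middle of a long context less reliably than information at the beginning or end. For RAG agents, that means ordering retrieved chunks so the most relevant ones sit at the edges: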

def optimize_context_order(chunks: list[RetrievedChunk], query: str) -> list[RetrievedChunk]:
    """
    Reorder chunks to put most relevant at beginning AND end.
    Middle positions get less attention.
    """
    # Sort by relevance score
    sorted_chunks = sorted(chunks, key=lambda c: c.score, reverse=True)

    # Alternate placement: ranks 1, 3, 5... fill the front; ranks 2, 4, 6... fill
    # the back (reversed), so relevance is highest at both edges and lowest in the middle.
    front, back = [], []
    for rank, chunk in enumerate(sorted_chunks):
        if rank % 2 == 0:
            front.append(chunk)
        else:
            back.append(chunk)

    return front + back[::-1]

Tool Calling and Function Calling

Defining Tools That Actually Work

Tool schemas must be precise. Vague descriptions cause hallucinated arguments.

Bad tool definition:

{
  "name": "search",
  "description": "Search for information",
  "parameters": {
    "type": "object",
    "properties": {
      "query": { "type": "string" }
    }
  }
}

Good tool definition:

{
  "name": "search_documentation",
  "description": "Search internal API documentation. Use this when the user asks about specific endpoints, parameters, or error codes. Do NOT use for general questions.",
  "parameters": {
    "type": "object",
    "properties": {
      "query": {
        "type": "string",
        "description": "Specific search terms. Include the API endpoint path or error code if mentioned by the user."
      },
      "max_results": {
        "type": "integer",
        "description": "Number of results to return. Use 3 for overview, 1 for specific lookup.",
        "default": 3
      }
    },
    "required": ["query"]
  }
}
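Even with a precise schema, it can help to validate the arguments the model sends before running the tool. The sketch below is a deliberately small stdlib-only check, assuming the tool's parameters follow the JSON Schema shape with `properties` and `required` keys. `validate_arguments` and `JSON_TYPES` are hypothetical names, and a production system would more likely use a full JSON Schema validator.

```python
# Maps JSON Schema type names to Python types; a deliberately small subset.
JSON_TYPES = {"string": str, "integer": int, "number": (int, float), "boolean": bool}


def validate_arguments(schema: dict, arguments: dict) -> list[str]:
    """Return a list of problems; an empty list means the arguments look valid."""
    properties = schema.get("properties", {})
    problems = []

    for name in schema.get("required", []):
        if name not in arguments:
            problems.append(f"missing required argument: {name}")

    for name, value in arguments.items():
        if name not in properties:
            problems.append(f"unknown argument: {name}")
            continue
        type_name = properties[name].get("type")
        expected = JSON_TYPES.get(type_name)
        if expected is None:
            continue  # Type not covered by this sketch; skip the check
        # bool is a subclass of int in Python, so reject it for non-boolean types.
        wrong_type = not isinstance(value, expected)
        bool_as_number = isinstance(value, bool) and expected is not bool
        if wrong_type or bool_as_number:
            problems.append(f"{name} should be of type {type_name}")

    return problems


schema = {
    "type": "object",
    "properties": {
        "query": {"type": "string"},
        "max_results": {"type": "integer"},
    },
    "required": ["query"],
}
print(validate_arguments(schema, {"max_results": "3", "limit": 5}))
# ['missing required argument: query', 'max_results should be of type integer', 'unknown argument: limit']
```

Returning the problem list to the model as the tool result gives it something concrete to correct on the next attempt.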

Handling Tool Call Failures

Tools fail. Your agent must handle it gracefully:

class ToolExecutor:
    def __init__(self, max_retries: int = 2):
        self.max_retries = max_retries

    async def execute_with_recovery(
        self,
        tool_call: ToolCall,
        context: AgentContext
    ) -> ToolResult:
        tool = self.get_tool(tool_call.name)

        for attempt in range(self.max_retries + 1):
            try:
                result = await tool.run(**tool_call.arguments)
                return ToolResult(success=True, data=result)

            except ToolArgumentError as e:
                # LLM sent bad arguments - ask it to fix
                if attempt < self.max_retries:
                    correction = await self._request_correction(
                        tool_call, str(e), context
                    )
                    tool_call = correction
                else:
                    return ToolResult(
                        success=False,
                        error=f"Invalid arguments after {self.max_retries} retries: {e}"
                    )

            except ToolTimeoutError:
                # Tool is slow - return partial result or fallback
                return ToolResult(
                    success=False,
                    error="Tool timed out. Try a more specific query."
                )

            except Exception as e:
                # Unexpected error - don't expose internals to LLM
                logger.exception(f"Tool {tool_call.name} failed")
                return ToolResult(
                    success=False,
                    error=f"Tool execution failed. Error type: {type(e).__name__}"
                )

Parallel vs. Sequential Tool Calls

OpenAI and Anthropic support parallel tool calls. Use them:

# Sequential (slow)
for tool_call in tool_calls:
    result = await execute_tool(tool_call)  # Wait for each

# Parallel (fast)
results = await asyncio.gather(*[
    execute_tool(tc) for tc in tool_calls
])

When to force sequential:

  • Tool B depends on output of Tool A
  • Tools have side effects that could conflict
  • Rate limits on external APIs
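When rate limits are the only concern, a middle ground between fully sequential and fully parallel is to cap concurrency. The sketch below uses `asyncio.Semaphore`. `gather_bounded` is a hypothetical helper, and `fake_tool` stands in for a real tool call.

```python
import asyncio


async def gather_bounded(coroutine_factories, max_concurrency: int = 3):
    """Run zero-argument async callables concurrently, at most `max_concurrency` at once.

    Results come back in the same order as the inputs, like asyncio.gather.
    """
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run(factory):
        async with semaphore:
            return await factory()

    return await asyncio.gather(*(run(f) for f in coroutine_factories))


async def demo():
    active = 0
    peak = 0

    async def fake_tool(n: int) -> int:
        # Track how many calls are in flight to show the cap is respected.
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)
        active -= 1
        return n * 2

    results = await gather_bounded(
        [lambda n=n: fake_tool(n) for n in range(6)], max_concurrency=2
    )
    return results, peak


results, peak = asyncio.run(demo())
print(results, peak)  # [0, 2, 4, 6, 8, 10] 2
```

Passing factories rather than coroutine objects means no call starts until it holds a semaphore slot.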

Structured Outputs and JSON Mode

JSON Mode vs. Function Calling vs. Response Format

Four mechanisms for structured output:

| Mechanism | Use Case | Reliability | Notes |
|---|---|---|---|
| response_format={"type": "json_object"} | Simple JSON output | Medium | Must mention JSON in prompt |
| Function calling | Tool use | High | Most reliable structured output |
| response_format={"type": "json_schema", "json_schema": {...}} | Validated JSON | High | GPT-4o, Claude 3.5+ |
| Pydantic parsing + retry | Post-hoc validation | Medium | Fallback for all models |

Production-Grade Structured Output

from pydantic import BaseModel, ValidationError
import json


class ExtractionError(Exception):
    """Raised when structured extraction fails after all retries."""

class ExtractedEntities(BaseModel):
    person_names: list[str]
    organizations: list[str]
    dates: list[str]
    confidence: float

class StructuredOutputHandler:
    def __init__(self, client, max_retries: int = 3):
        self.client = client
        self.max_retries = max_retries

    async def extract(
        self,
        text: str,
        schema: type[BaseModel],
        model: str = "gpt-4o"
    ) -> BaseModel:
        """Extract structured data with validation and retry."""

        prompt = f"Extract entities from this text as JSON:\n\n{text}"

        for attempt in range(self.max_retries):
            try:
                # Use JSON schema mode if available
                response = await self.client.chat.completions.create(
                    model=model,
                    messages=[{"role": "user", "content": prompt}],
                    response_format={
                        "type": "json_schema",
                        "json_schema": {
                            "name": "extraction",
                            "schema": schema.model_json_schema()
                        }
                    }
                )

                raw_json = response.choices[0].message.content
                parsed = json.loads(raw_json)
                return schema.model_validate(parsed)

            except (ValidationError, json.JSONDecodeError) as e:
                if attempt == self.max_retries - 1:
                    raise ExtractionError(
                        f"Failed after {self.max_retries} attempts: {e}"
                    ) from e

                # Rebuild the prompt from the original text so retries don't nest
                prompt = (
                    f"Your previous response failed validation: {e}\n\n"
                    f"Original text: {text}\n\n"
                    "Please respond with valid JSON matching this schema:\n"
                    f"{json.dumps(schema.model_json_schema())}"
                )

Handling Partial JSON (Streaming)

When streaming structured output, you may receive incomplete JSON:

import json


class StreamingJSONParser:
    def __init__(self):
        self.buffer = ""
        self._decoder = json.JSONDecoder()

    def feed(self, chunk: str) -> dict | None:
        """Append a chunk; return the first complete JSON value once available."""
        self.buffer += chunk
        text = self.buffer.lstrip()

        try:
            # raw_decode parses one value from the start and reports where it ended
            value, end = self._decoder.raw_decode(text)
        except json.JSONDecodeError:
            return None  # Incomplete, wait for more chunks

        # Drop the consumed value so a following object can be parsed later
        self.buffer = text[end:]
        return value

Error Handling and Resilience

LLM Failure Modes You Will Hit

| Failure | Cause | Mitigation |
|---|---|---|
| Rate limit (429) | Too many requests | Exponential backoff, request queueing |
| Context length (400) | Input too long | Truncate, summarize, chunk |
| Content filter (400) | Safety trigger | Retry with sanitized input, fallback model |
| Timeout | Slow generation | Reduce max_tokens, use faster model |
| Invalid JSON | Model drift | Schema validation, retry with correction |
| Hallucinated tool args | Poor tool description | Better schemas, argument validation |
| Empty response | Over-filtering | Temperature adjustment, prompt revision |
| Repeated loops | No progress detection | Max turn limits, state hashing |
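The "max turn limits, state hashing" mitigation for repeated loops can be sketched as follows. `LoopDetector` is a hypothetical helper, and hashing the tool name plus arguments is one simple choice of "state"; an agent with richer state may need to hash more than that.

```python
import hashlib
import json


class LoopDetector:
    """Stops an agent that exceeds a turn budget or keeps repeating the same action."""

    def __init__(self, max_turns: int = 20, max_repeats: int = 2):
        self.max_turns = max_turns
        self.max_repeats = max_repeats
        self.turns = 0
        self.seen: dict[str, int] = {}

    def should_stop(self, tool_name: str, arguments: dict) -> bool:
        """Record one agent step and report whether the run should be aborted."""
        self.turns += 1
        if self.turns > self.max_turns:
            return True

        # Hash the action so identical calls map to the same key regardless of key order.
        payload = json.dumps({"tool": tool_name, "args": arguments}, sort_keys=True)
        key = hashlib.sha256(payload.encode()).hexdigest()
        self.seen[key] = self.seen.get(key, 0) + 1
        return self.seen[key] > self.max_repeats


detector = LoopDetector(max_turns=10, max_repeats=2)
for _ in range(3):
    stop = detector.should_stop("search", {"query": "refund policy"})
print(stop)  # True: the third identical call exceeds max_repeats
```

Call `should_stop` once per agent step, before executing the tool, and return a final answer or an error to the user when it reports `True`.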

Exponential Backoff with Jitter

import asyncio
import logging
import random

logger = logging.getLogger(__name__)

# RateLimitError and APIError are your provider SDK's exception types.


async def call_with_backoff(
    func,
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0
):
    for attempt in range(max_retries):
        try:
            return await func()
        except (RateLimitError, APIError) as e:
            # Retry rate limits (429) and server errors (5xx); re-raise other 4xx.
            # Errors without a status code (e.g. connection failures) are retried.
            status = getattr(e, "status_code", None)
            retryable = isinstance(e, RateLimitError) or status is None or status >= 500
            if not retryable or attempt == max_retries - 1:
                raise

            # Exponential backoff with full jitter: sleep a random time in [0, cap]
            cap = min(base_delay * (2 ** attempt), max_delay)
            wait = random.uniform(0, cap)

            logger.warning(
                f"{type(e).__name__}: waiting {wait:.1f}s (attempt {attempt + 1})"
            )
            await asyncio.sleep(wait)

Circuit Breaker Pattern

Prevent cascading failures when an LLM provider is down:

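import time
from typing import Any


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker: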
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.last_failure_time: float | None = None
        self.state = "closed"  # closed, open, half-open

    async def call(self, func) -> Any:
        if self.state == "open":
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "half-open"
            else:
                raise CircuitOpenError("Circuit breaker is open")

        try:
            result = await func()
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise

    def _on_failure(self):
        self.failures += 1
        self.last_failure_time = time.time()
        if self.failures >= self.failure_threshold:
            self.state = "open"
            logger.error(f"Circuit opened after {self.failures} failures")

    def _on_success(self):
        # Any success resets the count, so only consecutive failures trip the breaker
        self.failures = 0
        if self.state == "half-open":
            self.state = "closed"

Token Management and Cost Optimization

Estimating Tokens Before the Call

import tiktoken

def estimate_tokens(messages: list[dict], model: str = "gpt-4") -> int:
    """Estimate token count for a message list."""
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        # Unknown (e.g. non-OpenAI) model: fall back to a general-purpose encoding
        encoding = tiktoken.get_encoding("cl100k_base")

    # Every message follows <|start|>{role/name}\n{content}<|end|>\n
    tokens_per_message = 3
    tokens_per_name = 1

    num_tokens = 0
    for message in messages:
        num_tokens += tokens_per_message
        for key, value in message.items():
            # Skip non-string values such as None content on tool-call messages
            if isinstance(value, str):
                num_tokens += len(encoding.encode(value))
            if key == "name":
                num_tokens += tokens_per_name

    num_tokens += 3  # Every reply is primed with <|start|>assistant<|message|>
    return num_tokens

Cost-Aware Agent Design

class CostTracker:
    def __init__(self, budget_usd: float = 10.0):
        self.budget = budget_usd
        self.spent = 0.0
        self.calls = []

    def log_call(self, model: str, input_tokens: int, output_tokens: int):
        cost = self._calculate_cost(model, input_tokens, output_tokens)
        self.spent += cost
        self.calls.append({
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "cost": cost,
            "timestamp": time.time()
        })

    def can_afford(self, estimated_tokens: int, model: str) -> bool:
        estimated_cost = self._calculate_cost(model, estimated_tokens, estimated_tokens // 2)
        return (self.spent + estimated_cost) < self.budget

    def _calculate_cost(self, model: str, input_t: int, output_t: int) -> float:
        # (input, output) prices in USD per 1M tokens, matching the model table above
        pricing = {
            "axon-2-5-pro": (2.00, 8.00),
            "axon-2-5-mini": (0.50, 2.00),
            "gpt-4o": (2.50, 10.00),
            "gpt-4o-mini": (0.15, 0.60),
            "claude-3-5-sonnet": (3.00, 15.00),
        }
        # Unknown models are priced at $0; add them above to track their cost
        input_price, output_price = pricing.get(model, (0, 0))
        return (input_t * input_price + output_t * output_price) / 1_000_000

Caching Strategies

import hashlib
import json
from typing import Callable


class LLMCache:
    def __init__(self, redis_client, ttl_seconds: int = 3600):
        self.redis = redis_client
        self.ttl = ttl_seconds

    def _hash_request(self, messages: list, model: str, **kwargs) -> str:
        """Deterministic hash for cache key."""
        content = json.dumps({
            "messages": messages,
            "model": model,
            **kwargs
        }, sort_keys=True)
        return hashlib.sha256(content.encode()).hexdigest()

    async def get_or_call(
        self,
        messages: list,
        model: str,
        call_func: Callable,
        temperature: float = 0.0,
        **kwargs
    ):
        # Only cache at temperature 0, where output is (near-)deterministic
        if temperature > 0:
            return await call_func(messages, model, temperature=temperature, **kwargs)

        cache_key = self._hash_request(messages, model, temperature=temperature, **kwargs)
        cached = await self.redis.get(cache_key)

        if cached:
            return json.loads(cached)

        # Pass temperature through so the call really runs at the cached setting
        result = await call_func(messages, model, temperature=temperature, **kwargs)
        await self.redis.setex(cache_key, self.ttl, json.dumps(result))
        return result

Streaming vs. Batch Processing

When to Stream

| Scenario | Stream? | Why |
|---|---|---|
| Chat UI | Yes | Perceived latency matters |
| Background job | No | Simpler code, easier retry |
| Tool calling agent | No | Need complete response to parse tools |
| Code generation | Yes | Progressive display improves UX |
| Classification | No | Tiny output, no benefit |

Implementing Streaming with Tool Call Detection

async def stream_with_tool_detection(
    client,
    messages: list,
    tools: list
) -> AsyncGenerator[str, None]:
    """
    Stream to user while detecting if model wants to call tools.
    If tool calls detected, buffer and handle them.
    """
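    # Tool calls arrive as fragments keyed by index; accumulate them here
    tool_calls: dict[int, dict] = {}

    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        tools=tools,
        stream=True
    )

    async for chunk in stream:
        if not chunk.choices:
            continue
        delta = chunk.choices[0].delta

        # Merge each tool call fragment; don't yield tool call content to user
        for tc in delta.tool_calls or []:
            entry = tool_calls.setdefault(tc.index, {
                "id": None,
                "type": "function",
                "function": {"name": "", "arguments": ""}
            })
            if tc.id:
                entry["id"] = tc.id
            if tc.function and tc.function.name:
                entry["function"]["name"] += tc.function.name
            if tc.function and tc.function.arguments:
                entry["function"]["arguments"] += tc.function.arguments

        # Regular content
        if delta.content:
            yield delta.content

    # If we collected tool calls, execute them
    if tool_calls:
        calls = [tool_calls[i] for i in sorted(tool_calls)]
        tool_results = await execute_tool_calls(calls)
        # Recursively continue with tool results
        async for token in stream_with_tool_detection(
            client,
            messages + [
                {"role": "assistant", "tool_calls": calls},
                *tool_results
            ],
            tools
        ):
            yield token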

Testing LLM Agents

Unit Testing Tool Logic

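Test routing and tool-selection logic separately from the agent loop, with the LLM mocked or pinned so results are repeatable:

import pytest

@pytest.mark.asyncio  # Requires the pytest-asyncio plugin
@pytest.mark.parametrize("query,expected_tool", [
    ("What is the weather in Paris?", "get_weather"),
    ("Search for Python tutorials", "web_search"),
    ("Create a JIRA ticket", "create_ticket"),
])
async def test_intent_classification(query, expected_tool):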
    router = IntentRouter()
    result = await router.classify(query)
    assert result.tool_name == expected_tool

Evaluating Agent Trajectories

class AgentEvaluator:
    def __init__(self, agent):
        self.agent = agent  # Agent under test; must expose an async run()
        self.metrics = {
            "success_rate": [],
            "tool_accuracy": [],
            "token_efficiency": [],
            "latency": []
        }

    async def evaluate_dataset(self, test_cases: list[TestCase]):
        for case in test_cases:
            start = time.time()
            result = await self.agent.run(case.input)
            latency = time.time() - start

            # Check if final answer is correct
            success = self._check_answer(result, case.expected)

            # Check if right tools were called
            tool_accuracy = self._check_tool_calls(
                result.tool_calls,
                case.expected_tools
            )

            self.metrics["success_rate"].append(success)
            self.metrics["tool_accuracy"].append(tool_accuracy)
            self.metrics["token_efficiency"].append(
                result.total_tokens / max(len(case.expected), 1)
            )
            self.metrics["latency"].append(latency)

    def report(self) -> dict:
        return {
            "success_rate": sum(self.metrics["success_rate"]) / len(self.metrics["success_rate"]),
            "tool_accuracy": sum(self.metrics["tool_accuracy"]) / len(self.metrics["tool_accuracy"]),
            "avg_latency": sum(self.metrics["latency"]) / len(self.metrics["latency"]),
            "p95_latency": sorted(self.metrics["latency"])[int(len(self.metrics["latency"]) * 0.95)]
        }

Mocking LLM Responses for Tests

class MockLLMClient:
    """Deterministic mock for testing agent logic without API calls."""

    def __init__(self, responses: list[str] | None = None):
        self.responses = responses or []
        self.call_count = 0

    async def chat_completions_create(self, **kwargs):
        if self.call_count < len(self.responses):
            response = self.responses[self.call_count]
            self.call_count += 1
            return MockResponse(response)

        # Default response for unexpected calls
        return MockResponse("I don't know how to help with that.")

# Usage in tests
@pytest.fixture
def mock_agent():
    client = MockLLMClient(responses=[
        # First call: classify intent
        '{"intent": "search", "query": "Python asyncio"}',
        # Second call: synthesize results
        "Here are the top Python asyncio resources..."
    ])
    return Agent(llm_client=client)

Regression Testing with Snapshots

@pytest.mark.asyncio  # Requires the pytest-asyncio plugin
async def test_agent_regression():
    """Ensure agent behavior doesn't drift."""
    agent = Agent()

    test_queries = load_test_queries("regression_tests.json")

    for query in test_queries:
        result = await agent.run(query)  # run() is async, as in the evaluator above
        # Compare against stored snapshot
        assert_matches_snapshot(result, snapshot_path=f"snapshots/{query.id}.json")
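The `assert_matches_snapshot` helper is not defined in this guide. One possible sketch, assuming results are JSON-serializable dicts and that snapshots live as JSON files on disk, is shown below. The `update` flag is an added assumption for refreshing snapshots after an intentional change.

```python
import json
from pathlib import Path


def assert_matches_snapshot(result: dict, snapshot_path: str, update: bool = False) -> None:
    """Compare `result` with a stored JSON snapshot, creating it on first run."""
    path = Path(snapshot_path)

    if update or not path.exists():
        # First run (or explicit refresh): record the current result as the baseline.
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(result, indent=2, sort_keys=True))
        return

    expected = json.loads(path.read_text())
    assert result == expected, f"Snapshot mismatch for {path}"
```

Because free-text LLM output rarely matches exactly between runs, it is usually more robust to snapshot stable fields such as the sequence of tools called or the extracted structured values, rather than the full response text.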

Observability and Monitoring

What to Log

from dataclasses import dataclass

@dataclass
class LLMCallEvent:
    timestamp: float
    trace_id: str
    model: str
    input_tokens: int
    output_tokens: int
    latency_ms: float
    tool_calls: list[str]
    finish_reason: str  # "stop", "length", "tool_calls"
    error: str | None
    cost_usd: float

Structured Logging

import time

import structlog

logger = structlog.get_logger()

async def logged_llm_call(client, messages, model, **kwargs):
    trace_id = generate_trace_id()
    start = time.time()

    logger.info(
        "llm_request_started",
        trace_id=trace_id,
        model=model,
        message_count=len(messages),
        estimated_tokens=estimate_tokens(messages, model)
    )

    try:
        response = await client.chat.completions.create(
            model=model,
            messages=messages,
            **kwargs
        )

        latency = (time.time() - start) * 1000
        usage = response.usage

        logger.info(
            "llm_request_completed",
            trace_id=trace_id,
            model=model,
            input_tokens=usage.prompt_tokens,
            output_tokens=usage.completion_tokens,
            latency_ms=latency,
            finish_reason=response.choices[0].finish_reason,
            cost_usd=calculate_cost(model, usage)
        )

        return response

    except Exception as e:
        logger.error(
            "llm_request_failed",
            trace_id=trace_id,
            model=model,
            error_type=type(e).__name__,
            error_message=str(e),
            latency_ms=(time.time() - start) * 1000
        )
        raise

Key Metrics Dashboard

Track these in production:

| Metric | Target | Alert If |
|---|---|---|
| Requests per minute | Baseline | > 2x or < 0.5x baseline |
| P95 latency | < 2s | > 5s |
| Error rate | < 1% | > 5% |
| Cost per request | Baseline | > 2x baseline |
| Token utilization | 50-80% | > 95% (context limit risk) |
| Tool call success rate | > 95% | < 90% |
| User satisfaction (implicit) | > 80% | < 70% |
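The alert thresholds in the table can be expressed as a simple check. This is a sketch: `check_alerts` and the metric key names are hypothetical, rates are assumed to be fractions between 0 and 1, and latency is assumed to be in seconds.

```python
def check_alerts(current: dict[str, float], baseline: dict[str, float]) -> list[str]:
    """Return the names of metrics that breach the alert thresholds in the table above."""
    alerts = []

    rpm_ratio = current["requests_per_minute"] / baseline["requests_per_minute"]
    if rpm_ratio > 2 or rpm_ratio < 0.5:
        alerts.append("requests_per_minute")
    if current["p95_latency_s"] > 5:
        alerts.append("p95_latency")
    if current["error_rate"] > 0.05:
        alerts.append("error_rate")
    if current["cost_per_request"] > 2 * baseline["cost_per_request"]:
        alerts.append("cost_per_request")
    if current["token_utilization"] > 0.95:
        alerts.append("token_utilization")
    if current["tool_success_rate"] < 0.90:
        alerts.append("tool_success_rate")
    if current["user_satisfaction"] < 0.70:
        alerts.append("user_satisfaction")

    return alerts


baseline = {"requests_per_minute": 100.0, "cost_per_request": 0.01}
degraded = {
    "requests_per_minute": 250.0,
    "p95_latency_s": 6.0,
    "error_rate": 0.02,
    "cost_per_request": 0.03,
    "token_utilization": 0.97,
    "tool_success_rate": 0.85,
    "user_satisfaction": 0.80,
}
print(check_alerts(degraded, baseline))
# ['requests_per_minute', 'p95_latency', 'cost_per_request', 'token_utilization', 'tool_success_rate']
```

In practice these rules would usually live in the monitoring system's alert configuration; the function form is mainly useful for testing the thresholds.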

Hallucination Mitigation

Techniques That Actually Work

  1. Retrieval-Augmented Generation (RAG): Ground responses in retrieved documents
  2. Self-consistency: Generate multiple answers, vote on most common
  3. Chain-of-verification: Ask model to verify its own claims
  4. Constrained decoding: Force output to match known entities
  5. Human-in-the-loop: Flag uncertain responses for review

Self-Consistency Implementation

async def self_consistent_answer(
    client,
    question: str,
    num_samples: int = 5,
    temperature: float = 0.7
) -> str:
    """Generate multiple answers and return the most consistent one."""

    # Generate N answers
    answers = await asyncio.gather(*[
        generate_answer(client, question, temperature)
        for _ in range(num_samples)
    ])

    # Extract key claims from each answer
    claims_per_answer = [extract_claims(a) for a in answers]

    # Score each answer by how many of its claims appear in other answers
    scores = []
    for i, claims in enumerate(claims_per_answer):
        score = 0
        for claim in claims:
            # Count how many other answers support this claim
            support = sum(
                1 for j, other_claims in enumerate(claims_per_answer)
                if i != j and claim in other_claims
            )
            score += support
        scores.append(score)

    # Return highest-scoring answer
    best_idx = scores.index(max(scores))
    return answers[best_idx]

Grounding with Citations

class CitationGroundedRAG:
    def __init__(self, retriever, llm_client):
        self.retriever = retriever
        self.llm = llm_client

    async def answer(self, query: str) -> GroundedAnswer:
        # Retrieve documents
        docs = await self.retriever.search(query, top_k=5)

        # Build prompt with citations
        context = "\n\n".join([
            f"[{i+1}] {doc.content}\nSource: {doc.source}"
            for i, doc in enumerate(docs)
        ])

        prompt = f"""Answer the question using only the provided documents.
        Cite sources using [1], [2], etc. If the answer isn't in the documents, say "I don't know".

        Documents:
        {context}

        Question: {query}

        Answer:"""

        response = await self.llm.complete(prompt)

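        # Verify citations point at retrieved docs (citation numbers are 1-based,
        # so 0 must be rejected or docs[c-1] would wrap around to the last doc)
        citations = extract_citations(response)
        valid_citations = [c for c in citations if 1 <= c <= len(docs)]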

        return GroundedAnswer(
            text=response,
            citations=valid_citations,
            sources=[docs[c-1] for c in valid_citations]
        )
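The `extract_citations` helper used above is not defined in this guide. A minimal regex-based sketch, assuming citations always appear as bracketed integers such as `[1]`, could look like this:

```python
import re


def extract_citations(text: str) -> list[int]:
    """Return the distinct citation numbers in `text`, in order of first appearance."""
    seen: list[int] = []
    for match in re.findall(r"\[(\d+)\]", text):
        number = int(match)
        if number not in seen:
            seen.append(number)
    return seen


answer = "Rate limits reset hourly [2]. Keys rotate every 90 days [1][2]. See also [7]."
citations = extract_citations(answer)
valid = [c for c in citations if 1 <= c <= 5]  # Five documents were retrieved
print(citations, valid)  # [2, 1, 7] [2, 1]
```

Here `[7]` is dropped because only five documents were retrieved; a citation that points outside the retrieved set is a useful signal that the answer may contain ungrounded claims.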

Memory and State Management

Short-Term vs. Long-Term Memory

class AgentMemory:
    def __init__(self):
        # Short-term: In-conversation context
        self.conversation_history: list[Message] = []

        # Long-term: Persistent facts about user/domain
        self.fact_store = FactStore()  # Vector DB or key-value store

    async def recall(self, query: str, k: int = 5) -> list[Fact]:
        """Retrieve relevant facts from long-term memory."""
        return await self.fact_store.similarity_search(query, k=k)

    async def memorize(self, fact: str, importance: float = 0.5):
        """Store a new fact in long-term memory."""
        # Only store important facts
        if importance > 0.3:
            await self.fact_store.add(fact)

    async def get_context_window(self, max_tokens: int = 8000) -> list[Message]:
        """Build messages for LLM call within token budget."""
        # Start with system prompt + relevant memories
        messages = [self.system_prompt]

        # Add recalled facts as context (recall is async, so it must be awaited)
        facts = await self.recall(self.last_user_message)
        if facts:
            messages.append(Message(
                role="system",
                content=f"Relevant context: {'; '.join(str(f) for f in facts)}"
            ))

        # Add recent conversation (truncated to fit)
        messages.extend(self._truncate_history(max_tokens))

        return messages

Episodic Memory for Multi-Session Agents

class EpisodicMemory:
    """Store and retrieve past conversation episodes."""

    def __init__(self, vector_store):
        self.store = vector_store

    async def save_episode(self, session_id: str, summary: str, outcome: str):
        """Save a summary of a completed interaction."""
        await self.store.add_document(
            content=summary,
            metadata={
                "session_id": session_id,
                "outcome": outcome,
                "timestamp": time.time()
            }
        )

    async def find_similar_episodes(self, current_query: str, k: int = 3):
        """Find past episodes similar to current situation."""
        return await self.store.similarity_search(current_query, k=k)

Production Deployment Patterns

Request Batching

class BatchProcessor:
    def __init__(self, client, max_batch_size: int = 10, max_wait_ms: float = 50):
        self.client = client
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms
        self.queue = asyncio.Queue()
        self._start_processor()

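    async def submit(self, request: LLMRequest) -> asyncio.Future:
        future = asyncio.Future()
        await self.queue.put((request, future))
        return future

    def _start_processor(self):
        # Must be called from within a running event loop. Keep a reference so
        # the task is not garbage-collected while it is still running.
        self._task = asyncio.create_task(self._process_loop())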

    async def _process_loop(self):
        loop = asyncio.get_running_loop()
        while True:
            # Block until the first request arrives so an idle queue does not spin
            batch = [await self.queue.get()]
            deadline = loop.time() + (self.max_wait_ms / 1000)

            # Collect more requests until the batch is full or the deadline passes
            while len(batch) < self.max_batch_size:
                timeout = deadline - loop.time()
                if timeout <= 0:
                    break
                try:
                    item = await asyncio.wait_for(self.queue.get(), timeout=timeout)
                    batch.append(item)
                except asyncio.TimeoutError:
                    break

            await self._process_batch(batch)

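    async def _process_batch(self, batch: list[tuple]):
        # Send as single batch request if API supports it
        # Or parallelize individual requests
        requests, futures = zip(*batch)
        # return_exceptions=True keeps one failed request from aborting the
        # batch and leaving the other callers' futures unresolved
        responses = await asyncio.gather(
            *[self.client.complete(req) for req in requests],
            return_exceptions=True
        )
        for future, response in zip(futures, responses):
            if future.cancelled():
                continue
            if isinstance(response, Exception):
                future.set_exception(response)
            else:
                future.set_result(response)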

Multi-Provider Fallback

class ResilientLLMClient:
    def __init__(self, providers: list[LLMProvider]):
        self.providers = providers
        self.circuit_breakers = {
            p.name: CircuitBreaker() for p in providers
        }

    async def complete(self, messages, **kwargs) -> str:
        for provider in self._rank_providers():
            cb = self.circuit_breakers[provider.name]

            try:
                return await cb.call(lambda: provider.complete(messages, **kwargs))
            except (CircuitOpenError, APIError) as e:
                logger.warning(f"Provider {provider.name} failed: {e}")
                continue

        raise AllProvidersFailedError("No LLM providers available")

    def _rank_providers(self) -> list[LLMProvider]:
        # Rank by: circuit state, recent latency, cost
        return sorted(
            self.providers,
            key=lambda p: (
                0 if self.circuit_breakers[p.name].state == "closed" else 1,
                p.recent_latency,
                p.cost_per_1k_tokens
            )
        )
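
ResilientLLMClient relies on a CircuitBreaker with a state attribute and an async call method, plus a CircuitOpenError. If those are not already defined in your codebase, a minimal sketch matching that interface might look like the following. The thresholds, the "half_open" state name, and the reset logic are assumptions for illustration, not a definitive implementation.

```python
import asyncio
import time


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 3, reset_timeout: float = 30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at = None  # monotonic timestamp of when the circuit opened

    @property
    def state(self) -> str:
        if self.opened_at is None:
            return "closed"
        if time.monotonic() - self.opened_at >= self.reset_timeout:
            return "half_open"  # allow one trial call after the cool-down
        return "open"

    async def call(self, func):
        """Await func() unless the circuit is open."""
        if self.state == "open":
            raise CircuitOpenError("circuit is open")
        try:
            result = await func()
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()
            raise
        # Any success closes the circuit and clears the failure count
        self.failures = 0
        self.opened_at = None
        return result


async def demo() -> list[str]:
    breaker = CircuitBreaker(failure_threshold=2, reset_timeout=60.0)

    async def failing_call():
        raise RuntimeError("provider unavailable")

    outcomes = []
    for _ in range(3):
        try:
            await breaker.call(failing_call)
        except CircuitOpenError:
            outcomes.append("rejected")
        except RuntimeError:
            outcomes.append("failed")
    return outcomes
```

With a threshold of two, the first two calls fail normally and the third is rejected without reaching the provider, which is what lets the fallback loop move on to the next provider quickly.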

Security Considerations

Prompt Injection Defense

class PromptInjectionDetector:
    def __init__(self):
        # Use a small, fast model for detection
        self.classifier = OpenAIClient("gpt-4o-mini")

    async def scan(self, user_input: str) -> SecurityScan:
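        # Check for common injection patterns. \b word boundaries stop short
        # patterns from matching inside ordinary words.
        patterns = [
            (r"ignore previous instructions", re.IGNORECASE),
            (r"\bsystem prompt\b", re.IGNORECASE),
            (r"\byou are now\b", re.IGNORECASE),
            (r"\bDAN\b", 0),  # case-sensitive, otherwise "Dan" or "dance" would match
            (r"\bjailbreak\b", re.IGNORECASE),
        ]

        for pattern, flags in patterns:
            if re.search(pattern, user_input, flags):
                return SecurityScan(threat_level="high", matched_pattern=pattern)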

        # LLM-based detection for subtle attacks
        result = await self.classifier.classify(
            "Is this a prompt injection attempt? Reply ONLY 'yes' or 'no'.\n\n"
            f"{user_input}",
            temperature=0
        )

        # Match the start of the reply so a stray "yes" later in the text
        # does not count as a positive
        if result.strip().lower().startswith("yes"):
            return SecurityScan(threat_level="medium")

        return SecurityScan(threat_level="low")

Output Sanitization

def sanitize_llm_output(text: str, allowed_tags: list[str] = None) -> str:
    """Remove potentially dangerous content from LLM output."""
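    # Remove script tags and event handlers. HTML tag and attribute names are
    # case-insensitive, so match them that way.
    text = re.sub(r'<script[^>]*>.*?</script>', '', text, flags=re.DOTALL | re.IGNORECASE)
    text = re.sub(r'''\bon\w+\s*=\s*["'][^"']*["']''', '', text, flags=re.IGNORECASE)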

    # Remove data URIs
    text = re.sub(r'data:[^;]+;base64,[A-Za-z0-9+/=]+', '[removed]', text)

    # Limit length
    max_length = 10000
    if len(text) > max_length:
        text = text[:max_length] + "\n[Output truncated]"

    return text
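
Regex-based stripping is easy to bypass with unusual markup, so when model output is rendered into an HTML page, escaping everything is the more conservative option. The sketch below uses the standard library's html.escape; escape_llm_output is a hypothetical helper name, and it assumes the output should be displayed as plain text rather than as markup.

```python
import html


def escape_llm_output(text: str, max_length: int = 10000) -> str:
    """Escape model output so a browser renders it as text, not markup."""
    # Truncate before escaping so an entity such as &lt; is never cut in half
    if len(text) > max_length:
        text = text[:max_length] + "\n[Output truncated]"
    return html.escape(text, quote=True)
```

Escaping gives up rich formatting, so if the agent must return a limited set of tags, an allowlist-based HTML sanitizer is a better fit than either approach shown here.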

Performance Optimization

Reducing Time-to-First-Token (TTFT)

  1. Use smaller models for routing: Classify intent with a fast model before calling the heavy one
  2. Pre-warm connections: Keep persistent HTTP connections to API providers
  3. Reduce prompt size: Trim unnecessary context, use efficient formatting
  4. Enable streaming: Deliver tokens as they are generated instead of waiting for the full response, so the user sees output as soon as the first token arrives
  5. Use prompt caching: Cache system prompts and tool definitions (OpenAI, Anthropic support this)
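
Before tuning any of the items above, it helps to measure TTFT directly. The sketch below times the first chunk of any async text stream. measure_ttft and fake_stream are hypothetical names, and the fake stream only simulates a provider with asyncio.sleep; with a real client, start the timer just before the request is sent.

```python
import asyncio
import time
from typing import AsyncIterator, Optional


async def measure_ttft(
    stream: AsyncIterator[str],
) -> tuple[Optional[float], float, str]:
    """Return (seconds to first chunk, total seconds, full text) for a stream."""
    start = time.perf_counter()
    ttft = None
    chunks = []
    async for chunk in stream:
        if ttft is None:
            ttft = time.perf_counter() - start  # first chunk has arrived
        chunks.append(chunk)
    total = time.perf_counter() - start
    return ttft, total, "".join(chunks)


async def fake_stream(first_delay: float = 0.05, gap: float = 0.01):
    """Simulated provider stream: a delay before the first token, then short gaps."""
    await asyncio.sleep(first_delay)
    for token in ["Hello", ",", " world"]:
        yield token
        await asyncio.sleep(gap)
```

Recording both numbers per request makes it clear whether a change improved the time to the first token or only the total generation time.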

Prompt Caching

class PromptCache:
    """Cache prefix prompts to avoid re-processing."""

    def __init__(self, client):
        self.client = client

    async def cached_completion(
        self,
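        system_prompt: str,  # Stable prefix; keep it byte-identical across calls
        user_prompt: str,
        model: str = "gpt-4o"
    ):
        # OpenAI caches a repeated prompt prefix automatically once the prompt
        # is long enough (1,024+ tokens at the time of writing). Anthropic has
        # required cacheable blocks to be marked with cache_control instead, so
        # check each provider's current documentation.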
        return await self.client.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ]
        )
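
For Anthropic, the cacheable prefix is marked explicitly rather than detected automatically. The sketch below only builds the request payload and makes no network call. build_cached_request is a hypothetical helper, and the payload shape follows Anthropic's Messages API as I understand it, so verify it against the current API reference before use.

```python
def build_cached_request(
    system_prompt: str,
    user_prompt: str,
    model: str,
    max_tokens: int = 1024,
) -> dict:
    """Build Messages API keyword arguments with the system prompt marked cacheable."""
    return {
        "model": model,
        "max_tokens": max_tokens,
        "system": [
            {
                "type": "text",
                "text": system_prompt,
                # Marks the end of the prefix the provider may cache
                "cache_control": {"type": "ephemeral"},
            }
        ],
        "messages": [{"role": "user", "content": user_prompt}],
    }
```

The resulting dictionary is intended to be passed as keyword arguments to the Anthropic SDK's messages.create call. Put tool definitions and other stable content before the marked block, and keep per-request text such as the user message after it.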

Implementation Checklist

Before shipping your LLM agent to production:

  • Model selection: Benchmarked 2+ models on your task, documented tradeoffs
  • Error handling: All API errors caught with appropriate retry logic
  • Circuit breakers: Failover to backup providers configured
  • Rate limiting: Request queuing prevents 429 storms
  • Token management: Context window never exceeded, truncation strategy tested
  • Cost controls: Budget limits, per-request cost tracking, alerting
  • Structured output: JSON schemas validated, fallback parsing implemented
  • Tool definitions: Descriptions are specific, examples included, edge cases handled
  • Testing: Unit tests for tools, integration tests for full agent, regression suite
  • Observability: All LLM calls logged with trace IDs, metrics dashboard configured
  • Security: Prompt injection detection, output sanitization, no PII in logs
  • Hallucination: RAG grounding, self-consistency, or human review for critical outputs
  • Memory: Conversation history managed, long-term memory if multi-session
  • Performance: P95 latency measured, streaming for interactive use cases
  • Monitoring: Alerts for error rate, latency, cost, and token utilization

Quick Reference: Common Configurations

High-Reliability Agent (Customer Support)

config = {
    "model": "axon-2-5-mini",  # 1M context for long support threads
    "temperature": 0.3,
    "max_tokens": 500,
    "tools": [search_kb, create_ticket, escalate],
    "max_turns": 5,
    "timeout": 10,
    "retries": 3
}

Creative Agent (Content Generation)

config = {
    "model": "axon-2-5-pro",
    "temperature": 0.8,
    "max_tokens": 2000,
    "top_p": 0.95,
    "presence_penalty": 0.3
}

Code Agent (Developer Tool)

config = {
    "model": "axon-2-5-pro",  # Matches Opus-class reasoning
    "temperature": 0.2,
    "max_tokens": 4000,
    "tools": [read_file, write_file, run_tests, git_commit],
    "response_format": "json"
}
