LLM Observability: Implementing OpenTelemetry Tracing for Non-Deterministic AI Chains
Implementing OpenTelemetry (OTel) tracing for LLM workflows requires capturing non-deterministic execution paths, token metrics, and model attributes. This guide covers manual instrumentation using OTel semantic conventions for Generative AI (v1.39.0, experimental).
Prerequisites
Install the OpenTelemetry SDK and exporter packages:
pip install opentelemetry-api opentelemetry-sdk opentelemetry-exporter-otlp
Provider Configuration
Initialize the tracer provider with an OTLP exporter for downstream visualization:
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
provider = TracerProvider()
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint="localhost:4317"))
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)
GenAI Semantic Conventions
OTel v1.39.0 defines experimental attributes for LLM operations. Use these keys for consistent telemetry across systems:
| Attribute | Description |
|---|---|
| `gen_ai.system` | Provider name (e.g., `openai`, `anthropic`, `azure.ai.inference`) |
| `gen_ai.request.model` | Model identifier (e.g., `gpt-4`, `claude-3-opus`) |
| `gen_ai.request.max_tokens` | Maximum tokens requested |
| `gen_ai.request.temperature` | Sampling temperature |
| `gen_ai.response.model` | Actual model used in the response |
| `gen_ai.usage.input_tokens` | Input token count |
| `gen_ai.usage.output_tokens` | Output token count |
| `gen_ai.prompt` | Prompt text (optional; may contain sensitive data) |
| `gen_ai.completion` | Completion text (optional) |
Instrumenting LLM Calls
Wrap LLM invocations with spans capturing request and response metadata:
import os
from openai import OpenAI
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
tracer = trace.get_tracer(__name__)
def call_llm(prompt: str, model: str = "gpt-4", temperature: float = 0.7) -> str:
    with tracer.start_as_current_span("gen_ai.chat.completion") as span:
        span.set_attribute("gen_ai.system", "openai")
        span.set_attribute("gen_ai.request.model", model)
        span.set_attribute("gen_ai.request.temperature", temperature)
        span.set_attribute("gen_ai.request.max_tokens", 1024)
        try:
            response = client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": prompt}],
                temperature=temperature,
                max_tokens=1024,
            )
            completion = response.choices[0].message.content
            usage = response.usage
            span.set_attribute("gen_ai.response.model", response.model)
            span.set_attribute("gen_ai.usage.input_tokens", usage.prompt_tokens)
            span.set_attribute("gen_ai.usage.output_tokens", usage.completion_tokens)
            span.set_status(Status(StatusCode.OK))
            return completion
        except Exception as e:
            span.set_status(Status(StatusCode.ERROR))
            span.record_exception(e)
            raise
Tracing Non-Deterministic Chains
AI chains with branching logic require nested spans to capture execution paths. Use parent-child relationships to represent the chain hierarchy:
import asyncio
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
tracer = trace.get_tracer(__name__)
# Stub implementations for chain components
async def retrieve_documents(query: str) -> list[dict]:
    """Retrieve relevant documents from the vector store."""
    await asyncio.sleep(0.1)  # Simulate async retrieval
    return [{"content": "Sample document", "score": 0.95}]

def classify_intent(query: str) -> "IntentResult":
    """Classify user intent for routing."""
    class IntentResult:
        name = "search"
        confidence = 0.87
        model = "gpt-4"
    return IntentResult()

def build_prompt(query: str, docs: list[dict], intent) -> str:
    """Construct the prompt from query, context, and routing info."""
    context = "\n".join(d["content"] for d in docs)
    return f"Context: {context}\n\nQuery: {query}"

async def execute_chain(user_input: str) -> dict:
    with tracer.start_as_current_span("ai_chain.execution") as chain_span:
        chain_span.set_attribute("chain.name", "rag_pipeline")
        chain_span.set_attribute("chain.input", user_input[:100])  # Truncate for privacy

        # Step 1: Retrieval
        with tracer.start_as_current_span("chain.retrieval") as retrieval_span:
            docs = await retrieve_documents(user_input)
            retrieval_span.set_attribute("retrieval.doc_count", len(docs))
            retrieval_span.set_attribute("retrieval.source", "vector_db")

        # Step 2: Conditional routing (non-deterministic)
        with tracer.start_as_current_span("chain.routing") as routing_span:
            route = classify_intent(user_input)
            routing_span.set_attribute("routing.decision", route.name)
            routing_span.set_attribute("routing.confidence", route.confidence)

        # Step 3: LLM call based on the route
        with tracer.start_as_current_span("chain.generation") as gen_span:
            prompt = build_prompt(user_input, docs, route)
            result = call_llm(prompt, model=route.model)
            gen_span.set_attribute("gen_ai.request.model", route.model)

        chain_span.set_attribute("chain.output_length", len(result))
        return {"result": result, "route": route.name}
Handling Asynchronous Operations
For concurrent LLM calls, use start_as_current_span with proper context propagation to maintain parent-child linkage:
import asyncio
import os
from openai import AsyncOpenAI
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
client = AsyncOpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
tracer = trace.get_tracer(__name__)
async def async_llm_call(prompt: str, model: str = "gpt-4") -> str:
    """Async LLM invocation helper."""
    response = await client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        max_tokens=1024,
    )
    return response.choices[0].message.content

async def parallel_llm_calls(prompts: list[str]) -> list[str]:
    async def traced_call(idx: int, prompt: str) -> str:
        with tracer.start_as_current_span(f"llm_call_{idx}") as span:
            span.set_attribute("gen_ai.system", "openai")
            span.set_attribute("call.index", idx)
            try:
                result = await async_llm_call(prompt)
                span.set_status(Status(StatusCode.OK))
                return result
            except Exception as e:
                span.set_status(Status(StatusCode.ERROR))
                span.record_exception(e)
                raise

    tasks = [traced_call(i, p) for i, p in enumerate(prompts)]
    return await asyncio.gather(*tasks)
Adding Events for Reasoning Steps
Capture intermediate reasoning within a span using events:
# analyze_context and generate_reasoning are application-specific helpers
with tracer.start_as_current_span("chain.reasoning") as span:
    span.add_event("Started context analysis")
    context = analyze_context(query)
    span.add_event("Context analysis complete", {"context.entities": len(context.entities)})
    reasoning = generate_reasoning(context)
    span.add_event("Reasoning generated", {"reasoning.steps": reasoning.step_count})
Metric Collection
Track token usage and latency with OTel metrics:
import time
import os
from openai import OpenAI
from opentelemetry import trace, metrics
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
tracer = trace.get_tracer(__name__)
meter = metrics.get_meter(__name__)
token_counter = meter.create_counter("gen_ai.tokens.total", unit="{token}")
latency_histogram = meter.create_histogram("gen_ai.latency", unit="s")
def traced_llm_call(prompt: str, model: str = "gpt-4") -> str:
    start_time = time.time()
    with tracer.start_as_current_span("llm.call") as span:
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=1024,
        )
        # Record metrics
        token_counter.add(
            response.usage.prompt_tokens,
            {"type": "input", "model": response.model},
        )
        token_counter.add(
            response.usage.completion_tokens,
            {"type": "output", "model": response.model},
        )
        latency_histogram.record(
            time.time() - start_time,
            {"model": response.model},
        )
        return response.choices[0].message.content
Export and Visualization
Configure the OTLP exporter to send traces to a backend (Jaeger, Grafana Tempo, Honeycomb, etc.):
import os
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# For local development with Jaeger
exporter = OTLPSpanExporter(endpoint="http://localhost:4317", insecure=True)

# For production with authentication
exporter = OTLPSpanExporter(
    endpoint="https://telemetry.your-domain.com:4317",
    headers={"api-key": os.environ.get("OTEL_API_KEY")},
)
provider.add_span_processor(BatchSpanProcessor(exporter))
Environment Variable Configuration
Enable experimental GenAI conventions and configure exporters via environment variables:
export OTEL_TRACES_EXPORTER=otlp
export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
export OTEL_SEMCONV_STABILITY_OPT_IN=gen_ai
export OTEL_SERVICE_NAME=llm-application
Quick Implementation Checklist
- Install `opentelemetry-api`, `opentelemetry-sdk`, and the OTLP exporter
- Initialize `TracerProvider` with `BatchSpanProcessor`
- Create spans around LLM calls with `gen_ai.*` attributes
- Nest spans for chain steps to capture execution paths
- Add events for intermediate reasoning steps
- Record token metrics with counters
- Export to a visualization backend via OTLP