Agentic Workflows: Building Self-Correcting Loops with LangGraph and CrewAI State Machines
Self-correcting loops enable AI agents to iteratively improve their outputs through automated feedback and retry mechanisms. This guide covers implementing these patterns using LangGraph's StateGraph architecture and CrewAI's agent orchestration capabilities.
Core Architecture
LangGraph provides low-level workflow control through StateGraph and conditional edges. CrewAI handles high-level agent orchestration and task delegation. Combined, they create robust self-correcting systems.
State Management Pattern
Define persistent state to track iteration counts, error logs, intermediate results, and token usage:
```python
from typing import TypedDict, List, Optional
from langgraph.graph import StateGraph
from langchain_openai import ChatOpenAI
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from openai import RateLimitError, APIConnectionError

llm = ChatOpenAI(model="gpt-4")

class AgentState(TypedDict):
    messages: List[str]
    iterations: int
    errors: List[str]
    current_output: Optional[str]
    should_retry: bool
    total_tokens: int
    quality_score: float
```
LangGraph Self-Correcting Implementation
Safe Code Validation
Replace direct code execution with AST-based validation to prevent injection attacks:
```python
import ast
import importlib
from typing import Tuple

ALLOWED_MODULES = {"math", "json", "re", "collections", "itertools", "functools"}

def validate_code_safety(code: str) -> Tuple[bool, str]:
    """Validate code safety using AST analysis. Returns (is_safe, error_message)."""
    if not code or not code.strip():
        return False, "Empty code provided"
    try:
        tree = ast.parse(code)
    except SyntaxError as e:
        return False, f"Syntax error: {e}"
    dangerous_nodes = []
    for node in ast.walk(tree):
        # Block dangerous function calls
        if isinstance(node, ast.Call) and isinstance(node.func, ast.Name):
            if node.func.id in {"exec", "eval", "compile", "open", "input", "__import__"}:
                dangerous_nodes.append(f"Forbidden function: {node.func.id}")
        # Block imports not in the allowlist (check every alias, not just the first)
        if isinstance(node, ast.ImportFrom):
            root_module = (node.module or "").split(".")[0]
            if root_module not in ALLOWED_MODULES:
                dangerous_nodes.append(f"Import not allowed: {node.module}")
        elif isinstance(node, ast.Import):
            for alias in node.names:
                if alias.name.split(".")[0] not in ALLOWED_MODULES:
                    dangerous_nodes.append(f"Import not allowed: {alias.name}")
        # Block access to dunder attributes only (not all underscore-prefixed names)
        if isinstance(node, ast.Attribute):
            if node.attr.startswith("__") and node.attr.endswith("__"):
                dangerous_nodes.append(f"Access to dunder method: {node.attr}")
    if dangerous_nodes:
        return False, "; ".join(dangerous_nodes)
    return True, ""

def _restricted_import(name, *args, **kwargs):
    """Permit imports of allowlisted modules only at execution time."""
    if name.split(".")[0] not in ALLOWED_MODULES:
        raise ImportError(f"Import not allowed: {name}")
    return importlib.import_module(name)

def execute_validated_code(code: str, test_inputs: dict = None) -> dict:
    """Safely execute validated code in a restricted namespace."""
    is_safe, error = validate_code_safety(code)
    if not is_safe:
        raise ValueError(f"Code validation failed: {error}")
    # Restricted builtins: with a completely empty __builtins__ even allowlisted
    # imports would fail, so supply a guarded __import__ plus a minimal safe set.
    safe_builtins = {"__import__": _restricted_import, "len": len, "range": range,
                     "print": print, "sum": sum, "min": min, "max": max}
    execution_namespace = {"__builtins__": safe_builtins}
    try:
        exec(code, execution_namespace)
        return {"success": True, "namespace": execution_namespace}
    except Exception as e:
        return {"success": False, "error": str(e)}
```
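To see the allowlist idea in isolation, here is a condensed, standalone version of the call check above (a sketch for illustration, not a replacement for the full validator):

```python
import ast

FORBIDDEN = {"exec", "eval", "compile", "open", "input", "__import__"}

def flag_forbidden_calls(code: str) -> list:
    """Return the names of forbidden functions called anywhere in the code."""
    tree = ast.parse(code)
    return [
        node.func.id
        for node in ast.walk(tree)
        if isinstance(node, ast.Call)
        and isinstance(node.func, ast.Name)
        and node.func.id in FORBIDDEN
    ]

print(flag_forbidden_calls("eval('1 + 1')"))          # ['eval']
print(flag_forbidden_calls("total = sum([1, 2, 3])"))  # []
```

Because the check operates on the parse tree rather than the string, obfuscations like extra whitespace or aliasing through attribute chains are easier to catch than with regex filtering.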
Resilient LLM Calls with Retry Logic
```python
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type((RateLimitError, APIConnectionError)),
    reraise=True
)
def invoke_llm_with_retry(prompt: str) -> str:
    """Invoke LLM with exponential backoff for rate limits and network errors."""
    response = llm.invoke(prompt)
    return response.content

def generate_node(state: AgentState) -> AgentState:
    """Generate solution based on current state."""
    try:
        if state["iterations"] == 0:
            output = invoke_llm_with_retry(state["messages"][-1])
        else:
            error_context = "\n".join(state["errors"])
            prompt = f"Previous errors: {error_context}\nTask: {state['messages'][-1]}"
            output = invoke_llm_with_retry(prompt)
        # Track token usage
        token_estimate = len(output.split()) * 1.3  # Rough approximation
        return {
            **state,
            "current_output": output,
            "should_retry": False,
            "total_tokens": state.get("total_tokens", 0) + int(token_estimate)
        }
    except (RateLimitError, APIConnectionError) as e:
        return {
            **state,
            "should_retry": True,
            "errors": state["errors"] + [f"API Error: {str(e)}"]
        }
```
```python
def validate_node(state: AgentState) -> AgentState:
    """Validate output and determine if correction is needed."""
    try:
        is_safe, error = validate_code_safety(state["current_output"])
        if is_safe:
            return {**state, "should_retry": False, "errors": []}
        return {**state, "should_retry": True, "errors": state["errors"] + [error]}
    except Exception as e:
        return {**state, "should_retry": True, "errors": state["errors"] + [str(e)]}
```
Conditional Edge Logic
Implement self-correction loop using conditional edges:
```python
def should_continue(state: AgentState) -> str:
    """Determine if the loop should continue or end."""
    max_iterations = 5
    max_tokens = 50000
    if state["iterations"] >= max_iterations:
        return "end"
    if state.get("total_tokens", 0) >= max_tokens:
        return "end"
    if state["should_retry"]:
        return "generate"
    return "end"

def increment_iterations(state: AgentState) -> AgentState:
    return {**state, "iterations": state["iterations"] + 1}

# Build workflow
workflow = StateGraph(AgentState)
workflow.add_node("generate", generate_node)
workflow.add_node("validate", validate_node)
workflow.add_node("increment", increment_iterations)
workflow.add_edge("__start__", "generate")
workflow.add_conditional_edges(
    "generate",
    lambda state: "validate" if state.get("current_output") else "end",
    {"validate": "validate", "end": "__end__"}
)
workflow.add_edge("validate", "increment")
workflow.add_conditional_edges(
    "increment",
    should_continue,
    {"generate": "generate", "end": "__end__"}
)
```
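Before wiring this into LangGraph, the control flow is worth sanity-checking as a plain Python loop. The stub functions below are hypothetical stand-ins for the real nodes, solely to show how `should_retry` and the iteration cap interact:

```python
MAX_ITERATIONS = 5

def fake_generate(state):
    # Stand-in: "succeeds" only after two correction attempts
    state["current_output"] = "ok" if state["iterations"] >= 2 else "bad"
    return state

def fake_validate(state):
    state["should_retry"] = state["current_output"] != "ok"
    return state

def run_loop(state):
    # Mirrors generate -> validate -> increment -> conditional edge
    while True:
        state = fake_validate(fake_generate(state))
        state["iterations"] += 1
        if state["iterations"] >= MAX_ITERATIONS or not state["should_retry"]:
            return state

final = run_loop({"iterations": 0, "current_output": None, "should_retry": False})
print(final["iterations"])  # 3
```

The loop exits either on success or when the iteration budget is exhausted, which is exactly the invariant `should_continue` enforces inside the graph.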
CrewAI Integration
CrewAI as LangGraph Node
Embed CrewAI crews as executable nodes within LangGraph workflows. Note that Agent parameters must be passed as keyword arguments:
```python
from crewai import Agent, Task, Crew

def crewai_node(state: AgentState) -> AgentState:
    """Execute CrewAI workflow as a graph node."""
    # Define agents with explicit keyword arguments
    researcher = Agent(
        role="Researcher",
        goal="Gather relevant information",
        backstory="Expert researcher with web search capabilities",
        verbose=False
    )
    writer = Agent(
        role="Writer",
        goal="Create comprehensive content",
        backstory="Technical writer specializing in AI topics",
        verbose=False
    )
    # Define tasks with pre-existing agent references
    research_task = Task(
        description=f"Research: {state['messages'][-1]}",
        expected_output="Comprehensive research summary",
        agent=researcher
    )
    writing_task = Task(
        description="Write comprehensive content based on research",
        expected_output="Well-structured technical content",
        agent=writer
    )
    crew = Crew(
        agents=[researcher, writer],
        tasks=[research_task, writing_task],
        verbose=False
    )
    try:
        result = crew.kickoff()
        # Handle the CrewOutput object: .raw preserves the final text
        output_text = result.raw if hasattr(result, "raw") else str(result)
        return {**state, "current_output": output_text, "should_retry": False}
    except Exception as e:
        return {
            **state,
            "errors": state["errors"] + [f"CrewAI error: {str(e)}"],
            "should_retry": True
        }
```
Self-Correction Patterns
Error-Driven Correction
Implement feedback loops that learn from execution failures:
```python
def analyze_errors(errors: List[str]) -> List[str]:
    """Classify error types from error messages."""
    patterns = []
    for error in errors:
        error_lower = error.lower()
        if "syntax" in error_lower or "syntaxerror" in error_lower:
            patterns.append("syntax_error")
        elif "logic" in error_lower or "assertion" in error_lower:
            patterns.append("logic_error")
        elif "timeout" in error_lower:
            patterns.append("timeout_error")
        elif "api" in error_lower or "rate" in error_lower:
            patterns.append("api_error")
        else:
            patterns.append("unknown_error")
    return list(set(patterns))

def adaptive_correction(state: AgentState) -> AgentState:
    """Adapt correction strategy based on error patterns."""
    error_patterns = analyze_errors(state["errors"])
    if "syntax_error" in error_patterns:
        correction_prompt = "Fix syntax errors and validate code structure"
    elif "logic_error" in error_patterns:
        correction_prompt = "Review algorithm logic and edge cases"
    elif "api_error" in error_patterns:
        correction_prompt = "Handle API errors gracefully with fallbacks"
    else:
        correction_prompt = "General improvement and optimization"
    enhanced_prompt = f"{state['messages'][-1]}\nCorrection focus: {correction_prompt}"
    return {**state, "messages": state["messages"] + [enhanced_prompt]}
```
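The classification logic can be exercised without any workflow machinery. This single-label restatement of the classifier (condensed from `analyze_errors`, for illustration) shows how raw messages map to categories:

```python
def classify(error: str) -> str:
    """Map one raw error message to a category (condensed classifier)."""
    e = error.lower()
    if "syntax" in e:
        return "syntax_error"
    if "logic" in e or "assertion" in e:
        return "logic_error"
    if "timeout" in e:
        return "timeout_error"
    if "api" in e or "rate" in e:
        return "api_error"
    return "unknown_error"

errors = ["SyntaxError: invalid syntax", "API Error: rate limit hit", "weird failure"]
print(sorted({classify(e) for e in errors}))
# ['api_error', 'syntax_error', 'unknown_error']
```

Keyword matching is deliberately coarse; in practice you may want to match on exception class names rather than substrings to avoid false positives like "rate" appearing in ordinary prose.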
Quality Gates with LLM-as-Judge
Replace arbitrary scoring with structured LLM evaluation:
```python
from langchain_core.messages import SystemMessage, HumanMessage
import json
import re

QUALITY_RUBRIC = """
Evaluate the following code output on a scale of 0-10 for each criterion:
1. Correctness: Does it solve the stated problem?
2. Readability: Is it well-structured and documented?
3. Efficiency: Does it use appropriate algorithms?
4. Safety: Does it handle edge cases and errors?
Output format: {"correctness": N, "readability": N, "efficiency": N, "safety": N, "overall": N}
"""

def parse_json_safely(json_str: str) -> dict:
    """Safely parse JSON with robust error handling."""
    try:
        # Try direct JSON parsing first
        return json.loads(json_str)
    except json.JSONDecodeError:
        pass
    try:
        # Try extracting JSON from markdown code blocks
        json_match = re.search(r'```(?:json)?\s*({.*?})\s*```', json_str, re.DOTALL)
        if json_match:
            return json.loads(json_match.group(1))
    except json.JSONDecodeError:
        pass
    try:
        # Try finding a JSON-like structure anywhere in the text
        json_match = re.search(r'({.*})', json_str, re.DOTALL)
        if json_match:
            return json.loads(json_match.group(1))
    except json.JSONDecodeError:
        pass
    # Fallback if all parsing attempts fail
    return {"overall": 5.0, "reason": "JSON parsing failed"}

def evaluate_quality_llm(output: str, original_task: str) -> dict:
    """Use LLM to evaluate output quality against the rubric."""
    if not output or not output.strip():
        return {"overall": 0.0, "reason": "Empty output"}
    # Build messages directly: the rubric and the output both contain literal
    # braces, which ChatPromptTemplate would misread as template variables.
    messages = [
        SystemMessage(content=QUALITY_RUBRIC),
        HumanMessage(content=f"Original task: {original_task}\n\nOutput to evaluate:\n{output}")
    ]
    try:
        response = llm.invoke(messages)
        # Parse the JSON response with robust error handling
        return parse_json_safely(response.content)
    except Exception as e:
        # Fall back to a neutral score
        return {"overall": 5.0, "reason": f"LLM evaluation failed: {str(e)}"}

def quality_gate(state: AgentState) -> AgentState:
    """Evaluate output quality and decide on continuation."""
    quality_result = evaluate_quality_llm(state["current_output"], state["messages"][0])
    quality_score = quality_result.get("overall", 0) / 10.0
    if quality_score >= 0.8:
        return {**state, "should_retry": False, "quality_score": quality_score}
    elif state["iterations"] < 3:
        return {**state, "should_retry": True, "quality_score": quality_score}
    else:
        return {**state, "should_retry": False, "quality_score": quality_score}
```
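End to end, the judge's reply can be parsed and turned into a retry decision without any LLM call. This standalone sketch condenses `parse_json_safely` and the gate threshold into two plain functions (names are illustrative, not part of the API above):

```python
import json
import re

def extract_scores(reply: str) -> dict:
    """Pull the first {...} object out of the judge's prose reply."""
    match = re.search(r"({.*})", reply, re.DOTALL)
    if not match:
        return {"overall": 5.0, "reason": "JSON parsing failed"}
    return json.loads(match.group(1))

def should_retry(scores: dict, iterations: int,
                 threshold: float = 0.8, max_retries: int = 3) -> bool:
    """Retry while the normalized score is below threshold and budget remains."""
    return scores.get("overall", 0) / 10.0 < threshold and iterations < max_retries

reply = 'Here is my evaluation: {"correctness": 7, "overall": 6}'
scores = extract_scores(reply)
print(scores["overall"], should_retry(scores, iterations=1))  # 6 True
```

Keeping the decision rule pure and separate from the LLM call makes the gate trivially unit-testable, which matters when thresholds are tuned in production.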
Observability and Monitoring
LangSmith Integration
Enable tracing for production debugging:
```python
import os

# Configure LangSmith tracing
os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGSMITH_PROJECT"] = "agentic-workflows"

# LangSmith automatically captures traces when these variables are set.
# Access traces at https://smith.langchain.com
```
OpenTelemetry for Custom Metrics
```python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# Configure OpenTelemetry
provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)

def traced_generate_node(state: AgentState) -> AgentState:
    with tracer.start_as_current_span("generate_node") as span:
        span.set_attribute("iterations", state["iterations"])
        result = generate_node(state)
        # current_output may be None, so guard before taking len()
        span.set_attribute("output_length", len(result.get("current_output") or ""))
        return result
```
Cost Management
Track and limit token usage across iterations:
```python
class CostManagedState(TypedDict):
    messages: List[str]
    iterations: int
    errors: List[str]
    current_output: Optional[str]
    should_retry: bool
    total_tokens: int
    max_tokens: int
    estimated_cost_usd: float

# Pricing per 1K tokens (adjust for your model)
TOKEN_COSTS = {
    "gpt-4": {"input": 0.03, "output": 0.06},
    "gpt-3.5-turbo": {"input": 0.001, "output": 0.002}
}

def calculate_cost(tokens: int, model: str = "gpt-4") -> float:
    """Estimate cost in USD, applying the input rate to all tokens as a rough floor."""
    rates = TOKEN_COSTS.get(model, TOKEN_COSTS["gpt-4"])
    return (tokens / 1000) * rates["input"]

def cost_aware_should_continue(state: CostManagedState) -> str:
    """Check iteration limits and cost thresholds."""
    max_iterations = 5
    max_cost_usd = 1.0
    current_cost = calculate_cost(state.get("total_tokens", 0))
    if state["iterations"] >= max_iterations:
        return "end"
    if current_cost >= max_cost_usd:
        return "end"
    if state.get("total_tokens", 0) >= state.get("max_tokens", 50000):
        return "end"
    if state["should_retry"]:
        return "generate"
    return "end"
```
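For a tighter estimate, input and output tokens should be charged at their separate rates rather than lumping everything under the input rate. A small sketch using the illustrative rates from the table above:

```python
TOKEN_COSTS = {
    "gpt-4": {"input": 0.03, "output": 0.06},
    "gpt-3.5-turbo": {"input": 0.001, "output": 0.002},
}

def estimate_cost(input_tokens: int, output_tokens: int, model: str = "gpt-4") -> float:
    """Estimated USD cost, charging input and output tokens at their own rates."""
    rates = TOKEN_COSTS[model]
    return (input_tokens / 1000) * rates["input"] + (output_tokens / 1000) * rates["output"]

print(round(estimate_cost(2000, 1000), 4))  # 0.12
```

Because output tokens are typically billed at a higher rate, an input-rate-only estimate can undercount the true spend of generation-heavy loops by a factor of two or more.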
Production Deployment
State Persistence for Distributed Systems
Use database-backed checkpointers for production deployments:
```python
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

# Development: in-memory checkpointer
def get_dev_checkpointer():
    return InMemorySaver()

# Production: PostgreSQL-backed checkpointer for distributed systems.
# AsyncPostgresSaver.from_conn_string returns an async context manager,
# so compile and run the workflow inside the context.
DB_URI = "postgresql://user:password@localhost:5432/langgraph_state"

initial_state = {
    "messages": ["Write a Python function to calculate fibonacci numbers"],
    "iterations": 0,
    "errors": [],
    "current_output": None,
    "should_retry": False,
    "total_tokens": 0,
    "quality_score": 0.0
}

# Execute with state persistence
async def run_workflow():
    async with AsyncPostgresSaver.from_conn_string(DB_URI) as checkpointer:
        await checkpointer.setup()  # run schema migrations (first use only)
        production_workflow = workflow.compile(checkpointer=checkpointer)
        return await production_workflow.ainvoke(
            initial_state,
            config={"configurable": {"thread_id": "production_session_1"}}
        )
```
Thread ID Management for State Recovery
```python
import uuid
import hashlib
from datetime import datetime, timezone

def create_thread_id(user_id: str, session_id: str = None, timestamp: str = None) -> str:
    """
    Generate a deterministic thread ID for state recovery with collision prevention.
    Handles edge cases for concurrent sessions and empty inputs.
    """
    if not user_id or not isinstance(user_id, str):
        raise ValueError("user_id must be a non-empty string")
    if not session_id:
        session_id = str(uuid.uuid4())
    if not timestamp:
        timestamp = datetime.now(timezone.utc).isoformat()
    # Hash the combined identifier for consistent length and collision resistance
    unique_string = f"{user_id}_{session_id}_{timestamp}"
    thread_hash = hashlib.sha256(unique_string.encode()).hexdigest()[:16]
    return f"{user_id}_{thread_hash}"
```
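A quick property check of the scheme: identical inputs must reproduce the same ID, and any change to the session must change it. This is a self-contained restatement of the hashing step:

```python
import hashlib

def thread_id(user_id: str, session_id: str, timestamp: str) -> str:
    """Deterministic thread ID from user, session, and timestamp."""
    digest = hashlib.sha256(f"{user_id}_{session_id}_{timestamp}".encode()).hexdigest()[:16]
    return f"{user_id}_{digest}"

a = thread_id("alice", "sess-1", "2025-01-01T00:00:00")
b = thread_id("alice", "sess-1", "2025-01-01T00:00:00")
c = thread_id("alice", "sess-2", "2025-01-01T00:00:00")
print(a == b, a == c)  # True False
```

Determinism is what makes recovery possible: a client that remembers only its user and session identifiers can reconstruct the thread ID and resume the checkpointed state.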
```python
async def resume_workflow(thread_id: str, production_workflow):
    """Resume a previously interrupted workflow from its latest checkpoint."""
    config = {"configurable": {"thread_id": thread_id}}
    # Verify checkpoint history exists before resuming
    state = await production_workflow.aget_state(config)
    if not state or not state.values:
        raise ValueError(f"No checkpoint history found for thread_id: {thread_id}")
    # Invoking with None as input resumes from the thread's latest checkpoint
    return await production_workflow.ainvoke(None, config=config)
```
Implementation Best Practices
- State Persistence: Use a PostgreSQL-backed checkpointer for distributed production systems; an in-memory checkpointer only for development
- Error Classification: Categorize errors to apply targeted corrections
- Iteration Limits: Set maximum retry attempts and cost thresholds to prevent runaway workflows
- Quality Metrics: Use LLM-as-judge with structured rubrics and robust JSON parsing
- Cost Management: Track token usage and implement cost limits per workflow
- Observability: Enable LangSmith tracing and OpenTelemetry metrics for production debugging
- Security: Never execute unvalidated code; use AST analysis with appropriate underscore handling
- Thread Safety: Implement robust thread ID management for concurrent sessions
- Output Handling: Preserve CrewAI output by accessing the .raw attribute instead of string casting
- Error Handling: Implement comprehensive error handling for all external API calls