# @khaosagent Decorator

The `@khaosagent` decorator is the only required integration point for Khaos. It wraps your agent handler function, enabling automatic telemetry capture, fault injection, and security testing without modifying your agent logic.
## Basic Usage

Decorate your agent's message handler function with `@khaosagent`:

```python
from khaos import khaosagent

@khaosagent(name="my-agent", version="1.0.0")
def handle(message):
    """Your agent's message handler."""
    prompt = (message.get("payload") or {}).get("text", "")
    # Your agent logic here - call LLMs, tools, etc.
    result = your_agent_logic(prompt)
    return {"text": result}
```

The decorator intercepts incoming messages and outgoing responses, capturing telemetry and enabling Khaos to inject faults and security tests during evaluation runs.
## Decorator Parameters

| Parameter | Required | Default | Description |
|---|---|---|---|
| `name` | Yes | - | Unique identifier for your agent (used in `khaos run`) |
| `version` | Yes | - | Semantic version string (e.g., `"1.0.0"`, `"2.1.3"`) |
| `framework` | No | auto-detect | Framework hint: `"openai"`, `"anthropic"`, `"langchain"`, `"crewai"`, etc. |
| `description` | No | - | Human-readable description for dashboard display |
| `tags` | No | `[]` | List of tags for categorization and filtering |
| `timeout_ms` | No | `0` (disabled) | Timeout in milliseconds; raises `AgentTimeoutError` if exceeded |
| `capture_errors` | No | `True` | Catch exceptions and return structured error responses |
| `security_mode` | No | `"agent_input"` | `"agent_input"` tests the full stack; `"llm"` tests raw LLM defenses only |
| `mcp_servers` | No | `[]` | List of MCP server names for auto-discovery and fault injection |
```python
@khaosagent(
    name="customer-support-agent",
    version="2.1.0",
    framework="langchain",
    description="Handles customer support inquiries with RAG",
    tags=["production", "customer-facing", "rag"]
)
def handle(message):
    # ...
```
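The example above covers the descriptive parameters; the remaining ones from the table control runtime behavior. A minimal sketch combining them, with an illustrative agent name and MCP server name (the values shown are not defaults you must set):

```python
@khaosagent(
    name="hardened-agent",          # illustrative name
    version="1.0.0",
    timeout_ms=30000,               # raise AgentTimeoutError after 30s
    capture_errors=True,            # return structured error responses
    security_mode="agent_input",    # test the full stack, not just the LLM
    mcp_servers=["docs-server"],    # illustrative MCP server name
)
def handle(message):
    ...
```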
## Message Format

The handler receives a standardized message dict and must return a response dict:

### Input Message
```python
{
    "type": "user_message",
    "payload": {
        "text": "User's input prompt",
        "metadata": {}  # Optional additional context
    },
    "context": {
        "run_id": "khaos-pack-20250101-abc123",
        "scenario": "release.v1.resilience",
        "phase": "baseline"  # or "resilience", "security"
    }
}
```

### Output Response
```python
{
    "text": "Agent's response text",
    "metadata": {
        "confidence": 0.95,       # Optional
        "sources": ["doc1.pdf"]   # Optional
    }
}
```

At minimum, return `{"text": "..."}`. Additional metadata is optional and will be captured in telemetry.
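As an illustration of the schema above, a handler can read the optional `context` block alongside the payload. This sketch assumes nothing beyond the fields shown; the agent name is hypothetical:

```python
from khaos import khaosagent

@khaosagent(name="context-reader", version="1.0.0")  # hypothetical name
def handle(message):
    payload = message.get("payload") or {}
    context = message.get("context") or {}
    prompt = payload.get("text", "")
    phase = context.get("phase", "baseline")  # "baseline", "resilience", or "security"
    return {"text": f"[{phase}] {prompt}", "metadata": {"confidence": 1.0}}
```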
## Flexible Output Formats

Khaos is designed to work with whatever format is natural for your agent. You don't need to conform to a specific structure; we'll extract the output automatically.

### Supported Return Types
```python
# All of these work:

# Simple string
return "Hello, world!"

# Dict with common keys
return {"text": "Hello"}
return {"content": "Hello"}   # OpenAI-style
return {"result": "Hello"}    # Tool-style
return {"response": "Hello"}  # Generic
return {"answer": "Hello"}    # Q&A-style

# Full envelope (passed through as-is)
return {"name": "custom.response", "payload": {"text": "Hello"}}

# Tuple (name, payload)
return ("my.event", {"data": 123})

# Framework objects (auto-extracted)
return llm_response  # LangChain AIMessage, CrewAI output, etc.

# None for empty success
return None
```

For dict returns, Khaos recognizes these keys: `content`, `text`, `message`, `value`, `result`, `response`, `output`, `answer`.

## Class-Based Agents
For agents that need to maintain state across calls, you can use a class with a `__call__` method:

```python
from khaos import khaosagent

@khaosagent(name="stateful-agent", version="1.0.0")
class ConversationAgent:
    def __init__(self):
        self.history = []

    def __call__(self, message):
        prompt = (message.get("payload") or {}).get("text", "")
        self.history.append({"role": "user", "content": prompt})
        # Your agent logic with full history
        response = self.generate_response(self.history)
        self.history.append({"role": "assistant", "content": response})
        return {"text": response}

    def generate_response(self, history):
        # Call your LLM with conversation history
        ...

# Instantiate and use
agent = ConversationAgent()
# agent(...) is now fully instrumented by Khaos
```

The decorator wraps the `__call__` method while preserving the class structure.

## Streaming/Generator Agents
For agents that stream responses, you have two options: simple generators (collected automatically) or `StreamingResponse` for preserving real-time streaming semantics.

### Simple Generators (Auto-collected)

```python
from khaos import khaosagent

@khaosagent(name="streaming-agent", version="1.0.0")
def handle(message):
    """Sync generator - chunks collected automatically."""
    prompt = (message.get("payload") or {}).get("text", "")
    for chunk in stream_from_llm(prompt):
        yield chunk  # Each chunk is collected

# Async generators also supported
@khaosagent(name="async-streaming-agent", version="1.0.0")
async def handle_async(message):
    """Async generator for streaming responses."""
    prompt = (message.get("payload") or {}).get("text", "")
    async for chunk in async_stream_from_llm(prompt):
        yield chunk
```

### StreamingResponse (Preserve Streaming)
Use `StreamingResponse` when you need to preserve real-time streaming behavior while still capturing telemetry:

```python
from khaos import khaosagent, StreamingResponse, AsyncStreamingResponse

@khaosagent(name="streaming-agent", version="1.0.0")
def handle(message):
    """StreamingResponse preserves real-time streaming."""
    prompt = (message.get("payload") or {}).get("text", "")

    def generate():
        for chunk in llm.stream(prompt):
            yield chunk

    return StreamingResponse(generate())

# Consumer can iterate in real-time:
# for chunk in streaming_agent(msg):
#     print(chunk)  # Printed as chunks arrive

# Async version
@khaosagent(name="async-streaming-agent", version="1.0.0")
async def handle_async(message):
    prompt = (message.get("payload") or {}).get("text", "")

    async def generate():
        async for chunk in llm.stream_async(prompt):
            yield chunk

    return AsyncStreamingResponse(generate())
```

`StreamingResponse` provides `.chunks` (list of collected chunks), `.collected` (full concatenated content), and `.is_complete` for inspection.
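A sketch of inspecting those attributes after the stream is consumed, using the sync streaming agent above (the input message is illustrative):

```python
msg = {"payload": {"text": "Tell me a story"}}
response = handle(msg)  # the decorated sync streaming agent above

for chunk in response:
    print(chunk, end="")       # chunks arrive in real time

print(response.chunks)         # list of collected chunks
print(response.collected)      # full concatenated content
print(response.is_complete)    # True once the stream is exhausted
```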
## Async Handlers

The decorator supports both sync and async handlers:

```python
from khaos import khaosagent
import asyncio

@khaosagent(name="async-agent", version="1.0.0")
async def handle(message):
    """Async handler for concurrent operations."""
    prompt = (message.get("payload") or {}).get("text", "")
    # Async operations work naturally
    result = await your_async_agent_logic(prompt)
    return {"text": result}
```

## Multiple Agents
Define multiple agents in a single file by giving each a unique name:

```python
from khaos import khaosagent

@khaosagent(name="classifier", version="1.0.0")
def classify_handler(message):
    """Classifies incoming requests."""
    prompt = (message.get("payload") or {}).get("text", "")
    # Classification logic
    return {"text": f"Category: {category}"}

@khaosagent(name="responder", version="1.0.0")
def respond_handler(message):
    """Generates responses based on classification."""
    prompt = (message.get("payload") or {}).get("text", "")
    # Response generation logic
    return {"text": response}
```

```bash
# Discover all agents
khaos discover
# Run specific agent by name
khaos run classifier --eval quickstart
khaos run responder --eval quickstart
```

## Agent Discovery
Before running evaluations, register your agents with `khaos discover`:

```bash
# Discover agents in current directory
khaos discover
# Discover agents in a specific path
khaos discover ./agents/
# List discovered agents
khaos agents list
```

Discovery scans Python files for `@khaosagent` decorators and registers them for use with `khaos run <agent-name>`.
## What Gets Captured

When running under Khaos, the decorator automatically captures:

- **LLM Telemetry** - All API calls to supported providers (OpenAI, Anthropic, Google Gemini, Mistral, Cohere)
- **Token Usage** - Prompt and completion tokens per call
- **Cost** - Estimated USD cost based on token pricing
- **Latency** - Time to first token, total duration
- **Tool Calls** - Function/tool invocations and results
- **MCP Events** - MCP server interactions if using MCP
- **Errors** - Exceptions with full traceback and structured frames
- **Streaming** - Chunk counts and completion status for streaming responses
## Framework Examples

Here are examples for common frameworks. See Framework Support for full details.

### OpenAI

```python
from khaos import khaosagent
from openai import OpenAI

client = OpenAI()

@khaosagent(name="openai-agent", version="1.0.0", framework="openai")
def handle(message):
    prompt = (message.get("payload") or {}).get("text", "")
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
    )
    return {"text": response.choices[0].message.content}
```

### LangChain
```python
from khaos import khaosagent
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o")

@khaosagent(name="langchain-agent", version="1.0.0", framework="langchain")
def handle(message):
    prompt = (message.get("payload") or {}).get("text", "")
    response = llm.invoke(prompt)
    return {"text": response.content}
```

### Anthropic
```python
from khaos import khaosagent
from anthropic import Anthropic

client = Anthropic()

@khaosagent(name="claude-agent", version="1.0.0", framework="anthropic")
def handle(message):
    prompt = (message.get("payload") or {}).get("text", "")
    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    )
    return {"text": response.content[0].text}
```

## Timeout Enforcement
Set `timeout_ms` to enforce a maximum execution time for your agent. If it is exceeded, an `AgentTimeoutError` is raised:

```python
from khaos import khaosagent, AgentTimeoutError

@khaosagent(name="time-limited-agent", version="1.0.0", timeout_ms=30000)  # 30 second timeout
def handle(message):
    prompt = (message.get("payload") or {}).get("text", "")
    result = slow_llm_call(prompt)  # If this takes > 30s, AgentTimeoutError is raised
    return {"text": result}

# For custom timeout handling:
try:
    result = time_limited_agent(message)
except AgentTimeoutError as e:
    print(f"Agent {e.agent_name} timed out after {e.timeout_ms}ms")
```

For async handlers, the timeout is enforced with `asyncio.wait_for()`; sync handlers run under a thread-based timeout. Note that Python threads cannot be forcibly killed; the underlying operation may continue in the background.
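A minimal sketch of `timeout_ms` on an async handler, per the `asyncio.wait_for()` note above; the agent name and the sleep are illustrative:

```python
import asyncio

from khaos import khaosagent

@khaosagent(name="async-timed-agent", version="1.0.0", timeout_ms=5000)
async def handle(message):
    prompt = (message.get("payload") or {}).get("text", "")
    await asyncio.sleep(0.1)  # a slow await here would raise AgentTimeoutError after 5s
    return {"text": f"echo: {prompt}"}
```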
## Invocation Context

Access the current invocation context from anywhere in your agent code:

```python
from khaos import khaosagent, get_current_context, InvocationContext

@khaosagent(name="context-aware-agent", version="1.0.0")
def handle(message):
    # Get context anywhere in your code (thread-safe)
    ctx = get_current_context()
    if ctx:
        print(f"Agent: {ctx.agent_name} v{ctx.agent_version}")
        print(f"Elapsed: {ctx.elapsed_ms}ms")
        print(f"Input: {ctx.payload}")
    return {"text": "response"}

# Or receive context as a parameter
@khaosagent(name="context-param-agent", version="1.0.0")
def handle(message, context: InvocationContext):
    print(f"Running {context.agent_name} for {context.elapsed_ms}ms")
    return {"text": "response"}
```

## Error Handling
Khaos captures exceptions raised by your handler, including full tracebacks. For graceful error handling:

```python
from khaos import khaosagent

@khaosagent(name="resilient-agent", version="1.0.0")
def handle(message):
    try:
        prompt = (message.get("payload") or {}).get("text", "")
        result = your_agent_logic(prompt)
        return {"text": result}
    except TimeoutError:
        # Graceful degradation
        return {"text": "I'm sorry, the request timed out. Please try again."}
    except Exception as e:
        # Log and return error response
        return {"text": f"An error occurred: {str(e)}", "error": True}
```

## Best Practices
- **Use semantic versioning** - Increment versions when behavior changes
- **Keep handlers focused** - One handler per logical agent responsibility
- **Handle errors gracefully** - Return user-friendly error messages
- **Import frameworks at module level** - Ensures proper detection (see the sketch after this list)
- **Add descriptive tags** - Helps organize agents in the dashboard
- **Run discovery after changes** - Re-register agents when you modify decorators
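A minimal sketch of the module-level import practice; the agent name is hypothetical, and the point is only where the SDK import lives:

```python
# Good: module-level import, visible to framework auto-detection
from openai import OpenAI

from khaos import khaosagent

client = OpenAI()  # created at import time

@khaosagent(name="detected-agent", version="1.0.0")  # hypothetical name
def handle(message):
    # Avoid importing the SDK here instead; a function-local import
    # may not be visible when Khaos auto-detects the framework
    ...
```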
## Next Steps

- **CLI Reference** - Full command documentation
- **Evaluation Packs** - Choose the right test suite
- **Framework Support** - Detailed framework integration
- **Fault Injection** - Chaos testing and fault types