@khaosagent Decorator

The @khaosagent decorator is the only required integration point for Khaos. It wraps your agent handler function, enabling automatic telemetry capture, fault injection, and security testing without modifying your agent logic.

Basic Usage

Decorate your agent's message handler function with @khaosagent:

Python
from khaos import khaosagent

@khaosagent(name="my-agent", version="1.0.0")
def handle(message):
    """Your agent's message handler."""
    prompt = (message.get("payload") or {}).get("text", "")

    # Your agent logic here - call LLMs, tools, etc.
    result = your_agent_logic(prompt)

    return {"text": result}

The decorator intercepts incoming messages and outgoing responses, capturing telemetry and enabling Khaos to inject faults and security tests during evaluation runs.
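The wrap-and-observe pattern can be sketched in plain Python. This is an illustrative stand-in, not Khaos' actual implementation; the decorator name and the `agent_name`/`agent_version` attributes below are hypothetical:

```python
import functools

def observed(name, version):
    """Hypothetical stand-in for @khaosagent: wrap a handler, observe I/O."""
    def decorate(handler):
        @functools.wraps(handler)
        def wrapper(message):
            # Telemetry capture would hook in here (input, timing)...
            response = handler(message)
            # ...and here (output, errors, token usage).
            return response
        wrapper.agent_name = name
        wrapper.agent_version = version
        return wrapper
    return decorate

@observed(name="demo-agent", version="1.0.0")
def handle(message):
    text = (message.get("payload") or {}).get("text", "")
    return {"text": f"echo: {text}"}

print(handle({"payload": {"text": "hi"}}))  # {'text': 'echo: hi'}
```

Because the wrapper delegates to your handler unchanged, your agent logic stays exactly as you wrote it.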

Decorator Parameters

| Parameter | Required | Default | Description |
| --- | --- | --- | --- |
| name | Yes | - | Unique identifier for your agent (used in khaos run) |
| version | Yes | - | Semantic version string (e.g., "1.0.0", "2.1.3") |
| framework | No | auto-detect | Framework hint: "openai", "anthropic", "langchain", "crewai", etc. |
| description | No | - | Human-readable description for dashboard display |
| tags | No | [] | List of tags for categorization and filtering |
| timeout_ms | No | 0 (disabled) | Timeout in milliseconds. Raises AgentTimeoutError if exceeded |
| capture_errors | No | True | Catch exceptions and return structured error responses |
| security_mode | No | "agent_input" | "agent_input" tests the full stack; "llm" tests raw LLM defenses only |
| mcp_servers | No | [] | List of MCP server names for auto-discovery and fault injection |

Python
@khaosagent(
    name="customer-support-agent",
    version="2.1.0",
    framework="langchain",
    description="Handles customer support inquiries with RAG",
    tags=["production", "customer-facing", "rag"]
)
def handle(message):
    # ...

Message Format

The handler receives a standardized message dict and must return a response dict:

Input Message

Python
{
    "type": "user_message",
    "payload": {
        "text": "User's input prompt",
        "metadata": {}  # Optional additional context
    },
    "context": {
        "run_id": "khaos-pack-20250101-abc123",
        "scenario": "release.v1.resilience",
        "phase": "baseline"  # or "resilience", "security"
    }
}

Output Response

Python
{
    "text": "Agent's response text",
    "metadata": {
        "confidence": 0.95,      # Optional
        "sources": ["doc1.pdf"]  # Optional
    }
}

At minimum, return {"text": "..."}. Additional metadata is optional and will be captured in telemetry.

Flexible Output Formats

Khaos is designed to work with whatever format is natural for your agent. You don't need to conform to a specific structure; Khaos extracts the output automatically.

Supported Return Types

Python
# All of these work:

# Simple string
return "Hello, world!"

# Dict with common keys
return {"text": "Hello"}
return {"content": "Hello"}      # OpenAI-style
return {"result": "Hello"}       # Tool-style
return {"response": "Hello"}     # Generic
return {"answer": "Hello"}       # Q&A-style

# Full envelope (passed through as-is)
return {"name": "custom.response", "payload": {"text": "Hello"}}

# Tuple (name, payload)
return ("my.event", {"data": 123})

# Framework objects (auto-extracted)
return llm_response  # LangChain AIMessage, CrewAI output, etc.

# None for empty success
return None

Key Priority
Khaos checks these keys in order: content, text, message, value, result, response, output, answer.
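The priority order can be modeled with a small helper. This is an illustrative sketch of the extraction rules described above, not the extractor Khaos actually ships:

```python
PRIORITY_KEYS = ["content", "text", "message", "value",
                 "result", "response", "output", "answer"]

def extract_output(value):
    """Sketch of priority-based output extraction (illustrative only)."""
    if value is None:
        return ""                  # None means empty success
    if isinstance(value, str):
        return value               # simple strings pass through
    if isinstance(value, dict):
        for key in PRIORITY_KEYS:
            if key in value:
                return value[key]  # first matching key wins
    return value                   # unknown shapes pass through unchanged

print(extract_output({"text": "Hello", "answer": "ignored"}))  # Hello
```

Note that because "content" precedes "text", a dict containing both yields the "content" value.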

Class-Based Agents

For agents that need to maintain state across calls, you can use a class with a __call__ method:

Python
from khaos import khaosagent

@khaosagent(name="stateful-agent", version="1.0.0")
class ConversationAgent:
    def __init__(self):
        self.history = []

    def __call__(self, message):
        prompt = (message.get("payload") or {}).get("text", "")
        self.history.append({"role": "user", "content": prompt})

        # Your agent logic with full history
        response = self.generate_response(self.history)
        self.history.append({"role": "assistant", "content": response})

        return {"text": response}

    def generate_response(self, history):
        # Call your LLM with conversation history
        ...

# Instantiate and use
agent = ConversationAgent()
# agent(...) is now fully instrumented by Khaos

Instance State
Each instance maintains its own state. The decorator wraps the __call__ method while preserving the class structure.
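The state isolation is visible with the plain class alone (decorator omitted here so the example is self-contained; the reply logic is a stand-in for a real LLM call):

```python
class ConversationAgent:
    def __init__(self):
        self.history = []

    def __call__(self, message):
        prompt = (message.get("payload") or {}).get("text", "")
        self.history.append({"role": "user", "content": prompt})
        reply = f"You said: {prompt}"   # stand-in for real LLM logic
        self.history.append({"role": "assistant", "content": reply})
        return {"text": reply}

# Two instances keep fully independent conversation histories
a, b = ConversationAgent(), ConversationAgent()
a({"payload": {"text": "hello"}})
print(len(a.history), len(b.history))  # 2 0
```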

Streaming/Generator Agents

For agents that stream responses, you have two options: simple generators (collected automatically) or StreamingResponse for preserving real-time streaming semantics.

Simple Generators (Auto-collected)

Python
from khaos import khaosagent

@khaosagent(name="streaming-agent", version="1.0.0")
def handle(message):
    """Sync generator - chunks collected automatically."""
    prompt = (message.get("payload") or {}).get("text", "")

    for chunk in stream_from_llm(prompt):
        yield chunk  # Each chunk is collected

# Async generators also supported
@khaosagent(name="async-streaming-agent", version="1.0.0")
async def handle_async(message):
    """Async generator for streaming responses."""
    prompt = (message.get("payload") or {}).get("text", "")

    async for chunk in async_stream_from_llm(prompt):
        yield chunk

StreamingResponse (Preserve Streaming)

Use StreamingResponse when you need to preserve real-time streaming behavior while still capturing telemetry:

Python
from khaos import khaosagent, StreamingResponse, AsyncStreamingResponse

@khaosagent(name="streaming-agent", version="1.0.0")
def handle(message):
    """StreamingResponse preserves real-time streaming."""
    prompt = (message.get("payload") or {}).get("text", "")

    def generate():
        for chunk in llm.stream(prompt):
            yield chunk

    return StreamingResponse(generate())

# Consumer can iterate in real-time:
# for chunk in handle(msg):
#     print(chunk)  # Printed as chunks arrive

# Async version
@khaosagent(name="async-streaming-agent", version="1.0.0")
async def handle_async(message):
    prompt = (message.get("payload") or {}).get("text", "")

    async def generate():
        async for chunk in llm.stream_async(prompt):
            yield chunk

    return AsyncStreamingResponse(generate())

StreamingResponse Properties
StreamingResponse provides .chunks (list of collected chunks), .collected (full concatenated content), and .is_complete for inspection.
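For intuition, a wrapper with these properties can be approximated in a few lines of plain Python. This is a hypothetical reimplementation, not the real StreamingResponse:

```python
class CollectingStream:
    """Sketch of a wrapper that streams in real time while collecting chunks."""
    def __init__(self, generator):
        self._generator = generator
        self.chunks = []
        self.is_complete = False

    def __iter__(self):
        for chunk in self._generator:
            self.chunks.append(chunk)  # record each chunk as it passes through
            yield chunk                # ...while still streaming to the consumer
        self.is_complete = True

    @property
    def collected(self):
        return "".join(self.chunks)

stream = CollectingStream(iter(["Hel", "lo"]))
print(list(stream), stream.collected, stream.is_complete)  # ['Hel', 'lo'] Hello True
```

The key design point is that collection happens as a side effect of iteration, so the consumer still sees chunks as they arrive.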

Async Handlers

The decorator supports both sync and async handlers:

Python
from khaos import khaosagent
import asyncio

@khaosagent(name="async-agent", version="1.0.0")
async def handle(message):
    """Async handler for concurrent operations."""
    prompt = (message.get("payload") or {}).get("text", "")

    # Async operations work naturally
    result = await your_async_agent_logic(prompt)

    return {"text": result}

Streaming Support
For streaming responses, return a sync or async generator. Khaos will collect all chunks and evaluate the complete response. See the Streaming/Generator Agents section above.

Multiple Agents

Define multiple agents in a single file by giving each a unique name:

Python
from khaos import khaosagent

@khaosagent(name="classifier", version="1.0.0")
def classify_handler(message):
    """Classifies incoming requests."""
    prompt = (message.get("payload") or {}).get("text", "")
    category = your_classifier(prompt)  # classification logic
    return {"text": f"Category: {category}"}

@khaosagent(name="responder", version="1.0.0")
def respond_handler(message):
    """Generates responses based on classification."""
    prompt = (message.get("payload") or {}).get("text", "")
    response = your_responder(prompt)  # response generation logic
    return {"text": response}

Terminal
# Discover all agents
khaos discover

# Run specific agent by name
khaos run classifier --eval quickstart
khaos run responder --eval quickstart

Agent Discovery

Before running evaluations, register your agents with khaos discover:

Terminal
# Discover agents in current directory
khaos discover

# Discover agents in a specific path
khaos discover ./agents/

# List discovered agents
khaos agents list

Discovery scans Python files for @khaosagent decorators and registers them for use with khaos run <agent-name>.
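As a mental model, decorator scanning can be done with the stdlib ast module. The sketch below is illustrative only and makes no claims about Khaos' actual scanner:

```python
import ast

def find_khaosagent_names(source: str):
    """Return names of functions/classes decorated with @khaosagent in a source string."""
    found = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
            for dec in node.decorator_list:
                # Handle both @khaosagent and @khaosagent(...) forms
                target = dec.func if isinstance(dec, ast.Call) else dec
                if isinstance(target, ast.Name) and target.id == "khaosagent":
                    found.append(node.name)
    return found

src = '''
@khaosagent(name="classifier", version="1.0.0")
def classify_handler(message):
    return {"text": "ok"}
'''
print(find_khaosagent_names(src))  # ['classify_handler']
```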

What Gets Captured

When running under Khaos, the decorator automatically captures:

  • LLM Telemetry - All API calls to supported providers (OpenAI, Anthropic, Google Gemini, Mistral, Cohere)
  • Token Usage - Prompt and completion tokens per call
  • Cost - Estimated USD cost based on token pricing
  • Latency - Time to first token, total duration
  • Tool Calls - Function/tool invocations and results
  • MCP Events - MCP server interactions if using MCP
  • Errors - Exceptions with full traceback and structured frames
  • Streaming - Chunk counts and completion status for streaming responses

Zero Production Overhead
Outside of Khaos runs, the decorator has negligible overhead (<0.0001%). Your production code runs unchanged.

Framework Examples

Here are examples for common frameworks. See Framework Support for full details.

OpenAI

Python
from khaos import khaosagent
from openai import OpenAI

client = OpenAI()

@khaosagent(name="openai-agent", version="1.0.0", framework="openai")
def handle(message):
    prompt = (message.get("payload") or {}).get("text", "")
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
    )
    return {"text": response.choices[0].message.content}

LangChain

Python
from khaos import khaosagent
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o")

@khaosagent(name="langchain-agent", version="1.0.0", framework="langchain")
def handle(message):
    prompt = (message.get("payload") or {}).get("text", "")
    response = llm.invoke(prompt)
    return {"text": response.content}

Anthropic

Python
from khaos import khaosagent
from anthropic import Anthropic

client = Anthropic()

@khaosagent(name="claude-agent", version="1.0.0", framework="anthropic")
def handle(message):
    prompt = (message.get("payload") or {}).get("text", "")
    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    )
    return {"text": response.content[0].text}

Timeout Enforcement

Set timeout_ms to enforce a maximum execution time for your agent. If exceeded, an AgentTimeoutError is raised:

Python
from khaos import khaosagent, AgentTimeoutError

@khaosagent(name="time-limited-agent", version="1.0.0", timeout_ms=30000)  # 30 second timeout
def handle(message):
    prompt = (message.get("payload") or {}).get("text", "")
    result = slow_llm_call(prompt)  # If this takes > 30s, AgentTimeoutError raised
    return {"text": result}

# For custom timeout handling:
try:
    result = handle(message)
except AgentTimeoutError as e:
    print(f"Agent {e.agent_name} timed out after {e.timeout_ms}ms")

Timeout Behavior
For sync handlers, timeout uses ThreadPoolExecutor. For async handlers, it uses asyncio.wait_for(). Note that Python threads cannot be forcibly killed; the underlying operation may continue in the background.
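The sync path can be sketched with the stdlib directly. This simplified version (not Khaos' actual code) shows both the timeout and the caveat that the worker thread keeps running after the deadline fires:

```python
import concurrent.futures
import time

def run_with_timeout(fn, arg, timeout_ms):
    """Sketch of thread-based timeout enforcement for a sync handler."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(fn, arg)
        try:
            return future.result(timeout=timeout_ms / 1000.0)
        except concurrent.futures.TimeoutError:
            # The thread cannot be killed; the call may still be running.
            raise TimeoutError(f"handler exceeded {timeout_ms}ms")

def slow_handler(message):
    time.sleep(0.2)
    return {"text": "done"}

try:
    run_with_timeout(slow_handler, {}, timeout_ms=50)
except TimeoutError as e:
    print(e)  # handler exceeded 50ms
```

Because the executor's shutdown waits for the worker, even this sketch briefly blocks until the slow call finishes, which is exactly the background-continuation behavior the note above warns about.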

Invocation Context

Access the current invocation context from anywhere in your agent code:

Python
from khaos import khaosagent, get_current_context, InvocationContext

@khaosagent(name="context-aware-agent", version="1.0.0")
def handle(message):
    # Get context anywhere in your code (thread-safe)
    ctx = get_current_context()

    if ctx:
        print(f"Agent: {ctx.agent_name} v{ctx.agent_version}")
        print(f"Elapsed: {ctx.elapsed_ms}ms")
        print(f"Input: {ctx.payload}")

    return {"text": "response"}

# Or receive context as a parameter
@khaosagent(name="context-param-agent", version="1.0.0")
def handle(message, context: InvocationContext):
    print(f"Running {context.agent_name} for {context.elapsed_ms}ms")
    return {"text": "response"}

Error Handling

Khaos captures exceptions raised by your handler, including full tracebacks. For graceful error handling:

Python
from khaos import khaosagent

@khaosagent(name="resilient-agent", version="1.0.0")
def handle(message):
    try:
        prompt = (message.get("payload") or {}).get("text", "")
        result = your_agent_logic(prompt)
        return {"text": result}
    except TimeoutError:
        # Graceful degradation
        return {"text": "I'm sorry, the request timed out. Please try again."}
    except Exception as e:
        # Log and return error response
        return {"text": f"An error occurred: {str(e)}", "error": True}

Resilience Testing
Khaos' resilience evaluation tests how your agent handles errors. Good error handling improves your resilience score.

Best Practices

  • Use semantic versioning - Increment versions when behavior changes
  • Keep handlers focused - One handler per logical agent responsibility
  • Handle errors gracefully - Return user-friendly error messages
  • Import frameworks at module level - Ensures proper detection
  • Add descriptive tags - Helps organize agents in the dashboard
  • Run discovery after changes - Re-register agents when you modify decorators

Next Steps