Transport Layer
The transport layer defines how Khaos communicates with your agent during evaluation. It provides a protocol-agnostic messaging interface that supports both in-process and remote agents.
AgentTransport Protocol
The AgentTransport protocol defines the three async methods that every transport implementation must provide.
| Method | Signature | Description |
|---|---|---|
| send | async send(message: TransportMessage) -> None | Send a message to the agent |
| receive | async receive(timeout: float) -> TransportEvent | Wait for the next event from the agent |
| close | async close() -> None | Clean up resources and close the connection |
```python
from khaos.transport import AgentTransport, TransportMessage, TransportEvent


class MyTransport(AgentTransport):
    # self._connection, self._run_id, and self._agent_id are assumed to be
    # set up by your own __init__ (not shown here).

    async def send(self, message: TransportMessage) -> None:
        # Deliver message to your agent
        await self._connection.send(message.payload)

    async def receive(self, timeout: float) -> TransportEvent:
        # Wait for agent response
        raw = await self._connection.receive(timeout=timeout)
        return TransportEvent.create(
            event="response",
            payload={"content": raw},
            run_id=self._run_id,
            agent_id=self._agent_id,
        )

    async def close(self) -> None:
        # Release the underlying connection
        await self._connection.disconnect()
```

TransportMessage
Messages sent to agents are wrapped in a TransportMessage dataclass with slots enabled for performance.
| Field | Type | Description |
|---|---|---|
| name | str | Message type identifier (e.g. "user_input", "system") |
| payload | dict | Message content and parameters |
| metadata | dict | Additional metadata (run_id, turn number, etc.) |
```python
from khaos.transport import TransportMessage

message = TransportMessage(
    name="user_input",
    payload={"content": "What is the weather in London?"},
    metadata={"run_id": "run-001", "turn": 1},
)
```

TransportEvent
Events received from agents are wrapped in a TransportEvent dataclass. Use the create() classmethod for convenient construction.
| Field | Type | Description |
|---|---|---|
| ts | datetime | Timestamp of the event |
| run_id | str | Evaluation run identifier |
| agent_id | str | Agent identifier |
| event | str | Event type (e.g. "response", "tool_call", "error") |
| payload | dict | Event data |
| meta | dict | Additional metadata |
```python
from khaos.transport import TransportEvent

# Create an event using the classmethod
event = TransportEvent.create(
    event="response",
    payload={"content": "The weather in London is 15°C and cloudy."},
    run_id="run-001",
    agent_id="my-agent",
    meta={"latency_ms": 342},
)

print(event.ts)       # datetime when created
print(event.event)    # "response"
print(event.payload)  # {"content": "The weather in London is..."}
```

InProcessTransport
The InProcessTransport is the default transport for agents running in the same Python process. It wraps an async handler function and manages message passing without network overhead.
```python
from khaos.transport import InProcessTransport, TransportMessage, TransportEvent


# Define an async message handler
async def my_handler(message: TransportMessage) -> TransportEvent:
    # Process the message and return an event.
    # my_agent is your own agent object; it is not provided by Khaos.
    response_text = await my_agent.process(message.payload["content"])
    return TransportEvent.create(
        event="response",
        payload={"content": response_text},
        run_id=message.metadata.get("run_id", ""),
        agent_id="my-agent",
    )


# Create the transport
transport = InProcessTransport(handler=my_handler)

# Use it (the awaits below must run inside a coroutine)
await transport.send(TransportMessage(
    name="user_input",
    payload={"content": "Hello"},
    metadata={"run_id": "run-001"},
))
event = await transport.receive(timeout=30.0)
print(event.payload["content"])

# Clean up
await transport.close()
```

Wrapping Sync Handlers
If your agent uses synchronous code, use wrap_sync_handler() to adapt it to the async transport interface.
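This page states that wrap_sync_handler() runs the synchronous function in a thread pool executor. The block below is a minimal sketch of that general technique using only the standard library; it is not Khaos's actual code, and the names to_async and blocking_agent are hypothetical.

```python
import asyncio
from typing import Any, Awaitable, Callable


def to_async(func: Callable[..., Any]) -> Callable[..., Awaitable[Any]]:
    """Hypothetical helper: run a blocking callable in the default thread pool."""

    async def wrapper(*args: Any) -> Any:
        loop = asyncio.get_running_loop()
        # Passing None selects the loop's default ThreadPoolExecutor, so the
        # event loop keeps running while func blocks in a worker thread.
        return await loop.run_in_executor(None, func, *args)

    return wrapper


def blocking_agent(message: dict) -> str:
    # Stand-in for a synchronous agent function.
    return f"You said: {message['content']}"


async def main() -> str:
    handler = to_async(blocking_agent)
    return await handler({"content": "hello"})


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because the function runs on a worker thread, any state it shares with other code needs to be thread-safe. With Khaos, wrap_sync_handler() takes care of the adaptation for you: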
```python
from khaos.transport import wrap_sync_handler, InProcessTransport


# Your synchronous agent function
def my_sync_agent(message: dict) -> str:
    return f"You said: {message['content']}"


# Wrap it for async transport
async_handler = wrap_sync_handler(my_sync_agent)

# Use with InProcessTransport
transport = InProcessTransport(handler=async_handler)
```

wrap_sync_handler() runs your synchronous function in a thread pool executor to avoid blocking the async event loop. Ensure your handler is thread-safe.

Transport Factory
The build_inproc_transport() factory function creates an InProcessTransport with the standard context and options. This is what Khaos uses internally when registering transports.
```python
from khaos.transport import build_inproc_transport

# Build a transport from context and options
transport = build_inproc_transport(
    context=agent_context,      # AgentContext from discovery
    options=transport_options,  # TransportOptions configuration
)
```

Transport Errors
The transport layer defines a hierarchy of errors for handling communication failures.
| Error | Description | When Raised |
|---|---|---|
| TransportError | Base error for all transport failures | Generic transport issues |
| TransportTimeout | Agent did not respond within the timeout | receive() exceeds timeout |
| TransportProtocolError | Invalid message format or protocol violation | Malformed messages, unexpected event types |
| TransportExit | Agent signaled it is shutting down | Agent process exited or connection closed |
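Assuming the specific errors subclass TransportError, as the table suggests, an except TransportError clause also catches them, so handlers for the specific errors have to come first. The sketch below illustrates one way to use this: retry on timeouts only and let every other transport failure propagate. The exception classes are local stand-ins so the block runs without Khaos, and receive_with_retry and FlakyTransport are hypothetical names, not part of the library.

```python
import asyncio


# Stand-ins mirroring the table above; in real code, import the
# equivalents from khaos.transport instead of defining them.
class TransportError(Exception):
    pass


class TransportTimeout(TransportError):
    pass


async def receive_with_retry(transport, timeout: float, attempts: int = 3):
    """Hypothetical helper: retry receive() on timeouts, nothing else."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return await transport.receive(timeout=timeout)
        except TransportTimeout:
            if attempt == attempts:
                raise  # out of attempts: surface the timeout to the caller
            # Otherwise fall through and wait for the next event again.


class FlakyTransport:
    """Test double: times out twice, then returns a response."""

    def __init__(self) -> None:
        self.calls = 0

    async def receive(self, timeout: float) -> dict:
        self.calls += 1
        if self.calls < 3:
            raise TransportTimeout(f"no event within {timeout}s")
        return {"event": "response", "payload": {"content": "ok"}}


if __name__ == "__main__":
    flaky = FlakyTransport()
    result = asyncio.run(receive_with_retry(flaky, timeout=0.01))
    print(result["payload"]["content"], "after", flaky.calls, "calls")
```

Whether a timeout is worth retrying depends on your agent; errors such as a protocol violation or an agent exit are unlikely to succeed on a second attempt, which is why the helper does not catch them.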
```python
from khaos.transport import (
    TransportError,
    TransportTimeout,
    TransportProtocolError,
    TransportExit,
)

try:
    event = await transport.receive(timeout=30.0)
except TransportTimeout:
    print("Agent did not respond in time")
except TransportProtocolError as e:
    print(f"Protocol error: {e}")
except TransportExit:
    print("Agent has shut down")
except TransportError as e:
    print(f"Transport failure: {e}")
```

When to Implement Custom Transport
The built-in InProcessTransport works for most use cases. Consider a custom transport when:
- Remote agents: Your agent runs on a different machine or in a container and communicates over HTTP, gRPC, or WebSocket
- Distributed testing: You need to run evaluations across multiple machines or cloud instances
- Custom protocols: Your agent uses a proprietary communication protocol that Khaos doesn't natively support
- Message queues: Your agent communicates through a message broker like RabbitMQ, Kafka, or Redis Streams
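To make the message-queue case above more concrete, here is a minimal sketch of a transport with the same three methods, backed by two asyncio queues in place of a real broker. Everything in it is an assumption for illustration: the dataclasses and TransportTimeout are local stand-ins for the khaos.transport types so the block runs on its own, and QueueTransport and echo_agent are hypothetical names. A real implementation would subclass AgentTransport and use the library's own types.

```python
import asyncio
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class TransportMessage:  # stand-in for khaos.transport.TransportMessage
    name: str
    payload: dict
    metadata: dict = field(default_factory=dict)


@dataclass
class TransportEvent:  # stand-in for khaos.transport.TransportEvent
    ts: datetime
    run_id: str
    agent_id: str
    event: str
    payload: dict
    meta: dict = field(default_factory=dict)


class TransportTimeout(Exception):  # stand-in for the Khaos error
    pass


class QueueTransport:
    """Hypothetical transport that reaches the agent through two queues."""

    def __init__(self) -> None:
        self.to_agent: asyncio.Queue = asyncio.Queue()
        self.from_agent: asyncio.Queue = asyncio.Queue()
        self.closed = False

    async def send(self, message: TransportMessage) -> None:
        await self.to_agent.put(message)

    async def receive(self, timeout: float) -> TransportEvent:
        try:
            return await asyncio.wait_for(self.from_agent.get(), timeout)
        except asyncio.TimeoutError:
            # Translate the asyncio timeout into the transport-level error.
            raise TransportTimeout(f"no event within {timeout}s") from None

    async def close(self) -> None:
        self.closed = True


async def echo_agent(transport: QueueTransport) -> None:
    """Stand-in agent: reads one message and replies with one event."""
    message = await transport.to_agent.get()
    await transport.from_agent.put(TransportEvent(
        ts=datetime.now(timezone.utc),
        run_id=message.metadata.get("run_id", ""),
        agent_id="echo-agent",
        event="response",
        payload={"content": f"echo: {message.payload['content']}"},
    ))


async def main() -> TransportEvent:
    transport = QueueTransport()  # created inside the running event loop
    agent = asyncio.create_task(echo_agent(transport))
    await transport.send(TransportMessage(
        "user_input", {"content": "Hello"}, {"run_id": "run-001"}))
    event = await transport.receive(timeout=1.0)
    await agent
    await transport.close()
    return event


if __name__ == "__main__":
    print(asyncio.run(main()).payload)
```

Swapping the two queues for a broker client would leave the send, receive, and close surface unchanged, which is the point of keeping the interface this small.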
Start with InProcessTransport. Only implement a custom transport when you have a specific requirement that the default cannot meet.

Related Documentation
- Architecture Overview - Where transport fits in the system
- Architecture Decision Records - Design decisions behind the transport layer
- Agent Configuration - Configuring agent transport settings