Transport Layer

The transport layer defines how Khaos communicates with your agent during evaluation. It provides a protocol-agnostic messaging interface that supports both in-process and remote agents.

AgentTransport Protocol

The AgentTransport protocol defines the three async methods that every transport implementation must provide.

Method   Signature                                        Description
send     async send(message: TransportMessage) -> None    Send a message to the agent
receive  async receive(timeout: float) -> TransportEvent  Wait for the next event from the agent
close    async close() -> None                            Clean up resources and close the connection
Python
from khaos.transport import AgentTransport, TransportMessage, TransportEvent

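class MyTransport(AgentTransport):
    def __init__(self, connection, run_id: str, agent_id: str) -> None:
        # connection is a placeholder for whatever client object talks to
        # your agent; it is not part of Khaos
        self._connection = connection
        self._run_id = run_id
        self._agent_id = agent_id

    async def send(self, message: TransportMessage) -> None:
        # Deliver message to your agent
        await self._connection.send(message.payload)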

    async def receive(self, timeout: float) -> TransportEvent:
        # Wait for agent response
        raw = await self._connection.receive(timeout=timeout)
        return TransportEvent.create(
            event="response",
            payload={"content": raw},
            run_id=self._run_id,
            agent_id=self._agent_id,
        )

    async def close(self) -> None:
        await self._connection.disconnect()
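
To make the three-method contract concrete without a Khaos install, here is a minimal sketch. Message, Event, Transport, and EchoTransport are hypothetical stand-ins invented for illustration; the real AgentTransport may be declared differently.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Protocol, runtime_checkable


# Stand-ins for the Khaos types, reduced to a few documented fields.
@dataclass
class Message:
    name: str
    payload: dict[str, Any]
    metadata: dict[str, Any] = field(default_factory=dict)


@dataclass
class Event:
    event: str
    payload: dict[str, Any]


@runtime_checkable
class Transport(Protocol):
    """Structural version of the send/receive/close contract."""

    async def send(self, message: Message) -> None: ...
    async def receive(self, timeout: float) -> Event: ...
    async def close(self) -> None: ...


class EchoTransport:
    """Hypothetical transport that answers each message with its own payload."""

    def __init__(self) -> None:
        self._pending: asyncio.Queue[Event] = asyncio.Queue()
        self.closed = False

    async def send(self, message: Message) -> None:
        # Queue a response event that mirrors the incoming payload.
        await self._pending.put(Event("response", dict(message.payload)))

    async def receive(self, timeout: float) -> Event:
        # wait_for raises asyncio.TimeoutError if nothing arrives in time.
        return await asyncio.wait_for(self._pending.get(), timeout)

    async def close(self) -> None:
        self.closed = True


async def round_trip() -> Event:
    transport = EchoTransport()
    try:
        await transport.send(Message("user_input", {"content": "ping"}))
        return await transport.receive(timeout=1.0)
    finally:
        # Always release the transport, even if receive() fails.
        await transport.close()
```

Calling asyncio.run(round_trip()) sends one message and returns the echoed event. The try/finally pattern is worth keeping in real transports so that close() runs even when receive() raises.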

TransportMessage

Messages sent to agents are wrapped in a TransportMessage dataclass with slots enabled for performance.

Field     Type  Description
name      str   Message type identifier (e.g. "user_input", "system")
payload   dict  Message content and parameters
metadata  dict  Additional metadata (run_id, turn number, etc.)
Python
from khaos.transport import TransportMessage

message = TransportMessage(
    name="user_input",
    payload={"content": "What is the weather in London?"},
    metadata={"run_id": "run-001", "turn": 1},
)

TransportEvent

Events received from agents are wrapped in a TransportEvent dataclass. Use the create() classmethod for convenient construction.

Field     Type      Description
ts        datetime  Timestamp of the event
run_id    str       Evaluation run identifier
agent_id  str       Agent identifier
event     str       Event type (e.g. "response", "tool_call", "error")
payload   dict      Event data
meta      dict      Additional metadata
Python
from khaos.transport import TransportEvent

# Create an event using the classmethod
event = TransportEvent.create(
    event="response",
    payload={"content": "The weather in London is 15°C and cloudy."},
    run_id="run-001",
    agent_id="my-agent",
    meta={"latency_ms": 342},
)

print(event.ts)        # datetime when created
print(event.event)     # "response"
print(event.payload)   # {"content": "The weather in London is..."}
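
The convenience of create() is that the caller does not supply ts. One plausible way to write such a classmethod is sketched below. Event is a hypothetical stand-in; taking the timestamp in UTC and defaulting meta to an empty dict are assumptions, not confirmed Khaos behavior.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Optional


@dataclass
class Event:
    ts: datetime
    run_id: str
    agent_id: str
    event: str
    payload: dict[str, Any]
    meta: dict[str, Any] = field(default_factory=dict)

    @classmethod
    def create(
        cls,
        *,
        event: str,
        payload: dict[str, Any],
        run_id: str,
        agent_id: str,
        meta: Optional[dict[str, Any]] = None,
    ) -> "Event":
        # Assumption: the timestamp is taken at construction time, in UTC.
        return cls(
            ts=datetime.now(timezone.utc),
            run_id=run_id,
            agent_id=agent_id,
            event=event,
            payload=payload,
            meta=dict(meta or {}),  # copy so callers cannot mutate it later
        )


sample = Event.create(
    event="response",
    payload={"content": "ok"},
    run_id="run-001",
    agent_id="my-agent",
)
```

Keeping the timestamp logic in one classmethod means every event is stamped the same way, regardless of which transport produced it.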

InProcessTransport

The InProcessTransport is the default transport for agents running in the same Python process. It wraps an async handler function and manages message passing without network overhead.

Python
from khaos.transport import InProcessTransport, TransportMessage, TransportEvent

# Define an async message handler
async def my_handler(message: TransportMessage) -> TransportEvent:
    # Process the message and return an event
    response_text = await my_agent.process(message.payload["content"])
    return TransportEvent.create(
        event="response",
        payload={"content": response_text},
        run_id=message.metadata.get("run_id", ""),
        agent_id="my-agent",
    )

# Create the transport
transport = InProcessTransport(handler=my_handler)

# Use it
await transport.send(TransportMessage(
    name="user_input",
    payload={"content": "Hello"},
    metadata={"run_id": "run-001"},
))
event = await transport.receive(timeout=30.0)
print(event.payload["content"])

# Clean up
await transport.close()
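
For intuition about what "wraps an async handler" can look like, here is a rough sketch of one possible design. QueueBackedTransport is hypothetical and is not the InProcessTransport source; plain dicts stand in for TransportMessage and TransportEvent to keep the example self-contained.

```python
import asyncio
from typing import Any, Awaitable, Callable

Handler = Callable[[dict[str, Any]], Awaitable[dict[str, Any]]]


class QueueBackedTransport:
    """Hypothetical in-process transport: handler results wait in a queue."""

    def __init__(self, handler: Handler) -> None:
        self._handler = handler
        self._events: asyncio.Queue[dict[str, Any]] = asyncio.Queue()

    async def send(self, message: dict[str, Any]) -> None:
        # Run the handler inline and buffer its result for receive().
        await self._events.put(await self._handler(message))

    async def receive(self, timeout: float) -> dict[str, Any]:
        # Raises asyncio.TimeoutError if no event is buffered in time.
        return await asyncio.wait_for(self._events.get(), timeout)

    async def close(self) -> None:
        # Nothing to release: there is no socket or subprocess.
        return None


async def shout(message: dict[str, Any]) -> dict[str, Any]:
    # Toy handler that upper-cases the incoming content.
    return {"event": "response", "payload": {"content": message["content"].upper()}}


async def demo() -> tuple[dict[str, Any], bool]:
    transport = QueueBackedTransport(shout)
    await transport.send({"content": "hello"})
    first = await transport.receive(timeout=1.0)
    try:
        # No second message was sent, so this receive should time out.
        await transport.receive(timeout=0.01)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    await transport.close()
    return first, timed_out
```

In this design send() does the work and receive() only drains a buffer, so a receive() with nothing pending times out instead of blocking forever. A real transport would likely translate that timeout into its own error type.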

Wrapping Sync Handlers

If your agent uses synchronous code, use wrap_sync_handler() to adapt it to the async transport interface.

Python
from khaos.transport import wrap_sync_handler, InProcessTransport

# Your synchronous agent function
def my_sync_agent(message: dict) -> str:
    return f"You said: {message['content']}"

# Wrap it for async transport
async_handler = wrap_sync_handler(my_sync_agent)

# Use with InProcessTransport
transport = InProcessTransport(handler=async_handler)

Thread safety: wrap_sync_handler() runs your synchronous function in a thread pool executor to avoid blocking the async event loop. Ensure your handler is thread-safe.
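
The thread hand-off can be illustrated with the standard library alone. The wrap_sync() function below is a hypothetical adapter written for this example, not the Khaos implementation; it relies on asyncio.to_thread (Python 3.9+), which runs a callable in the event loop's default thread pool.

```python
import asyncio
import threading
from typing import Any, Awaitable, Callable

SyncHandler = Callable[[dict[str, Any]], Any]
AsyncHandler = Callable[[dict[str, Any]], Awaitable[Any]]


def wrap_sync(func: SyncHandler) -> AsyncHandler:
    """Hypothetical adapter; Khaos's wrap_sync_handler() may differ."""

    async def wrapper(message: dict[str, Any]) -> Any:
        # Off-load the blocking call so the event loop stays responsive.
        return await asyncio.to_thread(func, message)

    return wrapper


def blocking_agent(message: dict[str, Any]) -> tuple[str, int]:
    # Return the worker thread id so the hand-off is observable.
    return f"You said: {message['content']}", threading.get_ident()


async def demo() -> tuple[str, bool]:
    handler = wrap_sync(blocking_agent)
    loop_thread = threading.get_ident()
    reply, worker_thread = await handler({"content": "Hello"})
    # True when the handler ran on a different thread than the event loop.
    return reply, worker_thread != loop_thread
```

Because the handler runs on a worker thread, any state it shares with other handlers (caches, counters, client objects) needs a lock or a thread-safe data structure.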

Transport Factory

The build_inproc_transport() factory function creates an InProcessTransport with the standard context and options. This is what Khaos uses internally when registering transports.

Python
from khaos.transport import build_inproc_transport

# Build a transport from context and options
transport = build_inproc_transport(
    context=agent_context,   # AgentContext from discovery
    options=transport_options,  # TransportOptions configuration
)

Transport Errors

The transport layer defines a hierarchy of errors for handling communication failures.

Error                   Description                                   When Raised
TransportError          Base error for all transport failures         Generic transport issues
TransportTimeout        Agent did not respond within the timeout      receive() exceeds timeout
TransportProtocolError  Invalid message format or protocol violation  Malformed messages, unexpected event types
TransportExit           Agent signaled it is shutting down            Agent process exited or connection closed
Python
from khaos.transport import (
    TransportError,
    TransportTimeout,
    TransportProtocolError,
    TransportExit,
)

try:
    event = await transport.receive(timeout=30.0)
except TransportTimeout:
    print("Agent did not respond in time")
except TransportProtocolError as e:
    print(f"Protocol error: {e}")
except TransportExit:
    print("Agent has shut down")
except TransportError as e:
    print(f"Transport failure: {e}")
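
The order of the except clauses matters because the specific errors derive from the base error. The stand-in classes below mirror the names in the table; that each one subclasses TransportError directly is an assumption made for this sketch, and describe() is a hypothetical helper.

```python
# Stand-in hierarchy mirroring the documented names (assumed layout).
class TransportError(Exception):
    pass


class TransportTimeout(TransportError):
    pass


class TransportProtocolError(TransportError):
    pass


class TransportExit(TransportError):
    pass


def describe(exc: Exception) -> str:
    """Classify an error the same way the except chain above does."""
    try:
        raise exc
    except TransportTimeout:
        return "timeout"
    except TransportProtocolError:
        return "protocol"
    except TransportExit:
        return "exit"
    except TransportError:
        # Catch-all for transport failures; must come after the subclasses.
        return "generic"


labels = [
    describe(TransportTimeout()),
    describe(TransportProtocolError("bad frame")),
    describe(TransportExit()),
    describe(TransportError("unknown")),
]
```

If the TransportError clause came first, it would match every subclass and the more specific handlers would never run.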

When to Implement Custom Transport

The built-in InProcessTransport works for most use cases. Consider a custom transport when:

  • Remote agents: Your agent runs on a different machine or in a container and communicates over HTTP, gRPC, or WebSocket
  • Distributed testing: You need to run evaluations across multiple machines or cloud instances
  • Custom protocols: Your agent uses a proprietary communication protocol that Khaos doesn't natively support
  • Message queues: Your agent communicates through a message broker like RabbitMQ, Kafka, or Redis Streams

Start simple: Most agents work out of the box with InProcessTransport. Only implement a custom transport when you have a specific requirement that the default cannot meet.
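
As an illustration of the message-queue case, the sketch below uses two asyncio queues in place of a real broker. BrokerTransport and agent_worker are hypothetical names, and messages are plain dicts serialized to JSON; a real implementation would swap the queues for a RabbitMQ, Kafka, or Redis client and subclass AgentTransport.

```python
import asyncio
import json
from typing import Any


class BrokerTransport:
    """Hypothetical transport exchanging JSON strings over two queues."""

    def __init__(self, outbound: asyncio.Queue, inbound: asyncio.Queue) -> None:
        self._outbound = outbound
        self._inbound = inbound

    async def send(self, message: dict[str, Any]) -> None:
        # Serialize before publishing, as a real broker carries bytes or text.
        await self._outbound.put(json.dumps(message))

    async def receive(self, timeout: float) -> dict[str, Any]:
        raw = await asyncio.wait_for(self._inbound.get(), timeout)
        return json.loads(raw)

    async def close(self) -> None:
        # A real broker client would disconnect here.
        return None


async def agent_worker(requests: asyncio.Queue, replies: asyncio.Queue) -> None:
    """Stand-in for a remote agent that reverses the content it receives."""
    while True:
        request = json.loads(await requests.get())
        content = request["payload"]["content"][::-1]
        reply = {"event": "response", "payload": {"content": content}}
        await replies.put(json.dumps(reply))


async def demo() -> dict[str, Any]:
    requests: asyncio.Queue = asyncio.Queue()
    replies: asyncio.Queue = asyncio.Queue()
    worker = asyncio.create_task(agent_worker(requests, replies))
    transport = BrokerTransport(requests, replies)
    try:
        await transport.send({"name": "user_input", "payload": {"content": "abc"}})
        return await transport.receive(timeout=1.0)
    finally:
        await transport.close()
        worker.cancel()  # stop the stand-in agent
```

The transport itself never calls the agent directly; it only publishes and consumes. That separation is the property that lets the agent live in another process or on another machine.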
