Building Custom Adapters¶
Create adapters for proprietary frameworks or unique requirements by implementing the BaseAdapter interface.
The Runnable Interface¶
from fastagentic.adapters.base import BaseAdapter, Event, EventType
from typing import AsyncIterator, Any
class MyAdapter(BaseAdapter):
"""Adapter for my custom agent framework."""
def __init__(self, agent: MyAgent, **options):
self.agent = agent
self.options = options
async def invoke(self, input: dict, config: dict) -> dict:
"""Synchronous execution - return final result."""
result = await self.agent.run(input)
return {"output": result}
async def stream(self, input: dict, config: dict) -> AsyncIterator[Event]:
"""Streaming execution - yield events as they occur."""
async for chunk in self.agent.stream(input):
yield self.create_event(EventType.TOKEN, {"content": chunk})
# Final result
result = await self.agent.get_result()
yield self.create_event(EventType.RUN_COMPLETE, {"result": result})
async def checkpoint(self, state: dict) -> str:
"""Save state, return checkpoint ID."""
checkpoint_id = str(uuid4())
await self.store.save(checkpoint_id, state)
return checkpoint_id
async def resume(self, checkpoint_id: str) -> dict:
"""Restore from checkpoint, return state."""
return await self.store.load(checkpoint_id)
def get_schema(self) -> dict:
"""Return JSON schema for MCP tool registration."""
return {
"name": "my_agent",
"description": self.agent.description,
"inputSchema": self.agent.input_schema,
}
Event Protocol¶
Event Types¶
from enum import Enum
class EventType(Enum):
TOKEN = "token" # LLM output token
NODE_START = "node_start" # Workflow node begins
NODE_END = "node_end" # Workflow node completes
TOOL_CALL = "tool_call" # Tool invocation
TOOL_RESULT = "tool_result" # Tool returns
CHECKPOINT = "checkpoint" # State persisted
COST = "cost" # Usage metrics
RUN_COMPLETE = "run_complete"# Execution finished
ERROR = "error" # Error occurred
Event Structure¶
@dataclass
class Event:
type: EventType
data: dict
timestamp: float
run_id: str | None = None
metadata: dict | None = None
Creating Events¶
Use the helper method:
# Simple event
yield self.create_event(EventType.TOKEN, {"content": "Hello"})
# With metadata
yield self.create_event(
EventType.TOOL_CALL,
{"tool": "search", "args": {"query": "AI"}},
metadata={"latency_ms": 150}
)
Lifecycle Hooks¶
Override these methods for custom behavior:
class MyAdapter(BaseAdapter):
async def on_start(self, input: dict, config: dict) -> None:
"""Called before execution begins."""
self.start_time = time.time()
async def on_complete(self, result: dict, config: dict) -> None:
"""Called after successful completion."""
duration = time.time() - self.start_time
logger.info(f"Completed in {duration:.2f}s")
async def on_error(self, error: Exception, config: dict) -> None:
"""Called when execution fails."""
logger.error(f"Failed: {error}")
async def on_checkpoint(self, checkpoint_id: str, state: dict) -> None:
"""Called after checkpoint is saved."""
logger.debug(f"Checkpoint saved: {checkpoint_id}")
Complete Example: AutoGen Adapter¶
from fastagentic.adapters.base import BaseAdapter, Event, EventType
from autogen import AssistantAgent, UserProxyAgent
from typing import AsyncIterator
import asyncio
class AutoGenAdapter(BaseAdapter):
"""Adapter for Microsoft AutoGen agents."""
def __init__(
self,
assistant: AssistantAgent,
user_proxy: UserProxyAgent,
max_rounds: int = 10,
):
self.assistant = assistant
self.user_proxy = user_proxy
self.max_rounds = max_rounds
self._events: list[Event] = []
async def invoke(self, input: dict, config: dict) -> dict:
"""Run AutoGen conversation synchronously."""
message = input.get("message", "")
# Collect events during execution
self._events = []
# Run the conversation
await self.user_proxy.a_initiate_chat(
self.assistant,
message=message,
max_turns=self.max_rounds,
)
# Get final response
last_message = self.assistant.last_message()
return {"response": last_message["content"]}
async def stream(self, input: dict, config: dict) -> AsyncIterator[Event]:
"""Stream AutoGen conversation events."""
message = input.get("message", "")
# Set up message callback
async def on_message(sender, recipient, message):
yield self.create_event(
EventType.TOKEN,
{
"content": message.get("content", ""),
"sender": sender.name,
"recipient": recipient.name,
}
)
# Register callback
self.assistant.register_reply(
trigger=self.user_proxy,
reply_func=on_message,
)
# Emit start event
yield self.create_event(
EventType.NODE_START,
{"node": "conversation", "message": message}
)
# Run conversation
await self.user_proxy.a_initiate_chat(
self.assistant,
message=message,
max_turns=self.max_rounds,
)
# Emit completion
last_message = self.assistant.last_message()
yield self.create_event(
EventType.RUN_COMPLETE,
{"result": last_message["content"]}
)
async def checkpoint(self, state: dict) -> str:
"""Save conversation state."""
checkpoint_id = str(uuid4())
checkpoint_data = {
"assistant_messages": self.assistant.chat_messages,
"user_proxy_messages": self.user_proxy.chat_messages,
"state": state,
}
await self.store.save(checkpoint_id, checkpoint_data)
return checkpoint_id
async def resume(self, checkpoint_id: str) -> dict:
"""Restore conversation from checkpoint."""
data = await self.store.load(checkpoint_id)
self.assistant.chat_messages = data["assistant_messages"]
self.user_proxy.chat_messages = data["user_proxy_messages"]
return data["state"]
def get_schema(self) -> dict:
"""MCP schema for AutoGen agent."""
return {
"name": "autogen_chat",
"description": f"Chat with {self.assistant.name}",
"inputSchema": {
"type": "object",
"properties": {
"message": {
"type": "string",
"description": "Message to send"
}
},
"required": ["message"]
}
}
Usage¶
from fastagentic import App, agent_endpoint
from autogen import AssistantAgent, UserProxyAgent
assistant = AssistantAgent("assistant", llm_config={...})
user_proxy = UserProxyAgent("user", code_execution_config={...})
adapter = AutoGenAdapter(assistant, user_proxy, max_rounds=5)
app = App(title="AutoGen Chat", ...)
@agent_endpoint(
path="/chat",
runnable=adapter,
stream=True,
durable=True,
)
async def chat(message: str) -> str:
pass
Testing Custom Adapters¶
import pytest
from fastagentic.testing import AdapterTestCase
class TestMyAdapter(AdapterTestCase):
adapter_class = MyAdapter
@pytest.fixture
def adapter(self):
return MyAdapter(mock_agent)
async def test_invoke(self, adapter):
result = await adapter.invoke({"input": "test"}, {})
assert "output" in result
async def test_stream_emits_events(self, adapter):
events = [e async for e in adapter.stream({"input": "test"}, {})]
assert any(e.type == EventType.TOKEN for e in events)
assert events[-1].type == EventType.RUN_COMPLETE
async def test_checkpoint_resume(self, adapter):
# Run partially
state = {"step": 1, "data": "partial"}
checkpoint_id = await adapter.checkpoint(state)
# Resume
restored = await adapter.resume(checkpoint_id)
assert restored == state
Best Practices¶
-
Always emit RUN_COMPLETE: Ensure streaming ends with a completion event
-
Include timestamps: Use
self.create_event()which adds timestamps automatically -
Handle errors gracefully: Emit ERROR events and re-raise for proper handling
-
Checkpoint strategically: Save state at meaningful boundaries, not every token
-
Test thoroughly: Use
AdapterTestCasebase class for consistent testing
Next Steps¶
- Adapters Overview - Compare with built-in adapters
- PydanticAI Adapter - Reference implementation
- Operations Guide - Deploy custom adapters