# Architecture

Technical deep-dive into Fast-CrewAI's architecture and implementation.

## Overview

Fast-CrewAI uses a hybrid architecture combining Python's flexibility with Rust's performance through strategic monkey patching and PyO3 bindings.
```
┌─────────────────┐      ┌──────────────────┐      ┌─────────────────┐
│   Python API    │      │    Rust Core     │      │   Native Libs   │
│    (CrewAI)     │◄────►│  (Performance)   │◄────►│    (System)     │
└────────┬────────┘      └────────┬─────────┘      └────────┬────────┘
         │                        │                         │
    ┌────▼────┐              ┌────▼─────┐             ┌─────▼────┐
    │ Memory  │              │   SIMD   │             │  SQLite  │
    │ Tools   │              │  Async   │             │ Threading│
    │ Tasks   │              │ Zero-Copy│             │   I/O    │
    └─────────┘              └──────────┘             └──────────┘
```
## Core Components

### 1. Monkey Patching System

Location: `fast_crewai/shim.py`

The shim system replaces CrewAI components at import time, falling back to the stock Python implementations whenever a patch cannot be applied:
```python
import importlib
import sys
from typing import Any

_original_classes: dict[str, Any] = {}

def _monkey_patch_class(module_path: str, class_name: str, new_class: Any) -> bool:
    """Replace a class in a module with a new implementation."""
    try:
        if module_path in sys.modules:
            module = sys.modules[module_path]
        else:
            module = importlib.import_module(module_path)

        # Save the original for later restoration
        if hasattr(module, class_name):
            original_class = getattr(module, class_name)
            _original_classes[f"{module_path}.{class_name}"] = original_class

        # Replace with the Rust-backed implementation
        setattr(module, class_name, new_class)
        return True
    except Exception:
        return False  # Graceful fallback
```
Key Features:
- Graceful Fallback: If patching fails, continues with Python implementation
- Restoration Support: Can restore original classes if needed
- Import Safety: Only patches after successful module import
### 2. Rust Core Implementation

Location: `src/lib.rs`

All Rust components are implemented as Python extensions using PyO3:
```rust
use std::sync::{Arc, Mutex};

use pyo3::prelude::*;

#[pyclass]
pub struct RustMemoryStorage {
    data: Arc<Mutex<Vec<MemoryItem>>>,
}

#[pymethods]
impl RustMemoryStorage {
    #[new]
    pub fn new() -> Self { /* ... */ }

    pub fn save(&self, value: &str) -> PyResult<()> { /* ... */ }

    pub fn search(&self, query: &str, limit: usize) -> PyResult<Vec<String>> { /* ... */ }
}
```
Performance Optimizations:
- Thread Safety: `Arc<Mutex<>>` for safe concurrent access
- Zero-Copy: direct memory access without Python object overhead
- Async Runtime: Tokio-based concurrency for I/O operations
- Connection Pooling: shared database connections via r2d2
### 3. Dynamic Inheritance Pattern

Tool and Task acceleration uses dynamic inheritance:
```python
def create_accelerated_base_tool():
    """Create an accelerated BaseTool subclass at runtime."""
    try:
        from crewai.tools.base_tool import BaseTool as _OriginalBaseTool

        class AcceleratedBaseTool(_OriginalBaseTool):
            def __init__(self, *args, **kwargs):
                super().__init__(*args, **kwargs)
                self._acceleration_enabled = True

            def _run(self, *args, **kwargs):
                # Acceleration logic goes here
                return super()._run(*args, **kwargs)

        return AcceleratedBaseTool
    except ImportError:
        return None  # CrewAI not installed
```
Benefits:
- Full API compatibility (inherits all methods)
- Works even if CrewAI isn't installed
- Can override specific methods for acceleration
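The factory pattern is easy to exercise with a stand-in base class when `crewai` is not importable; `StubBaseTool` below is purely illustrative:

```python
def create_accelerated_tool_class(original_base):
    """Build an accelerated subclass of whatever base class is available."""
    class AcceleratedTool(original_base):
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self._acceleration_enabled = True

        def _run(self, *args, **kwargs):
            # Acceleration hook would go here; fall through to the original.
            return super()._run(*args, **kwargs)
    return AcceleratedTool

class StubBaseTool:  # stand-in for crewai.tools.base_tool.BaseTool
    def _run(self, text: str) -> str:
        return text.upper()

Accelerated = create_accelerated_tool_class(StubBaseTool)
tool = Accelerated()
assert tool._acceleration_enabled
assert tool._run("hi") == "HI"
assert isinstance(tool, StubBaseTool)  # full API compatibility via inheritance
```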
### 4. Fallback Mechanism

Each component implements automatic fallback:
```python
from typing import Optional

class AcceleratedMemoryStorage:
    def __init__(self, use_rust: Optional[bool] = None):
        if use_rust is None:
            self._use_rust = _RUST_AVAILABLE
        else:
            self._use_rust = use_rust and _RUST_AVAILABLE

        if self._use_rust:
            try:
                self._storage = _CoreMemoryStorage()
                self._implementation = "rust"
            except Exception:
                self._use_rust = False

        if not self._use_rust:
            self._storage = []
            self._implementation = "python"
```
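The backend choice in that constructor reduces to a small truth table: the explicit request ANDed with availability. A sketch with `_RUST_AVAILABLE` forced to `False` to show the fallback side:

```python
_RUST_AVAILABLE = False  # pretend the compiled extension failed to import

def resolve_backend(use_rust=None) -> bool:
    """Mirror the constructor's decision: explicit flag AND availability."""
    if use_rust is None:
        return _RUST_AVAILABLE          # auto-detect
    return bool(use_rust and _RUST_AVAILABLE)

assert resolve_backend() is False       # auto-detect -> Python fallback
assert resolve_backend(True) is False   # Rust requested but unavailable
assert resolve_backend(False) is False  # Python explicitly requested
```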
## Component Architectures

### Memory Storage

TF-IDF semantic search implementation:
```rust
pub fn search(&self, query: &str, limit: usize) -> PyResult<Vec<String>> {
    let data = self.data.lock().unwrap();

    // Compute query word frequencies
    let query_frequencies = self.compute_word_frequencies(query);

    // Calculate similarity scores
    let mut scored_results: Vec<(String, f64)> = Vec::new();
    for item in &*data {
        let similarity = self.calculate_cosine_similarity(
            &query_frequencies,
            &item.word_frequencies,
        );
        scored_results.push((item.content.clone(), similarity));
    }

    // Sort by similarity (descending); treat NaN comparisons as equal to avoid panics
    scored_results.sort_by(|a, b| {
        b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal)
    });

    Ok(scored_results.into_iter().take(limit).map(|(c, _)| c).collect())
}
```
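The scoring that loop performs is ordinary cosine similarity over word-frequency vectors; a pure-Python reference version (the exact tokenization and weighting in Fast-CrewAI may differ):

```python
import math
from collections import Counter

def word_frequencies(text: str) -> Counter:
    return Counter(text.lower().split())

def cosine_similarity(a: Counter, b: Counter) -> float:
    """Cosine of the angle between two sparse word-count vectors."""
    dot = sum(a[w] * b[w] for w in a.keys() & b.keys())
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)

def search(items: list[str], query: str, limit: int) -> list[str]:
    q = word_frequencies(query)
    scored = [(item, cosine_similarity(q, word_frequencies(item))) for item in items]
    scored.sort(key=lambda pair: pair[1], reverse=True)  # descending similarity
    return [item for item, _ in scored[:limit]]

docs = ["the cat sat", "dogs bark loudly", "a cat and a dog"]
assert search(docs, "cat", 1) == ["the cat sat"]
```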
### Database with FTS5

SQLite with FTS5 full-text search:
```rust
#[pyclass]
pub struct RustSQLiteWrapper {
    connection_pool: Arc<Mutex<r2d2::Pool<SqliteConnectionManager>>>,
}

// Helper: convert any displayable error into a Python RuntimeError
fn to_py_err<E: std::fmt::Display>(e: E) -> PyErr {
    PyRuntimeError::new_err(e.to_string())
}

#[pymethods]
impl RustSQLiteWrapper {
    #[new]
    pub fn new(db_path: &str, pool_size: u32) -> PyResult<Self> {
        let manager = SqliteConnectionManager::file(db_path);
        let pool = r2d2::Pool::builder()
            .max_size(pool_size)
            .build(manager)
            .map_err(to_py_err)?;

        // Create the FTS5 virtual table (external-content, indexing task_description)
        let conn = pool.get().map_err(to_py_err)?;
        conn.execute(
            "CREATE VIRTUAL TABLE IF NOT EXISTS memories_fts
             USING fts5(task_description, content='long_term_memories')",
            [],
        )
        .map_err(to_py_err)?;

        Ok(Self { connection_pool: Arc::new(Mutex::new(pool)) })
    }

    pub fn search_fts(&self, query: &str, limit: usize) -> PyResult<Vec<Memory>> {
        let pool = self.connection_pool.lock().map_err(to_py_err)?;
        let conn = pool.get().map_err(to_py_err)?;
        let mut stmt = conn
            .prepare(
                "SELECT *, bm25(memories_fts) AS rank
                 FROM long_term_memories
                 JOIN memories_fts ON long_term_memories.id = memories_fts.rowid
                 WHERE memories_fts MATCH ?1
                 ORDER BY rank
                 LIMIT ?2",
            )
            .map_err(to_py_err)?;
        // Execute the statement and map rows into Memory values
        /* ... */
    }
}
```
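The same query shape can be exercised from Python's built-in `sqlite3` module, assuming its bundled SQLite was compiled with FTS5 (true for standard CPython builds). This standalone sketch drops the external-content linkage for brevity:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE VIRTUAL TABLE memories_fts USING fts5(task_description)")
conn.executemany(
    "INSERT INTO memories_fts (task_description) VALUES (?)",
    [("summarize the quarterly report",),
     ("send the quarterly report to finance",),
     ("book a meeting room",)],
)
# bm25() returns lower (more negative) scores for better matches,
# so ORDER BY rank ascending yields best matches first.
rows = conn.execute(
    "SELECT task_description, bm25(memories_fts) AS rank "
    "FROM memories_fts WHERE memories_fts MATCH ? ORDER BY rank LIMIT ?",
    ("quarterly", 2),
).fetchall()
assert len(rows) == 2
assert all("quarterly" in r[0] for r in rows)
```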
### Task Execution with Tokio

Async task execution:
```rust
pub fn execute_concurrent_tasks(&self, tasks: Vec<&str>) -> PyResult<Vec<String>> {
    let runtime = self.runtime.clone();
    Python::with_gil(|py| {
        // Release the GIL while the Tokio runtime drives the tasks
        py.allow_threads(|| {
            runtime.block_on(async {
                let mut handles = Vec::new();
                for task in tasks {
                    let task_str = task.to_string();
                    let handle = tokio::spawn(async move {
                        // Execute the task
                        format!("Completed: {}", task_str)
                    });
                    handles.push(handle);
                }

                let mut results = Vec::new();
                for handle in handles {
                    // A JoinError (panic/cancellation) becomes a Python RuntimeError
                    results.push(handle.await.map_err(|e| {
                        PyRuntimeError::new_err(e.to_string())
                    })?);
                }
                Ok(results)
            })
        })
    })
}
```
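The fan-out/fan-in shape above is the same one `asyncio.gather` expresses at the Python layer; the Rust version simply runs it on Tokio worker threads with the GIL released. A minimal Python sketch of the pattern:

```python
import asyncio

async def execute_task(task: str) -> str:
    await asyncio.sleep(0)  # stand-in for real I/O-bound work
    return f"Completed: {task}"

async def execute_concurrent_tasks(tasks: list[str]) -> list[str]:
    # Spawn all tasks, then await them together (fan-out / fan-in).
    return await asyncio.gather(*(execute_task(t) for t in tasks))

results = asyncio.run(execute_concurrent_tasks(["a", "b"]))
assert results == ["Completed: a", "Completed: b"]
```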
### Serialization with serde

Low-overhead JSON serialization and deserialization with serde:
```rust
use pyo3::exceptions::PyRuntimeError;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
#[pyclass]
pub struct AgentMessage {
    #[pyo3(get, set)]
    pub id: String,
    #[pyo3(get, set)]
    pub sender: String,
    #[pyo3(get, set)]
    pub recipient: String,
    #[pyo3(get, set)]
    pub content: String,
    #[pyo3(get, set)]
    pub timestamp: u64,
}

#[pymethods]
impl AgentMessage {
    pub fn to_json(&self) -> PyResult<String> {
        serde_json::to_string(self).map_err(|e| {
            PyErr::new::<PyRuntimeError, _>(format!("Serialization failed: {}", e))
        })
    }

    #[staticmethod]
    pub fn from_json(json_str: &str) -> PyResult<AgentMessage> {
        serde_json::from_str(json_str).map_err(|e| {
            PyErr::new::<PyRuntimeError, _>(format!("Deserialization failed: {}", e))
        })
    }
}
```
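For reference, the same roundtrip in pure Python with a `dataclass` (field names mirror the Rust struct; this is an illustration, not part of the package API):

```python
import json
from dataclasses import dataclass, asdict

@dataclass
class AgentMessage:
    id: str
    sender: str
    recipient: str
    content: str
    timestamp: int

    def to_json(self) -> str:
        return json.dumps(asdict(self))

    @staticmethod
    def from_json(json_str: str) -> "AgentMessage":
        return AgentMessage(**json.loads(json_str))

msg = AgentMessage("1", "planner", "researcher", "find sources", 1700000000)
assert AgentMessage.from_json(msg.to_json()) == msg
```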
## Error Handling

### Layered Error Handling
```rust
// 1. Rust Result types (zero-cost)
fn parse_data(input: &str) -> Result<Data, ParseError> { /* ... */ }

// 2. PyO3 error conversion
impl From<ParseError> for PyErr {
    fn from(err: ParseError) -> PyErr {
        PyErr::new::<PyRuntimeError, _>(err.to_string())
    }
}

// 3. Automatic conversion via `?` inside PyResult functions
pub fn safe_operation(&self, input: &str) -> PyResult<String> {
    let data = parse_data(input)?; // ParseError -> PyErr automatically
    Ok(process_data(data))
}
```
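From the Python side these converted errors arrive as ordinary exceptions (`RuntimeError`, since the layers above convert into `PyRuntimeError`), so callers can catch them and degrade. A self-contained sketch with a stand-in for the Rust-backed call:

```python
import json

def parse_data(raw: str) -> dict:
    """Stand-in for a Rust-backed call that raises RuntimeError on bad input."""
    if not raw.startswith("{"):
        raise RuntimeError(f"Deserialization failed: invalid input {raw!r}")
    return json.loads(raw)

def safe_operation(raw: str) -> dict:
    try:
        return parse_data(raw)
    except RuntimeError:
        return {}  # degrade gracefully instead of crashing the caller

assert safe_operation('{"ok": true}') == {"ok": True}
assert safe_operation("not json") == {}
```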
### Graceful Degradation

```python
def save(self, value: Any, metadata: Optional[Dict] = None) -> None:
    data = {"value": value, "metadata": metadata or {}}
    serialized = json.dumps(data, default=str)
    if self._use_rust:
        try:
            self._storage.save(serialized)
            return
        except Exception as e:
            # Automatic fallback: switch to the Python backend permanently
            print(f"Warning: Rust failed, using Python: {e}")
            self._use_rust = False
            self._storage = []  # replace the Rust handle with a plain list
    self._storage.append(data)
```
## Performance Characteristics

### Complexity Analysis
| Operation | Python | Rust | Improvement |
|---|---|---|---|
| Memory Search | O(n) | O(n) with optimizations | 10-20x constant factor |
| Tool Execution | O(1) + GIL | O(1) no GIL | 2-5x reduced overhead |
| Task Concurrency | Limited by GIL | True parallelism | 3-5x better utilization |
| Serialization | O(n) object creation | O(n) zero-copy | 5-10x reduced allocation |
### Memory Layout

```
Python Implementation:           Rust Implementation:
┌─────────────────────┐          ┌─────────────────────┐
│  Python Objects     │          │  Raw Data           │
│  ├─ Wrapper Objects │          │  ├─ Contiguous Mem. │
│  ├─ Reference Counts│          │  ├─ Zero Allocation │
│  ├─ Type Information│          │  ├─ SIMD Alignment  │
│  └─ GC Overhead     │          │  └─ No GC Overhead  │
└─────────────────────┘          └─────────────────────┘
 ~5x memory vs. baseline
```
## Build System

### Maturin Configuration

```toml
# Cargo.toml
[lib]
name = "fast_crewai"
crate-type = ["cdylib"]
```

```toml
# pyproject.toml
[build-system]
requires = ["maturin>=1.0,<2.0"]
build-backend = "maturin"
```