Skip to content

Architecture

Technical deep-dive into Fast-CrewAI's architecture and implementation.

Overview

Fast-CrewAI uses a hybrid architecture combining Python's flexibility with Rust's performance through strategic monkey patching and PyO3 bindings.

┌─────────────────┐    ┌──────────────────┐    ┌─────────────────┐
│   Python API    │    │  Rust Core       │    │  Native Libs    │
│   (CrewAI)      │◄──►│  (Performance)   │◄──►│  (System)       │
└─────────────────┘    └──────────────────┘    └─────────────────┘
        │                       │                       │
   ┌────▼────┐             ┌────▼────┐             ┌────▼────┐
   │ Memory  │             │ SIMD    │             │ SQLite  │
   │ Tools   │             │ Async   │             │ Threading│
   │ Tasks   │             │ Zero-Copy│            │ I/O     │
   └─────────┘             └─────────┘             └─────────┘

Core Components

1. Monkey Patching System

Location: fast_crewai/shim.py

The shim system intelligently replaces CrewAI components at import time:

def _monkey_patch_class(module_path: str, class_name: str, new_class: Any) -> bool:
    """Replace a class in a module with a new implementation."""
    try:
        if module_path in sys.modules:
            module = sys.modules[module_path]
        else:
            module = importlib.import_module(module_path)

        # Save original for restoration
        if hasattr(module, class_name):
            original_class = getattr(module, class_name)
            _original_classes[f"{module_path}.{class_name}"] = original_class

        # Replace with Rust implementation
        setattr(module, class_name, new_class)
        return True
    except Exception:
        return False  # Graceful fallback

Key Features:

  • Graceful Fallback: If patching fails, continues with Python implementation
  • Restoration Support: Can restore original classes if needed
  • Import Safety: Only patches after successful module import

2. Rust Core Implementation

Location: src/lib.rs

All Rust components are implemented as Python extensions using PyO3:

use pyo3::prelude::*;

#[pyclass]
pub struct RustMemoryStorage {
    data: Arc<Mutex<Vec<MemoryItem>>>,
}

#[pymethods]
impl RustMemoryStorage {
    #[new]
    pub fn new() -> Self { /* ... */ }

    pub fn save(&self, value: &str) -> PyResult<()> { /* ... */ }

    pub fn search(&self, query: &str, limit: usize) -> PyResult<Vec<String>> { /* ... */ }
}

Performance Optimizations:

  • Thread Safety: Arc<Mutex<>> for safe concurrent access
  • Zero-Copy: Direct memory access without Python object overhead
  • Async Runtime: Tokio-based concurrency for I/O operations
  • Connection Pooling: Shared database connections with r2d2

3. Dynamic Inheritance Pattern

Tool and Task acceleration uses dynamic inheritance:

def create_accelerated_base_tool():
    """Create accelerated version at runtime."""
    try:
        from crewai.tools.base_tool import BaseTool as _OriginalBaseTool

        class AcceleratedBaseTool(_OriginalBaseTool):
            def __init__(self, *args, **kwargs):
                super().__init__(*args, **kwargs)
                self._acceleration_enabled = True

            def _run(self, *args, **kwargs):
                # Add acceleration logic
                return super()._run(*args, **kwargs)

        return AcceleratedBaseTool
    except ImportError:
        return None

Benefits:

  • Full API compatibility (inherits all methods)
  • Works even if CrewAI isn't installed
  • Can override specific methods for acceleration

4. Fallback Mechanism

Each component implements automatic fallback:

class AcceleratedMemoryStorage:
    def __init__(self, use_rust: Optional[bool] = None):
        if use_rust is None:
            self._use_rust = _RUST_AVAILABLE
        else:
            self._use_rust = use_rust and _RUST_AVAILABLE

        if self._use_rust:
            try:
                self._storage = _CoreMemoryStorage()
                self._implementation = "rust"
            except Exception:
                self._use_rust = False
                self._storage = []
                self._implementation = "python"

Component Architectures

Memory Storage

TF-IDF semantic search implementation:

pub fn search(&self, query: &str, limit: usize) -> PyResult<Vec<String>> {
    let data = self.data.lock().unwrap();

    // Compute query word frequencies
    let query_frequencies = self.compute_word_frequencies(query);

    // Calculate similarity scores
    let mut scored_results: Vec<(String, f64)> = Vec::new();
    for item in &*data {
        let similarity = self.calculate_cosine_similarity(
            &query_frequencies,
            &item.word_frequencies
        );
        scored_results.push((item.content.clone(), similarity));
    }

    // Sort by similarity (descending)
    scored_results.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());

    Ok(scored_results.into_iter().take(limit).map(|(c, _)| c).collect())
}

Database with FTS5

SQLite with FTS5 full-text search:

#[pyclass]
pub struct RustSQLiteWrapper {
    connection_pool: Arc<Mutex<r2d2::Pool<SqliteConnectionManager>>>,
}

impl RustSQLiteWrapper {
    pub fn new(db_path: &str, pool_size: u32) -> PyResult<Self> {
        let manager = SqliteConnectionManager::file(db_path);
        let pool = r2d2::Pool::builder()
            .max_size(pool_size)
            .build(manager)?;

        // Create FTS5 virtual table
        let conn = pool.get()?;
        conn.execute(
            "CREATE VIRTUAL TABLE IF NOT EXISTS memories_fts
             USING fts5(task_description, content='long_term_memories')",
            [],
        )?;

        Ok(Self { connection_pool: Arc::new(Mutex::new(pool)) })
    }

    pub fn search_fts(&self, query: &str, limit: usize) -> PyResult<Vec<Memory>> {
        let conn = self.connection_pool.lock()?.get()?;
        let mut stmt = conn.prepare(
            "SELECT *, bm25(memories_fts) as rank
             FROM long_term_memories
             JOIN memories_fts ON long_term_memories.id = memories_fts.rowid
             WHERE memories_fts MATCH ?
             ORDER BY rank
             LIMIT ?"
        )?;
        // Execute and return results
    }
}

Task Execution with Tokio

Async task execution:

pub fn execute_concurrent_tasks(&self, tasks: Vec<&str>) -> PyResult<Vec<String>> {
    let runtime = self.runtime.clone();

    Python::with_gil(|py| {
        py.allow_threads(|| {
            runtime.block_on(async {
                let mut handles = Vec::new();

                for task in tasks {
                    let task_str = task.to_string();
                    let handle = tokio::spawn(async move {
                        // Execute task
                        format!("Completed: {}", task_str)
                    });
                    handles.push(handle);
                }

                let mut results = Vec::new();
                for handle in handles {
                    results.push(handle.await?);
                }
                Ok(results)
            })
        })
    })
}

Serialization with serde

Zero-copy JSON:

#[derive(Debug, Clone, Serialize, Deserialize)]
#[pyclass]
pub struct AgentMessage {
    #[pyo3(get, set)]
    pub id: String,
    #[pyo3(get, set)]
    pub sender: String,
    #[pyo3(get, set)]
    pub recipient: String,
    #[pyo3(get, set)]
    pub content: String,
    #[pyo3(get, set)]
    pub timestamp: u64,
}

#[pymethods]
impl AgentMessage {
    pub fn to_json(&self) -> PyResult<String> {
        serde_json::to_string(self).map_err(|e| {
            PyErr::new::<PyRuntimeError, _>(format!("Serialization failed: {}", e))
        })
    }

    #[staticmethod]
    pub fn from_json(json_str: &str) -> PyResult<AgentMessage> {
        serde_json::from_str(json_str).map_err(|e| {
            PyErr::new::<PyRuntimeError, _>(format!("Deserialization failed: {}", e))
        })
    }
}

Error Handling

Layered Error Handling

// 1. Rust Result Types (zero-cost)
fn parse_data(input: &str) -> Result<Data, ParseError> { /* ... */ }

// 2. PyO3 Error Conversion
impl From<ParseError> for PyErr {
    fn from(err: ParseError) -> PyErr {
        PyErr::new::<PyRuntimeError, _>(err.to_string())
    }
}

// 3. Python Exception Handling
pub fn safe_operation(&self) -> PyResult<String> {
    let data = parse_data(input)?;  // Automatic conversion
    Ok(process_data(data))
}

Graceful Degradation

def save(self, value: Any, metadata: Optional[Dict] = None) -> None:
    if self._use_rust:
        try:
            self._storage.save(serialized)
        except Exception as e:
            # Automatic fallback
            print(f"Warning: Rust failed, using Python: {e}")
            self._use_rust = False
            self._storage.append(data)
    else:
        self._storage.append(data)

Performance Characteristics

Complexity Analysis

Operation Python Rust Improvement
Memory Search O(n) O(n) with optimizations 10-20x constant factor
Tool Execution O(1) + GIL O(1) no GIL 2-5x reduced overhead
Task Concurrency Limited by GIL True parallelism 3-5x better utilization
Serialization O(n) object creation O(n) zero-copy 5-10x reduced allocation

Memory Layout

Python Implementation:     Rust Implementation:
┌─────────────────────┐    ┌─────────────────────┐
│ Python Objects      │    │ Raw Data            │
│ ├─ Wrapper Objects  │    │ ├─ Contiguous Memory│
│ ├─ Reference Counts │    │ ├─ Zero Allocation  │
│ ├─ Type Information │    │ └─ SIMD Alignment   │
│ └─ GC Overhead      │    └─ No GC Overhead    │
└─────────────────────┘    └─────────────────────┘
     ~5x Memory                  Baseline

Build System

Maturin Configuration

# Cargo.toml
[lib]
name = "fast_crewai"
crate-type = ["cdylib"]

# pyproject.toml
[build-system]
requires = ["maturin>=1.0,<2.0"]
build-backend = "maturin"

Cross-Platform Builds

# Development
maturin develop

# Release
maturin build --release

# Cross-platform wheels
maturin build --release --target x86_64-pc-windows-msvc
maturin build --release --target x86_64-apple-darwin
maturin build --release --target aarch64-apple-darwin