FastAPI Integration¶
This guide covers integrating OrmAI with FastAPI applications.
Basic Setup¶
Installation¶
Project Structure¶
myapp/
├── main.py
├── models.py
├── policy.py
├── dependencies.py
└── routes/
├── __init__.py
└── tools.py
Complete Example¶
Models (models.py)¶
from sqlalchemy import Column, Integer, String, ForeignKey, create_engine
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
Base = declarative_base()
class User(Base):
__tablename__ = "users"
id = Column(String, primary_key=True)
tenant_id = Column(String, nullable=False)
email = Column(String, nullable=False)
name = Column(String)
class Order(Base):
__tablename__ = "orders"
id = Column(Integer, primary_key=True)
tenant_id = Column(String, nullable=False)
user_id = Column(String, ForeignKey("users.id"))
status = Column(String, default="pending")
total = Column(Integer)
user = relationship("User")
engine = create_engine("postgresql://localhost/mydb")
SessionLocal = sessionmaker(bind=engine)
Policy (policy.py)¶
from ormai.policy import Policy, ModelPolicy, FieldPolicy, FieldAction, WritePolicy, WriteAction
policy = Policy(
models={
"User": ModelPolicy(
allowed=True,
fields={
"id": FieldPolicy(action=FieldAction.Allow),
"tenant_id": FieldPolicy(action=FieldAction.Allow),
"email": FieldPolicy(action=FieldAction.Mask),
"name": FieldPolicy(action=FieldAction.Allow),
},
scoping={"tenant_id": "principal.tenant_id"},
),
"Order": ModelPolicy(
allowed=True,
fields={
"id": FieldPolicy(action=FieldAction.Allow),
"tenant_id": FieldPolicy(action=FieldAction.Allow),
"user_id": FieldPolicy(action=FieldAction.Allow),
"status": FieldPolicy(action=FieldAction.Allow),
"total": FieldPolicy(action=FieldAction.Allow),
},
scoping={"tenant_id": "principal.tenant_id"},
write_policy=WritePolicy(
create=WriteAction.Allow,
update=WriteAction.Allow,
delete=WriteAction.Deny,
),
),
},
)
Dependencies (dependencies.py)¶
from fastapi import Request, Depends, HTTPException
from sqlalchemy.orm import Session
from ormai.core import Principal, RunContext
from ormai.quickstart import mount_sqlalchemy
from .models import engine, Base, SessionLocal
from .policy import policy
# Create toolset once at startup
toolset = mount_sqlalchemy(engine=engine, base=Base, policy=policy)
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
def get_principal(request: Request) -> Principal:
tenant_id = request.headers.get("X-Tenant-ID")
user_id = request.headers.get("X-User-ID")
if not tenant_id or not user_id:
raise HTTPException(status_code=401, detail="Missing auth headers")
return Principal(
tenant_id=tenant_id,
user_id=user_id,
roles=request.headers.get("X-Roles", "").split(","),
)
def get_context(
request: Request,
db: Session = Depends(get_db),
principal: Principal = Depends(get_principal),
) -> RunContext:
return RunContext(
principal=principal,
db=db,
request_id=request.headers.get("X-Request-ID"),
trace_id=request.headers.get("X-Trace-ID"),
)
def get_toolset():
return toolset
Main Application (main.py)¶
from fastapi import FastAPI
from .routes import tools
app = FastAPI(title="My App with OrmAI")
app.include_router(tools.router, prefix="/api")
@app.get("/health")
def health():
return {"status": "ok"}
Routes (routes/tools.py)¶
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from ormai.core import RunContext, ModelNotAllowedError, QueryBudgetExceededError
from ..dependencies import get_context, get_toolset
router = APIRouter(tags=["tools"])
# Request models
class QueryRequest(BaseModel):
model: str
filters: list[dict] = []
select: list[str] | None = None
order: list[dict] = []
include: list[dict] = []
limit: int = 50
cursor: str | None = None
class GetRequest(BaseModel):
model: str
id: str | int
select: list[str] | None = None
include: list[dict] = []
class CreateRequest(BaseModel):
model: str
data: dict
class UpdateRequest(BaseModel):
model: str
id: str | int
data: dict
# Endpoints
@router.get("/schema")
async def describe_schema(
ctx: RunContext = Depends(get_context),
toolset = Depends(get_toolset),
):
result = await toolset.describe_schema(ctx)
return result.data
@router.post("/query")
async def query(
request: QueryRequest,
ctx: RunContext = Depends(get_context),
toolset = Depends(get_toolset),
):
try:
result = await toolset.query(
ctx,
model=request.model,
filters=request.filters,
select=request.select,
order=request.order,
include=request.include,
limit=request.limit,
cursor=request.cursor,
)
return result.data
except ModelNotAllowedError:
raise HTTPException(status_code=403, detail="Model not allowed")
except QueryBudgetExceededError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.post("/get")
async def get(
request: GetRequest,
ctx: RunContext = Depends(get_context),
toolset = Depends(get_toolset),
):
result = await toolset.get(
ctx,
model=request.model,
id=request.id,
select=request.select,
include=request.include,
)
if not result.success:
raise HTTPException(status_code=404, detail="Not found")
return result.data
@router.post("/create")
async def create(
request: CreateRequest,
ctx: RunContext = Depends(get_context),
toolset = Depends(get_toolset),
):
result = await toolset.create(
ctx,
model=request.model,
data=request.data,
)
if not result.success:
raise HTTPException(status_code=400, detail=result.error)
return result.data
@router.post("/update")
async def update(
request: UpdateRequest,
ctx: RunContext = Depends(get_context),
toolset = Depends(get_toolset),
):
result = await toolset.update(
ctx,
model=request.model,
id=request.id,
data=request.data,
)
if not result.success:
raise HTTPException(status_code=400, detail=result.error)
return result.data
Usage¶
Query Example¶
curl -X POST http://localhost:8000/api/query \
-H "Content-Type: application/json" \
-H "X-Tenant-ID: acme-corp" \
-H "X-User-ID: user-123" \
-d '{
"model": "Order",
"filters": [{"field": "status", "op": "eq", "value": "pending"}],
"select": ["id", "status", "total"],
"limit": 10
}'
Create Example¶
curl -X POST http://localhost:8000/api/create \
-H "Content-Type: application/json" \
-H "X-Tenant-ID: acme-corp" \
-H "X-User-ID: user-123" \
-d '{
"model": "Order",
"data": {"status": "pending", "total": 5000, "user_id": "user-123"}
}'
Async SQLAlchemy¶
For async SQLAlchemy:
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
engine = create_async_engine("postgresql+asyncpg://localhost/mydb")
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession)
async def get_db():
async with AsyncSessionLocal() as session:
yield session
Middleware¶
Audit Logging Middleware¶
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware
from ormai.store import JsonlAuditStore, AuditMiddleware
audit_store = JsonlAuditStore("./audit.jsonl")
audit_middleware = AuditMiddleware(store=audit_store)
class OrmAIAuditMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
# Audit is handled at tool level
response = await call_next(request)
return response
app.add_middleware(OrmAIAuditMiddleware)
Error Handling Middleware¶
from fastapi import Request
from fastapi.responses import JSONResponse
from ormai.core import OrmAIError
@app.exception_handler(OrmAIError)
async def ormai_exception_handler(request: Request, exc: OrmAIError):
return JSONResponse(
status_code=400,
content={
"error": exc.code,
"message": exc.message,
"details": exc.details,
},
)
Authentication Integration¶
JWT Authentication¶
from fastapi import Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
security = HTTPBearer()
def get_principal(
credentials: HTTPAuthorizationCredentials = Depends(security),
) -> Principal:
try:
payload = jwt.decode(
credentials.credentials,
"your-secret",
algorithms=["HS256"],
)
return Principal(
tenant_id=payload["org_id"],
user_id=payload["sub"],
roles=payload.get("roles", []),
)
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Invalid token")
API Key Authentication¶
from fastapi import Header, HTTPException
API_KEYS = {
"key-123": {"tenant_id": "acme", "user_id": "api-user"},
}
def get_principal(x_api_key: str = Header(...)) -> Principal:
if x_api_key not in API_KEYS:
raise HTTPException(status_code=401, detail="Invalid API key")
config = API_KEYS[x_api_key]
return Principal(
tenant_id=config["tenant_id"],
user_id=config["user_id"],
)
OpenAPI Documentation¶
Add tool schemas to OpenAPI:
from fastapi import FastAPI
from ormai.tools import ToolRegistry
def custom_openapi():
if app.openapi_schema:
return app.openapi_schema
openapi_schema = get_openapi(
title="My App API",
version="1.0.0",
routes=app.routes,
)
# Add OrmAI tool schemas
openapi_schema["components"]["schemas"].update(
toolset.get_openapi_schemas()
)
app.openapi_schema = openapi_schema
return app.openapi_schema
app.openapi = custom_openapi
Testing¶
from fastapi.testclient import TestClient
import pytest
@pytest.fixture
def client():
return TestClient(app)
def test_query_orders(client):
response = client.post(
"/api/query",
headers={
"X-Tenant-ID": "test-tenant",
"X-User-ID": "test-user",
},
json={
"model": "Order",
"limit": 10,
},
)
assert response.status_code == 200
assert "rows" in response.json()
def test_forbidden_model(client):
response = client.post(
"/api/query",
headers={
"X-Tenant-ID": "test-tenant",
"X-User-ID": "test-user",
},
json={
"model": "SecretModel",
},
)
assert response.status_code == 403
Next Steps¶
- LangGraph Integration - Use with LangGraph agents
- MCP Integration - Expose via MCP
- Multi-Tenant Setup - Tenant isolation