Adapters API Reference¶
Adapters translate OrmAI operations to native ORM calls.
Base Interface¶
All adapters implement this interface:
from ormai.adapters import Adapter

class Adapter(Protocol):
    def introspect(self) -> SchemaMetadata: ...
    def compile(self, request, policy, principal) -> CompiledQuery: ...
    async def execute(self, compiled, ctx) -> ExecutionResult: ...
    def transaction(self, ctx) -> ContextManager: ...
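Because the interface is a `Protocol`, a class satisfies it by having matching methods; it does not need to inherit from `Adapter`. The sketch below illustrates this with a toy, hypothetical `InMemoryAdapter`. The `CompiledQuery` and `ExecutionResult` dataclasses and the dict-shaped request are simplified stand-ins invented for this example, not OrmAI's real types.

```python
import asyncio
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Any, Iterator, Protocol, runtime_checkable


# Stand-in result types for this sketch only; the real classes carry more fields.
@dataclass
class CompiledQuery:
    native_query: Any
    model: str
    operation: str


@dataclass
class ExecutionResult:
    rows: list
    row_count: int


@runtime_checkable
class Adapter(Protocol):
    def introspect(self) -> Any: ...
    def compile(self, request: Any, policy: Any, principal: Any) -> CompiledQuery: ...
    async def execute(self, compiled: CompiledQuery, ctx: Any) -> ExecutionResult: ...
    def transaction(self, ctx: Any) -> Any: ...


class InMemoryAdapter:
    """Hypothetical adapter backed by a dict of model name -> list of rows."""

    def __init__(self, tables: dict) -> None:
        self.tables = tables

    def introspect(self) -> dict:
        # Report each model's column names, taken from its first row.
        return {name: sorted(rows[0]) if rows else [] for name, rows in self.tables.items()}

    def compile(self, request: dict, policy: Any, principal: Any) -> CompiledQuery:
        # The "native query" in this toy adapter is just a row predicate.
        wanted = dict(request.get("where", {}))

        def matches(row: dict) -> bool:
            return all(row.get(key) == value for key, value in wanted.items())

        return CompiledQuery(native_query=matches, model=request["model"], operation="select")

    async def execute(self, compiled: CompiledQuery, ctx: Any) -> ExecutionResult:
        rows = [row for row in self.tables[compiled.model] if compiled.native_query(row)]
        return ExecutionResult(rows=rows, row_count=len(rows))

    @contextmanager
    def transaction(self, ctx: Any) -> Iterator[None]:
        # Nothing to commit or roll back in memory; yield to keep the same shape.
        yield


adapter = InMemoryAdapter({"User": [{"id": 1, "name": "Ada"}, {"id": 2, "name": "Bob"}]})
compiled = adapter.compile({"model": "User", "where": {"name": "Ada"}}, policy=None, principal=None)
result = asyncio.run(adapter.execute(compiled, ctx=None))
```

The flow is the same as in the interface: `compile` produces a `CompiledQuery`, and `execute` turns it into an `ExecutionResult`. Policy and principal handling is omitted here entirely.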
SQLAlchemyAdapter¶
Adapter for SQLAlchemy ORM.
Constructor¶
SQLAlchemyAdapter(
    engine: Engine,
    base: DeclarativeBase,
    schema_cache: SchemaCache | None = None,
)
Parameters¶
| Parameter | Type | Description |
|---|---|---|
| engine | Engine | SQLAlchemy engine |
| base | DeclarativeBase | Declarative base class |
| schema_cache | SchemaCache \| None | Optional schema cache |
Example¶
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base
engine = create_engine("postgresql://localhost/mydb")
Base = declarative_base()
adapter = SQLAlchemyAdapter(engine=engine, base=Base)
Methods¶
introspect¶
compile¶
def compile(
    self,
    request: QueryRequest | GetRequest | AggregateRequest,
    policy: Policy,
    principal: Principal,
) -> CompiledQuery:
    """Compile request to SQLAlchemy query."""
execute¶
def execute(
    self,
    compiled: CompiledQuery,
    ctx: RunContext,
) -> ExecutionResult:
    """Execute synchronously."""
transaction¶
AsyncSQLAlchemyAdapter¶
Async counterpart of SQLAlchemyAdapter, for SQLAlchemy 2.0+.
Constructor¶
AsyncSQLAlchemyAdapter(
    engine: AsyncEngine,
    base: DeclarativeBase,
    schema_cache: SchemaCache | None = None,
)
Example¶
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine("postgresql+asyncpg://localhost/mydb")
adapter = AsyncSQLAlchemyAdapter(engine=engine, base=Base)
Methods¶
execute¶
async def execute(
    self,
    compiled: CompiledQuery,
    ctx: RunContext,
) -> ExecutionResult:
    """Execute asynchronously."""
transaction¶
@asynccontextmanager
async def transaction(self, ctx: RunContext):
    """Async transaction context manager."""
TortoiseAdapter¶
Adapter for Tortoise ORM (async only).
Constructor¶
Parameters¶
| Parameter | Type | Description |
|---|---|---|
| models_module | str | Module path containing models |
| schema_cache | SchemaCache \| None | Optional schema cache |
Example¶
from tortoise import Tortoise

await Tortoise.init(
    db_url="postgres://localhost/mydb",
    modules={"models": ["myapp.models"]},
)

adapter = TortoiseAdapter(models_module="myapp.models")
PeeweeAdapter¶
Adapter for Peewee ORM (sync only).
Constructor¶
Parameters¶
| Parameter | Type | Description |
|---|---|---|
| database | Database | Peewee database instance |
| models | list[type] | List of model classes |
| schema_cache | SchemaCache \| None | Optional schema cache |
Example¶
from peewee import PostgresqlDatabase

db = PostgresqlDatabase("mydb")

adapter = PeeweeAdapter(
    database=db,
    models=[User, Order, Item],
)
DjangoAdapter¶
Adapter for Django ORM.
Constructor¶
Parameters¶
| Parameter | Type | Description |
|---|---|---|
| app_labels | list[str] | Django app labels to include |
| schema_cache | SchemaCache \| None | Optional schema cache |
Example¶
SQLModelAdapter¶
Adapter for SQLModel.
Constructor¶
Example¶
from sqlmodel import create_engine

engine = create_engine("postgresql://localhost/mydb")

adapter = SQLModelAdapter(
    engine=engine,
    models=[User, Order],
)
CompiledQuery¶
Result of query compilation.
@dataclass
class CompiledQuery:
    native_query: Any          # ORM-specific query object
    model: str                 # Target model name
    operation: str             # "select", "insert", "update", "delete"
    scopes_applied: list[str]  # Applied scoping rules
    budget_limits: dict        # Applied budget constraints
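Since a `CompiledQuery` records the target model, the operation, and the scopes applied, it can be inspected before anything is executed. The following is a minimal sketch of that idea, using a local copy of the dataclass so it runs on its own. The helpers `describe` and `ensure_read_only`, the scope name `tenant_id`, and the `max_rows` budget key are hypothetical and not part of the documented API.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class CompiledQuery:
    """Local copy of the documented fields, so the sketch is self-contained."""
    native_query: Any
    model: str
    operation: str
    scopes_applied: list = field(default_factory=list)
    budget_limits: dict = field(default_factory=dict)


# The three write operations listed in the field comment above.
WRITE_OPERATIONS = {"insert", "update", "delete"}


def describe(compiled: CompiledQuery) -> str:
    """One-line summary suitable for an audit log (hypothetical helper)."""
    scopes = ", ".join(compiled.scopes_applied) or "none"
    return f"{compiled.operation} on {compiled.model} (scopes: {scopes})"


def ensure_read_only(compiled: CompiledQuery) -> None:
    """Reject write operations before they reach the database (hypothetical helper)."""
    if compiled.operation in WRITE_OPERATIONS:
        raise PermissionError(f"{compiled.operation} is not allowed in a read-only context")


read = CompiledQuery(
    native_query=None,
    model="Order",
    operation="select",
    scopes_applied=["tenant_id"],    # hypothetical scope name
    budget_limits={"max_rows": 100},  # hypothetical budget key
)
write = CompiledQuery(native_query=None, model="Order", operation="delete")

summary = describe(read)
ensure_read_only(read)  # passes silently

try:
    ensure_read_only(write)
    write_rejected = False
except PermissionError:
    write_rejected = True
```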
ExecutionResult¶
Result of query execution.
@dataclass
class ExecutionResult:
    rows: list[dict]          # Query results
    row_count: int            # Number of rows
    has_more: bool            # More results available
    next_cursor: str | None   # Pagination cursor
    execution_time_ms: float  # Execution time
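The `has_more` and `next_cursor` fields suggest a cursor-style pagination loop: fetch a page, then pass `next_cursor` back until `has_more` is false. A sketch of such a loop follows. The `fake_fetch` function is a stand-in for whatever call produces an `ExecutionResult`, and its cursor (a stringified offset) is an assumption made for the example; real cursors should be treated as opaque.

```python
from dataclasses import dataclass
from typing import Callable, Iterator, Optional


@dataclass
class ExecutionResult:
    """Local copy of the documented fields, so the sketch is self-contained."""
    rows: list
    row_count: int
    has_more: bool
    next_cursor: Optional[str]
    execution_time_ms: float


def fake_fetch(cursor: Optional[str], page_size: int = 2) -> ExecutionResult:
    """Stand-in data source: five rows served two at a time."""
    data = [{"id": i} for i in range(1, 6)]
    start = int(cursor) if cursor is not None else 0
    page = data[start:start + page_size]
    end = start + len(page)
    has_more = end < len(data)
    return ExecutionResult(
        rows=page,
        row_count=len(page),
        has_more=has_more,
        next_cursor=str(end) if has_more else None,
        execution_time_ms=0.0,
    )


def iter_rows(fetch: Callable[[Optional[str]], ExecutionResult]) -> Iterator[dict]:
    """Yield every row across all pages, following next_cursor until exhausted."""
    cursor: Optional[str] = None
    while True:
        result = fetch(cursor)
        yield from result.rows
        # Stop on either signal so a missing cursor can never cause an endless loop.
        if not result.has_more or result.next_cursor is None:
            break
        cursor = result.next_cursor


all_rows = list(iter_rows(fake_fetch))
```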
SchemaCache¶
Caches introspected schemas.
Constructor¶
Methods¶
get_or_introspect¶
Example¶
cache = SchemaCache(ttl_seconds=3600)
# First call introspects
schema = cache.get_or_introspect(adapter)
# Subsequent calls use cache
schema = cache.get_or_introspect(adapter)
# Force refresh
schema = cache.get_or_introspect(adapter, force_refresh=True)
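The three calls above correspond to a miss, a hit, and a forced refresh. One way such time-to-live behaviour could work is sketched below; `SketchSchemaCache` is a hypothetical stand-in written for illustration, not the library's implementation. It keeps a single schema regardless of which adapter is passed, and the injectable `clock` parameter exists only so expiry can be demonstrated without sleeping.

```python
import time
from typing import Any, Callable, Optional


class SketchSchemaCache:
    """Hypothetical TTL cache holding one introspected schema."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic) -> None:
        self.ttl_seconds = ttl_seconds
        self._clock = clock
        self._schema: Any = None
        self._stored_at: Optional[float] = None

    def get_or_introspect(self, adapter: Any, force_refresh: bool = False) -> Any:
        now = self._clock()
        fresh = self._stored_at is not None and now - self._stored_at < self.ttl_seconds
        if force_refresh or not fresh:
            # Miss, expired entry, or explicit refresh: ask the adapter again.
            self._schema = adapter.introspect()
            self._stored_at = now
        return self._schema


class CountingAdapter:
    """Stand-in adapter that counts how often it is introspected."""

    def __init__(self) -> None:
        self.calls = 0

    def introspect(self) -> dict:
        self.calls += 1
        return {"User": ["id", "name"]}


fake_now = [0.0]  # mutable cell read by the fake clock
cache = SketchSchemaCache(ttl_seconds=3600, clock=lambda: fake_now[0])
adapter = CountingAdapter()

cache.get_or_introspect(adapter)                      # miss: introspects
cache.get_or_introspect(adapter)                      # hit: served from cache
calls_after_hit = adapter.calls
cache.get_or_introspect(adapter, force_refresh=True)  # forced: introspects again
calls_after_force = adapter.calls
fake_now[0] = 3601.0                                  # move past the TTL
schema = cache.get_or_introspect(adapter)             # expired: introspects again
calls_after_expiry = adapter.calls
```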
PersistentSchemaCache¶
File-based schema cache.
Constructor¶
Example¶
cache = PersistentSchemaCache(
    path="./schema_cache.json",
    ttl_seconds=86400,  # 24 hours
)

schema = cache.get_or_introspect(adapter)
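A file-based cache has to store a timestamp next to the schema so that a later process can tell whether the entry is still fresh. The sketch below shows one possible approach using a JSON file. The `load_or_build` function and the `stored_at` / `schema` file layout are assumptions made for illustration; the actual on-disk format of `PersistentSchemaCache` is not documented here.

```python
import json
import os
import tempfile
import time
from typing import Any, Callable


def load_or_build(
    path: str,
    ttl_seconds: float,
    build: Callable[[], Any],
    now: Callable[[], float] = time.time,
) -> Any:
    """Return the cached schema if the file is fresh, otherwise rebuild and rewrite it."""
    try:
        with open(path, encoding="utf-8") as fh:
            entry = json.load(fh)
        if now() - entry["stored_at"] < ttl_seconds:
            return entry["schema"]
    except (OSError, ValueError, KeyError, TypeError):
        pass  # Missing, unreadable, or malformed file: fall through and rebuild.

    schema = build()
    tmp_path = path + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as fh:
        json.dump({"stored_at": now(), "schema": schema}, fh)
    os.replace(tmp_path, path)  # Swap the file in one step so readers never see a partial write.
    return schema


build_calls = []


def build_schema() -> dict:
    """Stand-in for adapter.introspect(); records each invocation."""
    build_calls.append(1)
    return {"User": ["id", "name"]}


with tempfile.TemporaryDirectory() as tmp_dir:
    cache_path = os.path.join(tmp_dir, "schema_cache.json")
    first = load_or_build(cache_path, 86400, build_schema)   # no file yet: builds
    second = load_or_build(cache_path, 86400, build_schema)  # fresh file: reads
    # Pretend 25 hours have passed so the 24-hour TTL has expired.
    third = load_or_build(cache_path, 86400, build_schema, now=lambda: time.time() + 90000)
    build_count = len(build_calls)
```

Wall-clock time (`time.time`) is used rather than a monotonic clock because the timestamp must remain meaningful across process restarts.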
TransactionManager¶
Advanced transaction management.
Methods¶
begin¶
savepoint¶
@asynccontextmanager
async def savepoint(self, name: str | None = None):
    """Create a savepoint within a transaction."""
Example¶
manager = TransactionManager(adapter)

async with manager.begin(ctx) as tx:
    await toolset.create(ctx, model="Order", data={...})

    async with tx.savepoint("items"):
        try:
            await toolset.create(ctx, model="Item", data={...})
        except ValidationError:
            # Savepoint rolled back, transaction continues
            pass

    # Commits if no exception
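At the database level, a savepoint lets part of a transaction be undone while the rest still commits. The sketch below demonstrates those semantics directly with Python's standard `sqlite3` module rather than with `TransactionManager`; the table names and the `CHECK` constraint are made up for the demonstration.

```python
import sqlite3

# isolation_level=None puts the connection in autocommit mode, so the explicit
# BEGIN / SAVEPOINT / COMMIT statements below are the only transaction control.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, note TEXT)")
conn.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, qty INTEGER CHECK (qty > 0))")

conn.execute("BEGIN")
conn.execute("INSERT INTO orders (note) VALUES ('first order')")

conn.execute("SAVEPOINT items_sp")
try:
    conn.execute("INSERT INTO items (qty) VALUES (3)")
    conn.execute("INSERT INTO items (qty) VALUES (0)")  # violates the CHECK constraint
    conn.execute("RELEASE SAVEPOINT items_sp")
except sqlite3.IntegrityError:
    # Undo only the work done since the savepoint, then discard the savepoint.
    conn.execute("ROLLBACK TO SAVEPOINT items_sp")
    conn.execute("RELEASE SAVEPOINT items_sp")

# The outer transaction is still open and commits the order on its own.
conn.execute("COMMIT")

order_count = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
item_count = conn.execute("SELECT COUNT(*) FROM items").fetchone()[0]
conn.close()
```

Both item inserts are discarded, including the valid one, because the rollback covers everything after the savepoint; the order row survives.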
RetryConfig¶
Configuration for operation retries.
Constructor¶
RetryConfig(
    max_attempts: int = 3,
    initial_delay_ms: int = 100,
    max_delay_ms: int = 5000,
    exponential_base: float = 2.0,
    retryable_exceptions: tuple = (DeadlockError, TimeoutError),
)
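The parameter names point to capped exponential backoff. The sketch below computes the delay schedule such a configuration would plausibly produce. Two things are assumptions, not documented behaviour: the formula `initial_delay_ms * exponential_base ** (retry - 1)` capped at `max_delay_ms`, and the reading that `max_attempts` counts the first try. The local `RetryConfig` omits `retryable_exceptions` to stay self-contained.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RetryConfig:
    """Local copy of the numeric fields, with the documented defaults."""
    max_attempts: int = 3
    initial_delay_ms: int = 100
    max_delay_ms: int = 5000
    exponential_base: float = 2.0


def delay_before_retry_ms(config: RetryConfig, retry_number: int) -> float:
    """Delay before the Nth retry (1-based), capped at max_delay_ms (assumed formula)."""
    raw = config.initial_delay_ms * config.exponential_base ** (retry_number - 1)
    return min(raw, config.max_delay_ms)


def schedule_ms(config: RetryConfig) -> list:
    """All delays for one operation.

    Assumes max_attempts includes the first try, leaving max_attempts - 1 retries.
    """
    return [delay_before_retry_ms(config, n) for n in range(1, config.max_attempts)]


default_schedule = schedule_ms(RetryConfig())
long_schedule = schedule_ms(RetryConfig(max_attempts=8))
```

Under these assumptions the defaults give two retries, after 100 ms and 200 ms. With `max_attempts=8` the seventh delay would be 6400 ms uncapped, so it is clamped to 5000 ms.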