Skip to content

Adapters API Reference

Adapters translate OrmAI operations to native ORM calls.

Base Interface

All adapters implement this interface:

from ormai.adapters import Adapter

class Adapter(Protocol):
    def introspect(self) -> SchemaMetadata: ...
    def compile(self, request, policy, principal) -> CompiledQuery: ...
    async def execute(self, compiled, ctx) -> ExecutionResult: ...
    def transaction(self, ctx) -> ContextManager: ...

SQLAlchemyAdapter

Adapter for SQLAlchemy ORM.

from ormai.adapters import SQLAlchemyAdapter

Constructor

SQLAlchemyAdapter(
    engine: Engine,
    base: DeclarativeBase,
    schema_cache: SchemaCache | None = None,
)

Parameters

Parameter Type Description
engine Engine SQLAlchemy engine
base DeclarativeBase Declarative base class
schema_cache SchemaCache \| None Optional schema cache

Example

from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base

engine = create_engine("postgresql://localhost/mydb")
Base = declarative_base()

adapter = SQLAlchemyAdapter(engine=engine, base=Base)

Methods

introspect

def introspect(self) -> SchemaMetadata:
    """Extract schema from SQLAlchemy models."""

compile

def compile(
    self,
    request: QueryRequest | GetRequest | AggregateRequest,
    policy: Policy,
    principal: Principal,
) -> CompiledQuery:
    """Compile request to SQLAlchemy query."""

execute

def execute(
    self,
    compiled: CompiledQuery,
    ctx: RunContext,
) -> ExecutionResult:
    """Execute synchronously."""

transaction

@contextmanager
def transaction(self, ctx: RunContext):
    """Transaction context manager."""

AsyncSQLAlchemyAdapter

Async version for SQLAlchemy 2.0+.

from ormai.adapters import AsyncSQLAlchemyAdapter

Constructor

AsyncSQLAlchemyAdapter(
    engine: AsyncEngine,
    base: DeclarativeBase,
    schema_cache: SchemaCache | None = None,
)

Example

from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine("postgresql+asyncpg://localhost/mydb")

adapter = AsyncSQLAlchemyAdapter(engine=engine, base=Base)

Methods

execute

async def execute(
    self,
    compiled: CompiledQuery,
    ctx: RunContext,
) -> ExecutionResult:
    """Execute asynchronously."""

transaction

@asynccontextmanager
async def transaction(self, ctx: RunContext):
    """Async transaction context manager."""

TortoiseAdapter

Adapter for Tortoise ORM (async only).

from ormai.adapters import TortoiseAdapter

Constructor

TortoiseAdapter(
    models_module: str,
    schema_cache: SchemaCache | None = None,
)

Parameters

Parameter Type Description
models_module str Module path containing models
schema_cache SchemaCache \| None Optional schema cache

Example

from tortoise import Tortoise

await Tortoise.init(
    db_url="postgres://localhost/mydb",
    modules={"models": ["myapp.models"]},
)

adapter = TortoiseAdapter(models_module="myapp.models")

PeeweeAdapter

Adapter for Peewee ORM (sync only).

from ormai.adapters import PeeweeAdapter

Constructor

PeeweeAdapter(
    database: Database,
    models: list[type],
    schema_cache: SchemaCache | None = None,
)

Parameters

Parameter Type Description
database Database Peewee database instance
models list[type] List of model classes
schema_cache SchemaCache \| None Optional schema cache

Example

from peewee import PostgresqlDatabase

db = PostgresqlDatabase("mydb")

adapter = PeeweeAdapter(
    database=db,
    models=[User, Order, Item],
)

DjangoAdapter

Adapter for Django ORM.

from ormai.adapters import DjangoAdapter

Constructor

DjangoAdapter(
    app_labels: list[str],
    schema_cache: SchemaCache | None = None,
)

Parameters

Parameter Type Description
app_labels list[str] Django app labels to include
schema_cache SchemaCache \| None Optional schema cache

Example

adapter = DjangoAdapter(app_labels=["users", "orders"])

SQLModelAdapter

Adapter for SQLModel.

from ormai.adapters import SQLModelAdapter

Constructor

SQLModelAdapter(
    engine: Engine,
    models: list[type],
    schema_cache: SchemaCache | None = None,
)

Example

from sqlmodel import create_engine

engine = create_engine("postgresql://localhost/mydb")

adapter = SQLModelAdapter(
    engine=engine,
    models=[User, Order],
)

CompiledQuery

Result of query compilation.

@dataclass
class CompiledQuery:
    native_query: Any      # ORM-specific query object
    model: str             # Target model name
    operation: str         # "select", "insert", "update", "delete"
    scopes_applied: list[str]  # Applied scoping rules
    budget_limits: dict    # Applied budget constraints

ExecutionResult

Result of query execution.

@dataclass
class ExecutionResult:
    rows: list[dict]       # Query results
    row_count: int         # Number of rows
    has_more: bool         # More results available
    next_cursor: str | None  # Pagination cursor
    execution_time_ms: float  # Execution time

SchemaCache

Caches introspected schemas.

from ormai.utils import SchemaCache

Constructor

SchemaCache(
    ttl_seconds: int = 3600,
)

Methods

get_or_introspect

def get_or_introspect(
    self,
    adapter: Adapter,
    force_refresh: bool = False,
) -> SchemaMetadata:

Example

cache = SchemaCache(ttl_seconds=3600)

# First call introspects
schema = cache.get_or_introspect(adapter)

# Subsequent calls use cache
schema = cache.get_or_introspect(adapter)

# Force refresh
schema = cache.get_or_introspect(adapter, force_refresh=True)

PersistentSchemaCache

File-based schema cache.

from ormai.utils import PersistentSchemaCache

Constructor

PersistentSchemaCache(
    path: str,
    ttl_seconds: int = 86400,
)

Example

cache = PersistentSchemaCache(
    path="./schema_cache.json",
    ttl_seconds=86400,  # 24 hours
)

schema = cache.get_or_introspect(adapter)

TransactionManager

Advanced transaction management.

from ormai.utils import TransactionManager

Methods

begin

@asynccontextmanager
async def begin(self, ctx: RunContext):
    """Start a transaction."""

savepoint

@asynccontextmanager
async def savepoint(self, name: str | None = None):
    """Create a savepoint within a transaction."""

Example

manager = TransactionManager(adapter)

async with manager.begin(ctx) as tx:
    await toolset.create(ctx, model="Order", data={...})

    async with tx.savepoint("items"):
        try:
            await toolset.create(ctx, model="Item", data={...})
        except ValidationError:
            # Savepoint rolled back, transaction continues
            pass

    # Commits if no exception

RetryConfig

Configuration for operation retries.

from ormai.utils import RetryConfig

Constructor

RetryConfig(
    max_attempts: int = 3,
    initial_delay_ms: int = 100,
    max_delay_ms: int = 5000,
    exponential_base: float = 2.0,
    retryable_exceptions: tuple = (DeadlockError, TimeoutError),
)

Example

from ormai.utils import retry_async

config = RetryConfig(max_attempts=3)

@retry_async(config)
async def create_order(ctx, data):
    return await toolset.create(ctx, model="Order", data=data)