FastAPI Integration¶
FastWorker integrates seamlessly with FastAPI applications, providing a brokerless alternative to Celery.
Basic Integration¶
1. Define Tasks¶
# tasks.py
from fastworker import task
@task
def process_user_registration(user_id: int, email: str) -> dict:
"""Process user registration."""
return {"user_id": user_id, "status": "registered"}
@task
def send_notification(user_id: int, message: str) -> dict:
"""Send notification to user."""
return {"user_id": user_id, "notification_sent": True}
2. Configure FastAPI Application¶
# main.py
from fastapi import FastAPI, HTTPException
from fastworker import Client
app = FastAPI(title="My FastAPI App")
client = Client()
@app.on_event("startup")
async def startup_event():
"""Initialize FastWorker client on startup."""
await client.start()
@app.on_event("shutdown")
async def shutdown_event():
"""Clean up FastWorker client on shutdown."""
client.stop()
@app.post("/register/")
async def register_user(user_id: int, email: str):
"""Register a new user."""
task_id = await client.delay("process_user_registration", user_id, email)
return {"message": "Registration started", "task_id": task_id}
@app.get("/task/{task_id}")
async def get_task_status(task_id: str):
"""Get task status."""
result = await client.get_task_result(task_id)
if result:
return {"status": result.status, "result": result.result}
return {"status": "pending"}
3. Start Workers¶
# Terminal 1: Start control plane
fastworker control-plane --task-modules tasks
# Terminal 2: Start FastAPI
uvicorn main:app --reload
Advanced Patterns¶
Background Task Processing¶
@app.post("/users/")
async def create_user(user_data: dict):
"""Create user and send welcome email in background."""
# Save user synchronously
user = db.save_user(user_data)
# Send email in background (non-blocking)
task_id = await client.delay("send_welcome_email", user.id, user.email)
return {
"user_id": user.id,
"email_task_id": task_id,
"message": "User created"
}
Error Handling¶
@app.post("/process/")
async def process_with_fallback(data: dict):
"""Process data with error handling."""
task_id = await client.delay("process_data", data)
# Wait a bit and check result
await asyncio.sleep(2)
result = await client.get_task_result(task_id)
if result:
if result.status == "failure":
raise HTTPException(status_code=500, detail=result.error)
return {"result": result.result}
return {"task_id": task_id, "status": "processing"}
Health Check¶
@app.get("/health/")
async def health_check():
"""Health check endpoint."""
worker_count = len(client.workers) if hasattr(client, 'workers') else 0
if worker_count > 0:
return {"status": "healthy", "workers_online": worker_count}
return {"status": "degraded", "workers_online": 0}
Task Status Tracking¶
from fastworker.tasks.models import TaskStatus
task_storage = {}
@app.post("/process-with-tracking/")
async def process_with_tracking(data: dict):
"""Process data with status tracking."""
task_id = await client.delay("process_data", data)
return {"task_id": task_id, "status": "processing"}
@app.get("/task/{task_id}")
async def get_task_status(task_id: str):
"""Get task status."""
result = await client.get_task_result(task_id)
if result:
return {
"task_id": task_id,
"status": result.status,
"result": result.result if result.status == "success" else None,
"error": result.error if result.status == "failure" else None
}
return {"task_id": task_id, "status": "pending"}
Dependency Injection¶
from fastapi import Depends
async def get_fastworker_client():
"""Dependency to get FastWorker client."""
return client
@app.post("/process/")
async def process_data(
data: dict,
fw_client: Client = Depends(get_fastworker_client)
):
"""Process data with injected client."""
task_id = await fw_client.delay("process_data", data)
return {"task_id": task_id}
Task Callbacks¶
@app.post("/process-with-callback/")
async def process_with_callback(data: dict, callback_url: str):
"""Process data with callback when finished."""
task_id = await client.delay_with_callback(
"process_data",
callback_url,
data,
callback_data={"source": "fastapi"}
)
return {"task_id": task_id, "message": "Task submitted"}
Configuration¶
Environment-Based¶
import os
DISCOVERY_ADDRESS = os.getenv("FASTWORKER_DISCOVERY_ADDRESS", "tcp://127.0.0.1:5550")
TIMEOUT = int(os.getenv("FASTWORKER_TIMEOUT", "30"))
client = Client(
discovery_address=DISCOVERY_ADDRESS,
timeout=TIMEOUT
)
Lifespan Context (Modern FastAPI)¶
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
await client.start()
yield
# Shutdown
client.stop()
app = FastAPI(lifespan=lifespan)
Best Practices¶
- Initialize Client Once - Create a single client instance per application
- Use Non-Blocking - Use
delay()for fast response times - Handle Errors - Implement fallback strategies
- Monitor Health - Check worker availability
- Set Timeouts - Configure appropriate timeouts
- Use Priorities - Submit tasks with appropriate priority levels
Full Example¶
from fastapi import FastAPI, HTTPException
from fastworker import Client
from contextlib import asynccontextmanager
import os
# Configuration
DISCOVERY = os.getenv("FASTWORKER_DISCOVERY_ADDRESS", "tcp://127.0.0.1:5550")
client = Client(discovery_address=DISCOVERY)
@asynccontextmanager
async def lifespan(app: FastAPI):
await client.start()
yield
client.stop()
app = FastAPI(title="FastWorker Example", lifespan=lifespan)
@app.post("/tasks/")
async def create_task(name: str, data: dict, priority: str = "normal"):
"""Submit a new task."""
task_id = await client.delay(name, data, priority=priority)
return {"task_id": task_id}
@app.get("/tasks/{task_id}")
async def get_task(task_id: str):
"""Get task result."""
result = await client.get_task_result(task_id)
if result:
return {
"task_id": task_id,
"status": result.status,
"result": result.result,
"error": result.error
}
return {"task_id": task_id, "status": "pending"}
@app.get("/health")
async def health():
"""Health check."""
return {"status": "healthy"}