Async Patterns¶
FastWorker provides both blocking and non-blocking task submission patterns. Understanding the difference is crucial for building responsive applications.
Quick Comparison¶
| Method | Behavior | Returns | Use Case |
|---|---|---|---|
submit_task() |
Blocking - Waits for result | TaskResult |
When you need the result immediately |
delay() |
Non-blocking - Returns immediately | task_id (str) |
Background tasks, fire-and-forget |
delay_with_callback() |
Non-blocking - Returns immediately | task_id (str) |
Reactive workflows with notifications |
Blocking: submit_task()¶
Submits a task and waits for the result before continuing.
from fastworker import Client
from fastworker.tasks.models import TaskPriority
client = Client()
await client.start()
# BLOCKS until task completes
result = await client.submit_task(
task_name="process_data",
args=(data,),
priority=TaskPriority.HIGH
)
# Result is available immediately
if result.status == "success":
print(f"Result: {result.result}")
else:
print(f"Error: {result.error}")
When to Use Blocking¶
✅ Use when:
- You need the result immediately
- The calling code depends on the task result
- Tasks complete quickly (< 1 second)
❌ Don't use when:
- Handling web requests (causes slow response times)
- Task takes a long time to complete
- Processing multiple tasks in parallel
Non-Blocking: delay()¶
Submits a task and returns immediately with a task ID. The task executes in the background.
from fastworker import Client
client = Client()
await client.start()
# Returns IMMEDIATELY with task ID
task_id = await client.delay("process_data", data, priority="high")
print(f"Task submitted: {task_id}")
# Task is processing in background...
# You can continue doing other work here
# Check result later
await asyncio.sleep(2) # Simulate doing other work
result = await client.get_task_result(task_id)
if result:
print(f"Result: {result.result}")
When to Use Non-Blocking¶
✅ Use when:
- Handling web requests (fast response times)
- Task takes time to complete
- Fire-and-forget scenarios
- Processing multiple tasks in parallel
❌ Don't use when:
- You need the result immediately
- Next steps depend on task result
Performance Comparison¶
Blocking (Sequential)¶
# Blocking: Sequential processing
start = time.time()
results = []
for item in items: # 100 items
result = await client.submit_task("process", item) # Waits each time
results.append(result)
duration = time.time() - start
# Duration: ~50 seconds (if each task takes 0.5s)
Problem: Processes one at a time. Total time = sum of all task times.
Non-Blocking (Parallel)¶
# Non-blocking: Parallel processing
start = time.time()
task_ids = []
for item in items: # 100 items
task_id = await client.delay("process", item) # Returns immediately
task_ids.append(task_id)
# All tasks submitted in < 1 second!
print(f"All tasks submitted in {time.time() - start:.2f}s")
# Collect results later
await asyncio.sleep(5) # Give tasks time to complete
results = [await client.get_task_result(tid) for tid in task_ids]
# Duration: ~5 seconds (all tasks run in parallel)
Benefit: Submits all tasks immediately, processes in parallel.
Real-World Patterns¶
Pattern 1: Web API Background Processing¶
from fastapi import FastAPI
from fastworker import Client
app = FastAPI()
client = Client()
@app.post("/users/")
async def create_user(user_data: dict):
"""Create user and send welcome email in background."""
# 1. Save user to database (fast)
user = db.save_user(user_data)
# 2. Send welcome email in background (non-blocking!)
task_id = await client.delay("send_welcome_email", user.id, user.email)
# 3. Return immediately (fast response!)
return {
"user_id": user.id,
"email_task_id": task_id,
"message": "User created, welcome email sending"
}
Pattern 2: Batch Processing¶
async def process_batch(items: list):
"""Process many items in parallel."""
# Submit all tasks (non-blocking)
task_ids = [await client.delay("process_item", item) for item in items]
print(f"Submitted {len(task_ids)} tasks")
# Wait for completion
while True:
results = [await client.get_task_result(tid) for tid in task_ids]
completed = sum(1 for r in results if r and r.status != "pending")
print(f"Progress: {completed}/{len(task_ids)}")
if completed == len(task_ids):
break
await asyncio.sleep(1)
print("All tasks complete!")
Pattern 3: Fire and Forget¶
@app.post("/analytics/")
async def track_event(event_data: dict):
"""Track analytics event - don't care about result."""
await client.delay("track_analytics", event_data, priority="low")
return {"status": "tracked"}
Retrieving Results¶
Option 1: Poll for Results¶
task_id = await client.delay("long_task", data)
# Poll periodically
while True:
result = await client.get_task_result(task_id)
if result and result.status != TaskStatus.PENDING:
break
await asyncio.sleep(0.5)
print(f"Result: {result.result}")
Option 2: Use Callbacks¶
task_id = await client.delay_with_callback(
"long_task",
callback_address="tcp://localhost:6000",
data,
callback_data={"request_id": 123}
)
# Callback listener will be notified when complete
Best Practices¶
1. Default to Non-Blocking¶
# Good - Fast response
task_id = await client.delay("send_email", email)
return {"task_id": task_id}
# Bad - Slow response
result = await client.submit_task("send_email", email)
return {"result": result}
2. Handle Missing Results¶
# Good - Check if result exists
result = await client.get_task_result(task_id)
if result and result.status == TaskStatus.SUCCESS:
return {"result": result.result}
else:
return {"status": "processing", "task_id": task_id}
# Bad - Assume result exists
result = await client.get_task_result(task_id)
return {"result": result.result} # Might be None!
3. Use Appropriate Poll Intervals¶
# Good - Reasonable poll interval
await asyncio.sleep(0.5) # Polls 2 times per second
# Bad - Too frequent
await asyncio.sleep(0.01) # Polls 100 times per second!
Common Pitfalls¶
Blocking in Request Handlers¶
# ❌ Bad - Blocks request
@app.post("/process/")
async def process(data: dict):
result = await client.submit_task("heavy_task", data)
return {"result": result.result}
# ✅ Good - Returns immediately
@app.post("/process/")
async def process(data: dict):
task_id = await client.delay("heavy_task", data)
return {"task_id": task_id}
Not Handling Pending Status¶
# ❌ Bad - Assumes immediate completion
task_id = await client.delay("task", data)
result = await client.get_task_result(task_id)
return result.result # Might be None!
# ✅ Good - Check status
result = await client.get_task_result(task_id)
if result and result.status == TaskStatus.SUCCESS:
return result.result