Framework Integration¶
FastWorker is framework-agnostic: it integrates natively with Python web frameworks that support async operations and, through a background event loop running in a thread, with synchronous ones.
Framework Compatibility¶
| Framework | Async Support | Complexity | Recommended |
|---|---|---|---|
| FastAPI | Native | Easy | Yes |
| Sanic | Native | Easy | Yes |
| Quart | Native | Easy | Yes |
| Tornado | Native | Easy | Yes |
| Flask 2.0+ | Native | Easy | Yes |
| Flask < 2.0 | Threading | Medium | With threading |
| Django 3.1+ | Native | Medium | Yes |
| Django < 3.1 | Threading | Hard | With threading |
Flask Integration¶
Flask with async/await (Flask 2.0+)¶
import asyncio
import atexit
import threading

from flask import Flask, request, jsonify
from fastworker import Client

app = Flask(__name__)
client = Client()

# Flask has no before_serving/after_serving hooks (those belong to Quart), and
# it runs every async view on its own short-lived event loop. The client is
# therefore started once on a dedicated loop in a daemon thread, and each view
# hands its client calls over to that loop.
client_loop = asyncio.new_event_loop()


def run_client_loop():
    """Thread target: run the client's event loop until the process exits."""
    asyncio.set_event_loop(client_loop)
    client_loop.run_forever()


async def on_client_loop(coro):
    """Run ``coro`` on the client's loop and await its result from a view."""
    future = asyncio.run_coroutine_threadsafe(coro, client_loop)
    # wrap_future makes the thread-safe future awaitable on the view's loop.
    return await asyncio.wrap_future(future)


def startup():
    """Start the loop thread, then start the client on that loop."""
    threading.Thread(target=run_client_loop, daemon=True).start()
    asyncio.run_coroutine_threadsafe(client.start(), client_loop).result()


def shutdown():
    """Stop the client on its own loop when the process exits."""
    async def stop_client():
        client.stop()

    # Bounded wait so a stuck loop cannot hang interpreter shutdown.
    asyncio.run_coroutine_threadsafe(stop_client(), client_loop).result(timeout=5)


startup()
atexit.register(shutdown)


@app.route('/process/', methods=['POST'])
async def process_data():
    """Submit the posted JSON as a "process_data" task and return its id."""
    data = request.get_json()
    task_id = await on_client_loop(client.delay("process_data", data))
    return jsonify({"task_id": task_id})


@app.route('/result/<task_id>', methods=['GET'])
async def get_result(task_id):
    """Return the task's status and result, or 404 when none is available."""
    result = await on_client_loop(client.get_task_result(task_id))
    if result:
        return jsonify({"status": result.status, "result": result.result})
    return jsonify({"error": "Result not found"}), 404
Flask with Threading (Traditional)¶
from flask import Flask, request, jsonify
from fastworker import Client
import asyncio
import threading

app = Flask(__name__)
client = Client()
loop = None  # background event loop; assigned in startup()


def run_async(coro):
    """Schedule ``coro`` on the background loop and block for its result."""
    pending = asyncio.run_coroutine_threadsafe(coro, loop)
    return pending.result()


def start_background_loop(loop):
    """Thread target: make ``loop`` this thread's loop and run it forever."""
    asyncio.set_event_loop(loop)
    loop.run_forever()


# NOTE(review): before_first_request is not available in recent Flask
# releases; this section targets older Flask — confirm the supported range.
@app.before_first_request
def startup():
    """Create the background loop, run it in a daemon thread, start the client."""
    global loop
    loop = asyncio.new_event_loop()
    threading.Thread(
        target=start_background_loop,
        args=(loop,),
        daemon=True,
    ).start()
    run_async(client.start())


@app.route('/process/', methods=['POST'])
def process_data():
    """Submit the posted JSON as a "process_data" task and return its id."""
    payload = request.get_json()
    submitted_id = run_async(client.delay("process_data", payload))
    return jsonify({"task_id": submitted_id})
Django Integration¶
Django Async Views (Django 3.1+)¶
# views.py
from django.http import HttpResponseNotAllowed, JsonResponse
from fastworker import Client
import json

client = Client()

# The HTTP method is checked inline rather than with @require_http_methods:
# before Django 5.0 that decorator wraps the view in a sync function, so an
# async view would come back as an un-awaited coroutine.


async def process_data(request):
    """Submit the JSON request body as a "process_data" task (POST only)."""
    if request.method != "POST":
        return HttpResponseNotAllowed(["POST"])
    data = json.loads(request.body)
    task_id = await client.delay("process_data", data)
    return JsonResponse({"task_id": task_id})


async def get_result(request, task_id):
    """Return the task's status and result, or 404 when none is available (GET only)."""
    if request.method != "GET":
        return HttpResponseNotAllowed(["GET"])
    result = await client.get_task_result(task_id)
    if result:
        return JsonResponse({"status": result.status, "result": result.result})
    return JsonResponse({"error": "Result not found"}, status=404)
Django App Configuration¶
# apps.py
from django.apps import AppConfig
import asyncio


class MyAppConfig(AppConfig):
    """App configuration that starts the FastWorker client used by the views."""

    name = 'myapp'

    def ready(self):
        """Start the shared client instance defined in views.py."""
        # The views call the module-level client in views.py, so that is the
        # instance to start; creating a new Client() here would leave the one
        # the views use unstarted. Imported here because ready() runs once the
        # app registry is loaded.
        from .views import client

        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        # NOTE(review): this loop is idle once start() returns, and async views
        # may be served on a different loop — confirm the client tolerates
        # that, or start it on the serving loop instead.
        loop.run_until_complete(client.start())
Django URLs¶
# urls.py
from django.urls import path
from . import views

# One route to submit a task, one to look up its result by task id.
submit_route = path('process/', views.process_data, name='process_data')
lookup_route = path('result/<str:task_id>/', views.get_result, name='get_result')

urlpatterns = [submit_route, lookup_route]
Sanic Integration¶
from sanic import Sanic, response
from fastworker import Client

app = Sanic("MyApp")
client = Client()


@app.before_server_start
async def setup_client(app, loop):
    """Start the FastWorker client before the server starts."""
    await client.start()


@app.after_server_stop
async def cleanup_client(app, loop):
    """Stop the FastWorker client after the server stops."""
    client.stop()


@app.post("/process/")
async def process_data(request):
    """Submit the request's JSON as a "process_data" task and return its id."""
    payload = request.json
    submitted_id = await client.delay("process_data", payload)
    return response.json({"task_id": submitted_id})


@app.get("/result/<task_id>")
async def get_result(request, task_id):
    """Return the task's status and result, or 404 when none is available."""
    outcome = await client.get_task_result(task_id)
    if not outcome:
        return response.json({"error": "Result not found"}, status=404)
    return response.json({"status": outcome.status, "result": outcome.result})
Quart Integration¶
from quart import Quart, request, jsonify
from fastworker import Client

app = Quart(__name__)
client = Client()


@app.before_serving
async def startup():
    """Start the FastWorker client before the app begins serving."""
    await client.start()


@app.after_serving
async def shutdown():
    """Stop the FastWorker client after the app finishes serving."""
    client.stop()


@app.route('/process/', methods=['POST'])
async def process_data():
    """Submit the posted JSON as a "process_data" task and return its id."""
    payload = await request.get_json()
    submitted_id = await client.delay("process_data", payload)
    return jsonify({"task_id": submitted_id})


@app.route('/result/<task_id>', methods=['GET'])
async def get_result(task_id):
    """Return the task's status and result, or 404 when none is available."""
    outcome = await client.get_task_result(task_id)
    if not outcome:
        return jsonify({"error": "Result not found"}), 404
    return jsonify({"status": outcome.status, "result": outcome.result})
Tornado Integration¶
import tornado.ioloop
import tornado.web
from fastworker import Client
import json

client = Client()


class ProcessHandler(tornado.web.RequestHandler):
    """Accepts a JSON body and submits it as a "process_data" task."""

    async def post(self):
        payload = json.loads(self.request.body)
        submitted_id = await client.delay("process_data", payload)
        self.write({"task_id": submitted_id})


class ResultHandler(tornado.web.RequestHandler):
    """Looks up a task result by the id captured from the URL."""

    async def get(self, task_id):
        outcome = await client.get_task_result(task_id)
        if not outcome:
            self.set_status(404)
            self.write({"error": "Result not found"})
            return
        self.write({"status": outcome.status, "result": outcome.result})


async def startup():
    """Start the FastWorker client."""
    await client.start()


def make_app():
    """Build the Tornado application with the process and result routes."""
    routes = [
        (r"/process/", ProcessHandler),
        (r"/result/([^/]+)", ResultHandler),
    ]
    return tornado.web.Application(routes)


if __name__ == "__main__":
    # Start the client first, then build the app and serve on port 8000.
    tornado.ioloop.IOLoop.current().run_sync(startup)
    app = make_app()
    app.listen(8000)
    tornado.ioloop.IOLoop.current().start()
Standalone Python Script¶
from fastworker import Client
import asyncio


async def main():
    """Submit ten tasks, wait two seconds, then print the results that exist."""
    client = Client()
    await client.start()
    try:
        # Submit multiple tasks
        submitted_ids = []
        for number in range(10):
            submitted_id = await client.delay("process_number", number)
            submitted_ids.append(submitted_id)
            print(f"Submitted task {number}: {submitted_id}")

        # Wait for processing
        await asyncio.sleep(2)

        # Get results
        for submitted_id in submitted_ids:
            outcome = await client.get_task_result(submitted_id)
            if not outcome:
                continue
            print(f"Task {submitted_id}: {outcome.result}")
    finally:
        # Stop the client even if submitting or fetching raised.
        client.stop()


if __name__ == '__main__':
    asyncio.run(main())
Best Practices¶
1. Client Lifecycle¶
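Always start the client on application startup and stop it on shutdown, using your framework's lifecycle hooks (for example `before_server_start` / `after_server_stop` in Sanic, or `before_serving` / `after_serving` in Quart).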
2. Singleton Pattern¶
Create a single client instance at module level (`client = Client()`) and share it across all request handlers.
3. Error Handling¶
try:
task_id = await client.delay("my_task", data)
result = await client.get_task_result(task_id)
if result and result.status == "success":
return result.result
except Exception as e:
print(f"Error: {e}")
4. Configuration¶
Use environment variables to configure the client rather than hard-coding values in application code.