-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp.py
More file actions
33 lines (24 loc) · 886 Bytes
/
Copy pathapp.py
File metadata and controls
33 lines (24 loc) · 886 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from celery import Celery
from fastapi import FastAPI
from examples.models import OrderCreate, OrderResult
from queuebridge.celery import register_queuebridge, typed_result
# HTTP application; the route handlers defined below are registered on it.
app = FastAPI()

# Celery application named "orders". The broker and the result backend are
# both pointed at the same local Redis URL.
celery_app = Celery(
    "orders",
    backend="redis://localhost:6379/0",
    broker="redis://localhost:6379/0",
)

# Register queuebridge on the Celery app before any task is declared.
# NOTE(review): presumably this installs the serialization support that lets
# model instances be passed to `.delay()` — confirm against queuebridge docs.
register_queuebridge(celery_app)
@celery_app.task(pydantic=True)
def process_order(order: OrderCreate) -> OrderResult:
    """Celery task that marks an order as processed.

    NOTE(review): ``pydantic=True`` is expected to make Celery validate the
    argument against ``OrderCreate`` and serialize the returned model —
    confirm against the installed Celery version.

    Args:
        order: The order to process; only its ``id`` is read.

    Returns:
        An ``OrderResult`` with the same ``id`` and status ``"processed"``.
    """
    processed = OrderResult(status="processed", id=order.id)
    return processed
@app.post("/orders")
def enqueue(order: OrderCreate) -> dict[str, str]:
    """Queue an order for background processing.

    Args:
        order: The order to submit; handed unchanged to
            ``process_order.delay``.

    Returns:
        A mapping with the single key ``"task_id"`` holding the id of the
        queued Celery task.
    """
    queued = process_order.delay(order)
    task_id = queued.id
    return {"task_id": task_id}
@app.get("/orders/{task_id}")
def get_result(task_id: str) -> dict[str, object]:
    """Return the processed order for a previously queued task.

    Waits up to 10 seconds for the task result, then dumps the resulting
    ``OrderResult`` to a plain dict.

    Args:
        task_id: Id of the Celery task, as returned under ``"task_id"`` by
            ``POST /orders``.

    Returns:
        The task's ``OrderResult`` converted with ``model_dump()``.

    Raises:
        HTTPException: With status 504 when no result is available within
            the 10 second wait (task still pending, or the id is unknown to
            the result backend).
    """
    # Function-scope imports keep this block self-contained; all three come
    # from packages the module already depends on.
    from celery.exceptions import TimeoutError as CeleryTimeoutError
    from celery.result import AsyncResult
    from fastapi import HTTPException

    pending = AsyncResult(task_id, app=celery_app)
    try:
        # NOTE(review): assumes typed_result(...).get() propagates Celery's
        # TimeoutError the way AsyncResult.get() does — confirm against
        # queuebridge. If it does not, this handler is simply never hit.
        result = typed_result(pending, OrderResult).get(timeout=10)
    except CeleryTimeoutError as exc:
        # Without this, a pending/unknown task surfaces as an opaque HTTP 500.
        raise HTTPException(
            status_code=504,
            detail=f"Result for task {task_id!r} was not available within 10 seconds",
        ) from exc
    return result.model_dump()