FastAPI中的复杂查询与原子更新指南
title: FastAPI中的复杂查询与原子更新指南


pip install fastapi uvicorn tortoise-orm pydantic
from tortoise.models import Model
from tortoise import fields
class Product(Model):
    # Catalogue item whose stock and price are modified through queryset
    # updates elsewhere in this file.
    id = fields.IntField(pk=True)
    name = fields.CharField(max_length=255)
    # Units currently available; decremented by the stock/purchase helpers.
    stock = fields.IntField(default=0)
    price = fields.DecimalField(max_digits=10, decimal_places=2)
    # The three columns below are written by update_product_price() and
    # create_promotion(); they must be declared here or those queryset
    # updates reference fields the model does not have.
    promotion_count = fields.IntField(default=0)
    version = fields.IntField(default=0)
    # Nullable because nothing sets it until the first explicit price update.
    last_updated = fields.DatetimeField(null=True)
class Order(Model):
    # Order row that points at exactly one Product.
    id = fields.IntField(pk=True)
    # Lifecycle state; expected values: pending / completed / canceled.
    status = fields.CharField(max_length=20)
    total_amount = fields.DecimalField(max_digits=10, decimal_places=2)
    # The reverse accessor on Product is named ``orders``.
    product = fields.ForeignKeyField("models.Product", related_name="orders")
    # Populated automatically when the row is first created.
    created_at = fields.DatetimeField(auto_now_add=True)
from pydantic import BaseModel
from datetime import datetime
class OrderOut(BaseModel):
    # Response schema describing a single order.
    id: int
    status: str
    total_amount: float
    product_id: int
    created_at: datetime

    class Config:
        # Lets the schema be populated from attribute-bearing objects rather
        # than only from dicts.
        # NOTE(review): `orm_mode` is the pydantic v1 spelling; confirm which
        # pydantic major version this project installs.
        orm_mode = True
from tortoise.expressions import Q
async def get_orders():
    """Return all orders that are pending and have a total above 100."""
    above_threshold = Q(total_amount__gt=100)
    still_pending = Q(status="pending")
    # Both conditions must hold, hence the AND combination.
    matching = Order.filter(above_threshold & still_pending)
    return await matching.all()
from fastapi import APIRouter, Body, HTTPException, Query
from tortoise.expressions import Q
router = APIRouter()


@router.get("/orders", response_model=list[OrderOut])
async def search_orders(
    min_amount: float = Query(None),
    max_amount: float = Query(None),
    status: str = Query(None),
):
    """Search orders using whichever optional filters the caller supplies.

    Args:
        min_amount: Inclusive lower bound on ``total_amount``; omitted when None.
        max_amount: Inclusive upper bound on ``total_amount``; omitted when None.
        status: Exact order status to match; omitted when None or empty.

    Returns:
        The orders matching every supplied filter, with ``product`` prefetched.
    """
    query = Q()
    # Compare against None instead of relying on truthiness: 0 is a valid
    # bound (e.g. max_amount=0) and must not be silently dropped.
    if min_amount is not None:
        query &= Q(total_amount__gte=min_amount)
    if max_amount is not None:
        query &= Q(total_amount__lte=max_amount)
    # An empty status string is deliberately treated as "no status filter".
    if status:
        query &= Q(status=status)
    return await Order.filter(query).prefetch_related("product")
from datetime import datetime, timedelta
async def complex_query():
    """Return orders matching either of two profiles, newest first.

    Profile one: created within the last seven days, total above 500 and
    status ``completed``. Profile two: total below 100 and status ``pending``.
    """
    # NOTE(review): datetime.now() is naive local time; confirm it matches the
    # timezone handling configured for the ORM.
    window_start = datetime.now() - timedelta(days=7)

    # Keyword arguments inside a single Q are combined with AND.
    recent_large_completed = Q(
        created_at__gte=window_start,
        total_amount__gt=500,
        status="completed",
    )
    small_pending = Q(total_amount__lt=100, status="pending")

    either_profile = recent_large_completed | small_pending
    return await Order.filter(either_profile).order_by("-created_at")
from tortoise.expressions import F
async def decrease_stock(product_id: int, quantity: int):
    """Subtract ``quantity`` from the stock of the product with ``product_id``.

    No availability check is performed here; the stored value is simply
    reduced by ``quantity``.
    """
    # F("stock") refers to the column itself, so the subtraction is expressed
    # against the stored value rather than a value read earlier in Python.
    reduced_stock = F("stock") - quantity
    target_rows = Product.filter(id=product_id)
    await target_rows.update(stock=reduced_stock)
async def update_product_price(product_id: int, new_price: float):
    """Set a new price, stamp the modification time and bump the version.

    All three columns are written by one queryset update.
    """
    # NOTE(review): `last_updated` and `version` must exist as fields on
    # Product for this update to be accepted; confirm against the model.
    changes = {
        "price": new_price,
        "last_updated": datetime.now(),
        # Incremented relative to the stored value, not a value read earlier.
        "version": F("version") + 1,
    }
    await Product.filter(id=product_id).update(**changes)
async def safe_purchase(product_id: int, quantity: int):
    """Deduct ``quantity`` units of stock only when enough is available.

    Args:
        product_id: Primary key of the product being purchased.
        quantity: Number of units to deduct; must be positive.

    Raises:
        HTTPException: 400 when ``quantity`` is not positive or the stock is
            insufficient; 404 when no product has ``product_id``.
    """
    # A negative quantity would pass the ``stock__gte`` guard and *increase*
    # the stock, and zero is a no-op purchase, so reject both up front.
    if quantity <= 0:
        raise HTTPException(400, "购买数量必须大于0")

    # The availability check is part of the same filtered update that
    # performs the decrement, so there is no separate read-then-write step.
    updated = await Product.filter(
        id=product_id,
        stock__gte=quantity,
    ).update(stock=F("stock") - quantity)
    if updated:
        return

    # Nothing was updated: distinguish a missing product from one that exists
    # but lacks stock, matching the 404 used by create_promotion().
    if not await Product.exists(id=product_id):
        raise HTTPException(404, "商品不存在")
    raise HTTPException(400, "库存不足")
@router.post("/products/{product_id}/promotion")
async def create_promotion(
    product_id: int,
    discount: float = Body(..., gt=0, lt=1),
):
    """Apply a fractional discount to a product and count the promotion.

    Args:
        product_id: Primary key of the product to discount.
        discount: Fraction to take off the price, strictly between 0 and 1.

    Returns:
        A dict with the product's ``new_price`` and ``promotion_count`` as
        read back after the update.

    Raises:
        HTTPException: 404 when the update matched no product.
    """
    # Update the price and record the operation in one queryset update.
    remaining_fraction = 1 - discount
    rows_changed = await Product.filter(id=product_id).update(
        price=F("price") * remaining_fraction,
        promotion_count=F("promotion_count") + 1,
    )
    if not rows_changed:
        raise HTTPException(404, "商品不存在")

    # Fetch the object again to report the values now stored.
    refreshed = await Product.get(id=product_id)
    return dict(
        new_price=float(refreshed.price),
        promotion_count=refreshed.promotion_count,
    )
评论
发表评论