FastAPI中Pydantic异步分布式唯一性校验


from pydantic import BaseModel, validator, EmailStr
from typing import Optional
class UserCreate(BaseModel):
username: str
email: EmailStr
mobile: str = Pattern(r"^1[3-9]\d{9}$")
referral_code: Optional[str] = None
@validator('mobile')
def validate_mobile(cls, v):
return v.strip()
from fastapi import Depends
from redis.asyncio import Redis
class ValidationService:
def __init__(self, redis: Redis):
self.redis = redis
self.local_cache = {}
async def check_unique(self, field: str, value: str) -> bool:
# 本地缓存检查(减少网络IO)
if value in self.local_cache.get(field, set()):
return False
# Redis原子操作(避免并发冲突)
key = f"unique:{field}:{value}"
async with self.redis.lock(f"lock:{key}", timeout=5):
if await self.redis.exists(key):
return False
# 数据库实际查询(示例使用伪代码)
exists_in_db = await User.filter(**{field: value}).exists()
if not exists_in_db:
# 设置短期缓存(5分钟)
await self.redis.setex(key, 300, 1)
self.local_cache.setdefault(field, set()).add(value)
return not exists_in_db
from fastapi import APIRouter, HTTPException
router = APIRouter()
@router.post("/users")
async def create_user(
user: UserCreate,
service: ValidationService = Depends()
):
# 并行校验邮箱和手机号
email_check, mobile_check = await asyncio.gather(
service.check_unique("email", user.email),
service.check_unique("mobile", user.mobile)
)
if not email_check:
raise HTTPException(400, "Email already registered")
if not mobile_check:
raise HTTPException(400, "Mobile already registered")
# 创建用户逻辑...
缓存层级 | 存储介质 | 有效期 | 特点 |
---|---|---|---|
本地缓存 | 内存 | 60秒 | 零延迟,进程内共享 |
Redis | 内存 | 5分钟 | 跨进程,分布式一致性 |
数据库 | 磁盘 | 永久 | 最终数据源,强一致性 |
from contextlib import asynccontextmanager
@asynccontextmanager
async def acquire_lock(redis: Redis, key: str, timeout=5):
lock = redis.lock(f"lock:{key}", timeout=timeout)
acquired = await lock.acquire(blocking=False)
try:
if acquired:
yield True
else:
yield False
finally:
if acquired:
await lock.release()
async def rate_limiter(key: str, limit=5, period=60):
counter = await redis.incr(key)
if counter == 1:
await redis.expire(key, period)
return counter <= limit
async with redis.lock("mylock", timeout=10):
await asyncio.sleep(5) # 确保操作在10秒内完成
@validator('mobile')
def clean_mobile(cls, v):
return v.strip().replace(' ', '').replace('-', '')
评论
发表评论