如何在FastAPI中打造一个既安全又灵活的权限管理系统?
title: 如何在FastAPI中打造一个既安全又灵活的权限管理系统?


# 所需环境配置(运行前请安装)
# fastapi==0.95.0
# uvicorn==0.21.1
# python-multipart==0.0.6
# sqlalchemy==1.4.46
# pydantic==1.10.7
# passlib==1.7.4
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from databases import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True)
hashed_password = Column(String(300))
is_active = Column(Boolean, default=True)
role_id = Column(Integer, ForeignKey("roles.id"))
role = relationship("Role", back_populates="users")
class Role(Base):
__tablename__ = "roles"
id = Column(Integer, primary_key=True)
name = Column(String(20), unique=True)
permissions = Column(String(500)) # 存储逗号分隔的权限标识
users = relationship("User", back_populates="role")
class PermissionRegistry(Base):
__tablename__ = "permission_registry"
id = Column(Integer, primary_key=True)
endpoint = Column(String(100)) # 路由路径
method = Column(String(10)) # HTTP方法
perm_code = Column(String(50)) # 权限标识
from fastapi import Depends, HTTPException, status
from pydantic import BaseModel
class PermissionValidator:
def __init__(self, required_perm: str):
self.required_perm = required_perm
async def __call__(self,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)):
# 获取角色关联的权限
role_perms = current_user.role.permissions.split(",")
# 验证权限是否存在
if self.required_perm not in role_perms:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="没有访问权限"
)
# 记录审计日志(示例)
audit_log = AuditLog(
user_id=current_user.id,
action=f"访问需要 {self.required_perm} 权限的端点"
)
db.add(audit_log)
db.commit()
# 使用示例
@app.get("/admin/dashboard")
async def admin_dashboard(
perm_check: bool = Depends(PermissionValidator("admin_dashboard"))):
return {"message": "欢迎来到管理面板"}
class PermissionRegistration(BaseModel):
endpoint: str
methods: List[str]
perm_code: str
@app.post("/manage/permissions")
async def register_permission(
perm_data: PermissionRegistration,
db: Session = Depends(get_db)
):
for method in perm_data.methods:
existing = db.query(PermissionRegistry).filter_by(
endpoint=perm_data.endpoint,
method=method
).first()
if not existing:
new_perm = PermissionRegistry(
endpoint=perm_data.endpoint,
method=method,
perm_code=perm_data.perm_code
)
db.add(new_perm)
db.commit()
return {"status": "权限注册成功"}
@app.middleware("http")
async def dynamic_permission_check(request: Request, call_next):
# 跳过非业务端点
if request.url.path.startswith(("/docs", "/redoc")):
return await call_next(request)
# 查询权限注册表
db = SessionLocal()
perm_record = db.query(PermissionRegistry).filter_by(
endpoint=request.url.path,
method=request.method
).first()
if perm_record:
# 验证用户权限
current_user = await get_current_user(request)
if perm_record.perm_code not in current_user.role.permissions.split(","):
return JSONResponse(
status_code=403,
content={"detail": "权限不足"}
)
response = await call_next(request)
return response
SQLALCHEMY_DATABASE_URL = "postgresql://user:pass@localhost/dbname?connect_timeout=10"
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
pool_size=20,
max_overflow=10,
pool_timeout=30
)
评论
发表评论