FastAPI日志审计:你的权限系统是否真的安全无虞?
title: FastAPI日志审计:你的权限系统是否真的安全无虞?


请求流程:
HTTP请求 -> 认证中间件 -> 权限校验 -> 业务处理 -> 响应生成
↗日志收集↗ ↗日志收集↗ ↘日志收集↘
└─────────────────日志存储器───────────────┘
from fastapi import Request
from datetime import datetime
async def audit_logger(request: Request, call_next):
    """Print one audit record for a request after it has been handled.

    The function has the ``(request, call_next)`` shape of an HTTP
    middleware: it forwards the request to ``call_next``, then prints a
    record containing the client address, HTTP method, path, response
    status code, elapsed seconds and user agent.

    Args:
        request: The incoming request.
        call_next: Awaitable callable that receives the request and returns
            the response.

    Returns:
        The response returned by ``call_next``, unchanged.
    """
    # Function-scope import: the surrounding snippet does not import ``time``.
    import time

    # A monotonic clock gives a correct duration even if the wall clock is
    # adjusted mid-request, and avoids the deprecated ``datetime.utcnow()``.
    started = time.perf_counter()
    response = await call_next(request)
    elapsed_seconds = time.perf_counter() - started

    # ``request.client`` is None when the server provides no peer address;
    # record None instead of failing the whole request on attribute access.
    client = request.client
    log_data = {
        "client_ip": client.host if client is not None else None,
        "method": request.method,
        "path": request.url.path,
        "status_code": response.status_code,
        "response_time": elapsed_seconds,
        "user_agent": request.headers.get("user-agent", ""),
    }
    # Placeholder sink: write to a database or log file instead.
    print(f"[AUDIT] {log_data}")
    return response
pip install fastapi==0.68.0 uvicorn==0.15.0 sqlalchemy==1.4.35 pydantic==1.10.7
from sqlalchemy import Column, Integer, String, DateTime, JSON
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()


class AuditLog(Base):
    """ORM model mapped to the ``audit_logs`` table, one row per audited action."""

    __tablename__ = "audit_logs"

    id = Column(Integer, primary_key=True)

    # ID of the user who performed the operation (indexed).
    user_id = Column(Integer, index=True)

    # Operation type, e.g. LOGIN / CREATE / UPDATE.
    action_type = Column(String(length=50))

    # Resource that was operated on, e.g. /users or /posts.
    target_resource = Column(String(length=100))

    # Operation details, stored as JSON.
    details = Column(JSON)

    # ``datetime.utcnow`` is passed as a callable, so it is evaluated per
    # insert rather than once at import time.
    created_at = Column(DateTime, default=datetime.utcnow)
from pydantic import BaseModel
from fastapi import Depends
class AuditLogCreate(BaseModel):
    """Input schema carrying the fields needed to create an audit-log entry."""

    # Who performed the action.
    user_id: int

    # What kind of action it was.
    action_type: str

    # Which resource the action targeted.
    target_resource: str

    # Free-form details about the action.
    details: dict
class AuditService:
    """Service object that stores audit-log entries through a DB session."""

    async def create_log(self, log_data: AuditLogCreate):
        """Add one ``AuditLog`` row built from ``log_data`` and commit it.

        Args:
            log_data: Validated audit fields; its ``dict()`` is expanded
                into the ``AuditLog`` constructor.
        """
        # In a real production environment this should be written asynchronously.
        async with SessionLocal() as db:
            record = AuditLog(**log_data.dict())
            db.add(record)
            await db.commit()
from fastapi import HTTPException
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
security = HTTPBearer()


async def check_permission(
    request: Request,
    credentials: HTTPAuthorizationCredentials = Depends(security),
):
    """Authenticate the bearer token and check permission for this request.

    Args:
        request: The incoming request, passed to ``has_permission``.
        credentials: Bearer credentials resolved by the ``security`` scheme.

    Returns:
        The user object returned by ``authenticate``.

    Raises:
        HTTPException: With status 403 when ``has_permission`` is falsy.
        Exception: Any error raised by ``authenticate`` or
            ``has_permission`` is logged via ``log_auth_error`` and re-raised.
    """
    try:
        user = authenticate(credentials.credentials)
        allowed = has_permission(user, request)
    except Exception as e:
        await log_auth_error(e)
        raise

    # The denial is handled outside the ``try`` so that the 403 raised here
    # is not caught by the handler above and logged a second time as an
    # authentication error.
    if not allowed:
        await log_access_denied(user, request)
        raise HTTPException(403)
    return user
@app.post("/users")
async def create_user(
    request: Request,
    user_data: UserCreate,
    current_user: User = Depends(check_permission),
):
    """Create a user and record the action in the audit log.

    Args:
        request: The incoming request; used for the operator's address.
        user_data: Payload describing the user to create.
        current_user: The user returned by the ``check_permission``
            dependency.

    Returns:
        The object returned by ``UserService.create``.
    """
    # NOTE(review): ``check_permission`` is an async dependency taking
    # (request, credentials); it must be passed to ``Depends`` uncalled.
    # Calling it with "USER_CREATE" would hand ``Depends`` a coroutine
    # object. Presumably ``has_permission`` derives the required permission
    # from the request -- confirm.
    new_user = await UserService.create(user_data)

    # ``request.client`` may be None when no peer address is available.
    client = request.client
    operator_ip = client.host if client is not None else None

    # Record the audit log entry; the password is excluded from the
    # recorded payload.
    await AuditService().create_log(
        AuditLogCreate(
            user_id=current_user.id,
            action_type="CREATE",
            target_resource="/users",
            details={
                "operator_ip": operator_ip,
                "new_user_id": new_user.id,
                "created_data": user_data.dict(exclude={"password"}),
            },
        )
    )
    return new_user
@app.post("/login")
async def login(
    request: Request,
    credentials: OAuth2PasswordRequestForm = Depends(),
):
    """Authenticate a login form, logging both successful and failed attempts.

    Args:
        request: The incoming request; used for the client address recorded
            on a failed attempt.
        credentials: The submitted login form.

    Returns:
        A dict with a ``"token"`` key holding the result of ``create_token``.

    Raises:
        AuthError: Re-raised after the failed attempt has been logged.
    """
    try:
        user = await authenticate(credentials)
        await log_success_login(user)
        return {"token": create_token(user)}
    except AuthError as e:
        # ``request.client`` may be None when no peer address is available;
        # the failed attempt must still be logged in that case.
        client = request.client
        await log_failed_login(
            username=credentials.username,
            client_ip=client.host if client is not None else None,
            error=str(e),
        )
        raise
评论
发表评论