异步IO与Tortoise-ORM的数据库
title: 异步IO与Tortoise-ORM的数据库


# 同步操作(线程阻塞)
def sync_query():
result = db.execute("SELECT * FROM users") # 线程在此等待
process(result)
# 异步操作(事件驱动)
async def async_query():
result = await db.execute("SELECT * FROM users") # 释放控制权
process(result)
层级 | 职责 | 关键技术 |
---|---|---|
协议层 | 数据库通信协议解析 | asyncpg/aiomysql |
连接池层 | 管理异步数据库连接 | asyncio.Queue |
ORM层 | 模型映射与查询构建 | Python元类编程 |
async def get_users():
# 以下三个步骤交替执行,全程无阻塞
users = await User.filter(age__gt=18) # 1.生成SQL语句
# 2.从连接池获取连接
# 3.等待数据库响应
return users
pip install fastapi uvicorn tortoise-orm aiosqlite pydantic
project/
├── config.py
├── models.py
├── schemas.py
└── main.py
from tortoise.models import Model
from tortoise import fields
class User(Model):
id = fields.IntField(pk=True)
username = fields.CharField(max_length=50, unique=True)
hashed_password = fields.CharField(max_length=128)
email = fields.CharField(max_length=100)
created_at = fields.DatetimeField(auto_now_add=True)
class Meta:
table = "users"
from pydantic import BaseModel, EmailStr
class UserCreate(BaseModel):
username: str
password: str
email: EmailStr
class Config:
schema_extra = {
"example": {
"username": "fastapi_user",
"password": "strongpassword123",
"email": "user@example.com"
}
}
from fastapi import FastAPI, Depends, HTTPException
from tortoise.contrib.fastapi import register_tortoise
from models import User
from schemas import UserCreate
app = FastAPI()
# 初始化数据库
register_tortoise(
app,
db_url="sqlite://db.sqlite3",
modules={"models": ["models"]},
generate_schemas=True,
add_exception_handlers=True,
)
@app.post("/users/", status_code=201)
async def create_user(user_data: UserCreate):
# 密码哈希处理(实际项目应使用passlib)
hashed_password = f"hashed_{user_data.password}"
try:
user = await User.create(
username=user_data.username,
hashed_password=hashed_password,
email=user_data.email
)
except Exception as e:
raise HTTPException(
status_code=400,
detail="Username already exists"
)
return {
"id": user.id,
"username": user.username,
"email": user.email
}
@app.get("/users/{user_id}")
async def get_user(user_id: int):
user = await User.get_or_none(id=user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return {
"id": user.id,
"username": user.username,
"email": user.email,
"created_at": user.created_at.isoformat()
}
register_tortoise(
app,
db_url="postgres://user:pass@localhost:5432/mydb",
modules={"models": ["models"]},
generate_schemas=True,
add_exception_handlers=True,
connection_params={
"maxsize": 20 # 控制连接池大小
}
)
# 获取用户及其所有文章
async def get_user_with_posts(user_id: int):
user = await User.get(id=user_id).prefetch_related('posts')
return user
async def transfer_funds(from_id, to_id, amount):
async with in_transaction() as conn:
from_user = await User.get(id=from_id).for_update()
to_user = await User.get(id=to_id).for_update()
if from_user.balance < amount:
raise ValueError("Insufficient balance")
from_user.balance -= amount
to_user.balance += amount
await from_user.save(using_db=conn)
await to_user.save(using_db=conn)
评论
发表评论