Python异步编程进阶指南:破解高并发系统的七重封印
title: Python异步编程进阶指南:破解高并发系统的七重封印
class AsyncDataStream:
def __init__(self, urls):
self.urls = urls
def __aiter__(self):
self.index = 0
return self
async def __anext__(self):
if self.index >= len(self.urls):
raise StopAsyncIteration
async with aiohttp.ClientSession() as session:
async with session.get(self.urls[self.index]) as resp:
data = await resp.json()
self.index += 1
return data
# 使用示例
async for record in AsyncDataStream(api_endpoints):
process(record)
# Redis Pub/Sub集成
import aioredis
async def redis_subscriber(channel):
redis = await aioredis.create_redis('redis://localhost')
async with redis.pubsub() as pubsub:
await pubsub.subscribe(channel)
async for message in pubsub.listen():
print(f"Received: {message}")
async def redis_publisher(channel):
redis = await aioredis.create_redis('redis://localhost')
await redis.publish(channel, "紧急消息!")
from asyncpg import create_pool
async def init_db():
return await create_pool(
dsn=DSN,
min_size=5,
max_size=100,
max_queries=50000,
max_inactive_connection_lifetime=300
)
async def query_users(pool):
async with pool.acquire() as conn:
return await conn.fetch("SELECT * FROM users WHERE is_active = $1", True)
async def transfer_funds(pool, from_id, to_id, amount):
async with pool.acquire() as conn:
async with conn.transaction(isolation='repeatable_read'):
# 扣款
await conn.execute(
"UPDATE accounts SET balance = balance - $1 WHERE id = $2",
amount, from_id
)
# 存款
await conn.execute(
"UPDATE accounts SET balance = balance + $1 WHERE id = $2",
amount, to_id
)
import pytest
@pytest.mark.asyncio
async def test_api_endpoint():
async with httpx.AsyncClient(app=app, base_url="http://test") as client:
response = await client.get("/items/42")
assert response.status_code == 200
assert response.json()["id"] == 42
# 使用Hypothesis进行属性测试
from hypothesis import given
from hypothesis.strategies import integers
@given(integers(min_value=1))
@pytest.mark.asyncio
async def test_item_lookup(item_id):
async with httpx.AsyncClient() as client:
response = await client.get(f"{API_URL}/items/{item_id}")
assert response.status_code in (200, 404)
# 使用cProfile进行协程分析
import cProfile
import asyncio
async def target_task():
await asyncio.sleep(1)
# 业务代码...
def profile_async():
loop = asyncio.get_event_loop()
with cProfile.Profile() as pr:
loop.run_until_complete(target_task())
pr.print_stats(sort='cumtime')
from concurrent.futures import ThreadPoolExecutor
import numpy as np
async def hybrid_processing(data):
loop = asyncio.get_event_loop()
# CPU密集型任务交给线程池
with ThreadPoolExecutor() as pool:
processed = await loop.run_in_executor(
pool, np.fft.fft, data
)
# IO密集型任务使用协程
async with aiohttp.ClientSession() as session:
await session.post(API_URL, json=processed)
class SmartSemaphore:
def __init__(self, max_concurrent):
self.sem = asyncio.Semaphore(max_concurrent)
self.active = 0
async def acquire(self):
await self.sem.acquire()
self.active += 1
# 动态调整并发数(基于系统负载)
if psutil.cpu_percent() < 70:
self.sem._value += 1 # 小心操作内部属性
def release(self):
self.sem.release()
self.active -= 1
# 使用asyncio调试模式
import sys
import asyncio
async def suspect_coro():
await asyncio.sleep(1)
# 意外同步调用
time.sleep(5) # 危险!
if __name__ == "__main__":
# 启用调试检测
asyncio.run(suspect_coro(), debug=True)
Executing <Task ...> took 5.003 seconds
import objgraph
async def leaking_coro():
cache = []
while True:
cache.append(await get_data())
await asyncio.sleep(1)
# 生成内存快照对比
objgraph.show_growth(limit=10)
# 实现要求:
# 1. 支持RabbitMQ/Kafka双协议
# 2. 消息去重与重试机制
# 3. 死信队列处理
class MessageHub:
def __init__(self, backend):
self.backend = backend
async def consume(self):
async for msg in self.backend.stream():
try:
await process(msg)
except Exception:
await self.dead_letters.put(msg)
async def retry_failed(self):
while True:
msg = await self.dead_letters.get()
await self.backend.publish(msg)
# 实现要求:
# 1. LRU淘汰策略
# 2. 缓存穿透保护
# 3. 分布式锁机制
class AsyncCache:
def __init__(self, size=1000):
self._store = {}
self._order = []
self.size = size
async def get(self, key):
if key in self._store:
self._order.remove(key)
self._order.append(key)
return self._store[key]
else:
# 防止缓存穿透
async with async_lock:
value = await fetch_from_db(key)
self._set(key, value)
return value
评论
发表评论