The Ultimate Guide to Python Async Programming: Rebuilding Your High-Concurrency System with Coroutines and the Event Loop
```python
# Blocking synchronous HTTP requests
import requests

def fetch_sync(urls):
    results = []
    for url in urls:
        resp = requests.get(url)  # each request blocks for ~2 seconds
        results.append(resp.text)
    return results

# 10 URLs take ~20 seconds (2 s/request × 10)
```
```python
# Asynchronous HTTP requests
import asyncio

import aiohttp

async def fetch_async(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [session.get(url) for url in urls]
        responses = await asyncio.gather(*tasks)
        return [await r.text() for r in responses]

# 10 URLs take ~2 seconds (all requests run concurrently)
```
| Metric | Synchronous | Asynchronous |
| --- | --- | --- |
| Time for 10 requests | ~20 s | ~2 s |
| CPU utilization | 15% | 85% |
| Memory footprint | Low | Moderate |
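One way to reproduce a comparison like this is a small timing harness. The sketch below is an assumption, not the article's benchmark: it reuses the two functions above against `https://httpbin.org/delay/2`, a public endpoint that responds after 2 seconds. Exact numbers will vary with network conditions.

```python
# A minimal timing harness (assumes httpbin.org is reachable; numbers will vary)
import time

urls = ["https://httpbin.org/delay/2"] * 10

t0 = time.perf_counter()
fetch_sync(urls)
print(f"sync:  {time.perf_counter() - t0:.1f}s")   # roughly 20 s

t0 = time.perf_counter()
asyncio.run(fetch_async(urls))
print(f"async: {time.perf_counter() - t0:.1f}s")   # roughly 2 s
```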
```python
import asyncio

import uvloop

# Use uvloop (~30% faster than the default loop)
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

# Get the current event loop
loop = asyncio.get_event_loop()

# Schedule coroutines manually
async def task(name):
    print(f"{name} start")
    await asyncio.sleep(1)
    print(f"{name} end")

coro1 = task("A")
coro2 = task("B")
loop.run_until_complete(asyncio.gather(coro1, coro2))
```
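On Python 3.7+, the same pair of tasks can also be driven by `asyncio.run()`, which creates the loop, runs the coroutine to completion, and closes the loop for you. A minimal sketch reusing `task()` from above:

```python
# Equivalent modern entry point: asyncio.run() owns the loop lifecycle
async def main():
    await asyncio.gather(task("A"), task("B"))

asyncio.run(main())
```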
```python
# Way 1: async def (the modern, recommended form)
async def simple_coro():
    await asyncio.sleep(1)

# Way 2: @asyncio.coroutine decorator (legacy; deprecated in 3.8, removed in 3.10)
@asyncio.coroutine
def legacy_coro():
    yield from asyncio.sleep(1)

# Way 3: generator-based coroutine (legacy, pre-3.5 style)
def gen_coro():
    yield from asyncio.sleep(1)

# Way 4: async with context management (only valid inside an async def)
async def ctx_coro(url):
    async with aiohttp.ClientSession() as session:
        await session.get(url)
```
```python
async def worker(q: asyncio.Queue):
    while True:
        item = await q.get()
        try:
            ...  # process the item
        finally:
            q.task_done()

async def main():
    q = asyncio.Queue(maxsize=100)
    # Spawn a worker pool
    tasks = [asyncio.create_task(worker(q)) for _ in range(5)]
    # Enqueue work items
    for i in range(1000):
        await q.put(i)
    # Wait for the queue to drain
    await q.join()
    # Cancel the workers
    for t in tasks:
        t.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
```
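A note on the shutdown step: `gather(..., return_exceptions=True)` absorbs the `CancelledError` raised inside each cancelled worker, so `main()` returns cleanly instead of propagating the exception. From synchronous code the whole pipeline is started with:

```python
asyncio.run(main())
```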
```python
import asyncio

import aiohttp
from bs4 import BeautifulSoup

async def scrape_page(url):
    timeout = aiohttp.ClientTimeout(total=10)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get(url) as response:
            html = await response.text()
            soup = BeautifulSoup(html, 'lxml')
            data = ...  # parsing logic
            return data

async def scrape_all(urls):
    sem = asyncio.Semaphore(20)  # cap the number of concurrent requests

    async def limited_scrape(url):
        async with sem:
            return await scrape_page(url)

    return await asyncio.gather(*[limited_scrape(url) for url in urls])
```
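A hypothetical invocation (the URL list is a placeholder): even with 100 URLs queued, the semaphore keeps at most 20 requests in flight at any moment.

```python
urls = [f"https://example.com/page/{i}" for i in range(100)]
results = asyncio.run(scrape_all(urls))
```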
```python
from fastapi import FastAPI
import httpx

app = FastAPI()

@app.get("/proxy")
async def proxy_request(url: str):
    async with httpx.AsyncClient() as client:
        resp = await client.get(url)
        return resp.json()
```
```python
async def test():
    print("Running")

# Wrong: calling the coroutine function only creates a coroutine object
test()  # RuntimeWarning: coroutine 'test' was never awaited

# Correct:
asyncio.run(test())
```
Error: `RuntimeError: Event loop is closed`

Fixes:

1. Manage resource lifetimes with async/await
2. Avoid creating a `ClientSession` outside a coroutine (see the sketch after the code below)
3. Close the loop explicitly:

```python
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()
```
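For points 1 and 2, a minimal sketch of the safe pattern: the `ClientSession` is created and closed inside a running coroutine, so nothing outlives the loop (the URL is a placeholder).

```python
async def fetch(url):
    # Session lives entirely inside the running loop
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()

asyncio.run(fetch("https://example.com"))
```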
```python
# Original synchronous code (PostgreSQL)
import psycopg2

def query_users():
    with psycopg2.connect(DSN) as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM users")
        return cursor.fetchall()

# Task: rewrite as an async version (using asyncpg)
# Requirements:
# 1. Use a connection pool
# 2. Support paginated queries
# 3. Handle query timeouts
```
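One possible solution, sketched with asyncpg; the pool sizes, page size, and timeout values here are assumptions, not the article's reference answer.

```python
# A minimal sketch, not a reference solution: pool + pagination + timeout
import asyncpg

async def query_users_async(dsn, page=0, page_size=100, timeout=5.0):
    pool = await asyncpg.create_pool(dsn, min_size=2, max_size=10)  # requirement 1
    try:
        async with pool.acquire() as conn:
            # Requirement 2: pagination via LIMIT/OFFSET;
            # requirement 3: per-query timeout in seconds
            return await conn.fetch(
                "SELECT * FROM users ORDER BY id LIMIT $1 OFFSET $2",
                page_size,
                page * page_size,
                timeout=timeout,
            )
    finally:
        await pool.close()
```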
```python
# Exercise: implement a sliding-window rate limiter
class RateLimiter:
    def __init__(self, rate=10, period=1):
        self.rate = rate
        self.period = period

    async def __aenter__(self):
        ...  # rate-limiting logic

    async def __aexit__(self, *args):
        pass

# Usage example
async def limited_api_call():
    async with RateLimiter(100, 60):  # at most 100 calls per minute
        return await call_external_api()
```
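One way to fill in the blanks, assuming a single event loop and using a deque of call timestamps as the window. Again a sketch, not the article's reference answer.

```python
# A minimal sliding-window sketch: timestamps in a deque, FIFO waiting via a lock
import asyncio
import time
from collections import deque

class RateLimiter:
    def __init__(self, rate=10, period=1.0):
        self.rate = rate
        self.period = period
        self._calls = deque()        # monotonic timestamps of recent calls
        self._lock = asyncio.Lock()  # serializes waiters in FIFO order

    async def __aenter__(self):
        async with self._lock:
            while True:
                now = time.monotonic()
                # Evict timestamps that have slid out of the window
                while self._calls and now - self._calls[0] >= self.period:
                    self._calls.popleft()
                if len(self._calls) < self.rate:
                    self._calls.append(now)
                    return self
                # Sleep until the oldest call expires from the window
                await asyncio.sleep(self.period - (now - self._calls[0]))

    async def __aexit__(self, *args):
        pass
```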