地理空间索引:解锁日志分析中的位置智慧
title: 地理空间索引:解锁日志分析中的位置智慧


{
"event_type": "user_login",
"ip": "192.168.1.1",
"location": {
"type": "Point",
"coordinates": [
116.404,
39.915
]
// [经度, 纬度]
},
"timestamp": "2023-07-20T10:00:00"
}
类型 | 应用场景 | 精度控制 |
---|---|---|
2dsphere | 地球表面几何计算(WGS84) | 高 |
2d | 平面地图/自定义坐标系 | 中 |
pip install motor==3.3.2 pydantic==1.10.7
from motor.motor_asyncio import AsyncIOMotorClient
from pydantic import BaseSettings
class Settings(BaseSettings):
MONGO_URI: str = "mongodb://localhost:27017"
DB_NAME: str = "geo_logs"
settings = Settings()
client = AsyncIOMotorClient(settings.MONGO_URI)
db = client[settings.DB_NAME]
from typing import Literal
from pydantic import BaseModel
class GeoPoint(BaseModel):
type: Literal["Point"] = "Point"
coordinates: list[float] # [longitude, latitude]
class LogRecord(BaseModel):
event_type: str
ip: str
location: GeoPoint
timestamp: datetime
# 在FastAPI启动事件中创建索引
@app.on_event("startup")
async def create_indexes():
await db.logs.create_index([("location", "2dsphere")])
print("2dsphere索引创建完成")
from fastapi import APIRouter
from geojson_pydantic import Point
router = APIRouter()
class GeoQuery(BaseModel):
center: Point
radius: confloat(gt=0) # 单位:米
@router.post("/logs/nearby")
async def get_nearby_logs(query: GeoQuery):
"""
查询指定半径范围内的日志记录
示例请求体:
{
"center": {
"type": "Point",
"coordinates": [116.404, 39.915]
},
"radius": 5000
}
"""
result = await db.logs.find({
"location": {
"$near": {
"$geometry": query.center.dict(),
"$maxDistance": query.radius
}
}
}).to_list(1000)
return result
class GeoFenceAlert(BaseModel):
fence: Polygon
@router.post("/alerts/geo-fence")
async def check_geo_fence(alert: GeoFenceAlert):
"""
检查日志是否进入指定地理围栏
多边形示例:
{
"type": "Polygon",
"coordinates": [[
[116.39,39.91],
[116.41,39.91],
[116.41,39.93],
[116.39,39.93],
[116.39,39.91]
]]
}
"""
return await db.logs.count_documents({
"location": {
"$geoWithin": {
"$geometry": alert.fence.dict()
}
}
})
# 组合时间与空间的复合索引
await db.logs.create_index([
("event_type", 1),
("location", "2dsphere"),
("timestamp", -1)
])
async def analyze_heatmap():
pipeline = [
{"$geoNear": {
"near": {"type": "Point", "coordinates": [116.4, 39.9]},
"distanceField": "distance",
"maxDistance": 10000,
"spherical": True
}},
{"$group": {
"_id": "$event_type",
"count": {"$sum": 1},
"avgDistance": {"$avg": "$distance"}
}}
]
return await db.logs.aggregate(pipeline).to_list(None)
radius: confloat(gt=0, le=50000) # 最大50公里
# 所需依赖及版本
fastapi==0.95.2
motor==3.3.2
pydantic==1.10.7
python-multipart==0.0.6
uvicorn==0.22.0
# 启动命令
uvicorn main:app --reload --port 8000
评论
发表评论