数据库审计与智能监控:从日志分析到异常检测
title: 数据库审计与智能监控:从日志分析到异常检测
-- 创建审计策略
CREATE AUDIT POLICY sql_audit_policy
ACTIONS SELECT, INSERT, UPDATE, DELETE,
ACTIONS COMPONENT=Datapump EXPORT, IMPORT;
-- 应用审计策略
AUDIT POLICY sql_audit_policy BY app_user;
-- 查看审计日志
SELECT * FROM UNIFIED_AUDIT_TRAIL
WHERE SQL_TEXT LIKE '%salary%';
BEGIN
DBMS_AUDIT_MGMT.SET_LAST_ARCHIVE_TIMESTAMP(
audit_trail_type => DBMS_AUDIT_MGMT.AUDIT_TRAIL_UNIFIED,
last_archive_time => SYSDATE-30
);
DBMS_AUDIT_MGMT.CLEAN_AUDIT_TRAIL(
audit_trail_type => DBMS_AUDIT_MGMT.AUDIT_TRAIL_UNIFIED,
use_last_arch_timestamp => TRUE
);
END;
-- 安装pgAudit扩展
CREATE EXTENSION pgaudit;
-- 配置审计规则
ALTER DATABASE sales SET pgaudit.log = 'write, ddl';
ALTER ROLE auditor SET pgaudit.log = 'all';
-- 审计日志示例
[2024-06-15 09:30:23 UTC] LOG: AUDIT: SESSION,1,1,DDL,CREATE TABLE,,,user=admin,db=sales
# mysqld_exporter配置
scraper_configs:
- job_name: 'mysql'
static_configs:
- targets: ['mysql-server:9104']
params:
collect[]:
- global_status
- innodb_metrics
- perf_schema.eventswaits
# 慢查询日志分析DSL
POST /_sql
{
"query": """
SELECT client_ip, COUNT(*) as cnt
FROM mysql-slowlogs-*
WHERE query_time > 5
GROUP BY client_ip
HAVING cnt > 10
ORDER BY cnt DESC
"""
}
# SQL注入模式识别
import re
def detect_sql_injection(query):
patterns = [
r'\b(union\s+select)\b',
r'\b(;\s*--)\b',
r'\b(exec\s+master\.dbo\.xp_cmdshell)\b'
]
return any(re.search(p, query, re.I) for p in patterns)
# 审计日志流式检测
from kafka import KafkaConsumer
consumer = KafkaConsumer('audit-logs')
for msg in consumer:
if detect_sql_injection(msg.value.decode()):
alert_soc(f"SQL注入尝试: {msg.value[:100]}")
# Isolation Forest异常检测
from sklearn.ensemble import IsolationForest
import pandas as pd
# 特征工程
logs = pd.read_parquet('audit_logs.parquet')
features = logs[['query_duration', 'rows_affected', 'error_code']]
# 模型训练
model = IsolationForest(contamination=0.01)
model.fit(features)
# 实时预测
new_query = [[1.2, 10000, 0]]
if model.predict(new_query)[0] == -1:
trigger_alert("异常查询行为", new_query)
# 审计日志签名
openssl dgst -sha256 -sign private.key -out audit.log.sig audit.log
# 验证签名
openssl dgst -sha256 -verify public.key -signature audit.log.sig audit.log
# 使用Jinja2生成PDF报告
from jinja2 import Template
from pdfkit import from_string
template = Template('''
<h1>{{ month }}审计报告</h1>
<table>
<tr><th>事件类型</th><th>次数</th></tr>
{% for item in stats %}
<tr><td>{{ item.type }}</td><td>{{ item.count }}</td></tr>
{% endfor %}
</table>
''')
html = template.render(month="2024-06", stats=audit_stats)
from_string(html, output_path="audit_report.pdf")
| 指标 | 警告阈值 | 严重阈值 |
|---|---|---|
| CPU使用率 | 70% | 90% |
| 连接池等待数 | 50 | 100 |
| 磁盘IO延迟 | 20ms | 50ms |


评论
发表评论