数据库安全实战:访问控制与行级权限管理
title: 数据库安全实战:访问控制与行级权限管理
-- PostgreSQL role inheritance example.
-- Role membership is declared with IN ROLE; INHERIT is only a boolean
-- attribute and does not accept a parent role name.
CREATE ROLE reader;
CREATE ROLE analyst INHERIT IN ROLE reader;
CREATE ROLE dba INHERIT IN ROLE analyst;
-- IN ROLE makes john a member of analyst. The ROLE clause would do the
-- opposite and make analyst a member of john.
CREATE USER john IN ROLE analyst;
-- Verify privilege inheritance
GRANT SELECT ON sales TO reader;
SET ROLE analyst;
SELECT * FROM sales; -- succeeds: analyst inherits reader's SELECT privilege
# Managing database credentials in Kubernetes:
# create the db-creds secret with a fixed username and a random password.
kubectl create secret generic db-creds \
  --from-literal=username=svc_app \
  --from-literal=password="$(openssl rand -base64 16)"
# Credential rotation (Vault example).
# The rotate-root endpoint takes no key=value data, so -force is required
# for `vault write` to send the request.
vault write -force database/rotate-root/my-mysql-db
-- Auditor account that may connect from any host ('%').
CREATE USER 'auditor'@'%'
    IDENTIFIED BY 'SecurePass123!';
-- Privileges on every object in the sales schema.
GRANT
    SELECT,
    SHOW VIEW
ON sales.*
TO 'auditor'@'%';
-- Fine-grained privilege revocation
REVOKE DELETE HISTORY
ON *.*
FROM 'auditor'@'%';
-- Oracle Virtual Private Database (VPD)
-- Registers policy dept_policy on hr.employees, naming sec.check_dept as the
-- policy function and restricting the policy to SELECT statements.
BEGIN
    DBMS_RLS.ADD_POLICY(
        policy_name     => 'dept_policy',
        object_schema   => 'hr',
        object_name     => 'employees',
        function_schema => 'sec',
        policy_function => 'check_dept',
        statement_types => 'SELECT'
    );
END;
-- Policy function implementation.
-- Takes a schema name and an object name and returns the predicate text
-- used to filter rows by department.
CREATE FUNCTION check_dept (
    schema_var IN VARCHAR2,
    table_var  IN VARCHAR2
) RETURN VARCHAR2
IS
BEGIN
    -- USERENV is Oracle's built-in namespace and has no SESSION_DEPT
    -- attribute, so the department is read from an application context.
    -- NOTE(review): HR_CTX / DEPT_ID are placeholder names. The context must
    -- be created with CREATE CONTEXT and populated through
    -- DBMS_SESSION.SET_CONTEXT from its trusted package; confirm the names.
    RETURN 'department_id = SYS_CONTEXT(''HR_CTX'', ''DEPT_ID'')';
END;
-- Policies are only applied once row-level security is enabled on the table.
ALTER TABLE invoices ENABLE ROW LEVEL SECURITY;
-- Restrict visible rows to the tenant stored in the app.current_tenant setting.
CREATE POLICY tenant_access ON invoices
    USING (tenant_id = current_setting('app.current_tenant')::INT);
-- Rows are filtered automatically at query time.
-- NOTE: the table owner bypasses the policy unless FORCE ROW LEVEL SECURITY
-- is also set on the table.
SET app.current_tenant = '123';
SELECT * FROM invoices; -- returns only tenant 123's rows
| 数据量 | 无RLS查询时间 | 有RLS查询时间 |
|---|---|---|
| 100万行 | 12ms | 14ms (+16%) |
| 1亿行 | 1.2s | 1.3s (+8%) |
# Generate the column master key:
# create a self-signed certificate, then write it to CMK.cer.
$cert = New-SelfSignedCertificate -Subject 'CN=ColumnMasterKey'
$cert | Export-Certificate -FilePath 'CMK.cer'
-- Encrypt the social security number column.
-- Deterministic encryption of a character column requires a BIN2 collation.
-- It is stated explicitly because ALTER COLUMN without COLLATE assigns the
-- database default collation.
ALTER TABLE employees
ALTER COLUMN ssn VARCHAR(11) COLLATE Latin1_General_BIN2
ENCRYPTED WITH (
    ENCRYPTION_TYPE = DETERMINISTIC,
    ALGORITHM = 'AEAD_AES_256_CBC_HMAC_SHA_256',
    COLUMN_ENCRYPTION_KEY = CEK1
);
-- Phone masking policy: the HR role sees the stored value; every other role
-- sees '***-***-' followed by the four characters starting at position 9.
CREATE MASKING POLICY phone_mask AS (val STRING) RETURNS STRING ->
    CASE CURRENT_ROLE()
        WHEN 'HR' THEN val
        ELSE '***-***-' || SUBSTR(val, 9, 4)
    END;
-- Attach the policy to customers.phone.
ALTER TABLE customers
    MODIFY COLUMN phone SET MASKING POLICY phone_mask;
# my.cnf configuration: audit logging settings for the [mysqld] server section.
# NOTE(review): the audit_log* options presumably come from the MySQL
# Enterprise Audit plugin; confirm the plugin is installed before use.
[mysqld]
# Turn audit logging on.
audit_log=ON
# Write audit records in JSON format.
audit_log_format=JSON
# Audit policy set to ALL.
audit_log_policy=ALL
{
"timestamp": "2023-10-05T14:23:15Z",
"user": "app_user@192.168.1.100",
"action": "SELECT",
"database": "hr",
"object": "salary",
"sql": "SELECT * FROM salary WHERE emp_id=101"
}
# Machine-learning-based SQL injection detection
from sklearn.ensemble import IsolationForest

# Feature engineering: three numeric features derived from the query text.
query_features = [
    len(query),
    num_special_chars(query),
    keyword_ratio(query),
]

# Train the detection model; contamination is set to 0.01.
clf = IsolationForest(contamination=0.01)
clf.fit(training_data)

# Real-time detection: predict() returns one label per sample, with -1
# marking an outlier, so the single result is read explicitly.
if clf.predict([current_query_features])[0] == -1:
    block_query()
评论
发表评论