19.11.4 Logging and Audit Platform Practice Case
This practice case for a logging and audit platform focuses on full collection, unified storage, searchability and traceability, and compliance auditing with alert integration. The platform is built on a unified logging specification and collection standard covering operating systems, applications, middleware, and the container platform. Recommended unified fields: timestamp, host/container identifier, application and environment, request identifier, user and permission context, and business labels, so that queries can be correlated across systems and the audit trail remains traceable.
The principles and architectural layers are organized as follows:
I. Logging Specification and Field Example (with Masking)
- Goal: ensure logs are searchable, correlatable, and auditable.
- Key fields: @timestamp, host, env, app, trace_id, user, action, resource, result, severity
Example: structured application log (JSON)
{
  "@timestamp": "2025-01-10T10:12:03+08:00",
  "host": "app-01",
  "env": "prod",
  "app": "order-service",
  "trace_id": "a1b2c3",
  "user": "u_1001",
  "action": "create_order",
  "resource": "order/8899",
  "result": "success",
  "severity": "INFO",
  "ip": "10.0.1.12",
  "msg": "order created"
}
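The field example above is labeled "with Masking", yet `user` and `ip` appear in the clear. Below is a minimal sketch of field-level masking applied before a record is emitted; the specific rules (keeping the IP /24 prefix, truncating user IDs) are illustrative assumptions, not a platform standard.

```python
import json

# Fields to mask before a record leaves the application.
# The rules below are illustrative choices, not a mandated standard.
MASK_RULES = {
    "ip": lambda v: ".".join(v.split(".")[:3]) + ".*",  # 10.0.1.12 -> 10.0.1.*
    "user": lambda v: v[:2] + "***",                    # u_1001 -> u_***
}

def sanitize(record: dict) -> dict:
    """Return a copy of the record with sensitive fields masked."""
    out = dict(record)
    for field, rule in MASK_RULES.items():
        if field in out and isinstance(out[field], str):
            out[field] = rule(out[field])
    return out

record = {"@timestamp": "2025-01-10T10:12:03+08:00", "user": "u_1001",
          "ip": "10.0.1.12", "action": "create_order", "result": "success"}
print(json.dumps(sanitize(record), ensure_ascii=False))
```

Masking at the source, rather than in the pipeline, keeps sensitive values out of Kafka and Elasticsearch entirely.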
II. Collection and Transport (Filebeat + Kafka Example)
1) Install and start Filebeat (Linux)
# Install (RHEL/CentOS example)
sudo rpm -ivh https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.12.0-x86_64.rpm
# Start and enable on boot
sudo systemctl enable filebeat
sudo systemctl start filebeat
sudo systemctl status filebeat
2) Collection configuration example: /etc/filebeat/filebeat.yml
filebeat.inputs:
# the "log" input still works in Filebeat 8.x but is deprecated in favor of "filestream"
- type: log
enabled: true
paths:
- /var/log/secure
- /var/log/messages
- /var/log/nginx/access.log
fields:
env: prod
app: os-nginx
fields_under_root: true
processors:
- add_host_metadata: ~
- add_fields:
target: ''
fields:
platform: log-audit
- drop_fields:
fields: ["agent", "ecs", "input", "log", "host.os"]
output.kafka:
hosts: ["kafka-1:9092","kafka-2:9092"]
topic: "log-audit"
compression: gzip
required_acks: 1
Expected result: logs from multiple sources land in the Kafka topic log-audit with unified fields, and redundant fields are dropped to cut storage cost.
III. Storage, Search, and Hot-Warm-Cold Tiering (Elasticsearch Example)
1) Index template and lifecycle policy (ILM)
# Create the ILM policy: hot 0-7 days, warm 7-30 days, cold 30-180 days, delete at 180 days.
# Note: the freeze action was removed in Elasticsearch 8.x, so readonly is used in the cold phase.
curl -X PUT http://es-1:9200/_ilm/policy/log-ilm -H 'Content-Type: application/json' -d '
{
  "policy": {
    "phases": {
      "hot": { "actions": { "rollover": { "max_age": "7d", "max_size": "50gb" } } },
      "warm": { "min_age": "7d", "actions": { "shrink": { "number_of_shards": 1 } } },
      "cold": { "min_age": "30d", "actions": { "readonly": {} } },
      "delete": { "min_age": "180d", "actions": { "delete": {} } }
    }
  }
}'
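The phase boundaries in this policy amount to a simple age-to-tier mapping, which is handy when predicting where an index currently lives. A small sketch mirroring the thresholds above:

```python
def ilm_phase(age_days: float) -> str:
    """Map an index age in days to the ILM phase defined above:
    hot < 7d, warm 7-30d, cold 30-180d, delete at 180d."""
    if age_days < 7:
        return "hot"
    if age_days < 30:
        return "warm"
    if age_days < 180:
        return "cold"
    return "delete"

for days in (1, 10, 90, 200):
    print(days, ilm_phase(days))
```

Note that in practice the clock starts at rollover, not index creation, so the effective age ILM uses can differ from the index's creation date.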
2) Index template
curl -X PUT http://es-1:9200/_index_template/log-template -H 'Content-Type: application/json' -d '
{
  "index_patterns": ["log-audit-*"],
  "template": {
    "settings": {
      "index.lifecycle.name": "log-ilm",
      "number_of_shards": 3
    },
    "mappings": {
      "properties": {
        "@timestamp": { "type": "date" },
        "env": { "type": "keyword" },
        "app": { "type": "keyword" },
        "trace_id": { "type": "keyword" },
        "user": { "type": "keyword" },
        "action": { "type": "keyword" },
        "result": { "type": "keyword" },
        "severity": { "type": "keyword" }
      }
    }
  }
}'
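Since the template maps every listed field as keyword plus a date @timestamp, records can be checked against that contract before they are shipped, catching schema drift at the source. A minimal validator sketch; only the field names come from the template above, while the helper and its error strings are illustrative.

```python
from datetime import datetime

# keyword fields from the index template above; @timestamp must parse as a date
KEYWORD_FIELDS = {"env", "app", "trace_id", "user", "action", "result", "severity"}

def validate(record: dict) -> list:
    """Return a list of schema problems (an empty list means the record conforms)."""
    problems = []
    ts = record.get("@timestamp")
    if ts is None:
        problems.append("missing @timestamp")
    else:
        try:
            datetime.fromisoformat(ts)
        except (TypeError, ValueError):
            problems.append("bad @timestamp")
    for field in KEYWORD_FIELDS & record.keys():
        if not isinstance(record[field], str):
            problems.append(f"{field} must be a string keyword")
    return problems

print(validate({"@timestamp": "2025-01-10T10:12:03+08:00", "env": "prod"}))  # -> []
```

Running this in the shipper (or as a Kafka consumer-side check) is cheaper than discovering mapping conflicts as Elasticsearch indexing failures later.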
IV. Audit Rules and Alert Integration (Example)
Rule: 3 failed logins by the same account within 5 minutes triggers an alert (can be implemented in a SIEM or with Prometheus Alertmanager)
# Rule logic sketch (pseudo-configuration)
rule: audit_login_fail
match:
action: "login"
result: "fail"
group_by: "user"
window: "5m"
threshold: 3
severity: "high"
notify: ["alertmanager", "ticket"]
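The pseudo-rule above can be sketched as a sliding-window counter keyed by user. Window and threshold come from the rule; the in-memory deque state is an illustrative simplification of what a SIEM's rule engine keeps per group.

```python
from collections import defaultdict, deque

WINDOW_SECONDS = 5 * 60   # "window: 5m" from the rule above
THRESHOLD = 3             # "threshold: 3"

_failures = defaultdict(deque)  # user -> timestamps of recent failed logins

def on_event(user: str, action: str, result: str, ts: float) -> bool:
    """Feed one log event; return True when the alert rule fires."""
    if action != "login" or result != "fail":
        return False
    window = _failures[user]
    window.append(ts)
    # evict failures that fell out of the 5-minute window
    while window and ts - window[0] > WINDOW_SECONDS:
        window.popleft()
    return len(window) >= THRESHOLD

# three failures inside five minutes -> the third one fires
events = [("u_1001", "login", "fail", t) for t in (0, 60, 120)]
print([on_event(*e) for e in events])  # -> [False, False, True]
```

A production rule engine would keep this state externally (e.g. in the SIEM or a stream processor) so that restarts do not reset the window.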
Alert-integration script example (calling the ticketing system API)
#!/usr/bin/env bash
API="https://itsm.example.com/api/ticket"
TOKEN="xxxx"
curl -s -X POST "$API" \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{"title":"Audit alert: abnormal logins","priority":"P1","desc":"3 failed logins within 5 minutes"}'
V. Kubernetes Audit Log Integration Example
1) K8s audit policy (/etc/kubernetes/audit-policy.yaml)
apiVersion: audit.k8s.io/v1
kind: Policy
rules:
- level: Metadata
verbs: ["create","update","delete"]
resources:
- group: ""
resources: ["pods","secrets"]
2) Additional kube-apiserver startup flags
--audit-policy-file=/etc/kubernetes/audit-policy.yaml \
--audit-log-path=/var/log/k8s-audit/audit.log
3) Additional Filebeat collection paths
paths:
- /var/log/k8s-audit/audit.log
fields:
app: k8s-audit
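With the policy and flags above in place, kube-apiserver appends one JSON audit event per line to /var/log/k8s-audit/audit.log. The sketch below maps such an event onto the platform's unified fields; the sample event is trimmed to representative fields, and deriving success/fail from the HTTP status code is an illustrative choice.

```python
import json

def audit_to_platform(line: str) -> dict:
    """Map one kube-apiserver audit event (Metadata level) onto the
    platform's unified log fields."""
    ev = json.loads(line)
    ref = ev.get("objectRef", {})
    code = ev.get("responseStatus", {}).get("code", 0)
    return {
        "@timestamp": ev.get("requestReceivedTimestamp"),
        "app": "k8s-audit",
        "user": ev.get("user", {}).get("username"),
        "action": ev.get("verb"),
        "resource": f'{ref.get("resource")}/{ref.get("name")}',
        "result": "success" if code < 400 else "fail",
    }

sample = json.dumps({
    "requestReceivedTimestamp": "2025-01-10T03:00:00Z",
    "user": {"username": "admin"},
    "verb": "delete",
    "objectRef": {"resource": "pods", "name": "web-1"},
    "responseStatus": {"code": 200},
})
print(audit_to_platform(sample))
```

This normalization step lets the same audit rules (failed logins, deletions, privilege changes) run over OS, application, and K8s events without per-source logic.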
VI. Common Troubleshooting Checklist (Command Level)
1) The collector ships no logs
# Check collector status and recent logs (under systemd, Filebeat 8.x logs to the journal)
systemctl status filebeat
journalctl -u filebeat -n 200 --no-pager
# Check permissions on the source log files
ls -l /var/log/secure
2) Severe Kafka backlog
# Check consumer lag
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server kafka-1:9092 \
--describe --group log-consumer
3) Elasticsearch index writes failing
# Check cluster health and see why writes are being rejected
curl -s http://es-1:9200/_cluster/health?pretty
curl -s http://es-1:9200/_cat/indices?v
curl -s "http://es-1:9200/_cat/thread_pool/write?v&h=node_name,active,queue,rejected"
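For the Kafka backlog check above, the LAG column of `kafka-consumer-groups.sh --describe` can be summed into a single backlog number and compared against a threshold. A sketch that parses the tabular output by header position; the sample output is abbreviated and illustrative.

```python
# Abbreviated sample of `kafka-consumer-groups.sh --describe` output (illustrative)
SAMPLE = """GROUP        TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
log-consumer log-audit  0          1200            1500            300
log-consumer log-audit  1          900             2400            1500
"""

def total_lag(describe_output: str) -> int:
    """Sum the LAG column, locating it by header name so extra
    columns (CONSUMER-ID, HOST, ...) do not break parsing."""
    lines = describe_output.strip().splitlines()
    lag_idx = lines[0].split().index("LAG")
    total = 0
    for line in lines[1:]:
        cols = line.split()
        if len(cols) > lag_idx and cols[lag_idx].isdigit():
            total += int(cols[lag_idx])
    return total

print(total_lag(SAMPLE))  # -> 1800
```

Wiring this into a cron job or exporter turns the manual check into a continuous backlog metric.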
VII. Rollout Process (Actionable Checklist)
1) Confirm requirements and compliance scope (retention periods, sensitive fields).
2) Publish the field specification and collection standard (unified JSON/field names).
3) Inventory and onboard log sources (OS/middleware/K8s/applications/CI/CD).
4) Bring audit rules and alert integration online (abnormal logins, privilege escalation, sensitive changes).
5) Acceptance and continuous optimization (performance, cost, rule false-positive rate).
VIII. Exercises (with Commands)
1) Add an Nginx log field and onboard it to the platform
Requirement: collect /var/log/nginx/access.log, add the field app=nginx, and verify ingestion.
# Restart after editing filebeat.yml
sudo systemctl restart filebeat
# Verify that logs reach Kafka
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka-1:9092 \
--topic log-audit --from-beginning --max-messages 3
2) Build an audit rule: the same user deletes resources more than 5 times within 10 minutes
Produce the rule configuration and a sample log that triggers it.
{"@timestamp":"2025-01-10T12:00:00+08:00","user":"u_22","action":"delete","resource":"db/table","result":"success"}
3) Hands-on troubleshooting: ES returns 429 on index writes
Requirement: give the diagnosis steps and explain how to mitigate by adding shards or throttling the write rate.
IX. Outcomes and Metrics
- Key metrics: log-source coverage, audit-rule hit rate, time to close alerts, query response time, storage cost and retention period.
- Continuous optimization: log quality scoring, intelligent anomaly detection, cross-system trace correlation with audit integration.