19.11.4 Log and Audit Platform: A Practical Case Study

This case study of a log and audit platform focuses on full collection, unified storage, searchability and traceability, and compliance auditing with alert integration. The platform is built on a unified log schema and collection standard that covers operating systems, applications, middleware, and the container platform. Recommended common fields include: timestamp, host/container identifier, application and environment, request ID, user and permission context, and business tags, so that queries can be correlated across systems and audit trails remain traceable.

The principles and architecture layers are as follows:

[Figure: platform architecture layers (original image not reproduced)]

1. Log schema and field examples (with data masking)
- Goal: logs must be searchable, correlatable, and auditable.
- Key fields: @timestamp, host, env, app, trace_id, user, action, resource, result, severity

Example: structured application log (JSON)

{
  "@timestamp": "2025-01-10T10:12:03+08:00",
  "host": "app-01",
  "env": "prod",
  "app": "order-service",
  "trace_id": "a1b2c3",
  "user": "u_1001",
  "action": "create_order",
  "resource": "order/8899",
  "result": "success",
  "severity": "INFO",
  "ip": "10.0.1.12",
  "msg": "order created"
}
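The section heading above calls for data masking, but the sample log is unmasked. A minimal sketch of collection-time masking with sed follows; the field formats and the specific masking rules here are illustrative assumptions, not platform requirements:

```shell
# Mask the numeric user-ID suffix and the last two IP octets before shipping (illustrative rules)
line='{"user":"u_1001","ip":"10.0.1.12","msg":"order created"}'
masked=$(printf '%s' "$line" \
  | sed -E 's/"user":"u_[0-9]+"/"user":"u_****"/; s/"ip":"([0-9]+\.[0-9]+)\.[0-9]+\.[0-9]+"/"ip":"\1.*.*"/')
echo "$masked"
```

In practice the same rules could live in a Filebeat or Logstash processor instead of a shell pipeline; doing it at collection time keeps unmasked data out of the transport and storage tiers entirely.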

2. Collection and transport (using Filebeat + Kafka as an example)
1) Install and start Filebeat (Linux)

# Install (RHEL/CentOS example)
sudo rpm -ivh https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.12.0-x86_64.rpm

# Start now and enable at boot
sudo systemctl enable filebeat
sudo systemctl start filebeat
sudo systemctl status filebeat

2) Collection config example: /etc/filebeat/filebeat.yml

filebeat.inputs:
  - type: filestream   # the older "log" input type is deprecated in Filebeat 8.x
    id: os-and-nginx-logs
    enabled: true
    paths:
      - /var/log/secure
      - /var/log/messages
      - /var/log/nginx/access.log
    fields:
      env: prod
      app: os-nginx
    fields_under_root: true

processors:
  - add_host_metadata: ~
  - add_fields:
      target: ''
      fields:
        platform: log-audit
  - drop_fields:
      fields: ["agent", "ecs", "input", "log", "host.os"]

output.kafka:
  hosts: ["kafka-1:9092","kafka-2:9092"]
  topic: "log-audit"
  compression: gzip
  required_acks: 1

Expected result: logs from all sources land in the Kafka topic log-audit with a unified field set, and redundant fields are dropped to reduce storage cost.
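Before going live, Filebeat's built-in self-checks can validate the configuration and the connectivity to the configured output (Kafka here); the config path below assumes the default RPM install location:

```shell
# Validate config syntax, then test reachability of the configured output
sudo filebeat test config -c /etc/filebeat/filebeat.yml
sudo filebeat test output -c /etc/filebeat/filebeat.yml
```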

3. Storage, search, and hot-warm-cold tiering (using Elasticsearch as an example)
1) Index template and lifecycle policy (ILM)

# Create ILM policy: hot with 7-day rollover, warm from day 7, cold from day 30, delete at day 180
# (the "freeze" action was removed in ES 8.x, so the cold phase uses readonly here;
#  "max_size" is likewise deprecated in favor of "max_primary_shard_size")
curl -X PUT http://es-1:9200/_ilm/policy/log-ilm -H 'Content-Type: application/json' -d '
{
  "policy": {
    "phases": {
      "hot": { "actions": { "rollover": { "max_age": "7d", "max_primary_shard_size": "50gb" } } },
      "warm": { "min_age": "7d", "actions": { "shrink": { "number_of_shards": 1 } } },
      "cold": { "min_age": "30d", "actions": { "readonly": {} } },
      "delete": { "min_age": "180d", "actions": { "delete": {} } }
    }
  }
}'

2) Index template

# Note: classic-index rollover also needs a bootstrap index (e.g. log-audit-000001)
# that carries the write alias named below in index.lifecycle.rollover_alias
curl -X PUT http://es-1:9200/_index_template/log-template -H 'Content-Type: application/json' -d '
{
  "index_patterns": ["log-audit-*"],
  "template": {
    "settings": {
      "index.lifecycle.name": "log-ilm",
      "index.lifecycle.rollover_alias": "log-audit",
      "number_of_shards": 3
    },
    "mappings": {
      "properties": {
        "@timestamp": { "type": "date" },
        "env": { "type": "keyword" },
        "app": { "type": "keyword" },
        "trace_id": { "type": "keyword" },
        "user": { "type": "keyword" },
        "action": { "type": "keyword" },
        "result": { "type": "keyword" },
        "severity": { "type": "keyword" }
      }
    }
  }
}'
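Once data is flowing, a correlated query by trace_id is a quick way to verify that the template and field mappings work end to end; the index pattern matches the template above, while the trace value is just the sample from section 1:

```shell
# Pull one request's full trail, ordered by time
curl -s -X GET "http://es-1:9200/log-audit-*/_search" -H 'Content-Type: application/json' -d '
{
  "query": { "term": { "trace_id": "a1b2c3" } },
  "sort": [ { "@timestamp": "asc" } ],
  "size": 20
}'
```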

4. Audit rules and alert integration (example)
Rule: three failed logins by the same account within 5 minutes trigger an alert (can be implemented in a SIEM or with Prometheus Alertmanager)

# Rule logic sketch (pseudo-config)
rule: audit_login_fail
match:
  action: "login"
  result: "fail"
group_by: "user"
window: "5m"
threshold: 3
severity: "high"
notify: ["alertmanager", "ticket"]
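The pseudo-config above can be approximated locally for testing. A rough shell/awk sketch of the counting logic follows; the JSON-lines input file, its field layout, and the simplified "window measured from the first failure" are all assumptions for illustration, whereas a real SIEM would use a true sliding window:

```shell
# Sample failed-login events (ts = epoch seconds)
cat > /tmp/auth.jsonl <<'EOF'
{"ts":100,"user":"u_1001","action":"login","result":"fail"}
{"ts":160,"user":"u_1001","action":"login","result":"fail"}
{"ts":220,"user":"u_1001","action":"login","result":"fail"}
{"ts":230,"user":"u_2002","action":"login","result":"fail"}
EOF

# Alert when a user reaches 3 failures within 300s of their first failure
alerts=$(awk -F'"' '/"action":"login"/ && /"result":"fail"/ {
  match($0, /"ts":[0-9]+/); ts = substr($0, RSTART+5, RLENGTH-5) + 0
  for (i = 1; i <= NF; i++) if ($i == "user") user = $(i+2)
  if (!(user in first)) first[user] = ts
  n[user]++
  if (n[user] == 3 && ts - first[user] <= 300) print user
}' /tmp/auth.jsonl)
echo "$alerts"   # u_1001 crosses the threshold; u_2002 does not
```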

Alert integration script example (calls the ticketing system API)

#!/usr/bin/env bash
API="https://itsm.example.com/api/ticket"
TOKEN="xxxx"
curl -s -X POST "$API" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"title":"Audit alert: abnormal login","priority":"P1","desc":"3 failed logins within 5 minutes"}'

5. Kubernetes audit log integration example
1) K8s audit policy (/etc/kubernetes/audit-policy.yaml)

apiVersion: audit.k8s.io/v1
kind: Policy
rules:
- level: Metadata
  verbs: ["create","update","delete"]
  resources:
  - group: ""
    resources: ["pods","secrets"]

2) Extra kube-apiserver startup flags

--audit-policy-file=/etc/kubernetes/audit-policy.yaml \
--audit-log-path=/var/log/k8s-audit/audit.log
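If kube-apiserver runs as a static pod (the kubeadm default), the policy file and log directory must also be mounted into the container, roughly as below; the paths match the flags above, while the volume names are illustrative:

```yaml
# Fragment for /etc/kubernetes/manifests/kube-apiserver.yaml
volumeMounts:
- name: audit-policy
  mountPath: /etc/kubernetes/audit-policy.yaml
  readOnly: true
- name: audit-log
  mountPath: /var/log/k8s-audit
volumes:
- name: audit-policy
  hostPath:
    path: /etc/kubernetes/audit-policy.yaml
    type: File
- name: audit-log
  hostPath:
    path: /var/log/k8s-audit
    type: DirectoryOrCreate
```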

3) Additional Filebeat collection path

paths:
  - /var/log/k8s-audit/audit.log
fields:
  app: k8s-audit

6. Common troubleshooting checklist (command level)
1) Collector ships no logs

# Check collector status and logs
systemctl status filebeat
journalctl -u filebeat -n 200 --no-pager   # Filebeat 8.x RPM installs log to journald by default
# Check permissions on the source log files
ls -l /var/log/secure

2) Severe Kafka backlog

# Check consumer group lag
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server kafka-1:9092 \
  --describe --group log-consumer

3) Elasticsearch index write failures

# Check cluster health, index state, and write-thread rejections
curl -s "http://es-1:9200/_cluster/health?pretty"
curl -s "http://es-1:9200/_cat/indices?v"
curl -s "http://es-1:9200/_cat/thread_pool/write?v&h=node_name,active,queue,rejected"
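When ES rejects writes (HTTP 429), producers should back off and retry rather than drop logs. A toy sketch of the retry loop follows; es_bulk_write is a stand-in for the real bulk call and is hard-wired to fail here so the loop exhausts its retries:

```shell
# Retry a failing bulk write up to 3 times with (notional) exponential backoff
es_bulk_write() { return 1; }   # stand-in that always "returns 429"

attempt=0; max_retries=3; delay=1
until es_bulk_write || [ "$attempt" -ge "$max_retries" ]; do
  attempt=$((attempt + 1))
  # in production: sleep "$delay"; delay=$((delay * 2))
done
echo "gave up after $attempt retries"
```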

7. Rollout process (actionable checklist)
1) Confirm requirements and compliance scope (retention periods, sensitive fields).
2) Publish the field schema and collection standard (unified JSON field names).
3) Inventory and onboard log sources (OS / middleware / K8s / applications / CI/CD).
4) Bring audit rules and alert integration online (abnormal logins, privilege escalation, sensitive changes).
5) Acceptance, then continuous optimization (performance, cost, rule false-positive rate).

8. Exercises (with commands)
1) Add an Nginx log field and onboard it to the platform
Requirement: collect /var/log/nginx/access.log, add the field app=nginx, and verify that records reach storage.

# Restart after editing filebeat.yml
sudo systemctl restart filebeat
# Verify that logs arrive in Kafka
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka-1:9092 \
  --topic log-audit --from-beginning --max-messages 3

2) Build an audit rule: the same user deletes resources more than 5 times within 10 minutes
Deliverables: the rule configuration and a sample log line that triggers it.

{"@timestamp":"2025-01-10T12:00:00+08:00","user":"u_22","action":"delete","resource":"db/table","result":"success"}

3) Hands-on troubleshooting: ES returns 429 on index writes
Requirement: give the diagnosis steps and explain how to mitigate by adding shards or throttling the write rate.

9. Results and metrics
- Key metrics: log-source coverage, audit-rule hit rate, time to close an alert, query latency, storage cost and retention period.
- Continuous improvement: log quality scoring, intelligent anomaly detection, and cross-system trace correlation tied into auditing.