19.4.4 任务调度与依赖关系管理

任务调度与依赖关系管理是自动化运维平台的核心能力之一,需要覆盖时间调度、事件触发、资源约束与多任务依赖。调度策略应支持 Cron/日历型、事件型(监控告警、工单状态、代码合并)、以及条件型(资源阈值、配置一致性)三类触发方式,满足批量作业、周期作业与应急作业的统一编排。

文章图片

依赖关系管理应提供DAG建模能力,明确前置/后置、并行/互斥、强依赖/弱依赖等关系类型,并支持依赖状态继承与失败传播策略。常见设计包括:上游失败即阻断下游、允许部分失败继续、失败自动重试与补偿性作业触发。依赖配置需支持跨系统任务(如数据库备份→配置更新→服务滚动重启→健康检查→流量切换)的可视化与版本化管理,确保变更可追踪、可回溯。

文章图片

为了具象化调度与依赖管理,以下以 Airflow 为例演示安装、编排、执行与排错(可替换为你平台的调度引擎理念一致)。

安装与初始化(Ubuntu/CentOS 通用,建议 Python 3.9+):

# 1) 安装依赖与虚拟环境
sudo apt-get update && sudo apt-get install -y python3-venv
python3 -m venv /opt/airflow-venv
source /opt/airflow-venv/bin/activate

# 2) 安装 Airflow(sqlite 仅用于演示)
export AIRFLOW_HOME=/opt/airflow
pip install "apache-airflow==2.6.3" --constraint \
  "https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-3.9.txt"

# 3) 初始化元数据库与创建用户
airflow db init
airflow users create \
  --username admin --password admin \
  --firstname ops --lastname admin \
  --role Admin --email ops@example.com

# 4) 启动 Web 与 Scheduler
airflow webserver -p 8080 >/opt/airflow/web.log 2>&1 &
airflow scheduler >/opt/airflow/scheduler.log 2>&1 &

依赖与调度DAG示例(包含强依赖、并行、失败回调与重试):

# /opt/airflow/dags/ops_pipeline.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    "owner": "ops",
    "retries": 2,
    "retry_delay": timedelta(minutes=1),
}

with DAG(
    dag_id="ops_pipeline",
    start_date=datetime(2023, 1, 1),
    schedule_interval="0 2 * * *",  # 每天 02:00
    catchup=False,
    default_args=default_args,
    tags=["ops", "deploy"],
) as dag:

    backup = BashOperator(
        task_id="mysql_backup",
        bash_command="echo backup && sleep 2",
    )

    update_cfg = BashOperator(
        task_id="update_config",
        bash_command="echo update config && sleep 1",
    )

    restart = BashOperator(
        task_id="rolling_restart",
        bash_command="echo rolling restart && sleep 3",
    )

    health = BashOperator(
        task_id="health_check",
        bash_command="echo health check && sleep 1",
    )

    switch = BashOperator(
        task_id="traffic_switch",
        bash_command="echo switch traffic && sleep 1",
    )

    # 依赖关系
    backup >> update_cfg
    update_cfg >> restart >> health >> switch

关键命令与解释:

# 查看 DAG 列表(确认加载成功)
airflow dags list

# 手动触发一次执行
airflow dags trigger ops_pipeline

# 查看运行状态与任务列表
airflow tasks list ops_pipeline
airflow tasks state ops_pipeline mysql_backup 2023-09-01

调度并发、资源与窗口期控制示例(限制并发与指定运行窗口):

# /opt/airflow/dags/ops_window.py
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="ops_window",
    start_date=datetime(2023, 1, 1),
    schedule_interval="*/10 * * * *",
    catchup=False,
    max_active_runs=1,     # 同一 DAG 并发限制
    concurrency=2,         # 同一 DAG 总并发
    tags=["ops", "window"],
) as dag:
    task1 = BashOperator(task_id="t1", bash_command="echo t1 && sleep 5")
    task2 = BashOperator(task_id="t2", bash_command="echo t2 && sleep 5")
    task1 >> task2

事件触发示例(基于文件/事件驱动的“条件型触发”):

# 示例:当目录出现触发文件时调用 API 触发 DAG
# 在你的平台可替换为 Webhook 或告警触发器
inotifywait -m /data/trigger -e create |
while read dir action file; do
  if [ "$file" = "run_ops_pipeline" ]; then
    curl -X POST "http://localhost:8080/api/v1/dags/ops_pipeline/dagRuns" \
      -H "Content-Type: application/json" \
      -u "admin:admin" \
      -d '{"conf":{"reason":"event_trigger"}}'
  fi
done

排错清单与命令:

# 1) DAG 未加载:检查语法与路径
python -m py_compile /opt/airflow/dags/ops_pipeline.py
ls -l /opt/airflow/dags/

# 2) Scheduler 未运行或无心跳
ps -ef | grep "airflow scheduler"
tail -n 200 /opt/airflow/scheduler.log

# 3) 任务卡住:查看任务日志
airflow tasks log ops_pipeline rolling_restart 2023-09-01

# 4) 元数据库连接异常
airflow db check

常见依赖策略配置建议(可映射到你的平台配置项):
- 上游失败即阻断下游:用于强一致链路(备份→更新→重启)
- 允许部分失败继续:用于灰度或旁路作业(非核心节点)
- 失败自动重试与补偿作业:适合短暂故障(网络抖动、锁等待)

# 依赖策略示例(平台配置概念)
task: rolling_restart
depends:
  - task: update_config
    on_fail: block          # 上游失败阻断
  - task: config_check
    on_fail: continue       # 允许继续
retry:
  max: 2
  interval_sec: 60
compensate:
  task: rollback_config

练习:
1) 将 ops_pipeline 的 schedule_interval 改为每 5 分钟执行,并验证并发限制是否生效。
2) 增加一个“弱依赖”任务 config_check(失败不阻断),观察下游是否继续执行。
3) 手动制造 mysql_backup 失败(修改 bash_command 为 exit 1),确认失败传播与重试策略。
4) 使用 inotify 事件触发 DAG 执行,观察 conf 参数是否在运行记录中可见。