19.4.4 任务调度与依赖关系管理
任务调度与依赖关系管理是自动化运维平台的核心能力之一,需要覆盖时间调度、事件触发、资源约束与多任务依赖。调度策略应支持 Cron/日历型、事件型(监控告警、工单状态、代码合并)、以及条件型(资源阈值、配置一致性)三类触发方式,满足批量作业、周期作业与应急作业的统一编排。
依赖关系管理应提供DAG建模能力,明确前置/后置、并行/互斥、强依赖/弱依赖等关系类型,并支持依赖状态继承与失败传播策略。常见设计包括:上游失败即阻断下游、允许部分失败继续、失败自动重试与补偿性作业触发。依赖配置需支持跨系统任务(如数据库备份→配置更新→服务滚动重启→健康检查→流量切换)的可视化与版本化管理,确保变更可追踪、可回溯。
为了具象化调度与依赖管理,以下以 Airflow 为例演示安装、编排、执行与排错(可替换为你平台的调度引擎理念一致)。
安装与初始化(Ubuntu/CentOS 通用,建议 Python 3.9+):
# 1) 安装依赖与虚拟环境
sudo apt-get update && sudo apt-get install -y python3-venv
python3 -m venv /opt/airflow-venv
source /opt/airflow-venv/bin/activate
# 2) 安装 Airflow(sqlite 仅用于演示)
export AIRFLOW_HOME=/opt/airflow
pip install "apache-airflow==2.6.3" --constraint \
"https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-3.9.txt"
# 3) 初始化元数据库与创建用户
airflow db init
airflow users create \
--username admin --password admin \
--firstname ops --lastname admin \
--role Admin --email ops@example.com
# 4) 启动 Web 与 Scheduler
airflow webserver -p 8080 >/opt/airflow/web.log 2>&1 &
airflow scheduler >/opt/airflow/scheduler.log 2>&1 &
依赖与调度DAG示例(包含强依赖、并行、失败回调与重试):
# /opt/airflow/dags/ops_pipeline.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
default_args = {
"owner": "ops",
"retries": 2,
"retry_delay": timedelta(minutes=1),
}
with DAG(
dag_id="ops_pipeline",
start_date=datetime(2023, 1, 1),
schedule_interval="0 2 * * *", # 每天 02:00
catchup=False,
default_args=default_args,
tags=["ops", "deploy"],
) as dag:
backup = BashOperator(
task_id="mysql_backup",
bash_command="echo backup && sleep 2",
)
update_cfg = BashOperator(
task_id="update_config",
bash_command="echo update config && sleep 1",
)
restart = BashOperator(
task_id="rolling_restart",
bash_command="echo rolling restart && sleep 3",
)
health = BashOperator(
task_id="health_check",
bash_command="echo health check && sleep 1",
)
switch = BashOperator(
task_id="traffic_switch",
bash_command="echo switch traffic && sleep 1",
)
# 依赖关系
backup >> update_cfg
update_cfg >> restart >> health >> switch
关键命令与解释:
# 查看 DAG 列表(确认加载成功)
airflow dags list
# 手动触发一次执行
airflow dags trigger ops_pipeline
# 查看运行状态与任务列表
airflow tasks list ops_pipeline
airflow tasks state ops_pipeline mysql_backup 2023-09-01
调度并发、资源与窗口期控制示例(限制并发与指定运行窗口):
# /opt/airflow/dags/ops_window.py
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
with DAG(
dag_id="ops_window",
start_date=datetime(2023, 1, 1),
schedule_interval="*/10 * * * *",
catchup=False,
max_active_runs=1, # 同一 DAG 并发限制
concurrency=2, # 同一 DAG 总并发
tags=["ops", "window"],
) as dag:
task1 = BashOperator(task_id="t1", bash_command="echo t1 && sleep 5")
task2 = BashOperator(task_id="t2", bash_command="echo t2 && sleep 5")
task1 >> task2
事件触发示例(基于文件/事件驱动的“条件型触发”):
# 示例:当目录出现触发文件时调用 API 触发 DAG
# 在你的平台可替换为 Webhook 或告警触发器
inotifywait -m /data/trigger -e create |
while read dir action file; do
if [ "$file" = "run_ops_pipeline" ]; then
curl -X POST "http://localhost:8080/api/v1/dags/ops_pipeline/dagRuns" \
-H "Content-Type: application/json" \
-u "admin:admin" \
-d '{"conf":{"reason":"event_trigger"}}'
fi
done
排错清单与命令:
# 1) DAG 未加载:检查语法与路径
python -m py_compile /opt/airflow/dags/ops_pipeline.py
ls -l /opt/airflow/dags/
# 2) Scheduler 未运行或无心跳
ps -ef | grep "airflow scheduler"
tail -n 200 /opt/airflow/scheduler.log
# 3) 任务卡住:查看任务日志
airflow tasks log ops_pipeline rolling_restart 2023-09-01
# 4) 元数据库连接异常
airflow db check
常见依赖策略配置建议(可映射到你的平台配置项):
- 上游失败即阻断下游:用于强一致链路(备份→更新→重启)
- 允许部分失败继续:用于灰度或旁路作业(非核心节点)
- 失败自动重试与补偿作业:适合短暂故障(网络抖动、锁等待)
# 依赖策略示例(平台配置概念)
task: rolling_restart
depends:
- task: update_config
on_fail: block # 上游失败阻断
- task: config_check
on_fail: continue # 允许继续
retry:
max: 2
interval_sec: 60
compensate:
task: rollback_config
练习:
1) 将 ops_pipeline 的 schedule_interval 改为每 5 分钟执行,并验证并发限制是否生效。
2) 增加一个“弱依赖”任务 config_check(失败不阻断),观察下游是否继续执行。
3) 手动制造 mysql_backup 失败(修改 bash_command 为 exit 1),确认失败传播与重试策略。
4) 使用 inotify 事件触发 DAG 执行,观察 conf 参数是否在运行记录中可见。