Menu

  • Home
  • Work
    • Cloud
      • Virtualization
      • IaaS
      • PaaS
    • Java
    • Go
    • C
    • C++
    • JavaScript
    • PHP
    • Python
    • Architecture
    • Others
      • Assembly
      • Ruby
      • Perl
      • Lua
      • Rust
      • XML
      • Network
      • IoT
      • GIS
      • Algorithm
      • AI
      • Math
      • RE
      • Graphic
    • OS
      • Linux
      • Windows
      • Mac OS X
    • BigData
    • Database
      • MySQL
      • Oracle
    • Mobile
      • Android
      • IOS
    • Web
      • HTML
      • CSS
  • Life
    • Cooking
    • Travel
    • Gardening
  • Gallery
  • Video
  • Music
  • Essay
  • Home
  • Work
    • Cloud
      • Virtualization
      • IaaS
      • PaaS
    • Java
    • Go
    • C
    • C++
    • JavaScript
    • PHP
    • Python
    • Architecture
    • Others
      • Assembly
      • Ruby
      • Perl
      • Lua
      • Rust
      • XML
      • Network
      • IoT
      • GIS
      • Algorithm
      • AI
      • Math
      • RE
      • Graphic
    • OS
      • Linux
      • Windows
      • Mac OS X
    • BigData
    • Database
      • MySQL
      • Oracle
    • Mobile
      • Android
      • IOS
    • Web
      • HTML
      • CSS
  • Life
    • Cooking
    • Travel
    • Gardening
  • Gallery
  • Video
  • Music
  • Essay

Kubernetes故障检测和自愈

16
Sep
2019

Kubernetes故障检测和自愈

By Alex
/ in Go,PaaS
/ tags K8S
0 Comments
前言

在Kubernetes日常运维过程中,会出现各种各样的问题,例如:

  1. 节点CNI不可用,其它节点无法连接到故障节点的Pod
  2. Subpath方式挂载的Configmap,特定条件下出现Pod无限重启的问题
  3. 集群DNS服务器无法通过上游DNS解析外部名称
  4. 节点假死,但是持有的Ceph RBD的Watcher不释放,导致有状态服务的Pod调度走后仍然无法启动
  5. 误删Etcd数据、持久卷

这些问题导致部分Pod、节点、甚至整个集群不可用,需要人工运维才能恢复。

从接收到告警到运维人员手工处理完毕,可能已经过了1小时,严重影响服务质量。但是如果能识别这些告警并将运维知识转化为代码,某些问题可能在一分钟内就被发现和解决。

本文调研商业产品和社区的集群/节点故障检测、修复技术的现状,为自研节点自愈产品提供参考。

故障类型
节点故障

Pod所在节点的内核、CRI运行时等出现问题,无法支持Pod的运行。针对这类故障,社区或商业的解决方案较多,例如社区的NPD项目、GKE的节点修复功能。

组件故障

组件故障可以认为是节点故障的子类,只是故障来源是K8S基础组件的一部分。

K8S集群基础组件出现故障,可能导致集群或在节点的部分功能不可用。我在线上环境遇到过的故障包括:

  1. KubeDNS故障:6个DNS Pod中的2个出现无法解析外部DNS名称的情况。后果是大量线上业务因域名解析
  2. Calico CNI故障:少数几个节点的容器网络和外部断开,节点访问自身的Pod IP没有问题,但是其它节点无法访问故障节点的Pod IP。这种情况下,Pod本机的健康检查无效,导致故障实例持续存在,一定比例的业务请求失败

由于K8S生态主要依赖于开源社区,很多组件不成熟,存在缺陷,因此这类故障较为纷杂。社区没有发现知名的解决方案,主要依赖于日常运维中知识的积累,而且这些运维知识往往是环境相关、K8S版本或组件版本相关的,通用性较差。

集群故障

K8S控制平面不存在单点问题 ,通常情况下,出现整个集群的故障的概率是很低的。但是,某些行业对数据安全和可用性要求极高,另外,也出现过误操作导致集群破坏的案例,集群故障恢复还是需要考虑的。

应对集群故障的主要手段就是备份和恢复,Velero可以帮助我们实现这一点。它支持K8S资源(Etcd)、持久卷的备份,通过开发插件,某些存储后端可以基于快照来备份,效率很高。

故障检测

我认为节点/组件的故障,根据需要,可以从两个角度发起。

本地检测

绝大部分故障检测,在节点本地进行就足够了,这样做效率高,避免不必要的网络流量。

远程检测

少部分网络相关的故障,可能从节点无法检测,这就需要远程检测。

还是上面的Calico CNI故障的例子,故障节点本身访问自己的Pod IP是畅通的,然而外部却无法访问。我们的应对方案,是运行在集群中运行3个Health Check Controller,如果大部分副本判定:节点A网络畅通但是节点A上的Pod Ip却无法联通,则认定节点A的CNI出现故障,需要进行处理。

商业产品调研
GKE

谷歌K8S引擎(Google Kubernetes Engine)提供了自动修复节点的功能。GKE会周期性的检测集群中每个节点的健康状态,如果某个节点的健康检查连续N次失败,则启动一个修复进程,对节点进行修复。

处于Ready状态的节点被认为是健康的,不健康节点可能处于以下状况:

  1. 连续数次健康检查,报告NotReady状态
  2. 在指定的时间范围内,节点没有报告任何状态
  3. 节点在一个指定的时间范围内,处于磁盘空间不足的状态

GKE修复节点的方法比较简单,就是Drain并重新创建。Drain操作会导致节点上的Pod被驱除。

如果多个节点需要修复,GKE可以并行的执行修复。

node-problem-detector
简介

这是一个K8S加载项(Addon),目的是将节点故障暴露给集群管理的上层组件。NPD通常运行为DaemonSet,也可以作为独立进程运行。NPD会检测各种各样的节点问题,例如:

  1. 基础设施服务故障:例如NTP服务宕机
  2. 硬件问题:CPU、内存、磁盘故障
  3. 内核问题:内核死锁、文件系统损坏
  4. 容器运行时错误:Docker守护进程假死

并报告给APIServer,报告的主要方式包括:

  1. NodeCondition:当遇到永久性的节点故障,导致其不可用时,设置节点的NodeCondition
  2. Event:可能对Pod产生影响的临时信息

在没有引入NPD的情况下,上面的各种节点问题对于K8S集群管理上层组件不可见,因此K8S会继续向问题节点调度Pod。

PDS(Monitors)

Problem Daemon(在代码内部也叫Monitor)是NPD的子守护进程,每个PD监控一个特定类型的节点故障,并报告给NPD。目前PD以Goroutine的形式运行在NPD中,未来会支持在独立进程(容器)中运行并编排为一个Pod。在编译期间,可以通过相应的标记禁用每一类PD。

目前可用的PD包括:

PD NodeCondition 说明
KernelMonitor KernelDeadlock

监控内核日志,根据预定义规则来报告问题、指标

使用标记禁用:disable_system_log_monitor

KernelMonitor、AbrtAdaptor都属于System Log Monitor,只是使用的配置文件不同。

SLM支持基于文件的日志、Journald、kmsg。要监控其它日志,需要实现LogWatcher接口

 对于临时问题,SLM暴露counter类的指标,示例:

Shell
1
2
3
# HELP problem_counter Number of times a specific type of problem have occurred.
# TYPE problem_counter counter
problem_counter{reason="TaskHung"} 2

对于永久问题,同时报告为gauge、counter:

Shell
1
2
3
# HELP problem_gauge Whether a specific type of problem is affecting the node or not.
# TYPE problem_gauge gauge
problem_gauge{condition="KernelDeadlock",reason="DockerHung"} 1
AbrtAdaptor 无

监控ABRT(Automatic Bug Report Tool)日志并报告。ABRT是一个健康监控守护进程,能够捕获内核问题、各种原因导致的应用崩溃

使用标记禁用:disable_system_log_monitor

CustomPluginMonitor 依用户配置

通过调用用户配置的脚本来检测各种节点问题

脚本退出码:

  1. 0:对于Evnet来说表示Normal,对于NodeCondition表示False
  2. 1:对于Evnet来说表示Warning,对于NodeCondition表示True

脚本输出应该小于80字节,避免给Etcd的存储造成压力

使用标记禁用:disable_custom_plugin_monitor

SystemStatsMonitor 暂无

将各种健康相关的统计信息报告为Metrics

目前支持的组件仅仅有主机信息、磁盘:

disk/io_time 设备队列非空时间,毫秒
disk/weighted_io 设备队列非空时间加权,毫秒
disk/avg_queue_len 上次调用插件以来,平均排队请求数

使用标记禁用:disable_system_stats_monitor

Exporters

NPD提供了若干Exporter组件,能够将节点问题、指标报告给后端:

Exporter  
Kubernetes Exporter 暴露临时问题为Event、永久问题为NodeCondition
Prometheus Exporter 暴露节点问题、指标为Prometheus metrics
Stackdriver Exporter

暴露节点问题、指标给Stackdriver监控API

使用标记禁用:disable_stackdriver_exporter

应用场景
节点监控

故障节点上的事件,会记录在宿主机的某些日志中。这些日志(例如内核日志)中噪音信息太多,NPD会提取其中有价值的信息,记录到自己的Pod日志中。你可以通过EFK收集这些信息,NPD也可以将这些信息报送给Prometheus。

节点自愈

基于NPD的的节点自愈流程如下:

  1. NPD为故障节点添加额外的Condition元数据
  2. Cordon并Drain故障节点
  3. 利用cluster-autoscaler进行集群扩容,补充节点

这个流程本质上是替换,而不是治愈节点。在裸金属K8S集群中,由于缺乏基础设施的支撑,自动扩充节点可能无法实现,只能通过更加精细的自动化运维,治愈节点的异常状态。

以CNI故障为例,可能的治愈流程如下:

  1. 查询运维知识库,如果找到匹配项,执行对应的运维动作
  2. 如果上述步骤无效,尝试删除节点上负责CNI的Pod,以重置节点的路由、Iptables配置
  3. 如果上述步骤无效,尝试重启容器运行时
  4. 告警,要求运维人员介入
构建

NPD使用Go modules管理依赖,因此构建它需要Go SDK 1.11+:

Shell
1
2
3
4
5
6
7
8
9
10
11
12
13
cd $GOPATH/src/k8s.io
go get k8s.io/node-problem-detector
cd node-problem-detector
 
export GO111MODULE=on
go mod vendor
 
# 设置构建标记
export BUILD_TAGS="disable_custom_plugin_monitor disable_system_stats_monitor"
 
# 在Ubuntu 14.04上需要安装
sudo apt install libsystemd-journal-dev
make all
用法
安装

可以通过Helm安装:

Shell
1
helm install stable/node-problem-detector

依据宿主机操作系统的不同,你可能需要修改挂载为HostPath的宿主机日志目录(默认/var/log/)、内核消息目录(默认/dev/kmsg)的路径。 

 各PD的配置,均放在ConfigMap中,按需修改。

命令行参数
参数 说明
--hostname-override 覆盖NPD更新Condition、Evnet时使用的主机节点名,如果不指定,依次尝试NODE_NAME环境变量、os.Hostname
--config.system-log-monitor PD AbrtAdaptor的配置文件路径,逗号分隔,示例:
JSON
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
{
    "plugin": "kmsg",
    // 读取内核环缓冲设备
    "logPath": "/dev/kmsg",
    "lookback": "5m",
    "bufferSize": 10,
    "source": "kernel-monitor",
    "metricsReporting": true,
    // 定义新的NodeConditions
    "conditions": [
        {
            "type": "KernelDeadlock",
            "reason": "KernelHasNoDeadlock",
            "message": "kernel has no deadlock"
        },
        {
            "type": "ReadonlyFilesystem",
            "reason": "FilesystemIsNotReadOnly",
            "message": "Filesystem is not read-only"
        }
    ],
    // 检测问题的规则列表
    "rules": [
        {
            // 问题类别可以是temporary、permanent
            "type": "temporary",
            "reason": "OOMKilling",
            // 匹配日志内容,支持多行匹配
            "pattern": "Kill process \\d+ (.+) score \\d+ or sacrifice child\\nKilled process \\d+ (.+) total-vm:\\d+kB, anon-rss:\\d+kB, file-rss:\\d+kB.*"
        },
        {
            "type": "temporary",
            "reason": "TaskHung",
            "pattern": "task \\S+:\\w+ blocked for more than \\w+ seconds\\."
        },
        {
            "type": "temporary",
            "reason": "UnregisterNetDevice",
            "pattern": "unregister_netdevice: waiting for \\w+ to become free. Usage count = \\d+"
        },
        {
            "type": "temporary",
            "reason": "KernelOops",
            "pattern": "BUG: unable to handle kernel NULL pointer dereference at .*"
        },
        {
            "type": "temporary",
            "reason": "KernelOops",
            "pattern": "divide error: 0000 \\[#\\d+\\] SMP"
        },
        {
            // 永久问题,记录为Node对象的Conditions
            "type": "permanent",
            // NodeCondition.Type,驼峰式大小写
            "condition": "KernelDeadlock",
            // NodeCondition.Reason 同样驼峰式大小写,通常为上述Type的子类型
            "reason": "AUFSUmountHung",
            "pattern": "task umount\\.aufs:\\w+ blocked for more than \\w+ seconds\\."
        },
        {
            "type": "permanent",
            "condition": "KernelDeadlock",
            "reason": "DockerHung",
            "pattern": "task docker:\\w+ blocked for more than \\w+ seconds\\."
        },
        {
            "type": "permanent",
            "condition": "ReadonlyFilesystem",
            "reason": "FilesystemIsReadOnly",
            "pattern": "Remounting filesystem read-only"
        }
    ]
}

对于每个配置,NPD会启动一个独立的日志监控线程 

--config.system-stats-monitor PD SystemStatsMonitor的配置文件路径,逗号分隔,示例:
JSON
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
{
    "disk": {
        // 指定需要收集的指标
        "metricsConfigs": {
            "disk/io_time": {
                "displayName": "disk/io_time"
            },
            "disk/weighted_io": {
                "displayName": "disk/weighted_io"
            },
            "disk/avg_queue_len": {
                "displayName": "disk/avg_queue_len"
            }
        },
        // 设置为true则将所有块设备(slave、holder除外)加入到指标收集列表
        "includeRootBlk": true,
        // 设置为true,则将所有分区加入到指标收集列表
        "includeAllAttachedBlk": true,
        // 此PD通过lsblk获取设备信息,此选项设置获取的超时
        "lsblkTimeout": "5s"
    },
    "host": {
        "metricsConfigs": {
            "host/uptime": {
                "displayName": "host/uptime"
            }
        }
    },
    "invokeInterval": "60s"
}

 对于每个配置,NPD会启动一个独立的统计信息监控线程 

--config.custom-plugin-monitor PD CustomPluginMonitor的配置文件路径,逗号分隔,示例:
JSON
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
{
  "plugin": "custom",
  "pluginConfig": {
    // 调用自定义插件的间隔
    "invoke_interval": "30s",
    // 调用自定义插件的超时,超过5s脚本没有退出则认为超时
    "timeout": "5s",
    // 最多读取自定义插件的标准输出的长度,用作Condition状态消息
    "max_output_length": 80,
    // 工作线程数量,也就是说多少个自定义插件可以被并发的调用
    "concurrency": 3,
    // 状态消息变更,是否应该导致Condition更新
    "enable_message_change_based_condition_update": false
  },
  // 其它字段和SLM类似
  "source": "ntp-custom-plugin-monitor",
  "metricsReporting": true,
  "conditions": [
    {
      "type": "NTPProblem",
      "reason": "NTPIsUp",
      "message": "ntp service is up"
    }
  ],
  "rules": [
    {
      "type": "temporary",
      "reason": "NTPIsDown",
      "path": "./config/plugin/check_ntp.sh",
      "timeout": "3s"
    },
    {
      "type": "permanent",
      "condition": "NTPProblem",
      "reason": "NTPIsDown",
      "path": "./config/plugin/check_ntp.sh",
      "timeout": "3s"
    }
  ]
}

 插件的逻辑编写在脚本中:

check_ntp.sh
Shell
1
2
3
4
5
6
7
8
9
10
11
12
13
14
#!/bin/bash
 
# NOTE: THIS NTP SERVICE CHECK SCRIPT ASSUME THAT NTP SERVICE IS RUNNING UNDER SYSTEMD.
#       THIS IS JUST AN EXAMPLE. YOU CAN WRITE YOUR OWN NODE PROBLEM PLUGIN ON DEMAND.
 
systemctl status ntp.service | grep 'Active:' | grep -q 'running'
ret=$?
if [ $ret -ne 0 ]; then
    echo "NTP service is down."
    exit 1
fi
 
echo "NTP service is up."
exit 0

对于每个配置,NPD会启动一个独立的监控线程

--enable-k8s-exporter  启用Kubernetes Exporter,默认true 
--apiserver-override

覆盖报告到的API Server的地址,格式和Heapster的source标记相同

如果以Standalone方式运行NPD,需要设置inClusterConfig为false:

Shell
1
http://APISERVER_IP:APISERVER_PORT?inClusterConfig=false
--address  NPD服务器的绑定地址
--port NPD服务器的绑定端口,设置为0禁用

--prometheus-address
--prometheus-port

Prometheus Export的监听地址,默认127.0.0.1:20257
治愈系统

在NPD的术语中,治愈系统(Remedy System)是一个或一组进程,负责分析NPD检测出的问题,并且采取补救措施,让K8S集群恢复健康状态。

目前官方提及的治愈系统有只有Draino。NPD项目并没有提供对Draino的集成,你需要手工部署和配置Draino。

Draino

Draino是Planet开源的小项目,最初在Planet用于解决GCE上运行的K8S集群的持久卷相关进程(mkfs.ext4、mount等)永久卡死在不可中断睡眠状态的问题。Draino的工作方式简单粗暴,只是检测到NodeCondition并Cordon、Drain节点。

基于Label和NodeCondition自动的Drain掉故障K8S节点:

  1. 具有匹配标签的的K8S节点,只要进入指定的NodeCondition之一,立即禁止调度(Cordoned) 
  2. 在禁止调度之后一段时间,节点被Drain掉

Draino可以联用Cluster Autoscaler,自动的终结掉Drained的节点。

在Descheduler项目成熟以后,可以代替Draino。

核心源码分析
如何调试

建议本地启动NPD并调试,参考下面的命令行参数:

Shell
1
2
3
--hostname-override=xenial-100 --apiserver-wait-timeout=10s --logtostderr --stderrthreshold=0 -v=10 \
--apiserver-override=http://k8s.gmem.cc:6444?inClusterConfig=false \
--config.system-log-monitor=/home/alex/Go/workspaces/default/src/k8s.io/node-problem-detector/config/docker-monitor-filelog.json 
核心模型

NPD抽象了一系列基本的类型,目前供内部使用,这些类型比起K8S API中的对应物更加轻量:

Go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package types
 
import (
    "time"
    "github.com/spf13/pflag"
)
 
// 问题的严重性,目前仅仅支持Info和Warn,和K8S事件类型对应
type Severity string
const (
    Info Severity = "info"   // 对应K8S的Normal事件
    Warn Severity = "warn"   // 对应K8S的Warning事件
)
 
// NodeCondition的状态
type ConditionStatus string
const (
    // 节点处于目标状态
    True ConditionStatus = "True"
    // 节点不处于目标状态
    False ConditionStatus = "False"
    // 不清楚
    Unknown ConditionStatus = "Unknown"
)
 
// 建模NodeCondition
type Condition struct {
    // NodeCondition类型,例如KernelDeadlock, OutOfResource
    Type string `json:"type"`
    // 节点是否处于此NodeCondition
    Status ConditionStatus `json:"status"`
    // 节点转换为此Condition的时间
    Transition time.Time `json:"transition"`
    // 记录为何进入此Condition的简短原因
    Reason string `json:"reason"`
    // 人类可读的,进入此Condition的原因
    Message string `json:"message"`
}
 
// 建模Event
type Event struct {
    // 严重性
    Severity Severity `json:"severity"`
    // 事件发生时间
    Timestamp time.Time `json:"timestamp"`
    // 事件的简短原因
    Reason string `json:"reason"`
    // 人类可读的消息
    Message string `json:"message"`
}
 
// PD向NPD核心报告时使用的DTO
type Status struct {
    // PD的名称
    Source string `json:"source"`
    // 临时的节点问题 —— 事件对象,如果此Status用于Condition更新则此字段可以为空
    // 从老到新排列在数组中
    Events []Event `json:"events"`
    // 永久的节点问题 —— NodeCondition。PD必须总是在此字段报告最新的Condition
    Conditions []Condition `json:"conditions"`
}
 
// 建模问题分类
type Type string
const (
    // 临时问题,报告为Event
    Temp Type = "temporary"
    // 永久问题,报告为NodeCondition
    Perm Type = "permanent"
)
 
// PD的接口。PD根据配置的规则,监控并报告节点问题、收集Metrics
type Monitor interface {
    // 启动此PD,将返回一个通道,NPD核心从此通道获取状态更新
    // 如果此PD仅仅报告Metrics,不关注Problem,则返回的通道应该设置为nil
    Start() (<-chan *Status, error)
    // 停止此PD
    Stop()
}
 
// 将节点的监控状态报告给某种控制平面,例如K8S API Server,或者Prometheus
type Exporter interface {
    // 报告问题,此方法由NPD核心调用,传递问题给Exporter,Exporter则将问题传递到NPD外部
    ExportProblems(*Status)
}
 
// PD类型,每个PD类型可以启动多个实例,每个实例对应一个配置
type ProblemDaemonType string
 
// 此映射建模所有PD的所有配置
// 1) 每个键对应一种PD
// 2) 每个值的每个元素对应配置文件的路径
type ProblemDaemonConfigPathMap map[ProblemDaemonType]*[]string
 
// 每种PD负责提供一个下面的结构的实例,其函数指针作为实例化PD的工厂
type ProblemDaemonHandler struct {
    CreateProblemDaemonOrDie func(string) Monitor
    // 描述如何从命令行实例化PD
    CmdOptionDescription string
}
 
// Exporter的类型
type ExporterType string
 
// 每种Exporter负责提供一个下面的结构的实例,其函数指针作为实例化Exporter的工厂
type ExporterHandler struct {
    CreateExporterOrDie func(CommandLineOptions) Exporter
    // 描述如何从命令行实例化PD
    Options CommandLineOptions
}
 
// 用于注入命令行选项
type CommandLineOptions interface {
    SetFlags(*pflag.FlagSet)
} 
入口点
Go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func main() {
    // ...
    // 读取并验证命令行选项
 
    // 初始化所有配置的PD,参数类型为map[ProblemDaemonType]*[]string,每种PD可提供多个配置文件
    // 每种PD都有对应的ProblemDaemonHandler,调用其CreateProblemDaemonOrDie方法、传入配置文件
    // 并创建Goroutine
    problemDaemons := problemdaemon.NewProblemDaemons(npdo.MonitorConfigPaths)
    // ...
    // 初始化所有Exporters
    defaultExporters := []types.Exporter{}
    if ke := k8sexporter.NewExporterOrDie(npdo); ke != nil {
        defaultExporters = append(defaultExporters, ke)
    }
    if pe := prometheusexporter.NewExporterOrDie(npdo); pe != nil {
        defaultExporters = append(defaultExporters, pe)
    }
    // K8S、Prometheus是内置Exporter,还可以支持可拔插的Experters。
    plugableExporters := exporters.NewExporters()
    npdExporters := []types.Exporter{}
    npdExporters = append(npdExporters, defaultExporters...)
    npdExporters = append(npdExporters, plugableExporters...)
 
    // 初始化NPD核心并启动
    p := problemdetector.NewProblemDetector(problemDaemons, npdExporters)
    if err := p.Run(); err != nil {
        glog.Fatalf("Problem detector failed with error: %v", err)
    }
}
初始化Monitors

NPD要求至少启用一个PD,否则NPD就没有输入,没有实际意义。具体需要初始化哪些PD,取决于你提供的命令行参数。

Go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func NewProblemDaemons(monitorConfigPaths types.ProblemDaemonConfigPathMap) []types.Monitor {
    problemDaemonMap := make(map[string]types.Monitor)
    // 遍历配置
    for problemDaemonType, configs := range monitorConfigPaths {
        // 处理每个PD类型
        for _, config := range *configs {
            if _, ok := problemDaemonMap[config]; ok {
                // 跳过重复配置
                continue
            }
            // 为每个PD的每个配置文件创建PD实例,                       调用工厂函数
            problemDaemonMap[config] = handlers[problemDaemonType].CreateProblemDaemonOrDie(config)
        }
    }
 
    problemDaemons := []types.Monitor{}
    for _, problemDaemon := range problemDaemonMap {
        problemDaemons = append(problemDaemons, problemDaemon)
    }
    // 返回PD列表
    return problemDaemons
}
初始化Exporters

如果启用了Kubernertes Exporter,检测到的节点问题将报告为K8S的NodeCondition和Event。启动此Exporter的代码如下:

Go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func NewExporterOrDie(npdo *options.NodeProblemDetectorOptions) types.Exporter {
    // ...
    // 创建一个问题客户端,此客户端能够读写当前节点的问题、事件
    c := problemclient.NewClientOrDie(npdo)
 
    // 连接到K8S API Server
    waitForAPIServerReadyWithTimeout(c, npdo)
 
    ke := k8sExporter{
        client:           c,
        // ConditionManager利用ProblemClient,将节点状态同步给API Server
        conditionManager: condition.NewConditionManager(c, clock.RealClock{}),
    }
 
    // 启动一个HTTP服务,在端点/conditions提供NodeCondition查询功能
    ke.startHTTPReporting(npdo)
    // 启动一个异步线程,定期同步到API Server
    ke.conditionManager.Start()
 
    return &ke
}

如果启用了Prometheus Exporter,则会启动一个HTTP服务器,供Prometheus Server来抓取指标:

Go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func NewExporterOrDie(npdo *options.NodeProblemDetectorOptions) types.Exporter {
    // ...
    addr := net.JoinHostPort(npdo.PrometheusServerAddress, strconv.Itoa(npdo.PrometheusServerPort))
    // 创建Prometheus的Exporter对象,它实现server.Handler
    pe, err := prometheus.NewExporter(prometheus.Options{})
    go func() {
        mux := http.NewServeMux()
        // 处理Exporter请求
        mux.Handle("/metrics", pe)
        if err := http.ListenAndServe(addr, mux); err != nil {
        }
    }()
    // 集成OpenCensus
    view.RegisterExporter(pe)
    return &prometheusExporter{}
}
启动NPD 

入口点的最后一步是初始化NPD核心。NPD会持有所有Monitors、Exporters:

Go
1
2
3
4
type problemDetector struct {
    monitors  []types.Monitor
    exporters []types.Exporter
}

NPD的外部接口很简单:

Go
1
2
3
4
type ProblemDetector interface {
    // 运行NPD
    Run() error
}

我们看一下Run方法的实现:

Go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
func (p *problemDetector) Run() error {
    // 逐个启动Monitors
    // 所有Monitor的输出通道
    var chans []<-chan *types.Status
    failureCount := 0
    for _, m := range p.monitors {
        // 启动Monitor
        ch, err := m.Start()
        if err != nil {
            // 失败,尝试下一个
            failureCount += 1
            continue
        }
        if ch != nil {
            // 保存输出通道
            chans = append(chans, ch)
        }
    }
    if len(p.monitors) == failureCount {
        // 所有PD都启动失败,失败
        return fmt.Errorf("no problem daemon is successfully setup")
    }
    // 监听所有PD的输出通道,并将其中的Status归集到单个通道ch中
    ch := groupChannel(chans)
    glog.Info("Problem detector started")
 
    // 收集到的PD输出,必须交给Exporter进行处理,才有价值
    for {
        select {
        case status := <-ch:
            for _, exporter := range p.exporters {
                exporter.ExportProblems(status)
            }
        }
    }
}
 
// 为每个PD的输出通道创建Goroutine
// 这些Goroutine接收到PD的状态报告后,将其合并到单个通道
func groupChannel(chans []<-chan *types.Status) <-chan *types.Status {
    statuses := make(chan *types.Status)
    for _, ch := range chans {
        go func(c <-chan *types.Status) {
            for status := range c {
                statuses <- status
            }
        }(ch)
    }
    return statuses
}
扩展Monitors

入口点中的语句: problemdaemon.NewProblemDaemons(npdo.MonitorConfigPaths)负责初始化所有配置的PD。

对于每种PD的每个配置文件,都会调用:ProblemDaemonHandler.CreateProblemDaemonOrDie进行PD实例化:

Go
1
2
3
4
5
6
7
8
problemDaemonMap[config] = handlers[problemDaemonType].CreateProblemDaemonOrDie(config)
 
type ProblemDaemonHandler struct {
    // 创建PD
    CreateProblemDaemonOrDie func(string) Monitor
    // 命令行选项
    CmdOptionDescription string
}

开发自己的PD时,你需要提供ProblemDaemonHandler的实例并调用problemdaemon.Register进行注册:

Go
1
2
3
4
5
6
7
var (
    handlers = make(map[types.ProblemDaemonType]types.ProblemDaemonHandler)
)
 
func Register(problemDaemonType types.ProblemDaemonType, handler types.ProblemDaemonHandler) {
    handlers[problemDaemonType] = handler
}

同时,提供Monitor接口的实现。 

扩展Exporters

入口点中的语句: plugableExporters := exporters.NewExporters()负责初始化扩展的Exporters,Stackdriver Exporter就是这样的一种扩展。

NewExporters的逻辑很简单,遍历一个集合,取出其中的ExporterHandler并实例化Exporter:

Go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func NewExporters() []types.Exporter {
    exporters := []types.Exporter{}
    for _, handler := range handlers {
        exporter := handler.CreateExporterOrDie(handler.Options)
        if exporter == nil {
            continue
        }
        exporters = append(exporters, exporter)
    }
    return exporters
}
 
 
// ExporterHandler是结构,不是接口。每个Exporter插件需要指定
// 一个函数指针,作为创建Exporter的工厂函数
type ExporterHandler struct {
    CreateExporterOrDie func(CommandLineOptions) Exporter
    Options CommandLineOptions
}

exporters包对外暴露了注册扩展Exporter的接口:

Go
1
2
3
4
5
6
7
var (
    handlers = make(map[types.ExporterType]types.ExporterHandler)
)
 
func Register(exporterType types.ExporterType, handler types.ExporterHandler) {
    handlers[exporterType] = handler
}

开发自己的Exporter时,你需要调用上面的Register注册Exporter的工厂函数,同时实现Exporter接口。

Metric

NPD对指标这一概念也进行了封装,它依赖OpenCensus而不是Prometheus这样具体的实现的API。

OpenCensus是一个开源项目,对比OpenTracing,在Tracing的基础上加了Metrics功能。现在两个项目已经合并为OpenTelemetry并进入CNCF沙箱。OpenTelemetry统一了数据格式规范、SDK,推荐用Prometheus作为Metrics后端,Jaeger做Tracing后端。

所有指标如下:

Go
1
2
3
4
5
6
7
8
const (
    ProblemCounterID  MetricID = "problem_counter"
    ProblemGaugeID    MetricID = "problem_gauge"
    DiskIOTimeID      MetricID = "disk/io_time"
    DiskWeightedIOID  MetricID = "disk/weighted_io"
    DiskAvgQueueLenID MetricID = "disk/avg_queue_len"
    HostUptimeID      MetricID = "host/uptime"
)

前两个是针对所有Problem的Counter/Gauge,后面几个都是SystemStatsMonitor暴露的指标。

NPD定义了两种数据类型的指标Int64Metric、Float64Metric,前者代码如下:

Go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package metrics
 
import (
    "context"
    "fmt"
 
    "go.opencensus.io/stats"
    "go.opencensus.io/stats/view"
    "go.opencensus.io/tag"
)
 
// Int64MetricRepresentation 表示一个int64类型的指标的快照值
type Int64MetricRepresentation struct {
    // 指标名
    Name string
    // 指标标签集
    Labels map[string]string
    // 指标的顺时值
    Value int64
}
 
// Int64Metric 表示一个int64类型的指标
type Int64Metric struct {
    // 指标名
    name    string
    // OpenCensus中的概念,本质上就是一个描述符
    measure *stats.Int64Measure
}
 
// 工厂函数,创建指标
func NewInt64Metric(metricID MetricID, viewName string, description string, unit string, aggregation Aggregation, tagNames []string) (*Int64Metric, error) {
    // OpenCensus中的View
    if viewName == "" {
        return nil, nil
    }
    // 建立指标ID和视图名的对应关系
    MetricMap.AddMapping(metricID, viewName)
    // 将标签名转换为OpenCensus的Tag键
    tagKeys, err := getTagKeysFromNames(tagNames)
 
    // 将NPD的Aggregation转换为OpenCensus的Aggregation
    // OpenCensus中的Aggregation表示聚合值的方法
    var aggregationMethod *view.Aggregation
    switch aggregation {
    case LastValue:
        aggregationMethod = view.LastValue() // 仅仅报告最后记录的值
    case Sum:
        aggregationMethod = view.Sum() // 对所有收集的值进行求和
    default:
        return nil, fmt.Errorf("unknown aggregation option %q", aggregation)
    }
    // 创建Int64Measure度量
    measure := stats.Int64(viewName, description, unit)
    // 创建上述度量的视图
    newView := &view.View{
        Name:        viewName,
        Measure:     measure,
        Description: description,
        Aggregation: aggregationMethod,
        TagKeys:     tagKeys,
    }
    // 注册此度量的描述符measureDescriptor
    view.Register(newView)
    // 返回NPD的封装
    metric := Int64Metric{viewName, measure}
    return &metric, nil
}
 
// 为指标记录一个度量,并使用提供的Tag作为指标标签
func (metric *Int64Metric) Record(tags map[string]string, measurement int64) error {
    // Mutator能够对tag map 进行变换
    var mutators []tag.Mutator
 
    tagMapMutex.RLock()
    defer tagMapMutex.RUnlock()
 
    for tagName, tagValue := range tags {
        tagKey, ok := tagMap[tagName]
        if !ok {
            return fmt.Errorf("referencing none existing tag %q in metric %q", tagName, metric.name)
        }
        // 添加这样的Mutator,如果tagKey存在则更新,否则插入
        mutators = append(mutators, tag.Upsert(tagKey, tagValue))
    }
 
    // RecordWithTags能够一次性记录一个或多个度量值
    return stats.RecordWithTags(
        context.Background(),
        // 提供Tag
        mutators,
        // 调用*stats.Int64Measure的M方法,可以创建一个Measurement
        // Measurement的本质是一个值,同时包含Int64Measure及其measureDescriptor的引用
        metric.measure.M(measurement))
}

OpenCensus中各种概念比较繁琐,讲清楚需要独立开一篇文章。这里牵涉到的有:

  1. tag.Mutator:这个接口负责为度量值生成标签(名、值对)
  2. stats.Measure:度量,此接口表示一个指标,和Prometheus中的Metric对应。度量只具有名称、描述、单位三个属性,不包含标签,或者值。每种度量都提供了方法来创建度量值,例如Int64Measure.M方法将int64转换为Measurement。度量对外不可见,要将度量值导出,必须使用视图。如果没有为Measure定义视图,则记录Measure的成本非常低
  3. stats.Measurement:度量值,此接口表示Measure的一个具体的采集值。
  4. view.View:视图,用于聚合、对外展示已经记录的度量值。视图具有唯一性的名称、关联唯一的度量、具有确定的标签名集合,以及一个确定的聚合函数

如果从上面的stats.RecordWithTags调用跟踪下去,可以看到OpenCensus最终仅仅会调用一个 internal.DefaultRecorder这个函数:

Go
1
2
3
4
5
6
7
8
9
func record(tags *tag.Map, ms interface{}, attachments map[string]interface{}) {
    req := &recordReq{
        tm:          tags,
        ms:          ms.([]stats.Measurement),
        attachments: attachments,
        t:           time.Now(),
    }
    defaultWorker.c <- req
}

可以看到此函数构建一个记录请求,并从通道发出。接收此请求并处理的Goroutine如下:

Go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func (w *worker) start() {
    // 全局的Metrics生产者管理器
    prodMgr := metricproducer.GlobalManager()
    // 注册自己
    prodMgr.AddProducer(w)
 
    for {
        select {
        case cmd := <-w.c:
            // 处理记录请求
            cmd.handleCommand(w)
        case <-w.timer.C:
            w.reportUsage(time.Now())
        case <-w.quit:
            w.timer.Stop()
            close(w.c)
            w.done <- true
            return
        }
    }
}
 
func (cmd *recordReq) handleCommand(w *worker) {
    // Worker锁
    w.mu.Lock()
    defer w.mu.Unlock()
    // 一个请求中可以具有多个stats.Measurement
    for _, m := range cmd.ms {
        if (m == stats.Measurement{}) {
            continue
        }
        // 获取度量值的度量的所有视图
        ref := w.getMeasureRef(m.Measure().Name())
        for v := range ref.views {
            // 向所有视图添加此度量,内部会调用aggregator.addSample
            v.addSample(cmd.tm, m.Value(), cmd.attachments, time.Now())
        }
    }
}

最终,记录的、聚合后的指标值,就是放在View中的。

那么,外部怎么访问这些值?其实,你打开http://127.0.0.1:20257/metrics可以看到,Prometheus的Exporter已经获取到OpenCensus记录的数据了。那么Prometheus和OpenCensus是如何配合的呢?

Prometheus在处理/metrics请求时,会调用prometheus.Gatherer,此接口的实现是prometheus.Registry。在NPD启动期间,它会创建一个Registry:

Go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func NewExporter(o Options) (*Exporter, error) {
    if o.Registry == nil {
        // 创建Prometheus注册表
        o.Registry = prometheus.NewRegistry()
    }
    // 创建指标收集器
    collector := newCollector(o, o.Registry)
    // ...
}
 
func newCollector(opts Options, registrar *prometheus.Registry) *collector {
    return &collector{
        reg:    registrar,
        opts:   opts,
        // 通过此Reader读取OpenCensus采集的数据
        reader: metricexport.NewReader()}
}

通过上述代码可以看到Prometheus如何和OpenCensus集成的,它们之间的接口是metricexport.Reader,此接口是OpenCensus提供的,Prometheus依赖于此接口。

metricexport.NewReader()调用创建的Reader具有读取OpenCensus采集的数据的能力。 

Tomb

用于从外部控制协程的生命周期, 它的逻辑很简单,准备结束生命周期时:

  1. 外部协作者发起一个通知
  2. 协作线程接收到通知,进行清理
  3. 清理完成后,协程反向通知外部协作者
  4. 外部协作者退出阻塞
Go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package tomb
 
type Tomb struct {
    stop chan struct{}  // 当生命周期结束时,外部关闭此通道,以通知协程
    done chan struct{}  // 当协程完成清理后,关闭此通道,以通知Stop的调用者
}
 
func NewTomb() *Tomb {
    return &Tomb{
        stop: make(chan struct{}),
        done: make(chan struct{}),
    }
}
 
// 从外部(另外一个Goroutine)进行阻塞性的关闭操作
func (t *Tomb) Stop() {
    close(t.stop)
    <-t.done
}
 
// 简单的返回Stop通道,如果已经通知关闭,则此读取此通道不会阻塞
func (t *Tomb) Stopping() <-chan struct{} {
    return t.stop
}
 
// 协程内部负责调用此函数,反向通知协作者,告诉它清理工作已经完成
func (t *Tomb) Done() {
    close(t.done)
} 
NPD源码分析
SystemLogMonitor

此PD能够分析各种形式的日志,读取其内容,使用正则式匹配来发现节点故障。主要代码如下:

Go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
package systemlogmonitor
 
import (
    "encoding/json"
    "io/ioutil"
    "time"
 
    "github.com/golang/glog"
 
    "k8s.io/node-problem-detector/pkg/problemdaemon"
    "k8s.io/node-problem-detector/pkg/problemmetrics"
    "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers"
    watchertypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types"
    logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types"
    systemlogtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types"
    "k8s.io/node-problem-detector/pkg/types"
    "k8s.io/node-problem-detector/pkg/util"
    "k8s.io/node-problem-detector/pkg/util/tomb"
)
 
const SystemLogMonitorName = "system-log-monitor"
 
// 初始化函数用于注册此PD的工厂函数
func init() {
    problemdaemon.Register(
        SystemLogMonitorName,
        types.ProblemDaemonHandler{
            CreateProblemDaemonOrDie: NewLogMonitorOrDie,
            CmdOptionDescription:     "Set to config file paths."})
}
 
type logMonitor struct {
    // 配置文件路径
    configPath string
    // 读取日志的逻辑委托给LogWatcher,这里解耦的目的是支持多种类型的日志
    watcher    watchertypes.LogWatcher
    // 日志缓冲,读取的日志在此等待处理
    buffer     LogBuffer
    // 对应配置文件中的字段
    config     MonitorConfig
    // 对应配置文件中的conditions字段
    conditions []types.Condition
    // 输入日志条目的通道
    logCh      <-chan *logtypes.Log
    // 输出状态的通道
    output     chan *types.Status
    // 墓碑,用于控制此Monitor的生命周期
    tomb       *tomb.Tomb
}
 
// 创建实例,如果失败则panic
func NewLogMonitorOrDie(configPath string) types.Monitor {
    // 创建实例
    l := &logMonitor{
        configPath: configPath,
        tomb:       tomb.NewTomb(),
    }
    // 读取配置文件
    f, err := ioutil.ReadFile(configPath)
    // 作为JSON解析为MonitorConfig
    err = json.Unmarshal(f, &l.config)
    // 设置MonitorConfig的默认值
    (&l.config).ApplyDefaultConfiguration()
    err = l.config.ValidateRules()
    
    // 创建LogWatcher
    l.watcher = logwatchers.GetLogWatcherOrDie(l.config.WatcherConfig)
    // 设置缓冲区
    l.buffer = NewLogBuffer(l.config.BufferSize)
    // 写死的最大通道容量
    l.output = make(chan *types.Status, 1000)
    // 如果启用指标报告
    if *l.config.EnableMetricsReporting {
        // 则为所有类型的Problem(Rule.Reason,比NodeCondition更细粒度)初始化指标
        // Perm类型的初始化一个Gauge指标,一个Counter指标
        // Temp类型的仅仅初始化一个Counter指标
        initializeProblemMetricsOrDie(l.config.Rules)
    }
    return l
}
 
// 初始化指标
func initializeProblemMetricsOrDie(rules []systemlogtypes.Rule) {
    for _, rule := range rules {
        if rule.Type == types.Perm {
            err := problemmetrics.GlobalProblemMetricsManager.SetProblemGauge(rule.Condition, rule.Reason, false)
        }
        err := problemmetrics.GlobalProblemMetricsManager.IncrementProblemCounter(rule.Reason, 0)
    }
}
 
// 启动
func (l *logMonitor) Start() (<-chan *types.Status, error) {
    var err error
    // 启动LogWatcher,监控日志的变化
    l.logCh, err = l.watcher.Watch()
    if err != nil {
        return nil, err
    }
    // 启动主循环
    go l.monitorLoop()
    return l.output, nil
}
 
// 停止
func (l *logMonitor) Stop() {
    // 关闭Stop通道,然后等待Done通道完成
    l.tomb.Stop()
}
 
// 主循环
func (l *logMonitor) monitorLoop() {
    // 主循环退出后,接触Stop()调用者的阻塞
    defer l.tomb.Done()
    // 初始化状态(Event和Condition)
    l.initializeStatus()
    // 循环
    for {
        select {
        // 日志可用,解析日志
        case log := <-l.logCh:
            l.parseLog(log)
        // Stop通道可读,意味着通知关闭了
        case <-l.tomb.Stopping():
            // 关闭LogWatcher
            l.watcher.Stop()
            glog.Infof("Log monitor stopped: %s", l.configPath)
            return
        }
    }
}
 
// 解析日志行
func (l *logMonitor) parseLog(log *logtypes.Log) {
    // 一旦新日志行可用,就将其推送到日志缓冲
    l.buffer.Push(log)
    for _, rule := range l.config.Rules {
        // 然后逐个规则去匹配
        matched := l.buffer.Match(rule.Pattern)
        if len(matched) == 0 {
            continue
        }
        // 如果匹配规则,则报告规则中声明的状态
        status := l.generateStatus(matched, rule)
        glog.Infof("New status generated: %+v", status)
        // 输出状态
        l.output <- status
    }
}
 
// 从日志生成Status
func (l *logMonitor) generateStatus(logs []*logtypes.Log, rule systemlogtypes.Rule) *types.Status {
    // 第一行日志的时间戳作为状态的时间戳
    timestamp := logs[0].Timestamp
    // 读取日志内容
    message := generateMessage(logs)
    var events []types.Event
    var changedConditions []*types.Condition
    if rule.Type == types.Temp {
        // 对于临时问题,仅仅生成事件
        events = append(events, types.Event{
            Severity:  types.Warn,
            Timestamp: timestamp,
            Reason:    rule.Reason,
            Message:   message,
        })
    } else {
        // 对于永久问题,改变Condition
        for i := range l.conditions {
            condition := &l.conditions[i]
            // 找到匹配的、此Monitor定义的Condition
            if condition.Type == rule.Condition {
                // 如果Condition改变(Status或Reson字段变了)
                if condition.Status == types.False || condition.Reason != rule.Reason {
                    // 则更新时间戳和消息
                    condition.Transition = timestamp
                    condition.Message = message
                    // 并发布事件
                    events = append(events, util.GenerateConditionChangeEvent(
                        condition.Type,
                        types.True,
                        rule.Reason,
                        timestamp,
                    ))
                }
                condition.Status = types.True
                condition.Reason = rule.Reason
                changedConditions = append(changedConditions, condition)
                break
            }
        }
    }
    // 报告Problem数量指标
    if *l.config.EnableMetricsReporting {
        for _, event := range events {
            err := problemmetrics.GlobalProblemMetricsManager.IncrementProblemCounter(event.Reason, 1)
        }
        for _, condition := range changedConditions {
            err := problemmetrics.GlobalProblemMetricsManager.SetProblemGauge(
                condition.Type, condition.Reason, condition.Status == types.True)
            }
        }
    }
    // 处于性能考虑,应该聚合Event、Condition,周期性的报告,而非这样立即报告
    return &types.Status{
        Source: l.config.Source,
        Events:     events,
        Conditions: l.conditions,
    }
}
 
// 初始化状态并报告一次
func (l *logMonitor) initializeStatus() {
    // 初始化默认Condition,来自配置的condition字段
    l.conditions = initialConditions(l.config.DefaultConditions)
    l.output <- &types.Status{
        Source:     l.config.Source,
        Conditions: l.conditions,
    }
}
 
func initialConditions(defaults []types.Condition) []types.Condition {
    conditions := make([]types.Condition, len(defaults))
    copy(conditions, defaults)
    for i := range conditions {
        conditions[i].Status = types.False
        conditions[i].Transition = time.Now()
    }
    return conditions
}
 
func generateMessage(logs []*logtypes.Log) string {
    messages := []string{}
    for _, log := range logs {
        messages = append(messages, log.Message)
    }
    return concatLogs(messages)
}

和配置相关的代码如下,MonitorConfig嵌入WatcherConfig,正好和配置文件结构对应:

Go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// node-problem-detector/pkg/systemlogmonitor/config.go
type MonitorConfig struct {
    // LogWatcher(LogMonitor插件)的配置
    watchertypes.WatcherConfig
    // 缓冲(行数)大小
    BufferSize int `json:"bufferSize"`
    // 此PD的名称
    Source string `json:"source"`
    // 此PD处理的所有Condition的默认状态
    DefaultConditions []types.Condition `json:"conditions"`
    // 日志匹配规则列表
    Rules []systemlogtypes.Rule `json:"rules"`
    // 是否将Problem报告为指标
    EnableMetricsReporting *bool `json:"metricsReporting,omitempty"`
}
 
// node-problem-detector/pkg/systemlogmonitor/logwatchers/types/log_watcher.go
type WatcherConfig struct {
    // 插件类型,可选 filelog, journald, kmsg
    Plugin string `json:"plugin,omitempty"`
    // 键值对形式的插件配置,具体可以包含哪些配置项,取决于插件
    PluginConfig map[string]string `json:"pluginConfig,omitempty"`
    // 日志的路径
    LogPath string `json:"logPath,omitempty"`
    // 向当前时间点往前看多久日志
    Lookback string `json:"lookback,omitempty"`
    // 仅仅查看节点启动之后多久的日志,可以避免启动期间的不稳定状态触发不必要的问题报告
    Delay string `json:"delay,omitempty"`
}

LogWatcher的实现有多种,它们具有统一的接口:

Go
1
2
3
4
5
6
7
8
9
type LogWatcher interface {
    // 开始监控日志,并通过通道输出日志
    Watch() (<-chan *types.Log, error)
    // 停止,注意释放打开的资源
    Stop()
}
 
// LogWatcher工厂函数
type WatcherCreateFunc func(WatcherConfig) LogWatcher

类似于Monitor、Exporter,LogWatcher也需要注册:

Go
1
2
3
4
5
6
7
8
9
10
11
12
13
// 注册表
var createFuncs = map[string]types.WatcherCreateFunc{}
 
// 注册函数,比较奇葩的是名称没有导出,因此各种LogWatcher的注册均是在logwatchers包中进行的(而非各LogWatcher自己的包)
func registerLogWatcher(name string, create types.WatcherCreateFunc) {
    createFuncs[name] = create
}
 
// 根据config.Plugin字段来查找注册表,获取LogWatcher的工厂函数
func GetLogWatcherOrDie(config types.WatcherConfig) types.LogWatcher {
    create, ok := createFuncs[config.Plugin]
    return create(config)
}

我们不去逐个分析LogWatcher的实现,仅以kernelLogWatcher(ksmg)为例:  

Go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package kmsg
 
import (
    "fmt"
    "strings"
    "time"
 
    utilclock "code.cloudfoundry.org/clock"
    "github.com/euank/go-kmsg-parser/kmsgparser"
    "github.com/golang/glog"
 
    "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types"
    logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types"
    "k8s.io/node-problem-detector/pkg/util"
    "k8s.io/node-problem-detector/pkg/util/tomb"
)
 
type kernelLogWatcher struct {
    cfg       types.WatcherConfig
    startTime time.Time
    logCh     chan *logtypes.Log
    tomb      *tomb.Tomb
 
    kmsgParser kmsgparser.Parser
    clock      utilclock.Clock
}
 
// 工厂函数
func NewKmsgWatcher(cfg types.WatcherConfig) types.LogWatcher {
    // 获取系统启动到现在过了多久
    uptime, err := util.GetUptimeDuration()
    // 判断何时才应该开始监控日志    
    startTime, err := util.GetStartTime(time.Now(), uptime, cfg.Lookback, cfg.Delay)
    return &kernelLogWatcher{
        cfg:       cfg,
        startTime: startTime,
        tomb:      tomb.NewTomb(),
        logCh: make(chan *logtypes.Log, 100),
        clock: utilclock.NewClock(),
    }
}
 
// 确保签名匹配
var _ types.WatcherCreateFunc = NewKmsgWatcher
 
// 开始监控
func (k *kernelLogWatcher) Watch() (<-chan *logtypes.Log, error) {
    if k.kmsgParser == nil {
        // 初始化内核日志解析器
        parser, err := kmsgparser.NewParser()
        k.kmsgParser = parser
    }
    // 异步启动主监控循环
    go k.watchLoop()
    return k.logCh, nil
}
 
// 停止监控
func (k *kernelLogWatcher) Stop() {
    // 停止解析器
    k.kmsgParser.Close()
    // 发起停止信号,并等待主监控循环的通知
    k.tomb.Stop()
}
 
// 主监控循环
func (k *kernelLogWatcher) watchLoop() {
    // 退出时关闭输出通道,并且通过Tomb告知清理结束,Stop方法可以返回了
    defer func() {
        close(k.logCh)
        k.tomb.Done()
    }()
    // go-kmsg-parser项目提供的功能,获得一个可读通道,从中可以读取到内核消息
    kmsgs := k.kmsgParser.Parse()
 
    for {
        select {
        // 停止信号,清理
        case <-k.tomb.Stopping():
            // 关闭内核消息解析器
            if err := k.kmsgParser.Close(); err != nil {
            }
            return
        // 获取内核消息
        case msg := <-kmsgs:
            // 跳过空消息
            if msg.Message == "" {
                continue
            }
 
            // 对于过早的消息,丢弃
            if msg.Timestamp.Before(k.startTime) {
                continue
            }
            // 输出消息
            k.logCh <- &logtypes.Log{
                Message:   strings.TrimSpace(msg.Message),
                Timestamp: msg.Timestamp,
            }
        }
    }
}
SystemStatsMonitor

此PD仅仅产生Metrics,而不报告Problem,因此其Start方法返回nil。它报告指标时调用的是OpenCensus的API。

需要注意的是,NPD的Exporter是针对Problem的,Monitor可以产生,也可以不产生Problem(Status对象)。不产生Problem的Monitor和Exporter直接关系。

Go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
const SystemStatsMonitorName = "system-stats-monitor"
 
// 注册
func init() {
    problemdaemon.Register(SystemStatsMonitorName, types.ProblemDaemonHandler{
        CreateProblemDaemonOrDie: NewSystemStatsMonitorOrDie,
        CmdOptionDescription:     "Set to config file paths."})
}
 
type systemStatsMonitor struct {
    // 配置文件路径
    configPath    string
    // 从文件中读取到的配置
    config        ssmtypes.SystemStatsConfig
    // 统计信息收集器,目前仅仅支持磁盘、主机信息
    diskCollector *diskCollector
    hostCollector *hostCollector
    // 生命周期控制
    tomb          *tomb.Tomb
}
 
// 工厂
func NewSystemStatsMonitorOrDie(configPath string) types.Monitor {
    ssm := systemStatsMonitor{
        configPath: configPath,
        tomb:       tomb.NewTomb(),
    }
 
    // 读取、应用、验证配置
    f, err := ioutil.ReadFile(configPath)
    err = json.Unmarshal(f, &ssm.config)
    err = ssm.config.ApplyConfiguration()
    err = ssm.config.Validate()
 
    // 按需创建收集器
    if len(ssm.config.DiskConfig.MetricsConfigs) > 0 {
        ssm.diskCollector = NewDiskCollectorOrDie(&ssm.config.DiskConfig)
    }
    if len(ssm.config.HostConfig.MetricsConfigs) > 0 {
        ssm.hostCollector = NewHostCollectorOrDie(&ssm.config.HostConfig)
    }
    return &ssm
}
 
// 异步启动主循环
func (ssm *systemStatsMonitor) Start() (<-chan *types.Status, error) {
    go ssm.monitorLoop()
    return nil, nil
}
 
func (ssm *systemStatsMonitor) monitorLoop() {
    // 通知Stop()调用者
    defer ssm.tomb.Done()
    
    // 定时器
    runTicker := time.NewTicker(ssm.config.InvokeInterval)
    defer runTicker.Stop()
 
    // 立即进行一次采集
    select {
    case <-ssm.tomb.Stopping():
        return
    default:
        ssm.diskCollector.collect()
        ssm.hostCollector.collect()
    }
 
    // 定时采集
    for {
        select {
        case <-runTicker.C:
            ssm.diskCollector.collect()
            ssm.hostCollector.collect()
        case <-ssm.tomb.Stopping():
            return
        }
    }
}
 
func (ssm *systemStatsMonitor) Stop() {
    ssm.tomb.Stop()
}

可以看到,此PD只是定期调用收集器的collect方法,并且不从此方法获取任何信息。

目前Collector有两个,以DisCollector为例,我们看一下它将收集的指标输出到何处了:

Go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package systemstatsmonitor
 
import (
    "context"
    "os/exec"
    "strings"
    "time"
 
    "github.com/golang/glog"
    "github.com/shirou/gopsutil/disk"
 
    ssmtypes "k8s.io/node-problem-detector/pkg/systemstatsmonitor/types"
    "k8s.io/node-problem-detector/pkg/util/metrics"
)
 
const deviceNameLabel = "device_name"
 
type diskCollector struct {
    // IO时间
    mIOTime      *metrics.Int64Metric
    // 加权IO时间,此字段上次刷新以来,消耗在IO上的毫秒数 * 进行中的IO操作数量
    mWeightedIO  *metrics.Int64Metric
    // 平均队列长度
    mAvgQueueLen *metrics.Float64Metric
    // 配置信息
    config *ssmtypes.DiskStatsConfig
 
    // IO时间历时记录
    historyIOTime     map[string]uint64
    historyWeightedIO map[string]uint64
}
 
 
// 工厂函数
func NewDiskCollectorOrDie(diskConfig *ssmtypes.DiskStatsConfig) *diskCollector {
    dc := diskCollector{config: diskConfig}
 
    var err error
 
    // 创建NPD封装的Metrics对象
    dc.mIOTime, err = metrics.NewInt64Metric(
        metrics.DiskIOTimeID,
        // displayName作为OpenCensus View名
        diskConfig.MetricsConfigs[string(metrics.DiskIOTimeID)].DisplayName,
        "The IO time spent on the disk",
        "second",
        // 求和聚合
        metrics.Sum,
        []string{deviceNameLabel})
 
    dc.mWeightedIO, err = metrics.NewInt64Metric(
        metrics.DiskWeightedIOID,
        diskConfig.MetricsConfigs[string(metrics.DiskWeightedIOID)].DisplayName,
        "The weighted IO on the disk",
        "second",
        metrics.Sum,
        []string{deviceNameLabel})
 
    dc.mAvgQueueLen, err = metrics.NewFloat64Metric(
        metrics.DiskAvgQueueLenID,
        diskConfig.MetricsConfigs[string(metrics.DiskAvgQueueLenID)].DisplayName,
        "The average queue length on the disk",
        "second",
        metrics.LastValue,
        []string{deviceNameLabel})
 
    dc.historyIOTime = make(map[string]uint64)
    dc.historyWeightedIO = make(map[string]uint64)
 
    return &dc
}
 
func (dc *diskCollector) collect() {
    if dc == nil {
        return
    }
 
    blks := []string{}
    // 列出所有磁盘
    // 列出所有非Slave非Holder磁盘
    if dc.config.IncludeRootBlk {
        blks = append(blks, listRootBlockDevices(dc.config.LsblkTimeout)...)
    }
    // 列出所有分区
    if dc.config.IncludeAllAttachedBlk {
        blks = append(blks, listAttachedBlockDevices()...)
    }
    // 调用gopsutil,此项目能够狂平台获取各种操作系统、硬件的指标
    //                           总是递增
    ioCountersStats, err := disk.IOCounters(blks...)
    if err != nil {
        return
    }
    // 迭代所有指标
    for deviceName, ioCountersStat := range ioCountersStats {
        // 根据上一次度量值计算平均队列长度
        lastIOTime := dc.historyIOTime[deviceName]
        lastWeightedIO := dc.historyWeightedIO[deviceName]
 
        dc.historyIOTime[deviceName] = ioCountersStat.IoTime
        dc.historyWeightedIO[deviceName] = ioCountersStat.WeightedIO
        // 平均队列长度 = (上次加权IO耗时 - 本次加权IO超时) / (上次IO耗时 - 本次IO耗时)
        //              = 平均队列长度 * (上次IO耗时 - 本次IO耗时) / (上次IO耗时 - 本次IO耗时)
        avgQueueLen := float64(0.0)
        if lastIOTime != ioCountersStat.IoTime {
            avgQueueLen = float64(ioCountersStat.WeightedIO-lastWeightedIO) / float64(ioCountersStat.IoTime-lastIOTime)
        }
 
        // 为指标添加 {"device_name": deviceName} 标签
        tags := map[string]string{deviceNameLabel: deviceName}
        // 这里录制度量时,要使用增量值,因为对应度量已经设置了聚合方法为sum
        if dc.mIOTime != nil {
            dc.mIOTime.Record(tags, int64(ioCountersStat.IoTime-lastIOTime))
        }
        if dc.mWeightedIO != nil {
            dc.mWeightedIO.Record(tags, int64(ioCountersStat.WeightedIO-lastWeightedIO))
        }
        if dc.mAvgQueueLen != nil {
            dc.mAvgQueueLen.Record(tags, avgQueueLen)
        }
    }
}
 
// 调用lsblk命令列出磁盘
func listRootBlockDevices(timeout time.Duration) []string {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()
 
    // 调用命令
    cmd := exec.CommandContext(ctx, "lsblk", "-d", "-n", "-o", "NAME")
    stdout, err := cmd.Output()
    return strings.Split(strings.TrimSpace(string(stdout)), "\n")
}
 
// 列出所有分区
func listAttachedBlockDevices() []string {
    blks := []string{}
 
    partitions, err := disk.Partitions(false)
    if err != nil {
        glog.Errorf("Failed to retrieve the list of disk partitions: %v", err)
        return blks
    }
 
    for _, partition := range partitions {
        blks = append(blks, partition.Device)
    }
    return blks
}
CustomPluginMonitor

此PD为NPD提供了一种插件化机制,允许基于任何语言来编写监控脚本,只需要这些脚本遵循NPD关于退出码和标准输出的规范。

此PD定义了以下类型 / 常量:

Go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package types
 
import (
    "k8s.io/node-problem-detector/pkg/types"
    "time"
)
 
type Status int
 
// 自定义插件的返回码
const (
    OK      Status = 0
    NonOK   Status = 1
    Unknown Status = 2
)
 
// 插件的检查结果
type Result struct {
    // 提供入参
    Rule       *CustomRule
    // 插件状态
    ExitStatus Status
    // 标准输出
    Message    string
}
 
// 自定义规则(插件),描述CPM如何调用插件,分析调用结果
type CustomRule struct {
    // 报告永久还是临时问题
    Type types.Type `json:"type"`
    // 此问题触发哪种NodeCondition,仅当永久问题才设置此字段
    Condition string `json:"condition"`
    // 问题的简短原因,对于永久问题,通常描述NodeCondition的一个子类型
    Reason string `json:"reason"`
    // 自定义插件(脚本)的文件路径
    Path string `json:"path"`
    // 传递给自定义插件的参数
    Args []string `json:"args"`
    // 自定义插件执行超时
    TimeoutString *string `json:"timeout"`
    Timeout *time.Duration `json:"-"`
}

关于如何配置CPM,以及每个插件的代码:

Go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package types
 
import (
    "fmt"
    "os"
    "time"
 
    "k8s.io/node-problem-detector/pkg/types"
)
 
// 配置参数默认值
var (
    // 默认全局超时
    defaultGlobalTimeout                     = 5 * time.Second
    defaultGlobalTimeoutString               = defaultGlobalTimeout.String()
    // 默认调用间隔
    defaultInvokeInterval                    = 30 * time.Second
    defaultInvokeIntervalString              = defaultInvokeInterval.String()
    // 默认最大输出长度
    defaultMaxOutputLength                   = 80
    // 默认并发度
    defaultConcurrency                       = 3
    // 默认是否 状态消息变更导致Condition更新
    defaultMessageChangeBasedConditionUpdate = false
    // 默认是否启用指标报告
    defaultEnableMetricsReporting            = true
 
    customPluginName = "custom"
)
 
// 全局配置
type pluginGlobalConfig struct {
    // 所有插件被调用的间隔
    InvokeIntervalString *string `json:"invoke_interval,omitempty"`
    // 全局插件执行超时
    TimeoutString *string `json:"timeout,omitempty"`
    InvokeInterval *time.Duration `json:"-"`
    Timeout *time.Duration `json:"-"`
    // 最大标准输出长度
    MaxOutputLength *int `json:"max_output_length,omitempty"`
    // 并发度
    Concurrency *int `json:"concurrency,omitempty"`
    // 状态消息变更是否导致Condition更新
    EnableMessageChangeBasedConditionUpdate *bool `json:"enable_message_change_based_condition_update,omitempty"`
}
 
// 此PD的配置,结构上对应配置文件
type CustomPluginConfig struct {
    // PD类型,必须为custom
    Plugin string `json:"plugin,omitempty"`
    // 全局配置
    PluginGlobalConfig pluginGlobalConfig `json:"pluginConfig,omitempty"`
    // 源名称
    Source string `json:"source"`
    // CPM需要处理的所有Condition的默认状态
    DefaultConditions []types.Condition `json:"conditions"`
    // 需要解析和执行的插件列表
    Rules []*CustomRule `json:"rules"`
    // 状态消息变更是否导致Condition更新
    EnableMetricsReporting *bool `json:"metricsReporting,omitempty"`
}

CPM的核心代码:

Go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
package custompluginmonitor
 
import (
    "encoding/json"
    "io/ioutil"
    "time"
 
    "github.com/golang/glog"
 
    "k8s.io/node-problem-detector/pkg/custompluginmonitor/plugin"
    cpmtypes "k8s.io/node-problem-detector/pkg/custompluginmonitor/types"
    "k8s.io/node-problem-detector/pkg/problemdaemon"
    "k8s.io/node-problem-detector/pkg/problemmetrics"
    "k8s.io/node-problem-detector/pkg/types"
    "k8s.io/node-problem-detector/pkg/util"
    "k8s.io/node-problem-detector/pkg/util/tomb"
)
 
// 此PD的名称
const CustomPluginMonitorName = "custom-plugin-monitor"
 
func init() {
    problemdaemon.Register(
        CustomPluginMonitorName,
        types.ProblemDaemonHandler{
            CreateProblemDaemonOrDie: NewCustomPluginMonitorOrDie,
            CmdOptionDescription:     "Set to config file paths."})
}
 
// CPM
type customPluginMonitor struct {
    configPath string
    config     cpmtypes.CustomPluginConfig
    conditions []types.Condition
    // 规则执行插件
    plugin     *plugin.Plugin
    // 插件执行结果的读通道
    resultChan <-chan cpmtypes.Result
    // 向NPM报送状态的写通道
    statusChan chan *types.Status
    tomb       *tomb.Tomb
}
 
// 工厂函数
func NewCustomPluginMonitorOrDie(configPath string) types.Monitor {
    c := &customPluginMonitor{
        configPath: configPath,
        tomb:       tomb.NewTomb(),
    }
    // 读取并校验配置
    f, err := ioutil.ReadFile(configPath)
    err = json.Unmarshal(f, &c.config)
    // Apply configurations
    err = (&c.config).ApplyConfiguration()
    // Validate configurations
    err = c.config.Validate()
    // 创建插件对象
    c.plugin = plugin.NewPlugin(c.config)
    // 状态通道
    c.statusChan = make(chan *types.Status, 1000)
 
    if *c.config.EnableMetricsReporting {
        initializeProblemMetricsOrDie(c.config.Rules)
    }
    return c
}
 
// 初始化问题指标
func initializeProblemMetricsOrDie(rules []*cpmtypes.CustomRule) {
    for _, rule := range rules {
        if rule.Type == types.Perm {
            err := problemmetrics.GlobalProblemMetricsManager.SetProblemGauge(rule.Condition, rule.Reason, false)
        }
        err := problemmetrics.GlobalProblemMetricsManager.IncrementProblemCounter(rule.Reason, 0)
    }
}
 
// 启动
func (c *customPluginMonitor) Start() (<-chan *types.Status, error) {
    // 启动插件
    go c.plugin.Run()
    // 启动主循环
    go c.monitorLoop()
    return c.statusChan, nil
}
 
// 停止
func (c *customPluginMonitor) Stop() {
    c.tomb.Stop()
}
 
// 主循环
func (c *customPluginMonitor) monitorLoop() {
    c.initializeStatus()
    // 得到插件的结果通道
    resultChan := c.plugin.GetResultChan()
    // 循环遍历处理插件的结果
    for {
        select {
        case result := <-resultChan:
            glog.V(3).Infof("Receive new plugin result for %s: %+v", c.configPath, result)
            // 将插件结果转换为Status
            status := c.generateStatus(result)
            glog.Infof("New status generated: %+v", status)
            // 输出到状态通道
            c.statusChan <- status
        case <-c.tomb.Stopping():
            c.plugin.Stop()
            glog.Infof("Custom plugin monitor stopped: %s", c.configPath)
            c.tomb.Done()
            break
        }
    }
}
 
// 从插件检查结果生成状态
func (c *customPluginMonitor) generateStatus(result cpmtypes.Result) *types.Status {
    timestamp := time.Now()
    var activeProblemEvents []types.Event
    var inactiveProblemEvents []types.Event
    if result.Rule.Type == types.Temp {
        // 对于临时错误,如果插件检查结果非0则产生一个事件
        if result.ExitStatus >= cpmtypes.NonOK {
            activeProblemEvents = append(activeProblemEvents, types.Event{
                Severity:  types.Warn,
                Timestamp: timestamp,
                Reason:    result.Rule.Reason,
                Message:   result.Message,
            })
        }
    } else {
        // 对于永久错误,如果插件检查结果非0则修改Condition
        for i := range c.conditions {
            condition := &c.conditions[i]
            if condition.Type == result.Rule.Condition {
                // 规则中的Reason、结果中的Message表明了发生的问题
                // 需要从配置中读取默认的Condition,以便在检查结果OK时恢复Condition
                var defaultConditionReason string
                var defaultConditionMessage string
                for j := range c.config.DefaultConditions {
                    defaultCondition := &c.config.DefaultConditions[j]
                    // conditions.type == rules[j].condition
                    if defaultCondition.Type == result.Rule.Condition {
                        defaultConditionReason = defaultCondition.Reason
                        defaultConditionMessage = defaultCondition.Message
                        break
                    }
                }
 
                needToUpdateCondition := true
                var newReason string
                var newMessage string
                // 如果检查结果为0则不处于Condition,为1则处于Condition,其它值则未知
                status := toConditionStatus(result.ExitStatus)
                if condition.Status == types.True && status != types.True {
                    // Condtion从True转变为False/Unknown
                    newReason = defaultConditionReason
                    if newMessage == "" {
                        newMessage = defaultConditionMessage
                    } else {
                        newMessage = result.Message
                    }
                } else if condition.Status != types.True && status == types.True {
                    // Condtion从False/Unknown转变为True
                    newReason = result.Rule.Reason
                    newMessage = result.Message
                } else if condition.Status != status {
                    // Condtion在False和Unknown之间转换
                    newReason = defaultConditionReason
                    if newMessage == "" {
                        newMessage = defaultConditionMessage
                    } else {
                        newMessage = result.Message
                    }
                } else if condition.Status == types.True && status == types.True &&
                    (condition.Reason != result.Rule.Reason ||
                        (*c.config.PluginGlobalConfig.EnableMessageChangeBasedConditionUpdate && condition.Message != result.Message)) {
                    // Condtion没有改变,和上次一样是True
                    newReason = result.Rule.Reason
                    newMessage = result.Message
                } else {
                    // Condtion没有改变,和上次一样是False/Unknown
                    needToUpdateCondition = false
                }
                
                if needToUpdateCondition {
                    condition.Transition = timestamp
                    condition.Status = status
                    condition.Reason = newReason
                    condition.Message = newMessage
 
                    updateEvent := util.GenerateConditionChangeEvent(
                        condition.Type,
                        status,
                        newReason,
                        timestamp,
                    )
 
                    if status == types.True {
                        activeProblemEvents = append(activeProblemEvents, updateEvent)
                    } else {
                        inactiveProblemEvents = append(inactiveProblemEvents, updateEvent)
                    }
                }
 
                break
            }
        }
    }
    // 报告指标
    if *c.config.EnableMetricsReporting {
        for _, event := range activeProblemEvents {
            err := problemmetrics.GlobalProblemMetricsManager.IncrementProblemCounter( event.Reason, 1)
        }
        for _, condition := range c.conditions {
            err := problemmetrics.GlobalProblemMetricsManager.SetProblemGauge(
                condition.Type, condition.Reason, condition.Status == types.True)
        }
    }
    // 发布Status
    return &types.Status{
        Source: c.config.Source,
        Events:     append(activeProblemEvents, inactiveProblemEvents...),
        Conditions: c.conditions,
    }
}
 
// 将插件退出码转换为Condtion.Status
func toConditionStatus(s cpmtypes.Status) types.ConditionStatus {
    switch s {
    case cpmtypes.OK:
        return types.False
    case cpmtypes.NonOK:
        return types.True
    default:
        return types.Unknown
    }
}
 
// 初始化,报告默认状态
func (c *customPluginMonitor) initializeStatus() {
    c.conditions = initialConditions(c.config.DefaultConditions)
    glog.Infof("Initialize condition generated: %+v", c.conditions)
    c.statusChan <- &types.Status{
        Source:     c.config.Source,
        Conditions: c.conditions,
    }
}
 
func initialConditions(defaults []types.Condition) []types.Condition {
    conditions := make([]types.Condition, len(defaults))
    copy(conditions, defaults)
    for i := range conditions {
        conditions[i].Status = types.False
        conditions[i].Transition = time.Now()
    }
    return conditions
}

实际上执行监控脚本的工作,由Plugin这个结构负责,并且将每个脚本的执行结果通过通道传递给CPM: 

Go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package plugin
 
import (
    "context"
    "fmt"
    "os/exec"
    "strings"
    "sync"
    "syscall"
    "time"
 
    "github.com/golang/glog"
    cpmtypes "k8s.io/node-problem-detector/pkg/custompluginmonitor/types"
    "k8s.io/node-problem-detector/pkg/util/tomb"
)
 
type Plugin struct {
    config     cpmtypes.CustomPluginConfig
    // 此通道用于控制并发度
    syncChan   chan struct{}
    // 此通道用于输出结果
    resultChan chan cpmtypes.Result
    tomb       *tomb.Tomb
    // 嵌入
    sync.WaitGroup
}
 
func NewPlugin(config cpmtypes.CustomPluginConfig) *Plugin {
    return &Plugin{
        config:   config,
        // 限制通道大小为并发度
        syncChan: make(chan struct{}, *config.PluginGlobalConfig.Concurrency),
        resultChan: make(chan cpmtypes.Result, 1000),
        tomb:       tomb.NewTomb(),
    }
}
 
func (p *Plugin) GetResultChan() <-chan cpmtypes.Result {
    return p.resultChan
}
 
// 执行所有规则一遍
func (p *Plugin) Run() {
    defer func() {
        glog.Info("Stopping plugin execution")
        p.tomb.Done()
    }()
 
    // 仅仅支持全局一致的调度间隔
    runTicker := time.NewTicker(*p.config.PluginGlobalConfig.InvokeInterval)
    defer runTicker.Stop()
 
    runner := func() {
        glog.Info("Start to run custom plugins")
 
        for _, rule := range p.config.Rules {
            // 占据一个位置
            p.syncChan <- struct{}{}
            // 增加一个等待量,在synChan基础上又要加等待组的原因是
            // 防止遍历完毕后,Run方法过早退出
            p.Add(1)
            // 异步执行规则
            go func(rule *cpmtypes.CustomRule) {
                // 总是减少一个等待量
                defer p.Done()
                // 同时释放一个位置
                defer func() {
                    <-p.syncChan
                }()
 
                start := time.Now()
                // 执行规则
                exitStatus, message := p.run(*rule)
                end := time.Now()
 
                glog.V(3).Infof("Rule: %+v. Start time: %v. End time: %v. Duration: %v", rule, start, end, end.Sub(start))
                
                // 写入结果
                result := cpmtypes.Result{
                    Rule:       rule,
                    ExitStatus: exitStatus,
                    Message:    message,
                }
 
                p.resultChan <- result
 
                glog.Infof("Add check result %+v for rule %+v", result, rule)
            }(rule)
        }
        // 等待所有规则执行完毕,防止过早退出
        p.Wait()
        glog.Info("Finish running custom plugins")
    }
    // 首次执行
    select {
    case <-p.tomb.Stopping():
        return
    default:
        runner()
    }
    // 循环执行
    for {
        select {
        case <-runTicker.C:
            runner()
        case <-p.tomb.Stopping():
            return
        }
    }
}
 
// 执行单个规则
func (p *Plugin) run(rule cpmtypes.CustomRule) (exitStatus cpmtypes.Status, output string) {
    var ctx context.Context
    var cancel context.CancelFunc
    // 使用全局、当前规则超时中更小的值
    if rule.Timeout != nil && *rule.Timeout < *p.config.PluginGlobalConfig.Timeout {
        ctx, cancel = context.WithTimeout(context.Background(), *rule.Timeout)
    } else {
        ctx, cancel = context.WithTimeout(context.Background(), *p.config.PluginGlobalConfig.Timeout)
    }
    defer cancel()
    // 执行系统命令
    cmd := exec.CommandContext(ctx, rule.Path, rule.Args...)
    stdout, err := cmd.Output()
    // 如果出错,且退出码不是1则认为是Unknown
    if err != nil {
        if _, ok := err.(*exec.ExitError); !ok {
            glog.Errorf("Error in running plugin %q: error - %v. output - %q", rule.Path, err, string(stdout))
            return cpmtypes.Unknown, "Error in running plugin. Please check the error log"
        }
    }
 
    output = string(stdout)
    output = strings.TrimSpace(output)
 
    // 超时
    if cmd.ProcessState.Sys().(syscall.WaitStatus).Signaled() {
        output = fmt.Sprintf("Timeout when running plugin %q: state - %s. output - %q", rule.Path, cmd.ProcessState.String(), output)
    }
 
    // 修剪标准输出
    if len(output) > *p.config.PluginGlobalConfig.MaxOutputLength {
        output = output[:*p.config.PluginGlobalConfig.MaxOutputLength]
    }
 
    exitCode := cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus()
    switch exitCode {
    case 0:
        return cpmtypes.OK, output
    case 1:
        return cpmtypes.NonOK, output
    default:
        return cpmtypes.Unknown, output
    }
}
 
func (p *Plugin) Stop() {
    p.tomb.Stop()
    glog.Info("Stop plugin execution")
}

注意CPM这里的Plugin,和NPD配置中提到的Plugin,不是一个概念。

cluster-autoscaler

云提供者通常支持动态创建、销毁节点。当NPD检测到故障节点,且无法恢复时,“治愈”措施就是Drain节点。这导致集群规模的缩小,应当调用云提供者的接口,补充节点。

开源项目autoscaler为K8S提供了若干额外的自动扩容组件:

  1. Cluster Autoscaler:能够自动对K8S节点数量进行扩缩,保证所有Pod有地方运行,且自动销毁空闲节点。支持GCP、AWS、Azure、阿里云、百度云,其它云环境需要自行扩展
  2. Vertical Pod Autoscaler:一系列组件,能够自动调整Pod的CPU、内存请求。未来可能支持inplace-update,也就是说不需要删除Pod即可完成request值的修改
  3. Addon Resizer:简化版的VPA,根据集群的节点规模,自动修改Deployment的request值

和本文主题相关的是cluster-autoscaler,我们可以利用它来保证集群规模的稳定。

velero

在灾难性故障中,K8S集群可能完全无法恢复,只能重建。那么,如何快速重建K8S集群就是关键技术问题。

Velero(Heptio Ark)是一个能进行K8S集群备份、迁移的开源项目,特性包括:

  1. 集群备份:支持备份完整集群(的K8S资源以及持久卷),或者根据命名空间、标签选择器来备份集群的一部分
  2. 定期备份
  3. 备份钩子:在备份之前、之后执行指定的运维操作
  4. 迁移:将K8S资源迁移到其它集群,例如将生产环境集群复制到开发环境

Velero由两个部分组成:

  1. 运行在K8S集群中的服务器端
  2. 运行在客户机上的CLI

你可以在云提供者或者裸金属环境的K8S集群上运行Velero。Velero集成对流行厂商的存储服务的支持。

存储要求

你需要首先选择一个对象存储后端,用于存放备份的Etcd数据。Velero支持S3兼容的对象存储,例如Ceph Rados 12.2.7、Minio。

要支持持久卷的备份,必须选择一个卷快照提供者,国内仅阿里云支持。裸金属集群要使用持久卷备份功能,可以自行开发插件,或者使用Restic等通用的存储备份工具,但是性能明显比卷快照差。Restic支持多种存储后端(用来存放它生成的备份),但是Velero+Restic仅仅支持S3兼容的对象存储。

提供者列表
提供者 对象存储 卷Snapshotter 插件仓库
AWS AWS S3 AWS EBS 地址
GCP Google Cloud Storage Google Compute Engine Disks 地址
Azure Azure Blob Storage Azure Managed Disks 地址
Restic

Restic是一个开源备份工具,Velero可以与之集成,实现任何类型的K8S持久卷备份。如果你的存储后端没有对应的Velero插件,或者使用了EFS、AzureFile、NFS、emptyDir、Local PV等没有快照概念的卷,可以考虑Restic。HostPath不被支持。

自定义资源
CR 说明
ResticRepository

表示并管理Velero的Restic仓库的生命周期。在第一次针对某个命名空间的备份请求创建后,Velero为每个命名空间创建一个Restic仓库,此CR的控制器会调用Restic仓库的生命周期命令,例如restic init, restic check,  restic prune

调用 velero restic repo get可以获得Velero的Restic仓库的信息

PodVolumeBackup

表示一个Pod中的一个卷的Restic备份,当发现被注解的Pod后,Velero备份主进程会创建一个或多个PodVolumeBackup对象

集群中会运行一个Daemonset,这样每个节点上都会运行一个控制器,负责执行restic backup命令以备份Pod的卷数据

PodVolumeRestore

表示一个Pod中的一个卷的Restic恢复,Velero主恢复进程发现Pod关联了Restic备份后,会创建一个或多个这种CR

同样的,每个节点上运行的控制器负责执行restic restoure恢复本机Pod的卷

限制条件
  1. 要求K8S 1.10以上支持的挂载传播(MountPropagation) 特性,此特性允许同一个Pod之间的容器共享同一个卷
  2. 不支持HostPath类型的卷
  3. Restic会加密所有备份数据,目前Velero使用一个静态的、通用的密钥。这意味着潜在的安全风险,所有能够访问你的OSS桶的人都能够解密数据。未来Velero会提供更加完善的安全支持
  4. 目前Velero基于Pod的名称来关联备份,这意味着Deployment的Pod删除重建后,会产生一个全新的,而非增量的备份
  5. Restic使用单线程扫描所有文件,如果需要备份的文件很大,例如数据库文件,扫描并去重的过程会很慢,即使实际差异很小
备份

要备份K8S资源或卷的内容,您需要使用自定义资源Backup,针对该资源的有效操作是创建、删除,修改没有意义。

关于K8S资源的备份,需要注意以下几点:

  1. 正在被删除的资源不会包含在备份中
  2. 您可以为任何资源添加标签exclude-from-backup,以禁止对它进行备份
  3. 在配置Backup资源时,可以通过命名空间、资源类型、标签指定过滤器,不匹配的资源不会包含在备份中

关于K8S卷的备份,需要注意以下几点:

  1. 卷备份基于Restic实现,它的工作方式是找到Pod卷的挂载目录,并将其内容复制出来
  2. 卷备份是Pod备份的附加项。而持久卷还有另外一种备份机制,即快照
  3. 对于访问模式为ReadWriteMany的持久卷,如果有多个Pod挂载了它,则仅仅会备份一次

非命名空间内资源的备份行为,受到includeClusterResources配置影响:

  1. true:备份集群级别资源,具体行为受标签选择器、资源类型选择器影响
  2. false:不备份集群级别资源
  3. null/unset:
    1. 如果备份包含了所有命名空间则备份所有集群级别资源
    2. 否则,仅仅当备份包含的命名空间中的资源所关联的集群级别资源包含到备份中。例如PersistentVolumeClaim关联的PersistentVolume会包含到备份中

备份流程:

  1. 主备份进程会检查每个它需要备份的Pod上的注解,如果有backup.velero.io/backup-volumes则意味着需要Restic备份
  2. Velero会确保Pod的命名空间的Restic仓库存在:
    1. 检查ResticRepository对象是否存在
    2. 如果不存在,则创建一个新的,并等待ResticRepository控制器初始化、检查
  3. Velero为每个需要备份的卷(列为上述注解的值)创建PodVolumeBackup对象
  4. 主备份进程等待PodVolumeBackup完成或失败
  5. 与此同时,每个PodVolumeBackup被对应节点的控制器处理,此控制器:
    1. 具有一个HostPath挂载点,对应宿主机 /var/lib/kubelet/pods目录,以便访问Pod卷数据
    2. 在上述HostPath下找到Pod卷的子目录
    3. 执行 restic backup
    4. 更新CR的状态为Completed或Failed
  6. 当所有PodVolumeBackup完成后,Velero主进程将这些CR添加到备份中,存放在名为BACKUPNAME-podvolumebackups.json.gz的文件中,并且上传到对象存储,Restic备份的Tar包同样会存放在对象存储中

操作步骤:

  1. 为Pod添加注解,指明哪些卷需要备份:
    Shell
    1
    2
    kubectl -n YOUR_POD_NAMESPACE annotate pod/YOUR_POD_NAME \
        backup.velero.io/backup-volumes=YOUR_VOLUME_NAME_1,YOUR_VOLUME_NAME_2,...
  2. 然后,创建一个Velero备份CR: velero backup create NAME OPTIONS... 
  3. 当备份完成后,查看其信息: velero backup describe YOUR_BACKUP_NAME 
  4. 获取卷备份对象:
    Shell
    1
    kubectl -n velero get podvolumebackups -l velero.io/backup-name=YOUR_BACKUP_NAME -o yaml
恢复

恢复流程:

  1. 主Velero恢复进程检查所有PodVolumeBackup资源
  2. 对于每个PodVolumeBackup,Velero首先保证Restic仓库存在:
    1. 检查ResticRepository资源是否存在于目标命名空间
    2. 如果不存在,则创建之,并且等待ResticRepository控制器完成Restic仓库的初始化和检查。在恢复时,真实的Restic仓库应该已经存在于对象存储中,因此实际上仅仅是检查其完整性
  3. Velero为Pod添加初始化容器,其任务是等待此Pod所有卷恢复完成
  4. Velero将添加了初始化容器的Pod提交给K8S
  5. 对于每个需要恢复的卷,创建PodVolumeRestore
  6. Velero主进程等待每个PodVolumeRestore完成或失败
  7. 与此同时,每个PodVolumeRestore会被恰当节点上的控制器处理,该控制器:
    1. 通过HostPath挂载 /var/lib/kubelet/pods,以便访问Pod卷数据
    2. 等待Pod运行Init容器
    3. 找到Init容器的卷子目录,这些卷和主容器是共享的
    4. 运行 restic restore
    5. 如果恢复成,则在卷的.velero子目录中写入一个文件,文件名为当前Velero Restore的UID
    6. 更新PodVolumeRestore的状态为Completed或Failed
  8. 初始化容器等待,直到发现所有相关的卷的根目录下的.velero内有文件写入,其UID为本次Restore的UID。初始化容器退出,主容器开始运行

在执行Restic恢复时,Velero使用一个助手init容器。其镜像默认为gcr.io/heptio-images/velero-restic-restore-helper:VERSION,其中VERSION和Velero的版本一致。如果需要使用定制的镜像,可以在Velero的命名空间创建一个ConfigMap:

YAML
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
apiVersion: v1
kind: ConfigMap
metadata:
  # 名称无所谓,基于标签找到此ConfigMap
  name: restic-restore-action-config
  namespace: velero
  labels:
    # 下面的标签用于识别此ConfigMap是某个插件的配置信息
    velero.io/plugin-config: ""
    # 下面的标签说明插件的名称和类型
    velero.io/restic: RestoreItemAction
data:
  image: myregistry.io/my-custom-helper-image[:OPTIONAL_TAG]
  cpuRequest: 200m
  memRequest: 128Mi
  cpuLimit: 200m
  memLimit: 128Mi
安装
通过命令行

首先到https://github.com/heptio/velero/releases/tag/v1.1.0下载最新版本的客户端,解压放到$PATH下。

然后,使用如下命令安装:

Shell
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
velero install \
    # 备份和卷存储的提供者名称
    --provider <YOUR_PROVIDER> \
    # 对象存储桶的名字
    --bucket <YOUR_BUCKET> \
    # Velero的IAM帐户的凭证文件,如果不支持,使用 --no-secret
    --secret-file <PATH_TO_FILE> \
    --velero-pod-cpu-request <CPU_REQUEST> \
    --velero-pod-mem-request <MEMORY_REQUEST> \
    --velero-pod-cpu-limit <CPU_LIMIT> \
    --velero-pod-mem-limit <MEMORY_LIMIT> \
    # 启用Restic集成
    [--use-restic] \
    [--restic-pod-cpu-request <CPU_REQUEST>] \
    [--restic-pod-mem-request <MEMORY_REQUEST>] \
    [--restic-pod-cpu-limit <CPU_LIMIT>] \
    [--restic-pod-mem-limit <MEMORY_LIMIT>]

下面是一个基于MinIO、不支持存储卷快照的例子:

Shell
1
2
3
4
5
6
7
8
velero install \
    --provider aws \
    --bucket velero \
    --secret-file ./credentials-velero \
    --use-volume-snapshots=false \
    --backup-location-config region=minio,s3ForcePathStyle="true",s3Url=https://minio.k8s.gmem.cc \
    # 启用Restic,等待部署完成
    --use-restic --wait

其中密钥文件的格式如下:

credentials-velero
INI
1
2
3
[default]
aws_access_key_id = minio
aws_secret_access_key = minio123

要卸载Velero时,删除以下K8S资源即可:

Shell
1
2
kubectl delete namespace/velero clusterrolebinding/velero
kubectl delete crds -l component=velero
通过Helm
Shell
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
kubectl create ns velero
kubectl -n velero create sa velero-server
kubectl get secrets gmemregsecret -o yaml --export | kubectl -n velero create -f -
 
 
helm install --namespace velero --name velero --set fullnameOverride=velero \
    --set configuration.provider=aws \
    --set-file credentials.secretContents.cloud=./credentials-velero \
    --set configuration.backupStorageLocation.name=aws \
    --set configuration.backupStorageLocation.bucket=velero \
    --set configuration.backupStorageLocation.config.region=minio \
    --set configuration.backupStorageLocation.config.s3ForcePathStyle=true \
    --set configuration.backupStorageLocation.config.s3Url=https://minio.k8s.gmem.cc \
    --set image.repository=docker.gmem.cc/velero/velero \
    --set image.tag=v1.1.0 \
    --set image.pullPolicy=IfNotPresent \
    --set serviceAccount.server.name=velero-server \
    --set serviceAccount.server.create=true \
    --set snapshotsEnabled=false \
    --set deployRestic=true \
    velero
 
kubectl -n velero patch sa velero-server -p '{"imagePullSecrets": [{"name": "gmemregsecret"}]}'

要删除,执行:

Shell
1
2
3
4
5
6
7
8
9
10
11
12
13
helm delete velero --purge
 
kubectl delete crd backups.velero.io                  
kubectl delete crd backupstoragelocations.velero.io  
kubectl delete crd deletebackuprequests.velero.io    
kubectl delete crd downloadrequests.velero.io        
kubectl delete crd podvolumebackups.velero.io        
kubectl delete crd podvolumerestores.velero.io        
kubectl delete crd resticrepositories.velero.io      
kubectl delete crd restores.velero.io                
kubectl delete crd schedules.velero.io                
kubectl delete crd serverstatusrequests.velero.io    
kubectl delete crd volumesnapshotlocations.velero.io
Restic集成

即使在安装时没有启用Restic集成,后续你仍然可以随时调用 velero install --use-restic启用Restic集成。

资源限制

Velero可能产生两个Deployment,一个是Velero控制器,一个是Restic,前文包含定制它们的资源用量的参数

多个对象存储位置

你可以为备份、卷快照指定多个存储位置。但是velero install时最多指定一个备份存储位置、一个卷快照存储位置。

后续你可以使用命令 velero backup-location create、 velero snapshot-location create添加新的存储位置。

如果在安装阶段不想提供默认的备份存储位置,可以指定 --no-default-backup-location,同时不指定--bucket、--provider。

对象存储位置映射为自定义资源BackupStorageLocation。它对应了一个桶,所有Velero数据都会存放在此桶的某个前缀下。一些供应商特定的字段(例如AWS区域、Azure存储帐户)也存放在此CR中。

用户可以预先配置多个对象存储位置、卷快照存储位置,并且在创建备份的时候选择使用哪个位置。

额外的卷快照位置

Velero支持通过插件方式来集成不同的卷快照提供者,你可以用AWS S3作为对象存储,而Portworx作为卷快照。

但是velero install仅仅支持配置单个提供者,同时用于对象存储、卷快照。

为了使用不同的卷快照提供者,你需要:

  1. 指定合理的对象存储参数,安装Velero服务器组件
  2. 将卷快照提供者插件添加到Velero
  3. 添加卷快照位置
    Shell
    1
    velero snapshot-location create <NAME> --provider <PROVIDER-NAME> [--config <PROVIDER-CONFIG>] 

快照存储位置映射为自定义资源VolumeSnapshotLocation,其字段完全取决于具体供应商(例如AWS区域、Azure资源组、Portworx快照类型)。

使用
备份
Shell
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 安装一个K8S应用
kubectl apply -f examples/nginx-app/base.yaml
 
# 备份指定命名空间
velero backup create nginx-backup --include-namespaces nginx-example
 
# 使用选择器,仅仅备份匹配标签的对象
velero backup create nginx-backup --selector app=nginx
# 反向选择器
velero backup create nginx-backup --selector 'backup notin (ignore)'
 
 
# 查看备份
velero backup get nginx-backup
定期备份
Shell
1
2
3
4
# 使用Cron表达式
velero schedule create nginx-daily --schedule="0 1 * * *" --selector app=nginx
# 每天
velero schedule create nginx-daily --schedule="@daily" --selector app=nginx 
恢复 
Shell
1
2
3
4
5
6
7
8
9
10
# 模拟灾难
kubectl delete namespaces nginx-example
# 动态分配的PV的默认回收策略是Delete,因此上述命令会导致Nginx的PV的后被存储被删除,注意此删除
# 是异步的,因此,执行下一步之前,手工确认卷已经被删除
 
# 恢复
velero restore create --from-backup nginx-backup
 
# 查看恢复状态
velero restore get
清理
Shell
1
2
# 删除备份,包括对象存储、持久卷快照
velero backup delete BACKUP_NAME

 

← Draft学习笔记
OpenID Connect →

Leave a Reply Cancel reply

Your email address will not be published. Required fields are marked *

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code class="" title="" data-url=""> <del datetime=""> <em> <i> <q cite=""> <strike> <strong> <pre class="" title="" data-url=""> <span class="" title="" data-url="">

Related Posts

  • 通过自定义资源扩展Kubernetes
  • 如何在Pod中执行宿主机上的命令
  • Flexvolume学习笔记
  • Kata Containers学习笔记
  • Cilium学习笔记

Recent Posts

  • Investigating and Solving the Issue of Failed Certificate Request with ZeroSSL and Cert-Manager
  • A Comprehensive Study of Kotlin for Java Developers
  • 背诵营笔记
  • 利用LangChain和语言模型交互
  • 享学营笔记
ABOUT ME

汪震 | Alex Wong

江苏淮安人,现居北京。目前供职于腾讯云,专注容器方向。

GitHub:gmemcc

Git:git.gmem.cc

Email:gmemjunk@gmem.cc@me.com

ABOUT GMEM

绿色记忆是我的个人网站,域名gmem.cc中G是Green的简写,MEM是Memory的简写,CC则是我的小天使彩彩名字的简写。

我在这里记录自己的工作与生活,同时和大家分享一些编程方面的知识。

GMEM HISTORY
v2.00:微风
v1.03:单车旅行
v1.02:夏日版
v1.01:未完成
v0.10:彩虹天堂
v0.01:阳光海岸
MIRROR INFO
Meta
  • Log in
  • Entries RSS
  • Comments RSS
  • WordPress.org
Recent Posts
  • Investigating and Solving the Issue of Failed Certificate Request with ZeroSSL and Cert-Manager
    In this blog post, I will walk ...
  • A Comprehensive Study of Kotlin for Java Developers
    Introduction Purpose of the Study Understanding the Mo ...
  • 背诵营笔记
    Day 1 Find Your Greatness 原文 Greatness. It’s just ...
  • 利用LangChain和语言模型交互
    LangChain是什么 从名字上可以看出来,LangChain可以用来构建自然语言处理能力的链条。它是一个库 ...
  • 享学营笔记
    Unit 1 At home Lesson 1 In the ...
  • K8S集群跨云迁移
    要将K8S集群从一个云服务商迁移到另外一个,需要解决以下问题: 各种K8S资源的迁移 工作负载所挂载的数 ...
  • Terraform快速参考
    简介 Terraform用于实现基础设施即代码(infrastructure as code)—— 通过代码( ...
  • 草缸2021
    经过四个多月的努力,我的小小荷兰景到达极致了状态。

  • 编写Kubernetes风格的APIServer
    背景 前段时间接到一个需求做一个工具,工具将在K8S中运行。需求很适合用控制器模式实现,很自然的就基于kube ...
  • 记录一次KeyDB缓慢的定位过程
    环境说明 运行环境 这个问题出现在一套搭建在虚拟机上的Kubernetes 1.18集群上。集群有三个节点: ...
  • eBPF学习笔记
    简介 BPF,即Berkeley Packet Filter,是一个古老的网络封包过滤机制。它允许从用户空间注 ...
  • IPVS模式下ClusterIP泄露宿主机端口的问题
    问题 在一个启用了IPVS模式kube-proxy的K8S集群中,运行着一个Docker Registry服务 ...
  • 念爷爷
      今天是爷爷的头七,十二月七日、阴历十月廿三中午,老人家与世长辞。   九月初,回家看望刚动完手术的爸爸,发

  • 6 杨梅坑

  • liuhuashan
    深圳人才公园的网红景点 —— 流花山

  • 1 2020年10月拈花湾

  • 内核缺陷触发的NodePort服务63秒延迟问题
    现象 我们有一个新创建的TKE 1.3.0集群,使用基于Galaxy + Flannel(VXLAN模式)的容 ...
  • Galaxy学习笔记
    简介 Galaxy是TKEStack的一个网络组件,支持为TKE集群提供Overlay/Underlay容器网 ...
TOPLINKS
  • Zitahli's blue 91 people like this
  • 梦中的婚礼 64 people like this
  • 汪静好 61 people like this
  • 那年我一岁 36 people like this
  • 为了爱 28 people like this
  • 小绿彩 26 people like this
  • 杨梅坑 6 people like this
  • 亚龙湾之旅 1 people like this
  • 汪昌博 people like this
  • 彩虹姐姐的笑脸 24 people like this
  • 2013年11月香山 10 people like this
  • 2013年7月秦皇岛 6 people like this
  • 2013年6月蓟县盘山 5 people like this
  • 2013年2月梅花山 2 people like this
  • 2013年淮阴自贡迎春灯会 3 people like this
  • 2012年镇江金山游 1 people like this
  • 2012年徽杭古道 9 people like this
  • 2011年清明节后扬州行 1 people like this
  • 2008年十一云龙公园 5 people like this
  • 2008年之秋忆 7 people like this
  • 老照片 13 people like this
  • 火一样的六月 16 people like this
  • 发黄的相片 3 people like this
  • Cesium学习笔记 90 people like this
  • IntelliJ IDEA知识集锦 59 people like this
  • 基于Kurento搭建WebRTC服务器 38 people like this
  • Bazel学习笔记 37 people like this
  • PhoneGap学习笔记 32 people like this
  • NaCl学习笔记 32 people like this
  • 使用Oracle Java Mission Control监控JVM运行状态 29 people like this
  • 基于Calico的CNI 27 people like this
  • Ceph学习笔记 27 people like this
  • Three.js学习笔记 24 people like this
Tag Cloud
ActiveMQ AspectJ CDT Ceph Chrome CNI Command Cordova Coroutine CXF Cygwin DNS Docker eBPF Eclipse ExtJS F7 FAQ Groovy Hibernate HTTP IntelliJ IO编程 IPVS JacksonJSON JMS JSON JVM K8S kernel LB libvirt Linux知识 Linux编程 LOG Maven MinGW Mock Monitoring Multimedia MVC MySQL netfs Netty Nginx NIO Node.js NoSQL Oracle PDT PHP Redis RPC Scheduler ServiceMesh SNMP Spring SSL svn Tomcat TSDB Ubuntu WebGL WebRTC WebService WebSocket wxWidgets XDebug XML XPath XRM ZooKeeper 亚龙湾 单元测试 学习笔记 实时处理 并发编程 彩姐 性能剖析 性能调优 文本处理 新特性 架构模式 系统编程 网络编程 视频监控 设计模式 远程调试 配置文件 齐塔莉
Recent Comments
  • qg on Istio中的透明代理问题
  • heao on 基于本地gRPC的Go插件系统
  • 黄豆豆 on Ginkgo学习笔记
  • cloud on OpenStack学习笔记
  • 5dragoncon on Cilium学习笔记
  • Archeb on 重温iptables
  • C/C++编程:WebSocketpp(Linux + Clion + boostAsio) – 源码巴士 on 基于C/C++的WebSocket库
  • jerbin on eBPF学习笔记
  • point on Istio中的透明代理问题
  • G on Istio中的透明代理问题
  • 绿色记忆:Go语言单元测试和仿冒 on Ginkgo学习笔记
  • point on Istio中的透明代理问题
  • 【Maven】maven插件开发实战 – IT汇 on Maven插件开发
  • chenlx on eBPF学习笔记
  • Alex on eBPF学习笔记
  • CFC4N on eBPF学习笔记
  • 李运田 on 念爷爷
  • yongman on 记录一次KeyDB缓慢的定位过程
  • Alex on Istio中的透明代理问题
  • will on Istio中的透明代理问题
  • will on Istio中的透明代理问题
  • haolipeng on 基于本地gRPC的Go插件系统
  • 吴杰 on 基于C/C++的WebSocket库
©2005-2025 Gmem.cc | Powered by WordPress | 京ICP备18007345号-2