Kubernetes故障检测和自愈
在Kubernetes日常运维过程中,会出现各种各样的问题,例如:
- 节点CNI不可用,其它节点无法连接到故障节点的Pod
- Subpath方式挂载的Configmap,特定条件下出现Pod无限重启的问题
- 集群DNS服务器无法通过上游DNS解析外部名称
- 节点假死,但是持有的Ceph RBD的Watcher不释放,导致有状态服务的Pod调度走后仍然无法启动
- 误删Etcd数据、持久卷
这些问题导致部分Pod、节点、甚至整个集群不可用,需要人工运维才能恢复。
从接收到告警到运维人员手工处理完毕,可能已经过了1小时,严重影响服务质量。但是如果能识别这些告警并将运维知识转化为代码,某些问题可能在一分钟内就被发现和解决。
本文调研商业产品和社区的集群/节点故障检测、修复技术的现状,为自研节点自愈产品提供参考。
Pod所在节点的内核、CRI运行时等出现问题,无法支持Pod的运行。针对这类故障,社区或商业的解决方案较多,例如社区的NPD项目、GKE的节点修复功能。
组件故障可以认为是节点故障的子类,只是故障来源是K8S基础组件的一部分。
K8S集群基础组件出现故障,可能导致集群或在节点的部分功能不可用。我在线上环境遇到过的故障包括:
- KubeDNS故障:6个DNS Pod中的2个出现无法解析外部DNS名称的情况。后果是大量线上业务因域名解析
- Calico CNI故障:少数几个节点的容器网络和外部断开,节点访问自身的Pod IP没有问题,但是其它节点无法访问故障节点的Pod IP。这种情况下,Pod本机的健康检查无效,导致故障实例持续存在,一定比例的业务请求失败
由于K8S生态主要依赖于开源社区,很多组件不成熟,存在缺陷,因此这类故障较为纷杂。社区没有发现知名的解决方案,主要依赖于日常运维中知识的积累,而且这些运维知识往往是环境相关、K8S版本或组件版本相关的,通用性较差。
K8S控制平面不存在单点问题 ,通常情况下,出现整个集群的故障的概率是很低的。但是,某些行业对数据安全和可用性要求极高,另外,也出现过误操作导致集群破坏的案例,集群故障恢复还是需要考虑的。
应对集群故障的主要手段就是备份和恢复,Velero可以帮助我们实现这一点。它支持K8S资源(Etcd)、持久卷的备份,通过开发插件,某些存储后端可以基于快照来备份,效率很高。
我认为节点/组件的故障,根据需要,可以从两个角度发起。
绝大部分故障检测,在节点本地进行就足够了,这样做效率高,避免不必要的网络流量。
少部分网络相关的故障,可能从节点无法检测,这就需要远程检测。
还是上面的Calico CNI故障的例子,故障节点本身访问自己的Pod IP是畅通的,然而外部却无法访问。我们的应对方案,是运行在集群中运行3个Health Check Controller,如果大部分副本判定:节点A网络畅通但是节点A上的Pod Ip却无法联通,则认定节点A的CNI出现故障,需要进行处理。
谷歌K8S引擎(Google Kubernetes Engine)提供了自动修复节点的功能。GKE会周期性的检测集群中每个节点的健康状态,如果某个节点的健康检查连续N次失败,则启动一个修复进程,对节点进行修复。
处于Ready状态的节点被认为是健康的,不健康节点可能处于以下状况:
- 连续数次健康检查,报告NotReady状态
- 在指定的时间范围内,节点没有报告任何状态
- 节点在一个指定的时间范围内,处于磁盘空间不足的状态
GKE修复节点的方法比较简单,就是Drain并重新创建。Drain操作会导致节点上的Pod被驱除。
如果多个节点需要修复,GKE可以并行的执行修复。
这是一个K8S加载项(Addon),目的是将节点故障暴露给集群管理的上层组件。NPD通常运行为DaemonSet,也可以作为独立进程运行。NPD会检测各种各样的节点问题,例如:
- 基础设施服务故障:例如NTP服务宕机
- 硬件问题:CPU、内存、磁盘故障
- 内核问题:内核死锁、文件系统损坏
- 容器运行时错误:Docker守护进程假死
并报告给APIServer,报告的主要方式包括:
- NodeCondition:当遇到永久性的节点故障,导致其不可用时,设置节点的NodeCondition
- Event:可能对Pod产生影响的临时信息
在没有引入NPD的情况下,上面的各种节点问题对于K8S集群管理上层组件不可见,因此K8S会继续向问题节点调度Pod。
Problem Daemon(在代码内部也叫Monitor)是NPD的子守护进程,每个PD监控一个特定类型的节点故障,并报告给NPD。目前PD以Goroutine的形式运行在NPD中,未来会支持在独立进程(容器)中运行并编排为一个Pod。在编译期间,可以通过相应的标记禁用每一类PD。
目前可用的PD包括:
PD | NodeCondition | 说明 | ||||
KernelMonitor | KernelDeadlock |
监控内核日志,根据预定义规则来报告问题、指标 使用标记禁用:disable_system_log_monitor KernelMonitor、AbrtAdaptor都属于System Log Monitor,只是使用的配置文件不同。 SLM支持基于文件的日志、Journald、kmsg。要监控其它日志,需要实现LogWatcher接口 对于临时问题,SLM暴露counter类的指标,示例:
对于永久问题,同时报告为gauge、counter:
|
||||
AbrtAdaptor | 无 |
监控ABRT(Automatic Bug Report Tool)日志并报告。ABRT是一个健康监控守护进程,能够捕获内核问题、各种原因导致的应用崩溃 使用标记禁用:disable_system_log_monitor |
||||
CustomPluginMonitor | 依用户配置 |
通过调用用户配置的脚本来检测各种节点问题 脚本退出码:
脚本输出应该小于80字节,避免给Etcd的存储造成压力 使用标记禁用:disable_custom_plugin_monitor |
||||
SystemStatsMonitor | 暂无 |
将各种健康相关的统计信息报告为Metrics 目前支持的组件仅仅有主机信息、磁盘: disk/io_time 设备队列非空时间,毫秒 使用标记禁用:disable_system_stats_monitor |
NPD提供了若干Exporter组件,能够将节点问题、指标报告给后端:
Exporter | |
Kubernetes Exporter | 暴露临时问题为Event、永久问题为NodeCondition |
Prometheus Exporter | 暴露节点问题、指标为Prometheus metrics |
Stackdriver Exporter |
暴露节点问题、指标给Stackdriver监控API 使用标记禁用:disable_stackdriver_exporter |
故障节点上的事件,会记录在宿主机的某些日志中。这些日志(例如内核日志)中噪音信息太多,NPD会提取其中有价值的信息,记录到自己的Pod日志中。你可以通过EFK收集这些信息,NPD也可以将这些信息报送给Prometheus。
基于NPD的的节点自愈流程如下:
- NPD为故障节点添加额外的Condition元数据
- Cordon并Drain故障节点
- 利用cluster-autoscaler进行集群扩容,补充节点
这个流程本质上是替换,而不是治愈节点。在裸金属K8S集群中,由于缺乏基础设施的支撑,自动扩充节点可能无法实现,只能通过更加精细的自动化运维,治愈节点的异常状态。
以CNI故障为例,可能的治愈流程如下:
- 查询运维知识库,如果找到匹配项,执行对应的运维动作
- 如果上述步骤无效,尝试删除节点上负责CNI的Pod,以重置节点的路由、Iptables配置
- 如果上述步骤无效,尝试重启容器运行时
- 告警,要求运维人员介入
NPD使用Go modules管理依赖,因此构建它需要Go SDK 1.11+:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
cd $GOPATH/src/k8s.io go get k8s.io/node-problem-detector cd node-problem-detector export GO111MODULE=on go mod vendor # 设置构建标记 export BUILD_TAGS="disable_custom_plugin_monitor disable_system_stats_monitor" # 在Ubuntu 14.04上需要安装 sudo apt install libsystemd-journal-dev make all |
可以通过Helm安装:
1 |
helm install stable/node-problem-detector |
依据宿主机操作系统的不同,你可能需要修改挂载为HostPath的宿主机日志目录(默认/var/log/)、内核消息目录(默认/dev/kmsg)的路径。
各PD的配置,均放在ConfigMap中,按需修改。
参数 | 说明 | ||||
--hostname-override | 覆盖NPD更新Condition、Evnet时使用的主机节点名,如果不指定,依次尝试NODE_NAME环境变量、os.Hostname | ||||
--config.system-log-monitor | PD AbrtAdaptor的配置文件路径,逗号分隔,示例:
对于每个配置,NPD会启动一个独立的日志监控线程 |
||||
--config.system-stats-monitor | PD SystemStatsMonitor的配置文件路径,逗号分隔,示例:
对于每个配置,NPD会启动一个独立的统计信息监控线程 |
||||
--config.custom-plugin-monitor | PD CustomPluginMonitor的配置文件路径,逗号分隔,示例:
插件的逻辑编写在脚本中:
对于每个配置,NPD会启动一个独立的监控线程 |
||||
--enable-k8s-exporter | 启用Kubernetes Exporter,默认true | ||||
--apiserver-override |
覆盖报告到的API Server的地址,格式和Heapster的source标记相同 如果以Standalone方式运行NPD,需要设置inClusterConfig为false:
|
||||
--address | NPD服务器的绑定地址 | ||||
--port | NPD服务器的绑定端口,设置为0禁用 | ||||
--prometheus-address |
Prometheus Export的监听地址,默认127.0.0.1:20257 |
在NPD的术语中,治愈系统(Remedy System)是一个或一组进程,负责分析NPD检测出的问题,并且采取补救措施,让K8S集群恢复健康状态。
目前官方提及的治愈系统有只有Draino。NPD项目并没有提供对Draino的集成,你需要手工部署和配置Draino。
Draino是Planet开源的小项目,最初在Planet用于解决GCE上运行的K8S集群的持久卷相关进程(mkfs.ext4、mount等)永久卡死在不可中断睡眠状态的问题。Draino的工作方式简单粗暴,只是检测到NodeCondition并Cordon、Drain节点。
基于Label和NodeCondition自动的Drain掉故障K8S节点:
- 具有匹配标签的的K8S节点,只要进入指定的NodeCondition之一,立即禁止调度(Cordoned)
- 在禁止调度之后一段时间,节点被Drain掉
Draino可以联用Cluster Autoscaler,自动的终结掉Drained的节点。
在Descheduler项目成熟以后,可以代替Draino。
建议本地启动NPD并调试,参考下面的命令行参数:
1 2 3 |
--hostname-override=xenial-100 --apiserver-wait-timeout=10s --logtostderr --stderrthreshold=0 -v=10 \ --apiserver-override=http://k8s.gmem.cc:6444?inClusterConfig=false \ --config.system-log-monitor=/home/alex/Go/workspaces/default/src/k8s.io/node-problem-detector/config/docker-monitor-filelog.json |
NPD抽象了一系列基本的类型,目前供内部使用,这些类型比起K8S API中的对应物更加轻量:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
package types import ( "time" "github.com/spf13/pflag" ) // 问题的严重性,目前仅仅支持Info和Warn,和K8S事件类型对应 type Severity string const ( Info Severity = "info" // 对应K8S的Normal事件 Warn Severity = "warn" // 对应K8S的Warning事件 ) // NodeCondition的状态 type ConditionStatus string const ( // 节点处于目标状态 True ConditionStatus = "True" // 节点不处于目标状态 False ConditionStatus = "False" // 不清楚 Unknown ConditionStatus = "Unknown" ) // 建模NodeCondition type Condition struct { // NodeCondition类型,例如KernelDeadlock, OutOfResource Type string `json:"type"` // 节点是否处于此NodeCondition Status ConditionStatus `json:"status"` // 节点转换为此Condition的时间 Transition time.Time `json:"transition"` // 记录为何进入此Condition的简短原因 Reason string `json:"reason"` // 人类可读的,进入此Condition的原因 Message string `json:"message"` } // 建模Event type Event struct { // 严重性 Severity Severity `json:"severity"` // 事件发生时间 Timestamp time.Time `json:"timestamp"` // 事件的简短原因 Reason string `json:"reason"` // 人类可读的消息 Message string `json:"message"` } // PD向NPD核心报告时使用的DTO type Status struct { // PD的名称 Source string `json:"source"` // 临时的节点问题 —— 事件对象,如果此Status用于Condition更新则此字段可以为空 // 从老到新排列在数组中 Events []Event `json:"events"` // 永久的节点问题 —— NodeCondition。PD必须总是在此字段报告最新的Condition Conditions []Condition `json:"conditions"` } // 建模问题分类 type Type string const ( // 临时问题,报告为Event Temp Type = "temporary" // 永久问题,报告为NodeCondition Perm Type = "permanent" ) // PD的接口。PD根据配置的规则,监控并报告节点问题、收集Metrics type Monitor interface { // 启动此PD,将返回一个通道,NPD核心从此通道获取状态更新 // 如果此PD仅仅报告Metrics,不关注Problem,则返回的通道应该设置为nil Start() (<-chan *Status, error) // 停止此PD Stop() } // 将节点的监控状态报告给某种控制平面,例如K8S API Server,或者Prometheus type Exporter interface { // 报告问题,此方法由NPD核心调用,传递问题给Exporter,Exporter则将问题传递到NPD外部 ExportProblems(*Status) } // PD类型,每个PD类型可以启动多个实例,每个实例对应一个配置 type ProblemDaemonType string // 此映射建模所有PD的所有配置 // 1) 每个键对应一种PD // 2) 每个值的每个元素对应配置文件的路径 type ProblemDaemonConfigPathMap map[ProblemDaemonType]*[]string // 每种PD负责提供一个下面的结构的实例,其函数指针作为实例化PD的工厂 type ProblemDaemonHandler struct { CreateProblemDaemonOrDie func(string) Monitor // 描述如何从命令行实例化PD CmdOptionDescription string } // Exporter的类型 type ExporterType string // 每种Exporter负责提供一个下面的结构的实例,其函数指针作为实例化Exporter的工厂 type ExporterHandler struct { CreateExporterOrDie func(CommandLineOptions) Exporter // 描述如何从命令行实例化PD Options CommandLineOptions } // 用于注入命令行选项 type CommandLineOptions interface { SetFlags(*pflag.FlagSet) } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
func main() { // ... // 读取并验证命令行选项 // 初始化所有配置的PD,参数类型为map[ProblemDaemonType]*[]string,每种PD可提供多个配置文件 // 每种PD都有对应的ProblemDaemonHandler,调用其CreateProblemDaemonOrDie方法、传入配置文件 // 并创建Goroutine problemDaemons := problemdaemon.NewProblemDaemons(npdo.MonitorConfigPaths) // ... // 初始化所有Exporters defaultExporters := []types.Exporter{} if ke := k8sexporter.NewExporterOrDie(npdo); ke != nil { defaultExporters = append(defaultExporters, ke) } if pe := prometheusexporter.NewExporterOrDie(npdo); pe != nil { defaultExporters = append(defaultExporters, pe) } // K8S、Prometheus是内置Exporter,还可以支持可拔插的Experters。 plugableExporters := exporters.NewExporters() npdExporters := []types.Exporter{} npdExporters = append(npdExporters, defaultExporters...) npdExporters = append(npdExporters, plugableExporters...) // 初始化NPD核心并启动 p := problemdetector.NewProblemDetector(problemDaemons, npdExporters) if err := p.Run(); err != nil { glog.Fatalf("Problem detector failed with error: %v", err) } } |
NPD要求至少启用一个PD,否则NPD就没有输入,没有实际意义。具体需要初始化哪些PD,取决于你提供的命令行参数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
func NewProblemDaemons(monitorConfigPaths types.ProblemDaemonConfigPathMap) []types.Monitor { problemDaemonMap := make(map[string]types.Monitor) // 遍历配置 for problemDaemonType, configs := range monitorConfigPaths { // 处理每个PD类型 for _, config := range *configs { if _, ok := problemDaemonMap[config]; ok { // 跳过重复配置 continue } // 为每个PD的每个配置文件创建PD实例, 调用工厂函数 problemDaemonMap[config] = handlers[problemDaemonType].CreateProblemDaemonOrDie(config) } } problemDaemons := []types.Monitor{} for _, problemDaemon := range problemDaemonMap { problemDaemons = append(problemDaemons, problemDaemon) } // 返回PD列表 return problemDaemons } |
如果启用了Kubernertes Exporter,检测到的节点问题将报告为K8S的NodeCondition和Event。启动此Exporter的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
func NewExporterOrDie(npdo *options.NodeProblemDetectorOptions) types.Exporter { // ... // 创建一个问题客户端,此客户端能够读写当前节点的问题、事件 c := problemclient.NewClientOrDie(npdo) // 连接到K8S API Server waitForAPIServerReadyWithTimeout(c, npdo) ke := k8sExporter{ client: c, // ConditionManager利用ProblemClient,将节点状态同步给API Server conditionManager: condition.NewConditionManager(c, clock.RealClock{}), } // 启动一个HTTP服务,在端点/conditions提供NodeCondition查询功能 ke.startHTTPReporting(npdo) // 启动一个异步线程,定期同步到API Server ke.conditionManager.Start() return &ke } |
如果启用了Prometheus Exporter,则会启动一个HTTP服务器,供Prometheus Server来抓取指标:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
func NewExporterOrDie(npdo *options.NodeProblemDetectorOptions) types.Exporter { // ... addr := net.JoinHostPort(npdo.PrometheusServerAddress, strconv.Itoa(npdo.PrometheusServerPort)) // 创建Prometheus的Exporter对象,它实现server.Handler pe, err := prometheus.NewExporter(prometheus.Options{}) go func() { mux := http.NewServeMux() // 处理Exporter请求 mux.Handle("/metrics", pe) if err := http.ListenAndServe(addr, mux); err != nil { } }() // 集成OpenCensus view.RegisterExporter(pe) return &prometheusExporter{} } |
入口点的最后一步是初始化NPD核心。NPD会持有所有Monitors、Exporters:
1 2 3 4 |
type problemDetector struct { monitors []types.Monitor exporters []types.Exporter } |
NPD的外部接口很简单:
1 2 3 4 |
type ProblemDetector interface { // 运行NPD Run() error } |
我们看一下Run方法的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
func (p *problemDetector) Run() error { // 逐个启动Monitors // 所有Monitor的输出通道 var chans []<-chan *types.Status failureCount := 0 for _, m := range p.monitors { // 启动Monitor ch, err := m.Start() if err != nil { // 失败,尝试下一个 failureCount += 1 continue } if ch != nil { // 保存输出通道 chans = append(chans, ch) } } if len(p.monitors) == failureCount { // 所有PD都启动失败,失败 return fmt.Errorf("no problem daemon is successfully setup") } // 监听所有PD的输出通道,并将其中的Status归集到单个通道ch中 ch := groupChannel(chans) glog.Info("Problem detector started") // 收集到的PD输出,必须交给Exporter进行处理,才有价值 for { select { case status := <-ch: for _, exporter := range p.exporters { exporter.ExportProblems(status) } } } } // 为每个PD的输出通道创建Goroutine // 这些Goroutine接收到PD的状态报告后,将其合并到单个通道 func groupChannel(chans []<-chan *types.Status) <-chan *types.Status { statuses := make(chan *types.Status) for _, ch := range chans { go func(c <-chan *types.Status) { for status := range c { statuses <- status } }(ch) } return statuses } |
入口点中的语句: problemdaemon.NewProblemDaemons(npdo.MonitorConfigPaths)负责初始化所有配置的PD。
对于每种PD的每个配置文件,都会调用:ProblemDaemonHandler.CreateProblemDaemonOrDie进行PD实例化:
1 2 3 4 5 6 7 8 |
problemDaemonMap[config] = handlers[problemDaemonType].CreateProblemDaemonOrDie(config) type ProblemDaemonHandler struct { // 创建PD CreateProblemDaemonOrDie func(string) Monitor // 命令行选项 CmdOptionDescription string } |
开发自己的PD时,你需要提供ProblemDaemonHandler的实例并调用problemdaemon.Register进行注册:
1 2 3 4 5 6 7 |
var ( handlers = make(map[types.ProblemDaemonType]types.ProblemDaemonHandler) ) func Register(problemDaemonType types.ProblemDaemonType, handler types.ProblemDaemonHandler) { handlers[problemDaemonType] = handler } |
同时,提供Monitor接口的实现。
入口点中的语句: plugableExporters := exporters.NewExporters()负责初始化扩展的Exporters,Stackdriver Exporter就是这样的一种扩展。
NewExporters的逻辑很简单,遍历一个集合,取出其中的ExporterHandler并实例化Exporter:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
func NewExporters() []types.Exporter { exporters := []types.Exporter{} for _, handler := range handlers { exporter := handler.CreateExporterOrDie(handler.Options) if exporter == nil { continue } exporters = append(exporters, exporter) } return exporters } // ExporterHandler是结构,不是接口。每个Exporter插件需要指定 // 一个函数指针,作为创建Exporter的工厂函数 type ExporterHandler struct { CreateExporterOrDie func(CommandLineOptions) Exporter Options CommandLineOptions } |
exporters包对外暴露了注册扩展Exporter的接口:
1 2 3 4 5 6 7 |
var ( handlers = make(map[types.ExporterType]types.ExporterHandler) ) func Register(exporterType types.ExporterType, handler types.ExporterHandler) { handlers[exporterType] = handler } |
开发自己的Exporter时,你需要调用上面的Register注册Exporter的工厂函数,同时实现Exporter接口。
NPD对指标这一概念也进行了封装,它依赖OpenCensus而不是Prometheus这样具体的实现的API。
OpenCensus是一个开源项目,对比OpenTracing,在Tracing的基础上加了Metrics功能。现在两个项目已经合并为OpenTelemetry并进入CNCF沙箱。OpenTelemetry统一了数据格式规范、SDK,推荐用Prometheus作为Metrics后端,Jaeger做Tracing后端。
所有指标如下:
1 2 3 4 5 6 7 8 |
const ( ProblemCounterID MetricID = "problem_counter" ProblemGaugeID MetricID = "problem_gauge" DiskIOTimeID MetricID = "disk/io_time" DiskWeightedIOID MetricID = "disk/weighted_io" DiskAvgQueueLenID MetricID = "disk/avg_queue_len" HostUptimeID MetricID = "host/uptime" ) |
前两个是针对所有Problem的Counter/Gauge,后面几个都是SystemStatsMonitor暴露的指标。
NPD定义了两种数据类型的指标Int64Metric、Float64Metric,前者代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
package metrics import ( "context" "fmt" "go.opencensus.io/stats" "go.opencensus.io/stats/view" "go.opencensus.io/tag" ) // Int64MetricRepresentation 表示一个int64类型的指标的快照值 type Int64MetricRepresentation struct { // 指标名 Name string // 指标标签集 Labels map[string]string // 指标的顺时值 Value int64 } // Int64Metric 表示一个int64类型的指标 type Int64Metric struct { // 指标名 name string // OpenCensus中的概念,本质上就是一个描述符 measure *stats.Int64Measure } // 工厂函数,创建指标 func NewInt64Metric(metricID MetricID, viewName string, description string, unit string, aggregation Aggregation, tagNames []string) (*Int64Metric, error) { // OpenCensus中的View if viewName == "" { return nil, nil } // 建立指标ID和视图名的对应关系 MetricMap.AddMapping(metricID, viewName) // 将标签名转换为OpenCensus的Tag键 tagKeys, err := getTagKeysFromNames(tagNames) // 将NPD的Aggregation转换为OpenCensus的Aggregation // OpenCensus中的Aggregation表示聚合值的方法 var aggregationMethod *view.Aggregation switch aggregation { case LastValue: aggregationMethod = view.LastValue() // 仅仅报告最后记录的值 case Sum: aggregationMethod = view.Sum() // 对所有收集的值进行求和 default: return nil, fmt.Errorf("unknown aggregation option %q", aggregation) } // 创建Int64Measure度量 measure := stats.Int64(viewName, description, unit) // 创建上述度量的视图 newView := &view.View{ Name: viewName, Measure: measure, Description: description, Aggregation: aggregationMethod, TagKeys: tagKeys, } // 注册此度量的描述符measureDescriptor view.Register(newView) // 返回NPD的封装 metric := Int64Metric{viewName, measure} return &metric, nil } // 为指标记录一个度量,并使用提供的Tag作为指标标签 func (metric *Int64Metric) Record(tags map[string]string, measurement int64) error { // Mutator能够对tag map 进行变换 var mutators []tag.Mutator tagMapMutex.RLock() defer tagMapMutex.RUnlock() for tagName, tagValue := range tags { tagKey, ok := tagMap[tagName] if !ok { return fmt.Errorf("referencing none existing tag %q in metric %q", tagName, metric.name) } // 添加这样的Mutator,如果tagKey存在则更新,否则插入 mutators = append(mutators, tag.Upsert(tagKey, tagValue)) } // RecordWithTags能够一次性记录一个或多个度量值 return stats.RecordWithTags( context.Background(), // 提供Tag mutators, // 调用*stats.Int64Measure的M方法,可以创建一个Measurement // Measurement的本质是一个值,同时包含Int64Measure及其measureDescriptor的引用 metric.measure.M(measurement)) } |
OpenCensus中各种概念比较繁琐,讲清楚需要独立开一篇文章。这里牵涉到的有:
- tag.Mutator:这个接口负责为度量值生成标签(名、值对)
- stats.Measure:度量,此接口表示一个指标,和Prometheus中的Metric对应。度量只具有名称、描述、单位三个属性,不包含标签,或者值。每种度量都提供了方法来创建度量值,例如Int64Measure.M方法将int64转换为Measurement。度量对外不可见,要将度量值导出,必须使用视图。如果没有为Measure定义视图,则记录Measure的成本非常低
- stats.Measurement:度量值,此接口表示Measure的一个具体的采集值。
- view.View:视图,用于聚合、对外展示已经记录的度量值。视图具有唯一性的名称、关联唯一的度量、具有确定的标签名集合,以及一个确定的聚合函数
如果从上面的stats.RecordWithTags调用跟踪下去,可以看到OpenCensus最终仅仅会调用一个 internal.DefaultRecorder这个函数:
1 2 3 4 5 6 7 8 9 |
func record(tags *tag.Map, ms interface{}, attachments map[string]interface{}) { req := &recordReq{ tm: tags, ms: ms.([]stats.Measurement), attachments: attachments, t: time.Now(), } defaultWorker.c <- req } |
可以看到此函数构建一个记录请求,并从通道发出。接收此请求并处理的Goroutine如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
func (w *worker) start() { // 全局的Metrics生产者管理器 prodMgr := metricproducer.GlobalManager() // 注册自己 prodMgr.AddProducer(w) for { select { case cmd := <-w.c: // 处理记录请求 cmd.handleCommand(w) case <-w.timer.C: w.reportUsage(time.Now()) case <-w.quit: w.timer.Stop() close(w.c) w.done <- true return } } } func (cmd *recordReq) handleCommand(w *worker) { // Worker锁 w.mu.Lock() defer w.mu.Unlock() // 一个请求中可以具有多个stats.Measurement for _, m := range cmd.ms { if (m == stats.Measurement{}) { continue } // 获取度量值的度量的所有视图 ref := w.getMeasureRef(m.Measure().Name()) for v := range ref.views { // 向所有视图添加此度量,内部会调用aggregator.addSample v.addSample(cmd.tm, m.Value(), cmd.attachments, time.Now()) } } } |
最终,记录的、聚合后的指标值,就是放在View中的。
那么,外部怎么访问这些值?其实,你打开http://127.0.0.1:20257/metrics可以看到,Prometheus的Exporter已经获取到OpenCensus记录的数据了。那么Prometheus和OpenCensus是如何配合的呢?
Prometheus在处理/metrics请求时,会调用prometheus.Gatherer,此接口的实现是prometheus.Registry。在NPD启动期间,它会创建一个Registry:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
func NewExporter(o Options) (*Exporter, error) { if o.Registry == nil { // 创建Prometheus注册表 o.Registry = prometheus.NewRegistry() } // 创建指标收集器 collector := newCollector(o, o.Registry) // ... } func newCollector(opts Options, registrar *prometheus.Registry) *collector { return &collector{ reg: registrar, opts: opts, // 通过此Reader读取OpenCensus采集的数据 reader: metricexport.NewReader()} } |
通过上述代码可以看到Prometheus如何和OpenCensus集成的,它们之间的接口是metricexport.Reader,此接口是OpenCensus提供的,Prometheus依赖于此接口。
metricexport.NewReader()调用创建的Reader具有读取OpenCensus采集的数据的能力。
用于从外部控制协程的生命周期, 它的逻辑很简单,准备结束生命周期时:
- 外部协作者发起一个通知
- 协作线程接收到通知,进行清理
- 清理完成后,协程反向通知外部协作者
- 外部协作者退出阻塞
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
package tomb type Tomb struct { stop chan struct{} // 当生命周期结束时,外部关闭此通道,以通知协程 done chan struct{} // 当协程完成清理后,关闭此通道,以通知Stop的调用者 } func NewTomb() *Tomb { return &Tomb{ stop: make(chan struct{}), done: make(chan struct{}), } } // 从外部(另外一个Goroutine)进行阻塞性的关闭操作 func (t *Tomb) Stop() { close(t.stop) <-t.done } // 简单的返回Stop通道,如果已经通知关闭,则此读取此通道不会阻塞 func (t *Tomb) Stopping() <-chan struct{} { return t.stop } // 协程内部负责调用此函数,反向通知协作者,告诉它清理工作已经完成 func (t *Tomb) Done() { close(t.done) } |
此PD能够分析各种形式的日志,读取其内容,使用正则式匹配来发现节点故障。主要代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 |
package systemlogmonitor import ( "encoding/json" "io/ioutil" "time" "github.com/golang/glog" "k8s.io/node-problem-detector/pkg/problemdaemon" "k8s.io/node-problem-detector/pkg/problemmetrics" "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers" watchertypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types" logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types" systemlogtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types" "k8s.io/node-problem-detector/pkg/types" "k8s.io/node-problem-detector/pkg/util" "k8s.io/node-problem-detector/pkg/util/tomb" ) const SystemLogMonitorName = "system-log-monitor" // 初始化函数用于注册此PD的工厂函数 func init() { problemdaemon.Register( SystemLogMonitorName, types.ProblemDaemonHandler{ CreateProblemDaemonOrDie: NewLogMonitorOrDie, CmdOptionDescription: "Set to config file paths."}) } type logMonitor struct { // 配置文件路径 configPath string // 读取日志的逻辑委托给LogWatcher,这里解耦的目的是支持多种类型的日志 watcher watchertypes.LogWatcher // 日志缓冲,读取的日志在此等待处理 buffer LogBuffer // 对应配置文件中的字段 config MonitorConfig // 对应配置文件中的conditions字段 conditions []types.Condition // 输入日志条目的通道 logCh <-chan *logtypes.Log // 输出状态的通道 output chan *types.Status // 墓碑,用于控制此Monitor的生命周期 tomb *tomb.Tomb } // 创建实例,如果失败则panic func NewLogMonitorOrDie(configPath string) types.Monitor { // 创建实例 l := &logMonitor{ configPath: configPath, tomb: tomb.NewTomb(), } // 读取配置文件 f, err := ioutil.ReadFile(configPath) // 作为JSON解析为MonitorConfig err = json.Unmarshal(f, &l.config) // 设置MonitorConfig的默认值 (&l.config).ApplyDefaultConfiguration() err = l.config.ValidateRules() // 创建LogWatcher l.watcher = logwatchers.GetLogWatcherOrDie(l.config.WatcherConfig) // 设置缓冲区 l.buffer = NewLogBuffer(l.config.BufferSize) // 写死的最大通道容量 l.output = make(chan *types.Status, 1000) // 如果启用指标报告 if *l.config.EnableMetricsReporting { // 则为所有类型的Problem(Rule.Reason,比NodeCondition更细粒度)初始化指标 // Perm类型的初始化一个Gauge指标,一个Counter指标 // Temp类型的仅仅初始化一个Counter指标 initializeProblemMetricsOrDie(l.config.Rules) } return l } // 初始化指标 func initializeProblemMetricsOrDie(rules []systemlogtypes.Rule) { for _, rule := range rules { if rule.Type == types.Perm { err := problemmetrics.GlobalProblemMetricsManager.SetProblemGauge(rule.Condition, rule.Reason, false) } err := problemmetrics.GlobalProblemMetricsManager.IncrementProblemCounter(rule.Reason, 0) } } // 启动 func (l *logMonitor) Start() (<-chan *types.Status, error) { var err error // 启动LogWatcher,监控日志的变化 l.logCh, err = l.watcher.Watch() if err != nil { return nil, err } // 启动主循环 go l.monitorLoop() return l.output, nil } // 停止 func (l *logMonitor) Stop() { // 关闭Stop通道,然后等待Done通道完成 l.tomb.Stop() } // 主循环 func (l *logMonitor) monitorLoop() { // 主循环退出后,接触Stop()调用者的阻塞 defer l.tomb.Done() // 初始化状态(Event和Condition) l.initializeStatus() // 循环 for { select { // 日志可用,解析日志 case log := <-l.logCh: l.parseLog(log) // Stop通道可读,意味着通知关闭了 case <-l.tomb.Stopping(): // 关闭LogWatcher l.watcher.Stop() glog.Infof("Log monitor stopped: %s", l.configPath) return } } } // 解析日志行 func (l *logMonitor) parseLog(log *logtypes.Log) { // 一旦新日志行可用,就将其推送到日志缓冲 l.buffer.Push(log) for _, rule := range l.config.Rules { // 然后逐个规则去匹配 matched := l.buffer.Match(rule.Pattern) if len(matched) == 0 { continue } // 如果匹配规则,则报告规则中声明的状态 status := l.generateStatus(matched, rule) glog.Infof("New status generated: %+v", status) // 输出状态 l.output <- status } } // 从日志生成Status func (l *logMonitor) generateStatus(logs []*logtypes.Log, rule systemlogtypes.Rule) *types.Status { // 第一行日志的时间戳作为状态的时间戳 timestamp := logs[0].Timestamp // 读取日志内容 message := generateMessage(logs) var events []types.Event var changedConditions []*types.Condition if rule.Type == types.Temp { // 对于临时问题,仅仅生成事件 events = append(events, types.Event{ Severity: types.Warn, Timestamp: timestamp, Reason: rule.Reason, Message: message, }) } else { // 对于永久问题,改变Condition for i := range l.conditions { condition := &l.conditions[i] // 找到匹配的、此Monitor定义的Condition if condition.Type == rule.Condition { // 如果Condition改变(Status或Reson字段变了) if condition.Status == types.False || condition.Reason != rule.Reason { // 则更新时间戳和消息 condition.Transition = timestamp condition.Message = message // 并发布事件 events = append(events, util.GenerateConditionChangeEvent( condition.Type, types.True, rule.Reason, timestamp, )) } condition.Status = types.True condition.Reason = rule.Reason changedConditions = append(changedConditions, condition) break } } } // 报告Problem数量指标 if *l.config.EnableMetricsReporting { for _, event := range events { err := problemmetrics.GlobalProblemMetricsManager.IncrementProblemCounter(event.Reason, 1) } for _, condition := range changedConditions { err := problemmetrics.GlobalProblemMetricsManager.SetProblemGauge( condition.Type, condition.Reason, condition.Status == types.True) } } } // 处于性能考虑,应该聚合Event、Condition,周期性的报告,而非这样立即报告 return &types.Status{ Source: l.config.Source, Events: events, Conditions: l.conditions, } } // 初始化状态并报告一次 func (l *logMonitor) initializeStatus() { // 初始化默认Condition,来自配置的condition字段 l.conditions = initialConditions(l.config.DefaultConditions) l.output <- &types.Status{ Source: l.config.Source, Conditions: l.conditions, } } func initialConditions(defaults []types.Condition) []types.Condition { conditions := make([]types.Condition, len(defaults)) copy(conditions, defaults) for i := range conditions { conditions[i].Status = types.False conditions[i].Transition = time.Now() } return conditions } func generateMessage(logs []*logtypes.Log) string { messages := []string{} for _, log := range logs { messages = append(messages, log.Message) } return concatLogs(messages) } |
和配置相关的代码如下,MonitorConfig嵌入WatcherConfig,正好和配置文件结构对应:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
// node-problem-detector/pkg/systemlogmonitor/config.go type MonitorConfig struct { // LogWatcher(LogMonitor插件)的配置 watchertypes.WatcherConfig // 缓冲(行数)大小 BufferSize int `json:"bufferSize"` // 此PD的名称 Source string `json:"source"` // 此PD处理的所有Condition的默认状态 DefaultConditions []types.Condition `json:"conditions"` // 日志匹配规则列表 Rules []systemlogtypes.Rule `json:"rules"` // 是否将Problem报告为指标 EnableMetricsReporting *bool `json:"metricsReporting,omitempty"` } // node-problem-detector/pkg/systemlogmonitor/logwatchers/types/log_watcher.go type WatcherConfig struct { // 插件类型,可选 filelog, journald, kmsg Plugin string `json:"plugin,omitempty"` // 键值对形式的插件配置,具体可以包含哪些配置项,取决于插件 PluginConfig map[string]string `json:"pluginConfig,omitempty"` // 日志的路径 LogPath string `json:"logPath,omitempty"` // 向当前时间点往前看多久日志 Lookback string `json:"lookback,omitempty"` // 仅仅查看节点启动之后多久的日志,可以避免启动期间的不稳定状态触发不必要的问题报告 Delay string `json:"delay,omitempty"` } |
LogWatcher的实现有多种,它们具有统一的接口:
1 2 3 4 5 6 7 8 9 |
type LogWatcher interface { // 开始监控日志,并通过通道输出日志 Watch() (<-chan *types.Log, error) // 停止,注意释放打开的资源 Stop() } // LogWatcher工厂函数 type WatcherCreateFunc func(WatcherConfig) LogWatcher |
类似于Monitor、Exporter,LogWatcher也需要注册:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
// 注册表 var createFuncs = map[string]types.WatcherCreateFunc{} // 注册函数,比较奇葩的是名称没有导出,因此各种LogWatcher的注册均是在logwatchers包中进行的(而非各LogWatcher自己的包) func registerLogWatcher(name string, create types.WatcherCreateFunc) { createFuncs[name] = create } // 根据config.Plugin字段来查找注册表,获取LogWatcher的工厂函数 func GetLogWatcherOrDie(config types.WatcherConfig) types.LogWatcher { create, ok := createFuncs[config.Plugin] return create(config) } |
我们不去逐个分析LogWatcher的实现,仅以kernelLogWatcher(ksmg)为例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
package kmsg import ( "fmt" "strings" "time" utilclock "code.cloudfoundry.org/clock" "github.com/euank/go-kmsg-parser/kmsgparser" "github.com/golang/glog" "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types" logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types" "k8s.io/node-problem-detector/pkg/util" "k8s.io/node-problem-detector/pkg/util/tomb" ) type kernelLogWatcher struct { cfg types.WatcherConfig startTime time.Time logCh chan *logtypes.Log tomb *tomb.Tomb kmsgParser kmsgparser.Parser clock utilclock.Clock } // 工厂函数 func NewKmsgWatcher(cfg types.WatcherConfig) types.LogWatcher { // 获取系统启动到现在过了多久 uptime, err := util.GetUptimeDuration() // 判断何时才应该开始监控日志 startTime, err := util.GetStartTime(time.Now(), uptime, cfg.Lookback, cfg.Delay) return &kernelLogWatcher{ cfg: cfg, startTime: startTime, tomb: tomb.NewTomb(), logCh: make(chan *logtypes.Log, 100), clock: utilclock.NewClock(), } } // 确保签名匹配 var _ types.WatcherCreateFunc = NewKmsgWatcher // 开始监控 func (k *kernelLogWatcher) Watch() (<-chan *logtypes.Log, error) { if k.kmsgParser == nil { // 初始化内核日志解析器 parser, err := kmsgparser.NewParser() k.kmsgParser = parser } // 异步启动主监控循环 go k.watchLoop() return k.logCh, nil } // 停止监控 func (k *kernelLogWatcher) Stop() { // 停止解析器 k.kmsgParser.Close() // 发起停止信号,并等待主监控循环的通知 k.tomb.Stop() } // 主监控循环 func (k *kernelLogWatcher) watchLoop() { // 退出时关闭输出通道,并且通过Tomb告知清理结束,Stop方法可以返回了 defer func() { close(k.logCh) k.tomb.Done() }() // go-kmsg-parser项目提供的功能,获得一个可读通道,从中可以读取到内核消息 kmsgs := k.kmsgParser.Parse() for { select { // 停止信号,清理 case <-k.tomb.Stopping(): // 关闭内核消息解析器 if err := k.kmsgParser.Close(); err != nil { } return // 获取内核消息 case msg := <-kmsgs: // 跳过空消息 if msg.Message == "" { continue } // 对于过早的消息,丢弃 if msg.Timestamp.Before(k.startTime) { continue } // 输出消息 k.logCh <- &logtypes.Log{ Message: strings.TrimSpace(msg.Message), Timestamp: msg.Timestamp, } } } } |
此PD仅仅产生Metrics,而不报告Problem,因此其Start方法返回nil。它报告指标时调用的是OpenCensus的API。
需要注意的是,NPD的Exporter是针对Problem的,Monitor可以产生,也可以不产生Problem(Status对象)。不产生Problem的Monitor和Exporter直接关系。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
const SystemStatsMonitorName = "system-stats-monitor" // 注册 func init() { problemdaemon.Register(SystemStatsMonitorName, types.ProblemDaemonHandler{ CreateProblemDaemonOrDie: NewSystemStatsMonitorOrDie, CmdOptionDescription: "Set to config file paths."}) } type systemStatsMonitor struct { // 配置文件路径 configPath string // 从文件中读取到的配置 config ssmtypes.SystemStatsConfig // 统计信息收集器,目前仅仅支持磁盘、主机信息 diskCollector *diskCollector hostCollector *hostCollector // 生命周期控制 tomb *tomb.Tomb } // 工厂 func NewSystemStatsMonitorOrDie(configPath string) types.Monitor { ssm := systemStatsMonitor{ configPath: configPath, tomb: tomb.NewTomb(), } // 读取、应用、验证配置 f, err := ioutil.ReadFile(configPath) err = json.Unmarshal(f, &ssm.config) err = ssm.config.ApplyConfiguration() err = ssm.config.Validate() // 按需创建收集器 if len(ssm.config.DiskConfig.MetricsConfigs) > 0 { ssm.diskCollector = NewDiskCollectorOrDie(&ssm.config.DiskConfig) } if len(ssm.config.HostConfig.MetricsConfigs) > 0 { ssm.hostCollector = NewHostCollectorOrDie(&ssm.config.HostConfig) } return &ssm } // 异步启动主循环 func (ssm *systemStatsMonitor) Start() (<-chan *types.Status, error) { go ssm.monitorLoop() return nil, nil } func (ssm *systemStatsMonitor) monitorLoop() { // 通知Stop()调用者 defer ssm.tomb.Done() // 定时器 runTicker := time.NewTicker(ssm.config.InvokeInterval) defer runTicker.Stop() // 立即进行一次采集 select { case <-ssm.tomb.Stopping(): return default: ssm.diskCollector.collect() ssm.hostCollector.collect() } // 定时采集 for { select { case <-runTicker.C: ssm.diskCollector.collect() ssm.hostCollector.collect() case <-ssm.tomb.Stopping(): return } } } func (ssm *systemStatsMonitor) Stop() { ssm.tomb.Stop() } |
可以看到,此PD只是定期调用收集器的collect方法,并且不从此方法获取任何信息。
目前Collector有两个,以DisCollector为例,我们看一下它将收集的指标输出到何处了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
package systemstatsmonitor import ( "context" "os/exec" "strings" "time" "github.com/golang/glog" "github.com/shirou/gopsutil/disk" ssmtypes "k8s.io/node-problem-detector/pkg/systemstatsmonitor/types" "k8s.io/node-problem-detector/pkg/util/metrics" ) const deviceNameLabel = "device_name" type diskCollector struct { // IO时间 mIOTime *metrics.Int64Metric // 加权IO时间,此字段上次刷新以来,消耗在IO上的毫秒数 * 进行中的IO操作数量 mWeightedIO *metrics.Int64Metric // 平均队列长度 mAvgQueueLen *metrics.Float64Metric // 配置信息 config *ssmtypes.DiskStatsConfig // IO时间历时记录 historyIOTime map[string]uint64 historyWeightedIO map[string]uint64 } // 工厂函数 func NewDiskCollectorOrDie(diskConfig *ssmtypes.DiskStatsConfig) *diskCollector { dc := diskCollector{config: diskConfig} var err error // 创建NPD封装的Metrics对象 dc.mIOTime, err = metrics.NewInt64Metric( metrics.DiskIOTimeID, // displayName作为OpenCensus View名 diskConfig.MetricsConfigs[string(metrics.DiskIOTimeID)].DisplayName, "The IO time spent on the disk", "second", // 求和聚合 metrics.Sum, []string{deviceNameLabel}) dc.mWeightedIO, err = metrics.NewInt64Metric( metrics.DiskWeightedIOID, diskConfig.MetricsConfigs[string(metrics.DiskWeightedIOID)].DisplayName, "The weighted IO on the disk", "second", metrics.Sum, []string{deviceNameLabel}) dc.mAvgQueueLen, err = metrics.NewFloat64Metric( metrics.DiskAvgQueueLenID, diskConfig.MetricsConfigs[string(metrics.DiskAvgQueueLenID)].DisplayName, "The average queue length on the disk", "second", metrics.LastValue, []string{deviceNameLabel}) dc.historyIOTime = make(map[string]uint64) dc.historyWeightedIO = make(map[string]uint64) return &dc } func (dc *diskCollector) collect() { if dc == nil { return } blks := []string{} // 列出所有磁盘 // 列出所有非Slave非Holder磁盘 if dc.config.IncludeRootBlk { blks = append(blks, listRootBlockDevices(dc.config.LsblkTimeout)...) } // 列出所有分区 if dc.config.IncludeAllAttachedBlk { blks = append(blks, listAttachedBlockDevices()...) } // 调用gopsutil,此项目能够狂平台获取各种操作系统、硬件的指标 // 总是递增 ioCountersStats, err := disk.IOCounters(blks...) if err != nil { return } // 迭代所有指标 for deviceName, ioCountersStat := range ioCountersStats { // 根据上一次度量值计算平均队列长度 lastIOTime := dc.historyIOTime[deviceName] lastWeightedIO := dc.historyWeightedIO[deviceName] dc.historyIOTime[deviceName] = ioCountersStat.IoTime dc.historyWeightedIO[deviceName] = ioCountersStat.WeightedIO // 平均队列长度 = (上次加权IO耗时 - 本次加权IO超时) / (上次IO耗时 - 本次IO耗时) // = 平均队列长度 * (上次IO耗时 - 本次IO耗时) / (上次IO耗时 - 本次IO耗时) avgQueueLen := float64(0.0) if lastIOTime != ioCountersStat.IoTime { avgQueueLen = float64(ioCountersStat.WeightedIO-lastWeightedIO) / float64(ioCountersStat.IoTime-lastIOTime) } // 为指标添加 {"device_name": deviceName} 标签 tags := map[string]string{deviceNameLabel: deviceName} // 这里录制度量时,要使用增量值,因为对应度量已经设置了聚合方法为sum if dc.mIOTime != nil { dc.mIOTime.Record(tags, int64(ioCountersStat.IoTime-lastIOTime)) } if dc.mWeightedIO != nil { dc.mWeightedIO.Record(tags, int64(ioCountersStat.WeightedIO-lastWeightedIO)) } if dc.mAvgQueueLen != nil { dc.mAvgQueueLen.Record(tags, avgQueueLen) } } } // 调用lsblk命令列出磁盘 func listRootBlockDevices(timeout time.Duration) []string { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() // 调用命令 cmd := exec.CommandContext(ctx, "lsblk", "-d", "-n", "-o", "NAME") stdout, err := cmd.Output() return strings.Split(strings.TrimSpace(string(stdout)), "\n") } // 列出所有分区 func listAttachedBlockDevices() []string { blks := []string{} partitions, err := disk.Partitions(false) if err != nil { glog.Errorf("Failed to retrieve the list of disk partitions: %v", err) return blks } for _, partition := range partitions { blks = append(blks, partition.Device) } return blks } |
此PD为NPD提供了一种插件化机制,允许基于任何语言来编写监控脚本,只需要这些脚本遵循NPD关于退出码和标准输出的规范。
此PD定义了以下类型 / 常量:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
package types import ( "k8s.io/node-problem-detector/pkg/types" "time" ) type Status int // 自定义插件的返回码 const ( OK Status = 0 NonOK Status = 1 Unknown Status = 2 ) // 插件的检查结果 type Result struct { // 提供入参 Rule *CustomRule // 插件状态 ExitStatus Status // 标准输出 Message string } // 自定义规则(插件),描述CPM如何调用插件,分析调用结果 type CustomRule struct { // 报告永久还是临时问题 Type types.Type `json:"type"` // 此问题触发哪种NodeCondition,仅当永久问题才设置此字段 Condition string `json:"condition"` // 问题的简短原因,对于永久问题,通常描述NodeCondition的一个子类型 Reason string `json:"reason"` // 自定义插件(脚本)的文件路径 Path string `json:"path"` // 传递给自定义插件的参数 Args []string `json:"args"` // 自定义插件执行超时 TimeoutString *string `json:"timeout"` Timeout *time.Duration `json:"-"` } |
关于如何配置CPM,以及每个插件的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
package types import ( "fmt" "os" "time" "k8s.io/node-problem-detector/pkg/types" ) // 配置参数默认值 var ( // 默认全局超时 defaultGlobalTimeout = 5 * time.Second defaultGlobalTimeoutString = defaultGlobalTimeout.String() // 默认调用间隔 defaultInvokeInterval = 30 * time.Second defaultInvokeIntervalString = defaultInvokeInterval.String() // 默认最大输出长度 defaultMaxOutputLength = 80 // 默认并发度 defaultConcurrency = 3 // 默认是否 状态消息变更导致Condition更新 defaultMessageChangeBasedConditionUpdate = false // 默认是否启用指标报告 defaultEnableMetricsReporting = true customPluginName = "custom" ) // 全局配置 type pluginGlobalConfig struct { // 所有插件被调用的间隔 InvokeIntervalString *string `json:"invoke_interval,omitempty"` // 全局插件执行超时 TimeoutString *string `json:"timeout,omitempty"` InvokeInterval *time.Duration `json:"-"` Timeout *time.Duration `json:"-"` // 最大标准输出长度 MaxOutputLength *int `json:"max_output_length,omitempty"` // 并发度 Concurrency *int `json:"concurrency,omitempty"` // 状态消息变更是否导致Condition更新 EnableMessageChangeBasedConditionUpdate *bool `json:"enable_message_change_based_condition_update,omitempty"` } // 此PD的配置,结构上对应配置文件 type CustomPluginConfig struct { // PD类型,必须为custom Plugin string `json:"plugin,omitempty"` // 全局配置 PluginGlobalConfig pluginGlobalConfig `json:"pluginConfig,omitempty"` // 源名称 Source string `json:"source"` // CPM需要处理的所有Condition的默认状态 DefaultConditions []types.Condition `json:"conditions"` // 需要解析和执行的插件列表 Rules []*CustomRule `json:"rules"` // 状态消息变更是否导致Condition更新 EnableMetricsReporting *bool `json:"metricsReporting,omitempty"` } |
CPM的核心代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 |
package custompluginmonitor import ( "encoding/json" "io/ioutil" "time" "github.com/golang/glog" "k8s.io/node-problem-detector/pkg/custompluginmonitor/plugin" cpmtypes "k8s.io/node-problem-detector/pkg/custompluginmonitor/types" "k8s.io/node-problem-detector/pkg/problemdaemon" "k8s.io/node-problem-detector/pkg/problemmetrics" "k8s.io/node-problem-detector/pkg/types" "k8s.io/node-problem-detector/pkg/util" "k8s.io/node-problem-detector/pkg/util/tomb" ) // 此PD的名称 const CustomPluginMonitorName = "custom-plugin-monitor" func init() { problemdaemon.Register( CustomPluginMonitorName, types.ProblemDaemonHandler{ CreateProblemDaemonOrDie: NewCustomPluginMonitorOrDie, CmdOptionDescription: "Set to config file paths."}) } // CPM type customPluginMonitor struct { configPath string config cpmtypes.CustomPluginConfig conditions []types.Condition // 规则执行插件 plugin *plugin.Plugin // 插件执行结果的读通道 resultChan <-chan cpmtypes.Result // 向NPM报送状态的写通道 statusChan chan *types.Status tomb *tomb.Tomb } // 工厂函数 func NewCustomPluginMonitorOrDie(configPath string) types.Monitor { c := &customPluginMonitor{ configPath: configPath, tomb: tomb.NewTomb(), } // 读取并校验配置 f, err := ioutil.ReadFile(configPath) err = json.Unmarshal(f, &c.config) // Apply configurations err = (&c.config).ApplyConfiguration() // Validate configurations err = c.config.Validate() // 创建插件对象 c.plugin = plugin.NewPlugin(c.config) // 状态通道 c.statusChan = make(chan *types.Status, 1000) if *c.config.EnableMetricsReporting { initializeProblemMetricsOrDie(c.config.Rules) } return c } // 初始化问题指标 func initializeProblemMetricsOrDie(rules []*cpmtypes.CustomRule) { for _, rule := range rules { if rule.Type == types.Perm { err := problemmetrics.GlobalProblemMetricsManager.SetProblemGauge(rule.Condition, rule.Reason, false) } err := problemmetrics.GlobalProblemMetricsManager.IncrementProblemCounter(rule.Reason, 0) } } // 启动 func (c *customPluginMonitor) Start() (<-chan *types.Status, error) { // 启动插件 go c.plugin.Run() // 启动主循环 go c.monitorLoop() return c.statusChan, nil } // 停止 func (c *customPluginMonitor) Stop() { c.tomb.Stop() } // 主循环 func (c *customPluginMonitor) monitorLoop() { c.initializeStatus() // 得到插件的结果通道 resultChan := c.plugin.GetResultChan() // 循环遍历处理插件的结果 for { select { case result := <-resultChan: glog.V(3).Infof("Receive new plugin result for %s: %+v", c.configPath, result) // 将插件结果转换为Status status := c.generateStatus(result) glog.Infof("New status generated: %+v", status) // 输出到状态通道 c.statusChan <- status case <-c.tomb.Stopping(): c.plugin.Stop() glog.Infof("Custom plugin monitor stopped: %s", c.configPath) c.tomb.Done() break } } } // 从插件检查结果生成状态 func (c *customPluginMonitor) generateStatus(result cpmtypes.Result) *types.Status { timestamp := time.Now() var activeProblemEvents []types.Event var inactiveProblemEvents []types.Event if result.Rule.Type == types.Temp { // 对于临时错误,如果插件检查结果非0则产生一个事件 if result.ExitStatus >= cpmtypes.NonOK { activeProblemEvents = append(activeProblemEvents, types.Event{ Severity: types.Warn, Timestamp: timestamp, Reason: result.Rule.Reason, Message: result.Message, }) } } else { // 对于永久错误,如果插件检查结果非0则修改Condition for i := range c.conditions { condition := &c.conditions[i] if condition.Type == result.Rule.Condition { // 规则中的Reason、结果中的Message表明了发生的问题 // 需要从配置中读取默认的Condition,以便在检查结果OK时恢复Condition var defaultConditionReason string var defaultConditionMessage string for j := range c.config.DefaultConditions { defaultCondition := &c.config.DefaultConditions[j] // conditions.type == rules[j].condition if defaultCondition.Type == result.Rule.Condition { defaultConditionReason = defaultCondition.Reason defaultConditionMessage = defaultCondition.Message break } } needToUpdateCondition := true var newReason string var newMessage string // 如果检查结果为0则不处于Condition,为1则处于Condition,其它值则未知 status := toConditionStatus(result.ExitStatus) if condition.Status == types.True && status != types.True { // Condtion从True转变为False/Unknown newReason = defaultConditionReason if newMessage == "" { newMessage = defaultConditionMessage } else { newMessage = result.Message } } else if condition.Status != types.True && status == types.True { // Condtion从False/Unknown转变为True newReason = result.Rule.Reason newMessage = result.Message } else if condition.Status != status { // Condtion在False和Unknown之间转换 newReason = defaultConditionReason if newMessage == "" { newMessage = defaultConditionMessage } else { newMessage = result.Message } } else if condition.Status == types.True && status == types.True && (condition.Reason != result.Rule.Reason || (*c.config.PluginGlobalConfig.EnableMessageChangeBasedConditionUpdate && condition.Message != result.Message)) { // Condtion没有改变,和上次一样是True newReason = result.Rule.Reason newMessage = result.Message } else { // Condtion没有改变,和上次一样是False/Unknown needToUpdateCondition = false } if needToUpdateCondition { condition.Transition = timestamp condition.Status = status condition.Reason = newReason condition.Message = newMessage updateEvent := util.GenerateConditionChangeEvent( condition.Type, status, newReason, timestamp, ) if status == types.True { activeProblemEvents = append(activeProblemEvents, updateEvent) } else { inactiveProblemEvents = append(inactiveProblemEvents, updateEvent) } } break } } } // 报告指标 if *c.config.EnableMetricsReporting { for _, event := range activeProblemEvents { err := problemmetrics.GlobalProblemMetricsManager.IncrementProblemCounter( event.Reason, 1) } for _, condition := range c.conditions { err := problemmetrics.GlobalProblemMetricsManager.SetProblemGauge( condition.Type, condition.Reason, condition.Status == types.True) } } // 发布Status return &types.Status{ Source: c.config.Source, Events: append(activeProblemEvents, inactiveProblemEvents...), Conditions: c.conditions, } } // 将插件退出码转换为Condtion.Status func toConditionStatus(s cpmtypes.Status) types.ConditionStatus { switch s { case cpmtypes.OK: return types.False case cpmtypes.NonOK: return types.True default: return types.Unknown } } // 初始化,报告默认状态 func (c *customPluginMonitor) initializeStatus() { c.conditions = initialConditions(c.config.DefaultConditions) glog.Infof("Initialize condition generated: %+v", c.conditions) c.statusChan <- &types.Status{ Source: c.config.Source, Conditions: c.conditions, } } func initialConditions(defaults []types.Condition) []types.Condition { conditions := make([]types.Condition, len(defaults)) copy(conditions, defaults) for i := range conditions { conditions[i].Status = types.False conditions[i].Transition = time.Now() } return conditions } |
实际上执行监控脚本的工作,由Plugin这个结构负责,并且将每个脚本的执行结果通过通道传递给CPM:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
package plugin import ( "context" "fmt" "os/exec" "strings" "sync" "syscall" "time" "github.com/golang/glog" cpmtypes "k8s.io/node-problem-detector/pkg/custompluginmonitor/types" "k8s.io/node-problem-detector/pkg/util/tomb" ) type Plugin struct { config cpmtypes.CustomPluginConfig // 此通道用于控制并发度 syncChan chan struct{} // 此通道用于输出结果 resultChan chan cpmtypes.Result tomb *tomb.Tomb // 嵌入 sync.WaitGroup } func NewPlugin(config cpmtypes.CustomPluginConfig) *Plugin { return &Plugin{ config: config, // 限制通道大小为并发度 syncChan: make(chan struct{}, *config.PluginGlobalConfig.Concurrency), resultChan: make(chan cpmtypes.Result, 1000), tomb: tomb.NewTomb(), } } func (p *Plugin) GetResultChan() <-chan cpmtypes.Result { return p.resultChan } // 执行所有规则一遍 func (p *Plugin) Run() { defer func() { glog.Info("Stopping plugin execution") p.tomb.Done() }() // 仅仅支持全局一致的调度间隔 runTicker := time.NewTicker(*p.config.PluginGlobalConfig.InvokeInterval) defer runTicker.Stop() runner := func() { glog.Info("Start to run custom plugins") for _, rule := range p.config.Rules { // 占据一个位置 p.syncChan <- struct{}{} // 增加一个等待量,在synChan基础上又要加等待组的原因是 // 防止遍历完毕后,Run方法过早退出 p.Add(1) // 异步执行规则 go func(rule *cpmtypes.CustomRule) { // 总是减少一个等待量 defer p.Done() // 同时释放一个位置 defer func() { <-p.syncChan }() start := time.Now() // 执行规则 exitStatus, message := p.run(*rule) end := time.Now() glog.V(3).Infof("Rule: %+v. Start time: %v. End time: %v. Duration: %v", rule, start, end, end.Sub(start)) // 写入结果 result := cpmtypes.Result{ Rule: rule, ExitStatus: exitStatus, Message: message, } p.resultChan <- result glog.Infof("Add check result %+v for rule %+v", result, rule) }(rule) } // 等待所有规则执行完毕,防止过早退出 p.Wait() glog.Info("Finish running custom plugins") } // 首次执行 select { case <-p.tomb.Stopping(): return default: runner() } // 循环执行 for { select { case <-runTicker.C: runner() case <-p.tomb.Stopping(): return } } } // 执行单个规则 func (p *Plugin) run(rule cpmtypes.CustomRule) (exitStatus cpmtypes.Status, output string) { var ctx context.Context var cancel context.CancelFunc // 使用全局、当前规则超时中更小的值 if rule.Timeout != nil && *rule.Timeout < *p.config.PluginGlobalConfig.Timeout { ctx, cancel = context.WithTimeout(context.Background(), *rule.Timeout) } else { ctx, cancel = context.WithTimeout(context.Background(), *p.config.PluginGlobalConfig.Timeout) } defer cancel() // 执行系统命令 cmd := exec.CommandContext(ctx, rule.Path, rule.Args...) stdout, err := cmd.Output() // 如果出错,且退出码不是1则认为是Unknown if err != nil { if _, ok := err.(*exec.ExitError); !ok { glog.Errorf("Error in running plugin %q: error - %v. output - %q", rule.Path, err, string(stdout)) return cpmtypes.Unknown, "Error in running plugin. Please check the error log" } } output = string(stdout) output = strings.TrimSpace(output) // 超时 if cmd.ProcessState.Sys().(syscall.WaitStatus).Signaled() { output = fmt.Sprintf("Timeout when running plugin %q: state - %s. output - %q", rule.Path, cmd.ProcessState.String(), output) } // 修剪标准输出 if len(output) > *p.config.PluginGlobalConfig.MaxOutputLength { output = output[:*p.config.PluginGlobalConfig.MaxOutputLength] } exitCode := cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus() switch exitCode { case 0: return cpmtypes.OK, output case 1: return cpmtypes.NonOK, output default: return cpmtypes.Unknown, output } } func (p *Plugin) Stop() { p.tomb.Stop() glog.Info("Stop plugin execution") } |
注意CPM这里的Plugin,和NPD配置中提到的Plugin,不是一个概念。
云提供者通常支持动态创建、销毁节点。当NPD检测到故障节点,且无法恢复时,“治愈”措施就是Drain节点。这导致集群规模的缩小,应当调用云提供者的接口,补充节点。
开源项目autoscaler为K8S提供了若干额外的自动扩容组件:
- Cluster Autoscaler:能够自动对K8S节点数量进行扩缩,保证所有Pod有地方运行,且自动销毁空闲节点。支持GCP、AWS、Azure、阿里云、百度云,其它云环境需要自行扩展
- Vertical Pod Autoscaler:一系列组件,能够自动调整Pod的CPU、内存请求。未来可能支持inplace-update,也就是说不需要删除Pod即可完成request值的修改
- Addon Resizer:简化版的VPA,根据集群的节点规模,自动修改Deployment的request值
和本文主题相关的是cluster-autoscaler,我们可以利用它来保证集群规模的稳定。
在灾难性故障中,K8S集群可能完全无法恢复,只能重建。那么,如何快速重建K8S集群就是关键技术问题。
Velero(Heptio Ark)是一个能进行K8S集群备份、迁移的开源项目,特性包括:
- 集群备份:支持备份完整集群(的K8S资源以及持久卷),或者根据命名空间、标签选择器来备份集群的一部分
- 定期备份
- 备份钩子:在备份之前、之后执行指定的运维操作
- 迁移:将K8S资源迁移到其它集群,例如将生产环境集群复制到开发环境
Velero由两个部分组成:
- 运行在K8S集群中的服务器端
- 运行在客户机上的CLI
你可以在云提供者或者裸金属环境的K8S集群上运行Velero。Velero集成对流行厂商的存储服务的支持。
你需要首先选择一个对象存储后端,用于存放备份的Etcd数据。Velero支持S3兼容的对象存储,例如Ceph Rados 12.2.7、Minio。
要支持持久卷的备份,必须选择一个卷快照提供者,国内仅阿里云支持。裸金属集群要使用持久卷备份功能,可以自行开发插件,或者使用Restic等通用的存储备份工具,但是性能明显比卷快照差。Restic支持多种存储后端(用来存放它生成的备份),但是Velero+Restic仅仅支持S3兼容的对象存储。
提供者 | 对象存储 | 卷Snapshotter | 插件仓库 |
AWS | AWS S3 | AWS EBS | 地址 |
GCP | Google Cloud Storage | Google Compute Engine Disks | 地址 |
Azure | Azure Blob Storage | Azure Managed Disks | 地址 |
Restic是一个开源备份工具,Velero可以与之集成,实现任何类型的K8S持久卷备份。如果你的存储后端没有对应的Velero插件,或者使用了EFS、AzureFile、NFS、emptyDir、Local PV等没有快照概念的卷,可以考虑Restic。HostPath不被支持。
CR | 说明 |
ResticRepository |
表示并管理Velero的Restic仓库的生命周期。在第一次针对某个命名空间的备份请求创建后,Velero为每个命名空间创建一个Restic仓库,此CR的控制器会调用Restic仓库的生命周期命令,例如restic init, restic check, restic prune 调用 velero restic repo get可以获得Velero的Restic仓库的信息 |
PodVolumeBackup |
表示一个Pod中的一个卷的Restic备份,当发现被注解的Pod后,Velero备份主进程会创建一个或多个PodVolumeBackup对象 集群中会运行一个Daemonset,这样每个节点上都会运行一个控制器,负责执行restic backup命令以备份Pod的卷数据 |
PodVolumeRestore |
表示一个Pod中的一个卷的Restic恢复,Velero主恢复进程发现Pod关联了Restic备份后,会创建一个或多个这种CR 同样的,每个节点上运行的控制器负责执行restic restoure恢复本机Pod的卷 |
- 要求K8S 1.10以上支持的挂载传播(MountPropagation) 特性,此特性允许同一个Pod之间的容器共享同一个卷
- 不支持HostPath类型的卷
- Restic会加密所有备份数据,目前Velero使用一个静态的、通用的密钥。这意味着潜在的安全风险,所有能够访问你的OSS桶的人都能够解密数据。未来Velero会提供更加完善的安全支持
- 目前Velero基于Pod的名称来关联备份,这意味着Deployment的Pod删除重建后,会产生一个全新的,而非增量的备份
- Restic使用单线程扫描所有文件,如果需要备份的文件很大,例如数据库文件,扫描并去重的过程会很慢,即使实际差异很小
要备份K8S资源或卷的内容,您需要使用自定义资源Backup,针对该资源的有效操作是创建、删除,修改没有意义。
关于K8S资源的备份,需要注意以下几点:
- 正在被删除的资源不会包含在备份中
- 您可以为任何资源添加标签exclude-from-backup,以禁止对它进行备份
- 在配置Backup资源时,可以通过命名空间、资源类型、标签指定过滤器,不匹配的资源不会包含在备份中
关于K8S卷的备份,需要注意以下几点:
- 卷备份基于Restic实现,它的工作方式是找到Pod卷的挂载目录,并将其内容复制出来
- 卷备份是Pod备份的附加项。而持久卷还有另外一种备份机制,即快照
- 对于访问模式为ReadWriteMany的持久卷,如果有多个Pod挂载了它,则仅仅会备份一次
非命名空间内资源的备份行为,受到includeClusterResources配置影响:
- true:备份集群级别资源,具体行为受标签选择器、资源类型选择器影响
- false:不备份集群级别资源
- null/unset:
- 如果备份包含了所有命名空间则备份所有集群级别资源
- 否则,仅仅当备份包含的命名空间中的资源所关联的集群级别资源包含到备份中。例如PersistentVolumeClaim关联的PersistentVolume会包含到备份中
备份流程:
- 主备份进程会检查每个它需要备份的Pod上的注解,如果有backup.velero.io/backup-volumes则意味着需要Restic备份
- Velero会确保Pod的命名空间的Restic仓库存在:
- 检查ResticRepository对象是否存在
- 如果不存在,则创建一个新的,并等待ResticRepository控制器初始化、检查
- Velero为每个需要备份的卷(列为上述注解的值)创建PodVolumeBackup对象
- 主备份进程等待PodVolumeBackup完成或失败
- 与此同时,每个PodVolumeBackup被对应节点的控制器处理,此控制器:
- 具有一个HostPath挂载点,对应宿主机 /var/lib/kubelet/pods目录,以便访问Pod卷数据
- 在上述HostPath下找到Pod卷的子目录
- 执行 restic backup
- 更新CR的状态为Completed或Failed
- 当所有PodVolumeBackup完成后,Velero主进程将这些CR添加到备份中,存放在名为BACKUPNAME-podvolumebackups.json.gz的文件中,并且上传到对象存储,Restic备份的Tar包同样会存放在对象存储中
操作步骤:
- 为Pod添加注解,指明哪些卷需要备份:
12kubectl -n YOUR_POD_NAMESPACE annotate pod/YOUR_POD_NAME \backup.velero.io/backup-volumes=YOUR_VOLUME_NAME_1,YOUR_VOLUME_NAME_2,... - 然后,创建一个Velero备份CR: velero backup create NAME OPTIONS...
- 当备份完成后,查看其信息: velero backup describe YOUR_BACKUP_NAME
- 获取卷备份对象:
1kubectl -n velero get podvolumebackups -l velero.io/backup-name=YOUR_BACKUP_NAME -o yaml
恢复流程:
- 主Velero恢复进程检查所有PodVolumeBackup资源
- 对于每个PodVolumeBackup,Velero首先保证Restic仓库存在:
- 检查ResticRepository资源是否存在于目标命名空间
- 如果不存在,则创建之,并且等待ResticRepository控制器完成Restic仓库的初始化和检查。在恢复时,真实的Restic仓库应该已经存在于对象存储中,因此实际上仅仅是检查其完整性
- Velero为Pod添加初始化容器,其任务是等待此Pod所有卷恢复完成
- Velero将添加了初始化容器的Pod提交给K8S
- 对于每个需要恢复的卷,创建PodVolumeRestore
- Velero主进程等待每个PodVolumeRestore完成或失败
- 与此同时,每个PodVolumeRestore会被恰当节点上的控制器处理,该控制器:
- 通过HostPath挂载 /var/lib/kubelet/pods,以便访问Pod卷数据
- 等待Pod运行Init容器
- 找到Init容器的卷子目录,这些卷和主容器是共享的
- 运行 restic restore
- 如果恢复成,则在卷的.velero子目录中写入一个文件,文件名为当前Velero Restore的UID
- 更新PodVolumeRestore的状态为Completed或Failed
- 初始化容器等待,直到发现所有相关的卷的根目录下的.velero内有文件写入,其UID为本次Restore的UID。初始化容器退出,主容器开始运行
在执行Restic恢复时,Velero使用一个助手init容器。其镜像默认为gcr.io/heptio-images/velero-restic-restore-helper:VERSION,其中VERSION和Velero的版本一致。如果需要使用定制的镜像,可以在Velero的命名空间创建一个ConfigMap:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
apiVersion: v1 kind: ConfigMap metadata: # 名称无所谓,基于标签找到此ConfigMap name: restic-restore-action-config namespace: velero labels: # 下面的标签用于识别此ConfigMap是某个插件的配置信息 velero.io/plugin-config: "" # 下面的标签说明插件的名称和类型 velero.io/restic: RestoreItemAction data: image: myregistry.io/my-custom-helper-image[:OPTIONAL_TAG] cpuRequest: 200m memRequest: 128Mi cpuLimit: 200m memLimit: 128Mi |
首先到https://github.com/heptio/velero/releases/tag/v1.1.0下载最新版本的客户端,解压放到$PATH下。
然后,使用如下命令安装:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
velero install \ # 备份和卷存储的提供者名称 --provider <YOUR_PROVIDER> \ # 对象存储桶的名字 --bucket <YOUR_BUCKET> \ # Velero的IAM帐户的凭证文件,如果不支持,使用 --no-secret --secret-file <PATH_TO_FILE> \ --velero-pod-cpu-request <CPU_REQUEST> \ --velero-pod-mem-request <MEMORY_REQUEST> \ --velero-pod-cpu-limit <CPU_LIMIT> \ --velero-pod-mem-limit <MEMORY_LIMIT> \ # 启用Restic集成 [--use-restic] \ [--restic-pod-cpu-request <CPU_REQUEST>] \ [--restic-pod-mem-request <MEMORY_REQUEST>] \ [--restic-pod-cpu-limit <CPU_LIMIT>] \ [--restic-pod-mem-limit <MEMORY_LIMIT>] |
下面是一个基于MinIO、不支持存储卷快照的例子:
1 2 3 4 5 6 7 8 |
velero install \ --provider aws \ --bucket velero \ --secret-file ./credentials-velero \ --use-volume-snapshots=false \ --backup-location-config region=minio,s3ForcePathStyle="true",s3Url=https://minio.k8s.gmem.cc \ # 启用Restic,等待部署完成 --use-restic --wait |
其中密钥文件的格式如下:
1 2 3 |
[default] aws_access_key_id = minio aws_secret_access_key = minio123 |
要卸载Velero时,删除以下K8S资源即可:
1 2 |
kubectl delete namespace/velero clusterrolebinding/velero kubectl delete crds -l component=velero |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
kubectl create ns velero kubectl -n velero create sa velero-server kubectl get secrets gmemregsecret -o yaml --export | kubectl -n velero create -f - helm install --namespace velero --name velero --set fullnameOverride=velero \ --set configuration.provider=aws \ --set-file credentials.secretContents.cloud=./credentials-velero \ --set configuration.backupStorageLocation.name=aws \ --set configuration.backupStorageLocation.bucket=velero \ --set configuration.backupStorageLocation.config.region=minio \ --set configuration.backupStorageLocation.config.s3ForcePathStyle=true \ --set configuration.backupStorageLocation.config.s3Url=https://minio.k8s.gmem.cc \ --set image.repository=docker.gmem.cc/velero/velero \ --set image.tag=v1.1.0 \ --set image.pullPolicy=IfNotPresent \ --set serviceAccount.server.name=velero-server \ --set serviceAccount.server.create=true \ --set snapshotsEnabled=false \ --set deployRestic=true \ velero kubectl -n velero patch sa velero-server -p '{"imagePullSecrets": [{"name": "gmemregsecret"}]}' |
要删除,执行:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
helm delete velero --purge kubectl delete crd backups.velero.io kubectl delete crd backupstoragelocations.velero.io kubectl delete crd deletebackuprequests.velero.io kubectl delete crd downloadrequests.velero.io kubectl delete crd podvolumebackups.velero.io kubectl delete crd podvolumerestores.velero.io kubectl delete crd resticrepositories.velero.io kubectl delete crd restores.velero.io kubectl delete crd schedules.velero.io kubectl delete crd serverstatusrequests.velero.io kubectl delete crd volumesnapshotlocations.velero.io |
即使在安装时没有启用Restic集成,后续你仍然可以随时调用 velero install --use-restic启用Restic集成。
Velero可能产生两个Deployment,一个是Velero控制器,一个是Restic,前文包含定制它们的资源用量的参数
你可以为备份、卷快照指定多个存储位置。但是velero install时最多指定一个备份存储位置、一个卷快照存储位置。
后续你可以使用命令 velero backup-location create、 velero snapshot-location create添加新的存储位置。
如果在安装阶段不想提供默认的备份存储位置,可以指定 --no-default-backup-location,同时不指定--bucket、--provider。
对象存储位置映射为自定义资源BackupStorageLocation。它对应了一个桶,所有Velero数据都会存放在此桶的某个前缀下。一些供应商特定的字段(例如AWS区域、Azure存储帐户)也存放在此CR中。
用户可以预先配置多个对象存储位置、卷快照存储位置,并且在创建备份的时候选择使用哪个位置。
Velero支持通过插件方式来集成不同的卷快照提供者,你可以用AWS S3作为对象存储,而Portworx作为卷快照。
但是velero install仅仅支持配置单个提供者,同时用于对象存储、卷快照。
为了使用不同的卷快照提供者,你需要:
- 指定合理的对象存储参数,安装Velero服务器组件
- 将卷快照提供者插件添加到Velero
- 添加卷快照位置
1velero snapshot-location create <NAME> --provider <PROVIDER-NAME> [--config <PROVIDER-CONFIG>]
快照存储位置映射为自定义资源VolumeSnapshotLocation,其字段完全取决于具体供应商(例如AWS区域、Azure资源组、Portworx快照类型)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
# 安装一个K8S应用 kubectl apply -f examples/nginx-app/base.yaml # 备份指定命名空间 velero backup create nginx-backup --include-namespaces nginx-example # 使用选择器,仅仅备份匹配标签的对象 velero backup create nginx-backup --selector app=nginx # 反向选择器 velero backup create nginx-backup --selector 'backup notin (ignore)' # 查看备份 velero backup get nginx-backup |
1 2 3 4 |
# 使用Cron表达式 velero schedule create nginx-daily --schedule="0 1 * * *" --selector app=nginx # 每天 velero schedule create nginx-daily --schedule="@daily" --selector app=nginx |
1 2 3 4 5 6 7 8 9 10 |
# 模拟灾难 kubectl delete namespaces nginx-example # 动态分配的PV的默认回收策略是Delete,因此上述命令会导致Nginx的PV的后被存储被删除,注意此删除 # 是异步的,因此,执行下一步之前,手工确认卷已经被删除 # 恢复 velero restore create --from-backup nginx-backup # 查看恢复状态 velero restore get |
1 2 |
# 删除备份,包括对象存储、持久卷快照 velero backup delete BACKUP_NAME |
Leave a Reply