Istio Pilot与Envoy的交互机制解读
在基于Istio+Envoy实现的服务网格中,Istio的角色是控制平面,它是实现了Envoy的发现协议集xDS的管理服务器端。Envoy本身则作为网格的数据平面,和Istio通信,获得各种资源的配置并更新自身的代理规则。
除了实现xDS协议,Istio还负责:
- Envoy统计数据的收集,从Statd格式转换为Prometheus格式。(注:目前看来Envoy也直接暴露了Prometheus的Exporter)
- 限速服务、策略服务
- 和第三方Tracer的对接
- 数字证书分发
等功能。这些功能都需要Istio和Envoy的协同才能生效。最基础和关键的协同是Istio组件Pilot和Envoy之间基于xDS协议进行的各种Envoy配置信息的推送。
Istio的文档并没有对Istio Pilot和Envoy如何交互进行描述,本文结合Istio、Envoy的源码来探讨这些细节。
Pilot的model包为很多Pilot抽象创建了模型(结构),并定义了它们支持的操作。注意这里建模的是Pilot的抽象,因此名词Service是指Istio的抽象服务,而非K8S的Service或者Envoy的Cluster。
对Istio的配置信息、配置存储进行建模。
代表一个Istio配置单元:
1 2 3 4 5 |
type Config struct { ConfigMeta // 配置内容以Proto消息的形式存储 Spec proto.Message } |
配置的元数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
type ConfigMeta struct { //匹配内容消息类型的短类型名称,例如route-rule Type string // API组和版本 Group string Version string // 命名空间范围内唯一性名称 Name string // 命名空间 Namespace string // FQDN后缀 Domain string // 标签集 Labels map[string]string // 注解集 Annotations map[string]string // 资源版本,跟踪对配置注册表的变更 ResourceVersion string CreationTimestamp meta_v1.Time } |
定义一组平台无关的,但是底层平台(例如K8S)必须支持的API,通过这些API可以存取Istio配置信息
每个配置信息的键,由type + name + namespace的组合构成,确保每个配置具有唯一的键
写操作是异步执行的,也就是说Update后立即Get可能无法获得最新结果。有资源版本判断资源是否更新
此接口返回的引用,仅支持只读操作,对其修改存在线程安全问题
1 2 3 4 5 6 7 8 9 |
type ConfigStore interface { // 返回配置描述符,其实就是[]ProtoSchema类型,ProtoSchema描述了资源的Group/Version/Type等信息 ConfigDescriptor() ConfigDescriptor Get(typ, name, namespace string) (config *Config, exists bool) List(typ, namespace string) ([]Config, error) Create(config Config) (revision string, err error) Update(config Config) (newRevision string, err error) Delete(typ, name, namespace string) error } |
表示ConfigStore的本地完整复制的缓存,此缓存主动和远程存储保持同步,并且在获取更新时提供提供通知机制。
为了获得通知,事件处理器必须在Run之前注册,缓存需要在Run之后有一个初始的同步延迟。
1 2 3 4 5 6 7 8 9 |
type ConfigStoreCache interface { // CRUD接口 ConfigStore // 添加某种配置类型的事件处理器 RegisterEventHandler(typ string, handler func(Config, Event)) Run(stop <-chan struct{}) // 初始缓存同步完毕后返回true HasSynced() bool } |
此接口扩展ConfigStore,增加一些针对Istio资源的操控接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
type IstioConfigStore interface { ConfigStore // 列出ServiceEntry ServiceEntries() []Config // 列出绑定到指定工作负载标签的Gateway Gateways(workloadLabels LabelsCollection) []Config // 列出绑定到指定工作负载标签EnvoyFilter EnvoyFilter(workloadLabels LabelsCollection) *Config // 列出关联到指定目标服务实例的Mixerclient HTTP API Specs HTTPAPISpecByDestination(instance *ServiceInstance) []Config // 列出关联到指定目标服务实例的Mixerclient quota specifications QuotaSpecByDestination(instance *ServiceInstance) []Config // 列出关联到指定服务+端口的身份验证策略 // 如果存在多个不同范围(全局、命名空间、服务)的策略,最精确的那个被返回。如果同一范围有多个策略,返回第一个 AuthenticationPolicyByDestination(service *Service, port *Port) *Config // 列出指定命名空间的ServiceRoles ServiceRoles(namespace string) []Config // 列出指定命名空间的ServiceRoleBindings ServiceRoleBindings(namespace string) []Config // 列出名字为DefaultRbacConfigName的RbacConfig RbacConfig() *Config } |
此结构为Pilot提供聚合的环境性的API:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
type Environment struct { // 内嵌接口:用于列出服务、实例 ServiceDiscovery // 已经废弃,使用 PushContext.ServiceAccounts ServiceAccounts // 内嵌接口:用于列出路由规则 IstioConfigStore // 网格配置信息 Mesh *meshconfig.MeshConfig // 用于和Mixer通信 MixerSAN []string // 全局的推送上下文,已经废弃 // 除非出于测试、处理新连接的目的,不要使用此字段 PushContext *PushContext } |
此结构建模代理(Envoy代理)的属性,xDS使用此结构对代理进行识别:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
type Proxy struct { // 此代理所在的集群 ClusterID string // 节点类型(也就是说运行代理的那个Pod的代理角色 Type NodeType // 用于识别代理以及它的同地协作的服务实例的IP地址 IPAddress string // 平台先关的Sidecar代理ID ID string // 短主机名的DNS后缀 Domain string // 节点的元数据 Metadata map[string]string } |
用于区分不同代理在网格中的职责。
1 2 3 4 5 6 7 8 9 |
type NodeType string const ( // 应用程序容器的边车代理,普通被网格管理的Pod使用这种代理角色 Sidecar NodeType = "sidecar" // 独立运行的,集群入口代理,istio-ingress中运行的是这种代理 Ingress NodeType = "ingress" // 独立运行的,作为L7/L4路由器的代理,istio-ingressgateway、istio-egressgateway中运行的是这种代理 Router NodeType = "router" ) |
端点分片,存储单个服务的单个注册表中的单个分片的名称及其端点列表:
1 2 3 4 |
type EndpointShard struct { Shard string Entries []*IstioEndpoint } |
存储单个服务的所有分片信息。使用K8S作为注册表时,Shards通常只有一个元素,其键是"Kubernetes",其值是Shard名为"Kubernetes"的EndpointShard
1 2 3 4 5 6 |
type EndpointShardsByService struct { // 这种结构下,每个注册表只能有一个分片 // 映射的键是注册表名称 Shards map[string]*EndpointShard ServiceAccounts map[string]bool } |
此结构用于代替NetworkEndpoint和ServiceInstance,做了以下优化:
- ServicePortName字段代替ServicePort字段。原因是进行了端点回调(endpoint callbacks are made)时端口号、协议可能不可用
- 合并两个结构,原因是一对一关系
- 不再持有Service的指针。原因是接收到端点时,服务对象可能不可用
- 提供缓存的EnvoyEndpoint对象,避免为每次请求/每个客户端重新分配
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
type IstioEndpoint struct { // 工作负载的标签 Labels map[string]string Family AddressFamily Address string EndpointPort uint32 // 跟踪端口的名称,避免最终一致性相关的问题。某些情况下Endpoint先于Service可见,这时进行端口查找会失败 // 端口名到号的映射将在集群计算时进行 ServicePortName string // 用于遥测 UID string // 缓存的LbEndpoint(来自Envoy Go客户端包),通过数据转换得到,避免重复计算 EnvoyEndpoint *endpoint.LbEndpoint ServiceAccount string } |
参考下文。
此结构对Istio服务进行建模,每个服务具有全限定的名称(FQDN),一个或多个监听的端口,一个可选的和服务关联的负载均衡器/虚拟IP地址(FQDN解析到此地址)。
例如,在K8S中,服务kubernetes关联到FQDN kubernetes.default.svc.cluster.local,具有虚拟IP地址10.96.0.1,监听443端口。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
// 主机名,可能是通配符 type Hostname string type Service struct { // 主机名 Hostname Hostname // 服务的负载均衡器IPv4地址 Address string // 多集群支持,服务在每个集群中的负载均衡器IPv4地址 ClusterVIPs map[string]string // 监听的端口列表 Ports PortList // 运行服务的账号 ServiceAccounts []string // 指示服务是否位于网格外部,这种服务通过ServiceEntry定义 MeshExternal bool // 在路由之前,如何解析服务的实例 Resolution Resolution // 服务创建时间 CreationTime time.Time // 额外的属性,Mixer/RBAC 策略会用到 Attributes ServiceAttributes } |
用于指示在路由请求之前,如何解析出服务的实例:
1 2 3 4 5 6 7 8 9 10 |
type Resolution int const ( // 代理根据自己本地的负载均衡池决定使用哪个端点 ClientSideLB Resolution = iota // 代理进行DNS解析,并把请求发给解析结果 DNSLB // 代理直接根据请求者指定的目的地址 Passthrough ) |
服务的特定版本的一个实例,绑定到一个NetworkEndpoint:
1 2 3 4 5 6 7 8 9 10 |
type ServiceInstance struct { // 关联的端点 Endpoint NetworkEndpoint // 所属的服务 Service *Service // 标签集 Labels Labels AvailabilityZone string ServiceAccount string } |
建模关联到服务的实例的网络地址:
1 2 3 4 5 6 7 8 |
type NetworkEndpoint struct { // 地址族 Family AddressFamily Address string Port int ServicePort *Port UID string } |
对服务监听的网络端口进行建模:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
type Port struct { // 易读的端口名,如果服务包含多个端口,则此字段必须 Name string // 服务的端口号,非必须关联到服务背后的实例的端口 Port int // 使用的协议 Protocol Protocol } // 端口集 type PortList []*Port |
通信协议:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
type Protocol string // 目前支持的协议枚举 const ( ProtocolGRPC Protocol = "GRPC" ProtocolHTTP Protocol = "HTTP" ProtocolHTTP2 Protocol = "HTTP2" ProtocolHTTPS Protocol = "HTTPS" ProtocolTCP Protocol = "TCP" ProtocolTLS Protocol = "TLS" ProtocolUDP Protocol = "UDP" ProtocolMongo Protocol = "Mongo" ProtocolRedis Protocol = "Redis" ProtocolUnsupported Protocol = "UnsupportedProtocol" ) |
流量的方向:
1 2 3 4 5 |
type TrafficDirection string const ( TrafficDirectionInbound TrafficDirection = "inbound" TrafficDirectionOutbound TrafficDirection = "outbound" ) |
此接口用于发现服务的实例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
type ServiceDiscovery interface { // 列出所有服务 Services() ([]*Service, error) // 废弃,根据主机名获得服务 GetService(hostname Hostname) (*Service, error) // 取回服务的、匹配指定标签集的实例 Instances(hostname Hostname, ports []string, labels LabelsCollection) ([]*ServiceInstance, error) InstancesByPort(hostname Hostname, servicePort int, labels LabelsCollection) ([]*ServiceInstance, error) // 返回和指定代理同地协作(co-located)的实例,所谓co-located是指运行在相同的命名空间和安全上下文 // // 对于以Sidecar方式运行的代理,返回非空的切片;对于独立运行的代理,返回空切片 GetProxyServiceInstances(*Proxy) ([]*ServiceInstance, error) // 返回一个IPv4地址关联的管理端口 ManagementPorts(addr string) PortList // 返回一个IPv4地址关联的健康检查探针 WorkloadHealthCheckInfo(addr string) ProbeList } |
Pilot(Pilot Discovery,其对应的客户端组件是Pilot Agent)是Istio最关键的组件,它的职责是将用户提供的、简单的、CRD形式的配置文件,转换为Envoy能理解的格式,并推送给Envoy以更新代理配置。
Pilot的启动逻辑位于bootstrap包中。
我们需要进行单步跟踪,才能了解Pilot在初始化期间做了哪些事情。为了调试的方便,我们在K8S集群外部启动Pilot服务。参考下面的启动参数:
1 2 3 4 5 6 7 8 9 10 11 |
export POD_NAME=istio-pilot-54f79f8bd7-w8b2g export POD_NAMESPACE=istio-system # pilot-discovery的入口点代码位于 pilot/cmd/pilot-discovery/main.go # 输出详尽日志 pilot discovery --log_output_level=default:debug --log_caller=default --domain=k8s.gmem.cc \ # 提供kubeconfig,注意不支持--masterUrl,不提供此参数Istio会假设在集群内部运行 --kubeconfig=/home/alex/.kube/config \ # 配置文件 --meshConfig=pilot/meshConfig |
其中meshConfig可以直接从K8S集群中提取,此配置存放在Configmap中,名称为istio-system/istio。内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
disablePolicyChecks: false enableTracing: true accessLogFile: "/dev/stdout" policyCheckFailOpen: false sdsUdsPath: "" sdsRefreshDelay: 15s defaultConfig: connectTimeout: 10s configPath: "/etc/istio/proxy" binaryPath: "/usr/local/bin/envoy" serviceCluster: istio-proxy drainDuration: 45s parentShutdownDuration: 1m0s proxyAdminPort: 15000 concurrency: 0 zipkinAddress: zipkin.istio-system:9411 controlPlaneAuthPolicy: NONE discoveryAddress: istio-pilot.istio-system:15007 |
Pilot的入口点位于pilot/cmd/pilot-discovery/main.go,它使用包spf13.cobra来管理一组子命令,Pilot核心功能由discovery子命令实现。
入口点的启动过程如下:
- 解析命令行参数
- 创建主服务,服务建模在结构bootstrap.Server中
- 调用initKubeClient方法,初始化K8S客户端
- 调用initClusterRegistries方法,初始化clusterStore字段
- 调用initMesh方法,初始化mesh字段
- 调用initMixerSan方法,初始化Mixer服务的SAN
- 调用initConfigController方法,初始化configController字段。通过addStartFunc延后调用配置控制器的Run方法
- 如果启用了Istio的IngressController功能,则调用configaggregate.MakeCache,包装configController,使其能处理Ingress类型的资源
- 调用initServiceControllers方法,此控制器能够从底层服务发现机制中获取Istio抽象服务。通过addStartFunc延后调用服务控制器的Run方法
- 调用initDiscoveryService方法,初始化发现服务,发现服务依赖于前面创建的配置控制器、服务控制器,对xDS的支持有发现服务提供
- 调用initMonitor方法,初始化Pilot监控服务
- 调用initMultiClusterController方法,初始化多集群控制器,目前可以用于跨越多个K8S集群的服务网格
- 启动ControlZ监听器
- 启动主服务,其实就是执行先前注册的延迟启动函数:
- 启动配置控制器
- 启动Ingress同步器
- 启动Service控制器
- 启动HTTP服务、gRPC服务、安全gRPC服务
- 等待停止信号
表示Pilot主服务,它不是一个简单的(侦听单个端口)服务,而是很多服务的集合。包含的字段如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
type Server struct { // xDS服务管理服务器的实现 EnvoyXdsServer *envoyv2.DiscoveryServer ServiceController *aggregate.Controller // 网格配置 mesh *meshconfig.MeshConfig // 配置控制器,负责监控K8S集群中,Istio的自定义资源的变更 configController model.ConfigStoreCache // Mixer服务的SAN列表 mixerSAN []string // K8S客户端 kubeClient kubernetes.Interface // 启动需要执行的函数列表 startFuncs []startFunc // 集群信息存储 clusterStore *clusterregistry.ClusterStore httpServer *http.Server grpcServer *grpc.Server secureGRPCServer *grpc.Server // 发现服务,支持xDS协议 discoveryService *envoy.DiscoveryService // 包装configController,提供针对Gateways、EnvoyFilter、ServiceEntries等资源的强类型接口 istioConfigStore model.IstioConfigStore // HTTP请求多路分发器,根据URL匹配来决定由哪个handler处理请求 mux *http.ServeMux // 监控各种K8S原生对象,并将更新推送给EDS等组件 kubeRegistry *kube.Controller } |
仅仅当args.Service.Registries包含Kubernetes,才会创建K8S客户端:
1 2 3 4 5 6 |
for _, r := range args.Service.Registries { if serviceregistry.ServiceRegistry(r) == serviceregistry.KubernetesRegistry { needToCreateClient = true break } } |
仅当创建了K8S客户端,才会调用此方法。
此方法创建的对象很简单,就是创建一个空的clusterregistry.ClusterStore:
1 2 3 4 |
type ClusterStore struct { rc map[string]*RemoteCluster storeLock sync.RWMutex } |
1 2 3 4 5 6 7 8 |
type RemoteCluster struct { Cluster *k8s_cr.Cluster FromSecret string Client *clientcmdapi.Config ClusterStatus string Controller *kube.Controller ControlChannel chan struct{} } |
Istio组件Mixer和Pilot需要相互通信,如果mesh.DefaultConfig.ControlPlaneAuthPolicy为mTLS,也就是说Mixer - Pilot通信需要双向TLS认证时,才调用此方法:
1 2 3 4 5 6 |
func (s *Server) initMixerSan(args *PilotArgs) error { if s.mesh.DefaultConfig.ControlPlaneAuthPolicy == meshconfig.AuthenticationPolicy_MUTUAL_TLS { s.mixerSAN = envoy.GetMixerSAN(args.Config.ControllerOptions.DomainSuffix, args.Namespace) } return nil } |
此方法转调makeKubeConfigController方法创建配置控制器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
func (s *Server) makeKubeConfigController(args *PilotArgs) (model.ConfigStoreCache, error) { kubeCfgFile := s.getKubeCfgFile(args) // crd.Client,负责Istio自定义资源的CRUD操作 // 为每组API的每个版本(apiVersion)创建独立的REST客户端 // apiVersion包括networking.istio.io/v1alpha3、config.istio.io/v1alpha2、authentication.istio.io/v1alpha1、rbac.istio.io/v1alpha1 configClient, err := crd.NewClient(kubeCfgFile, "", model.IstioConfigTypes, args.Config.ControllerOptions.DomainSuffix) // 注册CRD if !args.Config.DisableInstallCRDs { if err = configClient.RegisterResources(); err != nil { return nil, multierror.Prefix(err, "failed to register custom resources.") } } // 创建配置控制器 return crd.NewController(configClient, args.Config.ControllerOptions), nil } |
initConfigController方法返回值的真实类型是crd.controller,除了实现ConfigStoreCache 接口的方法以外,它还负责管理Informer。它为每种CR创建一个Informer:
1 2 3 |
for _, schema := range client.ConfigDescriptor() { out.addInformer(schema, options.WatchedNamespace, options.ResyncPeriod) } |
Pilot中的控制器的工作模式基本都是一样的,下文我们详细分析。
如果必要(允许Istio处理Ingress资源的话),initConfigController还会创建IngressController:
1 2 |
// 下面的方法创建了一个监听Ingress资源的控制器 ingress.NewController(s.kubeClient, s.mesh, args.Config.ControllerOptions) |
包config/aggregate可以将处理不同资源类型的model.ConfigStoreCache组合起来,形成一个更大的model.ConfigStoreCache。接口保持不变,根据资源类型委托给适当的被包装的子对象。
默认的配置下,Pilot会把Config控制器、Ingress控制器组合起来:
1 2 3 4 |
configController, err := configaggregate.MakeCache([]model.ConfigStoreCache{ s.configController, ingress.NewController(s.kubeClient, s.mesh, args.Config.ControllerOptions), }) |
Istio网格的入口通常是Gateway,但是它也能够处理K8S原生的Ingress资源,也就是说Istio可以扮演K8S通常意义上的Ingress Controller。实际上入口流量由istio-ingress这个deployment的Pod处理。
哪些Ingress资源会被管理,由主服务的mesh.IngressControllerMode配置决定:
- MeshConfig_OFF(0):禁用Istio的IngressController功能
- MeshConfig_DEFAULT(1):作为整个K8S集群默认的Ingress控制器
- MeshConfig_STRICT(2):仅仅处理包含了注解kubernetes.io/ingress.class,且值等于mesh.IngressClass(默认istio)的Ingress资源
创建IngressController的逻辑位于initConfigController方法中。
此方法首先创建一个聚合控制器:
1 |
serviceControllers := aggregate.NewController() |
serviceregistry/aggregate中定义的“聚合控制器”,能够管理多个平台(例如K8S)的服务发现机制(ServiceRegistry)。
然后,它调用createK8sServiceControllers方法创建kube.Controller:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
func (s *Server) createK8sServiceControllers(serviceControllers *aggregate.Controller, args *PilotArgs) (err error) { kubectl := kube.NewController(s.kubeClient, args.Config.ControllerOptions) // 创建控制器 s.kubeRegistry = kubectl // 赋值给主服务的kubeRegistry字段 serviceControllers.AddRegistry( // 添加到聚合控制器的registries中 aggregate.Registry{ Name: serviceregistry.KubernetesRegistry, // 服务注册表类型 ClusterID: string(serviceregistry.KubernetesRegistry), // 如果多个同一类型的注册表存在则有意义 ServiceDiscovery: kubectl, // 服务发现,可获得Istio的抽象服务(ServiceInstance) ServiceAccounts: kubectl, // 暴露Istio的抽象服务账号(字符串) Controller: kubectl, // 控制器,可通知Service、ServiceInstance的变更 } ) } |
然后,创建ServiceEntry发现服务,并添加到聚合控制器:
1 2 3 4 5 6 7 8 |
serviceEntryStore := external.NewServiceDiscovery(s.configController, s.istioConfigStore) serviceEntryRegistry := aggregate.Registry{ Name: "ServiceEntries", Controller: serviceEntryStore, ServiceDiscovery: serviceEntryStore, ServiceAccounts: serviceEntryStore, } serviceControllers.AddRegistry(serviceEntryRegistry) |
最后,注册ServiceController的延迟启动。
kube.Controller能够监控K8S中service、endpoint、node、pod等对象。它实现了model.Controller、model.ServiceDiscovery、model.ServiceAccounts等接口。
该结构包含以下字段:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
type Controller struct { domainSuffix string client kubernetes.Interface queue Queue // 事件队列 services cacheHandler // 各种K8S对象的处理器 endpoints cacheHandler nodes cacheHandler pods *PodCache // 允许此控制器读取环境信息,并发布状态信息 Env *model.Environment // 多集群环境下识别集群 ClusterID string // XDSUpdater推送EDS变更到ADS模型 EDSUpdater model.XDSUpdater // 用于请求全局配置的变更 ConfigUpdater model.ConfigUpdater stop chan struct{} } |
此方法首先创建一个新的发现服务:
1 2 3 4 5 6 7 8 9 |
discovery, err := envoy.NewDiscoveryService( s.ServiceController, s.configController, environment, // 提供聚合性的上下文信息API args.DiscoveryOptions, // 监听地址等信息 ) s.discoveryService = discovery s.mux = s.discoveryService.RestContainer.ServeMux |
结构envoy.DiscoveryService是真正的发现服务,是Pilot的核心。它负责推送配置给Envoy。
然后,创建一个gRPC服务器、一个HTTP服务器:
1 2 3 4 5 |
s.initGrpcServer() s.httpServer = &http.Server{ Addr: args.DiscoveryOptions.HTTPAddr, Handler: discovery.RestContainer} |
创建xDS服务:
1 2 3 4 |
// 基于Envoy协议v2版本 s.EnvoyXdsServer = envoyv2.NewDiscoveryServer(environment, istio_networking.NewConfigGenerator(args.Plugins)) // 用于将envoy/v2从envoy/解耦 s.EnvoyXdsServer.ConfigUpdater = s.discoveryService |
注册envoy包的全局函数:
1 2 3 |
// 当配置变更时,请求一个更新 envoy.Push = s.EnvoyXdsServer.Push envoy.BeforePush = s.EnvoyXdsServer.BeforePush |
将xDS服务注册到上述gRPC服务器上:
1 |
s.EnvoyXdsServer.Register(s.grpcServer) |
在使用K8S作为后端时,还会配置kube.Controller:
1 2 3 |
s.kubeRegistry.Env = environment s.kubeRegistry.ConfigUpdater = discovery s.kubeRegistry.EDSUpdater = s.EnvoyXdsServer |
最后,注册httpServer、grpcServer的延迟启动。
proxy/envoy包中定义的这个函数,负责创建一个新的发现服务。它会创建一个RESTful容器并将发现服务注册到此容器:
1 2 3 |
container := restful.NewContainer() out.Register(container) out.RestContainer = container |
它还会向serviceregistry/kube.Controller注册Service、Instance的处理器,对于K8S来说,当Serivce、Endpoint发生变更时,会获得通知。
1 2 3 4 |
serviceHandler := func(*model.Service, model.Event) { out.clearCache() } ctl.AppendServiceHandler(serviceHandler) instanceHandler := func(*model.ServiceInstance, model.Event) { out.clearCache() } err := ctl.AppendInstanceHandler(instanceHandler) |
这些处理器的逻辑很简单,就是清空Envoy缓存,但是kube.Controller附加了一些前置逻辑。
不论是K8S内置资源,而是Istio的CR,都由client-go负责List&Watch,并把事件会传递给config/kube/crd/controller.go、config/kube/ingress/controller.go、serviceregistry/kube/controller.go等控制器中的回调函数处理:
1 2 3 4 5 6 7 8 |
informer.AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { // 处理Prometheus指标 k8sEvents.With(prometheus.Labels{"type": otype, "event": "add"}).Add(1) // 放入队列 c.queue.Push(kube.NewTask(handler.Apply, obj, model.EventAdd)) }, |
可以看到,此回调函数的逻辑仅仅是将事件及其处理函数封装为Task并存放到控制器的队列中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
func (q *queueImpl) Push(item Task) { q.cond.L.Lock() defer q.cond.L.Unlock() if !q.closing { q.queue = append(q.queue, item) } // 唤醒等待cond的一个goroutine q.cond.Signal() } type Task struct { handler Handler obj interface{} event model.Event } |
队列的Run循环则取出事件并调用其处理函数:
1 2 3 |
if err := item.handler(item.obj, item.event); err != nil { ... } |
处理函数就是ChainHandler结构的Apply方法,此结构允许针对一个事件串行的调用多个实际的处理函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
type ChainHandler struct { funcs []Handler } func (ch *ChainHandler) Apply(obj interface{}, event model.Event) error { for _, f := range ch.funcs { if err := f(obj, event); err != nil { // 链中任何一个实际处理函数出错,都会导致中止处理 return err } } return nil } |
ChainHandler是在对应控制器的初始化阶段创建的,实际处理函数也在那时注册。
Pilot对很多K8S资源变更的响应,都是简单的清除掉Envoy的配置缓存。它们调用的是envoy包的v1版本的DiscoveryService.clearCache方法:
1 2 3 |
func (ds *DiscoveryService) clearCache() { ds.ConfigUpdate(true) } |
可以看到,此方法仅仅是简单的请求一次Envoy配置的Full Push。 也就是说Pilot在配置变更的情况下,通常会完整的推送Envoy配置,幸好推送过程具有防抖动支持,而且配置变更不是频繁操作,否则可能出现性能问题。
config/kube/ingress/controller.go注册了多个处理函数,第一个处理函数判断是否informer已经完全同步,如果不是,则中止处理。第二个处理函数则是简单的清除Envoy缓存配置。
第一个处理函数是控制器的notify方法,此函数判断是否informer已经完全同步,如果不是,则中止处理:
1 2 3 4 5 6 7 8 |
func (c *controller) notify(obj interface{}, event model.Event) error { if !c.HasSynced() { return errors.New("waiting till full synchronization") } // 检查对象是否是DeletedFinalStateUnknown,如果否,对其调用MetaNamespaceKeyFunc,看看能否获得缓存键 _, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) return nil } |
第二个处理函数,首先将IstioObject转换为model.Config,然后转调在启动阶段通过s.configController.RegisterEventHandler()为不同资源类型注册的实际处理函数:
1 2 3 4 5 |
c.kinds[typ].handler.Append(func(object interface{}, ev model.Event) error { item, ok := object.(IstioObject) config, err := ConvertObject(schema, item, c.client.domainSuffix) f(*config, ev) }) |
MeshPolicy、VirtualService、DestinationRule、Gateway的实际处理函数,都仅仅是清除掉所有Envoy缓存。
此CRD的实际处理函数的逻辑复杂一些。首先将model.Config.Spec转换为具体的Istio结构:
1 |
serviceEntry := config.Spec.(*networking.ServiceEntry) |
然后,将ServiceEntry转换为Istio内部的“服务”(model.Service)并异步处理 。model.Service具有FQDN、若干端口、可选的负载均衡/虚拟IP:
1 2 3 4 5 6 7 |
// 一个ServiceEntry可能转换为多个服务 services := convertServices(serviceEntry, config.CreationTimestamp.Time) for _, handler := range c.serviceHandlers { for _, service := range services { go handler(service, event) } } |
最后,将ServiceEntry转换为Istio内部的“实例”(model.ServiceInstance)并异步处理。model.ServiceInstance关联到一个网络端点(ip:port),具有一个服务描述和一组描述服务版本的标签:
1 2 3 4 5 6 |
instances := convertInstances(serviceEntry, config.CreationTimestamp.Time) for _, handler := range c.instanceHandlers { for _, instance := range instances { go handler(instance, event) } } |
不管是服务还是实例,go handler ...的逻辑仍然仅仅是清除掉所有Envoy缓存。
在这里可以注意到,ServiceEntry所(通常)表示的外部服务、和一般性的K8S服务,在Istio内部具有相同的表示 —— Service + ServiceInstance。
第一个处理函数是控制器的notify方法,此函数判断是否informer已经完全同步,如果不是,则中止处理。
对于节点资源,默认情况下没有其它处理逻辑。
pilot的serviceregistry包维护了一个最终一致性的Pod缓存:
1 2 3 4 5 6 7 8 9 10 11 |
func newPodCache(ch cacheHandler, c *Controller) *PodCache { out := &PodCache{ cacheHandler: ch, c: c, keys: make(map[string]string), } ch.handler.Append(func(obj interface{}, ev model.Event) error { return out.event(obj, ev) }) return out } |
Pod事件会触发PodCache.event方法的调用,此方法会更新Pod的IP地址和namespace/name形式的Key之间的映射关系,并且为EDS服务更新工作负载:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
func (pc *PodCache) event(obj interface{}, ev model.Event) error { // 注意,Pod被删除后,可能得到*v1.Pod,也可能得到DeletionFinalStateUnknown这个标记对象 pod, ok := obj.(*v1.Pod) if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { return fmt.Errorf("couldn't get object from tombstone %+v", obj) } // 可以从DeletionFinalStateUnknown.Obj获得原始对象 pod, ok = tombstone.Obj.(*v1.Pod) if !ok { return fmt.Errorf("tombstone contained object that is not a pod %#v", obj) } } ip := pod.Status.PodIP // 新创建的Pod的IP地址为空,直到通过UpdateStatus分配了IP if len(ip) > 0 { key := KeyFunc(pod.Name, pod.Namespace) switch ev { case model.EventAdd: switch pod.Status.Phase { case v1.PodPending, v1.PodRunning: // 锁定并更新Pod缓存 pc.rwMu.Lock() pc.keys[ip] = key pc.rwMu.Unlock() // model.XDSUpdater // 更新工作负载,在EDS看来工作负载的ID是IP地址 if pc.c.EDSUpdater != nil { pc.c.EDSUpdater.WorkloadUpdate(ip, pod.ObjectMeta.Labels, pod.ObjectMeta.Annotations) } } case model.EventUpdate: switch pod.Status.Phase { case v1.PodPending, v1.PodRunning: // 更新Pod缓存 default: // 其它状态下,删除Pod缓存 } case model.EventDelete: // 删除Pod缓存 } } return nil } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
// 当接收到K8S Service更新时触发 c.services.handler.Append(func(obj interface{}, event model.Event) error { svc := *obj.(*v1.Service) // 不处理kube-system中定义的service if svc.Namespace == meta_v1.NamespaceSystem { return nil } if c.EDSUpdater != nil { hostname := svc.Name + "." + svc.Namespace ports := map[string]uint32{} portsByNum := map[uint32]string{} for _, port := range svc.Spec.Ports { ports[port.Name] = uint32(port.Port) portsByNum[uint32(port.Port)] = port.Name } // 变更服务信息后调用 c.EDSUpdater.SvcUpdate(c.ClusterID, hostname, ports, portsByNum) // Bypass convertService and the cache invalidation. // 请求完整的配置更新并返回 c.ConfigUpdater.ConfigUpdate(true) return nil } // f的逻辑是清空缓存,默认不会执行到这个旧逻辑,只有当禁用了Direct EDS(环境变量PILOT_DIRECT_EDS=0)才会执行 if svcConv := convertService(svc, c.domainSuffix); svcConv != nil { f(svcConv, event) } return nil }) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
// 当接收到K8S Endpoint更新时触发 c.endpoints.handler.Append(func(obj interface{}, event model.Event) error { ep := obj.(*v1.Endpoints) // 不处理kube-system中的端点 if ep.Namespace == meta_v1.NamespaceSystem { return nil } if c.EDSUpdater != nil { // 更新端点 c.updateEDS(ep) } else { // 旧逻辑,如果端点对应的服务存在,则清空缓存 if item, exists := c.serviceByKey(ep.Name, ep.Namespace); exists { if svc := convertService(*item, c.domainSuffix); svc != nil { f(&model.ServiceInstance{Service: svc}, event) } } } return nil }) |
Pilot Discovery和Envoy代理配置的更新(这些配置更新都是由于K8S资源变化而引发)、推送有关的逻辑,主要分布在三个包中:
- proxy/envoy,主要的逻辑在此,包括xDS的实现
- model,推送上下文(PushContext)定义在此包中,此结构在每次推送时创建,持有和本次推送相关的所有上下文信息
- networking,负责生成Envoy的配置文件格式
推送由DiscoveryService.ConfigUpdate()方法触发,转调DiscoveryServer.Push(),后者调用PushContext初始化一系列相关的数据结构,并调用networking包中ConfigGenerator的方法生成Envoy v2的结构,然后向所有连接到Pilot的Envoy代理发起推送。
此结构负责发布服务、集群、路由给所有的代理。
1 2 3 4 5 6 7 8 9 10 11 12 |
type DiscoveryService struct { *model.Environment webhookClient *http.Client webhookEndpoint string // 缓存,目前的实现,在任何路由、服务、端点发生变化时,都会Flush缓存。应当实现某种缓存过期策略 // 避免反复Flush或者过期缓存滞留其中 sdsCache *discoveryCache // 发现服务会注册REST路由到此容器 RestContainer *restful.Container // 是否在去回弹之后,需要进行一个完整推送。如果仅仅要求EDS则为false fullPush bool } |
缓存的结构如下,是一种通用的存储:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
type discoveryCacheEntry struct { data []byte // 什么都可以存 hit uint64 // atomic miss uint64 // atomic resourceCount uint32 } type discoveryCache struct { name string disabled bool mu sync.RWMutex cache map[string]*discoveryCacheEntry } |
此方法清除缓存,目前的实现很简陋,就是发起 ds.ConfigUpdate(true)调用,会请求完整的配置推送。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
// 100ms DebounceAfter = envDuration("PILOT_DEBOUNCE_AFTER", 100*time.Millisecond) // 10s DebounceMax = envDuration("PILOT_DEBOUNCE_MAX", 10*time.Second) func (ds *DiscoveryService) ConfigUpdate(full bool) { // 设置完整推送标记 if full { ds.fullPush = true } // 去回弹逻辑 if DebounceAfter > 0 { // 如果定时器标记已经设置,不做任何操作,这意味着频繁调用ConfigUpdate不会引发不良后果 if !clearCacheTimerSet { // 此标记在实际执行推送时清除 clearCacheTimerSet = true // 记录去回弹操作开始时间戳 startDebounce := lastClearCacheEvent // 100ms后开始第一次去回弹判断 time.AfterFunc(DebounceAfter, func() { ds.debouncePush(startDebounce) }) } return } // 旧逻辑 // 如果上一次配置变更发生在1秒前,执行推送 if time.Since(lastClearCacheEvent) > 1*time.Second { ds.doPush() return } // 如果上一次变更在1秒内,但是上一次推送大于clearCacheTime,也推送 if time.Since(lastClearCache) > time.Duration(clearCacheTime)*time.Second { ds.doPush() return } // 否则 if !clearCacheTimerSet { clearCacheTimerSet = true time.AfterFunc(1*time.Second, func() { ds.clearCache() // 一秒后判断重新判断是否需要推送 }) } } |
debouncePush方法中包含一些额外的去回弹逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
func (ds *DiscoveryService) debouncePush(startDebounce time.Time) { // 上一次请求清除缓存的时间 since := time.Since(lastClearCacheEvent) // 距今如果大于200ms,或者从启动去回弹到到现在大于10s // 如果反复请求推送,则||左侧的表达式一直不会满足。为了防止无限的去回弹,必然的推送在最初请求的10s后发生 if since > 2*DebounceAfter || time.Since(startDebounce) > DebounceMax { ds.doPush() } else { // 下一轮去回弹 time.AfterFunc(DebounceAfter, func() { ds.debouncePush(startDebounce) }) } } |
doPush方法真正触发配置推送:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
func (ds *DiscoveryService) doPush() { // 本次推送正在处理时,可能有别的配置变更发生,因此这里需要撤销标记,防止遗漏事件 clearCacheTimerSet = false lastClearCache = time.Now() full := ds.fullPush // 获取自上次推送依赖的服务端点更新 edsUpdates := BeforePush() // Update the config values, next ConfigUpdate and eds updates will use this clearCacheMutex.Lock() ds.fullPush = false clearCacheMutex.Unlock() // 推送,全部或者增量的EDS更新 Push(full, edsUpdates) } |
注意,全局函数BefoerPush实际上是envoyv2.DiscoveryServer.BeforePush方法:
1 2 3 |
// 以envoy包的全局变量作为媒介,在DiscoveryService和v2.DiscoveryServer之间传递这两个函数,避免它们直接依赖 envoy.Push = s.EnvoyXdsServer.Push envoy.BeforePush = s.EnvoyXdsServer.BeforePush |
类似的,全局函数Push 也是envoyv2.DiscoveryServer.Push方法。
也就是说,推送的职责在这里转交给DiscoveryServer了。
此结构提供了Envoy v2 xds API的gRPC实现,在启动阶段由initDiscoveryService创建。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
type DiscoveryServer struct { // 环境信息 Env *model.Environment // 仅仅用于调试和压力测试 MemRegistry *MemServiceDiscovery // 负责使用 Istio的网络API + Service注册表信息 生成数据平面的配置信息(也就是Envoy代理配置) ConfigGenerator core.ConfigGenerator // 目前主要用于判断(K8S的)初始缓存同步是否完成 ConfigController model.ConfigStoreCache // 初始连接的限速器 initThrottle chan time.Time // 限速器 throttle chan time.Time // 为/debug/adsz提供配置快照。默认false,可以通过环境变量PILOT_DEBUG_ADSZ_CONFIG=1启用 DebugConfigs bool // 保护被ADS读写的全局数据结构, 包括 EDSUpdates和shards mutex sync.RWMutex // 服务的端点分片列表,此字段从增量更新构建 EndpointShardsByService map[string]*model.EndpointShardsByService // 工作负载列表,可用于检测变更。直接由registry推送的更新计算得到 WorkloadsByID map[string]*Workload // 负责请求配置更新,实现了放抖动(debouncing,延迟配置推送,防止Regsitry连续的资源更新导致过频推送)且能进行变更检测 // 用于将envoy/v2从envoy/解耦,在Istio 1.1将进行简化/清理 ConfigUpdater model.ConfigUpdater // 跟踪自上一次完整推送之后的所有服务的(端点的)变更,从1.0.3+仅仅用于跟踪两次推送之间的增量 // 示例: // { // details.default.svc.k8s.gmem.cc: { // Shards: { // "Kubernetes": { // Shard: "Kubernetes", // Entries: [ endpoints... ] // } // } // } // } edsUpdates map[string]*model.EndpointShardsByService // 保护全局推送上下文,一旦配置变更此上下文就改变,多个地方需要读取此上下文 pushContextMutex sync.RWMutex } |
此结构支持的方法繁多,定义在proxy/envoy/v2包中的ads.go、cds.go、eds.go、lds.go、rds.go等文件中,对应xDS API的各子集以及ADS。其中一部分方法是通过接口暴露的,例如:
- AggregatedDiscoveryServiceServer,Envoy提供的gRPC接口。在ads.go中实现
- XDSUpdater,在push_context.go中定义,在eds.go中实现
此方法的逻辑很简单,将上次变更依赖的服务端点变更增量提取出来返回,并重置此“变更增量”为空:
1 2 3 4 5 6 |
func (s *DiscoveryServer) BeforePush() map[string]*model.EndpointShardsByService { edsUpdates := s.edsUpdates // 重置,后续的更新由新的map跟踪 s.edsUpdates = map[string]*model.EndpointShardsByService{} return edsUpdates } |
在未来,Istio对增量更新提供完整支持后,这里需要重构。在将proxy/envoy/discovery合并到v2 discovery之后,此方法不再允许外部访问。
该方法在准备好配置后,使用ADS协议执行推送:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
func (s *DiscoveryServer) Push(full bool, edsUpdates map[string]*model.EndpointShardsByService) { if !full { // 执行增量推送 go s.AdsPushAll(version, s.globalPushContext(), false, edsUpdates) return } // 获取推送上下文 pc := s.globalPushContext() if pc != nil { // 将model.LastPushStatus赋值为当前推送上下文,并更新监控指标 pc.OnConfigChange() } // 创建新的全局推送上下文,基于此上下文进行推送 t0 := time.Now() push := model.NewPushContext() push.ServiceAccounts = s.ServiceAccounts // 初始化此推送上下文 if err := push.InitContext(s.Env); err != nil { adsLog.Errorf("XDS: failed to update services %v", err) return } // 生成所有代理共享的配置,例如出站监听器/集群的配置 if err := s.ConfigGenerator.BuildSharedPushState(s.Env, push); err != nil { adsLog.Errorf("XDS: Failed to rebuild share state in configgen: %v", err) totalXDSInternalErrors.Add(1) return } // 列出端点并创建分片 if err := s.updateServiceShards(push); err != nil { return } // 替换掉全局上下文 s.Env.PushContext = push // 异步推送 go s.AdsPushAll(versionLocal, push, true, nil) } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
func (s *DiscoveryServer) AdsPushAll(version string, push *model.PushContext, full bool, edsUpdates map[string]*model.EndpointShardsByService) { if !full { // 更新集群信息,进行增量EDS推送。只有那些变化了的集群才被更新、推送 s.edsIncremental(version, push, edsUpdates) return } // 赋值一个临时映射,避免加锁 cMap := make(map[string]*EdsCluster, len(edsClusters)) for k, v := range edsClusters { cMap[k] = v } // 为每个集群更新端点 for clusterName, edsCluster := range cMap { if err := s.updateCluster(push, clusterName, edsCluster); err != nil { adsLog.Errorf("updateCluster failed with clusterName %s", clusterName) } } // 向所有客户端连接发送一个信号 s.startPush(version, push, true, nil) } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
func (s *DiscoveryServer) startPush(version string, push *model.PushContext, full bool, edsUpdates map[string]*model.EndpointShardsByService) { // 遍历所有已连接的Envoy,这覆盖ADS / EDS(0.7),它们共享一样的连接表 adsClientsMutex.RLock() // 复制以避免加锁 pending := []*XdsConnection{} for _, v := range adsClients { pending = append(pending, v) } adsClientsMutex.RUnlock() // 触发每个代理的配置的重现计算,包括Envoy监听的所有配置,包括EDS pendingPush := int32(len(pending)) tstart := time.Now() for { if len(pending) == 0 { // 无需推送,或者都推送完了,退出 return } currentVersion := versionInfo() // 如果具有更新版本的推送正在进行,则当前推送取消 if !allowConcurrentPush && version != currentVersion && full { return } c := pending[0] pending = pending[1:] edsOnly := edsUpdates if full { edsOnly = nil } // 非阻塞性推送,如果下一次推送紧跟着就到来,可能导致问题 // 这里是逐个处理客户端,可以优化为并行的进行 client := c // time.After返回一个只读通道,在给定的超时到达后,通道可读 to := time.After(PushTimeout) select { // 尝试向客户端的推送通道发送事件 case client.pushChannel <- &XdsEvent{ push: push, pending: &pendingPush, version: version, edsUpdatedServices: edsOnly, }: // 该客户端推送成功 client.LastPush = time.Now() client.LastPushFailure = timeZero // 如果推送通道不可用,检查客户端连接是否关闭 case <-client.doneChannel: adsLog.Infof("Client closed connection %v", client.ConID) case <-to: // 推送超时,可能是由于Envoy代理处于异常状态,无法接收数据 pushTimeouts.Add(1) // 放回去等待重试 pending = append(pending, c) } } } |
可以看到,startPush仅仅是把事件放入通道,并不直接牵涉到gRPC相关的操作。这实现了数据和传输的解耦。
推送上下文,此结构跟踪推送的状态(指标、错误)。在一次推送完毕后,指标全部置零。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
type PushContext struct { // 操控下面的map的锁 proxyStatusMutex sync.RWMutex // ID ErrCode 记录推送给代理时的事件 ProxyStatus map[string]map[string]ProxyPushStatus // 最后一次配置变更(导致推送状态重置)的时间 Start time.Time End time.Time // 保护下面的字段 Mutex sync.Mutex // 推送开始时系统中的所有服务 Services []*Service // 以主机名索引的所有服务 ServiceByHostname map[Hostname]*Service // 服务关联的DestinationRule(和目的地相关的规则) destinationRuleHosts []Hostname destinationRuleByHosts map[Hostname]*combinedDestinationRule // 环境信息 Env *Environment ServiceAccounts func(string) []string // 用于跟踪服务名称和端口的映射关系ServicePort2Name is used to keep track of service name and port mapping. // ADS的名字使用端口号,而端点的名字则使用端口名为准 // 服务名 端口号+名称信息 ServicePort2Name map[string]PortList initDone bool } |
此结构提供了Add、UpdateMetrics方法,和Prometheus统计指标有关:
1 2 3 4 |
// 可以步进和推送有关的Prometheus计数器 func (ps *PushContext) Add(metric *PushMetric, key string, proxy *Proxy, msg string) // 基于推送的当前状态来更新Prometheus指标 func (ps *PushContext) UpdateMetrics() |
该方法负责初始化一个新的推送上下文,生成所需的一切数据结构,在执行Envoy配置推送之前你需要(从创建PushContext的协程)调用它。
该方法缓存注册表中的所有服务。
该方法缓存所有VirtualService对象。
该方法初始化所有DestinationRule对象。
列出绑定到指定网关的序列服务。
返回一个服务的DestinationRule。
计算出给定服务的给定子集对应的标签。Istio使用K8S Pod标签区分子集。
此接口负责请求配置更新,在配置推送之前,需要进行去回弹(防止连续触发的频繁推送)。
1 2 3 |
type ConfigUpdater interface { ConfigUpdate(full bool) } |
目前,接口由DiscoveryService实现,而DiscoveryServer也引用前者:
1 |
s.EnvoyXdsServer.ConfigUpdater = s.discoveryService |
XDSUpdater用于xDS模型的直接更新,以及增量推送。DiscoveryServer实现了此接口。
Pilot使用多个注册表 —— 例如每个K8S集群就是一个注册表。每个注册表负责跟踪关联到网格中服务的端点的变更,并在变更后调用EDSUpdate方法。
注册表可以单个服务的端点分组为更小的子集(例如每个Deployment一个子集),以处理端点数量巨大的服务。由于限制可扩容性,Istio倾向于避免传递大量的对象,例如某个注册表的全部端点,或者某个服务在全部注册表中的端点。
Istio在未来会进行一些优化,例如以标签、网关、或者区域来分组端点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
type XDSUpdater interface { // 当端点列表发生变化,或者ServiceEntry的标签发生变化,该方法被调用 // 对于一个集群/主机名,其所有端点必须一并发送给代理 // shard是一个键,目前是注册表的名称(例如Kubernetes) EDSUpdate(shard, hostname string, entry []*IstioEndpoint) error // 当一个服务-端口映射定义发生变化后调用,对服务的标签、注解或者其它属性进行变更,可能触发EDS/CDS重新计算、增量推送 // 但不会影响LDS/RDS SvcUpdate(shard, hostname string, ports map[string]uint32, rports map[uint32]string) // 当一个工作负载的标签、注解发生变化时由注册表负责调用 // 对于K8S来说,id是Pod的IP地址(如果Pod在默认/主网络中) WorkloadUpdate(id string, labels map[string]string, annotations map[string]string) } |
当服务的信息发生变化后,调用此方法,主要逻辑是更新全局推送上下文中的ServicePort2Name字段。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
func (s *DiscoveryServer) SvcUpdate(cluster, hostname string, ports map[string]uint32, rports map[uint32]string) { pc := s.globalPushContext() s.mutex.Lock() defer s.mutex.Unlock() // 在1.0中,服务仅仅来自primary K8S集群 if cluster == string(serviceregistry.KubernetesRegistry) { pl := model.PortList{} for k, v := range ports { pl = append(pl, &model.Port{ Port: int(v), Name: k, }) } pc.ServicePort2Name[hostname] = pl } } |
当工作负载的标签/注解发生变化后,PodCache调用此方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
func (s *DiscoveryServer) WorkloadUpdate(id string, labels map[string]string, annotations map[string]string) { if labels == nil { // 没有标签,不需要推送更新给Envoy,删除缓存 s.mutex.Lock() delete(s.WorkloadsByID, id) s.mutex.Unlock() return } // 使用读锁,禁止写 s.mutex.RLock() w, f := s.WorkloadsByID[id] s.mutex.RUnlock() if !f { // 不存在于缓存中,可能从未连接到此工作负载,不需要推送 s.mutex.Lock() s.WorkloadsByID[id] = &Workload{ Labels: labels, Annotations: annotations, } s.mutex.Unlock() return } // 深度比较 if reflect.DeepEqual(w.Labels, labels) { // 标签没有变化,也不推送 return } w.Labels = labels // Pod标签变化,意味着需要重新计算Envoy配置 // 为了简单和安全,简单的进行全部推送。其实只需要识别出影响到哪些工作负载(那些可能使用到当前工作负载的),并且仅仅为这些负载进行推送 adsLog.Infof("Label change, full push %s ", id) s.ConfigUpdater.ConfigUpdate(true) } |
kube.Controller在接收到K8S的端点更新后,会调用其自身的updateEDS方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
func (c *Controller) updateEDS(ep *v1.Endpoints) { // 计算出K8S服务名 hostname := serviceHostname(ep.Name, ep.Namespace, c.domainSuffix) // 收集端点 endpoints := []*model.IstioEndpoint{} for _, ss := range ep.Subsets { for _, ea := range ss.Addresses { // 如果端点对应的Pod不存在 pod, exists := c.pods.getPodByIP(ea.IP) if !exists { // 则请求一次配置更新。Pod信息可能后续很快推送过来 if c.ConfigUpdater != nil { c.ConfigUpdater.ConfigUpdate(true) } continue } // 如果Pod存在,则抓取其标签、UID等信息,构建出IstioEndpoint labels := map[string]string(convertLabels(pod.ObjectMeta)) uid := fmt.Sprintf("kubernetes://%s.%s", pod.Name, pod.Namespace) for _, port := range ss.Ports { endpoints = append(endpoints, &model.IstioEndpoint{ Address: ea.IP, EndpointPort: uint32(port.Port), ServicePortName: port.Name, Labels: labels, UID: uid, ServiceAccount: kubeToIstioServiceAccount(pod.Spec.ServiceAccountName, pod.GetNamespace(), c.domainSuffix), }) } } } // 更新 Kubernetes分片 这个服务 这些端点 err := c.EDSUpdater.EDSUpdate(c.ClusterID, string(hostname), endpoints) if err != nil { //如果仅仅进行EDS推送不可以,则全局推送 c.ConfigUpdater.ConfigUpdate(true) } else { // 仅仅进行EDS推送 c.ConfigUpdater.ConfigUpdate(false) } } |
此接口位于networking.core包中,负责生成xDS响应。其实现位于v1alpha3子包(Istio CRD的API版本号)中。接口规格如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
type ConfigGenerator interface { // 此方法重新计算所有Envoy代理之间的共享状态,这些状态包括: // 1、Sidecar的出站集群(outbound clusters)、出站监听器(outbound listener) // 2、Sidecar/Gateway的路由表 // 这些状态保存在ConfigGenerator对象中,在后续调用BuildClusters/BuildListeners/BuildHTTPRoutes时重用 // 该方法不会调用插件,大部分插件需要的是特定代理的(例如mixer/authn/authz)的信息 // BuildYYY函数会针对预计算对象调用插件 BuildSharedPushState(env *model.Environment, push *model.PushContext) error // LDS。为指定的代理构建inbound/outbound listeners信息,多次调用此函数,不会反复构建同一监听器 BuildListeners(env *model.Environment, node *model.Proxy, push *model.PushContext) ([]*v2.Listener, error) // CDS。为指定代理构建clusters信息 BuildClusters(env *model.Environment, node *model.Proxy, push *model.PushContext) ([]*v2.Cluster, error) // RDS。 为指定代理构建HTTP routes信息 BuildHTTPRoutes(env *model.Environment, node *model.Proxy, push *model.PushContext, routeName string) (*v2.RouteConfiguration, error) } |
注意BuildYYY函数的返回值均为Envoy v2协议的ProtoBuf消息类型。
该方法的实现如下:
1 2 3 4 5 6 |
func (configgen *ConfigGeneratorImpl) BuildSharedPushState(env *model.Environment, push *model.PushContext) error { // 为Sidecar代理、Router代理分别构建出站集群 configgen.OutboundClustersForSidecars = configgen.buildOutboundClusters(env, model.Sidecar, push) configgen.OutboundClustersForGateways = configgen.buildOutboundClusters(env, model.Router, push) return nil } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
func (configgen *ConfigGeneratorImpl) buildOutboundClusters(env *model.Environment, proxyType model.NodeType, push *model.PushContext) []*v2.Cluster { clusters := make([]*v2.Cluster, 0) // 遍历上下文中包含的所有服务 for _, service := range push.Services { // 获取服务的目的地规则(可能为空) config := push.DestinationRule(service.Hostname) for _, port := range service.Ports { if port.Protocol == model.ProtocolUDP { continue } // 得到服务包含的主机列表,如果服务的Resolution设置为DNSLB则返回nil hosts := buildClusterHosts(env, service, port.Port) // BuildSubsetKey:生成引用特定(限定服务、端口、子集)实例集的键,Envoy使用此键查询Pilot,进而获得子集中包含的实例列表 // 示例:outbound|443||kubernetes.default.svc.k8s.gmem.cc clusterName := model.BuildSubsetKey(model.TrafficDirectionOutbound, "", service.Hostname, port.Port) // 获得集群(上游服务)的Istio账号 upstreamServiceAccounts := env.ServiceAccounts.GetIstioServiceAccounts(service.Hostname, []string{port.Name}) // convertResolution将Istio解析枚举转换为Envoy的集群发现类型(默认v2.Cluster_EDS) // ClientSideLB -> v2.Cluster_EDS // DNSLB -> v2.Cluster_STRICT_DNS // Passthrough -> v2.Cluster_ORIGINAL_DST // 生成默认(相对于子集来说是默认)集群v2.Cluster信息,包括流量策略TrafficPolicy(连接池、异常检测、TLS设置、负载均衡器) defaultCluster := buildDefaultCluster(env, clusterName, convertResolution(service.Resolution), hosts) // 更新v2.Cluster.EdsClusterConfig,将此集群的EDS的配置源设置为ADS updateEds(env, defaultCluster, service.Hostname) // 如果是HTTP/2协议,则将最大并发流数量设置为1073741824 setUpstreamProtocol(defaultCluster, port) // 将当前集群添加到列表 clusters = append(clusters, defaultCluster) if config != nil { // 如果此服务具有关联的目的地规则 destinationRule := config.Spec.(*networking.DestinationRule) // 如果使用mTLS,将DestinationRule的TrafficPolicy的证书字段填充上 convertIstioMutual(destinationRule, upstreamServiceAccounts) // 将TrafficPolicy转换为v2.Cluster的对应字段 applyTrafficPolicy(defaultCluster, destinationRule.TrafficPolicy, port) for _, subset := range destinationRule.Subsets { // 处理子集,处理方式和默认集群类似 // 子集集群的名字示例 outbound|9080|v1|details.default.svc.k8s.gmem.cc subsetClusterName := model.BuildSubsetKey(model.TrafficDirectionOutbound, subset.Name, service.Hostname, port.Port) subsetCluster := buildDefaultCluster(env, subsetClusterName, convertResolution(service.Resolution), hosts) updateEds(env, subsetCluster, service.Hostname) setUpstreamProtocol(subsetCluster, port) applyTrafficPolicy(subsetCluster, destinationRule.TrafficPolicy, port) applyTrafficPolicy(subsetCluster, subset.TrafficPolicy, port) // 调用插件,以修改生成的配置 for _, p := range configgen.Plugins { p.OnOutboundCluster(env, push, service, port, subsetCluster) } clusters = append(clusters, subsetCluster) } } else { // 如果全局启用了mTLS,并且此服务不是外部服务,并且当前代理是边车 if env.Mesh.AuthPolicy == meshconfig.MeshConfig_MUTUAL_TLS && !service.MeshExternal && proxyType == model.Sidecar { applyUpstreamTLSSettings(defaultCluster, buildIstioMutualTLS(upstreamServiceAccounts, "")) } } // 为默认集群调用插件 for _, p := range configgen.Plugins { p.OnOutboundCluster(env, push, service, port, defaultCluster) } } } return clusters } |
networking包提供了一种插件机制,允许在构造xdsapi.Listener期间,对xdsapi.Listener进行任意的修改。例如启用:
- AuthenticationPlugin插件,可以在入站监听器、出站集群上设置mTLS认证
- mixer插件,可以在入站监听器上设置策略检查
插件类型包括:
1 2 3 4 5 6 7 8 9 10 11 12 |
const ( // 身份验证插件 Authn = "authn" // RBAC插件 Authz = "authz" // Envoyfilter插件 Envoyfilter = "envoyfilter" // 健康检查插件 Health = "health" // Mixer 插件 Mixer = "mixer" ) |
插件接口规格如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
type Plugin interface { // 每当为一个服务添加一个出站监听器(到LDS输出)后,调用此方法 // 可用于在出站路径上添加额外的监听器 OnOutboundListener(in *InputParams, mutable *MutableObjects) error // 每当为一个服务添加一个监听器(到LDS输出)后,调用此方法 // 可用于添加额外的监听器 OnInboundListener(in *InputParams, mutable *MutableObjects) error // 每当一个新集群添加到CDS输出后,调用此方法 // 为每次推送调用一次,而非为每个边车/代理 OnOutboundCluster(env *model.Environment, push *model.PushContext, service *model.Service, servicePort *model.Port, cluster *xdsapi.Cluster) // 每当一个新集群添加到CDS输出后,调用此方法 // 为每个边车/代理调用 OnInboundCluster(env *model.Environment, node *model.Proxy, push *model.PushContext, service *model.Service, servicePort *model.Port, cluster *xdsapi.Cluster) // 每当新的虚拟主机(和路由)添加到出站路径的RDS后调用 OnOutboundRouteConfiguration(in *InputParams, routeConfiguration *xdsapi.RouteConfiguration) // 每当新的虚拟主机(和路由)添加到入站路径的RDS后调用 OnInboundRouteConfiguration(in *InputParams, routeConfiguration *xdsapi.RouteConfiguration) } |
具体的插件,只需要按需实现部分方法即可。
一系列传递给插件回调函数(Plugin接口函数)的值,这些值应该被插件只读访问:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
type InputParams struct { // 必须字段,监听器的协议,例如TCP/HTTP ListenerProtocol ListenerProtocol // 必须字段,环境信息 Env *model.Environment // xDS响应是给哪个节点的 Node *model.Proxy // 网格中所有代理服务的实例 ProxyInstances []*model.ServiceInstance // 和监听器同地协作的服务实例(应用到Sidecar) ServiceInstance *model.ServiceInstance // S和监听器同地协作的服务(应用到Sidecar) // 对于出站TCP监听器,此字段指向目的地服务 Service *model.Service // 监听器的端口 // 对于边车的outbound/inbound监听器,指向服务端口(而非端点端口) // 对于网关的inbound监听器,指向网关服务器端口 Port *model.Port // 推送上下文 Push *model.PushContext } |
传递给On*Listener回调的一系列对象:
1 2 3 4 5 6 |
type MutableObjects struct { // 当前正在生成的监听器配置 Listener *xdsapi.Listener // 关联到此监听器的过滤器链 FilterChains []FilterChain } |
传递给On*Cluster回调的Envoy Cluster对象,直接对应ProtoBuffer消息。
传递给On*RouteConfiguration回调的Envoy RouteConfiguration对象,直接对应ProtoBuffer消息。
本章讨论Pilot Agent的启动过程,以及Envoy的Bootstrap配置是如何生成和重新载入的。Pilot Agnet和Pilot Discovery如何通信在下一章分析。
为了方便调试,我们在本地启动Pilot Agent,参考下面的命令行参数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
export POD_NAME=productpage-v1-8d69b45c-5qcv7 export POD_NAMESPACE=default export INSTANCE_IP=10.0.0.1 export ISTIO_BOOTSTRAP=pilot/envoy_bootstrap_tmpl.tpl # pilot-agent的入口点代码位于 pilot/cmd/pilot-agent/main.go # 此代理的角色,可选sidecar、ingress、router pilot proxy sidecar --log_output_level=default:debug --log_caller=default --domain=default.svc.k8s.gmem.cc \ # Envoy二进制文件的位置 --binaryPath=/home/alex/CPP/projects/clion/envoy/bazel-bin/source/exe/envoy-static \ # 生成的Envoy配置文件存放在何处 --configPath=pilot/envoyConfig \ # 当前代理的节点属于哪个服务 --serviceCluster=productpage \ # 发现服务的地址 --discoveryAddress=localhost:15010 \ # Envoy进程的日志级别 --proxyLogLevel=debug |
Pilot Agent的入口点位于pilot/cmd/pilot-agent/main.go,它使用包spf13.cobra来管理一组子命令,Pilot核心功能由proxy子命令实现。
入口点的启动过程如下:
- 解析命令行参数
- 初始化model.Proxy对象,在K8S环境下
- ID=$POD_NAME.$POD_NAMESPACE
- Domain 从命令行--domain参数传入
- IPAddress,从环境变量INSTANCE_IP读入
- 如果控制平面的身份验证策略是mTLS,则初始化Pilot的SAN
- 调用model.ValidateProxyConfig进行代理配置合法性验证
- 初始化数字证书位置信息,默认存储在/etc/certs/。如果当前代理的角色不是sidecar而是ingress,还要初始化Ingress数字证书位置信息
- 如果提供了Envoy配置模板,且CustomConfigFile为空(不使用自定义Envoy配置文件),则根据此模板生成一个Envoy配置文件,并将其路径赋值给CustomConfigFile
- 如果提供了statusPort,则调用status.NewServer启动一个状态服务。访问状态服务的/healthz/ready可以知晓代理是否准备好
- 调用envoy.NewProxy,创建一个envoy结构
- 调用proxy.NewAgent,创建一个agent结构
- 调用envoy.NewWatcher,创建一个watcher结构
- 调用watcher.Run启动Watcher
- 启动Agent控制循环
- 调用Reload立即载入,导致Envoy进程启动
- 调用watchCerts监听数字证书的变化
此结构位于proxy.envoy包中,封装启动envoy所需的信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
func NewProxy(config meshconfig.ProxyConfig, node string, logLevel string, pilotSAN []string) proxy.Proxy { // 设置命令行传入的envoy的日志级别 var args []string if logLevel != "" { args = append(args, "-l", logLevel) } return envoy{ // ProxyConfig结构 config: config, // 节点的标识符,例如sidecar~10.0.0.1~productpage-v1-8d69b45c-5qcv7.default~default.svc.k8s.gmem.cc node: node, // 额外的Envoy参数 extraArgs: args, // 启用mTLS时所需的Pilot的SAN pilotSAN: pilotSAN, } } |
该结构实现了Proxy接口:
1 2 3 4 5 6 7 8 9 |
type Proxy interface { // 启动代理 Run(interface{}, int, <-chan error) error // 在代理退出后,清除对应的纪元 Cleanup(int) // 如果以期望配置启动代理的最大尝试次数到达仍然失败,Agent在终结之前调用此方法 Panic(interface{}) } |
此方法生成Envoy的Bootstrap配置,并启动Envoy进程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
func (proxy envoy) Run(config interface{}, epoch int, abort <-chan error) error { var fname string // 使用自定义配置文件,或者通过模板生成配置文件 if len(proxy.config.CustomConfigFile) > 0 { fname = proxy.config.CustomConfigFile } else { out, err := bootstrap.WriteBootstrap(&proxy.config, proxy.node, epoch, proxy.pilotSAN, proxy.opts, os.Environ()) fname = out } // 处理Envoy命令行参数 args := proxy.args(fname, epoch) if len(proxy.config.CustomConfigFile) == 0 { args = append(args, "--v2-config-only") } // e.g. -c pilot/envoyConfig/envoy-rev0.json --restart-epoch 0 --drain-time-s 2 --parent-shutdown-time-s 3 --service-cluster productpage --service-node sidecar~10.0.0.1~productpage-v1-8d69b45c-5qcv7.default~default.svc.k8s.gmem.cc --max-obj-name-len 189 --allow-unknown-fields -l warn --v2-config-only] log.Infof("Envoy command: %v", args) /* 异步命令行调用 */ cmd := exec.Command(proxy.config.BinaryPath, args...) cmd.Stdout = os.Stdout // 直接把Envoy子进程的标准输出和错误收集过来 cmd.Stderr = os.Stderr if err := cmd.Start(); err != nil { return err } // 等待Envoy进程退出 if proxy.errChan != nil { // 把Envoy退出错误码存入err通道 go func() { proxy.errChan <- cmd.Wait() }() return nil } // 再建一个通道和协程,当Envoy退出时协程写入通道,让当前协程从select case中退出 done := make(chan error, 1) go func() { done <- cmd.Wait() }() select { // Agent可以发送Abort信号,作为响应此Envoy实例需要停止 case err := <-abort: log.Warnf("Aborting epoch %d", epoch) if errKill := cmd.Process.Kill(); errKill != nil { log.Warnf("killing epoch %d caused an error %v", epoch, errKill) } return err // Envoy实例自然死亡 case err := <-done: return err } } |
此方法生成Envoy Bootstrap配置。使用的配置模板,按以下逻辑寻找:
- 如果指定了--customConfigFile,使用此文件。这步逻辑仅仅用于测试,实际无用
- 如果config.ProxyBootstrapTemplatePath不为空,使用此文件。此字段没有通过命令行暴露
- 否则,使用常量DefaultCfgDir指定的值,也就是默认配置
- 如果设置了环境变量ISTIO_BOOTSTRAP,使用此变量指定的配置模板路径
默认配置的路径是/var/lib/istio/envoy/envoy_bootstrap_tmpl.json,内容为Envoy的JSON格式的配置的Go Template。
生成的Envoy配置文件样例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 |
{ // 节点(边车和其代理的应用所在Pod)信息 "node": { "id": "sidecar~10.0.0.1~productpage-v1-8d69b45c-5qcv7.default~default.svc.k8s.gmem.cc", "cluster": "productpage", "metadata": { "istio": "sidecar" } }, // 监控指标配置,抽取标签 "stats_config": { "use_all_default_tags": false, "stats_tags": [ { "tag_name": "cluster_name", "regex": "^cluster\\.((.+(\\..+\\.svc\\.cluster\\.local))\\.)" }, { "tag_name": "tcp_prefix", "regex": "^tcp\\.((.*)\\.)\\w+$" }, { "tag_name": "response_code", "regex": "_rq(_(\\d{3}))$" }, { "tag_name": "response_code_class", "regex": "_rq(_(\\dxx))$" }, { "tag_name": "http_conn_manager_listener_prefix", "regex": "^listener(=\\.).*\\.http\\.(((:[_.[:digit:]]*|[_\\[\\]aAbBcCdDeEfF[:digit:]]*))\\.)" }, { "tag_name": "http_conn_manager_prefix", "regex": "^http\\.(((:[_.[:digit:]]*|[_\\[\\]aAbBcCdDeEfF[:digit:]]*))\\.)" }, { "tag_name": "listener_address", "regex": "^listener\\.(((:[_.[:digit:]]*|[_\\[\\]aAbBcCdDeEfF[:digit:]]*))\\.)" } ] }, // 管理端口配置,管理端点也暴露了Prometheus Exporter "admin": { "access_log_path": "/dev/null", "address": { "socket_address": { "address": "127.0.0.1", "port_value": 15000 } } }, // 动态发现配置 "dynamic_resources": { "lds_config": { "ads": {} }, "cds_config": { "ads": {} }, // 使用集群xds-grpc提供的聚合发现服务 "ads_config": { "api_type": "GRPC", "refresh_delay": "1s", "grpc_services": [ { "envoy_grpc": { "cluster_name": "xds-grpc" } } ] } }, // 静态配置 "static_resources": { "clusters": [ // Prometheus Exporter { "name": "prometheus_stats", "type": "STATIC", "connect_timeout": "0.250s", "lb_policy": "ROUND_ROBIN", "hosts": [ { "socket_address": { "protocol": "TCP", "address": "127.0.0.1", "port_value": 15000 } } ] }, // xDS服务地址,测试时注意把前文提到的发现服务开启 { "name": "xds-grpc", "type": "STRICT_DNS", "connect_timeout": "1s", "lb_policy": "ROUND_ROBIN", "hosts": [ { "socket_address": { "address": "localhost", "port_value": 15010 } } ], "circuit_breakers": { "thresholds": [ { "priority": "DEFAULT", "max_connections": 100000, "max_pending_requests": 100000, "max_requests": 100000 }, { "priority": "HIGH", "max_connections": 100000, "max_pending_requests": 100000, "max_requests": 100000 } ] }, "upstream_connection_options": { "tcp_keepalive": { "keepalive_time": 300 } }, "http2_protocol_options": {} } ], // 暴露Prometheus指标的监听器 "listeners": [ { "address": { "socket_address": { "protocol": "TCP", "address": "0.0.0.0", "port_value": 15090 } }, "filter_chains": [ { "filters": [ { "name": "envoy.http_connection_manager", "config": { "codec_type": "AUTO", "stat_prefix": "stats", "route_config": { "virtual_hosts": [ { "name": "backend", "domains": [ "*" ], "routes": [ // 访问curl http://localhost:15090/stats/prometheus可以直接看到指标 { "match": { "prefix": "/stats/prometheus" }, "route": { "cluster": "prometheus_stats" } } ] } ] }, "http_filters": { "name": "envoy.router" } } } ] } ] } ] } } |
此结构位于proxy.envoy包中,它是envoy proxy的代理人(Agent) 。它负责管理envoy进程的生命周期和重启。
Agent跟踪所有运行中的代理epochs及其配置。Envoy热重启通过启动具有递增的重启纪元的(strictly incremented restart epoch)代理进程实现。优雅的关闭旧的epochs并且将所有必须的状态传递给最新的epoch是envoy的职责,Agent不会去主动关闭旧的epoch对应的envoy进程。
最初的epoch是0。
这里使用的重启协议(restart protocol )是和Envoy的重启纪元(restart epochs)语义匹配的:为了成功启动用来替代正在运行的Envoy的新进程,新进程的重启纪元必须正好比所有正在运行的其它Envoy进程中纪元最大的那个高1。
Agent需要调用Proxy的两个函数:
- Run函数,用于启动代理,且必须一直阻塞直到代理退出
- Cleanup函数,用于在代理退出后立即进行清理,此函数必须是非阻塞的,因为它在Agent的主控制循环中被调用
这两个函数都以代理的纪元作为入参。
每当Run函数返回了错误,Agent都假设代理启动失败了,并且会重试若干次。后续的重启可能重用之前失败的启动的纪元号。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
func NewAgent(proxy Proxy, retry Retry) Agent { return &agent{ // Proxy接口的实现,也就是上面的envoy proxy: proxy, // 重试(为proxy应用新配置)规则 retry: retry, // 当前纪元以及对应的配置 epochs: make(map[int]interface{}), // 通过此通道传递期望的配置 configCh: make(chan interface{}), // 通过此通道提示代理退出 statusCh: make(chan exitStatus), // 通过此通道终止运行中的实例 abortCh: make(map[int]chan error), } } |
该结构实现了Agent接口:
1 2 3 4 5 6 7 |
type Agent interface { // 为代理设置期望的配置 —— Agent对比当前活动的配置和期望的配置,如果有必要,触发重启操作 // 如果重启失败,以指数后退(exponential back-off)重试 ScheduleConfigUpdate(config interface{}) // 启动Agent控制循环 Run(ctx context.Context) } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
func (a *agent) Run(ctx context.Context) { // 限速器,平滑1QPS,爆发10QPS。为了处理所有通道的消息需要高QPS rateLimiter := rate.NewLimiter(1, 10) for { // Wait 阻塞,直到有了1个事件许可,等价于WaitN(ctx,1) err := rateLimiter.Wait(ctx) var delay time.Duration = 1<<63 - 1 // 如果先前已经安排了下一次调度,设置延迟时间戳 if a.retry.restart != nil { delay = time.Until(*a.retry.restart) } select { // 尝试从配置通道读取新配置,注意,Watcher从此通道传来的仅仅是散列值,此值仅用于识别配置是否变化,而不包含实际的配置信息 case config := <-a.configCh: // 如果和当前配置不同 if !reflect.DeepEqual(a.desiredConfig, config) { // 赋值给期望配置 a.desiredConfig = config // 重置 重试预算(剩余的重试次数) a.retry.budget = a.retry.MaxRetries a.reconcile() } // 如果此通道可读,则说明代理应该退出 case status := <-a.statusCh: // 删除status中的epoch delete(a.epochs, status.epoch) // 删除终止通道,防止在非Abort错误时自我终止 delete(a.abortCh, status.epoch) // 更新配置 a.currentConfig = a.epochs[a.latestEpoch()] // Abort错误 if status.err == errAbort { log.Infof("Epoch %d aborted", status.epoch) } else if status.err != nil { // 用于Envoy热重启竞态条件的关系,这里需要通过杀死所有Envoy进程,立即的、非优雅的重启 a.abortAll() } else { // 正常退出 log.Infof("Epoch %d exited normally", status.epoch) } // 清理此纪元的配置文件 a.proxy.Cleanup(status.epoch) // 如果存在错误,调度一次重试 if status.err != nil { // 延迟已调度的延迟,如果不为空,说明已经调度了 if a.retry.restart == nil { // 如果还有重试预算 if a.retry.budget > 0 { delayDuration := a.retry.InitialInterval * (1 << uint(a.retry.MaxRetries-a.retry.budget)) restart := time.Now().Add(delayDuration) a.retry.restart = &restart // 调度下一次重试 a.retry.budget = a.retry.budget - 1 } else { // 没有重试预算了,Panic a.proxy.Panic(status.epoch) return } } } // 如果已经到了下次调度的延迟 case <-time.After(delay): a.reconcile() // 通道关闭 case _, more := <-ctx.Done(): if !more { a.terminate() return } } } } |
此方法重现载入新的代理配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
func (a *agent) reconcile() { // 取消掉任何计划重启 a.retry.restart = nil // 再次检查期望配置是否和当前配置相同 if reflect.DeepEqual(a.desiredConfig, a.currentConfig) { return } // 新增纪元 epoch := a.latestEpoch() + 1 // 重置abort通道 abortCh := make(chan error, MaxAborts) a.epochs[epoch] = a.desiredConfig a.abortCh[epoch] = abortCh a.currentConfig = a.desiredConfig // 在新协程中启动代理 go a.waitForExit(a.desiredConfig, epoch, abortCh) } |
此方法调用代理的Run()方法,并且在其崩了后,将错误码写入通道:
1 2 3 4 |
func (a *agent) waitForExit(config interface{}, epoch int, abortCh <-chan error) { err := a.proxy.Run(config, epoch, abortCh) a.statusCh <- exitStatus{epoch: epoch, err: err} } |
此结构位于proxy.envoy包中,负责当代理配置发生变化后,触发Agent的reload。它持有agent,还监控数字证书的变更:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
func NewWatcher(config meshconfig.ProxyConfig, agent proxy.Agent, role model.Proxy, certs []CertSource, pilotSAN []string) Watcher { return &watcher{ // Agent agent: agent, // 代理角色 role: role, // ProxyConfig config: config, // CertSource列表 certs: certs, // Pilot的SAN pilotSAN: pilotSAN, } } |
此结构实现了Watcher接口:
1 2 3 4 5 6 |
type Watcher interface { // 阻塞性的运行监控循环 Run(context.Context) // 使用最新的配置Reload Agent Reload() } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
func (w *watcher) Run(ctx context.Context) { // 启动Agent的控制循环 go w.agent.Run(ctx) // 立即以最新配置重载Agent w.Reload() // 监控数字证书文件的变更 certDirs := make([]string, 0, len(w.certs)) for _, cert := range w.certs { certDirs = append(certDirs, cert.Directory) } // 如果数字证书发生变化,则调用Reload方法重载Agent go watchCerts(ctx, certDirs, watchFileEvents, defaultMinDelay, w.Reload) <-ctx.Done() } |
促使Agent立即启动Envoy:
1 2 3 4 5 6 7 8 9 |
func (w *watcher) Reload() { // 生成数字证书的哈希 h := sha256.New() for _, cert := range w.certs { generateCertHash(h, cert.Directory, cert.Files) } // 以哈希值作为配置信息,调度下次更新 w.agent.ScheduleConfigUpdate(h.Sum(nil)) } |
Agnet的ScheduleConfigUpdate很简单,就是写通道:
1 2 3 |
func (a *agent) ScheduleConfigUpdate(config interface{}) { a.configCh <- config } |
从这里可以看出,导致代理需要重新载入的唯一原因,就是数字证书发生了变更。
此结构表示当前代理的配置,和命令行选项是对应的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
type ProxyConfig struct { // 生成的配置文件存放的路径 ConfigPath string // Envoy二进制文件的位置 BinaryPath string // 所属的集群,也就是K8S服务 ServiceCluster string // 热重启时多久Drain掉连接,必须大于1s DrainDuration *duration.Duration // 热重启时关闭父进程的延迟 ParentShutdownDuration *duration.Duration // 发现服务(xDS)的访问地址,代理连接此服务进行各种资源的发现 DiscoveryAddress string // 服务发现的轮询间隔,用于EDS, CDS, LDS, 但不用于RDS,必须大于1ms DiscoveryRefreshDelay *duration.Duration // Zipkin兼容的Tracer地址 ZipkinAddress string // Envoy连接到上游集群端点的超时 ConnectTimeout *duration.Duration // Statsd UDP监听地址 StatsdUdpAddress string // 管理端口,在此端口监听管理命令 ProxyAdminPort int32 // 此实例所在的可用性区域 // 在K8S中,节点的可用性区域通过注解failure-domain.beta.kubernetes.io/zone设置 AvailabilityZone string // 控制平面的身份验证策略 ControlPlaneAuthPolicy AuthenticationPolicy // 自定义的代理配置文件路径,当前Mixer、Pilot的前置代理使用自定义配置文件 CustomConfigFile string // Envoy监控指标的name字段的最大长度 StatNameLength int32 // 并发的工作线程数量 Concurrency int32 // Envoy Bootstrap配置模板的路径 ProxyBootstrapTemplatePath string // 如何重定向入站流量到Envoy InterceptionMode ProxyConfig_InboundInterceptionMode XXX_NoUnkeyedLiteral struct{} XXX_unrecognized []byte XXX_sizecache int32 } |
此结构表示数字证书的存储位置:
1 2 3 4 5 6 |
type CertSource struct { // 所在目录 Directory string // 证书文件列表 Files []string } |
代理端的通信行为(主要是xDS)完全由Envoy负责,和Istio无关,后者仅仅提供Bootstrap配置并在必要时Reload。
Envoy的入口点定义在source/exe/main.cc中,仅仅是创建一个Envoy::MainCommon对象并调用其run方法:
1 2 3 4 5 |
int main(int argc, char** argv) { std::unique_ptr<Envoy::MainCommon> main_common; main_common = std::make_unique<Envoy::MainCommon>(argc, argv); return main_common->run() ? EXIT_SUCCESS : EXIT_FAILURE; } |
Envoy::MainCommon对象的创建过程如下:
- 创建化负责信号处理、线程池、命令参数解析、定时器的实例变量
- 创建MainCommonBase base_:
- 创建热重启器
- 进行日志配置
- 创建化监控指标的存储
- 创建Envoy服务InstanceImpl —— Envoy服务本体
- 初始化热重启器
- 通过组件工厂ComponentFactory创建DrainManager
- 初始化InstanceImpl:
- 加载Bootstrap配置
- 配置StoreRoot,该对象是负责监控指标的存储,包括Sink的管理
- 从Bootstrap配置创建一个InitialImpl,表示Envoy服务器的最初配置
- 利用InitialImpl创建Admin端点,并启动其监听器,开始监听15000端口
- 创建过载管理器,用于保护Envoy实例所在节点不资源耗尽
- 创建监听器管理器ListenerManager
- 通过主线程的Dispatcher,为主线程注册线程本地存储(TLS)
- 调用StoreRoot的initializeThreading,启用多线程支持
- 创建Runtime::Loader,此对象能够从磁盘读取Envoy的运行时快照
- 创建SSL上下文管理器
- 创建集群管理器工厂ProdClusterManagerFactory
- 传入集群管理器工厂,初始化Envoy服务器的主配置Configuration::MainImpl
- 使用密钥管理器添加bootstrap_中配置的静态密钥
- 使用集群管理工厂,传入bootstrap配置,创建集群管理器
- 创建限速客户端工厂RateLimitClientFactory
- 调用监听器管理器,添加所有静态配置的监听器
- 调用initializeTracers,为当前Envoy服务器初始化Tracer
- 调用initializeStatsSinks,初始化监控信息的Sink
- 为HTTP上下文设置Tracer
- 如果配置了LDS动态资源,则调用监听器管理器,创建注册gRPC订阅
- 将Sinks添加到StoreRoot
- 为StoreRoot的Flush设置定时器
- 初始化用于死锁检测的GuardDog
run方法简单的转发给MainCommonBase.run,进而转发给Server::InstanceImpl的run方法。如果运行模式是Serve,则后者的逻辑如下:
- 初始化RunHelper
- 调用GuardDog创建针对主线程的WatchDog
- 启动WatchDog,由主线程的Dispatcher创建定时器,定期touch此WatchDog
- 阻塞性的运行Dispatcher事件循环
- 调用runPostCallbacks,在运行事件循环之前执行所有后置回调。默认情况下没有需要执行的回调
- 调用event_base_loop,运行event_base直到1-N个未决/活动事件可用
- 当Dispatcher.exit被调用(也就是主事件循环退出)后,停止WatchDog
- 调用Server::InstanceImpl的terminate方法,停止Envoy服务器
- 重置RunHelper
从第5步开始,主线程会阻塞很长时间。后续的逻辑都主要通过libevent事件驱动 —— 当发生网络事件后执行某种回调。
如果定义了宏ENVOY_HANDLE_SIGNALS,则MainCommon的成员变量负责处理信号:
1 2 3 4 5 6 |
#ifdef ENVOY_HANDLE_SIGNALS // 在备选栈上执行信号处理 Envoy::SignalAction handle_sigs; // 打印Backtrace并退出 Envoy::TerminateHandler log_on_terminate; #endif |
MainCommon的成员变量 PlatformImpl platform_impl_;定义平台依赖的操作,目前仅仅包含一个线程工厂实现。
MainCommon的成员变量 Envoy::OptionsImpl options_;负责利用TCLAP解析命令行参数,它实现 Envoy::Server::Option接口,通过此接口可获得各种Envoy启动选项。
MainCommon的成员变量 Event::RealTimeSystem real_time_system_;用于墙上时间度量,以及设置定时器、执行回调。
MainCommon持有此类型的一个对象,并且把绝大部分职责委托给此类处理。构造函数逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
MainCommonBase::MainCommonBase(OptionsImpl& options, Event::TimeSystem& time_system, TestHooks& test_hooks, Server::ComponentFactory& component_factory, std::unique_ptr<Runtime::RandomGenerator>&& random_generator, Thread::ThreadFactory& thread_factory) : options_(options), component_factory_(component_factory), thread_factory_(thread_factory) { // c-ares是一个C语言实现的异步请求DNS的实现,在实例初始化时,应该先调用该函数对该库相关内部模块 ares_library_init(ARES_LIB_INIT_ALL); // 忽略SIGPIPE信号 Event::Libevent::Global::initialize(); switch (options_.mode()) { case Server::Mode::InitOnly: case Server::Mode::Serve: { if (!options.hotRestartDisabled()) { // 热重启器,接口由HotRestart提供,实现代码和配置的“热”重启 restarter_ = std::make_unique<Server::HotRestartImpl>(options_);init-cluster-mgr } // ThreadLocal::InstanceImpl实现Instance,负责注册线程,读写线程本地数据 tls_ = std::make_unique<ThreadLocal::InstanceImpl>(); // 写、读日志锁,实现类是ProcessSharedMutex,可以跨进程使用的互斥量 Thread::BasicLockable& log_lock = restarter_->logLock(); Thread::BasicLockable& access_log_lock = restarter_->accessLogLock(); auto local_address = Network::Utility::getLocalAddress(options_.localAddressIpVersion()); // 日志上下文,相当于同时调用setLogLevel, setLogFormat, setLock logging_context_ = std::make_unique<Logger::Context>(options_.logLevel(), options_.logFormat(), log_lock); // 配置各Envoy组件的日志 configureComponentLogLevels(); // 监控指标存储,ThreadLocalStoreImpl是支持线程本地缓存的StoreRoot实现 stats_store_ = std::make_unique<Stats::ThreadLocalStoreImpl>(options_.statsOptions(), // Stats::StatDataAllocator负责创建Counter、Gauge等Metric的实例 restarter_->statsAllocator()); // 创建Envoy服务器 server_ = std::make_unique<Server::InstanceImpl>( options_, time_system, local_address, test_hooks, *restarter_, *stats_store_, access_log_lock, component_factory, std::move(random_generator), *tls_, thread_factory); break;// 1、使用密钥管理器添加bootstrap_中配置的静态密钥 // 2、使用集群管理工厂,传入bootstrap配置,创建集群管理器 // 3、创建限速客户端工厂RateLimitClientFactory // 4、调用监听器管理器,添加所有静态配置的监听器 // 对于每个监听器,如果存在draining的监听器占用了它绑定的地址,则夺取其SocketSharedPtr // 如果不存在,则调用创建新的SocketSharedPtr,可能导致创建底层套接字,并绑定到端口 // 5、调用initializeTracers,为当前Envoy服务器初始化Tracer // 6、调用initializeStatsSinks,初始化监控信息的Sink } case Server::Mode::Validate: break; } } |
InstanceImpl类实现接口Instance,代表一个运行中的、由若干紧密协作的组件构成的、独立运行的Envoy服务。
该类的构造函数逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 |
// 创建基于文件的日志Sink if (!options.logPath().empty()) { file_logger_ = std::make_unique<Logger::FileSinkDelegate>( options.logPath(), access_log_manager_, Logger::Registry::getSink()); } // 初始化热重启器 restarter_.initialize(*dispatcher_, *this); // 创建DrainManager drain_manager_ = component_factory.createDrainManager(*this); // 初始化Envoy服务 initialize(options, local_address, component_factory); |
初始化逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
// 首先打印epoch信息,以及所有以及注册的、静态链接的扩展 ENVOY_LOG(info, "initializing epoch {} (hot restart version={})", options.restartEpoch(), restarter_.version()); ENVOY_LOG(info, "statically linked extensions:"); ENVOY_LOG(info, " access_loggers: {}", Registry::FactoryRegistry<Configuration::AccessLogInstanceFactory>::allFactoryNames()); // 编写扩展时,你需要负责注册扩展,只需要声明如下形式的静态变量即可 // 扩展类名 扩展所属分类 // static Registry::RegisterFactory<FileAccessLogFactory, Server::Configuration::AccessLogInstanceFactory> register_; // 将Bootstrap配置(由Pilot Agent生成)读取到envoy::config::bootstrap::v2::Bootstrap中 // Bootstrap是Protobuf消息(::google::protobuf::Message) // 自动依据扩展名来决定如何解析配置文件 // 如果指定了--config-yaml选项,则YAML中的配置覆盖到--config-path指定的配置中 // 如果以Bootstrap V2配置解析失败,则尝试以V1重新解析 InstanceUtil::loadBootstrapConfig(bootstrap_, options); // 记录Bootstrap配置修改时间 bootstrap_config_update_time_ = time_system_.systemTime(); // 需要尽早配置StoreRoot,以支持监控指标收集 // TagProducer分析一个指标名,从中抽取一系列标签 stats_store_.setTagProducer(Config::Utility::createTagProducer(bootstrap_)); // StatsMatcher决定哪些指标启用禁用 stats_store_.setStatsMatcher(Config::Utility::createStatsMatcher(bootstrap_)); // 在上述存储中创建Envoy服务状态统计 // struct ServerStats { // Stats::Gauge &uptime_; // Stats::Gauge &concurrency_; // ... // }; server_stats_ = std::make_unique<ServerStats>( // (stats_store_).gauge("server." + std::string("uptime")), ... ServerStats{ALL_SERVER_STATS(POOL_GAUGE_PREFIX(stats_store_, "server."))}); // 收集server.***指标 server_stats_->concurrency_.set(options_.concurrency()); server_stats_->hot_restart_epoch_.set(options_.restartEpoch()); // 本地信息,例如节点名称、所属集群、IP地址 local_info_ = std::make_unique<LocalInfo::LocalInfoImpl>( bootstrap_.node(), local_address, options.serviceZone(), options.serviceClusterName(), options.serviceNodeName()); // 创建一个Initial对象,此对象表示初始化配置 —— 在加载主配置之前需要知道的配置信息 Configuration::InitialImpl initial_config(bootstrap_); // 如果可以,关闭父进程中的admin processing,这让admin processing可以启动一个新进程 HotRestart::ShutdownParentAdminInfo info; info.original_start_time_ = original_start_time_; restarter_.shutdownParentAdmin(info); original_start_time_ = info.original_start_time_; // 全局的admin HTTP端点 admin_ = std::make_unique<AdminImpl>(initial_config.admin().profilePath(), *this); admin_->startHttpListener(initial_config.admin().accessLogPath(), options.adminAddressPath(), initial_config.admin().address(), stats_store_.createScope("listener.admin.")); // ConfigTracker供admin端点/config_dump使用,管理一系列能够提供配置信息的回调函数 config_tracker_entry_ = // add返回EntryOwner,此类型实现了map条目的RAII语义 —— 仅当EntryOwner或ConfigTracker销毁后条目被移除 admin_->getConfigTracker().add("bootstrap", [this] { return dumpBootstrapConfig(); }); // 将admin监听器添加到连接管理器。ConnectionHandler能够添加/删除/启用/禁用/停止网路监听器 // 调用handler->addListener(*listener_)后,创建ActiveListener,监听15000端口。也就是说ConnectionHandler负责启动端口监听 admin_->addListenerToHandler(handler_.get()); // 创建过载管理器 overload_manager_ = std::make_unique<OverloadManagerImpl>(dispatcher(), stats(), threadLocal(), bootstrap_.overload_manager()); // 创建监听器管理器,负责管理:所有监听器、所有负责连接处理的工作线程(Worker) // 创建指定数量的WorkerImpl,为每个WorkerImpl向过载管理器注册过载回调(行为是不再接受连接) // 为每个WorkerImpl创建Dispatcher,并将此Dispatcher注册到ThreadLocal::Instance以支持后续读写线程本地变量 listener_manager_ = std::make_unique<ListenerManagerImpl>(*this, listener_component_factory_, worker_factory_, time_system_); // 主线程也需要注册到TLS thread_local_.registerThread(*dispatcher_, true); // 当所有工作线程对象初始化后,调用下面的方法,调用Stats::StoreRoot的如下方法,以支持多线程操作 stats_store_.initializeThreading(*dispatcher_, thread_local_); // Runtime::Loader能够从磁盘读取运行时快照 runtime_loader_ = component_factory.createRuntime(*this, initial_config); // SSL上下文管理器,管理进程中所有SSL上下文 // 实现类ContextManagerImpl,线程模型如下:上下文可以从任意线程创建(但是实践上通常从主线程分配) // 上下文的分配/销毁是少见操作,因此整体上使用一把锁来保护 ssl_context_manager_ = std::make_unique<Ssl::ContextManagerImpl>(time_system_); // ProdClusterManagerFactory是ClusterManagerFactory的产品环境实现,Envoy很多命名包含Prod的类,用于和测试、Mock用途的类区分 // 集群管理器工厂,负责创建集群管理操作所需要的对象 cluster_manager_factory_ = std::make_unique<Upstream::ProdClusterManagerFactory>( runtime(), stats(), threadLocal(), random(), dnsResolver(), sslContextManager(), dispatcher(), localInfo(), secretManager(), api(), http_context_); // Configuration::MainImpl是主服务器配置的实现,其初initialize方法必须在Envoy服务器完全准备好后调用,应用自举配置到服务器: config_.initialize(bootstrap_, *this, *cluster_manager_factory_); // 为HTTP上下文设置Tracer http_context_.setTracer(config_.httpTracer()); // 如果包含LDS动态资源 if (bootstrap_.dynamic_resources().has_lds_config()) { // 则调用监听器管理器,创建LDS API Provider // 委托给ListenerComponentFactory.createLdsApi,最终会在cm.adsMux()上注册gRPC订阅 listener_manager_->createLdsApi(bootstrap_.dynamic_resources().lds_config()); } // 将Sink添加到StoreRoot for (Stats::SinkPtr& sink : config_.statsSinks()) { stats_store_.addSink(*sink); } // 注册Stat刷出定时器 // 某些Sink需要Dispatcher的支持,因此在主循环开始前,不能刷出 stat_flush_timer_ = dispatcher_->createTimer([this]() -> void { flushStats(); }); stat_flush_timer_->enableTimer(config_.statsFlushInterval()); // 看门狗,用于死锁检测,在Worker启动之前、主循环run之前初始化 guard_dog_ = std::make_unique<Server::GuardDogImpl>(stats_store_, config_, time_system_, api()); |
此方法的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
bool MainCommonBase::run() { // 对应不同的运行模式 switch (options_.mode()) { case Server::Mode::Serve: // 调用Server::InstanceImpl::run server_->run(); return true; case Server::Mode::Validate: { ... } case Server::Mode::InitOnly: { ... } } |
此方法的代码如下:
1 2 3 4 5 6 7 8 9 10 |