Istio Pilot与Envoy的交互机制解读
在基于Istio+Envoy实现的服务网格中,Istio的角色是控制平面,它是实现了Envoy的发现协议集xDS的管理服务器端。Envoy本身则作为网格的数据平面,和Istio通信,获得各种资源的配置并更新自身的代理规则。
除了实现xDS协议,Istio还负责:
- Envoy统计数据的收集,从Statd格式转换为Prometheus格式。(注:目前看来Envoy也直接暴露了Prometheus的Exporter)
- 限速服务、策略服务
- 和第三方Tracer的对接
- 数字证书分发
等功能。这些功能都需要Istio和Envoy的协同才能生效。最基础和关键的协同是Istio组件Pilot和Envoy之间基于xDS协议进行的各种Envoy配置信息的推送。
Istio的文档并没有对Istio Pilot和Envoy如何交互进行描述,本文结合Istio、Envoy的源码来探讨这些细节。
Pilot的model包为很多Pilot抽象创建了模型(结构),并定义了它们支持的操作。注意这里建模的是Pilot的抽象,因此名词Service是指Istio的抽象服务,而非K8S的Service或者Envoy的Cluster。
对Istio的配置信息、配置存储进行建模。
代表一个Istio配置单元:
1 2 3 4 5 |
type Config struct { ConfigMeta // 配置内容以Proto消息的形式存储 Spec proto.Message } |
配置的元数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
type ConfigMeta struct { //匹配内容消息类型的短类型名称,例如route-rule Type string // API组和版本 Group string Version string // 命名空间范围内唯一性名称 Name string // 命名空间 Namespace string // FQDN后缀 Domain string // 标签集 Labels map[string]string // 注解集 Annotations map[string]string // 资源版本,跟踪对配置注册表的变更 ResourceVersion string CreationTimestamp meta_v1.Time } |
定义一组平台无关的,但是底层平台(例如K8S)必须支持的API,通过这些API可以存取Istio配置信息
每个配置信息的键,由type + name + namespace的组合构成,确保每个配置具有唯一的键
写操作是异步执行的,也就是说Update后立即Get可能无法获得最新结果。有资源版本判断资源是否更新
此接口返回的引用,仅支持只读操作,对其修改存在线程安全问题
1 2 3 4 5 6 7 8 9 |
type ConfigStore interface { // 返回配置描述符,其实就是[]ProtoSchema类型,ProtoSchema描述了资源的Group/Version/Type等信息 ConfigDescriptor() ConfigDescriptor Get(typ, name, namespace string) (config *Config, exists bool) List(typ, namespace string) ([]Config, error) Create(config Config) (revision string, err error) Update(config Config) (newRevision string, err error) Delete(typ, name, namespace string) error } |
表示ConfigStore的本地完整复制的缓存,此缓存主动和远程存储保持同步,并且在获取更新时提供提供通知机制。
为了获得通知,事件处理器必须在Run之前注册,缓存需要在Run之后有一个初始的同步延迟。
1 2 3 4 5 6 7 8 9 |
type ConfigStoreCache interface { // CRUD接口 ConfigStore // 添加某种配置类型的事件处理器 RegisterEventHandler(typ string, handler func(Config, Event)) Run(stop <-chan struct{}) // 初始缓存同步完毕后返回true HasSynced() bool } |
此接口扩展ConfigStore,增加一些针对Istio资源的操控接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
type IstioConfigStore interface { ConfigStore // 列出ServiceEntry ServiceEntries() []Config // 列出绑定到指定工作负载标签的Gateway Gateways(workloadLabels LabelsCollection) []Config // 列出绑定到指定工作负载标签EnvoyFilter EnvoyFilter(workloadLabels LabelsCollection) *Config // 列出关联到指定目标服务实例的Mixerclient HTTP API Specs HTTPAPISpecByDestination(instance *ServiceInstance) []Config // 列出关联到指定目标服务实例的Mixerclient quota specifications QuotaSpecByDestination(instance *ServiceInstance) []Config // 列出关联到指定服务+端口的身份验证策略 // 如果存在多个不同范围(全局、命名空间、服务)的策略,最精确的那个被返回。如果同一范围有多个策略,返回第一个 AuthenticationPolicyByDestination(service *Service, port *Port) *Config // 列出指定命名空间的ServiceRoles ServiceRoles(namespace string) []Config // 列出指定命名空间的ServiceRoleBindings ServiceRoleBindings(namespace string) []Config // 列出名字为DefaultRbacConfigName的RbacConfig RbacConfig() *Config } |
此结构为Pilot提供聚合的环境性的API:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
type Environment struct { // 内嵌接口:用于列出服务、实例 ServiceDiscovery // 已经废弃,使用 PushContext.ServiceAccounts ServiceAccounts // 内嵌接口:用于列出路由规则 IstioConfigStore // 网格配置信息 Mesh *meshconfig.MeshConfig // 用于和Mixer通信 MixerSAN []string // 全局的推送上下文,已经废弃 // 除非出于测试、处理新连接的目的,不要使用此字段 PushContext *PushContext } |
此结构建模代理(Envoy代理)的属性,xDS使用此结构对代理进行识别:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
type Proxy struct { // 此代理所在的集群 ClusterID string // 节点类型(也就是说运行代理的那个Pod的代理角色 Type NodeType // 用于识别代理以及它的同地协作的服务实例的IP地址 IPAddress string // 平台先关的Sidecar代理ID ID string // 短主机名的DNS后缀 Domain string // 节点的元数据 Metadata map[string]string } |
用于区分不同代理在网格中的职责。
1 2 3 4 5 6 7 8 9 |
type NodeType string const ( // 应用程序容器的边车代理,普通被网格管理的Pod使用这种代理角色 Sidecar NodeType = "sidecar" // 独立运行的,集群入口代理,istio-ingress中运行的是这种代理 Ingress NodeType = "ingress" // 独立运行的,作为L7/L4路由器的代理,istio-ingressgateway、istio-egressgateway中运行的是这种代理 Router NodeType = "router" ) |
端点分片,存储单个服务的单个注册表中的单个分片的名称及其端点列表:
1 2 3 4 |
type EndpointShard struct { Shard string Entries []*IstioEndpoint } |
存储单个服务的所有分片信息。使用K8S作为注册表时,Shards通常只有一个元素,其键是"Kubernetes",其值是Shard名为"Kubernetes"的EndpointShard
1 2 3 4 5 6 |
type EndpointShardsByService struct { // 这种结构下,每个注册表只能有一个分片 // 映射的键是注册表名称 Shards map[string]*EndpointShard ServiceAccounts map[string]bool } |
此结构用于代替NetworkEndpoint和ServiceInstance,做了以下优化:
- ServicePortName字段代替ServicePort字段。原因是进行了端点回调(endpoint callbacks are made)时端口号、协议可能不可用
- 合并两个结构,原因是一对一关系
- 不再持有Service的指针。原因是接收到端点时,服务对象可能不可用
- 提供缓存的EnvoyEndpoint对象,避免为每次请求/每个客户端重新分配
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
type IstioEndpoint struct { // 工作负载的标签 Labels map[string]string Family AddressFamily Address string EndpointPort uint32 // 跟踪端口的名称,避免最终一致性相关的问题。某些情况下Endpoint先于Service可见,这时进行端口查找会失败 // 端口名到号的映射将在集群计算时进行 ServicePortName string // 用于遥测 UID string // 缓存的LbEndpoint(来自Envoy Go客户端包),通过数据转换得到,避免重复计算 EnvoyEndpoint *endpoint.LbEndpoint ServiceAccount string } |
参考下文。
此结构对Istio服务进行建模,每个服务具有全限定的名称(FQDN),一个或多个监听的端口,一个可选的和服务关联的负载均衡器/虚拟IP地址(FQDN解析到此地址)。
例如,在K8S中,服务kubernetes关联到FQDN kubernetes.default.svc.cluster.local,具有虚拟IP地址10.96.0.1,监听443端口。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
// 主机名,可能是通配符 type Hostname string type Service struct { // 主机名 Hostname Hostname // 服务的负载均衡器IPv4地址 Address string // 多集群支持,服务在每个集群中的负载均衡器IPv4地址 ClusterVIPs map[string]string // 监听的端口列表 Ports PortList // 运行服务的账号 ServiceAccounts []string // 指示服务是否位于网格外部,这种服务通过ServiceEntry定义 MeshExternal bool // 在路由之前,如何解析服务的实例 Resolution Resolution // 服务创建时间 CreationTime time.Time // 额外的属性,Mixer/RBAC 策略会用到 Attributes ServiceAttributes } |
用于指示在路由请求之前,如何解析出服务的实例:
1 2 3 4 5 6 7 8 9 10 |
type Resolution int const ( // 代理根据自己本地的负载均衡池决定使用哪个端点 ClientSideLB Resolution = iota // 代理进行DNS解析,并把请求发给解析结果 DNSLB // 代理直接根据请求者指定的目的地址 Passthrough ) |
服务的特定版本的一个实例,绑定到一个NetworkEndpoint:
1 2 3 4 5 6 7 8 9 10 |
type ServiceInstance struct { // 关联的端点 Endpoint NetworkEndpoint // 所属的服务 Service *Service // 标签集 Labels Labels AvailabilityZone string ServiceAccount string } |
建模关联到服务的实例的网络地址:
1 2 3 4 5 6 7 8 |
type NetworkEndpoint struct { // 地址族 Family AddressFamily Address string Port int ServicePort *Port UID string } |
对服务监听的网络端口进行建模:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
type Port struct { // 易读的端口名,如果服务包含多个端口,则此字段必须 Name string // 服务的端口号,非必须关联到服务背后的实例的端口 Port int // 使用的协议 Protocol Protocol } // 端口集 type PortList []*Port |
通信协议:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
type Protocol string // 目前支持的协议枚举 const ( ProtocolGRPC Protocol = "GRPC" ProtocolHTTP Protocol = "HTTP" ProtocolHTTP2 Protocol = "HTTP2" ProtocolHTTPS Protocol = "HTTPS" ProtocolTCP Protocol = "TCP" ProtocolTLS Protocol = "TLS" ProtocolUDP Protocol = "UDP" ProtocolMongo Protocol = "Mongo" ProtocolRedis Protocol = "Redis" ProtocolUnsupported Protocol = "UnsupportedProtocol" ) |
流量的方向:
1 2 3 4 5 |
type TrafficDirection string const ( TrafficDirectionInbound TrafficDirection = "inbound" TrafficDirectionOutbound TrafficDirection = "outbound" ) |
此接口用于发现服务的实例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
type ServiceDiscovery interface { // 列出所有服务 Services() ([]*Service, error) // 废弃,根据主机名获得服务 GetService(hostname Hostname) (*Service, error) // 取回服务的、匹配指定标签集的实例 Instances(hostname Hostname, ports []string, labels LabelsCollection) ([]*ServiceInstance, error) InstancesByPort(hostname Hostname, servicePort int, labels LabelsCollection) ([]*ServiceInstance, error) // 返回和指定代理同地协作(co-located)的实例,所谓co-located是指运行在相同的命名空间和安全上下文 // // 对于以Sidecar方式运行的代理,返回非空的切片;对于独立运行的代理,返回空切片 GetProxyServiceInstances(*Proxy) ([]*ServiceInstance, error) // 返回一个IPv4地址关联的管理端口 ManagementPorts(addr string) PortList // 返回一个IPv4地址关联的健康检查探针 WorkloadHealthCheckInfo(addr string) ProbeList } |
Pilot(Pilot Discovery,其对应的客户端组件是Pilot Agent)是Istio最关键的组件,它的职责是将用户提供的、简单的、CRD形式的配置文件,转换为Envoy能理解的格式,并推送给Envoy以更新代理配置。
Pilot的启动逻辑位于bootstrap包中。
我们需要进行单步跟踪,才能了解Pilot在初始化期间做了哪些事情。为了调试的方便,我们在K8S集群外部启动Pilot服务。参考下面的启动参数:
1 2 3 4 5 6 7 8 9 10 11 |
export POD_NAME=istio-pilot-54f79f8bd7-w8b2g export POD_NAMESPACE=istio-system # pilot-discovery的入口点代码位于 pilot/cmd/pilot-discovery/main.go # 输出详尽日志 pilot discovery --log_output_level=default:debug --log_caller=default --domain=k8s.gmem.cc \ # 提供kubeconfig,注意不支持--masterUrl,不提供此参数Istio会假设在集群内部运行 --kubeconfig=/home/alex/.kube/config \ # 配置文件 --meshConfig=pilot/meshConfig |
其中meshConfig可以直接从K8S集群中提取,此配置存放在Configmap中,名称为istio-system/istio。内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
disablePolicyChecks: false enableTracing: true accessLogFile: "/dev/stdout" policyCheckFailOpen: false sdsUdsPath: "" sdsRefreshDelay: 15s defaultConfig: connectTimeout: 10s configPath: "/etc/istio/proxy" binaryPath: "/usr/local/bin/envoy" serviceCluster: istio-proxy drainDuration: 45s parentShutdownDuration: 1m0s proxyAdminPort: 15000 concurrency: 0 zipkinAddress: zipkin.istio-system:9411 controlPlaneAuthPolicy: NONE discoveryAddress: istio-pilot.istio-system:15007 |
Pilot的入口点位于pilot/cmd/pilot-discovery/main.go,它使用包spf13.cobra来管理一组子命令,Pilot核心功能由discovery子命令实现。
入口点的启动过程如下:
- 解析命令行参数
- 创建主服务,服务建模在结构bootstrap.Server中
- 调用initKubeClient方法,初始化K8S客户端
- 调用initClusterRegistries方法,初始化clusterStore字段
- 调用initMesh方法,初始化mesh字段
- 调用initMixerSan方法,初始化Mixer服务的SAN
- 调用initConfigController方法,初始化configController字段。通过addStartFunc延后调用配置控制器的Run方法
- 如果启用了Istio的IngressController功能,则调用configaggregate.MakeCache,包装configController,使其能处理Ingress类型的资源
- 调用initServiceControllers方法,此控制器能够从底层服务发现机制中获取Istio抽象服务。通过addStartFunc延后调用服务控制器的Run方法
- 调用initDiscoveryService方法,初始化发现服务,发现服务依赖于前面创建的配置控制器、服务控制器,对xDS的支持有发现服务提供
- 调用initMonitor方法,初始化Pilot监控服务
- 调用initMultiClusterController方法,初始化多集群控制器,目前可以用于跨越多个K8S集群的服务网格
- 启动ControlZ监听器
- 启动主服务,其实就是执行先前注册的延迟启动函数:
- 启动配置控制器
- 启动Ingress同步器
- 启动Service控制器
- 启动HTTP服务、gRPC服务、安全gRPC服务
- 等待停止信号
表示Pilot主服务,它不是一个简单的(侦听单个端口)服务,而是很多服务的集合。包含的字段如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
type Server struct { // xDS服务管理服务器的实现 EnvoyXdsServer *envoyv2.DiscoveryServer ServiceController *aggregate.Controller // 网格配置 mesh *meshconfig.MeshConfig // 配置控制器,负责监控K8S集群中,Istio的自定义资源的变更 configController model.ConfigStoreCache // Mixer服务的SAN列表 mixerSAN []string // K8S客户端 kubeClient kubernetes.Interface // 启动需要执行的函数列表 startFuncs []startFunc // 集群信息存储 clusterStore *clusterregistry.ClusterStore httpServer *http.Server grpcServer *grpc.Server secureGRPCServer *grpc.Server // 发现服务,支持xDS协议 discoveryService *envoy.DiscoveryService // 包装configController,提供针对Gateways、EnvoyFilter、ServiceEntries等资源的强类型接口 istioConfigStore model.IstioConfigStore // HTTP请求多路分发器,根据URL匹配来决定由哪个handler处理请求 mux *http.ServeMux // 监控各种K8S原生对象,并将更新推送给EDS等组件 kubeRegistry *kube.Controller } |
仅仅当args.Service.Registries包含Kubernetes,才会创建K8S客户端:
1 2 3 4 5 6 |
for _, r := range args.Service.Registries { if serviceregistry.ServiceRegistry(r) == serviceregistry.KubernetesRegistry { needToCreateClient = true break } } |
仅当创建了K8S客户端,才会调用此方法。
此方法创建的对象很简单,就是创建一个空的clusterregistry.ClusterStore:
1 2 3 4 |
type ClusterStore struct { rc map[string]*RemoteCluster storeLock sync.RWMutex } |
1 2 3 4 5 6 7 8 |
type RemoteCluster struct { Cluster *k8s_cr.Cluster FromSecret string Client *clientcmdapi.Config ClusterStatus string Controller *kube.Controller ControlChannel chan struct{} } |
Istio组件Mixer和Pilot需要相互通信,如果mesh.DefaultConfig.ControlPlaneAuthPolicy为mTLS,也就是说Mixer - Pilot通信需要双向TLS认证时,才调用此方法:
1 2 3 4 5 6 |
func (s *Server) initMixerSan(args *PilotArgs) error { if s.mesh.DefaultConfig.ControlPlaneAuthPolicy == meshconfig.AuthenticationPolicy_MUTUAL_TLS { s.mixerSAN = envoy.GetMixerSAN(args.Config.ControllerOptions.DomainSuffix, args.Namespace) } return nil } |
此方法转调makeKubeConfigController方法创建配置控制器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
func (s *Server) makeKubeConfigController(args *PilotArgs) (model.ConfigStoreCache, error) { kubeCfgFile := s.getKubeCfgFile(args) // crd.Client,负责Istio自定义资源的CRUD操作 // 为每组API的每个版本(apiVersion)创建独立的REST客户端 // apiVersion包括networking.istio.io/v1alpha3、config.istio.io/v1alpha2、authentication.istio.io/v1alpha1、rbac.istio.io/v1alpha1 configClient, err := crd.NewClient(kubeCfgFile, "", model.IstioConfigTypes, args.Config.ControllerOptions.DomainSuffix) // 注册CRD if !args.Config.DisableInstallCRDs { if err = configClient.RegisterResources(); err != nil { return nil, multierror.Prefix(err, "failed to register custom resources.") } } // 创建配置控制器 return crd.NewController(configClient, args.Config.ControllerOptions), nil } |
initConfigController方法返回值的真实类型是crd.controller,除了实现ConfigStoreCache 接口的方法以外,它还负责管理Informer。它为每种CR创建一个Informer:
1 2 3 |
for _, schema := range client.ConfigDescriptor() { out.addInformer(schema, options.WatchedNamespace, options.ResyncPeriod) } |
Pilot中的控制器的工作模式基本都是一样的,下文我们详细分析。
如果必要(允许Istio处理Ingress资源的话),initConfigController还会创建IngressController:
1 2 |
// 下面的方法创建了一个监听Ingress资源的控制器 ingress.NewController(s.kubeClient, s.mesh, args.Config.ControllerOptions) |
包config/aggregate可以将处理不同资源类型的model.ConfigStoreCache组合起来,形成一个更大的model.ConfigStoreCache。接口保持不变,根据资源类型委托给适当的被包装的子对象。
默认的配置下,Pilot会把Config控制器、Ingress控制器组合起来:
1 2 3 4 |
configController, err := configaggregate.MakeCache([]model.ConfigStoreCache{ s.configController, ingress.NewController(s.kubeClient, s.mesh, args.Config.ControllerOptions), }) |
Istio网格的入口通常是Gateway,但是它也能够处理K8S原生的Ingress资源,也就是说Istio可以扮演K8S通常意义上的Ingress Controller。实际上入口流量由istio-ingress这个deployment的Pod处理。
哪些Ingress资源会被管理,由主服务的mesh.IngressControllerMode配置决定:
- MeshConfig_OFF(0):禁用Istio的IngressController功能
- MeshConfig_DEFAULT(1):作为整个K8S集群默认的Ingress控制器
- MeshConfig_STRICT(2):仅仅处理包含了注解kubernetes.io/ingress.class,且值等于mesh.IngressClass(默认istio)的Ingress资源
创建IngressController的逻辑位于initConfigController方法中。
此方法首先创建一个聚合控制器:
1 |
serviceControllers := aggregate.NewController() |
serviceregistry/aggregate中定义的“聚合控制器”,能够管理多个平台(例如K8S)的服务发现机制(ServiceRegistry)。
然后,它调用createK8sServiceControllers方法创建kube.Controller:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
func (s *Server) createK8sServiceControllers(serviceControllers *aggregate.Controller, args *PilotArgs) (err error) { kubectl := kube.NewController(s.kubeClient, args.Config.ControllerOptions) // 创建控制器 s.kubeRegistry = kubectl // 赋值给主服务的kubeRegistry字段 serviceControllers.AddRegistry( // 添加到聚合控制器的registries中 aggregate.Registry{ Name: serviceregistry.KubernetesRegistry, // 服务注册表类型 ClusterID: string(serviceregistry.KubernetesRegistry), // 如果多个同一类型的注册表存在则有意义 ServiceDiscovery: kubectl, // 服务发现,可获得Istio的抽象服务(ServiceInstance) ServiceAccounts: kubectl, // 暴露Istio的抽象服务账号(字符串) Controller: kubectl, // 控制器,可通知Service、ServiceInstance的变更 } ) } |
然后,创建ServiceEntry发现服务,并添加到聚合控制器:
1 2 3 4 5 6 7 8 |
serviceEntryStore := external.NewServiceDiscovery(s.configController, s.istioConfigStore) serviceEntryRegistry := aggregate.Registry{ Name: "ServiceEntries", Controller: serviceEntryStore, ServiceDiscovery: serviceEntryStore, ServiceAccounts: serviceEntryStore, } serviceControllers.AddRegistry(serviceEntryRegistry) |
最后,注册ServiceController的延迟启动。
kube.Controller能够监控K8S中service、endpoint、node、pod等对象。它实现了model.Controller、model.ServiceDiscovery、model.ServiceAccounts等接口。
该结构包含以下字段:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
type Controller struct { domainSuffix string client kubernetes.Interface queue Queue // 事件队列 services cacheHandler // 各种K8S对象的处理器 endpoints cacheHandler nodes cacheHandler pods *PodCache // 允许此控制器读取环境信息,并发布状态信息 Env *model.Environment // 多集群环境下识别集群 ClusterID string // XDSUpdater推送EDS变更到ADS模型 EDSUpdater model.XDSUpdater // 用于请求全局配置的变更 ConfigUpdater model.ConfigUpdater stop chan struct{} } |
此方法首先创建一个新的发现服务:
1 2 3 4 5 6 7 8 9 |
discovery, err := envoy.NewDiscoveryService( s.ServiceController, s.configController, environment, // 提供聚合性的上下文信息API args.DiscoveryOptions, // 监听地址等信息 ) s.discoveryService = discovery s.mux = s.discoveryService.RestContainer.ServeMux |
结构envoy.DiscoveryService是真正的发现服务,是Pilot的核心。它负责推送配置给Envoy。
然后,创建一个gRPC服务器、一个HTTP服务器:
1 2 3 4 5 |
s.initGrpcServer() s.httpServer = &http.Server{ Addr: args.DiscoveryOptions.HTTPAddr, Handler: discovery.RestContainer} |
创建xDS服务:
1 2 3 4 |
// 基于Envoy协议v2版本 s.EnvoyXdsServer = envoyv2.NewDiscoveryServer(environment, istio_networking.NewConfigGenerator(args.Plugins)) // 用于将envoy/v2从envoy/解耦 s.EnvoyXdsServer.ConfigUpdater = s.discoveryService |
注册envoy包的全局函数:
1 2 3 |
// 当配置变更时,请求一个更新 envoy.Push = s.EnvoyXdsServer.Push envoy.BeforePush = s.EnvoyXdsServer.BeforePush |
将xDS服务注册到上述gRPC服务器上:
1 |
s.EnvoyXdsServer.Register(s.grpcServer) |
在使用K8S作为后端时,还会配置kube.Controller:
1 2 3 |
s.kubeRegistry.Env = environment s.kubeRegistry.ConfigUpdater = discovery s.kubeRegistry.EDSUpdater = s.EnvoyXdsServer |
最后,注册httpServer、grpcServer的延迟启动。
proxy/envoy包中定义的这个函数,负责创建一个新的发现服务。它会创建一个RESTful容器并将发现服务注册到此容器:
1 2 3 |
container := restful.NewContainer() out.Register(container) out.RestContainer = container |
它还会向serviceregistry/kube.Controller注册Service、Instance的处理器,对于K8S来说,当Serivce、Endpoint发生变更时,会获得通知。
1 2 3 4 |
serviceHandler := func(*model.Service, model.Event) { out.clearCache() } ctl.AppendServiceHandler(serviceHandler) instanceHandler := func(*model.ServiceInstance, model.Event) { out.clearCache() } err := ctl.AppendInstanceHandler(instanceHandler) |
这些处理器的逻辑很简单,就是清空Envoy缓存,但是kube.Controller附加了一些前置逻辑。
不论是K8S内置资源,而是Istio的CR,都由client-go负责List&Watch,并把事件会传递给config/kube/crd/controller.go、config/kube/ingress/controller.go、serviceregistry/kube/controller.go等控制器中的回调函数处理:
1 2 3 4 5 6 7 8 |
informer.AddEventHandler( cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { // 处理Prometheus指标 k8sEvents.With(prometheus.Labels{"type": otype, "event": "add"}).Add(1) // 放入队列 c.queue.Push(kube.NewTask(handler.Apply, obj, model.EventAdd)) }, |
可以看到,此回调函数的逻辑仅仅是将事件及其处理函数封装为Task并存放到控制器的队列中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
func (q *queueImpl) Push(item Task) { q.cond.L.Lock() defer q.cond.L.Unlock() if !q.closing { q.queue = append(q.queue, item) } // 唤醒等待cond的一个goroutine q.cond.Signal() } type Task struct { handler Handler obj interface{} event model.Event } |
队列的Run循环则取出事件并调用其处理函数:
1 2 3 |
if err := item.handler(item.obj, item.event); err != nil { ... } |
处理函数就是ChainHandler结构的Apply方法,此结构允许针对一个事件串行的调用多个实际的处理函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
type ChainHandler struct { funcs []Handler } func (ch *ChainHandler) Apply(obj interface{}, event model.Event) error { for _, f := range ch.funcs { if err := f(obj, event); err != nil { // 链中任何一个实际处理函数出错,都会导致中止处理 return err } } return nil } |
ChainHandler是在对应控制器的初始化阶段创建的,实际处理函数也在那时注册。
Pilot对很多K8S资源变更的响应,都是简单的清除掉Envoy的配置缓存。它们调用的是envoy包的v1版本的DiscoveryService.clearCache方法:
1 2 3 |
func (ds *DiscoveryService) clearCache() { ds.ConfigUpdate(true) } |
可以看到,此方法仅仅是简单的请求一次Envoy配置的Full Push。 也就是说Pilot在配置变更的情况下,通常会完整的推送Envoy配置,幸好推送过程具有防抖动支持,而且配置变更不是频繁操作,否则可能出现性能问题。
config/kube/ingress/controller.go注册了多个处理函数,第一个处理函数判断是否informer已经完全同步,如果不是,则中止处理。第二个处理函数则是简单的清除Envoy缓存配置。
第一个处理函数是控制器的notify方法,此函数判断是否informer已经完全同步,如果不是,则中止处理:
1 2 3 4 5 6 7 8 |
func (c *controller) notify(obj interface{}, event model.Event) error { if !c.HasSynced() { return errors.New("waiting till full synchronization") } // 检查对象是否是DeletedFinalStateUnknown,如果否,对其调用MetaNamespaceKeyFunc,看看能否获得缓存键 _, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) return nil } |
第二个处理函数,首先将IstioObject转换为model.Config,然后转调在启动阶段通过s.configController.RegisterEventHandler()为不同资源类型注册的实际处理函数:
1 2 3 4 5 |
c.kinds[typ].handler.Append(func(object interface{}, ev model.Event) error { item, ok := object.(IstioObject) config, err := ConvertObject(schema, item, c.client.domainSuffix) f(*config, ev) }) |
MeshPolicy、VirtualService、DestinationRule、Gateway的实际处理函数,都仅仅是清除掉所有Envoy缓存。
此CRD的实际处理函数的逻辑复杂一些。首先将model.Config.Spec转换为具体的Istio结构:
1 |
serviceEntry := config.Spec.(*networking.ServiceEntry) |
然后,将ServiceEntry转换为Istio内部的“服务”(model.Service)并异步处理 。model.Service具有FQDN、若干端口、可选的负载均衡/虚拟IP:
1 2 3 4 5 6 7 |
// 一个ServiceEntry可能转换为多个服务 services := convertServices(serviceEntry, config.CreationTimestamp.Time) for _, handler := range c.serviceHandlers { for _, service := range services { go handler(service, event) } } |
最后,将ServiceEntry转换为Istio内部的“实例”(model.ServiceInstance)并异步处理。model.ServiceInstance关联到一个网络端点(ip:port),具有一个服务描述和一组描述服务版本的标签:
1 2 3 4 5 6 |
instances := convertInstances(serviceEntry, config.CreationTimestamp.Time) for _, handler := range c.instanceHandlers { for _, instance := range instances { go handler(instance, event) } } |
不管是服务还是实例,go handler ...的逻辑仍然仅仅是清除掉所有Envoy缓存。
在这里可以注意到,ServiceEntry所(通常)表示的外部服务、和一般性的K8S服务,在Istio内部具有相同的表示 —— Service + ServiceInstance。
第一个处理函数是控制器的notify方法,此函数判断是否informer已经完全同步,如果不是,则中止处理。
对于节点资源,默认情况下没有其它处理逻辑。
pilot的serviceregistry包维护了一个最终一致性的Pod缓存:
1 2 3 4 5 6 7 8 9 10 11 |
func newPodCache(ch cacheHandler, c *Controller) *PodCache { out := &PodCache{ cacheHandler: ch, c: c, keys: make(map[string]string), } ch.handler.Append(func(obj interface{}, ev model.Event) error { return out.event(obj, ev) }) return out } |
Pod事件会触发PodCache.event方法的调用,此方法会更新Pod的IP地址和namespace/name形式的Key之间的映射关系,并且为EDS服务更新工作负载:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
func (pc *PodCache) event(obj interface{}, ev model.Event) error { // 注意,Pod被删除后,可能得到*v1.Pod,也可能得到DeletionFinalStateUnknown这个标记对象 pod, ok := obj.(*v1.Pod) if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { return fmt.Errorf("couldn't get object from tombstone %+v", obj) } // 可以从DeletionFinalStateUnknown.Obj获得原始对象 pod, ok = tombstone.Obj.(*v1.Pod) if !ok { return fmt.Errorf("tombstone contained object that is not a pod %#v", obj) } } ip := pod.Status.PodIP // 新创建的Pod的IP地址为空,直到通过UpdateStatus分配了IP if len(ip) > 0 { key := KeyFunc(pod.Name, pod.Namespace) switch ev { case model.EventAdd: switch pod.Status.Phase { case v1.PodPending, v1.PodRunning: // 锁定并更新Pod缓存 pc.rwMu.Lock() pc.keys[ip] = key pc.rwMu.Unlock() // model.XDSUpdater // 更新工作负载,在EDS看来工作负载的ID是IP地址 if pc.c.EDSUpdater != nil { pc.c.EDSUpdater.WorkloadUpdate(ip, pod.ObjectMeta.Labels, pod.ObjectMeta.Annotations) } } case model.EventUpdate: switch pod.Status.Phase { case v1.PodPending, v1.PodRunning: // 更新Pod缓存 default: // 其它状态下,删除Pod缓存 } case model.EventDelete: // 删除Pod缓存 } } return nil } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
// 当接收到K8S Service更新时触发 c.services.handler.Append(func(obj interface{}, event model.Event) error { svc := *obj.(*v1.Service) // 不处理kube-system中定义的service if svc.Namespace == meta_v1.NamespaceSystem { return nil } if c.EDSUpdater != nil { hostname := svc.Name + "." + svc.Namespace ports := map[string]uint32{} portsByNum := map[uint32]string{} for _, port := range svc.Spec.Ports { ports[port.Name] = uint32(port.Port) portsByNum[uint32(port.Port)] = port.Name } // 变更服务信息后调用 c.EDSUpdater.SvcUpdate(c.ClusterID, hostname, ports, portsByNum) // Bypass convertService and the cache invalidation. // 请求完整的配置更新并返回 c.ConfigUpdater.ConfigUpdate(true) return nil } // f的逻辑是清空缓存,默认不会执行到这个旧逻辑,只有当禁用了Direct EDS(环境变量PILOT_DIRECT_EDS=0)才会执行 if svcConv := convertService(svc, c.domainSuffix); svcConv != nil { f(svcConv, event) } return nil }) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
// 当接收到K8S Endpoint更新时触发 c.endpoints.handler.Append(func(obj interface{}, event model.Event) error { ep := obj.(*v1.Endpoints) // 不处理kube-system中的端点 if ep.Namespace == meta_v1.NamespaceSystem { return nil } if c.EDSUpdater != nil { // 更新端点 c.updateEDS(ep) } else { // 旧逻辑,如果端点对应的服务存在,则清空缓存 if item, exists := c.serviceByKey(ep.Name, ep.Namespace); exists { if svc := convertService(*item, c.domainSuffix); svc != nil { f(&model.ServiceInstance{Service: svc}, event) } } } return nil }) |
Pilot Discovery和Envoy代理配置的更新(这些配置更新都是由于K8S资源变化而引发)、推送有关的逻辑,主要分布在三个包中:
- proxy/envoy,主要的逻辑在此,包括xDS的实现
- model,推送上下文(PushContext)定义在此包中,此结构在每次推送时创建,持有和本次推送相关的所有上下文信息
- networking,负责生成Envoy的配置文件格式
推送由DiscoveryService.ConfigUpdate()方法触发,转调DiscoveryServer.Push(),后者调用PushContext初始化一系列相关的数据结构,并调用networking包中ConfigGenerator的方法生成Envoy v2的结构,然后向所有连接到Pilot的Envoy代理发起推送。
此结构负责发布服务、集群、路由给所有的代理。
1 2 3 4 5 6 7 8 9 10 11 12 |
type DiscoveryService struct { *model.Environment webhookClient *http.Client webhookEndpoint string // 缓存,目前的实现,在任何路由、服务、端点发生变化时,都会Flush缓存。应当实现某种缓存过期策略 // 避免反复Flush或者过期缓存滞留其中 sdsCache *discoveryCache // 发现服务会注册REST路由到此容器 RestContainer *restful.Container // 是否在去回弹之后,需要进行一个完整推送。如果仅仅要求EDS则为false fullPush bool } |
缓存的结构如下,是一种通用的存储:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
type discoveryCacheEntry struct { data []byte // 什么都可以存 hit uint64 // atomic miss uint64 // atomic resourceCount uint32 } type discoveryCache struct { name string disabled bool mu sync.RWMutex cache map[string]*discoveryCacheEntry } |
此方法清除缓存,目前的实现很简陋,就是发起 ds.ConfigUpdate(true)调用,会请求完整的配置推送。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
// 100ms DebounceAfter = envDuration("PILOT_DEBOUNCE_AFTER", 100*time.Millisecond) // 10s DebounceMax = envDuration("PILOT_DEBOUNCE_MAX", 10*time.Second) func (ds *DiscoveryService) ConfigUpdate(full bool) { // 设置完整推送标记 if full { ds.fullPush = true } // 去回弹逻辑 if DebounceAfter > 0 { // 如果定时器标记已经设置,不做任何操作,这意味着频繁调用ConfigUpdate不会引发不良后果 if !clearCacheTimerSet { // 此标记在实际执行推送时清除 clearCacheTimerSet = true // 记录去回弹操作开始时间戳 startDebounce := lastClearCacheEvent // 100ms后开始第一次去回弹判断 time.AfterFunc(DebounceAfter, func() { ds.debouncePush(startDebounce) }) } return } // 旧逻辑 // 如果上一次配置变更发生在1秒前,执行推送 if time.Since(lastClearCacheEvent) > 1*time.Second { ds.doPush() return } // 如果上一次变更在1秒内,但是上一次推送大于clearCacheTime,也推送 if time.Since(lastClearCache) > time.Duration(clearCacheTime)*time.Second { ds.doPush() return } // 否则 if !clearCacheTimerSet { clearCacheTimerSet = true time.AfterFunc(1*time.Second, func() { ds.clearCache() // 一秒后判断重新判断是否需要推送 }) } } |
debouncePush方法中包含一些额外的去回弹逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
func (ds *DiscoveryService) debouncePush(startDebounce time.Time) { // 上一次请求清除缓存的时间 since := time.Since(lastClearCacheEvent) // 距今如果大于200ms,或者从启动去回弹到到现在大于10s // 如果反复请求推送,则||左侧的表达式一直不会满足。为了防止无限的去回弹,必然的推送在最初请求的10s后发生 if since > 2*DebounceAfter || time.Since(startDebounce) > DebounceMax { ds.doPush() } else { // 下一轮去回弹 time.AfterFunc(DebounceAfter, func() { ds.debouncePush(startDebounce) }) } } |
doPush方法真正触发配置推送:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
func (ds *DiscoveryService) doPush() { // 本次推送正在处理时,可能有别的配置变更发生,因此这里需要撤销标记,防止遗漏事件 clearCacheTimerSet = false lastClearCache = time.Now() full := ds.fullPush // 获取自上次推送依赖的服务端点更新 edsUpdates := BeforePush() // Update the config values, next ConfigUpdate and eds updates will use this clearCacheMutex.Lock() ds.fullPush = false clearCacheMutex.Unlock() // 推送,全部或者增量的EDS更新 Push(full, edsUpdates) } |
注意,全局函数BefoerPush实际上是envoyv2.DiscoveryServer.BeforePush方法:
1 2 3 |
// 以envoy包的全局变量作为媒介,在DiscoveryService和v2.DiscoveryServer之间传递这两个函数,避免它们直接依赖 envoy.Push = s.EnvoyXdsServer.Push envoy.BeforePush = s.EnvoyXdsServer.BeforePush |
类似的,全局函数Push 也是envoyv2.DiscoveryServer.Push方法。
也就是说,推送的职责在这里转交给DiscoveryServer了。
此结构提供了Envoy v2 xds API的gRPC实现,在启动阶段由initDiscoveryService创建。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
type DiscoveryServer struct { // 环境信息 Env *model.Environment // 仅仅用于调试和压力测试 MemRegistry *MemServiceDiscovery // 负责使用 Istio的网络API + Service注册表信息 生成数据平面的配置信息(也就是Envoy代理配置) ConfigGenerator core.ConfigGenerator // 目前主要用于判断(K8S的)初始缓存同步是否完成 ConfigController model.ConfigStoreCache // 初始连接的限速器 initThrottle chan time.Time // 限速器 throttle chan time.Time // 为/debug/adsz提供配置快照。默认false,可以通过环境变量PILOT_DEBUG_ADSZ_CONFIG=1启用 DebugConfigs bool // 保护被ADS读写的全局数据结构, 包括 EDSUpdates和shards mutex sync.RWMutex // 服务的端点分片列表,此字段从增量更新构建 EndpointShardsByService map[string]*model.EndpointShardsByService // 工作负载列表,可用于检测变更。直接由registry推送的更新计算得到 WorkloadsByID map[string]*Workload // 负责请求配置更新,实现了放抖动(debouncing,延迟配置推送,防止Regsitry连续的资源更新导致过频推送)且能进行变更检测 // 用于将envoy/v2从envoy/解耦,在Istio 1.1将进行简化/清理 ConfigUpdater model.ConfigUpdater // 跟踪自上一次完整推送之后的所有服务的(端点的)变更,从1.0.3+仅仅用于跟踪两次推送之间的增量 // 示例: // { // details.default.svc.k8s.gmem.cc: { // Shards: { // "Kubernetes": { // Shard: "Kubernetes", // Entries: [ endpoints... ] // } // } // } // } edsUpdates map[string]*model.EndpointShardsByService // 保护全局推送上下文,一旦配置变更此上下文就改变,多个地方需要读取此上下文 pushContextMutex sync.RWMutex } |
此结构支持的方法繁多,定义在proxy/envoy/v2包中的ads.go、cds.go、eds.go、lds.go、rds.go等文件中,对应xDS API的各子集以及ADS。其中一部分方法是通过接口暴露的,例如:
- AggregatedDiscoveryServiceServer,Envoy提供的gRPC接口。在ads.go中实现
- XDSUpdater,在push_context.go中定义,在eds.go中实现
此方法的逻辑很简单,将上次变更依赖的服务端点变更增量提取出来返回,并重置此“变更增量”为空:
1 2 3 4 5 6 |
func (s *DiscoveryServer) BeforePush() map[string]*model.EndpointShardsByService { edsUpdates := s.edsUpdates // 重置,后续的更新由新的map跟踪 s.edsUpdates = map[string]*model.EndpointShardsByService{} return edsUpdates } |
在未来,Istio对增量更新提供完整支持后,这里需要重构。在将proxy/envoy/discovery合并到v2 discovery之后,此方法不再允许外部访问。
该方法在准备好配置后,使用ADS协议执行推送:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
func (s *DiscoveryServer) Push(full bool, edsUpdates map[string]*model.EndpointShardsByService) { if !full { // 执行增量推送 go s.AdsPushAll(version, s.globalPushContext(), false, edsUpdates) return } // 获取推送上下文 pc := s.globalPushContext() if pc != nil { // 将model.LastPushStatus赋值为当前推送上下文,并更新监控指标 pc.OnConfigChange() } // 创建新的全局推送上下文,基于此上下文进行推送 t0 := time.Now() push := model.NewPushContext() push.ServiceAccounts = s.ServiceAccounts // 初始化此推送上下文 if err := push.InitContext(s.Env); err != nil { adsLog.Errorf("XDS: failed to update services %v", err) return } // 生成所有代理共享的配置,例如出站监听器/集群的配置 if err := s.ConfigGenerator.BuildSharedPushState(s.Env, push); err != nil { adsLog.Errorf("XDS: Failed to rebuild share state in configgen: %v", err) totalXDSInternalErrors.Add(1) return } // 列出端点并创建分片 if err := s.updateServiceShards(push); err != nil { return } // 替换掉全局上下文 s.Env.PushContext = push // 异步推送 go s.AdsPushAll(versionLocal, push, true, nil) } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
func (s *DiscoveryServer) AdsPushAll(version string, push *model.PushContext, full bool, edsUpdates map[string]*model.EndpointShardsByService) { if !full { // 更新集群信息,进行增量EDS推送。只有那些变化了的集群才被更新、推送 s.edsIncremental(version, push, edsUpdates) return } // 赋值一个临时映射,避免加锁 cMap := make(map[string]*EdsCluster, len(edsClusters)) for k, v := range edsClusters { cMap[k] = v } // 为每个集群更新端点 for clusterName, edsCluster := range cMap { if err := s.updateCluster(push, clusterName, edsCluster); err != nil { adsLog.Errorf("updateCluster failed with clusterName %s", clusterName) } } // 向所有客户端连接发送一个信号 s.startPush(version, push, true, nil) } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
func (s *DiscoveryServer) startPush(version string, push *model.PushContext, full bool, edsUpdates map[string]*model.EndpointShardsByService) { // 遍历所有已连接的Envoy,这覆盖ADS / EDS(0.7),它们共享一样的连接表 adsClientsMutex.RLock() // 复制以避免加锁 pending := []*XdsConnection{} for _, v := range adsClients { pending = append(pending, v) } adsClientsMutex.RUnlock() // 触发每个代理的配置的重现计算,包括Envoy监听的所有配置,包括EDS pendingPush := int32(len(pending)) tstart := time.Now() for { if len(pending) == 0 { // 无需推送,或者都推送完了,退出 return } currentVersion := versionInfo() // 如果具有更新版本的推送正在进行,则当前推送取消 if !allowConcurrentPush && version != currentVersion && full { return } c := pending[0] pending = pending[1:] edsOnly := edsUpdates if full { edsOnly = nil } // 非阻塞性推送,如果下一次推送紧跟着就到来,可能导致问题 // 这里是逐个处理客户端,可以优化为并行的进行 client := c // time.After返回一个只读通道,在给定的超时到达后,通道可读 to := time.After(PushTimeout) select { // 尝试向客户端的推送通道发送事件 case client.pushChannel <- &XdsEvent{ push: push, pending: &pendingPush, version: version, edsUpdatedServices: edsOnly, }: // 该客户端推送成功 client.LastPush = time.Now() client.LastPushFailure = timeZero // 如果推送通道不可用,检查客户端连接是否关闭 case <-client.doneChannel: adsLog.Infof("Client closed connection %v", client.ConID) case <-to: // 推送超时,可能是由于Envoy代理处于异常状态,无法接收数据 pushTimeouts.Add(1) // 放回去等待重试 pending = append(pending, c) } } } |
可以看到,startPush仅仅是把事件放入通道,并不直接牵涉到gRPC相关的操作。这实现了数据和传输的解耦。
推送上下文,此结构跟踪推送的状态(指标、错误)。在一次推送完毕后,指标全部置零。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
type PushContext struct { // 操控下面的map的锁 proxyStatusMutex sync.RWMutex // ID ErrCode 记录推送给代理时的事件 ProxyStatus map[string]map[string]ProxyPushStatus // 最后一次配置变更(导致推送状态重置)的时间 Start time.Time End time.Time // 保护下面的字段 Mutex sync.Mutex // 推送开始时系统中的所有服务 Services []*Service // 以主机名索引的所有服务 ServiceByHostname map[Hostname]*Service // 服务关联的DestinationRule(和目的地相关的规则) destinationRuleHosts []Hostname destinationRuleByHosts map[Hostname]*combinedDestinationRule // 环境信息 Env *Environment ServiceAccounts func(string) []string // 用于跟踪服务名称和端口的映射关系ServicePort2Name is used to keep track of service name and port mapping. // ADS的名字使用端口号,而端点的名字则使用端口名为准 // 服务名 端口号+名称信息 ServicePort2Name map[string]PortList initDone bool } |
此结构提供了Add、UpdateMetrics方法,和Prometheus统计指标有关:
1 2 3 4 |
// 可以步进和推送有关的Prometheus计数器 func (ps *PushContext) Add(metric *PushMetric, key string, proxy *Proxy, msg string) // 基于推送的当前状态来更新Prometheus指标 func (ps *PushContext) UpdateMetrics() |
该方法负责初始化一个新的推送上下文,生成所需的一切数据结构,在执行Envoy配置推送之前你需要(从创建PushContext的协程)调用它。
该方法缓存注册表中的所有服务。
该方法缓存所有VirtualService对象。
该方法初始化所有DestinationRule对象。
列出绑定到指定网关的序列服务。
返回一个服务的DestinationRule。
计算出给定服务的给定子集对应的标签。Istio使用K8S Pod标签区分子集。
此接口负责请求配置更新,在配置推送之前,需要进行去回弹(防止连续触发的频繁推送)。
1 2 3 |
type ConfigUpdater interface { ConfigUpdate(full bool) } |
目前,接口由DiscoveryService实现,而DiscoveryServer也引用前者:
1 |
s.EnvoyXdsServer.ConfigUpdater = s.discoveryService |
XDSUpdater用于xDS模型的直接更新,以及增量推送。DiscoveryServer实现了此接口。
Pilot使用多个注册表 —— 例如每个K8S集群就是一个注册表。每个注册表负责跟踪关联到网格中服务的端点的变更,并在变更后调用EDSUpdate方法。
注册表可以单个服务的端点分组为更小的子集(例如每个Deployment一个子集),以处理端点数量巨大的服务。由于限制可扩容性,Istio倾向于避免传递大量的对象,例如某个注册表的全部端点,或者某个服务在全部注册表中的端点。
Istio在未来会进行一些优化,例如以标签、网关、或者区域来分组端点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
type XDSUpdater interface { // 当端点列表发生变化,或者ServiceEntry的标签发生变化,该方法被调用 // 对于一个集群/主机名,其所有端点必须一并发送给代理 // shard是一个键,目前是注册表的名称(例如Kubernetes) EDSUpdate(shard, hostname string, entry []*IstioEndpoint) error // 当一个服务-端口映射定义发生变化后调用,对服务的标签、注解或者其它属性进行变更,可能触发EDS/CDS重新计算、增量推送 // 但不会影响LDS/RDS SvcUpdate(shard, hostname string, ports map[string]uint32, rports map[uint32]string) // 当一个工作负载的标签、注解发生变化时由注册表负责调用 // 对于K8S来说,id是Pod的IP地址(如果Pod在默认/主网络中) WorkloadUpdate(id string, labels map[string]string, annotations map[string]string) } |
当服务的信息发生变化后,调用此方法,主要逻辑是更新全局推送上下文中的ServicePort2Name字段。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
func (s *DiscoveryServer) SvcUpdate(cluster, hostname string, ports map[string]uint32, rports map[uint32]string) { pc := s.globalPushContext() s.mutex.Lock() defer s.mutex.Unlock() // 在1.0中,服务仅仅来自primary K8S集群 if cluster == string(serviceregistry.KubernetesRegistry) { pl := model.PortList{} for k, v := range ports { pl = append(pl, &model.Port{ Port: int(v), Name: k, }) } pc.ServicePort2Name[hostname] = pl } } |
当工作负载的标签/注解发生变化后,PodCache调用此方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
func (s *DiscoveryServer) WorkloadUpdate(id string, labels map[string]string, annotations map[string]string) { if labels == nil { // 没有标签,不需要推送更新给Envoy,删除缓存 s.mutex.Lock() delete(s.WorkloadsByID, id) s.mutex.Unlock() return } // 使用读锁,禁止写 s.mutex.RLock() w, f := s.WorkloadsByID[id] s.mutex.RUnlock() if !f { // 不存在于缓存中,可能从未连接到此工作负载,不需要推送 s.mutex.Lock() s.WorkloadsByID[id] = &Workload{ Labels: labels, Annotations: annotations, } s.mutex.Unlock() return } // 深度比较 if reflect.DeepEqual(w.Labels, labels) { // 标签没有变化,也不推送 return } w.Labels = labels // Pod标签变化,意味着需要重新计算Envoy配置 // 为了简单和安全,简单的进行全部推送。其实只需要识别出影响到哪些工作负载(那些可能使用到当前工作负载的),并且仅仅为这些负载进行推送 adsLog.Infof("Label change, full push %s ", id) s.ConfigUpdater.ConfigUpdate(true) } |
kube.Controller在接收到K8S的端点更新后,会调用其自身的updateEDS方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
func (c *Controller) updateEDS(ep *v1.Endpoints) { // 计算出K8S服务名 hostname := serviceHostname(ep.Name, ep.Namespace, c.domainSuffix) // 收集端点 endpoints := []*model.IstioEndpoint{} for _, ss := range ep.Subsets { for _, ea := range ss.Addresses { // 如果端点对应的Pod不存在 pod, exists := c.pods.getPodByIP(ea.IP) if !exists { // 则请求一次配置更新。Pod信息可能后续很快推送过来 if c.ConfigUpdater != nil { c.ConfigUpdater.ConfigUpdate(true) } continue } // 如果Pod存在,则抓取其标签、UID等信息,构建出IstioEndpoint labels := map[string]string(convertLabels(pod.ObjectMeta)) uid := fmt.Sprintf("kubernetes://%s.%s", pod.Name, pod.Namespace) for _, port := range ss.Ports { endpoints = append(endpoints, &model.IstioEndpoint{ Address: ea.IP, EndpointPort: uint32(port.Port), ServicePortName: port.Name, Labels: labels, UID: uid, ServiceAccount: kubeToIstioServiceAccount(pod.Spec.ServiceAccountName, pod.GetNamespace(), c.domainSuffix), }) } } } // 更新 Kubernetes分片 这个服务 这些端点 err := c.EDSUpdater.EDSUpdate(c.ClusterID, string(hostname), endpoints) if err != nil { //如果仅仅进行EDS推送不可以,则全局推送 c.ConfigUpdater.ConfigUpdate(true) } else { // 仅仅进行EDS推送 c.ConfigUpdater.ConfigUpdate(false) } } |
此接口位于networking.core包中,负责生成xDS响应。其实现位于v1alpha3子包(Istio CRD的API版本号)中。接口规格如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
type ConfigGenerator interface { // 此方法重新计算所有Envoy代理之间的共享状态,这些状态包括: // 1、Sidecar的出站集群(outbound clusters)、出站监听器(outbound listener) // 2、Sidecar/Gateway的路由表 // 这些状态保存在ConfigGenerator对象中,在后续调用BuildClusters/BuildListeners/BuildHTTPRoutes时重用 // 该方法不会调用插件,大部分插件需要的是特定代理的(例如mixer/authn/authz)的信息 // BuildYYY函数会针对预计算对象调用插件 BuildSharedPushState(env *model.Environment, push *model.PushContext) error // LDS。为指定的代理构建inbound/outbound listeners信息,多次调用此函数,不会反复构建同一监听器 BuildListeners(env *model.Environment, node *model.Proxy, push *model.PushContext) ([]*v2.Listener, error) // CDS。为指定代理构建clusters信息 BuildClusters(env *model.Environment, node *model.Proxy, push *model.PushContext) ([]*v2.Cluster, error) // RDS。 为指定代理构建HTTP routes信息 BuildHTTPRoutes(env *model.Environment, node *model.Proxy, push *model.PushContext, routeName string) (*v2.RouteConfiguration, error) } |
注意BuildYYY函数的返回值均为Envoy v2协议的ProtoBuf消息类型。
该方法的实现如下:
1 2 3 4 5 6 |
func (configgen *ConfigGeneratorImpl) BuildSharedPushState(env *model.Environment, push *model.PushContext) error { // 为Sidecar代理、Router代理分别构建出站集群 configgen.OutboundClustersForSidecars = configgen.buildOutboundClusters(env, model.Sidecar, push) configgen.OutboundClustersForGateways = configgen.buildOutboundClusters(env, model.Router, push) return nil } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
func (configgen *ConfigGeneratorImpl) buildOutboundClusters(env *model.Environment, proxyType model.NodeType, push *model.PushContext) []*v2.Cluster { clusters := make([]*v2.Cluster, 0) // 遍历上下文中包含的所有服务 for _, service := range push.Services { // 获取服务的目的地规则(可能为空) config := push.DestinationRule(service.Hostname) for _, port := range service.Ports { if port.Protocol == model.ProtocolUDP { continue } // 得到服务包含的主机列表,如果服务的Resolution设置为DNSLB则返回nil hosts := buildClusterHosts(env, service, port.Port) // BuildSubsetKey:生成引用特定(限定服务、端口、子集)实例集的键,Envoy使用此键查询Pilot,进而获得子集中包含的实例列表 // 示例:outbound|443||kubernetes.default.svc.k8s.gmem.cc clusterName := model.BuildSubsetKey(model.TrafficDirectionOutbound, "", service.Hostname, port.Port) // 获得集群(上游服务)的Istio账号 upstreamServiceAccounts := env.ServiceAccounts.GetIstioServiceAccounts(service.Hostname, []string{port.Name}) // convertResolution将Istio解析枚举转换为Envoy的集群发现类型(默认v2.Cluster_EDS) // ClientSideLB -> v2.Cluster_EDS // DNSLB -> v2.Cluster_STRICT_DNS // Passthrough -> v2.Cluster_ORIGINAL_DST // 生成默认(相对于子集来说是默认)集群v2.Cluster信息,包括流量策略TrafficPolicy(连接池、异常检测、TLS设置、负载均衡器) defaultCluster := buildDefaultCluster(env, clusterName, convertResolution(service.Resolution), hosts) // 更新v2.Cluster.EdsClusterConfig,将此集群的EDS的配置源设置为ADS updateEds(env, defaultCluster, service.Hostname) // 如果是HTTP/2协议,则将最大并发流数量设置为1073741824 setUpstreamProtocol(defaultCluster, port) // 将当前集群添加到列表 clusters = append(clusters, defaultCluster) if config != nil { // 如果此服务具有关联的目的地规则 destinationRule := config.Spec.(*networking.DestinationRule) // 如果使用mTLS,将DestinationRule的TrafficPolicy的证书字段填充上 convertIstioMutual(destinationRule, upstreamServiceAccounts) // 将TrafficPolicy转换为v2.Cluster的对应字段 applyTrafficPolicy(defaultCluster, destinationRule.TrafficPolicy, port) for _, subset := range destinationRule.Subsets { // 处理子集,处理方式和默认集群类似 // 子集集群的名字示例 outbound|9080|v1|details.default.svc.k8s.gmem.cc subsetClusterName := model.BuildSubsetKey(model.TrafficDirectionOutbound, subset.Name, service.Hostname, port.Port) subsetCluster := buildDefaultCluster(env, subsetClusterName, convertResolution(service.Resolution), hosts) updateEds(env, subsetCluster, service.Hostname) setUpstreamProtocol(subsetCluster, port) applyTrafficPolicy(subsetCluster, destinationRule.TrafficPolicy, port) applyTrafficPolicy(subsetCluster, subset.TrafficPolicy, port) // 调用插件,以修改生成的配置 for _, p := range configgen.Plugins { p.OnOutboundCluster(env, push, service, port, subsetCluster) } clusters = append(clusters, subsetCluster) } } else { // 如果全局启用了mTLS,并且此服务不是外部服务,并且当前代理是边车 if env.Mesh.AuthPolicy == meshconfig.MeshConfig_MUTUAL_TLS && !service.MeshExternal && proxyType == model.Sidecar { applyUpstreamTLSSettings(defaultCluster, buildIstioMutualTLS(upstreamServiceAccounts, "")) } } // 为默认集群调用插件 for _, p := range configgen.Plugins { p.OnOutboundCluster(env, push, service, port, defaultCluster) } } } return clusters } |
networking包提供了一种插件机制,允许在构造xdsapi.Listener期间,对xdsapi.Listener进行任意的修改。例如启用:
- AuthenticationPlugin插件,可以在入站监听器、出站集群上设置mTLS认证
- mixer插件,可以在入站监听器上设置策略检查
插件类型包括:
1 2 3 4 5 6 7 8 9 10 11 12 |
const ( // 身份验证插件 Authn = "authn" // RBAC插件 Authz = "authz" // Envoyfilter插件 Envoyfilter = "envoyfilter" // 健康检查插件 Health = "health" // Mixer 插件 Mixer = "mixer" ) |
插件接口规格如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
type Plugin interface { // 每当为一个服务添加一个出站监听器(到LDS输出)后,调用此方法 // 可用于在出站路径上添加额外的监听器 OnOutboundListener(in *InputParams, mutable *MutableObjects) error // 每当为一个服务添加一个监听器(到LDS输出)后,调用此方法 // 可用于添加额外的监听器 OnInboundListener(in *InputParams, mutable *MutableObjects) error // 每当一个新集群添加到CDS输出后,调用此方法 // 为每次推送调用一次,而非为每个边车/代理 OnOutboundCluster(env *model.Environment, push *model.PushContext, service *model.Service, servicePort *model.Port, cluster *xdsapi.Cluster) // 每当一个新集群添加到CDS输出后,调用此方法 // 为每个边车/代理调用 OnInboundCluster(env *model.Environment, node *model.Proxy, push *model.PushContext, service *model.Service, servicePort *model.Port, cluster *xdsapi.Cluster) // 每当新的虚拟主机(和路由)添加到出站路径的RDS后调用 OnOutboundRouteConfiguration(in *InputParams, routeConfiguration *xdsapi.RouteConfiguration) // 每当新的虚拟主机(和路由)添加到入站路径的RDS后调用 OnInboundRouteConfiguration(in *InputParams, routeConfiguration *xdsapi.RouteConfiguration) } |
具体的插件,只需要按需实现部分方法即可。
一系列传递给插件回调函数(Plugin接口函数)的值,这些值应该被插件只读访问:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
type InputParams struct { // 必须字段,监听器的协议,例如TCP/HTTP ListenerProtocol ListenerProtocol // 必须字段,环境信息 Env *model.Environment // xDS响应是给哪个节点的 Node *model.Proxy // 网格中所有代理服务的实例 ProxyInstances []*model.ServiceInstance // 和监听器同地协作的服务实例(应用到Sidecar) ServiceInstance *model.ServiceInstance // S和监听器同地协作的服务(应用到Sidecar) // 对于出站TCP监听器,此字段指向目的地服务 Service *model.Service // 监听器的端口 // 对于边车的outbound/inbound监听器,指向服务端口(而非端点端口) // 对于网关的inbound监听器,指向网关服务器端口 Port *model.Port // 推送上下文 Push *model.PushContext } |
传递给On*Listener回调的一系列对象:
1 2 3 4 5 6 |
type MutableObjects struct { // 当前正在生成的监听器配置 Listener *xdsapi.Listener // 关联到此监听器的过滤器链 FilterChains []FilterChain } |
传递给On*Cluster回调的Envoy Cluster对象,直接对应ProtoBuffer消息。
传递给On*RouteConfiguration回调的Envoy RouteConfiguration对象,直接对应ProtoBuffer消息。
本章讨论Pilot Agent的启动过程,以及Envoy的Bootstrap配置是如何生成和重新载入的。Pilot Agnet和Pilot Discovery如何通信在下一章分析。
为了方便调试,我们在本地启动Pilot Agent,参考下面的命令行参数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
export POD_NAME=productpage-v1-8d69b45c-5qcv7 export POD_NAMESPACE=default export INSTANCE_IP=10.0.0.1 export ISTIO_BOOTSTRAP=pilot/envoy_bootstrap_tmpl.tpl # pilot-agent的入口点代码位于 pilot/cmd/pilot-agent/main.go # 此代理的角色,可选sidecar、ingress、router pilot proxy sidecar --log_output_level=default:debug --log_caller=default --domain=default.svc.k8s.gmem.cc \ # Envoy二进制文件的位置 --binaryPath=/home/alex/CPP/projects/clion/envoy/bazel-bin/source/exe/envoy-static \ # 生成的Envoy配置文件存放在何处 --configPath=pilot/envoyConfig \ # 当前代理的节点属于哪个服务 --serviceCluster=productpage \ # 发现服务的地址 --discoveryAddress=localhost:15010 \ # Envoy进程的日志级别 --proxyLogLevel=debug |
Pilot Agent的入口点位于pilot/cmd/pilot-agent/main.go,它使用包spf13.cobra来管理一组子命令,Pilot核心功能由proxy子命令实现。
入口点的启动过程如下:
- 解析命令行参数
- 初始化model.Proxy对象,在K8S环境下
- ID=$POD_NAME.$POD_NAMESPACE
- Domain 从命令行--domain参数传入
- IPAddress,从环境变量INSTANCE_IP读入
- 如果控制平面的身份验证策略是mTLS,则初始化Pilot的SAN
- 调用model.ValidateProxyConfig进行代理配置合法性验证
- 初始化数字证书位置信息,默认存储在/etc/certs/。如果当前代理的角色不是sidecar而是ingress,还要初始化Ingress数字证书位置信息
- 如果提供了Envoy配置模板,且CustomConfigFile为空(不使用自定义Envoy配置文件),则根据此模板生成一个Envoy配置文件,并将其路径赋值给CustomConfigFile
- 如果提供了statusPort,则调用status.NewServer启动一个状态服务。访问状态服务的/healthz/ready可以知晓代理是否准备好
- 调用envoy.NewProxy,创建一个envoy结构
- 调用proxy.NewAgent,创建一个agent结构
- 调用envoy.NewWatcher,创建一个watcher结构
- 调用watcher.Run启动Watcher
- 启动Agent控制循环
- 调用Reload立即载入,导致Envoy进程启动
- 调用watchCerts监听数字证书的变化
此结构位于proxy.envoy包中,封装启动envoy所需的信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
func NewProxy(config meshconfig.ProxyConfig, node string, logLevel string, pilotSAN []string) proxy.Proxy { // 设置命令行传入的envoy的日志级别 var args []string if logLevel != "" { args = append(args, "-l", logLevel) } return envoy{ // ProxyConfig结构 config: config, // 节点的标识符,例如sidecar~10.0.0.1~productpage-v1-8d69b45c-5qcv7.default~default.svc.k8s.gmem.cc node: node, // 额外的Envoy参数 extraArgs: args, // 启用mTLS时所需的Pilot的SAN pilotSAN: pilotSAN, } } |
该结构实现了Proxy接口:
1 2 3 4 5 6 7 8 9 |
type Proxy interface { // 启动代理 Run(interface{}, int, <-chan error) error // 在代理退出后,清除对应的纪元 Cleanup(int) // 如果以期望配置启动代理的最大尝试次数到达仍然失败,Agent在终结之前调用此方法 Panic(interface{}) } |
此方法生成Envoy的Bootstrap配置,并启动Envoy进程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
func (proxy envoy) Run(config interface{}, epoch int, abort <-chan error) error { var fname string // 使用自定义配置文件,或者通过模板生成配置文件 if len(proxy.config.CustomConfigFile) > 0 { fname = proxy.config.CustomConfigFile } else { out, err := bootstrap.WriteBootstrap(&proxy.config, proxy.node, epoch, proxy.pilotSAN, proxy.opts, os.Environ()) fname = out } // 处理Envoy命令行参数 args := proxy.args(fname, epoch) if len(proxy.config.CustomConfigFile) == 0 { args = append(args, "--v2-config-only") } // e.g. -c pilot/envoyConfig/envoy-rev0.json --restart-epoch 0 --drain-time-s 2 --parent-shutdown-time-s 3 --service-cluster productpage --service-node sidecar~10.0.0.1~productpage-v1-8d69b45c-5qcv7.default~default.svc.k8s.gmem.cc --max-obj-name-len 189 --allow-unknown-fields -l warn --v2-config-only] log.Infof("Envoy command: %v", args) /* 异步命令行调用 */ cmd := exec.Command(proxy.config.BinaryPath, args...) cmd.Stdout = os.Stdout // 直接把Envoy子进程的标准输出和错误收集过来 cmd.Stderr = os.Stderr if err := cmd.Start(); err != nil { return err } // 等待Envoy进程退出 if proxy.errChan != nil { // 把Envoy退出错误码存入err通道 go func() { proxy.errChan <- cmd.Wait() }() return nil } // 再建一个通道和协程,当Envoy退出时协程写入通道,让当前协程从select case中退出 done := make(chan error, 1) go func() { done <- cmd.Wait() }() select { // Agent可以发送Abort信号,作为响应此Envoy实例需要停止 case err := <-abort: log.Warnf("Aborting epoch %d", epoch) if errKill := cmd.Process.Kill(); errKill != nil { log.Warnf("killing epoch %d caused an error %v", epoch, errKill) } return err // Envoy实例自然死亡 case err := <-done: return err } } |
此方法生成Envoy Bootstrap配置。使用的配置模板,按以下逻辑寻找:
- 如果指定了--customConfigFile,使用此文件。这步逻辑仅仅用于测试,实际无用
- 如果config.ProxyBootstrapTemplatePath不为空,使用此文件。此字段没有通过命令行暴露
- 否则,使用常量DefaultCfgDir指定的值,也就是默认配置
- 如果设置了环境变量ISTIO_BOOTSTRAP,使用此变量指定的配置模板路径
默认配置的路径是/var/lib/istio/envoy/envoy_bootstrap_tmpl.json,内容为Envoy的JSON格式的配置的Go Template。
生成的Envoy配置文件样例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 |
{ // 节点(边车和其代理的应用所在Pod)信息 "node": { "id": "sidecar~10.0.0.1~productpage-v1-8d69b45c-5qcv7.default~default.svc.k8s.gmem.cc", "cluster": "productpage", "metadata": { "istio": "sidecar" } }, // 监控指标配置,抽取标签 "stats_config": { "use_all_default_tags": false, "stats_tags": [ { "tag_name": "cluster_name", "regex": "^cluster\\.((.+(\\..+\\.svc\\.cluster\\.local))\\.)" }, { "tag_name": "tcp_prefix", "regex": "^tcp\\.((.*)\\.)\\w+$" }, { "tag_name": "response_code", "regex": "_rq(_(\\d{3}))$" }, { "tag_name": "response_code_class", "regex": "_rq(_(\\dxx))$" }, { "tag_name": "http_conn_manager_listener_prefix", "regex": "^listener(=\\.).*\\.http\\.(((:[_.[:digit:]]*|[_\\[\\]aAbBcCdDeEfF[:digit:]]*))\\.)" }, { "tag_name": "http_conn_manager_prefix", "regex": "^http\\.(((:[_.[:digit:]]*|[_\\[\\]aAbBcCdDeEfF[:digit:]]*))\\.)" }, { "tag_name": "listener_address", "regex": "^listener\\.(((:[_.[:digit:]]*|[_\\[\\]aAbBcCdDeEfF[:digit:]]*))\\.)" } ] }, // 管理端口配置,管理端点也暴露了Prometheus Exporter "admin": { "access_log_path": "/dev/null", "address": { "socket_address": { "address": "127.0.0.1", "port_value": 15000 } } }, // 动态发现配置 "dynamic_resources": { "lds_config": { "ads": {} }, "cds_config": { "ads": {} }, // 使用集群xds-grpc提供的聚合发现服务 "ads_config": { "api_type": "GRPC", "refresh_delay": "1s", "grpc_services": [ { "envoy_grpc": { "cluster_name": "xds-grpc" } } ] } }, // 静态配置 "static_resources": { "clusters": [ // Prometheus Exporter { "name": "prometheus_stats", "type": "STATIC", "connect_timeout": "0.250s", "lb_policy": "ROUND_ROBIN", "hosts": [ { "socket_address": { "protocol": "TCP", "address": "127.0.0.1", "port_value": 15000 } } ] }, // xDS服务地址,测试时注意把前文提到的发现服务开启 { "name": "xds-grpc", "type": "STRICT_DNS", "connect_timeout": "1s", "lb_policy": "ROUND_ROBIN", "hosts": [ { "socket_address": { "address": "localhost", "port_value": 15010 } } ], "circuit_breakers": { "thresholds": [ { "priority": "DEFAULT", "max_connections": 100000, "max_pending_requests": 100000, "max_requests": 100000 }, { "priority": "HIGH", "max_connections": 100000, "max_pending_requests": 100000, "max_requests": 100000 } ] }, "upstream_connection_options": { "tcp_keepalive": { "keepalive_time": 300 } }, "http2_protocol_options": {} } ], // 暴露Prometheus指标的监听器 "listeners": [ { "address": { "socket_address": { "protocol": "TCP", "address": "0.0.0.0", "port_value": 15090 } }, "filter_chains": [ { "filters": [ { "name": "envoy.http_connection_manager", "config": { "codec_type": "AUTO", "stat_prefix": "stats", "route_config": { "virtual_hosts": [ { "name": "backend", "domains": [ "*" ], "routes": [ // 访问curl http://localhost:15090/stats/prometheus可以直接看到指标 { "match": { "prefix": "/stats/prometheus" }, "route": { "cluster": "prometheus_stats" } } ] } ] }, "http_filters": { "name": "envoy.router" } } } ] } ] } ] } } |
此结构位于proxy.envoy包中,它是envoy proxy的代理人(Agent) 。它负责管理envoy进程的生命周期和重启。
Agent跟踪所有运行中的代理epochs及其配置。Envoy热重启通过启动具有递增的重启纪元的(strictly incremented restart epoch)代理进程实现。优雅的关闭旧的epochs并且将所有必须的状态传递给最新的epoch是envoy的职责,Agent不会去主动关闭旧的epoch对应的envoy进程。
最初的epoch是0。
这里使用的重启协议(restart protocol )是和Envoy的重启纪元(restart epochs)语义匹配的:为了成功启动用来替代正在运行的Envoy的新进程,新进程的重启纪元必须正好比所有正在运行的其它Envoy进程中纪元最大的那个高1。
Agent需要调用Proxy的两个函数:
- Run函数,用于启动代理,且必须一直阻塞直到代理退出
- Cleanup函数,用于在代理退出后立即进行清理,此函数必须是非阻塞的,因为它在Agent的主控制循环中被调用
这两个函数都以代理的纪元作为入参。
每当Run函数返回了错误,Agent都假设代理启动失败了,并且会重试若干次。后续的重启可能重用之前失败的启动的纪元号。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
func NewAgent(proxy Proxy, retry Retry) Agent { return &agent{ // Proxy接口的实现,也就是上面的envoy proxy: proxy, // 重试(为proxy应用新配置)规则 retry: retry, // 当前纪元以及对应的配置 epochs: make(map[int]interface{}), // 通过此通道传递期望的配置 configCh: make(chan interface{}), // 通过此通道提示代理退出 statusCh: make(chan exitStatus), // 通过此通道终止运行中的实例 abortCh: make(map[int]chan error), } } |
该结构实现了Agent接口:
1 2 3 4 5 6 7 |
type Agent interface { // 为代理设置期望的配置 —— Agent对比当前活动的配置和期望的配置,如果有必要,触发重启操作 // 如果重启失败,以指数后退(exponential back-off)重试 ScheduleConfigUpdate(config interface{}) // 启动Agent控制循环 Run(ctx context.Context) } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
func (a *agent) Run(ctx context.Context) { // 限速器,平滑1QPS,爆发10QPS。为了处理所有通道的消息需要高QPS rateLimiter := rate.NewLimiter(1, 10) for { // Wait 阻塞,直到有了1个事件许可,等价于WaitN(ctx,1) err := rateLimiter.Wait(ctx) var delay time.Duration = 1<<63 - 1 // 如果先前已经安排了下一次调度,设置延迟时间戳 if a.retry.restart != nil { delay = time.Until(*a.retry.restart) } select { // 尝试从配置通道读取新配置,注意,Watcher从此通道传来的仅仅是散列值,此值仅用于识别配置是否变化,而不包含实际的配置信息 case config := <-a.configCh: // 如果和当前配置不同 if !reflect.DeepEqual(a.desiredConfig, config) { // 赋值给期望配置 a.desiredConfig = config // 重置 重试预算(剩余的重试次数) a.retry.budget = a.retry.MaxRetries a.reconcile() } // 如果此通道可读,则说明代理应该退出 case status := <-a.statusCh: // 删除status中的epoch delete(a.epochs, status.epoch) // 删除终止通道,防止在非Abort错误时自我终止 delete(a.abortCh, status.epoch) // 更新配置 a.currentConfig = a.epochs[a.latestEpoch()] // Abort错误 if status.err == errAbort { log.Infof("Epoch %d aborted", status.epoch) } else if status.err != nil { // 用于Envoy热重启竞态条件的关系,这里需要通过杀死所有Envoy进程,立即的、非优雅的重启 a.abortAll() } else { // 正常退出 log.Infof("Epoch %d exited normally", status.epoch) } // 清理此纪元的配置文件 a.proxy.Cleanup(status.epoch) // 如果存在错误,调度一次重试 if status.err != nil { // 延迟已调度的延迟,如果不为空,说明已经调度了 if a.retry.restart == nil { // 如果还有重试预算 if a.retry.budget > 0 { delayDuration := a.retry.InitialInterval * (1 << uint(a.retry.MaxRetries-a.retry.budget)) restart := time.Now().Add(delayDuration) a.retry.restart = &restart // 调度下一次重试 a.retry.budget = a.retry.budget - 1 } else { // 没有重试预算了,Panic a.proxy.Panic(status.epoch) return } } } // 如果已经到了下次调度的延迟 case <-time.After(delay): a.reconcile() // 通道关闭 case _, more := <-ctx.Done(): if !more { a.terminate() return } } } } |
此方法重现载入新的代理配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
func (a *agent) reconcile() { // 取消掉任何计划重启 a.retry.restart = nil // 再次检查期望配置是否和当前配置相同 if reflect.DeepEqual(a.desiredConfig, a.currentConfig) { return } // 新增纪元 epoch := a.latestEpoch() + 1 // 重置abort通道 abortCh := make(chan error, MaxAborts) a.epochs[epoch] = a.desiredConfig a.abortCh[epoch] = abortCh a.currentConfig = a.desiredConfig // 在新协程中启动代理 go a.waitForExit(a.desiredConfig, epoch, abortCh) } |
此方法调用代理的Run()方法,并且在其崩了后,将错误码写入通道:
1 2 3 4 |
func (a *agent) waitForExit(config interface{}, epoch int, abortCh <-chan error) { err := a.proxy.Run(config, epoch, abortCh) a.statusCh <- exitStatus{epoch: epoch, err: err} } |
此结构位于proxy.envoy包中,负责当代理配置发生变化后,触发Agent的reload。它持有agent,还监控数字证书的变更:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
func NewWatcher(config meshconfig.ProxyConfig, agent proxy.Agent, role model.Proxy, certs []CertSource, pilotSAN []string) Watcher { return &watcher{ // Agent agent: agent, // 代理角色 role: role, // ProxyConfig config: config, // CertSource列表 certs: certs, // Pilot的SAN pilotSAN: pilotSAN, } } |
此结构实现了Watcher接口:
1 2 3 4 5 6 |
type Watcher interface { // 阻塞性的运行监控循环 Run(context.Context) // 使用最新的配置Reload Agent Reload() } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
func (w *watcher) Run(ctx context.Context) { // 启动Agent的控制循环 go w.agent.Run(ctx) // 立即以最新配置重载Agent w.Reload() // 监控数字证书文件的变更 certDirs := make([]string, 0, len(w.certs)) for _, cert := range w.certs { certDirs = append(certDirs, cert.Directory) } // 如果数字证书发生变化,则调用Reload方法重载Agent go watchCerts(ctx, certDirs, watchFileEvents, defaultMinDelay, w.Reload) <-ctx.Done() } |
促使Agent立即启动Envoy:
1 2 3 4 5 6 7 8 9 |
func (w *watcher) Reload() { // 生成数字证书的哈希 h := sha256.New() for _, cert := range w.certs { generateCertHash(h, cert.Directory, cert.Files) } // 以哈希值作为配置信息,调度下次更新 w.agent.ScheduleConfigUpdate(h.Sum(nil)) } |
Agnet的ScheduleConfigUpdate很简单,就是写通道:
1 2 3 |
func (a *agent) ScheduleConfigUpdate(config interface{}) { a.configCh <- config } |
从这里可以看出,导致代理需要重新载入的唯一原因,就是数字证书发生了变更。
此结构表示当前代理的配置,和命令行选项是对应的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
type ProxyConfig struct { // 生成的配置文件存放的路径 ConfigPath string // Envoy二进制文件的位置 BinaryPath string // 所属的集群,也就是K8S服务 ServiceCluster string // 热重启时多久Drain掉连接,必须大于1s DrainDuration *duration.Duration // 热重启时关闭父进程的延迟 ParentShutdownDuration *duration.Duration // 发现服务(xDS)的访问地址,代理连接此服务进行各种资源的发现 DiscoveryAddress string // 服务发现的轮询间隔,用于EDS, CDS, LDS, 但不用于RDS,必须大于1ms DiscoveryRefreshDelay *duration.Duration // Zipkin兼容的Tracer地址 ZipkinAddress string // Envoy连接到上游集群端点的超时 ConnectTimeout *duration.Duration // Statsd UDP监听地址 StatsdUdpAddress string // 管理端口,在此端口监听管理命令 ProxyAdminPort int32 // 此实例所在的可用性区域 // 在K8S中,节点的可用性区域通过注解failure-domain.beta.kubernetes.io/zone设置 AvailabilityZone string // 控制平面的身份验证策略 ControlPlaneAuthPolicy AuthenticationPolicy // 自定义的代理配置文件路径,当前Mixer、Pilot的前置代理使用自定义配置文件 CustomConfigFile string // Envoy监控指标的name字段的最大长度 StatNameLength int32 // 并发的工作线程数量 Concurrency int32 // Envoy Bootstrap配置模板的路径 ProxyBootstrapTemplatePath string // 如何重定向入站流量到Envoy InterceptionMode ProxyConfig_InboundInterceptionMode XXX_NoUnkeyedLiteral struct{} XXX_unrecognized []byte XXX_sizecache int32 } |
此结构表示数字证书的存储位置:
1 2 3 4 5 6 |
type CertSource struct { // 所在目录 Directory string // 证书文件列表 Files []string } |
代理端的通信行为(主要是xDS)完全由Envoy负责,和Istio无关,后者仅仅提供Bootstrap配置并在必要时Reload。
Envoy的入口点定义在source/exe/main.cc中,仅仅是创建一个Envoy::MainCommon对象并调用其run方法:
1 2 3 4 5 |
int main(int argc, char** argv) { std::unique_ptr<Envoy::MainCommon> main_common; main_common = std::make_unique<Envoy::MainCommon>(argc, argv); return main_common->run() ? EXIT_SUCCESS : EXIT_FAILURE; } |
Envoy::MainCommon对象的创建过程如下:
- 创建化负责信号处理、线程池、命令参数解析、定时器的实例变量
- 创建MainCommonBase base_:
- 创建热重启器
- 进行日志配置
- 创建化监控指标的存储
- 创建Envoy服务InstanceImpl —— Envoy服务本体
- 初始化热重启器
- 通过组件工厂ComponentFactory创建DrainManager
- 初始化InstanceImpl:
- 加载Bootstrap配置
- 配置StoreRoot,该对象是负责监控指标的存储,包括Sink的管理
- 从Bootstrap配置创建一个InitialImpl,表示Envoy服务器的最初配置
- 利用InitialImpl创建Admin端点,并启动其监听器,开始监听15000端口
- 创建过载管理器,用于保护Envoy实例所在节点不资源耗尽
- 创建监听器管理器ListenerManager
- 通过主线程的Dispatcher,为主线程注册线程本地存储(TLS)
- 调用StoreRoot的initializeThreading,启用多线程支持
- 创建Runtime::Loader,此对象能够从磁盘读取Envoy的运行时快照
- 创建SSL上下文管理器
- 创建集群管理器工厂ProdClusterManagerFactory
- 传入集群管理器工厂,初始化Envoy服务器的主配置Configuration::MainImpl
- 使用密钥管理器添加bootstrap_中配置的静态密钥
- 使用集群管理工厂,传入bootstrap配置,创建集群管理器
- 创建限速客户端工厂RateLimitClientFactory
- 调用监听器管理器,添加所有静态配置的监听器
- 调用initializeTracers,为当前Envoy服务器初始化Tracer
- 调用initializeStatsSinks,初始化监控信息的Sink
- 为HTTP上下文设置Tracer
- 如果配置了LDS动态资源,则调用监听器管理器,创建注册gRPC订阅
- 将Sinks添加到StoreRoot
- 为StoreRoot的Flush设置定时器
- 初始化用于死锁检测的GuardDog
run方法简单的转发给MainCommonBase.run,进而转发给Server::InstanceImpl的run方法。如果运行模式是Serve,则后者的逻辑如下:
- 初始化RunHelper
- 调用GuardDog创建针对主线程的WatchDog
- 启动WatchDog,由主线程的Dispatcher创建定时器,定期touch此WatchDog
- 阻塞性的运行Dispatcher事件循环
- 调用runPostCallbacks,在运行事件循环之前执行所有后置回调。默认情况下没有需要执行的回调
- 调用event_base_loop,运行event_base直到1-N个未决/活动事件可用
- 当Dispatcher.exit被调用(也就是主事件循环退出)后,停止WatchDog
- 调用Server::InstanceImpl的terminate方法,停止Envoy服务器
- 重置RunHelper
从第5步开始,主线程会阻塞很长时间。后续的逻辑都主要通过libevent事件驱动 —— 当发生网络事件后执行某种回调。
如果定义了宏ENVOY_HANDLE_SIGNALS,则MainCommon的成员变量负责处理信号:
1 2 3 4 5 6 |
#ifdef ENVOY_HANDLE_SIGNALS // 在备选栈上执行信号处理 Envoy::SignalAction handle_sigs; // 打印Backtrace并退出 Envoy::TerminateHandler log_on_terminate; #endif |
MainCommon的成员变量 PlatformImpl platform_impl_;定义平台依赖的操作,目前仅仅包含一个线程工厂实现。
MainCommon的成员变量 Envoy::OptionsImpl options_;负责利用TCLAP解析命令行参数,它实现 Envoy::Server::Option接口,通过此接口可获得各种Envoy启动选项。
MainCommon的成员变量 Event::RealTimeSystem real_time_system_;用于墙上时间度量,以及设置定时器、执行回调。
MainCommon持有此类型的一个对象,并且把绝大部分职责委托给此类处理。构造函数逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
MainCommonBase::MainCommonBase(OptionsImpl& options, Event::TimeSystem& time_system, TestHooks& test_hooks, Server::ComponentFactory& component_factory, std::unique_ptr<Runtime::RandomGenerator>&& random_generator, Thread::ThreadFactory& thread_factory) : options_(options), component_factory_(component_factory), thread_factory_(thread_factory) { // c-ares是一个C语言实现的异步请求DNS的实现,在实例初始化时,应该先调用该函数对该库相关内部模块 ares_library_init(ARES_LIB_INIT_ALL); // 忽略SIGPIPE信号 Event::Libevent::Global::initialize(); switch (options_.mode()) { case Server::Mode::InitOnly: case Server::Mode::Serve: { if (!options.hotRestartDisabled()) { // 热重启器,接口由HotRestart提供,实现代码和配置的“热”重启 restarter_ = std::make_unique<Server::HotRestartImpl>(options_);init-cluster-mgr } // ThreadLocal::InstanceImpl实现Instance,负责注册线程,读写线程本地数据 tls_ = std::make_unique<ThreadLocal::InstanceImpl>(); // 写、读日志锁,实现类是ProcessSharedMutex,可以跨进程使用的互斥量 Thread::BasicLockable& log_lock = restarter_->logLock(); Thread::BasicLockable& access_log_lock = restarter_->accessLogLock(); auto local_address = Network::Utility::getLocalAddress(options_.localAddressIpVersion()); // 日志上下文,相当于同时调用setLogLevel, setLogFormat, setLock logging_context_ = std::make_unique<Logger::Context>(options_.logLevel(), options_.logFormat(), log_lock); // 配置各Envoy组件的日志 configureComponentLogLevels(); // 监控指标存储,ThreadLocalStoreImpl是支持线程本地缓存的StoreRoot实现 stats_store_ = std::make_unique<Stats::ThreadLocalStoreImpl>(options_.statsOptions(), // Stats::StatDataAllocator负责创建Counter、Gauge等Metric的实例 restarter_->statsAllocator()); // 创建Envoy服务器 server_ = std::make_unique<Server::InstanceImpl>( options_, time_system, local_address, test_hooks, *restarter_, *stats_store_, access_log_lock, component_factory, std::move(random_generator), *tls_, thread_factory); break;// 1、使用密钥管理器添加bootstrap_中配置的静态密钥 // 2、使用集群管理工厂,传入bootstrap配置,创建集群管理器 // 3、创建限速客户端工厂RateLimitClientFactory // 4、调用监听器管理器,添加所有静态配置的监听器 // 对于每个监听器,如果存在draining的监听器占用了它绑定的地址,则夺取其SocketSharedPtr // 如果不存在,则调用创建新的SocketSharedPtr,可能导致创建底层套接字,并绑定到端口 // 5、调用initializeTracers,为当前Envoy服务器初始化Tracer // 6、调用initializeStatsSinks,初始化监控信息的Sink } case Server::Mode::Validate: break; } } |
InstanceImpl类实现接口Instance,代表一个运行中的、由若干紧密协作的组件构成的、独立运行的Envoy服务。
该类的构造函数逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 |
// 创建基于文件的日志Sink if (!options.logPath().empty()) { file_logger_ = std::make_unique<Logger::FileSinkDelegate>( options.logPath(), access_log_manager_, Logger::Registry::getSink()); } // 初始化热重启器 restarter_.initialize(*dispatcher_, *this); // 创建DrainManager drain_manager_ = component_factory.createDrainManager(*this); // 初始化Envoy服务 initialize(options, local_address, component_factory); |
初始化逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
// 首先打印epoch信息,以及所有以及注册的、静态链接的扩展 ENVOY_LOG(info, "initializing epoch {} (hot restart version={})", options.restartEpoch(), restarter_.version()); ENVOY_LOG(info, "statically linked extensions:"); ENVOY_LOG(info, " access_loggers: {}", Registry::FactoryRegistry<Configuration::AccessLogInstanceFactory>::allFactoryNames()); // 编写扩展时,你需要负责注册扩展,只需要声明如下形式的静态变量即可 // 扩展类名 扩展所属分类 // static Registry::RegisterFactory<FileAccessLogFactory, Server::Configuration::AccessLogInstanceFactory> register_; // 将Bootstrap配置(由Pilot Agent生成)读取到envoy::config::bootstrap::v2::Bootstrap中 // Bootstrap是Protobuf消息(::google::protobuf::Message) // 自动依据扩展名来决定如何解析配置文件 // 如果指定了--config-yaml选项,则YAML中的配置覆盖到--config-path指定的配置中 // 如果以Bootstrap V2配置解析失败,则尝试以V1重新解析 InstanceUtil::loadBootstrapConfig(bootstrap_, options); // 记录Bootstrap配置修改时间 bootstrap_config_update_time_ = time_system_.systemTime(); // 需要尽早配置StoreRoot,以支持监控指标收集 // TagProducer分析一个指标名,从中抽取一系列标签 stats_store_.setTagProducer(Config::Utility::createTagProducer(bootstrap_)); // StatsMatcher决定哪些指标启用禁用 stats_store_.setStatsMatcher(Config::Utility::createStatsMatcher(bootstrap_)); // 在上述存储中创建Envoy服务状态统计 // struct ServerStats { // Stats::Gauge &uptime_; // Stats::Gauge &concurrency_; // ... // }; server_stats_ = std::make_unique<ServerStats>( // (stats_store_).gauge("server." + std::string("uptime")), ... ServerStats{ALL_SERVER_STATS(POOL_GAUGE_PREFIX(stats_store_, "server."))}); // 收集server.***指标 server_stats_->concurrency_.set(options_.concurrency()); server_stats_->hot_restart_epoch_.set(options_.restartEpoch()); // 本地信息,例如节点名称、所属集群、IP地址 local_info_ = std::make_unique<LocalInfo::LocalInfoImpl>( bootstrap_.node(), local_address, options.serviceZone(), options.serviceClusterName(), options.serviceNodeName()); // 创建一个Initial对象,此对象表示初始化配置 —— 在加载主配置之前需要知道的配置信息 Configuration::InitialImpl initial_config(bootstrap_); // 如果可以,关闭父进程中的admin processing,这让admin processing可以启动一个新进程 HotRestart::ShutdownParentAdminInfo info; info.original_start_time_ = original_start_time_; restarter_.shutdownParentAdmin(info); original_start_time_ = info.original_start_time_; // 全局的admin HTTP端点 admin_ = std::make_unique<AdminImpl>(initial_config.admin().profilePath(), *this); admin_->startHttpListener(initial_config.admin().accessLogPath(), options.adminAddressPath(), initial_config.admin().address(), stats_store_.createScope("listener.admin.")); // ConfigTracker供admin端点/config_dump使用,管理一系列能够提供配置信息的回调函数 config_tracker_entry_ = // add返回EntryOwner,此类型实现了map条目的RAII语义 —— 仅当EntryOwner或ConfigTracker销毁后条目被移除 admin_->getConfigTracker().add("bootstrap", [this] { return dumpBootstrapConfig(); }); // 将admin监听器添加到连接管理器。ConnectionHandler能够添加/删除/启用/禁用/停止网路监听器 // 调用handler->addListener(*listener_)后,创建ActiveListener,监听15000端口。也就是说ConnectionHandler负责启动端口监听 admin_->addListenerToHandler(handler_.get()); // 创建过载管理器 overload_manager_ = std::make_unique<OverloadManagerImpl>(dispatcher(), stats(), threadLocal(), bootstrap_.overload_manager()); // 创建监听器管理器,负责管理:所有监听器、所有负责连接处理的工作线程(Worker) // 创建指定数量的WorkerImpl,为每个WorkerImpl向过载管理器注册过载回调(行为是不再接受连接) // 为每个WorkerImpl创建Dispatcher,并将此Dispatcher注册到ThreadLocal::Instance以支持后续读写线程本地变量 listener_manager_ = std::make_unique<ListenerManagerImpl>(*this, listener_component_factory_, worker_factory_, time_system_); // 主线程也需要注册到TLS thread_local_.registerThread(*dispatcher_, true); // 当所有工作线程对象初始化后,调用下面的方法,调用Stats::StoreRoot的如下方法,以支持多线程操作 stats_store_.initializeThreading(*dispatcher_, thread_local_); // Runtime::Loader能够从磁盘读取运行时快照 runtime_loader_ = component_factory.createRuntime(*this, initial_config); // SSL上下文管理器,管理进程中所有SSL上下文 // 实现类ContextManagerImpl,线程模型如下:上下文可以从任意线程创建(但是实践上通常从主线程分配) // 上下文的分配/销毁是少见操作,因此整体上使用一把锁来保护 ssl_context_manager_ = std::make_unique<Ssl::ContextManagerImpl>(time_system_); // ProdClusterManagerFactory是ClusterManagerFactory的产品环境实现,Envoy很多命名包含Prod的类,用于和测试、Mock用途的类区分 // 集群管理器工厂,负责创建集群管理操作所需要的对象 cluster_manager_factory_ = std::make_unique<Upstream::ProdClusterManagerFactory>( runtime(), stats(), threadLocal(), random(), dnsResolver(), sslContextManager(), dispatcher(), localInfo(), secretManager(), api(), http_context_); // Configuration::MainImpl是主服务器配置的实现,其初initialize方法必须在Envoy服务器完全准备好后调用,应用自举配置到服务器: config_.initialize(bootstrap_, *this, *cluster_manager_factory_); // 为HTTP上下文设置Tracer http_context_.setTracer(config_.httpTracer()); // 如果包含LDS动态资源 if (bootstrap_.dynamic_resources().has_lds_config()) { // 则调用监听器管理器,创建LDS API Provider // 委托给ListenerComponentFactory.createLdsApi,最终会在cm.adsMux()上注册gRPC订阅 listener_manager_->createLdsApi(bootstrap_.dynamic_resources().lds_config()); } // 将Sink添加到StoreRoot for (Stats::SinkPtr& sink : config_.statsSinks()) { stats_store_.addSink(*sink); } // 注册Stat刷出定时器 // 某些Sink需要Dispatcher的支持,因此在主循环开始前,不能刷出 stat_flush_timer_ = dispatcher_->createTimer([this]() -> void { flushStats(); }); stat_flush_timer_->enableTimer(config_.statsFlushInterval()); // 看门狗,用于死锁检测,在Worker启动之前、主循环run之前初始化 guard_dog_ = std::make_unique<Server::GuardDogImpl>(stats_store_, config_, time_system_, api()); |
此方法的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
bool MainCommonBase::run() { // 对应不同的运行模式 switch (options_.mode()) { case Server::Mode::Serve: // 调用Server::InstanceImpl::run server_->run(); return true; case Server::Mode::Validate: { ... } case Server::Mode::InitOnly: { ... } } |
此方法的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
void InstanceImpl::run() { // 创建运行助手 // 服务器实例 // 服务器选项 事件分发器 集群管理器 run_helper_ = std::make_unique<RunHelper>(*this, options_, *dispatcher_, clusterManager(), // 日志管理器 初始化管理器 过载管理器 access_log_manager_, init_manager_, overloadManager(), // 用于启动工作线程的回调函数 [this]() -> void { startWorkers(); }); // 看门狗 auto watchdog = guard_dog_->createWatchDog(api_->threadFactory().currentThreadId()); watchdog->startWatchdog(*dispatcher_); // Event::Dispatcher是对libevent的包装,负责事件分发 // 启动事件循环,阻塞 dispatcher_->run(Event::Dispatcher::RunType::Block); // 重置看门狗 guard_dog_->stopWatching(watchdog); watchdog.reset(); // 停止 terminate(); // 重置运行助手 run_helper_.reset(); } |
此运行助手对象的职责包括:
- 初始化信号处理,主要时关闭Instance
- 设置集群管理器的初始化后回调
- 暂停RDS订阅
- 执行初始化管理器的初始化,初始化其上注册的所有目标,最后启动工作线程
- 恢复RDS订阅
- 启动过载管理器
构造函数代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
RunHelper::RunHelper(Instance& instance, Options& options, Event::Dispatcher& dispatcher, Upstream::ClusterManager& cm, AccessLog::AccessLogManager& access_log_manager, InitManagerImpl& init_manager, OverloadManager& overload_manager, std::function<void()> workers_start_cb) { // 创建基于Dispatcher的信号处理事件回调 if (options.signalHandlingEnabled()) { sigterm_ = dispatcher.listenForSignal(SIGTERM, [&instance]() { instance.shutdown(); }); sigint_ = dispatcher.listenForSignal(SIGINT, [&instance]() { instance.shutdown(); }); sig_usr_1_ = dispatcher.listenForSignal(SIGUSR1, [&access_log_manager]() { access_log_manager.reopen(); }); sig_hup_ = dispatcher.listenForSignal(SIGHUP, []() { // 不响应挂断信号,要知道如何热重启,查看文档 }); } // 启动集群管理器初始化后回调,直到上游集群全部初始化完毕,不会启动工作线程并处理流量 cm.setInitializedCb([&instance, &init_manager, &cm, workers_start_cb]() { // 如果实例被关闭,不执行任何操作。随时都可能收到信号 if (instance.isShutdown()) { return; } // 暂停RDS发现,确保在订阅了所有RDS资源之前,不会发送任何请求 // 订阅在初始化回调中发生,因此在初始化管理器的初始化回调执行完毕之前暂停RDS发现 cm.adsMux().pause(Config::TypeUrl::get().RouteConfiguration); // 所有集群均已经初始化,现在初始化 初始化管理器 // 下面的回调不能捕获this,因为它执行的时候RunHelper可能已经销毁 init_manager.initialize([&instance, workers_start_cb]() { // 如果实例被关闭,不执行任何操作。随时都可能收到信号 if (instance.isShutdown()) { return; } // 否则(在初始化所有注册的target之后)启动工作线程 workers_start_cb(); }); // 初始化回调执行完毕 cm.adsMux().resume(Config::TypeUrl::get().RouteConfiguration); }); // 启动负载管理器 overload_manager.start(); } |
此方法的代码如下:
1 2 3 4 5 6 7 |
void DispatcherImpl::run(RunType type) { run_tid_ = api_.threadFactory().currentThreadId(); // 在执行事件循环之间的钩子回调 runPostCallbacks(); // 启动libevent事件循环 event_base_loop(base_.get(), type == RunType::NonBlock ? EVLOOP_NONBLOCK : 0); } |
到这里可以看到,启动过程的最后,主线程陷入无限循环。
所有后续的逻辑,由Envoy的事件机制来触发。此事件机制的核心是事件分发器接口(Dispatcher),在Envoy启动期间,很多事件回调被注册到Dispatcher,并在以后异步的、可能反复的执行。
Envoy使用libevent2提供的事件机制,但是在其上做了一层封装 —— Dispatcher,事件回调就是通过此Dispatcher注册的。Dispatcher的接口如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
namespace Envoy { namespace Event { struct FileReadyType { // 文件可读 static const uint32_t Read = 0x1; // 文件可写 static const uint32_t Write = 0x2; // 对端关闭了文件 static const uint32_t Closed = 0x4; }; /** * 事件分发器的post()调用之后执行的回调 */ typedef std::function<void()> PostCb; /** * 抽象的事件分发循环 */ class Dispatcher { public: virtual ~Dispatcher() {} /** * 此分发器使用的时间源 */ virtual TimeSystem& timeSystem() PURE; /** * 清空延迟删除队列 */ virtual void clearDeferredDeleteList() PURE; /** * 创建一个服务器连接 * @param socket 该指针指向ConnectionSocket,此结构持有一个套接字(FD)以及一些元数据(例如本地地址) * 对于服务器连接来说,此结构代表一个已经accept的套接字 * 对于客户端连接来说,此结构代码一个正连接到远程地址的套接字 * @param transport_socket 提供连接使用的传输套接字。TransportSocket负责实际的读写以及数据转换(例如TLS) * @return Network::ConnectionPtr 返回一个归属调用者的服务器连接对象 */ virtual Network::ConnectionPtr createServerConnection(Network::ConnectionSocketPtr&& socket, Network::TransportSocketPtr&& transport_socket) PURE; /** * 创建一个客户端连接 * @param address 需要连接到的服务器 * @param source_address 绑定到的本地地址,或者nulptr自动绑定 * @param transport_socket 此连接使用的传输套接字 * @param options 套接字选项 * @return Network::ClientConnectionPtr 返回一个归属调用者的客户端连接对象 */ virtual Network::ClientConnectionPtr createClientConnection(Network::Address::InstanceConstSharedPtr address, Network::Address::InstanceConstSharedPtr source_address, Network::TransportSocketPtr&& transport_socket, const Network::ConnectionSocket::OptionsSharedPtr& options) PURE; /** * 创建一个供启动事件循环的线程独占使用的异步DNS解析器 * @param resolvers DNS服务器地址,默认/etc/resolv.conf * @return 返回归属调用者的Network::DnsResolverSharedPtr */ virtual Network::DnsResolverSharedPtr createDnsResolver(const std::vector<Network::Address::InstanceConstSharedPtr>& resolvers) PURE; /** * 创建一个文件(对于Linux来说,任何文件式的接口,包括普通文件、套接字都支持),当其可读可写时触发回调 * @param fd 监控的文件描述符 * @param cb 文件可读写时执行的回调 * @param trigger 边缘触发还是水平触发 * @param events 此事件最初监听的事件类型,FileReadyType按位或 */ virtual FileEventPtr createFileEvent(int fd, FileReadyCb cb, FileTriggerType trigger, uint32_t events) PURE; /** * @return Filesystem::WatcherPtr 返回归属调用者的文件系统监控器 */ virtual Filesystem::WatcherPtr createFilesystemWatcher() PURE; /** * 在指定的端口上创建一个监听器 * @param socket 需要监听的套接字 * @param cb 监听器事件的处理回调 * @param bind_to_port 是否需要绑定到传输端口 * @param hand_off_restored_destination_connections 当恢复(restoring)了新连接的目的地址后,该监听器是否 * 应该搜索另外一个(更匹配连接的)监听器 * @return Network::ListenerPtr 返回归属调用者的监听器 */ virtual Network::ListenerPtr createListener(Network::Socket& socket, Network::ListenerCallbacks& cb, bool bind_to_port, bool hand_off_restored_destination_connections) PURE; /** * 分配一个定时器 */ virtual Event::TimerPtr createTimer(TimerCb cb) PURE; /** * 提交一个条目,延迟删除 */ virtual void deferredDelete(DeferredDeletablePtr&& to_delete) PURE; /** * 退出事件循环 */ virtual void exit() PURE; /** * 监听信号事件,进程中只有单个分发器可以处理信号,否则行为未定义 * * @param signal_num 需要监听的信号 * @param cb 信号发生时执行的回调 * @return SignalEventPtr返回归属调用者的信号事件 */ virtual SignalEventPtr listenForSignal(int signal_num, SignalCb cb) PURE; /** * 添加一个Post回调,此回调在执行事件循环的那个线程异步的执行。此调用线程安全 */ virtual void post(PostCb callback) PURE; /** * 执行事件循环,知道某个回调或其它线程调用exit() * 阻塞模式:除非exit()被调用,不会退出循环 * 非阻塞模式:仅仅执行活动事件的回调,然后事件循环就退出 */ enum class RunType { Block, NonBlock }; virtual void run(RunType type) PURE; /** * 获取此分发器的带水位支持的缓冲的工厂 */ virtual Buffer::WatermarkFactory& getWatermarkFactory() PURE; }; typedef std::unique_ptr<Dispatcher> DispatcherPtr; } // namespace Event } // namespace Envoy |
DispatcherImpl是基于libevent的Dispatcher实现。可以看到,注册回调主要依靠createFileEvent方法。
在Envoy启动阶段,会注册多个事件回调,包括热重启模块的信号回调、DNS解析模块的回调、ADS客户端的定时器回调。
该接口用于执行多个目标的初始化:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
class Manager { public: virtual ~Manager() {} /** * 注册一个在未来需要初始化的目标,初始化管理器会在适当的时候,调用所有目标的initialize方法 */ virtual void registerTarget(Target& target) PURE; enum class State { /** * 目标们尚未被初始化 */ NotInitialized, /** * 目标们正在被初始化 */ Initializing, /** * 所有目标已经初始化完毕 */ Initialized }; /** * 返回状态 */ virtual State state() const PURE; }; |
单个初始化目标由下面的类型表示:
1 2 3 4 5 6 7 8 9 10 |
class Target { public: virtual ~Target() {} /** * 当目标应该进行初始化时调用该方法 * @param callback 目标的初始化完成后,调用的回调 */ virtual void initialize(std::function<void()> callback) PURE; }; |
该实现用于Post集群管理器、Pre监听时的初始化管理。集群管理器并不是单个实例,例如每个Cluster都有自己的初始化管理器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
class InitManagerImpl : public Init::Manager { public: // 初始化所有目标,并在完毕后执行总回调 // 1、如果目标集为空,直接总回调,进入Initialized状态 // 2、否则,将此总回调赋值给实例变量。进入Initializing状态,依次initializeTarget每个目标 void initialize(std::function<void()> callback); void registerTarget(Init::Target& target) override; State state() const override { return state_; } private: // 调用单个目标的initialize,并在完成后回调中移除此目标 // 如果移除后没有更多目标,将管理器设置为Initialized状态,并且执行总回调 void initializeTarget(Init::Target& target); std::list<Init::Target*> targets_; State state_{State::NotInitialized}; // 总回调 std::function<void()> callback_; }; |
HotRestartImpl实现热重启功能,在初始化时它为UDS注册一个事件回调:
1 2 3 4 5 6 7 8 9 10 11 12 |
void HotRestartImpl::initialize(Event::Dispatcher& dispatcher, Server::Instance& server) { socket_event_ = // 注册事件回调 dispatcher.createFileEvent(my_domain_socket_, [this](uint32_t events) -> void { ASSERT(events == Event::FileReadyType::Read); onSocketEvent(); }, // 边缘触发,监听可读事件 Event::FileTriggerType::Edge, Event::FileReadyType::Read); server_ = &server; } |
事件到达后调用onSocketEvent()方法,获取RCP消息,根据消息类型做出各种处理,例如:关闭Admin端点、Drain监听器、返回监控统计信息、停止当前进程。
DnsResolver负责异步的DNS解析。多个组件需要进行DNS解析,例如StrictDnsClusterImpl.startPreInit方法会解析集群的DNS域名。
InstanceImpl在构造函数中创建DnsResolverImpl:
1 2 3 4 5 6 7 |
InstanceImpl::InstanceImpl( ... ): ... dns_resolver_(dispatcher_->createDnsResolver({})){ ... } Network::DnsResolverSharedPtr DispatcherImpl::createDnsResolver( const std::vector<Network::Address::InstanceConstSharedPtr>& resolvers) { return Network::DnsResolverSharedPtr{new Network::DnsResolverImpl(*this, resolvers)}; } |
DnsResolverImpl在初始化期间,会将onAresSocketStateChange方法作为回调传递给ares:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
DnsResolverImpl::DnsResolverImpl(){ initializeChannel(&options, 0); } void DnsResolverImpl::initializeChannel(ares_options* options, int optmask) { // 将回调提供给ares options->sock_state_cb = [](void* arg, int fd, int read, int write) { static_cast<DnsResolverImpl*>(arg)->onAresSocketStateChange(fd, read, write); }; options->sock_state_cb_data = this; // 初始化ares ares_init_options(&channel_, options, optmask | ARES_OPT_SOCK_STATE_CB); } |
当某个ares套接字可读可写时,会调用onAresSocketStateChange:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
void DnsResolverImpl::onAresSocketStateChange(int fd, int read, int write) { auto it = events_.find(fd); // 如果不可读不可写,而且之间已经跟踪了此fd,则删除fd不再跟踪 if (read == 0 && write == 0) { if (it != events_.end()) { events_.erase(it); } return; } // 如果之前没有跟踪过此fd if (it == events_.end()) { // 注册事件回调 events_[fd] = dispatcher_.createFileEvent( fd, [this, fd](uint32_t events) { onEventCallback(fd, events); }, Event::FileTriggerType::Level, Event::FileReadyType::Read | Event::FileReadyType::Write); } } // 事件回调的逻辑是调用ares: void DnsResolverImpl::onEventCallback(int fd, uint32_t events) { const ares_socket_t read_fd = events & Event::FileReadyType::Read ? fd : ARES_SOCKET_BAD; const ares_socket_t write_fd = events & Event::FileReadyType::Write ? fd : ARES_SOCKET_BAD; ares_process_fd(channel_, read_fd, write_fd); updateAresTimer(); } |
上面的“ares套接字可读可写”,可以由解析DNS的请求来触发,例如StrictDnsClusterImpl.startPreInit会触发DNS解析:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
void StrictDnsClusterImpl::startPreInit() { for (const ResolveTargetPtr& target : resolve_targets_) { target->startResolve(); } } void StrictDnsClusterImpl::ResolveTarget::startResolve() { active_query_ = parent_.dns_resolver_->resolve( ..., []( address_list )->void { // 解析成功后的回调 } ) } ActiveDnsQuery* DnsResolverImpl::resolve(const std::string& dns_name, DnsLookupFamily dns_lookup_family, ResolveCb callback) { std::unique_ptr<PendingResolution> pending_resolution( new PendingResolution(callback, dispatcher_, channel_, dns_name)); pending_resolution->getHostByName(AF_INET6); } |
上面代码的最后一个方法,回调被传递给PendingResolution对象,随即调用其getHostByName方法:
1 2 3 4 5 6 |
void DnsResolverImpl::PendingResolution::getHostByName(int family) { ares_gethostbyname(channel_, dns_name_.c_str(), family, [](void* arg, int status, int timeouts, hostent* hostent) { static_cast<PendingResolution*>(arg)->onAresHostCallback(status, timeouts, hostent); }, this); } |
此方法调用c-ares库进行主机名解析,回调是PendingResolution的onAresHostCallback方法:
1 2 3 4 5 |
void DnsResolverImpl::PendingResolution::onAresHostCallback(int status, int timeouts, hostent* hostent) { // 解析处地址列表,并调用上面的回调C0 callback_(std::move(address_list)); } |
总之,DNS解析是由Dispatcher的回调机制和ares的回调机制协作实现的,达到的效果是异步、高效的DNS解析。
在Envoy主服务InstanceImpl的主配置的初始化过程中,会创建集群管理器ClusterManagerImpl。
集群管理器负责Bootstrap配置中定义的静态集群的初始化,以及CDS/EDS的订阅的启动。这个处理过程比较复杂,分为多个阶段完成
- 初始化静态/DNS集群
- 初始化预定义的EDS集群
- 如果需要,初始化CDS订阅,并等待响应
- 初始化CDS提供的集群,分为两个阶段
- 主(Primary)集群初始化,所有非EDS集群是主集群
- 次(Secondary)集群初始化,EDS集群为次集群。每个EDS集群会独自创建一个xDS订阅,这导致EDS集群需要依赖于非EDS集群(xds-grpc,STRICT_DNS集群),因此EDS集群需要在第二阶段初始化
- 如果集群启用了主动健康检查,此时会触发单轮检查
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
ClusterManagerImpl::ClusterManagerImpl(...) : ..., // 注册针对每个集群的初始化回调,每个集群本身初始化完毕后调用此回调。集群管理器使用此回调进行后初始化处理 init_helper_([this](Cluster& cluster) { onClusterInit(cluster); }), config_tracker_entry_( admin.getConfigTracker().add("clusters", [this] { return dumpClusterConfigs(); })), // 使用主线程分发器,及其时间源 time_source_(main_thread_dispatcher.timeSystem()), dispatcher_(main_thread_dispatcher), http_context_(http_context) { // gRPC异步客户端管理器(AsyncClientManager)可以为每个gRPC服务(envoy::api::v2::core::GrpcService配置) // 创建AsyncClient的工厂 async_client_manager_ = std::make_unique<Grpc::AsyncClientManagerImpl>(*this, tls, time_source_, api); // 获得自举配置中的集群管理器配置 const auto& cm_config = bootstrap.cluster_manager(); ... // 处理v1 API的遗留风格的服务发现配置 if (bootstrap.dynamic_resources().deprecated_v1().has_sds_config()) { eds_config_ = bootstrap.dynamic_resources().deprecated_v1().sds_config(); } // 读取所有静态资源 for (const auto& cluster : bootstrap.static_resources().clusters()) { // 加载所有主集群,默认配置下有prometheus-stat、xds-grpc两个静态配置的主集群 if (cluster.type() != envoy::api::v2::Cluster::EDS) { loadCluster(cluster, "", false, active_clusters_); } } // 如果必要,创建ADS,可能依赖于主集群 if (bootstrap.dynamic_resources().has_ads_config()) { ads_mux_ = std::make_unique<Config::GrpcMuxImpl>( local_info, // 代理本地环境信息 // 创建gRPC异步客户端 Config::Utility::factoryForGrpcApiConfigSource( *async_client_manager_, bootstrap.dynamic_resources().ads_config(), stats) ->create(), // 使用主线程的事件分发器 main_thread_dispatcher, // 寻找gRPC服务 *Protobuf::DescriptorPool::generated_pool()->FindMethodByName( "envoy.service.discovery.v2.AggregatedDiscoveryService.StreamAggregatedResources"), // 随机数生成器 // Stats::Scope,被限定了scope的stats random_, stats_, // 限速配置 Envoy::Config::Utility::parseRateLimitSettings(bootstrap.dynamic_resources().ads_config())); } else { // 没有ADS配置,提供一个占位符 ads_mux_ = std::make_unique<Config::NullGrpcMuxImpl>(); } // 在ADS初始化后,加载EDS类型的静态集群,这种集群可能依赖ADS来发现端点 for (const auto& cluster : bootstrap.static_resources().clusters()) { // Now load all the secondary clusters. if (cluster.type() == envoy::api::v2::Cluster::EDS) { loadCluster(cluster, "", false, active_clusters_); } } // 更新监控指标 cm_stats_.cluster_added_.add(bootstrap.static_resources().clusters().size()); updateGauges(); // 到这里,所有静态集群已经加载完毕,现在检查基于v1 API SDS的集群、基于ADS的集群 const ClusterInfoMap loaded_clusters = clusters(); if (bootstrap.dynamic_resources().deprecated_v1().has_sds_config()) { ... } // 获取当前本地集群的名称 absl::optional<std::string> local_cluster_name; if (!cm_config.local_cluster_name().empty()) { local_cluster_name_ = cm_config.local_cluster_name(); local_cluster_name = cm_config.local_cluster_name(); if (active_clusters_.find(local_cluster_name.value()) == active_clusters_.end()) { throw EnvoyException( fmt::format("local cluster '{}' must be defined", local_cluster_name.value())); } } // 一旦最初的静态Bootstrap集群被创建(包括本地集群),就可以创建线程本地的集群管理器 tls_->set([this, local_cluster_name]( Event::Dispatcher& dispatcher) -> ThreadLocal::ThreadLocalObjectSharedPtr { // ThreadLocalClusterManagerImpl在线程本地缓存集群数据,并从parent central dynamic cluster获取更新 // 此对象维护负载均衡器状态、所有已经创建的线程池 return std::make_shared<ThreadLocalClusterManagerImpl>(*this, dispatcher, local_cluster_name); }); // 创建CDS客户端,并传递给集群管理器初始化助手ClusterManagerInitHelper对象 if (bootstrap.dynamic_resources().has_cds_config()) { cds_api_ = factory_.createCds(bootstrap.dynamic_resources().cds_config(), eds_config_, *this); init_helper_.setCds(cds_api_.get()); } else { init_helper_.setCds(nullptr); } // 将所有静态自举集群传递给初始化助手处理,这会导致: // 1、初始化所有主集群 // 2、进行post-init处理,来初始化任何线程感知的负载均衡器,并创建per-worker的主机(端点)集更新 for (auto& cluster : active_clusters_) { init_helper_.addCluster(*cluster.second->cluster_); } // 将状态设置为WaitingForStaticInitialize // 如果所有主集群都初始化完毕了,可能进行静态配置的次集群(EDS)初始化 // 此方法会调用maybeFinishInitialize init_helper_.onStaticLoadComplete(); // 启动ADS客户端,创建新的gRPC流 ads_mux_->start(); } |
由集群管理器调用init_helper_.addCluster完成,该方法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
void ClusterManagerInitHelper::addCluster(Cluster& cluster) { // 集群第一次初始化后执行的回调。例如,对于动态DNS集群,此回调将在最初的DNS解析完成后调用 const auto initialize_cb = [&cluster, this] { onClusterInit(cluster); }; if (cluster.initializePhase() == Cluster::InitializePhase::Primary) { // 主集群初始化 primary_init_clusters_.push_back(&cluster); cluster.initialize(initialize_cb); } else { // 从CDS获取到的动态集群,走这个分支 secondary_init_clusters_.push_back(&cluster); // 如果当前已经启动了第二阶段初始化,则立即开始初始化 if (started_secondary_initialize_) { cluster.initialize(initialize_cb); } } } |
可以看到,init_helper会调用集群的initialize方法,这是定义在ClusterImplBase中的模板方法:
1 2 3 4 5 6 |
void ClusterImplBase::initialize(std::function<void()> callback) { // 设置初始化回调 initialization_complete_callback_ = callback; // 开始进行预初始化 startPreInit(); } |
集群初始化的第一步是预初始化,静态集群对该方法的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
void StaticClusterImpl::startPreInit() { // 检查是否配置了监控检查,如果是,将所有节点标记为不健康 const auto& health_checker_flag = health_checker_ != nullptr ? absl::optional<Upstream::Host::HealthFlag>(Host::HealthFlag::FAILED_ACTIVE_HC) : absl::nullopt; // PriorityState为每个优先级绑定一组 主机+对应的位置权重映射 auto& priority_state = priority_state_manager_->priorityState(); for (size_t i = 0; i < priority_state.size(); ++i) { priority_state_manager_->updateClusterPrioritySet( i, std::move(priority_state[i].first), absl::nullopt, absl::nullopt, health_checker_flag, overprovisioning_factor_); } priority_state_manager_.reset(); // 预初始化完成后回调 onPreInitComplete(); } |
DNS集群的startPreInit,从DNS解析开始:
1 2 3 4 5 |
void StrictDnsClusterImpl::startPreInit() { for (const ResolveTargetPtr& target : resolve_targets_) { target->startResolve(); } } |
DNS解析完成后,异步的执行下面的回调:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
void StrictDnsClusterImpl::ResolveTarget::startResolve() { ENVOY_LOG(debug, "starting async DNS resolution for {}", dns_address_); parent_.info_->stats().update_attempt_.inc(); active_query_ = parent_.dns_resolver_->resolve( dns_address_, parent_.dns_lookup_family_, [this](const std::list<Network::Address::InstanceConstSharedPtr>&& address_list) -> void { active_query_ = nullptr; // 异步解析完成,更新指标 parent_.info_->stats().update_success_.inc(); // 为每个解析结果创建主机对象HostImpl std::unordered_map<std::string, HostSharedPtr> updated_hosts; HostVector new_hosts; for (const Network::Address::InstanceConstSharedPtr& address : address_list) { new_hosts.emplace_back(new HostImpl( parent_.info_, dns_address_, Network::Utility::getAddressWithPort(*address, port_), lb_endpoint_.metadata(), lb_endpoint_.load_balancing_weight().value(), locality_lb_endpoint_.locality(), lb_endpoint_.endpoint().health_check_config(), locality_lb_endpoint_.priority())); } HostVector hosts_added; HostVector hosts_removed; // 调用BaseDynamicClusterImpl::updateDynamicHostList更新主机列表 if (parent_.updateDynamicHostList(new_hosts, hosts_, hosts_added, hosts_removed, updated_hosts, all_hosts_)) { parent_.updateAllHosts(hosts_added, hosts_removed, locality_lb_endpoint_.priority()); } else { parent_.info_->stats().update_no_rebuild_.inc(); } all_hosts_ = std::move(updated_hosts); // 结束处理,尽管集群可能有多个DNS名称,这里在解析成功一个后就结束初始化过程 parent_.onPreInitComplete(); resolve_timer_->enableTimer(parent_.dns_refresh_rate_ms_); }); } |
EDS集群的startPreInit,从启动EDS订阅开始:
1 |
void EdsClusterImpl::startPreInit() { subscription_->start({cluster_name_}, *this); } |
预初始化完成后,此时端点列表已经获得。onPreInitComplete执行下面的回调,进行必要的健康检查:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
void ClusterImplBase::onPreInitComplete() { // 防止重复调用 if (initialization_started_) { return; } initialization_started_ = true; // 执行onInitDone回调 init_manager_.initialize([this]() { onInitDone(); }); } void ClusterImplBase::onInitDone() { if (health_checker_ && pending_initialize_health_checks_ == 0) { for (auto& host_set : prioritySet().hostSetsPerPriority()) { pending_initialize_health_checks_ += host_set->hosts().size(); } // 在健康检查完毕后调用finishInitialization health_checker_->addHostCheckCompleteCb([this](HostSharedPtr, HealthTransition) -> void { if (pending_initialize_health_checks_ > 0 && --pending_initialize_health_checks_ == 0) { finishInitialization(); } }); } // 不需要健康检查,直接调用 if (pending_initialize_health_checks_ == 0) { finishInitialization(); } } |
然后调用finishInitialization方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
void ClusterImplBase::finishInitialization() { ASSERT(initialization_complete_callback_ != nullptr); ASSERT(initialization_started_); auto snapped_callback = initialization_complete_callback_; initialization_complete_callback_ = nullptr; // 重新载入健康(通过健康检查)的主机 if (health_checker_ != nullptr) { reloadHealthyHosts(); } if (snapped_callback != nullptr) { // 执行回调,也就是ClusterManagerInitHelper::onClusterInit snapped_callback(); } } |
此回调的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
void ClusterManagerInitHelper::onClusterInit(Cluster& cluster) { // 针对每个集群的初始化回调 per_cluster_init_callback_(cluster); // 将当前集群,移除主集群列表/此集群列表 removeCluster(cluster); } void ClusterManagerInitHelper::removeCluster(Cluster& cluster) { if (state_ == State::AllClustersInitialized) { return; } std::list<Cluster*>* cluster_list; if (cluster.initializePhase() == Cluster::InitializePhase::Primary) { cluster_list = &primary_init_clusters_; } else { ASSERT(cluster.initializePhase() == Cluster::InitializePhase::Secondary); cluster_list = &secondary_init_clusters_; } cluster_list->remove(&cluster); maybeFinishInitialize(); } |
per_cluster_init_callback_在集群管理器构造时,通过初始化列表传递给ClusterManagerInitHelper,其实现就是集群管理器的onClusterInit方法:
集群管理器通过此方法进行集群的后初始化处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
void ClusterManagerImpl::onClusterInit(Cluster& cluster) { // 到这里为止,集群尚未配置以支持跨线程更新 // ClusterData,持有集群配置、版本信息、Cluster的引用等信息 auto cluster_data = active_clusters_.find(cluster.info()->name()); if (cluster_data->second->thread_aware_lb_ != nullptr) { // 如果线程感知的负载均衡器已经存在,则初始化之 cluster_data->second->thread_aware_lb_->initialize(); }管理器 // 配置以支持跨线程更新 // PrioritySet,包含单个集群的所有HostSet,以优先级分组 // addMemberUpdateCb,如果任何HostSet发生变化,或者新HostSet创建,调用此回调 cluster.prioritySet().addMemberUpdateCb([&cluster, this](uint32_t priority, const HostVector& hosts_added, const HostVector& hosts_removed) { // 当集群的主机集更新时,此回调触发。需要将此更新发送给所有线程本地配置 // 某些情况下,合并一个时间窗口内多次主机集更新可以提升性能。目前能实现的、安全的合并,不支持添加、删除 // 主机的情况。也就是说,仅仅对那些改变主机健康状态/权重/元数据的更新,可以被合并 bool scheduled = false; // 合并时间窗口1000ms const auto merge_timeout = PROTOBUF_GET_MS_OR_DEFAULT(cluster.info()->lbConfig(), update_merge_window, 1000); // Remember: we only merge updates with no adds/removes — just hc/weight/metadata changes. const bool is_mergeable = !hosts_added.size() && !hosts_removed.size(); if (merge_timeout > 0) { // 尝试调度合并,如果不支持合并,返回false管理器 scheduled = scheduleUpdate(cluster, priority, is_mergeable, merge_timeout); } // 无法调度 if (!scheduled) { // 立即向线程本地集群递送更新 postThreadLocalClusterUpdate(cluster, priority, hosts_added, hosts_removed); } }); // 递送第一次集群主机集更新 // 遍历集群的所有HostSet for (auto& host_set : cluster.prioritySet().hostSetsPerPriority()) { if (host_set->hosts().empty()) { continue; } // 对于包含主机的HostSet,向线程本地集群递送更新 postThreadLocalClusterUpdate(cluster, host_set->priority(), host_set->hosts(), HostVector{}); } } void ClusterManagerImpl::postThreadLocalClusterUpdate(const Cluster& cluster, uint32_t priority, const HostVector& hosts_added, const HostVector& hosts_removed) { const auto& host_set = cluster.prioritySet().hostSetsPerPriority()[priority]; // 拷贝各类主机副本 HostVectorConstSharedPtr hosts_copy(new HostVector(host_set->hosts())); HostVectorConstSharedPtr healthy_hosts_copy(new HostVector(host_set->healthyHosts())); HostVectorConstSharedPtr degraded_hosts_copy(new HostVector(host_set->healthyHosts())); HostsPerLocalityConstSharedPtr hosts_per_locality_copy = host_set->hostsPerLocality().clone(); HostsPerLocalityConstSharedPtr healthy_hosts_per_locality_copy = host_set->healthyHostsPerLocality().clone(); HostsPerLocalityConstSharedPtr degraded_hosts_per_locality_copy = host_set->degradedHostsPerLocality().clone(); // 在所有已经注册的线程的Dispatcher上执行回调(异步),并最后在主线程上同步的执行 // 每个工作线程都有自己的Dispatcher // 调用Dispatcher.post()。导致回调在目标线程的Dispatcher事件循环上下文中执行 tls_->runOnAllThreads([this, name = cluster.info()->name(), priority, hosts_copy, healthy_hosts_copy, degraded_hosts_copy, hosts_per_locality_copy, healthy_hosts_per_locality_copy, degraded_hosts_per_locality_copy, locality_weights = host_set->localityWeights(), hosts_added, hosts_removed]() { // 这里应该只对线程本地数据进行操作 // 调用线程本地集群管理器,更新集群的主机成员 // 如果使用线程感知LB(TLS集群),则重新创建负载均衡器 ThreadLocalClusterManagerImpl::updateClusterMembership( name, priority, HostSetImpl::updateHostsParams(hosts_copy, hosts_per_locality_copy, healthy_hosts_copy, healthy_hosts_per_locality_copy, degraded_hosts_copy, degraded_hosts_per_locality_copy), locality_weights, hosts_added, hosts_removed, *tls_); }); } |
在每个集群的初始化后都会调用, 判断初始化过程是否可以结束:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
void ClusterManagerInitHelper::maybeFinishInitialize() { // 如果当前正在进行初始的静态集群的加载,或者正等待CDS初始化完毕,不做任何事情 if (state_ == State::Loading || state_ == State::WaitingForCdsInitialize) { return; } // 如果主集群没有完全初始化,不做任何事情 if (!primary_init_clusters_.empty()) { return; } // 如果正在等待次集群初始化 if (!secondary_init_clusters_.empty()) { // 如果次集群初始化阶段尚未开始 if (!started_secondary_initialize_) { // 此触发次集群初始化阶段 started_secondary_initialize_ = true; // Cluster::initialize()方法可能修改列表secondary_init_clusters_,移除当前正在初始化的集群 for (auto iter = secondary_init_clusters_.begin(); iter != secondary_init_clusters_.end();) { Cluster* cluster = *iter; ++iter; // 初始化次集群 cluster->initialize([cluster, this] { onClusterInit(*cluster); }); } } return; } started_secondary_initialize_ = false; // 等待静态初始化,且需要CDS if (state_ == State::WaitingForStaticInitialize && cds_) { // 进行CDS的初始化 state_ = State::WaitingForCdsInitialize; cds_->initialize(); } else { // 所有集群已经初始化完毕 state_ = State::AllClustersInitialized; if (initialized_callback_) { initialized_callback_(); } } } |
调用此方法,可以设置在所有集群都初始化之后,调用的回调。
RunHelper会调用此函数,注册的回调会导致InitManager初始化,后者会导致:
- LdsApiImpl,注册到InitManager的Target,被初始化
- LdsApiImpl初始化完毕后,工作进程启动
在InstanceImpl初始化阶段,它会创建全局的监听器管理器:
1 2 |
listener_manager_ = std::make_unique<ListenerManagerImpl>(*this, listener_component_factory_, worker_factory_, time_system_); |
构造函数如下:
1 2 3 4 5 6 7 8 9 10 11 12 |
ListenerManagerImpl::ListenerManagerImpl(Instance& server, ListenerComponentFactory& listener_factory, WorkerFactory& worker_factory, TimeSource& time_source) : server_(server), time_source_(time_source), factory_(listener_factory), stats_(generateStats(server.stats())), config_tracker_entry_(server.admin().getConfigTracker().add( "listeners", [this] { return dumpListenerConfigs(); })) { // 创建工作线程 for (uint32_t i = 0; i < server.options().concurrency(); i++) { workers_.emplace_back(worker_factory.createWorker(server.overloadManager())); } } |
在主配置初始化阶段,静态监听器被创建:
1 2 3 4 |
const auto& listeners = bootstrap.static_resources().listeners(); for (ssize_t i = 0; i < listeners.size(); i++) { server.listenerManager().addOrUpdateListener(listeners[i], "", false); } |
MainImpl::initialize将监听器创建工作委托给监听器管理器完成:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
bool ListenerManagerImpl::addOrUpdateListener(const envoy::api::v2::Listener& config, const std::string& version_info, bool modifiable) { // 如果监听器没有配置名称,随机生成 std::string name; if (!config.name().empty()) { name = config.name(); } else { name = server_.random().uuid(); } const uint64_t hash = MessageUtil::hash(config); ENVOY_LOG(debug, "begin add/update listener: name={} hash={}", name, hash); auto existing_active_listener = getListenerByName(active_listeners_, name); auto existing_warming_listener = getListenerByName(warming_listeners_, name); if ((existing_warming_listener != warming_listeners_.end() && (*existing_warming_listener)->blockUpdate(hash)) || (existing_active_listener != active_listeners_.end() && (*existing_active_listener)->blockUpdate(hash))) { // 重复或者被锁定的监听器,不得更新 return false; } // 实例化监听器对象 ListenerImplPtr new_listener( new ListenerImpl(config, version_info, *this, name, modifiable, workers_started_, hash)); ListenerImpl& new_listener_ref = *new_listener; // 强制要求相同名字的监听器配置必须具有相同的IP地址,以避免更新期间出现冲突,兵却让我们可以使用相同的绑定地址 // 需要注意,如果绑定到端口0(让内核自由选择端口),新监听器会使用对应的旧监听器所监听的端口 if ((existing_warming_listener != warming_listeners_.end() && *(*existing_warming_listener)->address() != *new_listener->address()) || (existing_active_listener != active_listeners_.end() && *(*existing_active_listener)->address() != *new_listener->address())) { throw EnvoyException("监听地址发生变化,不允许"); } bool added = false; if (existing_warming_listener != warming_listeners_.end()) { // 现有监听器在预热阶段,获取现有监听器的套接字,然后替换原先的监听器 new_listener->setSocket((*existing_warming_listener)->getSocket()); *existing_warming_listener = std::move(new_listener); } else if (existing_active_listener != active_listeners_.end()) { // 现有监听器是激活的,工作线程是否已经启动,影响处理方式 // 但是不管工作线程是否启动,都需要从现有监听器将套接字拿过来 new_listener->setSocket((*existing_active_listener)->getSocket()); if (workers_started_) { // 工作线程已经启动,加入到预热列表 warming_listeners_.emplace_back(std::move(new_listener)); } else { // 工作线程尚未启动,替换激活列表中现有监听器 *existing_active_listener = std::move(new_listener); } } else { if (!new_listener->bindToPort() && (hasListenerWithAddress(warming_listeners_, *new_listener->address()) || hasListenerWithAddress(active_listeners_, *new_listener->address()))) { const std::string message = throw EnvoyException("无法添加,现有监听器的地址和当前监听器的重复"); } // 新添加的监听器,工作线程是否已经启动影响处理方式 // 查找是否存在draining的监听器绑定到相同地址 Network::SocketSharedPtr draining_listener_socket; auto existing_draining_listener = std::find_if( draining_listeners_.cbegin(), draining_listeners_.cend(), [&new_listener](const DrainingListener& listener) { return *new_listener->address() == *listener.listener_->socket().localAddress(); }); if (existing_draining_listener != draining_listeners_.cend()) { // Draining监听器已经监听了我们的套接字,这是一个边缘情况(Edge case) // 发生的原因可能是监听器移除,然后由很快被添加回来(使用相同的地址,相同或不同的名称) draining_listener_socket = existing_draining_listener->listener_->getSocket(); } // 为新监听器设置套接字,使用draining的 new_listener->setSocket(draining_listener_socket ? draining_listener_socket // 或者创建新的 : factory_.createListenSocket(new_listener->address(), new_listener->listenSocketOptions(), new_listener->bindToPort())); if (workers_started_) { // 如果工作线程已经启动,作为预热监听器添加 warming_listeners_.emplace_back(std::move(new_listener)); } else { // 否则作为激活监听器添加 active_listeners_.emplace_back(std::move(new_listener)); } added = true; } // 执行监听器的初始化 new_listener_ref.initialize(); return true; } |
如果需要创建新的监听器,则监听器管理器会调用下面的构造函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
ListenerImpl::ListenerImpl(const envoy::api::v2::Listener& config, const std::string& version_info, ListenerManagerImpl& parent, const std::string& name, bool modifiable, bool workers_started, uint64_t hash) // 监听器管理器 监听地址 : parent_(parent), address_(Network::Address::resolveProtoAddress(config.address())), // 全局的监控指标scope global_scope_(parent_.server_.stats().createScope("")), // 监听器的监控指标scope listener_scope_( parent_.server_.stats().createScope(fmt::format("listener.{}.", address_->asString()))), // 是否需要绑定到端口 bind_to_port_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config.deprecated_v1(), bind_to_port, true)), // 是否直接转发给原始目的地对应的监听器 hand_off_restored_destination_connections_( PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, use_original_dst, false)), // 每个连接的缓冲区限额 per_connection_buffer_limit_bytes_( PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, per_connection_buffer_limit_bytes, 1024 * 1024)), // 监听器Tag,用于连接处理器跟踪 listener_tag_(parent_.factory_.nextListenerTag()), name_(name), reverse_write_filter_order_( PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, bugfix_reverse_write_filter_order, true)), // 监听器是否可修改或删除 // 监听器是在工作线程启动前,还是启动后添加的 // 查重哈希 modifiable_(modifiable), workers_started_(workers_started), hash_(hash), // Drain管理器,负责处理连接的draining local_drain_manager_(parent.factory_.createDrainManager(config.drain_type())), // 配置 配置版本信息 config_(config), version_info_(version_info), // 过滤器处理超时 listener_filters_timeout_( PROTOBUF_GET_MS_OR_DEFAULT(config, listener_filters_timeout, 15000)) { // 允许侦听任意地址 if (config.has_transparent()) { addListenSocketOptions(Network::SocketOptionFactory::buildIpTransparentOptions()); } // 允许绑定到非本地的,以及目前不存在的地址 if (config.has_freebind()) { addListenSocketOptions(Network::SocketOptionFactory::buildIpFreebindOptions()); } // TFO,简化握手,提高连接打开速度 if (config.has_tcp_fast_open_queue_length()) { addListenSocketOptions(Network::SocketOptionFactory::buildTcpFastOpenOptions( config.tcp_fast_open_queue_length().value())); } // 其它套接字选项 if (config.socket_options().size() > 0) { addListenSocketOptions( Network::SocketOptionFactory::buildLiteralOptions(config.socket_options())); } // 如果监听器过滤器不为空,则创建监听器过滤器工厂(的列表) if (!config.listener_filters().empty()) { listener_filter_factories_ = parent_.factory_.createListenerFilterFactoryList(config.listener_filters(), *this); } // 如果设置了use_original_dst标记,强制添加original dst这个监听器过滤器 if (PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, use_original_dst, false)) { auto& factory = Config::Utility::getAndCheckFactory<Configuration::NamedListenerFilterConfigFactory>( Extensions::ListenerFilters::ListenerFilterNames::get().OriginalDst); listener_filter_factories_.push_back( factory.createFilterFactoryFromProto(Envoy::ProtobufWkt::Empty(), *this)); } // 如果设置了use_proxy_proto标记,强制添加代理协议监听器 // 此监听器位于监听器过滤器链的尾部 if (PROTOBUF_GET_WRAPPED_OR_DEFAULT(config.filter_chains()[0], use_proxy_proto, false)) { auto& factory = Config::Utility::getAndCheckFactory<Configuration::NamedListenerFilterConfigFactory>( Extensions::ListenerFilters::ListenerFilterNames::get().ProxyProtocol); listener_filter_factories_.push_back( factory.createFilterFactoryFromProto(Envoy::ProtobufWkt::Empty(), *this)); } bool need_tls_inspector = false; // 存放所有过滤器链匹配 std::unordered_set<envoy::api::v2::listener::FilterChainMatch, MessageUtil, MessageUtil> filter_chains; // 遍历过滤器链 for (const auto& filter_chain : config.filter_chains()) { const auto& filter_chain_match = filter_chain.filter_chain_match(); if (filter_chains.find(filter_chain_match) != filter_chains.end()) { throw EnvoyException("多个过滤器链使用了相同的匹配规则"); } filter_chains.insert(filter_chain_match); // 如果没有配置传输套接字,那么,默认: // 1、对于普通TCP流量,使用raw_buffer // 2、对于TLS流量,使用基于BoringSSL的tls auto transport_socket = filter_chain.transport_socket(); if (!filter_chain.has_transport_socket()) { if (filter_chain.has_tls_context()) { transport_socket.set_name(Extensions::TransportSockets::TransportSocketNames::get().Tls); MessageUtil::jsonConvert(filter_chain.tls_context(), *transport_socket.mutable_config()); } else { transport_socket.set_name( Extensions::TransportSockets::TransportSocketNames::get().RawBuffer); } } // DownstreamTransportSocketConfigFactory,每个用于下游连接的传输套接字(例如RawBufferSocketFactory)实现此接口 // 其createTransportSocketFactory方法返回传输套接字的工厂 auto& config_factory = Config::Utility::getAndCheckFactory< Server::Configuration::DownstreamTransportSocketConfigFactory>(transport_socket.name()); // 转换为上述工厂的配置信息 ProtobufTypes::MessagePtr message = Config::Utility::translateToFactoryConfig(transport_socket, config_factory); // 可以基于IP地址CIDR匹配 std::vector<std::string> destination_ips; for (const auto& destination_ip : filter_chain_match.prefix_ranges()) { const auto& cidr_range = Network::Address::CidrRange::create(destination_ip); destination_ips.push_back(cidr_range.asString()); } // 可以基于服务器名称(例如TLS协议的SNI)匹配 std::vector<std::string> server_names(filter_chain_match.server_names().begin(), filter_chain_match.server_names().end()); // 不支持局部通配符名称,仅仅支持*.gmem.cc,而不支持www.*.cc for (const auto& server_name : server_names) { if (server_name.find('*') != std::string::npos && !isWildcardServerName(server_name)) { throw EnvoyException(); } } // 可以基于应用层协议匹配 std::vector<std::string> application_protocols( filter_chain_match.application_protocols().begin(), filter_chain_match.application_protocols().end()); // 传输套接字工厂上下文 Server::Configuration::TransportSocketFactoryContextImpl factory_context( parent_.server_.sslContextManager(), *listener_scope_, parent_.server_.clusterManager(), parent_.server_.localInfo(), parent_.server_.dispatcher(), parent_.server_.random(), parent_.server_.stats()); // 关联初始化管理器,此管理器作为动态密钥提供者 factory_context.setInitManager(initManager()); // 添加此过滤器链配置 addFilterChain( PROTOBUF_GET_WRAPPED_OR_DEFAULT(filter_chain_match, destination_port, 0), destination_ips, server_names, filter_chain_match.transport_protocol(), application_protocols, filter_chain_match.source_type(), // 创建传输套接字工厂 config_factory.createTransportSocketFactory(*message, factory_context, server_names), parent_.factory_.createNetworkFilterFactoryList(filter_chain.filters(), *this)); need_tls_inspector |= filter_chain_match.transport_protocol() == "tls" || (filter_chain_match.transport_protocol().empty() && (!server_names.empty() || !application_protocols.empty())); } // 用于更快的查找 convertDestinationIPsMapToTrie(); // 如果需要,且没有明确配置,自动注入TLS Inspector这个监听器过滤器 if (need_tls_inspector) { for (const auto& filter : config.listener_filters()) { if (filter.name() == Extensions::ListenerFilters::ListenerFilterNames::get().TlsInspector) { need_tls_inspector = false; break; } } if (need_tls_inspector) { // 过滤器链规则依赖TLS Inspector但是没有明确配置,这里进行注入 // 如果没有编译TLS Inspector到Envoy二进制文件中,注入会失败 auto& factory = Config::Utility::getAndCheckFactory<Configuration::NamedListenerFilterConfigFactory>( Extensions::ListenerFilters::ListenerFilterNames::get().TlsInspector); listener_filter_factories_.push_back( factory.createFilterFactoryFromProto(Envoy::ProtobufWkt::Empty(), *this)); } } } |
监听器管理器创建新的监听器后,可能需要为其创建监听套接字,这是通过调用ListenerComponentFactory实现的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
Network::SocketSharedPtr ProdListenerComponentFactory::createListenSocket(Network::Address::InstanceConstSharedPtr address, const Network::Socket::OptionsSharedPtr& options, bool bind_to_port) { // 对于每个监听器配置,跨越所有工作线程共享单个网络监听套接字 // 使用UNIX管道(UDS) if (address->type() == Network::Address::Type::Pipe) { const std::string addr = fmt::format("unix://{}", address->asString()); const int fd = server_.hotRestart().duplicateParentListenSocket(addr); if (fd != -1) { return std::make_shared<Network::UdsListenSocket>(fd, address); } return std::make_shared<Network::UdsListenSocket>(address); } // 使用TCP const std::string addr = fmt::format("tcp://{}", address->asString()); // 尝试通过IPC请求,从父进程获取套接字文件描述符 const int fd = server_.hotRestart().duplicateParentListenSocket(addr); if (fd != -1) { // 获取到了 return std::make_shared<Network::TcpListenSocket>(fd, address, options); } // 自行创建套接字 return std::make_shared<Network::TcpListenSocket>(address, options, bind_to_port); } // 监听套接字构造函数 NetworkListenSocket(const Address::InstanceConstSharedPtr& address, const Network::Socket::OptionsSharedPtr& options, bool bind_to_port) // 进行系统调用,创建socket,返回其句柄 : ListenSocketImpl(address->socket(T::type), address) { RELEASE_ASSERT(fd_ != -1, ""); // 绑定前设置套接字选项 setPrebindSocketOptions(); // 创建底层套接字 setupSocket(options, bind_to_port); } // 套接字选项:重用地址 void NetworkListenSocket<NetworkSocketTrait<Address::SocketType::Stream>>::setPrebindSocketOptions() { int on = 1; // 提供系统调用封装 auto& os_syscalls = Api::OsSysCallsSingleton::get(); Api::SysCallIntResult status = os_syscalls.setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); } // 创建底层套接字 void ListenSocketImpl::setupSocket(const Network::Socket::OptionsSharedPtr& options, bool bind_to_port) { // 监听套接字选项 setListenSocketOptions(options); if (bind_to_port) { // 绑定套接字到地址 const Api::SysCallIntResult result = local_address_->bind(fd_); } } |
监听器管理创建/更新监听器后,会调用此方法:
1 2 3 4 5 6 7 8 9 10 11 12 |
void ListenerImpl::initialize() { last_updated_ = timeSource().systemTime(); // 如果工作线程已经启动,则不使用全局的Init管理器,而使用局部的/每个监听器专有的初始化管理器 if (workers_started_) { dynamic_init_manager_.initialize([this]() -> void { if (!initialize_canceled_) { // 预热后回调 parent_.onListenerWarmed(*this); } }); } } |
工作线程对象(WorkerImpl)是由监听器管理器在其构造函数中创建的。 ProdWorkerFactory::createWorker的实现如下:
1 2 3 4 5 6 7 8 9 |
WorkerPtr ProdWorkerFactory::createWorker(OverloadManager& overload_manager) { // 为工作线程创建一个事件分发器 Event::DispatcherPtr dispatcher(api_.allocateDispatcher(time_system_)); return WorkerPtr{new WorkerImpl( tls_, hooks_, std::move(dispatcher), // ConnectionHandler用于增删改查启禁监听器 Network::ConnectionHandlerPtr{new ConnectionHandlerImpl(ENVOY_LOGGER(), *dispatcher)}, overload_manager, api_)}; } |
工作线程的构造函数如下:
1 2 3 4 5 6 7 8 9 10 11 12 |
WorkerImpl::WorkerImpl(ThreadLocal::Instance& tls, TestHooks& hooks, Event::DispatcherPtr&& dispatcher, Network::ConnectionHandlerPtr handler, OverloadManager& overload_manager, Api::Api& api) : tls_(tls), hooks_(hooks), dispatcher_(std::move(dispatcher)), handler_(std::move(handler)), api_(api) { // 注册线程到TLS tls_.registerThread(*dispatcher_, false); // 过载后的行为,不再接受新连接 overload_manager.registerForAction( OverloadActionNames::get().StopAcceptingConnections, *dispatcher_, [this](OverloadActionState state) { stopAcceptingConnectionsCb(state); }); } |
需要注意,注册到TLS时,使用的是线程的分发器,而不是线程本身。
工作线程的启动,默认配置下,是由RunHelper注册给集群管理器的setInitializedCb回调触发的。此回调会初始化InitManager,默认配置下,注册到InitManager的唯一Target是LdsApiImpl。而InitManager的总回调就是InstanceImpl::startWorkers方法。
也就是说,当LDS客户端初始化完毕后,工作线程才会启动。
Envoy服务实例提供了启动线程的方法:
1 2 3 4 5 6 7 8 9 |
void InstanceImpl::startWorkers() { // 转调监听器管理器 listener_manager_->startWorkers(*guard_dog_); // 到这里,所有监听端口已经正常运作,可以接管(热重启场景下)所有流量了 // 通知父进程,drain掉所有它的监听器 restarter_.drainParentListeners(); // 下面的方法在新发动的主进程中调用,开始父进程的关闭逻辑,最终导致原来的主(父)进程终结 drain_manager_->startParentShutdownSequence(); } |
监听器管理器的同名方法内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 |
void ListenerManagerImpl::startWorkers(GuardDog& guard_dog) { // 所有依赖初始化完毕,启动工作线程 workers_started_ = true; for (const auto& worker : workers_) { // 此时应该没有处于预热中的监听器 for (const auto& listener : active_listeners_) { // 将监听器添加到工作线程 addListenerToWorker(*worker, *listener); } worker->start(guard_dog); } } |
WorkerImpl::start方法就是启动一个物理线程:
1 2 3 |
void WorkerImpl::start(GuardDog& guard_dog) { thread_ = api_.threadFactory().createThread([this, &guard_dog]() -> void { threadRoutine(guard_dog); }); } |
在新线程中,会执行下面的函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
void WorkerImpl::threadRoutine(GuardDog& guard_dog) { // 和主线程类似,先创建看门狗 auto watchdog = guard_dog.createWatchDog(api_.threadFactory().currentThreadId()); watchdog->startWatchdog(*dispatcher_); // 然后阻塞的运行此线程的事件循环 dispatcher_->run(Event::Dispatcher::RunType::Block); // 事件循环退出,进程需要退出,关闭看门狗 guard_dog.stopWatching(watchdog); // 在实际退出线程之前,必须关闭所有活动的网络连接 // 这会阻止主线程运行任何可能引用线程本地变量的析构函数 // 析构网络连接处理器 handler_.reset(); // 准备退出当前线程,所有线程本地变量将被释放 tls_.shutdownThread(); watchdog.reset(); } |
只要当线程真正启动后,它的事件循环才会运作, ListenerManagerImpl::addListenerToWorker注册的关联监听器到工作线程的请求才会被执行。
在工作线程启动之前,需要关联到监听器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
void ListenerManagerImpl::addListenerToWorker(Worker& worker, ListenerImpl& listener) { // 需要添加的监听器 添加成功或失败后执行的回调 worker.addListener(listener, [this, &listener](bool success) -> void { // 这些逻辑会在工作线程上执行,不会阻塞主线程的的post调用 server_.dispatcher().post([this, success, &listener]() -> void { // 理论上说,可能出现监听器添加到一个工作线程,却无法添加到其它工作线程的情况 if (!success && !listener.onListenerCreateFailure()) { // 除了记录日志以外,可能需要添加一个服务器选项,在发生这种以外时退出整个服务器 ENVOY_LOG(critical, "listener '{}' failed to listen on address '{}' on worker", listener.name(), listener.socket().localAddress()->asString()); stats_.listener_create_failure_.inc(); // 移除监听器 removeListener(listener.name()); } }); }); } void WorkerImpl::addListener(Network::ListenerConfig& listener, AddListenerCompletion completion) { // 监听器添加动作是在工作线程的Dispatcher.post中,异步的、在Dispatcher的事件循环上下文中执行的 // 我们必须处理监听器无法成功添加的情况:存在一种竞态条件,两个进程可以成功绑定到一个地址,但是以EADDRINUSE listen()却失败 dispatcher_->post([this, &listener, completion]() -> void { try { // 调用此线程的ConnectionHandler,添加监听器 handler_->addListener(listener); // 监听器添加完成后的钩子 hooks_.onWorkerListenerAdded(); // 执行回调 completion(true); } catch (const Network::CreateListenerException& e) { // 执行回调 completion(false); } }); } void ConnectionHandlerImpl::addListener(Network::ListenerConfig& config) { // 此连接处理器所管理的监听器对象 ActiveListenerPtr l(new ActiveListener(*this, config)); if (disable_listeners_) { l->listener_->disable(); } listeners_.emplace_back(config.socket().localAddress(), std::move(l)); } |
在集群管理器初始化一节中,我们了解到,如果Bootstrap配置中包含了ADS配置,则集群管理器会创建GrpcMuxImpl —— ADS API的客户端实现,用于通过gRPC协议,在连接到管理服务器的单个流中管理多个gRPC订阅。GrpcMuxImpl支持ADS订阅的同时,也可用于单种xDS API,例如EDS。
GrpcMuxImpl的start方法包括延迟创建到管理服务器的流的逻辑:
1 |
void GrpcMuxImpl::start() { establishNewStream(); } |
集群管理器在初始化完毕所有静态集群后,会调用上述方法。
GrpcMuxImpl的构造函数中也包括延迟创建到管理服务器的流的逻辑:
1 2 |
// 创建定时器 retry_timer_ = dispatcher.createTimer([this]() -> void { establishNewStream(); }); |
定时器是libevent支持的一种事件触发方式。在定时器到期后,会执行GrpcMuxImpl::establishNewStream方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
void GrpcMuxImpl::establishNewStream() { // 异步客户端,创建gRPC双向流。将GrpcMuxImpl自身作为回调(AsyncStreamCallbacks)传入 stream_ = async_client_->start(service_method_, *this); if (stream_ == nullptr) { handleFailure(); // 通过定时器重试 return; } // 设置统计指标 control_plane_stats_.connected_state_.set(1); // subscriptions_为订阅列表,最初值为:type.googleapis.com/envoy.api.v2.Cluster for (const auto type_url : subscriptions_) { queueDiscoveryRequest(type_url); } } |
queueDiscoveryRequest会调用sendDiscoveryRequest发送xDS请求:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
void GrpcMuxImpl::queueDiscoveryRequest(const std::string& type_url) { // 排队,然后触发所有请求处理 request_queue_.push(type_url); drainRequests(); } void GrpcMuxImpl::drainRequests() { while (!request_queue_.empty()) { if (!rate_limiting_enabled_ || limit_request_->consume()) { // 不限速 // 逐个发送请求 sendDiscoveryRequest(request_queue_.front()); request_queue_.pop(); } else { // 限速 drain_request_timer_->enableTimer( std::chrono::milliseconds(limit_request_->nextTokenAvailableMs())); break; } } } void GrpcMuxImpl::sendDiscoveryRequest(const std::string& type_url) { ... stream_->sendMessage(request, false); ... } |
除了establishNewStream以外,subscribe方法也会调用queueDiscoveryRequest。各种xDS API的客户端实现,例如CdsApiImpl,都会调用subscribe方法,来订阅某种资源:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
GrpcMuxWatchPtr GrpcMuxImpl::subscribe(const std::string& type_url, const std::vector<std::string>& resources, GrpcMuxCallbacks& callbacks) { // GrpcMuxWatch是多路复用gRPC订阅的句柄,此句柄销毁后,订阅被取消 auto watch = std::unique_ptr<GrpcMuxWatch>(new GrpcMuxWatchImpl(resources, callbacks, type_url, *this)); // 如果这种类型的API尚未订阅,则订阅之 if (!api_state_[type_url].subscribed_) { api_state_[type_url].request_.set_type_url(type_url); api_state_[type_url].request_.mutable_node()->MergeFrom(local_info_.node()); api_state_[type_url].subscribed_ = true; subscriptions_.emplace_back(type_url); } // 发送订阅请求 queueDiscoveryRequest(type_url); return watch; } |
ADS客户端启动后,Istio Pilot随时可能将动态的集群信息推送过来。
推送信息经过一系列的处理(包括Envoy的过滤器链)后,被onReceiveMessage处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
void GrpcMuxImpl::onReceiveMessage(std::unique_ptr<envoy::api::v2::DiscoveryResponse>&& message) { // type_url,例如 type.googleapis.com/envoy.api.v2.Cluster const std::string& type_url = message->type_url(); if (api_state_.count(type_url) == 0) { // 当前没有订阅者 return; } if (api_state_[type_url].watches_.empty()) { // 更新nonce值,此值由服务器提供,标识最后一次响应 api_state_[type_url].request_.set_response_nonce(message->nonce()); if (message->resources().empty()) { // 没有订阅句柄也没有资源,可能的原因是,Envoy解除了某个资源的订阅,而且此资源也从管理服务器上删除掉了 // 例如,一个被删除的集群,触发对ClusterLoadAssignment的unwatch,与此同时XDS服务器发送了 // ClusterLoadAssignment的空列表过来 // 接受此资源,但是不再发送请求,原因是没有watch了 api_state_[type_url].request_.set_version_info(message->version_info()); } else { // 没有订阅句柄(GrpcMuxWatchImpl),但是获取了资源,不应该发生 // 发送一个NACK(不更新version_info) queueDiscoveryRequest(type_url); } return; } try { // 使用一个新的映射存储资源 std::unordered_map<std::string, ProtobufWkt::Any> resources; GrpcMuxCallbacks& callbacks = api_state_[type_url].watches_.front()->callbacks_; for (const auto& resource : message->resources()) { if (type_url != resource.type_url()) { throw EnvoyException("type_url不匹配"); } const std::string resource_name = callbacks.resourceName(resource); resources.emplace(resource_name, resource); } // 遍历订阅句柄 for (auto watch : api_state_[type_url].watches_) { // 如果当前没有资源 if (watch->resources_.empty()) { // 执行SubscriptionCallbacks.onConfigUpdate回调 // 各资源客户端,例如CdsApiImpl,就是Config::SubscriptionCallbacks<envoy::api::v2::Cluster>的实现 // 这导致转调CdsApiImpl.onConfigUpdate watch->callbacks_.onConfigUpdate(message->resources(), message->version_info()); continue; } // 如果当前有资源,则针对新资源调用onConfigUpdate Protobuf::RepeatedPtrField<ProtobufWkt::Any> found_resources; for (auto watched_resource_name : watch->resources_) { auto it = resources.find(watched_resource_name); if (it != resources.end()) { found_resources.Add()->MergeFrom(it->second); } } watch->callbacks_.onConfigUpdate(found_resources, message->version_info()); } // 更新资源版本号 api_state_[type_url].request_.set_version_info(message->version_info()); } catch (const EnvoyException& e) { for (auto watch : api_state_[type_url].watches_) { // 调用更新失败回调 watch->callbacks_.onConfigUpdateFailed(&e); } ::google::rpc::Status* error_detail = api_state_[type_url].request_.mutable_error_detail(); error_detail->set_code(Grpc::Status::GrpcStatus::Internal); error_detail->set_message(e.what()); } // 更新nonce api_state_[type_url].request_.set_response_nonce(message->nonce()); // 发起下一次发现请求 queueDiscoveryRequest(type_url); } |
可以看到onReceiveMessage会转调各订阅者提供的回调函数,完毕后则会触发下一次发现请求。
CDS客户端是在集群管理器初始化阶段创建的:
1 2 3 4 5 6 7 8 9 10 |
ClusterManagerImpl::ClusterManagerImpl(){ if (bootstrap.dynamic_resources().has_cds_config()) { // ProdClusterManagerFactory::createCds // CdsApiImpl::create cds_api_ = factory_.createCds(bootstrap.dynamic_resources().cds_config(), eds_config_, *this); init_helper_.setCds(cds_api_.get()); } else { init_helper_.setCds(nullptr); } } |
构造函数如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
CdsApiImpl::CdsApiImpl(const envoy::api::v2::core::ConfigSource& cds_config, const absl::optional<envoy::api::v2::core::ConfigSource>& eds_config, ClusterManager& cm, Event::Dispatcher& dispatcher, Runtime::RandomGenerator& random, const LocalInfo::LocalInfo& local_info, Stats::Scope& scope) // 创建一个stats的scope(前缀) : cm_(cm), scope_(scope.createScope("cluster_manager.cds.")) { // 检查本地节点信息 Config::Utility::checkLocalInfo("cds", local_info); subscription_ = Config::SubscriptionFactory::subscriptionFromConfigSource<envoy::api::v2::Cluster>( cds_config, local_info, dispatcher, cm, random, *scope_, // 遗留的REST构造器 [this, &cds_config, &eds_config, &cm, &dispatcher, &random, &local_info, &scope]() -> Config::Subscription<envoy::api::v2::Cluster>* { return new CdsSubscription(Config::Utility::generateStats(*scope_), cds_config, eds_config, cm, dispatcher, random, local_info, scope.statsOptions()); }, // REST方法 "envoy.api.v2.ClusterDiscoveryService.FetchClusters", // GRPC方法 "envoy.api.v2.ClusterDiscoveryService.StreamClusters"); } |
在每个集群初始化后都会触发的maybeFinishInitialize中,进行CDS客户端初始化:
1 2 3 4 5 6 7 8 |
void ClusterManagerInitHelper::maybeFinishInitialize() { ... if (state_ == State::WaitingForStaticInitialize && cds_) { state_ = State::WaitingForCdsInitialize; cds_->initialize(); } else { } } |
初始化逻辑很简单,就是启动订阅:
1 |
void initialize() override { subscription_->start({}, *this); } |
CdsApiImpl.subscription_字段类型为 Config::Subscription<envoy::api::v2::Cluster>,就是上文提到的GrpcMuxImpl,CDS会订阅type.googleapis.com/envoy.api.v2.Cluster这种资源。
GrpcMuxImpl::onReceiveMessage对接收到的推送进行处理后,转交给CdsApiImpl::onConfigUpdate处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
void CdsApiImpl::onConfigUpdate(const ResourceVector& resources, const std::string& version_info) { // 暂定对端点资源的发现请求。在处理LDS/CDS更新时,为了避免RDS/EDS泛洪更新,调用此方法 cm_.adsMux().pause(Config::TypeUrl::get().ClusterLoadAssignment); // 此方法结束后恢复端点资源的发现请求 Cleanup eds_resume([this] { cm_.adsMux().resume(Config::TypeUrl::get().ClusterLoadAssignment); }); // 遍历集群,获取名称,如果存在重复项,抛出异常 std::unordered_set<std::string> cluster_names; for (const auto& cluster : resources) { if (!cluster_names.insert(cluster.name()).second) { throw EnvoyException(fmt::format("duplicate cluster {} found", cluster.name())); } } // 进行消息验证 for (const auto& cluster : resources) { MessageUtil::validate(cluster); } // 跟踪哪些集群需要被移除。每次CDS更新,都把所有集群推送过来,没有在推送列表中的动态集群,需要移除 ClusterManager::ClusterInfoMap clusters_to_remove = cm_.clusters(); for (auto& cluster : resources) { const std::string cluster_name = cluster.name(); clusters_to_remove.erase(cluster_name); // 添加或更新集群 if (cm_.addOrUpdateCluster(cluster, version_info)) { ENVOY_LOG(debug, "cds: add/update cluster '{}'", cluster_name); } } for (auto cluster : clusters_to_remove) { const std::string cluster_name = cluster.first; // 删除集群 if (cm_.removeCluster(cluster_name)) { ENVOY_LOG(debug, "cds: remove cluster '{}'", cluster_name); } } version_info_ = version_info; runInitializeCallbackIfAny(); } |
addOrUpdateCluster负责添加或更新集群:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
bool ClusterManagerImpl::addOrUpdateCluster(const envoy::api::v2::Cluster& cluster, const std::string& version_info) { // 首先判断这个集群是新的,还是对既有动态集群的更新 // 同时检查预热中的集群列表、活动集群列表,确定是否需要执行更新,或者更新需要被阻止。阻止条件 // 1、静态配置的集群不允许在主配置(相对Bootstrap配置)中更新 // 2、或,配置哈希没变化 const std::string cluster_name = cluster.name(); const auto existing_active_cluster = active_clusters_.find(cluster_name); const auto existing_warming_cluster = warming_clusters_.find(cluster_name); const uint64_t new_hash = MessageUtil::hash(cluster); if ((existing_active_cluster != active_clusters_.end() && existing_active_cluster->second->blockUpdate(new_hash)) || (existing_warming_cluster != warming_clusters_.end() && existing_warming_cluster->second->blockUpdate(new_hash))) { return false; } if (existing_active_cluster != active_clusters_.end() || existing_warming_cluster != warming_clusters_.end()) { // 已经初始化的情况下,下面的调用没有作用。其本意是从对应待初始化列表中移除集群 init_helper_.removeCluster(*existing_active_cluster->second->cluster_); cm_stats_.cluster_modified_.inc(); } else { cm_stats_.cluster_added_.inc(); } // 添加/修改集群的时机不同,则逻辑完全不同: // 1、在Envoy服务初始化时期,使用Init管理器来处理和主/次集群、静态/CDS集群、预热所有集群相关的复杂逻辑 // 2、在初始化之后,为每个集群独立的处理预热 // // 注意:将所有预热逻辑集中到Init管理器中是可能的,但是为了不让Init管理器更加复杂,这里独立处理了。未来可能进行重构 // 统一处理 // 所有集群初始化完毕之后use_active_map=true const bool use_active_map = init_helper_.state() != ClusterManagerInitHelper::State::AllClustersInitialized; // 加入到活动or预热ClusterMap中 loadCluster(cluster, version_info, true, use_active_map ? active_clusters_ : warming_clusters_); if (use_active_map) { auto& cluster_entry = active_clusters_.at(cluster_name); // 更新各线程本地信息 createOrUpdateThreadLocalCluster(*cluster_entry); // 调用集群初始化助手添加集群 init_helper_.addCluster(*cluster_entry->cluster_); } else { auto& cluster_entry = warming_clusters_.at(cluster_name); // 初始化集群,并在回调中预热集群 cluster_entry->cluster_->initialize([this, cluster_name] { auto warming_it = warming_clusters_.find(cluster_name); auto& cluster_entry = *warming_it->second; updates_map_.erase(cluster_name); active_clusters_[cluster_name] = std::move(warming_it->second); warming_clusters_.erase(warming_it); // 预热完成 createOrUpdateThreadLocalCluster(cluster_entry); // 调用ClusterManagerImpl::onClusterInit // 此时集群还没有设置以支持跨线程更新,原因是避免在初始化阶段进行不必要的更新 // 如果必要,该方法会首先初始化线程感知的负载均衡器 onClusterInit(*cluster_entry.cluster_); updateGauges(); }); } updateGauges(); return true; } |
addOrUpdateCluster可能会调用ClusterManagerInitHelper::addCluster,就是集群管理器初始化,针对静态配置的集群调用的方法。但是这次走针对次集群的分支。
EdsClusterImpl是一种集群实现,它基于EDS协议发现自己的主机(端点,对应Envoy API是ClusterLoadAssignment.endpoints_)。
EdsClusterImpl同时充当了EDS客户端的角色。CDS客户端是单例的,而每个EDS集群都是EDS客户端,它们各自独立向ADS服务器发起订阅。
Eds集群本身可能就是通过CDS订阅获得的,在EdsClusterImpl的构造函数中,它会创建订阅:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
EdsClusterImpl::EdsClusterImpl( // 集群配置 运行时加载器 const envoy::api::v2::Cluster& cluster, Runtime::Loader& runtime, Server::Configuration::TransportSocketFactoryContext& factory_context, // 监控指标scope 提示是否通过xDS添加了此集群 Stats::ScopePtr&& stats_scope, bool added_via_api) // 父对象 : BaseDynamicClusterImpl(cluster, runtime, factory_context, std::move(stats_scope), added_via_api), // 集群管理器 本地信息 cm_(factory_context.clusterManager()), local_info_(factory_context.localInfo()), // 获取集群名称 cluster_name_(cluster.eds_cluster_config().service_name().empty() ? cluster.name() : cluster.eds_cluster_config().service_name()) { // 检查本地信息 Config::Utility::checkLocalInfo("eds", local_info_); // 获取EDS配合自 const auto& eds_config = cluster.eds_cluster_config().eds_config(); Event::Dispatcher& dispatcher = factory_context.dispatcher(); Runtime::RandomGenerator& random = factory_context.random(); Upstream::ClusterManager& cm = factory_context.clusterManager(); // 创建ClusterLoadAssignment订阅 subscription_ = Config::SubscriptionFactory::subscriptionFromConfigSource<envoy::api::v2::ClusterLoadAssignment>( // 配置源envoy::api::v2::core::ConfigSource eds_config, local_info_, dispatcher, cm, random, info_->statsScope(), // 遗留的v1 REST订阅构造器 [this, &eds_config, &cm, &dispatcher, &random]() -> Config::Subscription<envoy::api::v2::ClusterLoadAssignment>* { return new SdsSubscription(info_->stats(), eds_config, cm, dispatcher, random); }, // REST订阅方法 "envoy.api.v2.EndpointDiscoveryService.FetchEndpoints", // GRPC订阅方法 "envoy.api.v2.EndpointDiscoveryService.StreamEndpoints"); } |
在EDS集群的preInit阶段,会启动订阅,只会订阅当前集群的端点:
1 2 |
// subscription_的start方法支持传入一个vector,提供需要抓取的资源的名称 void EdsClusterImpl::startPreInit() { subscription_->start({cluster_name_}, *this); } |
GrpcMuxImpl::onReceiveMessage对接收到的推送进行处理后,转交给EdsClusterImpl::onConfigUpdate处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
void EdsClusterImpl::onConfigUpdate(const ResourceVector& resources, const std::string&) { if (resources.empty()) { // 当前集群没有ClusterLoadAssignment info_->stats().update_empty_.inc(); // 仍然需要进行预初始化 onPreInitComplete(); return; } // EDS资源长度必须为1 if (resources.size() != 1) { throw EnvoyException(fmt::format("Unexpected EDS resource length: {}", resources.size())); } // ClusterLoadAssignme const auto& cluster_load_assignment = resources[0]; // 消息合法性验证 MessageUtil::validate(cluster_load_assignment); // 被更新主机列表 std::unordered_map<std::string, HostSharedPtr> updated_hosts; // 负责管理一个集群的PriorityState,PriorityState持有每个优先级对应的 主机集+对应的位置权重图 PriorityStateManager priority_state_manager(*this, local_info_); // locality_lb_endpoint不是一个端点,而是一个位置(例如可用性区域)中所有端点的集合, // 它具有权重(相对于同优先级的其它locality)、优先级等属性 for (const auto& locality_lb_endpoint : cluster_load_assignment.endpoints()) { const uint32_t priority = locality_lb_endpoint.priority(); if (priority > 0 && !cluster_name_.empty() && cluster_name_ == cm_.localClusterName()) { // 对于本地集群,端点路由优先级必须是0 throw EnvoyException(fmt::format("Unexpected non-zero priority for local cluster '{}'.", cluster_name_)); } // 放入 PriorityState 也就是 std::vector<std::pair<HostListPtr, LocalityWeightsMap>> 中 // vector是以优先级为索引的列表,HostListPtr是Host列表,LocalityWeightsMap是Locality到Weight的映射 priority_state_manager.initializePriorityFor(locality_lb_endpoint); for (const auto& lb_endpoint : locality_lb_endpoint.lb_endpoints()) { // 遍历端点列表 // 先创建HostImpl,然后存放到PriorityState中 priority_state_manager.registerHostForPriority( "", resolveProtoAddress(lb_endpoint.endpoint().address()), locality_lb_endpoint, // 设置主机状态为不健康 lb_endpoint, Host::HealthFlag::FAILED_EDS_HEALTH); } } // 跟踪是否重建负载均衡结构 bool cluster_rebuilt = false; const uint32_t overprovisioning_factor = PROTOBUF_GET_WRAPPED_OR_DEFAULT( cluster_load_assignment.policy(), overprovisioning_factor, kDefaultOverProvisioningFactor); // 遍历新配置中所有优先级 auto& priority_state = priority_state_manager.priorityState(); for (size_t i = 0; i < priority_state.size(); ++i) { if (priority_state[i].first != nullptr) { if (locality_weights_map_.size() <= i) { locality_weights_map_.resize(i + 1); } // 为某个优先级的所有locality更新主机列表 cluster_rebuilt |= updateHostsPerLocality( i, overprovisioning_factor, *priority_state[i].first, locality_weights_map_[i], priority_state[i].second, priority_state_manager, updated_hosts); } } // 遍历所有不在配置中的优先级 for (size_t i = priority_state.size(); i < priority_set_.hostSetsPerPriority().size(); ++i) { const HostVector empty_hosts; LocalityWeightsMap empty_locality_map; if (locality_weights_map_.size() <= i) { locality_weights_map_.resize(i + 1); } cluster_rebuilt |= updateHostsPerLocality(i, overprovisioning_factor, empty_hosts, locality_weights_map_[i], empty_locality_map, priority_state_manager, updated_hosts); } all_hosts_ = std::move(updated_hosts); if (!cluster_rebuilt) { info_->stats().update_no_rebuild_.inc(); } // Preinit完成回调 onPreInitComplete(); } |
LDS客户端是作为InitManager的target之一,进行初始化的。InitManager会先初始化它的所有目标,最后初始化它自己。
InitManager会在集群管理器初始化完毕——所有集群都添加之后被调用。
如果配置了动态LDS资源,则在InstanceImpl的初始化过程中,会创建LDS客户端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
if (bootstrap_.dynamic_resources().has_lds_config()) { listener_manager_->createLdsApi(bootstrap_.dynamic_resources().lds_config()); } void ListenerManagerImpl::createLdsApi(const envoy::api::v2::core::ConfigSource& lds_config) override { ASSERT(lds_api_ == nullptr); lds_api_ = factory_.createLdsApi(lds_config); } LdsApiPtr ProdListenerComponentFactory::createLdsApi(const envoy::api::v2::core::ConfigSource& lds_config) override { return std::make_unique<LdsApiImpl>( lds_config, server_.clusterManager(), server_.dispatcher(), server_.random(), server_.initManager(), server_.localInfo(), server_.stats(), server_.listenerManager()); } LdsApiImpl::LdsApiImpl(const envoy::api::v2::core::ConfigSource& lds_config, Upstream::ClusterManager& cm, Event::Dispatcher& dispatcher, Runtime::RandomGenerator& random, Init::Manager& init_manager, const LocalInfo::LocalInfo& local_info, Stats::Scope& scope, ListenerManager& lm) : listener_manager_(lm), scope_(scope.createScope("listener_manager.lds.")), cm_(cm) { // std::unique_ptr<Config::Subscription<envoy::api::v2::Listener>> subscription_; // 订阅,获得一个GrpcMuxSubscriptionImpl对象 subscription_ = Envoy::Config::SubscriptionFactory::subscriptionFromConfigSource<envoy::api::v2::Listener>( lds_config, local_info, dispatcher, cm, random, *scope_, [this, &lds_config, &cm, &dispatcher, &random, &local_info, &scope]() -> Config::Subscription<envoy::api::v2::Listener>* { return new LdsSubscription(Config::Utility::generateStats(*scope_), lds_config, cm, dispatcher, random, local_info, scope.statsOptions()); }, "envoy.api.v2.ListenerDiscoveryService.FetchListeners", "envoy.api.v2.ListenerDiscoveryService.StreamListeners"); Config::Utility::checkLocalInfo("lds", local_info); // 向初始化管理器注册自己 init_manager.registerTarget(*this); } |
初始化管理器会调用:
1 2 3 4 |
void LdsApiImpl::initialize(std::function<void()> callback) { initialize_callback_ = callback; subscription_->start({}, *this); } |
导致LDS订阅启动。
GrpcMuxImpl::onReceiveMessage对接收到的推送进行处理后,转交给LdsApiImpl::onConfigUpdate处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
void LdsApiImpl::onConfigUpdate(const ResourceVector& resources, const std::string& version_info) { // 暂停RDS订阅并在此函数执行完毕后恢复 cm_.adsMux().pause(Config::TypeUrl::get().RouteConfiguration); Cleanup rds_resume([this] { cm_.adsMux().resume(Config::TypeUrl::get().RouteConfiguration); }); // 返回结果中包含重复的监听器,不允许 std::unordered_set<std::string> listener_names; for (const auto& listener : resources) { if (!listener_names.insert(listener.name()).second) { throw EnvoyException(fmt::format("duplicate listener {} found", listener.name())); } } // 验证每个消息的合法性 for (const auto& listener : resources) { MessageUtil::validate(listener); } // 跟踪需要移除的监听器 std::unordered_map<std::string, std::reference_wrapper<Network::ListenerConfig>> listeners_to_remove; // 将所有现存监听器存放到一个映射中 for (const auto& listener : listener_manager_.listeners()) { listeners_to_remove.emplace(listener.get().name(), listener); } // 对于存在于本次订阅结果中的监听器,不被移除 for (const auto& listener : resources) { listeners_to_remove.erase(listener.name()); } // 不存在的监听器,移除。必须全量推送? for (const auto& listener : listeners_to_remove) { if (listener_manager_.removeListener(listener.first)) { } } // 添加或更新监听器,逻辑类似于静态监听器 for (const auto& listener : resources) { const std::string listener_name = listener.name(); try { // 如果新旧配置完全一样,添加会失败 if (listener_manager_.addOrUpdateListener(listener, version_info, true)) { ENVOY_LOG(info, "lds: add/update listener '{}'", listener_name); } else { ENVOY_LOG(debug, "lds: add/update listener '{}' skipped", listener_name); } } catch (const EnvoyException& e) { throw EnvoyException(fmt::format("Error adding/updating listener {}: {}", listener_name, e.what())); } } version_info_ = version_info; // LDS的初始化回调尚未调用,调用之 runInitializeCallbackIfAny(); } |
该组件一方面实现了完整的xDS协议管理服务器端,一方面对接底层基础设施(K8S),从中获取各种各样的资源。
Discovery关注的K8S资源类型包括:
- Ingress:需要此资源的原因是,Istio可以作为Ingress Controller
- Istio CRD,Istio需要:
- VirtualService、DestinationRule来构建Envoy的Cluster配置、Listener路由规则
- Gateway来为作为网关的Envoy代理(ingressgateway、egressgateway,或者任Pod)提供监听器,并由绑定到网关的VirtualService提供路由表
- ServiceEntry来创建一部分Istio服务,进而构建Envoy的Cluster配置、Listener配置
- Service:Istio需要这种资源创建Istio服务,进而构建Envoy的Cluster配置、Listener配置
- Pod:Istio需要这种资源为EDS集群更新工作负载
当Discovery发现上述某种资源变更后,会准备好一个推送上下文,并且依据此上下文生成所需的Envoy配置,推送给Envoy代理。
推送服务由DiscoveryService负责,它转调DiscoveryServer。后者提供了Envoy v2 xDS API的gRPC客户端实现,其逻辑在ads.go、cds.go、eds.go、lds.go、rds.go等多个源文件中实现,对应了xDS API的不同部分。
所有连接到Discovery的、发起了订阅请求的代理,都会接收到推送。
该组件包装了Envoy代理,主要实现以下功能:
- 从模板生成Bootstrap配置
- 守护Envoy代理,如果宕机重新启动
- 当Bootstrap配置文件更新后,重现配置Envoy代理
- 更新数字证书
Proxy有多种运行模式:sidecar、ingress、proxy,分别用作普通Pod的边车、Ingress控制器、入口/出口网关。
该组件实现了完整的xDS协议客户端。和Pilot Discovery的交互主要由GrpcMuxImpl负责,它是基于gRPC的ADS实现。
除了通过Bootstrap配置加载的静态资源,其它资源都需要通过xDS协议,向Pilot Discovery订阅以获取。GrpcMuxImpl获得资源推送后,会调用CDS、EDS、LDS、RDS等API客户端:
- CDS,动态加载集群,可能创建EDS
- EDS,为EDS集群更新端点
- LDS,动态加载监听器,可能触发工作线程的启动
[…] https://blog.gmem.cc/interaction-between-istio-pilot-and-envoy […]
[…] https://blog.gmem.cc/interaction-between-istio-pilot-and-envoy […]