Istio Mixer与Envoy的交互机制解读
在前些日子的文章Istio Pilot与Envoy的交互机制解读中我们详细研究了Istio Pilot如何基于xDS协议和Envoy代理进行各种配置信息的交换。Istio的另一个核心组件是Mixer,它提供三类功能:
- 遥测报告(Telemetry Reporting),该功能是服务网格可观察性的基础。为服务启用日志记录、监控、追踪、计费流
- 前置条件检查(Precondition Checking),响应服务请求之前进行一系列检查,例如身份验证、白名单检查、ACL检查
- 配额管理(Quota Management),基于特定的维度进行配额,控制对受限资源的争用
本文结合源码分析Mixer的设计、实现细节,同时关注Envoy与它的集成机制。
Mixer的代码位于mixer目录下:
子目录 | 说明 |
adapter | 包含各种适配器的实现,适配器封装了Mixer和外部基础设施后端(例如Prometheus)交互的逻辑 |
cmd |
包含以下可执行文件的入口点: mixc 用于和Mixer服务器实例进行交互的命令行客户端 mixs 在本地启动一个Mixer服务器,或者列出可用的CRD、探测Mixer服务器的状态 |
docker | Docker镜像定义 |
template |
模板,Mixer架构的基础构建块,通过自定义模板可以扩展Mixer 模板定义了将请求属性(Attribute)转换为适配器的输入的Schema(类型信息,使用Protubuf语法描述),每个适配器可以支持任意数量的template 模板决定了适配器会收到的数据、也决定了使用适配器必须创建的instance |
如果使用Istio官方默认的Chart来部署,则会创建istio-telemetry、istio-policy两套Deployment。它们的启动参数没有区别,分别负责Mixer的遥测、策略检查。这两个Deployment分别对应同名的Service,监听9091端口。
实际上网络监听是由Mixs的Sidecar,也就是Envoy负责的。Mixs Pod本身监听的是UDS unix:///sock/mixer.socket,Envoy负责将9091端口的请求转发给此UDS。
在本地调试Mixer服务端时,参考如下启动参数:
1 2 |
mixs server --port 9091 --monitoringPort 9099 --log_output_level api:debug \ --configStoreURL=k8s:///home/alex/.kube/config --configDefaultNamespace=istio-system |
mixs server的入口点位于:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
func main() { // supportedTemplates() map[string]template.Info // supportedAdapters() []adptr.InfoFn // 这两个方法都是自动生成的,包含编译的Mixer支持的模板、适配器的列表 // 模板/适配器信息中包含其属性清单 rootCmd := cmd.GetRootCmd(os.Args[1:], supportedTemplates(), supportedAdapters(), shared.Printf, shared.Fatalf) if err := rootCmd.Execute(); err != nil { os.Exit(-1) } } func serverCmd(info map[string]template.Info, adapters []adapter.InfoFn, printf, fatalf shared.FormatFn) *cobra.Command { // 默认Mixer参数 sa := server.DefaultArgs() // 使用自动生成的模板、适配器信息 sa.Templates = info sa.Adapters = adapters serverCmd := &cobra.Command{ Use: "server", Short: "Starts Mixer as a server", Run: func(cmd *cobra.Command, args []string) { // 调用runServer启动服务 runServer(sa, printf, fatalf) }, } } func runServer(sa *server.Args, printf, fatalf shared.FormatFn) { // 创建服务器对象 s, err := server.New(sa) // 启动gRPC服务 s.Run() // 等待shutdown信号可读 err = s.Wait() // 执行清理工作 _ = s.Close() } |
该函数创建一个全功能的Mixer服务器,并且准备好接收请求:
1 2 3 |
func New(a *Args) (*Server, error) { return newServer(a, newPatchTable()) } |
该方法启动Mixs服务器:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
func (s *Server) Run() { // 准备好关闭通道 s.shutdown = make(chan error, 1) // 设置可用性状态,并通知探针控制器,探针被嵌入到Server s.SetAvailable(nil) go func() { // 启动gRPC服务,传入原始套接字的监听器对象 err := s.server.Serve(s.listener) // 关闭通道 s.shutdown <- err }() } |
该方法很简单,就是在shutdown通道上等待。
该方法关闭Mixs服务器使用的各种资源。
newPatchTable创建一个新的patchTable结构:
1 2 3 4 5 6 7 8 9 10 |
func newPatchTable() *patchTable { return &patchTable{ newRuntime: runtime.New, configTracing: tracing.Configure, startMonitor: startMonitor, listen: net.Listen, configLog: log.Configure, runtimeListen: func(rt *runtime.Runtime) error { return rt.StartListening() }, } } |
此结构就是几个函数的集合:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
type patchTable struct { // 此函数创建一个Runtime,Runtime是Mixer运行时环境的主要入口点 // 它监听配置、实例化Handler、创建分发机制(dispatch machinery)、处理请求 newRuntime func(s store.Store, templates map[string]*template.Info, adapters map[string]*adapter.Info, defaultConfigNamespace string, executorPool *pool.GoroutinePool, handlerPool *pool.GoroutinePool, enableTracing bool) *runtime.Runtime // 配置追踪系统,通常在启动时调用一次,此调用返回后,追踪系统可以接受数据 configTracing func(serviceName string, options *tracing.Options) (io.Closer, error) // 暴露Mixer自我监控信息的HTTP服务 startMonitor func(port uint16, enableProfiling bool, lf listenFunc) (*monitor, error) // 监听本地端口并返回一个监听器 listen listenFunc // 配置Istio的日志子系统 configLog func(options *log.Options) error // 让Runtime开始监听配置变更,每当配置变更,Runtime处理新配置并创建Dispatcher runtimeListen func(runtime *runtime.Runtime) error } |
此方法创建一个新的Mixs服务器,服务器由下面的结构表示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
type Server struct { // 关闭通道 shutdown chan error // 服务API请求的gRPC服务器 server *grpc.Server // API线程池 gp *pool.GoroutinePool // 适配器线程池 adapterGP *pool.GoroutinePool // API网络监听器 listener net.Listener // 监控服务器,此结构包含两个字段,一个是http.Server,一个是关闭通道 monitor *monitor // 用于关闭追踪子系统 tracer io.Closer // 可伸缩的策略检查缓存 checkCache *checkcache.Cache // 将入站API调用分发给配置好的适配器 dispatcher dispatcher.Dispatcher livenessProbe probe.Controller readinessProbe probe.Controller // 管理探针控制器所需要的可用性状态,内嵌 *probe.Probe } |
该方法的逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
func newServer(a *Args, p *patchTable) (*Server, error) { // 校验Mixs启动参数 if err := a.validate(); err != nil { return nil, err } // 配置日志子系统 if err := p.configLog(a.LoggingOptions); err != nil { return nil, err } apiPoolSize := a.APIWorkerPoolSize adapterPoolSize := a.AdapterWorkerPoolSize s := &Server{} // 创建线程池 // API 线程池 s.gp = pool.NewGoroutinePool(apiPoolSize, a.SingleThreaded) s.gp.AddWorkers(apiPoolSize) // 适配器线程池 s.adapterGP = pool.NewGoroutinePool(adapterPoolSize, a.SingleThreaded) s.adapterGP.AddWorkers(adapterPoolSize) tmplRepo := template.NewRepository(a.Templates) // 从适配器名称到adapter.Info的映射 adapterMap := config.AdapterInfoMap(a.Adapters, tmplRepo.SupportsTemplate) // 状态探针 s.Probe = probe.NewProbe() // gRPC选项 var grpcOptions []grpc.ServerOption grpcOptions = append(grpcOptions, grpc.MaxConcurrentStreams(uint32(a.MaxConcurrentStreams))) grpcOptions = append(grpcOptions, grpc.MaxMsgSize(int(a.MaxMessageSize))) // 一元(请求/应答模式)gRPC请求的服务器端拦截器 var interceptors []grpc.UnaryServerInterceptor var err error // 如果启用了追踪(tracing.option提供了ZipkinURL、JaegerURL或LogTraceSpans=true) if a.TracingOptions.TracingEnabled() { s.tracer, err = p.configTracing("istio-mixer", a.TracingOptions) if err != nil { _ = s.Close() return nil, fmt.Errorf("unable to setup tracing") } // 则添加基于OpenTracing的追踪拦截器 interceptors = append(interceptors, otgrpc.OpenTracingServerInterceptor(ot.GlobalTracer())) } // OpenTracing、Prometheus监控拦截器,都来自项目https://github.com/grpc-ecosystem // 将Prometheus拦截器添加到末尾 interceptors = append(interceptors, grpc_prometheus.UnaryServerInterceptor) // 启用Prometheus时间直方图记录,RPC调用的耗时会被记录。Prometheus持有、查询Histogram指标的成本比较高 // 生成的指标都是面向gRPC协议的、通用的,不牵涉Istio的逻辑。指标名以grpc_开头 grpc_prometheus.EnableHandlingTimeHistogram() // 将所有拦截器串连为单个拦截器,并添加到gRPC选项 grpcOptions = append(grpcOptions, grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(interceptors...))) network := "tcp" address := fmt.Sprintf(":%d", a.APIPort) if a.APIAddress != "" { idx := strings.Index(a.APIAddress, "://") if idx < 0 { address = a.APIAddress } else { network = a.APIAddress[:idx] address = a.APIAddress[idx+3:] } } if network == "unix" { // 如果监听UDS,则移除先前的文件 if err = os.Remove(address); err != nil && !os.IsNotExist(err) { // 除了文件未找到以外的错误,都不允许 return nil, fmt.Errorf("unable to remove unix://%s: %v", address, err) } } // 调用net.Listen监听 if s.listener, err = p.listen(network, address); err != nil { _ = s.Close() return nil, fmt.Errorf("unable to listen: %v", err) } // ConfigStore用于测试目的,通常都会使用ConfigStoreURL(例如k8s:///home/alex/.kube/config) st := a.ConfigStore if st != nil && a.ConfigStoreURL != "" { _ = s.Close() return nil, fmt.Errorf("invalid arguments: both ConfigStore and ConfigStoreURL are specified") } if st == nil { configStoreURL := a.ConfigStoreURL if configStoreURL == "" { configStoreURL = "k8s://" } // Registry存储URL scheme与后端实现之间的对应关系 reg := store.NewRegistry(config.StoreInventory()...) groupVersion := &schema.GroupVersion{Group: crd.ConfigAPIGroup, Version: crd.ConfigAPIVersion} // 创建一个Store实例,它持有Backend,Backend代表一个无类型的Mixer存储后端 —— 例如K8S // 默认情况下,configStoreURL的Scheme为k8s,Istio会调用config/crd.NewStore // 传入configStoreURL、GroupVersion、criticalKinds 来创建Backend if st, err = reg.NewStore(configStoreURL, groupVersion, rc.CriticalKinds()); err != nil { _ = s.Close() return nil, fmt.Errorf("unable to connect to the configuration server: %v", err) } } var rt *runtime.Runtime // 所有模板,目标决定了各分类的适配器(例如所有metric类适配器)在运行时需要处理的数据类型 templateMap := make(map[string]*template.Info, len(a.Templates)) for k, v := range a.Templates { t := v templateMap[k] = &t } // 创建运行时,传入存储、模板、适配器、线程池、是否启用追踪等信息 rt = p.newRuntime(st, templateMap, adapterMap, a.ConfigDefaultNamespace, s.gp, s.adapterGP, a.TracingOptions.TracingEnabled()) // 监听配置存储的变更,初始化配置 if err = p.runtimeListen(rt); err != nil { _ = s.Close() return nil, fmt.Errorf("unable to listen: %v", err) } // 等待配置存储同步完成 log.Info("Awaiting for config store sync...") if err := st.WaitForSynced(30 * time.Second); err != nil { return nil, err } // 设置分发器,分发器负责将API请求分发给配置好的适配器处理 s.dispatcher = rt.Dispatcher() // 如果启用了策略检查缓存,则创建LRU缓存对象 if a.NumCheckCacheEntries > 0 { s.checkCache = checkcache.New(a.NumCheckCacheEntries) } // 此全局变量决定是否利用包golang.org/x/net/trace进行gRPC调用追踪 grpc.EnableTracing = a.EnableGRPCTracing // 节流阀,限制调用频度 throttler := loadshedding.NewThrottler(a.LoadSheddingOptions) // Evaluator方法根据名称返回配置好的LoadEvaluator // LoadEvaluator能够评估请求是否超过阈值 if eval := throttler.Evaluator(loadshedding.GRPCLatencyEvaluatorName); eval != nil { grpcOptions = append(grpcOptions, grpc.StatsHandler(eval.(*loadshedding.GRPCLatencyEvaluator))) } // 创建gRPC服务器 s.server = grpc.NewServer(grpcOptions...) // 注册服务到gRPC服务器 // 注册时需要提供grpc.ServiceDesc,其中包含服务名、方法集合(方法名到处理函数的映射 // api.NewGRPCServer返回 mixerpb.MixerServer 接口,它仅仅包含Check / Report两个方法 mixerpb.RegisterMixerServer(s.server, api.NewGRPCServer(s.dispatcher, s.gp, s.checkCache, throttler)) // 探针 if a.LivenessProbeOptions.IsValid() { s.livenessProbe = probe.NewFileController(a.LivenessProbeOptions) s.RegisterProbe(s.livenessProbe, "server") s.livenessProbe.Start() } if a.ReadinessProbeOptions.IsValid() { s.readinessProbe = probe.NewFileController(a.ReadinessProbeOptions) rt.RegisterProbe(s.readinessProbe, "dispatcher") st.RegisterProbe(s.readinessProbe, "store") s.readinessProbe.Start() } // 启动监控服务 if s.monitor, err = p.startMonitor(a.MonitoringPort, a.EnableProfiling, p.listen); err != nil { _ = s.Close() return nil, fmt.Errorf("unable to setup monitoring: %v", err) } // 启动ControlZ监听器 go ctrlz.Run(a.IntrospectionOptions, nil) return s, nil } |
patchTable的newRuntime函数会调用runtime.New,创建一个新的Mixer运行时 —— Mixer运行时环境的主要入口点,负责监听配置、实例化Handler、创建分发机制(dispatch machinery)、处理请求:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
func New( s store.Store, templates map[string]*template.Info, adapters map[string]*adapter.Info, defaultConfigNamespace string, executorPool *pool.GoroutinePool, handlerPool *pool.GoroutinePool, enableTracing bool) *Runtime { // Ephemeral表示一个短暂的配置状态,它可以被入站配置变更事件所更新 // Ephemeral本身包含的数据没有价值,你必须调用它的BuildSnapshot方法来创建稳定的、完全解析的配置的快照 e := config.NewEphemeral(templates, adapters) rt := &Runtime{ // 默认配置命名空间 defaultConfigNamespace: defaultConfigNamespace, // 短暂配置状态 ephemeral: e, // 配置快照 snapshot: config.Empty(), // 适配器处理器列表 handlers: handler.Empty(), // API请求分发器,需要协程池 dispatcher: dispatcher.New(executorPool, enableTracing), // 适配器处理器的协程池 handlerPool: handlerPool, Probe: probe.NewProbe(), store: s, } // 从ephemeral构建出新c.snapshot、新c.handlers、新路由表(用于解析入站请求并将其路由给适当的处理器) // 然后替换路由表,最后清理上一次配置对应的处理器 rt.processNewConfig() // 设置探针结果为:尚未监听存储 rt.Probe.SetAvailable(errNotListening) return rt } |
创建Runtime之后,p.runtimeListen被调用。此函数会调用Runtime.StartListening方法来监听配置的变更,同样会立即触发processNewConfig调用。之后,processNewConfig调用会通过store.WatchChanges的回调反复发生。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
func (c *Runtime) StartListening() error { // Runtime的状态锁 c.stateLock.Lock() defer c.stateLock.Unlock() if c.shutdown != nil { return errors.New("already listening") } // 生成adapter、template等对象类型到它的proto消息的映射(合并到一个映射中) // adapter.Info.DefaultConfig、template.Info.CtrCfg,以及 // &configpb.Rule{}、&configpb.AttributeManifest{}、&v1beta1.Info{} ... // 都实现了proto.Message接口 kinds := config.KindMap(c.snapshot.Adapters, c.snapshot.Templates) // 开始监控存储,返回当前资源集(key到spec的映射)、监控用的通道 data, watchChan, err := store.StartWatch(c.store, kinds) if err != nil { return err } // 设置并覆盖相同的临时状态,其实就是把ephemeral.entries = data c.ephemeral.SetState(data) // 处理新配置 c.processNewConfig() // 初始化运行时的关闭通道 c.shutdown = make(chan struct{}) // 增加一个计数 c.waitQuiesceListening.Add(1) go func() { // 只有shutdown通道关闭,此监控配置存储变化的循环才会退出 // 当有新的配置变更被发现后,调用onConfigChange,此方法会导致processNewConfig store.WatchChanges(watchChan, c.shutdown, watchFlushDuration, c.onConfigChange) // shutdown通道关闭后, c.waitQuiesceListening.Done() }() // 重置可用性状态,此等待组不再阻塞,StopListening方法可以顺利返回 c.Probe.SetAvailable(nil) return nil } |
当配置存储有变化后,Runtime的该方法会被调用,它的逻辑很简单:
1 2 3 4 5 6 |
func (c *Runtime) onConfigChange(events []*store.Event) { // 更新或者擅长ephemeral.entries中的条目 c.ephemeral.ApplyEvent(events) // 对最新的配置进行处理 c.processNewConfig()default } |
Runtime的processNewConfig方法负责处理从配置存储(K8S)中拉取的最新CR,然后创建配置快照、创建处理器表、路由表,并改变Dispatcher的路由:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
func (c *Runtime) processNewConfig() { // 构建一个稳定的、完全解析的配置的快照 newSnapshot, _ := c.ephemeral.BuildSnapshot() // 当前运行时使用的处理器 oldHandlers := c.handlers // 创建新的处理器表 newHandlers := handler.NewTable(oldHandlers, newSnapshot, c.handlerPool) // 返回ExpressionBuilder,用于创建一系列预编译表达式 builder := compiled.NewBuilder(newSnapshot.Attributes) // 构建并返回路由表,路由表决定了什么条件下调用什么适配器 newRoutes := routing.BuildTable( newHandlers, newSnapshot, builder, c.defaultConfigNamespace, log.DebugEnabled()) // 改变分发器的路由,分发器负责基于路由表来调用适配器 oldContext := c.dispatcher.ChangeRoute(newRoutes) // 修改实例变量 c.handlers = newHandlers c.snapshot = newSnapshot log.Debugf("New routes in effect:\n%s", newRoutes) // 关闭旧的处理器,注意处理器实现了io.Closer接口,这个接口由Istio自己负责,和适配器开发无关 cleanupHandlers(oldContext, oldHandlers, newHandlers, maxCleanupDuration) } |
该方法生成一个完全解析的(没有任何外部依赖)的配置快照。快照主要包含静态、动态模板/适配器信息、以及规则信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
func (e *Ephemeral) BuildSnapshot() (*Snapshot, error) { errs := &multierror.Error{} // 下一个快照的ID id := e.nextID e.nextID++ log.Debugf("Building new config.Snapshot: id='%d'", id) // 一组和istio本身状态监控有关的Prometheus计数器 counters := newCounters(id) e.lock.RLock() // 处理属性清单,获得属性列表。清单来源有三个地方: // 1、配置存储中attributemanifest类型的CR。第一次调用该方法时,尚未加载这些CR // 2、自动生成的template.Info.AttributeManifests // 注意清单中每个属性,都具有全网格唯一的名称 attributes := e.processAttributeManifests(counters, errs) // 处理静态适配器的处理器配置 —— 各种适配器的CR/实例,获得处理器(HandlerStatic)列表 // 对于从配置存储加载的资源,如果在自动生成的adapter.Info中找到对应条目,则认为是合法的处理器 // 对于每个处理器,会创建HandlerStatic结构,此结构表示基于Compiled-in的适配器的处理器 shandlers := e.processStaticAdapterHandlerConfigs(counters, errs) // 返回属性描述符查找器(AttributeDescriptorFinder) af := ast.NewFinder(attributes) // 处理静态模板的实例配置 —— 各种模板的CR,获得实例(InstanceStatic)列表 // 对于从配置存储加载的资源,如果在自动生成的template.Info中找到对应条目,则认为是合法的实例 // 对于每个实例,会创建InstanceStatic结构,此结构表示基于Compiled-in的模板的Instance instances := e.processInstanceConfigs(af, counters, errs) // 开始处理动态资源,所谓动态资源,是指没有特定CRD的模板(也就没有对应CR的实例) // 以及没有特定CRD的适配器(也就没有对应CR的处理器) // 动态模板注册为template类型的CR dTemplates := e.processDynamicTemplateConfigs(counters, errs) // 动态适配器注册为adapter类型的CR dAdapters := e.processDynamicAdapterConfigs(dTemplates, counters, errs) // 动态处理器注册为handler类型的CR,它必须引用某个adapter的名称 dhandlers := e.processDynamicHandlerConfigs(dAdapters, counters, errs) // 动态处理器注册为instance类型的CR,它必须引用某个template的名称 dInstances := e.processDynamicInstanceConfigs(dTemplates, af, counters, errs) // 处理规则,规则可以引用上述的静态和动态资源 rules := e.processRuleConfigs(shandlers, instances, dhandlers, dInstances, af, counters, errs) // 构建配置快照 s := &Snapshot{ ID: id, Templates: e.templates, Adapters: e.adapters, TemplateMetadatas: dTemplates, AdapterMetadatas: dAdapters, Attributes: ast.NewFinder(attributes), HandlersStatic: shandlers, InstancesStatic: instances, Rules: rules, HandlersDynamic: dhandlers, InstancesDynamic: dInstances, Counters: counters, } e.lock.RUnlock() return s, errs.ErrorOrNil() } |
适配器的初始化过程,是Mixer服务器初始化的一部分。在Mixer服务器启动过程中有如下逻辑:
1 |
adapterMap := config.AdapterInfoMap(a.Adapters, tmplRepo.SupportsTemplate) |
该方法会生成得到所有适配器的adaptor.Info对象:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
type Info struct { // 适配器的正式名称,必须是RFC 1035兼容的DNS标签 // 此名称会用在Istio配置中,因此应当简短而具有描述性 Name string // 实现此适配器的包,例如 // istio.io/istio/mixer/adapter/denier Impl string // 人类可读的适配器的描述信息 Description string // 该函数指针能够创建一个新的HandlerBuilder,HandlerBuilder能够创建出此适配器的Handler NewBuilder NewBuilderFn // 此适配器声明支持的模板 SupportedTemplates []string // 传递给HandlerBuilder.Build的适配器的默认参数 DefaultConfig proto.Message } |
入参a.Adapters来自supportedAdapters(),此函数是自动生成的。a.Adapters的每个元素的类型是adapter.InfoFn。调用此函数即得到对应的adaptor.Info对象:
1 |
type InfoFn func() Info |
config.AdapterInfoMap的主要逻辑就是调用各种适配器的adapter.InfoFn方法,并且对adaptor.Info进行各种校验。例如检查它的NewBuilder、NewBuilder字段是否为非空,检查它是否和声明支持的模板兼容。
适配器如果需要初始化,那么初始化逻辑就发生在InfoFn中。
本节以Prometheus适配器为例,了解适配器的初始化过程。
1 2 3 4 5 6 7 8 9 10 11 12 13 |
const ( metricsPath = "/metrics" // Istio会暴露三个和Prometheus Exporter端口: // istio-mixer.istio-system:42422,所有由Mixer的Prometheus适配器生成的网格指标 // istio-mixer.istio-system:9093,用于监控Mixer自身的指标 // istio-mixer.istio-system:9102,Envoy生成的原始统计信息,从Statsd转换为Prometheus格式 defaultAddr = ":42422" ) func GetInfo() adapter.Info { ii, _ := GetInfoWithAddr(defaultAddr) return ii } |
GetInfoWithAddr方法的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
func GetInfoWithAddr(addr string) (adapter.Info, Server) { // HandlerBuilder单例 singletonBuilder := &builder{ // HTTP服务器,这里不会启动监听 srv: newServer(addr), } // 创建注册表singletonBuilder.registry = prometheus.NewPedanticRegistry() // 情况指标信息 singletonBuilder.metrics = make(map[string]*cinfo) singletonBuilder.clearState() // 返回adaptor.Info对象 return adapter.Info{ Name: "prometheus", Impl: "istio.io/istio/mixer/adapter/prometheus", Description: "Publishes prometheus metrics", SupportedTemplates: []string{ metric.TemplateName, }, NewBuilder: func() adapter.HandlerBuilder { return singletonBuilder }, DefaultConfig: &config.Params{}, }, singletonBuilder.srv } |
每当配置变更后,适配器的Handler会被初始化。Runtime.processNewConfig会调用:
1 |
newHandlers := handler.NewTable(oldHandlers, newSnapshot, c.handlerPool) |
创建handler.Table,此表包含了所有实例化的、配置好的适配器的处理器的信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
type Table struct { // 表格条目 entries map[string]Entry counters tableCounters } // 单个处理器 type Entry struct { // 处理器的名称 Name string // 处理器对象 Handler adapter.Handler // 适配器名称 AdapterName string // 创建此处理器使用的适配器配置(参数)的签名信息 Signature signature // 传递给处理器的adapter.Env env env } |
每个适配器可以消费多个实例,对于适配器和实例的每个组合,handler.NewTable方法会为其创建Handler:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
// 适配器实例 - 模板实例的映射 // map[*HandlerStatic][]*InstanceStatic instancesByHandler := config.GetInstancesGroupedByHandlers(snapshot) // map[*HandlerDynamic][]*InstanceDynamic instancesByHandlerDynamic := config.GetInstancesGroupedByHandlersDynamic(snapshot) // 表 t := &Table{ entries: make(map[string]Entry, len(instancesByHandler)+len(instancesByHandlerDynamic)), counters: newTableCounters(snapshot.ID), } // 对于每个静态处理器 - 实例组合 for handler, instances := range instancesByHandler { // 为其创建条目,并加入到表中 createEntry(old, t, handler, instances, snapshot.ID, // 这个回调用于用于创建处理器 func(handler hndlr, instances interface{}) (h adapter.Handler, e env, err error) { // 环境信息 e = NewEnv(snapshot.ID, handler.GetName(), gp).(env) // 创建出处理器 h, err = config.BuildHandler(handler.(*config.HandlerStatic), instances.([]*config.InstanceStatic), e, snapshot.Templates) return h, e, err }) } // 对于每个动态处理器 - 实例组合 for handler, instances := range instancesByHandlerDynamic { createEntry(old, t, handler, instances, snapshot.ID, ... } |
config.BuildHandler经过几层转发,最终会调用Prometheus适配器的方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
func (b *builder) Build(ctx context.Context, env adapter.Env) (adapter.Handler, error) { cfg := b.cfg var metricErr *multierror.Error // 用于收集指标配置 newMetrics := make([]*config.Params_MetricInfo, 0, len(cfg.Metrics)) // 检查指标是否被重新定义,也就是对应的CR是否被修改 // 如果是,则清空指标注册表、指标映射。重定义会导致Prometheus客户端Panic // 添加、移除则没有问题 var cl *cinfo // 遍历新配置的指标列表 for _, m := range cfg.Metrics { // 当前指标表中没有匹配项,加入 if cl = b.metrics[m.InstanceName]; cl == nil { newMetrics = append(newMetrics, m) continue } // 散列值没有变,和之前的指标配置一样 if cl.sha == computeSha(m, env.Logger()) { continue } // 散列值不匹配,发生了重定义。适配器需要重现加载 env.Logger().Warningf("Metric %s redefined. Reloading adapter.", m.Name) // 重建注册表、清空指标信息 b.clearState() // 将所有指标作为新配置看待 newMetrics = cfg.Metrics break } env.Logger().Debugf("%d new metrics defined", len(newMetrics)) // 遍历处理所有新指标 var err error for _, m := range newMetrics { ns := defaultNS if len(m.Namespace) > 0 { ns = safeName(m.Namespace) } // 指标全名,即CR的名称 mname := m.InstanceName if len(m.Name) != 0 { // 转换为短名 mname = m.Name } // 构建出指标信息cinfo ci := &cinfo{kind: m.Kind, sha: computeSha(m, env.Logger())} ci.sortedLabels = make([]string, len(m.LabelNames)) copy(ci.sortedLabels, m.LabelNames) sort.Strings(ci.sortedLabels) // 根据指标类型的不同,分别处理。逻辑都是注册指标到注册表 switch m.Kind { case config.GAUGE: ci.c, err = registerOrGet(b.registry, newGaugeVec(ns, mname, m.Description, m.LabelNames)) b.metrics[m.InstanceName] = ci case config.COUNTER: ci.c, err = registerOrGet(b.registry, newCounterVec(ns, mname, m.Description, m.LabelNames)) b.metrics[m.InstanceName] = ci case config.DISTRIBUTION: ci.c, err = registerOrGet(b.registry, newHistogramVec(ns, mname, m.Description, m.LabelNames, m.Buckets)) b.metrics[m.InstanceName] = ci default: metricErr = multierror.Append(metricErr, fmt.Errorf("unknown metric kind (%d); could not register metric %v", m.Kind, m)) } } // 启动Exporter的HTTP服务器,如果已经启动则不管 if err := b.srv.Start(env, promhttp.HandlerFor(b.registry, promhttp.HandlerOpts{})); err != nil { return nil, err } // 如果配置了指标过期功能,则定期删除老旧指标 var expiryCache cache.ExpiringCache if cfg.MetricsExpirationPolicy != nil { checkDuration := cfg.MetricsExpirationPolicy.ExpiryCheckIntervalDuration if checkDuration == 0 { checkDuration = cfg.MetricsExpirationPolicy.MetricsExpiryDuration / 2 } expiryCache = cache.NewTTLWithCallback( cfg.MetricsExpirationPolicy.MetricsExpiryDuration, checkDuration, deleteOldMetrics) } return &handler{b.srv, b.metrics, expiryCache}, metricErr.ErrorOrNil() } |
b.srv.Start会启动作为Exporter的HTTP服务器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
func (s *serverInst) Start(env adapter.Env, metricsHandler http.Handler) (err error) { // 加锁保护 s.lock.Lock() defer s.lock.Unlock() // 如果服务器已经启动了,则委托 // just switch the delegate handler. if s.srv != nil { s.refCnt++ s.handler.setDelegate(metricsHandler) return nil } // 否则,创建监听 listener, err := net.Listen("tcp", s.addr) s.port = listener.Addr().(*net.TCPAddr).Port // 配置ServerMux srvMux := http.NewServeMux() s.handler = &metaHandler{delegate: metricsHandler} srvMux.Handle(metricsPath, s.handler) srv := &http.Server{Addr: s.addr, Handler: srvMux} // 在后台运行 env.ScheduleDaemon(func() { // 开始监听 env.Logger().Infof("serving prometheus metrics on %d", s.port) if err := srv.Serve(listener.(*net.TCPListener)); err != nil { if err == http.ErrServerClosed { env.Logger().Infof("HTTP server stopped") } else { _ = env.Logger().Errorf("prometheus HTTP server error: %v", err) } } }) s.srv = srv s.refCnt++ return nil } |
使用Istio官方Chart安装时,其内置的Prometheus服务器会自动采集该HTTP服务器暴露的指标。
在运行期间,Envoy代理会向Mixer服务发起CHECK/REPORT/QUOTA等调用。Mixer会将这些请求转发给匹配的适配器进行处理。
本节以Prometheus适配器为例,说明REPORT请求的处理过程。
以官方Chart部署Istio时,会创建如下Rule:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
apiVersion: config.istio.io/v1alpha2 kind: rule metadata: name: promhttp spec: actions: - handler: handler.prometheus instances: - requestcount.metric - requestduration.metric - requestsize.metric - responsesize.metric match: context.protocol == "http" || context.protocol == "grpc" |
这里我们测试requestcount这个指标,和它相关的Handler、Instance配置片断如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# Handler apiVersion: config.istio.io/v1alpha2 kind: prometheus metadata: name: handler namespace: istio-system spec: metrics: - instance_name: requestcount.metric.istio-system kind: COUNTER label_names: - reporter - source_app - source_principal - source_workload - source_workload_namespace - source_version - destination_app - destination_principal - destination_workload - destination_workload_namespace - destination_version - destination_service - destination_service_name - destination_service_namespace - request_protocol - response_code - connection_security_policy name: requests_total # Instance kind: metric metadata: name: requestcount spec: dimensions: connection_security_policy: conditional((context.reporter.kind | "inbound") == "outbound", "unknown", conditional(connection.mtls | false, "mutual_tls", "none")) destination_app: destination.labels["app"] | "unknown" destination_principal: destination.principal | "unknown" destination_service: destination.service.host | "unknown" destination_service_name: destination.service.name | "unknown" destination_service_namespace: destination.service.namespace | "unknown" destination_version: destination.labels["version"] | "unknown" destination_workload: destination.workload.name | "unknown" destination_workload_namespace: destination.workload.namespace | "unknown" reporter: conditional((context.reporter.kind | "inbound") == "outbound", "source", "destination") request_protocol: api.protocol | context.protocol | "unknown" response_code: response.code | 200 source_app: source.labels["app"] | "unknown" source_principal: source.principal | "unknown" source_version: source.labels["version"] | "unknown" source_workload: source.workload.name | "unknown" source_workload_namespace: source.workload.namespace | "unknown" monitored_resource_type: '"UNSPECIFIED"' value: "1" |
要触发Mixer服务器端的处理逻辑,不需要运行Envoy代理,调用命令行客户端mixc就可以了。
为了匹配上面的promhttp规则,我们需要发送一个属性context.protocol的值为http的REPORT请求:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
mixc report -m localhost:9091 \ -t request.time=2019-03-27T11:00:00.000Z,response.time=2019-03-27T11:00:00.900Z \ -a context.protocol=http,context.reporter.kind=outbound,source.namespace=default \ -a destination.service=kubernetes # 2019-03-27T03:52:05.237085Z info parsed scheme: "" # 2019-03-27T03:52:05.237179Z info scheme "" not registered, fallback to default scheme # 2019-03-27T03:52:05.237532Z info ccResolverWrapper: sending new addresses to cc: [{localhost:9091 0 <nil>}] # 2019-03-27T03:52:05.237592Z info ClientConn switching balancer to "pick_first" # 2019-03-27T03:52:05.237768Z info pickfirstBalancer: HandleSubConnStateChange: 0xc0001940b0, CONNECTING # 2019-03-27T03:52:05.237788Z info blockingPicker: the picked transport is not ready, loop back to repick # 2019-03-27T03:52:05.241228Z info pickfirstBalancer: HandleSubConnStateChange: 0xc0001940b0, READY # Report RPC returned OK |
Mixer处理请求的接口由以下Proto文件定义:
1 2 3 4 5 6 7 |
service Mixer { // 进行先决条件检查,或者进行配额 rpc Check(CheckRequest) returns (CheckResponse) {} // 遥测报告 rpc Report(ReportRequest) returns (ReportResponse) {} } |
通过前面章节的源码分析,我们了解到,在Mixer服务启动时,注册了OpenTracing、Prometheus的gRPC拦截器。因此首先会执行Prometheus拦截器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
// 自动生成的代码 func _Mixer_Report_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(ReportRequest) if err := dec(in); err != nil { return nil, err } // 没有拦截器,直接调用MixerServer实现 if interceptor == nil { return srv.(MixerServer).Report(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: "/istio.mixer.v1.Mixer/Report", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(MixerServer).Report(ctx, req.(*ReportRequest)) } // 实际上是有拦截器的,调用拦截器,通过拦截器再调用MixerServer实现 return interceptor(ctx, in, info, handler) } |
来自go-grpc-prometheus项目的Prometheus拦截器,逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
func UnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { // grpc_server_started_total指标 monitor := newServerReporter(Unary, info.FullMethod) // grpc_server_msg_received_total指标 monitor.ReceivedMessage() // 调用MixerServer实现 resp, err := handler(ctx, req) // grpc_server_handled_total指标 // grpc_server_handling_seconds指标,直方图 monitor.Handled(grpc.Code(err)) if err == nil { // grpc_server_msg_sent_total monitor.SentMessage() } return resp, err } |
MixerServer接口的实现定义在api.grpcServer结构中。Report方法会逐个处理每条消息,并进行:
- 预处理:调用匹配的属性生成处理器
- 处理:调用匹配的主处理器
注意,单次Mixer请求可以携带多条消息,每条消息都对应Envoy代理处理的一个实际请求。
Report方法的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
func (s *grpcServer) Report(ctx context.Context, req *mixerpb.ReportRequest) (*mixerpb.ReportResponse, error) { // 限流逻辑,默认情况下Mixer的限流是关闭的 // req.Attributes的类型是 []v1.CompressedAttributes,每个元素表示报告的一条信息,客户端可以一次报送多条信息 // 但是对于非REPORT请求,每次只能有一条消息 if s.throttler.Throttle(loadshedding.RequestInfo{PredictedCost: float64(len(req.Attributes))}) { return nil, grpc.Errorf(codes.Unavailable, "Server is currently overloaded. Please try again.") } if len(req.Attributes) == 0 { // 没有报告任何东西 return reportResp, nil } // Words表示消息级别的字典 —— 属性名的数组、字符串属性值 for i := 0; i < len(req.Attributes); i++ { if len(req.Attributes[i].Words) == 0 { // req.DefaultWords为所有消息的默认字典。可以让请求中多个消息共享字典,进而减少请求大小 req.Attributes[i].Words = req.DefaultWords } } // bag around the input proto that keeps track of reference attributes // 创建一个ProtoBag —— 基于属性Proto消息,实现Bag接口(用于访问属性集) protoBag := attribute.NewProtoBag(&req.Attributes[0], s.globalDict, s.globalWordList) // 从对象池中取得一个MutableBag,对象池避免了反复的内存分配,然后将其parent设置为protoBag // accumBag(请求包requestBag),跟踪除了第一个以外,所有消息相对于第一个的delta accumBag := attribute.GetMutableBag(protoBag) // reportBag(响应包responseBag),持有预处理之后的输出状态,预处理适配器可能会生成一些新属性,这些新属性以delta的形式存储在此 reportBag := attribute.GetMutableBag(accumBag) // 基于GlobalTracer,启动并返回操作名称(operationName)为Report的Span,使用从ctx中找到的Span作为ChildOfRef // 如果找不到作为parent的Span,则创建一个根Span reportSpan, reportCtx := opentracing.StartSpanFromContext(ctx, "Report") // 从对象池中获得reporter,为其提供路由上下文(report.rc)、报告上下文(r.ctx,其中包含了Trace树的信息) reporter := s.dispatcher.GetReporter(reportCtx) var errors *multierror.Error /* 开始逐个处理消息 */ for i := 0; i < len(req.Attributes); i++ { // 以Report为父Span,依次创建子Span: attribute bag N span, newctx := opentracing.StartSpanFromContext(reportCtx, fmt.Sprintf("attribute bag %d", i)) // 第一个属性块(消息)作为protoBag的基础,计算每个子包的delta if i > 0 { if err := accumBag.UpdateBagFromProto(&req.Attributes[i], s.globalWordList); err != nil { err = fmt.Errorf("request could not be processed due to invalid attributes: %v", err) // 为子Span记录字段,然后结束Span span.LogFields(otlog.String("error", err.Error())) span.Finish() errors = multierror.Append(errors, err) break } } lg.Debug("Dispatching Preprocess") // 预处理,将请求包分发给那些需要提前执行的适配器,例如属性生成适配器 if err := s.dispatcher.Preprocess(newctx, accumBag, reportBag); err != nil { err = fmt.Errorf("preprocessing attributes failed: %v", err) span.LogFields(otlog.String("error", err.Error())) span.Finish() errors = multierror.Append(errors, err) continue } // 主处理,分发给主适配器 lg.Debug("Dispatching to main adapters after running preprocessors") lg.Debuga("Attribute Bag: \n", reportBag) lg.Debugf("Dispatching Report %d out of %d", i+1, len(req.Attributes)) if err := reporter.Report(reportBag); err != nil { span.LogFields(otlog.String("error", err.Error())) span.Finish() errors = multierror.Append(errors, err) continue } span.Finish() // 清空包内容,准备处理下一个请求包使用 reportBag.Reset() } /* 结束逐个处理消息 */ // 重置,并放回对象池 reportBag.Done() accumBag.Done() protoBag.Done() // 刷出,调用reporter.impl.getSession.dispatchBufferedReports(),将之前缓冲的dispatchState全部分发出去 // 然后将会话放回对象池 if err := reporter.Flush(); err != nil { errors = multierror.Append(errors, err) } // 将Reporter对象也放回池中 reporter.Done() // 结束Span if errors != nil { reportSpan.LogFields(otlog.String("error", errors.Error())) } reportSpan.Finish() if errors != nil { lg.Errora("Report failed:", errors.Error()) return nil, grpc.Errorf(codes.Unknown, errors.Error()) } // 返回响应 return reportResp, nil } |
Dispatcher.Preprocess方法负责请求预处理,将请求包分发给那些需要提前执行的适配器,并收集它们产生的属性:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
func (d *Impl) Preprocess(ctx context.Context, bag attribute.Bag, responseBag *attribute.MutableBag) error { // 返回一个session,此结构表示对Dispatcher接口(的实现Impl)的一个调用会话 // 其中包含了处理调用所需的所有可变状态 // getSession从对象池获取一个session对象,然后设置它的 // s.impl,Dispatcher对象 // s.rc,路由上下文对象 // s.ctx 包含Span信息的上下文 // s.variety 需要调用的适配器的种类 // s.bag 请求包 s := d.getSession(ctx, tpb.TEMPLATE_VARIETY_ATTRIBUTE_GENERATOR, bag) // s.responseBag 响应包 s.responseBag = responseBag // 执行分发 err := s.dispatch() if err == nil { err = s.err } // 放回对象池 d.putSession(s) return err } |
session.dispatch方法真正负责请求包的分发工作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
func (s *session) dispatch() error { // 根据报告者类型(从context.reporter.kind获取),默认inbound推断命名空间 // inbound 则命名空间为destination.namespace // outbound 则命名空间为source.namespace namespace, err := getIdentityNamespace(s.bag) if err != nil { // 无法获取命名空间,出错 // 更新直方图(Observe一个值): // mixer_dispatcher_destinations_per_request // mixer_dispatcher_instances_per_request updateRequestCounters(0, 0) log.Warnf("unable to determine identity namespace: '%v', operation='%d'", err, s.variety) return err } // 从路由表获得s.variety类型的、namespace命名空间的目的地列表 // 注意:如果当前命名空间没有匹配的目的地,则使用默认配置存储命名空间(istio-system)中定义的目的地 destinations := s.rc.Routes.GetDestinations(s.variety, namespace) // 要访问的目标服务 destinationService := "" v, ok := s.bag.Get("destination.service") if ok { destinationService = v.(string) } // 创建一个新的Context,携带键值对,以前面的子Span上下文为父,0=adapter.RequestData为键值对 // RequestData定义了关于请求的信息,例如它的目的服务 ctx := adapter.NewContextWithRequestData(s.ctx, &adapter.RequestData{ DestinationService: adapter.Service{ FullName: destinationService, }, }) // 确保能够将请求并行的分发给所有处理器,将s.completed设置为足够大的chan *dispatchState // 每个chan *dispatchState收集单个目的地的处理结果 s.ensureParallelism(destinations.Count()) foundQuota := false // 构建出的实例数量 ninputs := 0 // 匹配的目的地数量 ndestinations := 0 for _, destination := range destinations.Entries() { // dispatchState持有和单个目的地相关的输入/输出状态 var state *dispatchState // 对于REPORT处理器 if s.variety == tpb.TEMPLATE_VARIETY_REPORT { // 生成并缓存分发状态到s.reportStates state = s.reportStates[destination] if state == nil { // 从对象池中获取一个dispatchState并对其赋值,对象池在Mixer中大量使用,减少了内存分配 state = s.impl.getDispatchState(ctx, destination) s.reportStates[destination] = state } } for _, group := range destination.InstanceGroups { // 判断请求包是否和每个实例组匹配 groupMatched := group.Matches(s.bag) if groupMatched { ndestinations++ } // 遍历每个组中的每个实例,调用其构建器。构建器的逻辑取决于你配置的各种模板实例,例如metric的CR for j, input := range group.Builders { if s.variety == tpb.TEMPLATE_VARIETY_QUOTA { // 对于配额适配器,必须要求实例构建器名称和实例名一致 // CRD名称即模板信息名TemplateInfo.Name,例如 logentries // 实例名,即CR名,例如 kubectl -n istio-system get logentries.config.istio.io // 得到的accesslog、tcpaccesslog if !strings.EqualFold(input.InstanceShortName, s.quotaArgs.Quota) { continue } if !groupMatched { // 这是一个条件性的配额,并且当前不匹配条件,直接返回请求的额度 s.quotaResult.Amount = s.quotaArgs.Amount s.quotaResult.ValidDuration = defaultValidDuration } foundQuota = true } if !groupMatched { continue } var instance interface{} // 从请求包构建出实例,Builder方法是自动生成的 if instance, err = input.Builder(s.bag); err != nil { log.Errorf("error creating instance: destination='%v', error='%v'", destination.FriendlyName, err) s.err = multierror.Append(s.err, err) continue } ninputs++ // 对于REPORT模板,在执行分发前,尽可能的将实例累积到分发状态的instances列表中 if s.variety == tpb.TEMPLATE_VARIETY_REPORT { state.instances = append(state.instances, instance) continue } // 对于其它模板类型,直接分发给处理器 state = s.impl.getDispatchState(ctx, destination) state.instances = append(state.instances, instance) if s.variety == tpb.TEMPLATE_VARIETY_ATTRIBUTE_GENERATOR { // 属性生成处理器需要处理Mapper —— 将处理器输出映射入主属性集的函数 state.mapper = group.Mappers[j] state.inputBag = s.bag } // 配额模板相关参数 state.quotaArgs.BestEffort = s.quotaArgs.BestEffort state.quotaArgs.DeduplicationID = s.quotaArgs.DeduplicationID state.quotaArgs.QuotaAmount = s.quotaArgs.Amount // 直接分发 s.dispatchToHandler(state) } } } // Observe mixer_dispatcher_destinations_per_request // Observe mixer_dispatcher_instances_per_request updateRequestCounters(ndestinations, ninputs) // 等待所有处理器处理完毕 s.waitForDispatched() // 如果当前执行的是配额处理器,且没有找到配额,则警告但是允许访问 if s.variety == tpb.TEMPLATE_VARIETY_QUOTA && !foundQuota { s.quotaResult.Amount = s.quotaArgs.Amount s.quotaResult.ValidDuration = defaultValidDuration log.Warnf("Requested quota '%s' is not configured", s.quotaArgs.Quota) } return nil } |
需要注意:
- 对于REPORT模板,仅仅是将生成的Instance存放到dispatchState.instances数组中,不分发。延迟到所有请求消息处理完毕后,由Reporter.Flush统一分发
- 对于CHECK模板,直接调用session.dispatchToHandler进行分发
分发不是直接在当前线程调用适配器,而是排队,由协程池的调度循环异步处理:
1 2 3 4 5 6 |
func (s *session) dispatchToHandler(ds *dispatchState) { s.activeDispatches++ ds.session = s // 调用协程池,调度一个工作 s.impl.gp.ScheduleWork(ds.invokeHandler, nil) } |
dispatchState.invokeHandler方法真正直接调用适配器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
func (ds *dispatchState) invokeHandler(interface{}) { // 顺利处理完毕,没有Panic reachedEnd := false defer func() { if reachedEnd { return } // 从适配器代码导致的Panic中恢复,防止Mixer直接崩了 r := recover() ds.err = fmt.Errorf("panic during handler dispatch: %v", r) log.Errorf("%v\n%s", ds.err, debug.Stack()) if log.DebugEnabled() { log.Debugf("stack dump for handler dispatch panic:\n%s", debug.Stack()) } // 提示此此目的地的分发处理完毕 ds.session.completed <- ds }() // 跟踪 span, ctx, start := ds.beginSpan(ds.ctx) log.Debugf("begin dispatch: destination='%s'", ds.destination.FriendlyName) switch ds.destination.Template.Variety { // 属性生成器 case tpb.TEMPLATE_VARIETY_ATTRIBUTE_GENERATOR: ds.outputBag, ds.err = ds.destination.Template.DispatchGenAttrs( ctx, ds.destination.Handler, ds.instances[0], ds.inputBag, ds.mapper) // 前置条件检查 case tpb.TEMPLATE_VARIETY_CHECK: ds.checkResult, ds.err = ds.destination.Template.DispatchCheck( ctx, ds.destination.Handler, ds.instances[0]) // 遥测/报告 case tpb.TEMPLATE_VARIETY_REPORT: ds.err = ds.destination.Template.DispatchReport( ctx, ds.destination.Handler, ds.instances) // 配额 case tpb.TEMPLATE_VARIETY_QUOTA: ds.quotaResult, ds.err = ds.destination.Template.DispatchQuota( ctx, ds.destination.Handler, ds.instances[0], ds.quotaArgs) // 无法处理的模板类型,Panic default: panic(fmt.Sprintf("unknown variety type: '%v'", ds.destination.Template.Variety)) } log.Debugf("complete dispatch: destination='%s' {err:%v}", ds.destination.FriendlyName, ds.err) // 追踪 ds.completeSpan(span, time.Since(start), ds.err) // 将当前目的地设置为分发处理完毕 ds.session.completed <- ds reachedEnd = true } |
可以看到,上述方法都是把调用委托给目的地的TemplateInfo.Dispatch***函数指针处理的。这些函数指针就是适配器的相应方法。对于Metric模板,Prometheus适配器的方法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
func (h *handler) HandleMetric(_ context.Context, vals []*metric.Instance) error { var result *multierror.Error // 遍历Instance for _, val := range vals { // 获取该Instance对应的handler(例如requestcount.metric.istio-system)的信息(cinfo) ci := h.metrics[val.Name] if ci == nil { result = multierror.Append(result, fmt.Errorf("could not find metric info from adapter config for %s", val.Name)) continue } collector := ci.c switch ci.kind { // 按指标类型分别处理 case config.GAUGE: vec := collector.(*prometheus.GaugeVec) amt, err := promValue(val.Value) if err != nil { result = multierror.Append(result, fmt.Errorf("could not get value for metric %s: %v", val.Name, err)) continue } pl := promLabels(val.Dimensions) if h.labelsCache != nil { h.labelsCache.Set(key(val.Name, "gauge", pl, ci.sortedLabels), &cacheEntry{vec, pl}) } vec.With(pl).Set(amt) case config.COUNTER: // 转换为指标向量,指标向量的每个元素是具有不同标签值的同一类(名字相同)指标 vec := collector.(*prometheus.CounterVec) amt, err := promValue(val.Value) if err != nil { result = multierror.Append(result, fmt.Errorf("could not get value for metric %s: %v", val.Name, err)) continue } pl := promLabels(val.Dimensions) if h.labelsCache != nil { h.labelsCache.Set(key(val.Name, "counter", pl, ci.sortedLabels), &cacheEntry{vec, pl}) } // vec.With(pl)返回具有指定标签集的指标对象,这里是Counter,然后加上一个值(在当前时间点) vec.With(pl).Add(amt) case config.DISTRIBUTION: // DISTRIBUTION映射为Prometheus类型 Histogram vec := collector.(*prometheus.HistogramVec) amt, err := promValue(val.Value) if err != nil { result = multierror.Append(result, fmt.Errorf("could not get value for metric %s: %v", val.Name, err)) continue } pl := promLabels(val.Dimensions) if h.labelsCache != nil { h.labelsCache.Set(key(val.Name, "distribution", pl, ci.sortedLabels), &cacheEntry{vec, pl}) } vec.With(pl).Observe(amt) } } return result.ErrorOrNil() } cinfo struct { // 负责收集指标的接口,gauge counter等都实现了此接口 c prometheus.Collector sha [sha1.Size]byte kind config.Params_MetricInfo_Kind sortedLabels []string } |
将入站的API调用分发给配置的适配器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
type Dispatcher interface { // 进行预处理,将请求包分发给那些需要提前执行的适配器, // 目前这种适配器主要指属性生成适配器 Preprocess(ctx context.Context, requestBag attribute.Bag, responseBag *attribute.MutableBag) error // 进行CHECK分发,基于CHECK类型模板的Instance,将被转发给感兴趣的适配器 Check(ctx context.Context, requestBag attribute.Bag) (adapter.CheckResult, error) // 获取能够缓冲REPORT请求的报告器 GetReporter(ctx context.Context) Reporter // 进行QUOTA分发 Quota(ctx context.Context, requestBag attribute.Bag, qma QuotaMethodArgs) (adapter.QuotaResult, error) } |
负责产生一系列的报告:
1 2 3 4 5 6 7 8 9 10 |
type Reporter interface { // 添加一个条目(请求包)到报告状态中 Report(requestBag attribute.Bag) error // 刷出所有缓冲的状态到适当的适配器 Flush() error // 完成Reporter的处理过程 Done() } |
目的地,包含一个目标处理器,以及需要(在满足条件的情况下)发送给它的实例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
type Destination struct { // 用于调试的目的地ID id uint32 // 需要调用的处理器 Handler adapter.Handler // 用于监控/日志目的的处理器名称 HandlerName string // 用于监控/日志目的的适配器名称(处理器类型) AdapterName string // 使用的模板,由于某些适配器支持多种模板,这些适配器可能对应多个Destination // 每种模板都有类型,并且定义了支持它的适配器必须实现的接口 Template *TemplateInfo // 实例组,每组实例在满足条件的情况下,会发送给处理器 InstanceGroups []*InstanceGroup // 最大允许的实例数 maxInstances int // 用于监控/日志目的目的地名称 FriendlyName string // 性能计数器 Counters DestinationCounters } |
此结构用于收集单个目的地(适配器+模板组合)的处理状态和结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
type dispatchState struct { // 所属的分发调用会话 session *session // 上下文,其中包含了OpenTracing的Span信息 ctx context.Context // 目的地 destination *routing.Destination // 对于属性生成模板,将模板输出映射入主属性列表的函数 mapper template.OutputMapperFn // 输入包 inputBag attribute.Bag // 配额请求的参数 quotaArgs adapter.QuotaArgs // 构建出的,供适配器消费的实例列表 instances []interface{} // 处理过程中的错误信息 err error // 输出包 outputBag *attribute.MutableBag // CHECK调用的结果 checkResult adapter.CheckResult // QUOTA调用的结果 quotaResult adapter.QuotaResult } |
对一个客户端CHECK/REPORT/QUOTA请求的预处理和主处理的过程,是一个会话。此结构存储相关的信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
type session struct { // 拥有此会话的Dispatcher impl *Impl // 本次会话使用的路由上下文 rc *RoutingContext // 上下文信息 ctx context.Context // 输入包 bag attribute.Bag // 配额调用的参数 quotaArgs QuotaMethodArgs // 输出包 responseBag *attribute.MutableBag // 报告请求的分发状态 reportStates map[*routing.Destination]*dispatchState // CHECK/QUOTA调用的结果 checkResult adapter.CheckResult quotaResult adapter.QuotaResult err error // 正在执行的分发操作数量 activeDispatches int // 收集已完成的分发 completed chan *dispatchState // 本次操作的模板类别 variety tpb.TemplateVariety } |
和模板有关的信息:
1 2 3 4 5 6 7 8 9 10 11 |
type TemplateInfo struct { // 模板名称 Name string // 模板种类 Variety tpb.TemplateVariety // 各种Mixer调用的函数指针 DispatchReport template.DispatchReportFn DispatchCheck template.DispatchCheckFn DispatchQuota template.DispatchQuotaFn DispatchGenAttrs template.DispatchGenerateAttributesFn } |
按照匹配条件分组的、需要发送给适配器的实例的信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
type InstanceGroup struct { // 用于调试的ID id uint32 // 预编译的表达式,何时应用此实例组 Condition compiled.Expression // 用于构建出实例的函数+名称 Builders []NamedBuilder // 映射器函数,用于将属性生成适配器的输出属性,映射入主属性集 Mappers []template.OutputMapperFn } type NamedBuilder struct { InstanceShortName string Builder template.InstanceBuilderFn } OutputMapperFn func(attrs attribute.Bag) (*attribute.MutableBag, error) |
进行配额请求时,需要的参数 + 配额(资源)的类型:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
type QuotaMethodArgs struct { // 在出现RPC调用并重试时,用于额度分配/释放(Quota allocation/allocation)调用的去重复 DeduplicationID string // 分配那种配额 Quota string // 分配的量 Amount int64 // 如果设置为true,则允许响应返回比请求少的额度。如果设置为false,那么额度不足时,直接返回0 BestEffort bool } |
进行配额请求时,需要的参数:
1 2 3 4 5 |
QuotaArgs struct { DeduplicationID string QuotaAmount int64 BestEffort bool } |
由处理器提供的,额度分配的结果:
1 2 3 4 5 6 7 8 |
QuotaResult struct { // RPC调用的状态(状态码、消息、详情) Status rpc.Status // 分配的额度何时过期,0表示永不过期 ValidDuration time.Duration // 分配的额度,可能比请求的额度小 Amount int64 } |
在探索Envoy如何向Mixer发送请求之前,我们先来分析一下Envoy作为网络代理,是如何工作的。
- 通过xDS或者静态配置,获得Envoy代理的监听器信息
- 如果监听器bind_to_port,则直接调用libevent的接口,绑定监听,回调函数设置为ListenerImpl::listenCallback
- DispatcherImpl通过libevent,接收到请求,调用ListenerImpl::listenCallback
- 根据入站时的目的端口,选择适当的监听器处理请求,调用onAccept。存在Iptables重定向的情况下,监听器为15001
- 构建出监听器过滤器链
- 执行过滤器链,对于15001来说,此链只有OriginalDstFilter一个过滤器
- OriginalDstFilter恢复原始目的地址
- 查找和原始目的地址匹配的监听器,并转交请求
- 如果发生请求转交,则接受者监听器也会执行类似于2的逻辑。但是不会再次发生转交
- 实际负责连接的那个监听器,会调用ActiveListener.newConnection,并间接的创建ConnectionImpl
- ConnectionImpl会利用连接套接字(ConnectionSocketPtr)的文件描述符,调用Dispatcher.createFileEvent,注册读写事件的回调
- 到此,连接接受完毕,后续的读写事件由libevent异步触发
- 发生可读、可写、关闭事件时,ConnectionImpl::onFileEvent被调用
- 可写事件的回调onWriteReady先调用
- 可读事件的回调onReadReady后调用
- 尝试循环读取,根据读取结果设置Post操作
- 处理读取到的数据
- 遍历网络过滤器链
- 如果是L7连接,则执行HTTP网络管理器
- 遍历网络过滤器链
- 执行Post IO操作
这个混入类为任意的unique_ptr所持有的对象增加行为,允许方便的将这种对象link/unlink到列表中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
template <class T> class LinkedObject { public: // 对象唯一性指针的列表 typedef std::list<std::unique_ptr<T>> ListType; // 返回列表的迭代器 typename ListType::iterator entry(); // 对象当前是否被插入到列表,只要调用过moveInto***方法就返回true bool inserted(); // 在两个列表之间移动对象 void moveBetweenLists(ListType& list1, ListType& list2); // 移动对象到列表,放在最前面,注意所有权的转移 void moveIntoList(std::unique_ptr<T>&& item, ListType& list); // 移动对象到列表,放在最后面 void moveIntoListBack(std::unique_ptr<T>&& item, ListType& list); // 从列表中移除条目 std::unique_ptr<T> removeFromList(ListType& list); }; |
标记性接口。任何实现此接口的对象,都可以传递给Dispatcher。Dispatcher确保,未来在事件循环中删除对象。
1 2 3 4 |
class DeferredDeletable { public: virtual ~DeferredDeletable() {} }; |
使用此接口,进行事件处理时,不需要担心栈unwind的问题
抽象的连接处理器,总体负责网络连接的处理。ActiveListener、ActiveSocket的_parent都指向此对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
class ConnectionHandler { public: // 此处理器持有的活动连接数 virtual uint64_t numConnections() PURE; // 添加一个监听器到此处理器 virtual void addListener(ListenerConfig& config) PURE; // 根据地址查找监听器。返回监听器的指针,所有权不转移 virtual Network::Listener* findListenerByAddress(const Network::Address::Instance& address) PURE; // 移除使用指定tag作为键的监听器。监听器拥有的所有连接也会被移除 virtual void removeListeners(uint64_t listener_tag) PURE; // 停止使用指定tag作为键的监听器。监听器拥有的所有连接不会被关闭,此方法用于draining virtual void stopListeners(uint64_t listener_tag) PURE; // 停止所有监听器 virtual void stopListeners() PURE; // 禁用所有监听器。不会关闭监听器拥有的连接鹅,用于临时暂停接受连接 virtual void disableListeners() PURE; // 启用所有监听器 virtual void enableListeners() PURE; }; |
套接字监听器的抽象接口,是否此对象则停止对套接字的监听:
1 2 3 4 5 6 7 8 |
class Listener { public: // 临时禁止接收新连接 virtual void disable() PURE; // 继续接收新连接 virtual void enable() PURE; }; |
表示某个连接处理器ConnectionHandler所拥有的活动的监听器, ActiveListener引用一个Listener。
1 2 3 4 5 6 7 8 9 |
class ListenerFilter { public: /** * 在新的连接被接受,但是Connection对象尚未创建之前调用 * @param cb 此回调提供一些重要方法 * @return 过滤器管理器根据此返回状态,决定是否继续迭代过滤器链 */ virtual FilterStatus onAccept(ListenerFilterCallbacks& cb) PURE; }; |
通过参数cb,可以continueFilterChain。
此接口用于管理监听器过滤器链:
1 2 3 4 5 |
class ListenerFilterManager { public: // 为指定的监听器添加过滤器 virtual void addAcceptFilter(ListenerFilterPtr&& filter) PURE; }; |
ActiveSocket实现了ListenerFilterManager、ListenerFilterCallbacks。
此接口供监听器过滤器使用,后者通过它和监听器管理器通信:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
class ListenerFilterCallbacks { public: /** * @return ConnectionSocket 过滤器所操作的连接套接字 */ virtual ConnectionSocket& socket() PURE; /** * @return 分发事件的Dispatcher */ virtual Event::Dispatcher& dispatcher() PURE; /** * 继续执行过滤器链 */ virtual void continueFilterChain(bool success) PURE; };TransportSocket |
ActiveSocket实现了ListenerFilterManager、ListenerFilterCallbacks。
此接口供监听器使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
class ListenerCallbacks { public: /** * 当新连接被接受后,回调此方法 * @param socket 移动到被调用者的套接字 * @param redirected 提示套接字已经经过重定向 */ virtual void onAccept(ConnectionSocketPtr&& socket, bool hand_off_restored_destination_connections = true) PURE; /** * 当新连接被接受后,回调此方法 * @param new_connection 移动到被调用者的套接字 */ virtual void onNewConnection(ConnectionPtr&& new_connection) PURE; }; |
ActiveListener实现了此接口。
此接口用于管理过滤器链:
1 2 3 4 5 6 7 8 9 |
class FilterChainManager { public: /** * 查找匹配新连接的元数据的过滤器链 * @param socket 提供元数据 * @return const FilterChain* 使用的过滤器链,如果没有匹配返回nullptr */ virtual const FilterChain* findFilterChain(const ConnectionSocket& socket) const PURE; }; |
ListenerImpl实现了此接口。
负责添加网络过滤器给过滤器管理器,也就是Connection:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
class FilterManager { public: virtual ~FilterManager() {} // 添加一个写过滤器,过滤器以FIFO顺序调用 virtual void addWriteFilter(WriteFilterSharedPtr filter) PURE; // 添加读写过滤器,相当于同时调用addWriteFilter/addReadFilter virtual void addFilter(FilterSharedPtr filter) PURE; // 添加一个读过滤器,过滤器以FIFO顺序调用 virtual void addReadFilter(ReadFilterSharedPtr filter) PURE; // 实例化所有安装的读过滤器,相当于针对每个过滤器调用onNewConnection() virtual bool initializeReadFilters() PURE; } |
单个过滤器链的接口:
1 2 3 4 5 6 7 |
class FilterChain { public: // 基于此过滤器链的新连接,使用的TransportSocketFactory,不同链使用的工厂可能不同(传输协议不同,RAW,TLS...) virtual const TransportSocketFactory& transportSocketFactory() const PURE; // 基于此过滤器链的新连接,为了创建所有过滤器需要的工厂的集合 virtual const std::vector<FilterFactoryCb>& networkFilterFactories() const PURE; }; |
1 |
class Filter : public WriteFilter, public ReadFilter {}; |
读处理路径(处理下游发来的数据)上的二进制(4层)过滤器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
class ReadFilter { public: /** * 当连接上的数据被读取时调用 * @param data 读取到的,可能已经被修改过的数据 * @param end_stream 当连接启用半关闭语义时,用于提示是否到了最后一字节 * @return status 过滤器管理器使用此状态决定如何进一步迭代其它过滤器 */ virtual FilterStatus onData(Buffer::Instance& data, bool end_stream) PURE; /** * 当新连接刚创建时调用,过滤器链的迭代可以被中止 * @return status 过滤器管理器使用此状态决定如何进一步迭代其它过滤器 */ virtual FilterStatus onNewConnection() PURE; /** * 初始化用于和过滤器管理器交互的读过滤器回调,过滤器被注册时,将被过滤器管理器调用一次 * 任何需要用到底层连接的构造,需要在此函数的回调中执行 * * IMPORTANT: 出站、复杂逻辑不要在此,放在onNewConnection() * */ virtual void initializeReadFilterCallbacks(ReadFilterCallbacks& callbacks) PURE; } |
写处理路径(向下游发送数据)上的二进制(4层)过滤器:
1 2 3 4 5 6 7 8 9 |
class WriteFilter { public: /** * 当在此连接上发生数据写入时调用 * @param data 需要写入的,可能已经被修改过的数据 * @param end_stream 当连接启用半关闭语义时,用于提示是否到了最后一字节 */ virtual FilterStatus onWrite(Buffer::Instance& data, bool end_stream) PURE; }; |
连接套接字,表示传递给一个Connection的套接字:
- 对于服务端,该对象表示已经Accept的套接字
- 对于客户端,该对象表示正在连接到远程地址的套接字
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
class ConnectionSocket : public virtual Socket { public: // 返回远程地址 virtual const Address::InstanceConstSharedPtr& remoteAddress() const PURE; // 用于服务器端,恢复原始目的地址 virtual void restoreLocalAddress(const Address::InstanceConstSharedPtr& local_address) PURE; // 设置远程地址 virtual void setRemoteAddress(const Address::InstanceConstSharedPtr& remote_address) PURE; // 原始目的地址是否被恢复 virtual bool localAddressRestored() const PURE; // 设置传输协议,例如RAW_BUFFER, TLS virtual void setDetectedTransportProtocol(absl::string_view protocol) PURE; // 返回传输协议 virtual absl::string_view detectedTransportProtocol() const PURE; // 设置请求的应用协议,例如ALPN in TLS virtual void setRequestedApplicationProtocols(const std::vector<absl::string_view>& protocol) PURE; // 返回请求的应用协议 virtual const std::vector<std::string>& requestedApplicationProtocols() const PURE; // 设置请求的服务器名称 virtual void setRequestedServerName(absl::string_view server_name) PURE; // 返回请求的服务器名称 virtual absl::string_view requestedServerName() const PURE; }; |
传输套接字,负责实际的读写,也进行某些数据转换(例如TLS):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
class TransportSocket { public: // 连接对象调用此方法依次,初始化传输套接字的回调 virtual void setTransportSocketCallbacks(TransportSocketCallbacks& callbacks) PURE; // 由网络级协商选择的协议 virtual std::string protocol() const PURE; // 套接字是否已经被flush和close virtual bool canFlushClose() PURE; // 关闭传输套接字 virtual void closeSocket(Network::ConnectionEvent event) PURE; // 读取到缓冲 virtual IoResult doRead(Buffer::Instance& buffer) PURE; /** * 将缓冲写入底层套接字 * @param buffer 缓冲 * @param end_stream 提示是否是流的终点,如果true则缓冲中所有数据都被写出去,连接变成半关闭 */ virtual IoResult doWrite(Buffer::Instance& buffer, bool end_stream) PURE; // 底层传输建立后回调 virtual void onConnected() PURE; // 如果当前是SSL连接,则返回Ssl::Connection,否则返回nullptr virtual const Ssl::Connection* ssl() const PURE; }; |
传输套接字使用此回调集,和Connection通信:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
class TransportSocketCallbacks { public: // 返回关联到连接的IO句柄,从此局部可以得到连接套接字的FD virtual IoHandle& ioHandle() PURE; virtual const IoHandle& ioHandle() const PURE; // 返回关联的连接 virtual Network::Connection& connection() PURE; // 是否读缓冲应该被排干(drain,也就是调用过滤器链进行处理),用于强制配置的读缓冲大小限制 virtual bool shouldDrainReadBuffer() PURE; // 将读缓冲标记为可(被事件循环)读 virtual void setReadBufferReady() PURE; // 发起(Raise)一个连接事件到Connection对象,TLS使用此方法告知握手完成 virtual void raiseEvent(ConnectionEvent event) PURE; }; |
该接口表示原始的连接,它实现了FilterManager接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
class Connection : public Event::DeferredDeletable, public FilterManager { public: // 状态枚举 enum class State { Open, Closing, Closed }; // 连接发送字节后的回调 typedef std::function<void(uint64_t bytes_sent)> BytesSentCb; // 注册当此连接上发生事件后执行的回调 virtual void addConnectionCallbacks(ConnectionCallbacks& cb) PURE; // 注册每当bytes被写入底层TransportSocket后执行的回调 virtual void addBytesSentCallback(BytesSentCb cb) PURE; // 为此连接启用半关闭语义,从一个已经被对端半关闭的连接上进行读操作,不会关闭连接 virtual void enableHalfClose(bool enabled) PURE; // 关闭连接 virtual void close(ConnectionCloseType type) PURE; // 返回分发器 virtual Event::Dispatcher& dispatcher() PURE; // 返回唯一性的本地连接ID virtual uint64_t id() const PURE; // 返回网络级协商选择的下一个使用的协议 virtual std::string nextProtocol() const PURE; // 为连接启用/禁用NO_DELAY virtual void noDelay(bool enable) PURE; // 启禁针对此连接的套接字读。当重新启用读时,如果输入缓冲有内容,会通过过滤器链分发 virtual void readDisable(bool disable) PURE; // 当禁用套接字读后,Envoy是否应当检测TCP连接关闭。默认对新连接来说,检测 virtual void detectEarlyCloseWhenReadDisabled(bool should_detect) PURE; // 读操作是否启用 virtual bool readEnabled() const PURE; // 返回远程地址 virtual const Network::Address::InstanceConstSharedPtr& remoteAddress() const PURE; // 返回本地地址,对于客户端连接来说,即原始地址;对于服务器连接来说 // 是本地的目的地址 // 对于服务器连接来说,此地址可能和代理的监听地址不一样,因为下游连接可能被重定向,或者代理在透明模式下运行 virtual const Network::Address::InstanceConstSharedPtr& localAddress() const PURE; // 更新连接状态,出于性能的考虑,最终一致 virtual void setConnectionStats(const ConnectionStats& stats) PURE; // 如果该连接是SSL,则返回SSL连接数据;否则返回nullptr virtual const Ssl::Connection* ssl() const PURE; // 返回服务器名称,对于TLS来说即SNI virtual absl::string_view requestedServerName() const PURE; // 返回连接状态 virtual State state() const PURE; /** * 写入数据到连接,数据将经过过滤器链 * @param data 需要写入的数据 * @param end_stream 如果为true,则提示此为最后一次写操作,导致连接半关闭。必须enableHalfClose(true)才能传入true */ virtual void write(Buffer::Instance& data, bool end_stream) PURE; // 设置该连接的缓冲区的软限制 // 对于读缓冲,限制处理流水线在flush到下一个stage前能缓冲的最大字节数 // 对于写缓冲,设置水位。如果缓冲了足够的数据,触发onAboveWriteBufferHighWatermark调用 virtual void setBufferLimits(uint32_t limit) PURE; // 获得软限制 virtual uint32_t bufferLimit() const PURE; // 本地地址是否被还原为原始目的地址 virtual bool localAddressRestored() const PURE; // 连接当前是否高于高水位 virtual bool aboveHighWatermark() const PURE; // 获取此连接的套接字选项 virtual const ConnectionSocket::OptionsSharedPtr& socketOptions() const PURE; // 获取关联到此连接的StreamInfo对象。StreamInfo典型用于日志目的 // 每个过滤器都可以通过StreamInfo.FilterState来添加特定的信息 // 在此上下文中每个连接对应一个StreamInfo。而对于HTTP连接管理器,每个请求对应一个StreamInfo virtual StreamInfo::StreamInfo& streamInfo() PURE; virtual const StreamInfo::StreamInfo& streamInfo() const PURE; // 设置延迟连接关闭的超时 virtual void setDelayedCloseTimeout(std::chrono::milliseconds timeout) PURE; virtual std::chrono::milliseconds delayedCloseTimeout() const PURE; } |
L4连接上发生的事件的回调:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
class ConnectionCallbacks { public: virtual ~ConnectionCallbacks() {} // ConnectionEvent的回调 virtual void onEvent(ConnectionEvent event) PURE; // 当连接的写缓冲超过高水位后调用 virtual void onAboveWriteBufferHighWatermark() PURE; // 当连接的写缓冲,从超过高水位变为低于低水位后调用 virtual void onBelowWriteBufferLowWatermark() PURE; }; |
表示某个连接处理器所(通过ActiveListener)拥有的活动的连接。ActiveConnection引用一个Connection、一个ActiveListener。
表示可以拥有多个流(Stream)的HTTP客户端/服务器连接:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
class Connection { public: // 分发入站的请求数据 virtual void dispatch(Buffer::Instance& data) PURE; // 给对端提示以go away,从此时开始,不能创建新的流 virtual void goAway() PURE; // 返回连接的协议 virtual Protocol protocol() PURE; // 给对端提示以shutdown notice,对端不应该在发送任何新的流,但是对于已经达到的流u,不会被重置 virtual void shutdownNotice() PURE; // HTTP编解码器是否有数据需要写入,但是由于协议的原因(例如窗口更新),无法完成 virtual bool wantsToWrite() PURE; // 当底层的Network::Connection超过高水位后,调用此方法 virtual void onUnderlyingConnectionAboveWriteBufferHighWatermark() PURE; // 当底层的Network::Connection超过高水位后,然后由低于低水位后调用此方法 virtual void onUnderlyingConnectionBelowWriteBufferLowWatermark() PURE; }; |
HTTP连接级别的回调:
1 2 3 4 5 |
class ConnectionCallbacks { public: // 对端提示go away时触发此回调,不允许创建新流 virtual void onGoAway() PURE; }; |
服务器端连接:
1 |
class ServerConnection : public virtual Connection {}; |
HTTP连接管理器ConnectionManagerImpl.codec字段是ServerConnection类型,后者承担读取到的请求数据的分发(Dispatch,给HTTP解析器)职责。
继承ConnectionCallbacks回调,并添加方法:
1 2 3 4 5 6 7 8 9 10 |
class ServerConnectionCallbacks : public virtual ConnectionCallbacks { public: /** * 当对端初始化一个新的请求流后触发此回调 * @param response_encoder 提供用于创建响应的编码器,请求、响应由同一流对象管理 * @param is_internally_created 提示此流是流客户端创建,还是由Envoy自己创建(例如内部重定向) */ virtual StreamDecoder& newStream(StreamEncoder& response_encoder, bool is_internally_created = false) PURE; }; |
HTTP流解码器,可以解码下游发来的请求:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
class StreamDecoder { public: virtual ~StreamDecoder() {} // 处理解码后的100-Continue头的map virtual void decode100ContinueHeaders(HeaderMapPtr&& headers) PURE; // 处理解码后的头 virtual void decodeHeaders(HeaderMapPtr&& headers, bool end_stream) PURE; // 处理解码后的数据帧 virtual void decodeData(Buffer::Instance& data, bool end_stream) PURE; // 处理解码后的尾帧 virtual void decodeTrailers(HeaderMapPtr&& trailers) PURE; // 处理解码后的元数据 virtual void decodeMetadata(MetadataMapPtr&& metadata_map) PURE; }; |
这里这里的decode有歧义:
- decoded,表示经由http_parser解析,结构化为C++对象 —— HTTP语境
- decode,调用Envoy的流解码器处理那些C++对象 —— Envoy语境
HTTP流编码器,可以编码需要发送给下游的应答,接口类似于StreamDecoder。
针对HTTP流的回调:
1 2 3 4 5 6 7 8 9 10 11 |
class StreamCallbacks { public: // 对端重置了流后调用,参数是重置原因 virtual void onResetStream(StreamResetReason reason) PURE; // 当一个流(HTTP2),或者流发向的连接(HTTP1)超过高水位后调用 virtual void onAboveWriteBufferHighWatermark() PURE; // 当一个流(HTTP2),或者流发向的连接(HTTP1)从超过高水位降到低于低水位后调用 virtual void onBelowWriteBufferLowWatermark() PURE; } |
表示连接上的单个HTTP流,实现了StreamDecoder、StreamCallbacks、FilterChainFactoryCallbacks接口。
HTTP连接管理器提供给过滤器链工厂的回调集,依赖于此回调工厂能够以应用程序特定的方式构建过滤器链:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
class FilterChainFactoryCallbacks { public: // 添加读取流数据时使用的解码器 virtual void addStreamDecoderFilter(Http::StreamDecoderFilterSharedPtr filter) PURE; // 添加写入流数据时使用的编码器 virtual void addStreamEncoderFilter(Http::StreamEncoderFilterSharedPtr filter) PURE; // 添加读写编解码器 virtual void addStreamFilter(Http::StreamFilterSharedPtr filter) PURE; // 添加访问日志处理器,在流被销毁时调用 virtual void addAccessLogHandler(AccessLog::InstanceSharedPtr handler) PURE; } |
流编码/解码过滤器的基类:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
class StreamFilterBase { public: /** * 在过滤器被销毁前调用此方法,这可能发生在 * 1、正常的流(下游+上游)完成后 * 2、发生重置后 * 每个过滤器负责确保在此方法的上下文中,所有异步事件被清理完毕。这些异步事件包括定时器、网络调用等 * * 不在析构函数中进行清理而使用onDestroy钩子的原因和Envoy的延迟删除模型有关。此模型规避了Stack unwind * 有关的复杂性。在onDestroy之后,过滤器不得调用编码/解码过滤器回调 */ virtual void onDestroy() PURE; }; |
流解码过滤器,继承StreamFilterBase:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
class StreamDecoderFilter : public StreamFilterBase { public: // 解码请求头 virtual FilterHeadersStatus decodeHeaders(HeaderMap& headers, bool end_stream) PURE; // 解码数据帧 virtual FilterDataStatus decodeData(Buffer::Instance& data, bool end_stream) PURE; // 解码请求尾 virtual FilterTrailersStatus decodeTrailers(HeaderMap& trailers) PURE; // 设置此解码过滤器的过滤器回调 virtual void setDecoderFilterCallbacks(StreamDecoderFilterCallbacks& callbacks) PURE; } |
传递给所有(读/写)过滤器的回调函数集,用于写响应数据、和底层流交互:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
class StreamFilterCallbacks { public: // 获取L4网络连接 virtual const Network::Connection* connection() PURE; // 返回线程本地的Dispatcher,从此分发器来分配定时器 virtual Event::Dispatcher& dispatcher() PURE; // 将底层的流进行重置 virtual void resetStream() PURE; // 返回当前请求使用的路由 // 实现应当能够进行路由缓存,避免反复查找。如果过滤器修改了请求头,则路由可能需要改变,此时应当调用clearRouteCache() // 未来可能会允许过滤器对路由条目进行覆盖 virtual Router::RouteConstSharedPtr route() PURE; // 返回被缓存的路由条目的上游集群信息(clusterInfo)。该方法用于避免在过滤器链中进行反复的查找,同时 // 确保当路由被picked/repicked后能提供clusterInfo的一致性视图 virtual Upstream::ClusterInfoConstSharedPtr clusterInfo() PURE; // 为当前请求清除路由缓存,如果过滤器修改了请求头,并且此修改可能影响选路,则必须调用该方法 virtual void clearRouteCache() PURE; // 返回用于日志目的的流唯一标识 virtual uint64_t streamId() PURE; // 返回用于日志目的的StreamInfo virtual StreamInfo::StreamInfo& streamInfo() PURE; // 返回追踪用的当前追踪上下文 virtual Tracing::Span& activeSpan() PURE; // 返回追踪配置 virtual const Tracing::Config& tracingConfig() PURE; }; |
继承StreamFilterCallbacks,添加专用于解码(读)过滤器的回调:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks { public: /** * 使用缓冲的头,以及请求体,继续迭代过滤器链。该方法仅仅在以下情况之一才会调用 * 1、先前的过滤器在decodeHeaders()后返回StopIteration * 2、先前的过滤器在decodeData()后返回StopIterationAndBuffer, StopIterationAndWatermark 或 StopIterationNoBuffer * * HTTP连接管理器会分发请求头、缓冲的请求体给过滤器链中的下一个过滤器 * * 如果请求没有完成,当前过滤器仍然会继续接受decodeData()调用,并且必须返回适当的的状态码 * */ virtual void continueDecoding() PURE; // 返回当前过滤器、或者链中先前过滤器缓冲的数据。如果尚未缓冲任何内容,返回nullpt virtual const Buffer::Instance* decodingBuffer() PURE; /** * 添加解码处理后的、缓冲的请求体数据。在某些高级用例中,decodeData()返回StopIterationAndBuffer不能满足 * 需要,需要调用此方法: * * 1) 对于header-only请求需要被转换为包含请求体的请求,可以在 decodeHeaders() 回调中调用此方法,添加请求体 * 后续过滤器会依次接收调用decodeHeaders(..., false)、decodeData(..., true)。在直接迭代、停止后继续迭代 * 场景下,都可以使用 * * * 2) 如果某个过滤器希望在end_stream=true的情况下,在一个数据回调中查看所有缓冲的数据,可以调用该方法,以立即缓冲数据 * 避免同时处理已缓冲数据、以及当前回调产生的数据 * * 3) 如果某个过滤器在调用后续过滤器时,需要添加额外的缓冲请求体数据 * * 4) 如果在decodeTrailers()回调中需要添加额外的数据。可以在前述回调的上下文中调用此方法 * 所有后续过滤器会依次接受decodeData(..., false)、decodeTrailers()调用 * * 在其它场景下调用此方法是错误 * * @param data Buffer::Instance 添加需要被解码的数据 * @param streaming_filter boolean 提示该过滤器是流式处理还是缓冲了完整请求体 */ virtual void addDecodedData(Buffer::Instance& data, bool streaming_filter) PURE; /** * 添加解码后的请求尾。只能在end_stream=true时在decodeData中调用 * 在decodeData中调用时,请求尾映射被初始化为空map并以引用的方式返回 * 该方法最多调用一次 * * @return 返回新的空请求尾map */ virtual HeaderMap& addDecodedTrailers() PURE; /* 基于其它的状态码、请求体,生成一个Envoy本地的响应并发送给下游 * 如果是gRPC请求,则本地响应编码为gRPC响应,HTTP状态码置为200。从参数生成grpc-status、grpc-message * * @param response_code HTTP状态码 * @param body_text HTTP请求体,以text/plain发送或者编码在grpc-message头中 * @param modify_headers 可选的回调函数,用于修改响应头 * @param grpc_status gRPC状态码,覆盖通过httpToGrpcStatus推导出的gRPC状态码 */ virtual void sendLocalReply(Code response_code, absl::string_view body_text, std::function<void(HeaderMap& headers)> modify_headers, const absl::optional<Grpc::Status::GrpcStatus> grpc_status) PURE; /** * 编码100-Continue响应头。该头不在encodeHeaders中处理,因为大部分情况下Envoy用户和过滤器 * 不希望代理100-Continue,而是直接吐出,可以忽略多次编码响应头encodeHeaders()的复杂性 * * @param headers supplies 需要编码的头 */ virtual void encode100ContinueHeaders(HeaderMapPtr&& headers) PURE; /** * 编码响应头。HTTP连接管理器会自动探测一些不发给下游的伪头 * * @param headers 需要编码的头 * @param end_stream 这是不是一个header-only的request/response */ virtual void encodeHeaders(HeaderMapPtr&& headers, bool end_stream) PURE; /** * 编码响应数据 * @param data 需要编码的数据 * @param end_stream 提示这是不是最后一个数据帧 */ virtual void encodeData(Buffer::Instance& data, bool end_stream) PURE; /** * 编码响应尾数据,隐含意味着流的结束 * @param trailers supplies 需要编码的尾 */ virtual void encodeTrailers(HeaderMapPtr&& trailers) PURE; /** * 编码元数据 * * @param metadata_map supplies 需要编码的元数据的unique_ptr */ virtual void encodeMetadata(MetadataMapPtr&& metadata_map) PURE; /** * 当解码过滤器的缓冲,或者过滤器需要发送数据到的那些缓冲,超越高水位后调用 * * 对于路由过滤器这样的HTTP过滤器,会使用多个缓冲(codec、connection...),该方法可能被调用多次 * 这些过滤器应当负责,在对应的缓冲被排干后,以等同次数调用低水位回调connection etc.) */ virtual void onDecoderFilterAboveWriteBufferHighWatermark() PURE; /** * 当解码过滤器的缓冲,或者过滤器需要发送数据到的那些缓冲,超越高水位后,后降低到低于低水位后调用 */ virtual void onDecoderFilterBelowWriteBufferLowWatermark() PURE; /** * 需要订阅下游流、下游连接上的水位事件的过滤器,调用此方法 * 订阅后,对于每个outstanding backed up buffer,过滤器的回调都被调用 */ virtual void addDownstreamWatermarkCallbacks(DownstreamWatermarkCallbacks& callbacks) PURE; /** * 需要停止订阅下游流、下游连接上的水位事件的过滤器,调用此方法 * 在DownstreamWatermarkCallbacks的回调函数的栈下面调用此方法不安全 */ virtual void removeDownstreamWatermarkCallbacks(DownstreamWatermarkCallbacks& callbacks) PURE; /** * 用于改变解码过滤器的缓冲区大小 * * @param limit 新的缓冲大小 */ virtual void setDecoderBufferLimit(uint32_t limit) PURE; // 返回解码过滤器缓冲大小,0表示无限制u virtual uint32_t decoderBufferLimit() PURE; // 将当前流看作是新的,就好像它的所有头都是刚到达一样 // 如果操作成功,会导致创建新的过滤器链,并且上游请求可能和原始的下游流关联 // 如果操作失败,并且下面列出的前置条件不满足,则调用者负责处理和终止原始流 // // 前置条件 // - 流必须已经被完整的读取 // - 流必须没有请求体 // // 注意HTTP消毒器不会针对这种重新创建的流进行操作,它假设消毒已经完成 virtual bool recreateStream() PURE; } |
表示活动的解码过滤器,继承ActiveStreamFilterBase,实现了StreamFilterCallbacks、StreamDecoderFilterCallbacks,也就是说,过滤器和过滤器回调是一体的。
该对象持有一个StreamDecoderFilter对象。ActiveStream通过字段 std::list<ActiveStreamDecoderFilterPtr> decoder_filters_来引用这种对象。
多个地方存在命名为ActiveRequest的结构,表示一个活动的HTTP1请求。
例如,作为Http::Http1::ServerConnectionImpl的私有成员:
1 2 3 4 5 6 7 8 9 10 11 12 |
struct ActiveRequest { // 构造请求对象时,必须传入响应编码器 ActiveRequest(ConnectionImpl& connection) : response_encoder_(connection) {} HeaderString request_url_; // 请求解码器 StreamDecoder* request_decoder_{}; // 响应编码器 ResponseStreamEncoderImpl response_encoder_; // 请求的处理是否已经完毕 bool remote_complete_{}; }; |
作为Http::CodecClient的私有成员:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
struct ActiveRequest : LinkedObject<ActiveRequest>, public Event::DeferredDeletable, public StreamCallbacks, public StreamDecoderWrapper { ActiveRequest(CodecClient& parent, StreamDecoder& inner) : StreamDecoderWrapper(inner), parent_(parent) {} // 流回调函数 void onResetStream(StreamResetReason reason) override { parent_.onReset(*this, reason); } void onAboveWriteBufferHighWatermark() override {} void onBelowWriteBufferLowWatermark() override {} void onPreDecodeComplete() override { parent_.responseDecodeComplete(*this); } void onDecodeComplete() override {} // 编码器 StreamEncoder* encoder_{}; CodecClient& parent_; } |
Envoy代理的需要创建哪些监听器,由Bootstrap配置 + xDS响应共同决定,本文不讨论细节,可以参考Istio Pilot与Envoy的交互机制解读。
如果某个监听器配置了bind_to_port(默认情况下virtual 15001配置了),则会调用libevent的API,注册套接字监听事件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
ListenerImpl::ListenerImpl(Event::DispatcherImpl& dispatcher, Socket& socket, ListenerCallbacks& cb, bool bind_to_port, bool hand_off_restored_destination_connections) : BaseListenerImpl(dispatcher, socket), cb_(cb), hand_off_restored_destination_connections_(hand_off_restored_destination_connections), listener_(nullptr) { if (bind_to_port) { // 注册监听 setupServerSocket(dispatcher, socket); } } void ListenerImpl::setupServerSocket(Event::DispatcherImpl& dispatcher, Socket& socket) { // 重置 CSmartPtr<evconnlistener, evconnlistener_free> ListenerPtr引用为新的evconnlistener listener_.reset( // libevent的base 当前对象方法 套接字的文件描述符 evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.ioHandle().fd())); if (!listener_) { throw CreateListenerException( fmt::format("cannot listen on socket: {}", socket.localAddress()->asString())); } if (!Network::Socket::applyOptions(socket.options(), socket, envoy::api::v2::core::SocketOption::STATE_LISTENING)) { throw CreateListenerException(fmt::format("cannot set post-listen socket option on socket: {}", socket.localAddress()->asString())); } evconnlistener_set_error_cb(listener_.get(), errorCallback); } |
Envoy默认会在15001端口上监听,当流量到达(通常是通过其它端口重定向到15001)时,Envoy的DispatcherImpl循环得到通知,并通过libevent回调Envoy::Network::ListenerImpl::listenCallback方法,该方法是一切新连接处理的起点:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
void ListenerImpl::listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr* remote_addr, int remote_addr_len, void* arg) { // 传递的参数是监听器对象 ListenerImpl* listener = static_cast<ListenerImpl*>(arg); // IoSocketHandle为IO操作的抽象接口 IoHandlePtr io_handle = std::make_unique<IoSocketHandle>(fd); // 如果监听器在ANY地址(0.0.0.0)上监听,从新套接字上获取本地地址 const Address::InstanceConstSharedPtr& local_address = // 获取远程地址 const Address::InstanceConstSharedPtr& remote_address = (remote_addr->sa_family == AF_UNIX) ? Address::peerAddressFromFd(io_handle->fd()) : Address::addressFromSockAddr(*reinterpret_cast<const sockaddr_storage*>(remote_addr), remote_addr_len, local_address->ip()->version() == Address::IpVersion::v6); // 调用监听器的onAccept回调 listener->cb_.onAccept( std::make_unique<AcceptedSocketImpl>(std::move(io_handle), local_address, remote_address), listener->hand_off_restored_destination_connections_); } |
回调ConnectionHandlerImpl::ActiveListener::onAccept的逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
// 此回调在新连接创建后调用 // socket 需要移动给被调用者的套接字对象 // redirected 如果套接字是第一次被其它监听器接受,并且随后被重定向给一个新的监听器时,为true // 接收者监听器不得再次重定向 void ConnectionHandlerImpl::ActiveListener::onAccept( Network::ConnectionSocketPtr&& socket, bool hand_off_restored_destination_connections) { // 代表当前正在处理的套接字对象 auto active_socket = std::make_unique<ActiveSocket>(*this, std::move(socket), hand_off_restored_destination_connections); // 构建出过监听器过滤器链 config_.filterChainFactory().createListenerFilterChain(*active_socket); // 开始迭代过滤器链 active_socket->continueFilterChain(true); |