Istio Mixer与Envoy的交互机制解读
在前些日子的文章Istio Pilot与Envoy的交互机制解读中我们详细研究了Istio Pilot如何基于xDS协议和Envoy代理进行各种配置信息的交换。Istio的另一个核心组件是Mixer,它提供三类功能:
- 遥测报告(Telemetry Reporting),该功能是服务网格可观察性的基础。为服务启用日志记录、监控、追踪、计费流
- 前置条件检查(Precondition Checking),响应服务请求之前进行一系列检查,例如身份验证、白名单检查、ACL检查
- 配额管理(Quota Management),基于特定的维度进行配额,控制对受限资源的争用
本文结合源码分析Mixer的设计、实现细节,同时关注Envoy与它的集成机制。
Mixer的代码位于mixer目录下:
子目录 | 说明 |
adapter | 包含各种适配器的实现,适配器封装了Mixer和外部基础设施后端(例如Prometheus)交互的逻辑 |
cmd |
包含以下可执行文件的入口点: mixc 用于和Mixer服务器实例进行交互的命令行客户端 mixs 在本地启动一个Mixer服务器,或者列出可用的CRD、探测Mixer服务器的状态 |
docker | Docker镜像定义 |
template |
模板,Mixer架构的基础构建块,通过自定义模板可以扩展Mixer 模板定义了将请求属性(Attribute)转换为适配器的输入的Schema(类型信息,使用Protubuf语法描述),每个适配器可以支持任意数量的template 模板决定了适配器会收到的数据、也决定了使用适配器必须创建的instance |
如果使用Istio官方默认的Chart来部署,则会创建istio-telemetry、istio-policy两套Deployment。它们的启动参数没有区别,分别负责Mixer的遥测、策略检查。这两个Deployment分别对应同名的Service,监听9091端口。
实际上网络监听是由Mixs的Sidecar,也就是Envoy负责的。Mixs Pod本身监听的是UDS unix:///sock/mixer.socket,Envoy负责将9091端口的请求转发给此UDS。
在本地调试Mixer服务端时,参考如下启动参数:
1 2 |
mixs server --port 9091 --monitoringPort 9099 --log_output_level api:debug \ --configStoreURL=k8s:///home/alex/.kube/config --configDefaultNamespace=istio-system |
mixs server的入口点位于:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
func main() { // supportedTemplates() map[string]template.Info // supportedAdapters() []adptr.InfoFn // 这两个方法都是自动生成的,包含编译的Mixer支持的模板、适配器的列表 // 模板/适配器信息中包含其属性清单 rootCmd := cmd.GetRootCmd(os.Args[1:], supportedTemplates(), supportedAdapters(), shared.Printf, shared.Fatalf) if err := rootCmd.Execute(); err != nil { os.Exit(-1) } } func serverCmd(info map[string]template.Info, adapters []adapter.InfoFn, printf, fatalf shared.FormatFn) *cobra.Command { // 默认Mixer参数 sa := server.DefaultArgs() // 使用自动生成的模板、适配器信息 sa.Templates = info sa.Adapters = adapters serverCmd := &cobra.Command{ Use: "server", Short: "Starts Mixer as a server", Run: func(cmd *cobra.Command, args []string) { // 调用runServer启动服务 runServer(sa, printf, fatalf) }, } } func runServer(sa *server.Args, printf, fatalf shared.FormatFn) { // 创建服务器对象 s, err := server.New(sa) // 启动gRPC服务 s.Run() // 等待shutdown信号可读 err = s.Wait() // 执行清理工作 _ = s.Close() } |
该函数创建一个全功能的Mixer服务器,并且准备好接收请求:
1 2 3 |
func New(a *Args) (*Server, error) { return newServer(a, newPatchTable()) } |
该方法启动Mixs服务器:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
func (s *Server) Run() { // 准备好关闭通道 s.shutdown = make(chan error, 1) // 设置可用性状态,并通知探针控制器,探针被嵌入到Server s.SetAvailable(nil) go func() { // 启动gRPC服务,传入原始套接字的监听器对象 err := s.server.Serve(s.listener) // 关闭通道 s.shutdown <- err }() } |
该方法很简单,就是在shutdown通道上等待。
该方法关闭Mixs服务器使用的各种资源。
newPatchTable创建一个新的patchTable结构:
1 2 3 4 5 6 7 8 9 10 |
func newPatchTable() *patchTable { return &patchTable{ newRuntime: runtime.New, configTracing: tracing.Configure, startMonitor: startMonitor, listen: net.Listen, configLog: log.Configure, runtimeListen: func(rt *runtime.Runtime) error { return rt.StartListening() }, } } |
此结构就是几个函数的集合:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
type patchTable struct { // 此函数创建一个Runtime,Runtime是Mixer运行时环境的主要入口点 // 它监听配置、实例化Handler、创建分发机制(dispatch machinery)、处理请求 newRuntime func(s store.Store, templates map[string]*template.Info, adapters map[string]*adapter.Info, defaultConfigNamespace string, executorPool *pool.GoroutinePool, handlerPool *pool.GoroutinePool, enableTracing bool) *runtime.Runtime // 配置追踪系统,通常在启动时调用一次,此调用返回后,追踪系统可以接受数据 configTracing func(serviceName string, options *tracing.Options) (io.Closer, error) // 暴露Mixer自我监控信息的HTTP服务 startMonitor func(port uint16, enableProfiling bool, lf listenFunc) (*monitor, error) // 监听本地端口并返回一个监听器 listen listenFunc // 配置Istio的日志子系统 configLog func(options *log.Options) error // 让Runtime开始监听配置变更,每当配置变更,Runtime处理新配置并创建Dispatcher runtimeListen func(runtime *runtime.Runtime) error } |
此方法创建一个新的Mixs服务器,服务器由下面的结构表示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
type Server struct { // 关闭通道 shutdown chan error // 服务API请求的gRPC服务器 server *grpc.Server // API线程池 gp *pool.GoroutinePool // 适配器线程池 adapterGP *pool.GoroutinePool // API网络监听器 listener net.Listener // 监控服务器,此结构包含两个字段,一个是http.Server,一个是关闭通道 monitor *monitor // 用于关闭追踪子系统 tracer io.Closer // 可伸缩的策略检查缓存 checkCache *checkcache.Cache // 将入站API调用分发给配置好的适配器 dispatcher dispatcher.Dispatcher livenessProbe probe.Controller readinessProbe probe.Controller // 管理探针控制器所需要的可用性状态,内嵌 *probe.Probe } |
该方法的逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
func newServer(a *Args, p *patchTable) (*Server, error) { // 校验Mixs启动参数 if err := a.validate(); err != nil { return nil, err } // 配置日志子系统 if err := p.configLog(a.LoggingOptions); err != nil { return nil, err } apiPoolSize := a.APIWorkerPoolSize adapterPoolSize := a.AdapterWorkerPoolSize s := &Server{} // 创建线程池 // API 线程池 s.gp = pool.NewGoroutinePool(apiPoolSize, a.SingleThreaded) s.gp.AddWorkers(apiPoolSize) // 适配器线程池 s.adapterGP = pool.NewGoroutinePool(adapterPoolSize, a.SingleThreaded) s.adapterGP.AddWorkers(adapterPoolSize) tmplRepo := template.NewRepository(a.Templates) // 从适配器名称到adapter.Info的映射 adapterMap := config.AdapterInfoMap(a.Adapters, tmplRepo.SupportsTemplate) // 状态探针 s.Probe = probe.NewProbe() // gRPC选项 var grpcOptions []grpc.ServerOption grpcOptions = append(grpcOptions, grpc.MaxConcurrentStreams(uint32(a.MaxConcurrentStreams))) grpcOptions = append(grpcOptions, grpc.MaxMsgSize(int(a.MaxMessageSize))) // 一元(请求/应答模式)gRPC请求的服务器端拦截器 var interceptors []grpc.UnaryServerInterceptor var err error // 如果启用了追踪(tracing.option提供了ZipkinURL、JaegerURL或LogTraceSpans=true) if a.TracingOptions.TracingEnabled() { s.tracer, err = p.configTracing("istio-mixer", a.TracingOptions) if err != nil { _ = s.Close() return nil, fmt.Errorf("unable to setup tracing") } // 则添加基于OpenTracing的追踪拦截器 interceptors = append(interceptors, otgrpc.OpenTracingServerInterceptor(ot.GlobalTracer())) } // OpenTracing、Prometheus监控拦截器,都来自项目https://github.com/grpc-ecosystem // 将Prometheus拦截器添加到末尾 interceptors = append(interceptors, grpc_prometheus.UnaryServerInterceptor) // 启用Prometheus时间直方图记录,RPC调用的耗时会被记录。Prometheus持有、查询Histogram指标的成本比较高 // 生成的指标都是面向gRPC协议的、通用的,不牵涉Istio的逻辑。指标名以grpc_开头 grpc_prometheus.EnableHandlingTimeHistogram() // 将所有拦截器串连为单个拦截器,并添加到gRPC选项 grpcOptions = append(grpcOptions, grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(interceptors...))) network := "tcp" address := fmt.Sprintf(":%d", a.APIPort) if a.APIAddress != "" { idx := strings.Index(a.APIAddress, "://") if idx < 0 { address = a.APIAddress } else { network = a.APIAddress[:idx] address = a.APIAddress[idx+3:] } } if network == "unix" { // 如果监听UDS,则移除先前的文件 if err = os.Remove(address); err != nil && !os.IsNotExist(err) { // 除了文件未找到以外的错误,都不允许 return nil, fmt.Errorf("unable to remove unix://%s: %v", address, err) } } // 调用net.Listen监听 if s.listener, err = p.listen(network, address); err != nil { _ = s.Close() return nil, fmt.Errorf("unable to listen: %v", err) } // ConfigStore用于测试目的,通常都会使用ConfigStoreURL(例如k8s:///home/alex/.kube/config) st := a.ConfigStore if st != nil && a.ConfigStoreURL != "" { _ = s.Close() return nil, fmt.Errorf("invalid arguments: both ConfigStore and ConfigStoreURL are specified") } if st == nil { configStoreURL := a.ConfigStoreURL if configStoreURL == "" { configStoreURL = "k8s://" } // Registry存储URL scheme与后端实现之间的对应关系 reg := store.NewRegistry(config.StoreInventory()...) groupVersion := &schema.GroupVersion{Group: crd.ConfigAPIGroup, Version: crd.ConfigAPIVersion} // 创建一个Store实例,它持有Backend,Backend代表一个无类型的Mixer存储后端 —— 例如K8S // 默认情况下,configStoreURL的Scheme为k8s,Istio会调用config/crd.NewStore // 传入configStoreURL、GroupVersion、criticalKinds 来创建Backend if st, err = reg.NewStore(configStoreURL, groupVersion, rc.CriticalKinds()); err != nil { _ = s.Close() return nil, fmt.Errorf("unable to connect to the configuration server: %v", err) } } var rt *runtime.Runtime // 所有模板,目标决定了各分类的适配器(例如所有metric类适配器)在运行时需要处理的数据类型 templateMap := make(map[string]*template.Info, len(a.Templates)) for k, v := range a.Templates { t := v templateMap[k] = &t } // 创建运行时,传入存储、模板、适配器、线程池、是否启用追踪等信息 rt = p.newRuntime(st, templateMap, adapterMap, a.ConfigDefaultNamespace, s.gp, s.adapterGP, a.TracingOptions.TracingEnabled()) // 监听配置存储的变更,初始化配置 if err = p.runtimeListen(rt); err != nil { _ = s.Close() return nil, fmt.Errorf("unable to listen: %v", err) } // 等待配置存储同步完成 log.Info("Awaiting for config store sync...") if err := st.WaitForSynced(30 * time.Second); err != nil { return nil, err } // 设置分发器,分发器负责将API请求分发给配置好的适配器处理 s.dispatcher = rt.Dispatcher() // 如果启用了策略检查缓存,则创建LRU缓存对象 if a.NumCheckCacheEntries > 0 { s.checkCache = checkcache.New(a.NumCheckCacheEntries) } // 此全局变量决定是否利用包golang.org/x/net/trace进行gRPC调用追踪 grpc.EnableTracing = a.EnableGRPCTracing // 节流阀,限制调用频度 throttler := loadshedding.NewThrottler(a.LoadSheddingOptions) // Evaluator方法根据名称返回配置好的LoadEvaluator // LoadEvaluator能够评估请求是否超过阈值 if eval := throttler.Evaluator(loadshedding.GRPCLatencyEvaluatorName); eval != nil { grpcOptions = append(grpcOptions, grpc.StatsHandler(eval.(*loadshedding.GRPCLatencyEvaluator))) } // 创建gRPC服务器 s.server = grpc.NewServer(grpcOptions...) // 注册服务到gRPC服务器 // 注册时需要提供grpc.ServiceDesc,其中包含服务名、方法集合(方法名到处理函数的映射 // api.NewGRPCServer返回 mixerpb.MixerServer 接口,它仅仅包含Check / Report两个方法 mixerpb.RegisterMixerServer(s.server, api.NewGRPCServer(s.dispatcher, s.gp, s.checkCache, throttler)) // 探针 if a.LivenessProbeOptions.IsValid() { s.livenessProbe = probe.NewFileController(a.LivenessProbeOptions) s.RegisterProbe(s.livenessProbe, "server") s.livenessProbe.Start() } if a.ReadinessProbeOptions.IsValid() { s.readinessProbe = probe.NewFileController(a.ReadinessProbeOptions) rt.RegisterProbe(s.readinessProbe, "dispatcher") st.RegisterProbe(s.readinessProbe, "store") s.readinessProbe.Start() } // 启动监控服务 if s.monitor, err = p.startMonitor(a.MonitoringPort, a.EnableProfiling, p.listen); err != nil { _ = s.Close() return nil, fmt.Errorf("unable to setup monitoring: %v", err) } // 启动ControlZ监听器 go ctrlz.Run(a.IntrospectionOptions, nil) return s, nil } |
patchTable的newRuntime函数会调用runtime.New,创建一个新的Mixer运行时 —— Mixer运行时环境的主要入口点,负责监听配置、实例化Handler、创建分发机制(dispatch machinery)、处理请求:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
func New( s store.Store, templates map[string]*template.Info, adapters map[string]*adapter.Info, defaultConfigNamespace string, executorPool *pool.GoroutinePool, handlerPool *pool.GoroutinePool, enableTracing bool) *Runtime { // Ephemeral表示一个短暂的配置状态,它可以被入站配置变更事件所更新 // Ephemeral本身包含的数据没有价值,你必须调用它的BuildSnapshot方法来创建稳定的、完全解析的配置的快照 e := config.NewEphemeral(templates, adapters) rt := &Runtime{ // 默认配置命名空间 defaultConfigNamespace: defaultConfigNamespace, // 短暂配置状态 ephemeral: e, // 配置快照 snapshot: config.Empty(), // 适配器处理器列表 handlers: handler.Empty(), // API请求分发器,需要协程池 dispatcher: dispatcher.New(executorPool, enableTracing), // 适配器处理器的协程池 handlerPool: handlerPool, Probe: probe.NewProbe(), store: s, } // 从ephemeral构建出新c.snapshot、新c.handlers、新路由表(用于解析入站请求并将其路由给适当的处理器) // 然后替换路由表,最后清理上一次配置对应的处理器 rt.processNewConfig() // 设置探针结果为:尚未监听存储 rt.Probe.SetAvailable(errNotListening) return rt } |
创建Runtime之后,p.runtimeListen被调用。此函数会调用Runtime.StartListening方法来监听配置的变更,同样会立即触发processNewConfig调用。之后,processNewConfig调用会通过store.WatchChanges的回调反复发生。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
func (c *Runtime) StartListening() error { // Runtime的状态锁 c.stateLock.Lock() defer c.stateLock.Unlock() if c.shutdown != nil { return errors.New("already listening") } // 生成adapter、template等对象类型到它的proto消息的映射(合并到一个映射中) // adapter.Info.DefaultConfig、template.Info.CtrCfg,以及 // &configpb.Rule{}、&configpb.AttributeManifest{}、&v1beta1.Info{} ... // 都实现了proto.Message接口 kinds := config.KindMap(c.snapshot.Adapters, c.snapshot.Templates) // 开始监控存储,返回当前资源集(key到spec的映射)、监控用的通道 data, watchChan, err := store.StartWatch(c.store, kinds) if err != nil { return err } // 设置并覆盖相同的临时状态,其实就是把ephemeral.entries = data c.ephemeral.SetState(data) // 处理新配置 c.processNewConfig() // 初始化运行时的关闭通道 c.shutdown = make(chan struct{}) // 增加一个计数 c.waitQuiesceListening.Add(1) go func() { // 只有shutdown通道关闭,此监控配置存储变化的循环才会退出 // 当有新的配置变更被发现后,调用onConfigChange,此方法会导致processNewConfig store.WatchChanges(watchChan, c.shutdown, watchFlushDuration, c.onConfigChange) // shutdown通道关闭后, c.waitQuiesceListening.Done() }() // 重置可用性状态,此等待组不再阻塞,StopListening方法可以顺利返回 c.Probe.SetAvailable(nil) return nil } |
当配置存储有变化后,Runtime的该方法会被调用,它的逻辑很简单:
1 2 3 4 5 6 |
func (c *Runtime) onConfigChange(events []*store.Event) { // 更新或者擅长ephemeral.entries中的条目 c.ephemeral.ApplyEvent(events) // 对最新的配置进行处理 c.processNewConfig()default } |
Runtime的processNewConfig方法负责处理从配置存储(K8S)中拉取的最新CR,然后创建配置快照、创建处理器表、路由表,并改变Dispatcher的路由:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
func (c *Runtime) processNewConfig() { // 构建一个稳定的、完全解析的配置的快照 newSnapshot, _ := c.ephemeral.BuildSnapshot() // 当前运行时使用的处理器 oldHandlers := c.handlers // 创建新的处理器表 newHandlers := handler.NewTable(oldHandlers, newSnapshot, c.handlerPool) // 返回ExpressionBuilder,用于创建一系列预编译表达式 builder := compiled.NewBuilder(newSnapshot.Attributes) // 构建并返回路由表,路由表决定了什么条件下调用什么适配器 newRoutes := routing.BuildTable( newHandlers, newSnapshot, builder, c.defaultConfigNamespace, log.DebugEnabled()) // 改变分发器的路由,分发器负责基于路由表来调用适配器 oldContext := c.dispatcher.ChangeRoute(newRoutes) // 修改实例变量 c.handlers = newHandlers c.snapshot = newSnapshot log.Debugf("New routes in effect:\n%s", newRoutes) // 关闭旧的处理器,注意处理器实现了io.Closer接口,这个接口由Istio自己负责,和适配器开发无关 cleanupHandlers(oldContext, oldHandlers, newHandlers, maxCleanupDuration) } |
该方法生成一个完全解析的(没有任何外部依赖)的配置快照。快照主要包含静态、动态模板/适配器信息、以及规则信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
func (e *Ephemeral) BuildSnapshot() (*Snapshot, error) { errs := &multierror.Error{} // 下一个快照的ID id := e.nextID e.nextID++ log.Debugf("Building new config.Snapshot: id='%d'", id) // 一组和istio本身状态监控有关的Prometheus计数器 counters := newCounters(id) e.lock.RLock() // 处理属性清单,获得属性列表。清单来源有三个地方: // 1、配置存储中attributemanifest类型的CR。第一次调用该方法时,尚未加载这些CR // 2、自动生成的template.Info.AttributeManifests // 注意清单中每个属性,都具有全网格唯一的名称 attributes := e.processAttributeManifests(counters, errs) // 处理静态适配器的处理器配置 —— 各种适配器的CR/实例,获得处理器(HandlerStatic)列表 // 对于从配置存储加载的资源,如果在自动生成的adapter.Info中找到对应条目,则认为是合法的处理器 // 对于每个处理器,会创建HandlerStatic结构,此结构表示基于Compiled-in的适配器的处理器 shandlers := e.processStaticAdapterHandlerConfigs(counters, errs) // 返回属性描述符查找器(AttributeDescriptorFinder) af := ast.NewFinder(attributes) // 处理静态模板的实例配置 —— 各种模板的CR,获得实例(InstanceStatic)列表 // 对于从配置存储加载的资源,如果在自动生成的template.Info中找到对应条目,则认为是合法的实例 // 对于每个实例,会创建InstanceStatic结构,此结构表示基于Compiled-in的模板的Instance instances := e.processInstanceConfigs(af, counters, errs) // 开始处理动态资源,所谓动态资源,是指没有特定CRD的模板(也就没有对应CR的实例) // 以及没有特定CRD的适配器(也就没有对应CR的处理器) // 动态模板注册为template类型的CR dTemplates := e.processDynamicTemplateConfigs(counters, errs) // 动态适配器注册为adapter类型的CR dAdapters := e.processDynamicAdapterConfigs(dTemplates, counters, errs) // 动态处理器注册为handler类型的CR,它必须引用某个adapter的名称 dhandlers := e.processDynamicHandlerConfigs(dAdapters, counters, errs) // 动态处理器注册为instance类型的CR,它必须引用某个template的名称 dInstances := e.processDynamicInstanceConfigs(dTemplates, af, counters, errs) // 处理规则,规则可以引用上述的静态和动态资源 rules := e.processRuleConfigs(shandlers, instances, dhandlers, dInstances, af, counters, errs) // 构建配置快照 s := &Snapshot{ ID: id, Templates: e.templates, Adapters: e.adapters, TemplateMetadatas: dTemplates, AdapterMetadatas: dAdapters, Attributes: ast.NewFinder(attributes), HandlersStatic: shandlers, InstancesStatic: instances, Rules: rules, HandlersDynamic: dhandlers, InstancesDynamic: dInstances, Counters: counters, } e.lock.RUnlock() return s, errs.ErrorOrNil() } |
适配器的初始化过程,是Mixer服务器初始化的一部分。在Mixer服务器启动过程中有如下逻辑:
1 |
adapterMap := config.AdapterInfoMap(a.Adapters, tmplRepo.SupportsTemplate) |
该方法会生成得到所有适配器的adaptor.Info对象:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
type Info struct { // 适配器的正式名称,必须是RFC 1035兼容的DNS标签 // 此名称会用在Istio配置中,因此应当简短而具有描述性 Name string // 实现此适配器的包,例如 // istio.io/istio/mixer/adapter/denier Impl string // 人类可读的适配器的描述信息 Description string // 该函数指针能够创建一个新的HandlerBuilder,HandlerBuilder能够创建出此适配器的Handler NewBuilder NewBuilderFn // 此适配器声明支持的模板 SupportedTemplates []string // 传递给HandlerBuilder.Build的适配器的默认参数 DefaultConfig proto.Message } |
入参a.Adapters来自supportedAdapters(),此函数是自动生成的。a.Adapters的每个元素的类型是adapter.InfoFn。调用此函数即得到对应的adaptor.Info对象:
1 |
type InfoFn func() Info |
config.AdapterInfoMap的主要逻辑就是调用各种适配器的adapter.InfoFn方法,并且对adaptor.Info进行各种校验。例如检查它的NewBuilder、NewBuilder字段是否为非空,检查它是否和声明支持的模板兼容。
适配器如果需要初始化,那么初始化逻辑就发生在InfoFn中。
本节以Prometheus适配器为例,了解适配器的初始化过程。
1 2 3 4 5 6 7 8 9 10 11 12 13 |
const ( metricsPath = "/metrics" // Istio会暴露三个和Prometheus Exporter端口: // istio-mixer.istio-system:42422,所有由Mixer的Prometheus适配器生成的网格指标 // istio-mixer.istio-system:9093,用于监控Mixer自身的指标 // istio-mixer.istio-system:9102,Envoy生成的原始统计信息,从Statsd转换为Prometheus格式 defaultAddr = ":42422" ) func GetInfo() adapter.Info { ii, _ := GetInfoWithAddr(defaultAddr) return ii } |
GetInfoWithAddr方法的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
func GetInfoWithAddr(addr string) (adapter.Info, Server) { // HandlerBuilder单例 singletonBuilder := &builder{ // HTTP服务器,这里不会启动监听 srv: newServer(addr), } // 创建注册表singletonBuilder.registry = prometheus.NewPedanticRegistry() // 情况指标信息 singletonBuilder.metrics = make(map[string]*cinfo) singletonBuilder.clearState() // 返回adaptor.Info对象 return adapter.Info{ Name: "prometheus", Impl: "istio.io/istio/mixer/adapter/prometheus", Description: "Publishes prometheus metrics", SupportedTemplates: []string{ metric.TemplateName, }, NewBuilder: func() adapter.HandlerBuilder { return singletonBuilder }, DefaultConfig: &config.Params{}, }, singletonBuilder.srv } |
每当配置变更后,适配器的Handler会被初始化。Runtime.processNewConfig会调用:
1 |
newHandlers := handler.NewTable(oldHandlers, newSnapshot, c.handlerPool) |
创建handler.Table,此表包含了所有实例化的、配置好的适配器的处理器的信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
type Table struct { // 表格条目 entries map[string]Entry counters tableCounters } // 单个处理器 type Entry struct { // 处理器的名称 Name string // 处理器对象 Handler adapter.Handler // 适配器名称 AdapterName string // 创建此处理器使用的适配器配置(参数)的签名信息 Signature signature // 传递给处理器的adapter.Env env env } |
每个适配器可以消费多个实例,对于适配器和实例的每个组合,handler.NewTable方法会为其创建Handler:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
// 适配器实例 - 模板实例的映射 // map[*HandlerStatic][]*InstanceStatic instancesByHandler := config.GetInstancesGroupedByHandlers(snapshot) // map[*HandlerDynamic][]*InstanceDynamic instancesByHandlerDynamic := config.GetInstancesGroupedByHandlersDynamic(snapshot) // 表 t := &Table{ entries: make(map[string]Entry, len(instancesByHandler)+len(instancesByHandlerDynamic)), counters: newTableCounters(snapshot.ID), } // 对于每个静态处理器 - 实例组合 for handler, instances := range instancesByHandler { // 为其创建条目,并加入到表中 createEntry(old, t, handler, instances, snapshot.ID, // 这个回调用于用于创建处理器 func(handler hndlr, instances interface{}) (h adapter.Handler, e env, err error) { // 环境信息 e = NewEnv(snapshot.ID, handler.GetName(), gp).(env) // 创建出处理器 h, err = config.BuildHandler(handler.(*config.HandlerStatic), instances.([]*config.InstanceStatic), e, snapshot.Templates) return h, e, err }) } // 对于每个动态处理器 - 实例组合 for handler, instances := range instancesByHandlerDynamic { createEntry(old, t, handler, instances, snapshot.ID, ... } |
config.BuildHandler经过几层转发,最终会调用Prometheus适配器的方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
func (b *builder) Build(ctx context.Context, env adapter.Env) (adapter.Handler, error) { cfg := b.cfg var metricErr *multierror.Error // 用于收集指标配置 newMetrics := make([]*config.Params_MetricInfo, 0, len(cfg.Metrics)) // 检查指标是否被重新定义,也就是对应的CR是否被修改 // 如果是,则清空指标注册表、指标映射。重定义会导致Prometheus客户端Panic // 添加、移除则没有问题 var cl *cinfo // 遍历新配置的指标列表 for _, m := range cfg.Metrics { // 当前指标表中没有匹配项,加入 if cl = b.metrics[m.InstanceName]; cl == nil { newMetrics = append(newMetrics, m) continue } // 散列值没有变,和之前的指标配置一样 if cl.sha == computeSha(m, env.Logger()) { continue } // 散列值不匹配,发生了重定义。适配器需要重现加载 env.Logger().Warningf("Metric %s redefined. Reloading adapter.", m.Name) // 重建注册表、清空指标信息 b.clearState() // 将所有指标作为新配置看待 newMetrics = cfg.Metrics break } env.Logger().Debugf("%d new metrics defined", len(newMetrics)) // 遍历处理所有新指标 var err error for _, m := range newMetrics { ns := defaultNS if len(m.Namespace) > 0 { ns = safeName(m.Namespace) } // 指标全名,即CR的名称 mname := m.InstanceName if len(m.Name) != 0 { // 转换为短名 mname = m.Name } // 构建出指标信息cinfo ci := &cinfo{kind: m.Kind, sha: computeSha(m, env.Logger())} ci.sortedLabels = make([]string, len(m.LabelNames)) copy(ci.sortedLabels, m.LabelNames) sort.Strings(ci.sortedLabels) // 根据指标类型的不同,分别处理。逻辑都是注册指标到注册表 switch m.Kind { case config.GAUGE: ci.c, err = registerOrGet(b.registry, newGaugeVec(ns, mname, m.Description, m.LabelNames)) b.metrics[m.InstanceName] = ci case config.COUNTER: ci.c, err = registerOrGet(b.registry, newCounterVec(ns, mname, m.Description, m.LabelNames)) b.metrics[m.InstanceName] = ci case config.DISTRIBUTION: ci.c, err = registerOrGet(b.registry, newHistogramVec(ns, mname, m.Description, m.LabelNames, m.Buckets)) b.metrics[m.InstanceName] = ci default: metricErr = multierror.Append(metricErr, fmt.Errorf("unknown metric kind (%d); could not register metric %v", m.Kind, m)) } } // 启动Exporter的HTTP服务器,如果已经启动则不管 if err := b.srv.Start(env, promhttp.HandlerFor(b.registry, promhttp.HandlerOpts{})); err != nil { return nil, err } // 如果配置了指标过期功能,则定期删除老旧指标 var expiryCache cache.ExpiringCache if cfg.MetricsExpirationPolicy != nil { checkDuration := cfg.MetricsExpirationPolicy.ExpiryCheckIntervalDuration if checkDuration == 0 { checkDuration = cfg.MetricsExpirationPolicy.MetricsExpiryDuration / 2 } expiryCache = cache.NewTTLWithCallback( cfg.MetricsExpirationPolicy.MetricsExpiryDuration, checkDuration, deleteOldMetrics) } return &handler{b.srv, b.metrics, expiryCache}, metricErr.ErrorOrNil() } |
b.srv.Start会启动作为Exporter的HTTP服务器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
func (s *serverInst) Start(env adapter.Env, metricsHandler http.Handler) (err error) { // 加锁保护 s.lock.Lock() defer s.lock.Unlock() // 如果服务器已经启动了,则委托 // just switch the delegate handler. if s.srv != nil { s.refCnt++ s.handler.setDelegate(metricsHandler) return nil } // 否则,创建监听 listener, err := net.Listen("tcp", s.addr) s.port = listener.Addr().(*net.TCPAddr).Port // 配置ServerMux srvMux := http.NewServeMux() s.handler = &metaHandler{delegate: metricsHandler} srvMux.Handle(metricsPath, s.handler) srv := &http.Server{Addr: s.addr, Handler: srvMux} // 在后台运行 env.ScheduleDaemon(func() { // 开始监听 env.Logger().Infof("serving prometheus metrics on %d", s.port) if err := srv.Serve(listener.(*net.TCPListener)); err != nil { if err == http.ErrServerClosed { env.Logger().Infof("HTTP server stopped") } else { _ = env.Logger().Errorf("prometheus HTTP server error: %v", err) } } }) s.srv = srv s.refCnt++ return nil } |
使用Istio官方Chart安装时,其内置的Prometheus服务器会自动采集该HTTP服务器暴露的指标。
在运行期间,Envoy代理会向Mixer服务发起CHECK/REPORT/QUOTA等调用。Mixer会将这些请求转发给匹配的适配器进行处理。
本节以Prometheus适配器为例,说明REPORT请求的处理过程。
以官方Chart部署Istio时,会创建如下Rule:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
apiVersion: config.istio.io/v1alpha2 kind: rule metadata: name: promhttp spec: actions: - handler: handler.prometheus instances: - requestcount.metric - requestduration.metric - requestsize.metric - responsesize.metric match: context.protocol == "http" || context.protocol == "grpc" |
这里我们测试requestcount这个指标,和它相关的Handler、Instance配置片断如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# Handler apiVersion: config.istio.io/v1alpha2 kind: prometheus metadata: name: handler namespace: istio-system spec: metrics: - instance_name: requestcount.metric.istio-system kind: COUNTER label_names: - reporter - source_app - source_principal - source_workload - source_workload_namespace - source_version - destination_app - destination_principal - destination_workload - destination_workload_namespace - destination_version - destination_service - destination_service_name - destination_service_namespace - request_protocol - response_code - connection_security_policy name: requests_total # Instance kind: metric metadata: name: requestcount spec: dimensions: connection_security_policy: conditional((context.reporter.kind | "inbound") == "outbound", "unknown", conditional(connection.mtls | false, "mutual_tls", "none")) destination_app: destination.labels["app"] | "unknown" destination_principal: destination.principal | "unknown" destination_service: destination.service.host | "unknown" destination_service_name: destination.service.name | "unknown" destination_service_namespace: destination.service.namespace | "unknown" destination_version: destination.labels["version"] | "unknown" destination_workload: destination.workload.name | "unknown" destination_workload_namespace: destination.workload.namespace | "unknown" reporter: conditional((context.reporter.kind | "inbound") == "outbound", "source", "destination") request_protocol: api.protocol | context.protocol | "unknown" response_code: response.code | 200 source_app: source.labels["app"] | "unknown" source_principal: source.principal | "unknown" source_version: source.labels["version"] | "unknown" source_workload: source.workload.name | "unknown" source_workload_namespace: source.workload.namespace | "unknown" monitored_resource_type: '"UNSPECIFIED"' value: "1" |
要触发Mixer服务器端的处理逻辑,不需要运行Envoy代理,调用命令行客户端mixc就可以了。
为了匹配上面的promhttp规则,我们需要发送一个属性context.protocol的值为http的REPORT请求:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
mixc report -m localhost:9091 \ -t request.time=2019-03-27T11:00:00.000Z,response.time=2019-03-27T11:00:00.900Z \ -a context.protocol=http,context.reporter.kind=outbound,source.namespace=default \ -a destination.service=kubernetes # 2019-03-27T03:52:05.237085Z info parsed scheme: "" # 2019-03-27T03:52:05.237179Z info scheme "" not registered, fallback to default scheme # 2019-03-27T03:52:05.237532Z info ccResolverWrapper: sending new addresses to cc: [{localhost:9091 0 <nil>}] # 2019-03-27T03:52:05.237592Z info ClientConn switching balancer to "pick_first" # 2019-03-27T03:52:05.237768Z info pickfirstBalancer: HandleSubConnStateChange: 0xc0001940b0, CONNECTING # 2019-03-27T03:52:05.237788Z info blockingPicker: the picked transport is not ready, loop back to repick # 2019-03-27T03:52:05.241228Z info pickfirstBalancer: HandleSubConnStateChange: 0xc0001940b0, READY # Report RPC returned OK |
Mixer处理请求的接口由以下Proto文件定义:
1 2 3 4 5 6 7 |
service Mixer { // 进行先决条件检查,或者进行配额 rpc Check(CheckRequest) returns (CheckResponse) {} // 遥测报告 rpc Report(ReportRequest) returns (ReportResponse) {} } |
通过前面章节的源码分析,我们了解到,在Mixer服务启动时,注册了OpenTracing、Prometheus的gRPC拦截器。因此首先会执行Prometheus拦截器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
// 自动生成的代码 func _Mixer_Report_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(ReportRequest) if err := dec(in); err != nil { return nil, err } // 没有拦截器,直接调用MixerServer实现 if interceptor == nil { return srv.(MixerServer).Report(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: "/istio.mixer.v1.Mixer/Report", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(MixerServer).Report(ctx, req.(*ReportRequest)) } // 实际上是有拦截器的,调用拦截器,通过拦截器再调用MixerServer实现 return interceptor(ctx, in, info, handler) } |
来自go-grpc-prometheus项目的Prometheus拦截器,逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
func UnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { // grpc_server_started_total指标 monitor := newServerReporter(Unary, info.FullMethod) // grpc_server_msg_received_total指标 monitor.ReceivedMessage() // 调用MixerServer实现 resp, err := handler(ctx, req) // grpc_server_handled_total指标 // grpc_server_handling_seconds指标,直方图 monitor.Handled(grpc.Code(err)) if err == nil { // grpc_server_msg_sent_total monitor.SentMessage() } return resp, err } |
MixerServer接口的实现定义在api.grpcServer结构中。Report方法会逐个处理每条消息,并进行:
- 预处理:调用匹配的属性生成处理器
- 处理:调用匹配的主处理器
注意,单次Mixer请求可以携带多条消息,每条消息都对应Envoy代理处理的一个实际请求。
Report方法的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
func (s *grpcServer) Report(ctx context.Context, req *mixerpb.ReportRequest) (*mixerpb.ReportResponse, error) { // 限流逻辑,默认情况下Mixer的限流是关闭的 // req.Attributes的类型是 []v1.CompressedAttributes,每个元素表示报告的一条信息,客户端可以一次报送多条信息 // 但是对于非REPORT请求,每次只能有一条消息 if s.throttler.Throttle(loadshedding.RequestInfo{PredictedCost: float64(len(req.Attributes))}) { return nil, grpc.Errorf(codes.Unavailable, "Server is currently overloaded. Please try again.") } if len(req.Attributes) == 0 { // 没有报告任何东西 return reportResp, nil } // Words表示消息级别的字典 —— 属性名的数组、字符串属性值 for i := 0; i < len(req.Attributes); i++ { if len(req.Attributes[i].Words) == 0 { // req.DefaultWords为所有消息的默认字典。可以让请求中多个消息共享字典,进而减少请求大小 req.Attributes[i].Words = req.DefaultWords } } // bag around the input proto that keeps track of reference attributes // 创建一个ProtoBag —— 基于属性Proto消息,实现Bag接口(用于访问属性集) protoBag := attribute.NewProtoBag(&req.Attributes[0], s.globalDict, s.globalWordList) // 从对象池中取得一个MutableBag,对象池避免了反复的内存分配,然后将其parent设置为protoBag // accumBag(请求包requestBag),跟踪除了第一个以外,所有消息相对于第一个的delta accumBag := attribute.GetMutableBag(protoBag) // reportBag(响应包responseBag),持有预处理之后的输出状态,预处理适配器可能会生成一些新属性,这些新属性以delta的形式存储在此 reportBag := attribute.GetMutableBag(accumBag) // 基于GlobalTracer,启动并返回操作名称(operationName)为Report的Span,使用从ctx中找到的Span作为ChildOfRef // 如果找不到作为parent的Span,则创建一个根Span reportSpan, reportCtx := opentracing.StartSpanFromContext(ctx, "Report") // 从对象池中获得reporter,为其提供路由上下文(report.rc)、报告上下文(r.ctx,其中包含了Trace树的信息) reporter := s.dispatcher.GetReporter(reportCtx) var errors *multierror.Error /* 开始逐个处理消息 */ for i := 0; i < len(req.Attributes); i++ { // 以Report为父Span,依次创建子Span: attribute bag N span, newctx := opentracing.StartSpanFromContext(reportCtx, fmt.Sprintf("attribute bag %d", i)) // 第一个属性块(消息)作为protoBag的基础,计算每个子包的delta if i > 0 { if err := accumBag.UpdateBagFromProto(&req.Attributes[i], s.globalWordList); err != nil { err = fmt.Errorf("request could not be processed due to invalid attributes: %v", err) // 为子Span记录字段,然后结束Span span.LogFields(otlog.String("error", err.Error())) span.Finish() errors = multierror.Append(errors, err) break } } lg.Debug("Dispatching Preprocess") // 预处理,将请求包分发给那些需要提前执行的适配器,例如属性生成适配器 if err := s.dispatcher.Preprocess(newctx, accumBag, reportBag); err != nil { err = fmt.Errorf("preprocessing attributes failed: %v", err) span.LogFields(otlog.String("error", err.Error())) span.Finish() errors = multierror.Append(errors, err) continue } // 主处理,分发给主适配器 lg.Debug("Dispatching to main adapters after running preprocessors") lg.Debuga("Attribute Bag: \n", reportBag) lg.Debugf("Dispatching Report %d out of %d", i+1, len(req.Attributes)) if err := reporter.Report(reportBag); err != nil { span.LogFields(otlog.String("error", err.Error())) span.Finish() errors = multierror.Append(errors, err) continue } span.Finish() // 清空包内容,准备处理下一个请求包使用 reportBag.Reset() } /* 结束逐个处理消息 */ // 重置,并放回对象池 reportBag.Done() accumBag.Done() protoBag.Done() // 刷出,调用reporter.impl.getSession.dispatchBufferedReports(),将之前缓冲的dispatchState全部分发出去 // 然后将会话放回对象池 if err := reporter.Flush(); err != nil { errors = multierror.Append(errors, err) } // 将Reporter对象也放回池中 reporter.Done() // 结束Span if errors != nil { reportSpan.LogFields(otlog.String("error", errors.Error())) } reportSpan.Finish() if errors != nil { lg.Errora("Report failed:", errors.Error()) return nil, grpc.Errorf(codes.Unknown, errors.Error()) } // 返回响应 return reportResp, nil } |
Dispatcher.Preprocess方法负责请求预处理,将请求包分发给那些需要提前执行的适配器,并收集它们产生的属性:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
func (d *Impl) Preprocess(ctx context.Context, bag attribute.Bag, responseBag *attribute.MutableBag) error { // 返回一个session,此结构表示对Dispatcher接口(的实现Impl)的一个调用会话 // 其中包含了处理调用所需的所有可变状态 // getSession从对象池获取一个session对象,然后设置它的 // s.impl,Dispatcher对象 // s.rc,路由上下文对象 // s.ctx 包含Span信息的上下文 // s.variety 需要调用的适配器的种类 // s.bag 请求包 s := d.getSession(ctx, tpb.TEMPLATE_VARIETY_ATTRIBUTE_GENERATOR, bag) // s.responseBag 响应包 s.responseBag = responseBag // 执行分发 err := s.dispatch() if err == nil { err = s.err } // 放回对象池 d.putSession(s) return err } |
session.dispatch方法真正负责请求包的分发工作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
func (s *session) dispatch() error { // 根据报告者类型(从context.reporter.kind获取),默认inbound推断命名空间 // inbound 则命名空间为destination.namespace // outbound 则命名空间为source.namespace namespace, err := getIdentityNamespace(s.bag) if err != nil { // 无法获取命名空间,出错 // 更新直方图(Observe一个值): // mixer_dispatcher_destinations_per_request // mixer_dispatcher_instances_per_request updateRequestCounters(0, 0) log.Warnf("unable to determine identity namespace: '%v', operation='%d'", err, s.variety) return err } // 从路由表获得s.variety类型的、namespace命名空间的目的地列表 // 注意:如果当前命名空间没有匹配的目的地,则使用默认配置存储命名空间(istio-system)中定义的目的地 destinations := s.rc.Routes.GetDestinations(s.variety, namespace) // 要访问的目标服务 destinationService := "" v, ok := s.bag.Get("destination.service") if ok { destinationService = v.(string) } // 创建一个新的Context,携带键值对,以前面的子Span上下文为父,0=adapter.RequestData为键值对 // RequestData定义了关于请求的信息,例如它的目的服务 ctx := adapter.NewContextWithRequestData(s.ctx, &adapter.RequestData{ DestinationService: adapter.Service{ FullName: destinationService, }, }) // 确保能够将请求并行的分发给所有处理器,将s.completed设置为足够大的chan *dispatchState // 每个chan *dispatchState收集单个目的地的处理结果 s.ensureParallelism(destinations.Count()) foundQuota := false // 构建出的实例数量 ninputs := 0 // 匹配的目的地数量 ndestinations := 0 for _, destination := range destinations.Entries() { // dispatchState持有和单个目的地相关的输入/输出状态 var state *dispatchState // 对于REPORT处理器 if s.variety == tpb.TEMPLATE_VARIETY_REPORT { // 生成并缓存分发状态到s.reportStates state = s.reportStates[destination] if state == nil { // 从对象池中获取一个dispatchState并对其赋值,对象池在Mixer中大量使用,减少了内存分配 state = s.impl.getDispatchState(ctx, destination) s.reportStates[destination] = state } } for _, group := range destination.InstanceGroups { // 判断请求包是否和每个实例组匹配 groupMatched := group.Matches(s.bag) if groupMatched { ndestinations++ } // 遍历每个组中的每个实例,调用其构建器。构建器的逻辑取决于你配置的各种模板实例,例如metric的CR for j, input := range group.Builders { if s.variety == tpb.TEMPLATE_VARIETY_QUOTA { // 对于配额适配器,必须要求实例构建器名称和实例名一致 // CRD名称即模板信息名TemplateInfo.Name,例如 logentries // 实例名,即CR名,例如 kubectl -n istio-system get logentries.config.istio.io // 得到的accesslog、tcpaccesslog if !strings.EqualFold(input.InstanceShortName, s.quotaArgs.Quota) { continue } if !groupMatched { // 这是一个条件性的配额,并且当前不匹配条件,直接返回请求的额度 s.quotaResult.Amount = s.quotaArgs.Amount s.quotaResult.ValidDuration = defaultValidDuration } foundQuota = true } if !groupMatched { continue } var instance interface{} // 从请求包构建出实例,Builder方法是自动生成的 if instance, err = input.Builder(s.bag); err != nil { log.Errorf("error creating instance: destination='%v', error='%v'", destination.FriendlyName, err) s.err = multierror.Append(s.err, err) continue } ninputs++ // 对于REPORT模板,在执行分发前,尽可能的将实例累积到分发状态的instances列表中 if s.variety == tpb.TEMPLATE_VARIETY_REPORT { state.instances = append(state.instances, instance) continue } // 对于其它模板类型,直接分发给处理器 state = s.impl.getDispatchState(ctx, destination) state.instances = append(state.instances, instance) if s.variety == tpb.TEMPLATE_VARIETY_ATTRIBUTE_GENERATOR { // 属性生成处理器需要处理Mapper —— 将处理器输出映射入主属性集的函数 state.mapper = group.Mappers[j] state.inputBag = s.bag } // 配额模板相关参数 state.quotaArgs.BestEffort = s.quotaArgs.BestEffort state.quotaArgs.DeduplicationID = s.quotaArgs.DeduplicationID state.quotaArgs.QuotaAmount = s.quotaArgs.Amount // 直接分发 s.dispatchToHandler(state) } } } // Observe mixer_dispatcher_destinations_per_request // Observe mixer_dispatcher_instances_per_request updateRequestCounters(ndestinations, ninputs) // 等待所有处理器处理完毕 s.waitForDispatched() // 如果当前执行的是配额处理器,且没有找到配额,则警告但是允许访问 if s.variety == tpb.TEMPLATE_VARIETY_QUOTA && !foundQuota { s.quotaResult.Amount = s.quotaArgs.Amount s.quotaResult.ValidDuration = defaultValidDuration log.Warnf("Requested quota '%s' is not configured", s.quotaArgs.Quota) } return nil } |
需要注意:
- 对于REPORT模板,仅仅是将生成的Instance存放到dispatchState.instances数组中,不分发。延迟到所有请求消息处理完毕后,由Reporter.Flush统一分发
- 对于CHECK模板,直接调用session.dispatchToHandler进行分发
分发不是直接在当前线程调用适配器,而是排队,由协程池的调度循环异步处理:
1 2 3 4 5 6 |
func (s *session) dispatchToHandler(ds *dispatchState) { s.activeDispatches++ ds.session = s // 调用协程池,调度一个工作 s.impl.gp.ScheduleWork(ds.invokeHandler, nil) } |
dispatchState.invokeHandler方法真正直接调用适配器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
func (ds *dispatchState) invokeHandler(interface{}) { // 顺利处理完毕,没有Panic reachedEnd := false defer func() { if reachedEnd { return } // 从适配器代码导致的Panic中恢复,防止Mixer直接崩了 r := recover() ds.err = fmt.Errorf("panic during handler dispatch: %v", r) log.Errorf("%v\n%s", ds.err, debug.Stack()) if log.DebugEnabled() { log.Debugf("stack dump for handler dispatch panic:\n%s", debug.Stack()) } // 提示此此目的地的分发处理完毕 ds.session.completed <- ds }() // 跟踪 span, ctx, start := ds.beginSpan(ds.ctx) log.Debugf("begin dispatch: destination='%s'", ds.destination.FriendlyName) switch ds.destination.Template.Variety { // 属性生成器 case tpb.TEMPLATE_VARIETY_ATTRIBUTE_GENERATOR: ds.outputBag, ds.err = ds.destination.Template.DispatchGenAttrs( ctx, ds.destination.Handler, ds.instances[0], ds.inputBag, ds.mapper) // 前置条件检查 case tpb.TEMPLATE_VARIETY_CHECK: ds.checkResult, ds.err = ds.destination.Template.DispatchCheck( ctx, ds.destination.Handler, ds.instances[0]) // 遥测/报告 case tpb.TEMPLATE_VARIETY_REPORT: ds.err = ds.destination.Template.DispatchReport( ctx, ds.destination.Handler, ds.instances) // 配额 case tpb.TEMPLATE_VARIETY_QUOTA: ds.quotaResult, ds.err = ds.destination.Template.DispatchQuota( ctx, ds.destination.Handler, ds.instances[0], ds.quotaArgs) // 无法处理的模板类型,Panic default: panic(fmt.Sprintf("unknown variety type: '%v'", ds.destination.Template.Variety)) } log.Debugf("complete dispatch: destination='%s' {err:%v}", ds.destination.FriendlyName, ds.err) // 追踪 ds.completeSpan(span, time.Since(start), ds.err) // 将当前目的地设置为分发处理完毕 ds.session.completed <- ds reachedEnd = true } |
可以看到,上述方法都是把调用委托给目的地的TemplateInfo.Dispatch***函数指针处理的。这些函数指针就是适配器的相应方法。对于Metric模板,Prometheus适配器的方法实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
func (h *handler) HandleMetric(_ context.Context, vals []*metric.Instance) error { var result *multierror.Error // 遍历Instance for _, val := range vals { // 获取该Instance对应的handler(例如requestcount.metric.istio-system)的信息(cinfo) ci := h.metrics[val.Name] if ci == nil { result = multierror.Append(result, fmt.Errorf("could not find metric info from adapter config for %s", val.Name)) continue } collector := ci.c switch ci.kind { // 按指标类型分别处理 case config.GAUGE: vec := collector.(*prometheus.GaugeVec) amt, err := promValue(val.Value) if err != nil { result = multierror.Append(result, fmt.Errorf("could not get value for metric %s: %v", val.Name, err)) continue } pl := promLabels(val.Dimensions) if h.labelsCache != nil { h.labelsCache.Set(key(val.Name, "gauge", pl, ci.sortedLabels), &cacheEntry{vec, pl}) } vec.With(pl).Set(amt) case config.COUNTER: // 转换为指标向量,指标向量的每个元素是具有不同标签值的同一类(名字相同)指标 vec := collector.(*prometheus.CounterVec) amt, err := promValue(val.Value) if err != nil { result = multierror.Append(result, fmt.Errorf("could not get value for metric %s: %v", val.Name, err)) continue } pl := promLabels(val.Dimensions) if h.labelsCache != nil { h.labelsCache.Set(key(val.Name, "counter", pl, ci.sortedLabels), &cacheEntry{vec, pl}) } // vec.With(pl)返回具有指定标签集的指标对象,这里是Counter,然后加上一个值(在当前时间点) vec.With(pl).Add(amt) case config.DISTRIBUTION: // DISTRIBUTION映射为Prometheus类型 Histogram vec := collector.(*prometheus.HistogramVec) amt, err := promValue(val.Value) if err != nil { result = multierror.Append(result, fmt.Errorf("could not get value for metric %s: %v", val.Name, err)) continue } pl := promLabels(val.Dimensions) if h.labelsCache != nil { h.labelsCache.Set(key(val.Name, "distribution", pl, ci.sortedLabels), &cacheEntry{vec, pl}) } vec.With(pl).Observe(amt) } } return result.ErrorOrNil() } cinfo struct { // 负责收集指标的接口,gauge counter等都实现了此接口 c prometheus.Collector sha [sha1.Size]byte kind config.Params_MetricInfo_Kind sortedLabels []string } |
将入站的API调用分发给配置的适配器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
type Dispatcher interface { // 进行预处理,将请求包分发给那些需要提前执行的适配器, // 目前这种适配器主要指属性生成适配器 Preprocess(ctx context.Context, requestBag attribute.Bag, responseBag *attribute.MutableBag) error // 进行CHECK分发,基于CHECK类型模板的Instance,将被转发给感兴趣的适配器 Check(ctx context.Context, requestBag attribute.Bag) (adapter.CheckResult, error) // 获取能够缓冲REPORT请求的报告器 GetReporter(ctx context.Context) Reporter // 进行QUOTA分发 Quota(ctx context.Context, requestBag attribute.Bag, qma QuotaMethodArgs) (adapter.QuotaResult, error) } |
负责产生一系列的报告:
1 2 3 4 5 6 7 8 9 10 |
type Reporter interface { // 添加一个条目(请求包)到报告状态中 Report(requestBag attribute.Bag) error // 刷出所有缓冲的状态到适当的适配器 Flush() error // 完成Reporter的处理过程 Done() } |
目的地,包含一个目标处理器,以及需要(在满足条件的情况下)发送给它的实例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
type Destination struct { // 用于调试的目的地ID id uint32 // 需要调用的处理器 Handler adapter.Handler // 用于监控/日志目的的处理器名称 HandlerName string // 用于监控/日志目的的适配器名称(处理器类型) AdapterName string // 使用的模板,由于某些适配器支持多种模板,这些适配器可能对应多个Destination // 每种模板都有类型,并且定义了支持它的适配器必须实现的接口 Template *TemplateInfo // 实例组,每组实例在满足条件的情况下,会发送给处理器 InstanceGroups []*InstanceGroup // 最大允许的实例数 maxInstances int // 用于监控/日志目的目的地名称 FriendlyName string // 性能计数器 Counters DestinationCounters } |
此结构用于收集单个目的地(适配器+模板组合)的处理状态和结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
type dispatchState struct { // 所属的分发调用会话 session *session // 上下文,其中包含了OpenTracing的Span信息 ctx context.Context // 目的地 destination *routing.Destination // 对于属性生成模板,将模板输出映射入主属性列表的函数 mapper template.OutputMapperFn // 输入包 inputBag attribute.Bag // 配额请求的参数 quotaArgs adapter.QuotaArgs // 构建出的,供适配器消费的实例列表 instances []interface{} // 处理过程中的错误信息 err error // 输出包 outputBag *attribute.MutableBag // CHECK调用的结果 checkResult adapter.CheckResult // QUOTA调用的结果 quotaResult adapter.QuotaResult } |
对一个客户端CHECK/REPORT/QUOTA请求的预处理和主处理的过程,是一个会话。此结构存储相关的信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
type session struct { // 拥有此会话的Dispatcher impl *Impl // 本次会话使用的路由上下文 rc *RoutingContext // 上下文信息 ctx context.Context // 输入包 bag attribute.Bag // 配额调用的参数 quotaArgs QuotaMethodArgs // 输出包 responseBag *attribute.MutableBag // 报告请求的分发状态 reportStates map[*routing.Destination]*dispatchState // CHECK/QUOTA调用的结果 checkResult adapter.CheckResult quotaResult adapter.QuotaResult err error // 正在执行的分发操作数量 activeDispatches int // 收集已完成的分发 completed chan *dispatchState // 本次操作的模板类别 variety tpb.TemplateVariety } |
和模板有关的信息:
1 2 3 4 5 6 7 8 9 10 11 |
type TemplateInfo struct { // 模板名称 Name string // 模板种类 Variety tpb.TemplateVariety // 各种Mixer调用的函数指针 DispatchReport template.DispatchReportFn DispatchCheck template.DispatchCheckFn DispatchQuota template.DispatchQuotaFn DispatchGenAttrs template.DispatchGenerateAttributesFn } |
按照匹配条件分组的、需要发送给适配器的实例的信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
type InstanceGroup struct { // 用于调试的ID id uint32 // 预编译的表达式,何时应用此实例组 Condition compiled.Expression // 用于构建出实例的函数+名称 Builders []NamedBuilder // 映射器函数,用于将属性生成适配器的输出属性,映射入主属性集 Mappers []template.OutputMapperFn } type NamedBuilder struct { InstanceShortName string Builder template.InstanceBuilderFn } OutputMapperFn func(attrs attribute.Bag) (*attribute.MutableBag, error) |
进行配额请求时,需要的参数 + 配额(资源)的类型:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
type QuotaMethodArgs struct { // 在出现RPC调用并重试时,用于额度分配/释放(Quota allocation/allocation)调用的去重复 DeduplicationID string // 分配那种配额 Quota string // 分配的量 Amount int64 // 如果设置为true,则允许响应返回比请求少的额度。如果设置为false,那么额度不足时,直接返回0 BestEffort bool } |
进行配额请求时,需要的参数:
1 2 3 4 5 |
QuotaArgs struct { DeduplicationID string QuotaAmount int64 BestEffort bool } |
由处理器提供的,额度分配的结果:
1 2 3 4 5 6 7 8 |
QuotaResult struct { // RPC调用的状态(状态码、消息、详情) Status rpc.Status // 分配的额度何时过期,0表示永不过期 ValidDuration time.Duration // 分配的额度,可能比请求的额度小 Amount int64 } |
在探索Envoy如何向Mixer发送请求之前,我们先来分析一下Envoy作为网络代理,是如何工作的。
- 通过xDS或者静态配置,获得Envoy代理的监听器信息
- 如果监听器bind_to_port,则直接调用libevent的接口,绑定监听,回调函数设置为ListenerImpl::listenCallback
- DispatcherImpl通过libevent,接收到请求,调用ListenerImpl::listenCallback
- 根据入站时的目的端口,选择适当的监听器处理请求,调用onAccept。存在Iptables重定向的情况下,监听器为15001
- 构建出监听器过滤器链
- 执行过滤器链,对于15001来说,此链只有OriginalDstFilter一个过滤器
- OriginalDstFilter恢复原始目的地址
- 查找和原始目的地址匹配的监听器,并转交请求
- 如果发生请求转交,则接受者监听器也会执行类似于2的逻辑。但是不会再次发生转交
- 实际负责连接的那个监听器,会调用ActiveListener.newConnection,并间接的创建ConnectionImpl
- ConnectionImpl会利用连接套接字(ConnectionSocketPtr)的文件描述符,调用Dispatcher.createFileEvent,注册读写事件的回调
- 到此,连接接受完毕,后续的读写事件由libevent异步触发
- 发生可读、可写、关闭事件时,ConnectionImpl::onFileEvent被调用
- 可写事件的回调onWriteReady先调用
- 可读事件的回调onReadReady后调用
- 尝试循环读取,根据读取结果设置Post操作
- 处理读取到的数据
- 遍历网络过滤器链
- 如果是L7连接,则执行HTTP网络管理器
- 遍历网络过滤器链
- 执行Post IO操作
这个混入类为任意的unique_ptr所持有的对象增加行为,允许方便的将这种对象link/unlink到列表中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
template <class T> class LinkedObject { public: // 对象唯一性指针的列表 typedef std::list<std::unique_ptr<T>> ListType; // 返回列表的迭代器 typename ListType::iterator entry(); // 对象当前是否被插入到列表,只要调用过moveInto***方法就返回true bool inserted(); // 在两个列表之间移动对象 void moveBetweenLists(ListType& list1, ListType& list2); // 移动对象到列表,放在最前面,注意所有权的转移 void moveIntoList(std::unique_ptr<T>&& item, ListType& list); // 移动对象到列表,放在最后面 void moveIntoListBack(std::unique_ptr<T>&& item, ListType& list); // 从列表中移除条目 std::unique_ptr<T> removeFromList(ListType& list); }; |
标记性接口。任何实现此接口的对象,都可以传递给Dispatcher。Dispatcher确保,未来在事件循环中删除对象。
1 2 3 4 |
class DeferredDeletable { public: virtual ~DeferredDeletable() {} }; |
使用此接口,进行事件处理时,不需要担心栈unwind的问题
抽象的连接处理器,总体负责网络连接的处理。ActiveListener、ActiveSocket的_parent都指向此对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
class ConnectionHandler { public: // 此处理器持有的活动连接数 virtual uint64_t numConnections() PURE; // 添加一个监听器到此处理器 virtual void addListener(ListenerConfig& config) PURE; // 根据地址查找监听器。返回监听器的指针,所有权不转移 virtual Network::Listener* findListenerByAddress(const Network::Address::Instance& address) PURE; // 移除使用指定tag作为键的监听器。监听器拥有的所有连接也会被移除 virtual void removeListeners(uint64_t listener_tag) PURE; // 停止使用指定tag作为键的监听器。监听器拥有的所有连接不会被关闭,此方法用于draining virtual void stopListeners(uint64_t listener_tag) PURE; // 停止所有监听器 virtual void stopListeners() PURE; // 禁用所有监听器。不会关闭监听器拥有的连接鹅,用于临时暂停接受连接 virtual void disableListeners() PURE; // 启用所有监听器 virtual void enableListeners() PURE; }; |
套接字监听器的抽象接口,是否此对象则停止对套接字的监听:
1 2 3 4 5 6 7 8 |
class Listener { public: // 临时禁止接收新连接 virtual void disable() PURE; // 继续接收新连接 virtual void enable() PURE; }; |
表示某个连接处理器ConnectionHandler所拥有的活动的监听器, ActiveListener引用一个Listener。
1 2 3 4 5 6 7 8 9 |
class ListenerFilter { public: /** * 在新的连接被接受,但是Connection对象尚未创建之前调用 * @param cb 此回调提供一些重要方法 * @return 过滤器管理器根据此返回状态,决定是否继续迭代过滤器链 */ virtual FilterStatus onAccept(ListenerFilterCallbacks& cb) PURE; }; |
通过参数cb,可以continueFilterChain。
此接口用于管理监听器过滤器链:
1 2 3 4 5 |
class ListenerFilterManager { public: // 为指定的监听器添加过滤器 virtual void addAcceptFilter(ListenerFilterPtr&& filter) PURE; }; |
ActiveSocket实现了ListenerFilterManager、ListenerFilterCallbacks。
此接口供监听器过滤器使用,后者通过它和监听器管理器通信:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
class ListenerFilterCallbacks { public: /** * @return ConnectionSocket 过滤器所操作的连接套接字 */ virtual ConnectionSocket& socket() PURE; /** * @return 分发事件的Dispatcher */ virtual Event::Dispatcher& dispatcher() PURE; /** * 继续执行过滤器链 */ virtual void continueFilterChain(bool success) PURE; };TransportSocket |
ActiveSocket实现了ListenerFilterManager、ListenerFilterCallbacks。
此接口供监听器使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
class ListenerCallbacks { public: /** * 当新连接被接受后,回调此方法 * @param socket 移动到被调用者的套接字 * @param redirected 提示套接字已经经过重定向 */ virtual void onAccept(ConnectionSocketPtr&& socket, bool hand_off_restored_destination_connections = true) PURE; /** * 当新连接被接受后,回调此方法 * @param new_connection 移动到被调用者的套接字 */ virtual void onNewConnection(ConnectionPtr&& new_connection) PURE; }; |
ActiveListener实现了此接口。
此接口用于管理过滤器链:
1 2 3 4 5 6 7 8 9 |
class FilterChainManager { public: /** * 查找匹配新连接的元数据的过滤器链 * @param socket 提供元数据 * @return const FilterChain* 使用的过滤器链,如果没有匹配返回nullptr */ virtual const FilterChain* findFilterChain(const ConnectionSocket& socket) const PURE; }; |
ListenerImpl实现了此接口。
负责添加网络过滤器给过滤器管理器,也就是Connection:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
class FilterManager { public: virtual ~FilterManager() {} // 添加一个写过滤器,过滤器以FIFO顺序调用 virtual void addWriteFilter(WriteFilterSharedPtr filter) PURE; // 添加读写过滤器,相当于同时调用addWriteFilter/addReadFilter virtual void addFilter(FilterSharedPtr filter) PURE; // 添加一个读过滤器,过滤器以FIFO顺序调用 virtual void addReadFilter(ReadFilterSharedPtr filter) PURE; // 实例化所有安装的读过滤器,相当于针对每个过滤器调用onNewConnection() virtual bool initializeReadFilters() PURE; } |
单个过滤器链的接口:
1 2 3 4 5 6 7 |
class FilterChain { public: // 基于此过滤器链的新连接,使用的TransportSocketFactory,不同链使用的工厂可能不同(传输协议不同,RAW,TLS...) virtual const TransportSocketFactory& transportSocketFactory() const PURE; // 基于此过滤器链的新连接,为了创建所有过滤器需要的工厂的集合 virtual const std::vector<FilterFactoryCb>& networkFilterFactories() const PURE; }; |
1 |
class Filter : public WriteFilter, public ReadFilter {}; |
读处理路径(处理下游发来的数据)上的二进制(4层)过滤器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
class ReadFilter { public: /** * 当连接上的数据被读取时调用 * @param data 读取到的,可能已经被修改过的数据 * @param end_stream 当连接启用半关闭语义时,用于提示是否到了最后一字节 * @return status 过滤器管理器使用此状态决定如何进一步迭代其它过滤器 */ virtual FilterStatus onData(Buffer::Instance& data, bool end_stream) PURE; /** * 当新连接刚创建时调用,过滤器链的迭代可以被中止 * @return status 过滤器管理器使用此状态决定如何进一步迭代其它过滤器 */ virtual FilterStatus onNewConnection() PURE; /** * 初始化用于和过滤器管理器交互的读过滤器回调,过滤器被注册时,将被过滤器管理器调用一次 * 任何需要用到底层连接的构造,需要在此函数的回调中执行 * * IMPORTANT: 出站、复杂逻辑不要在此,放在onNewConnection() * */ virtual void initializeReadFilterCallbacks(ReadFilterCallbacks& callbacks) PURE; } |
写处理路径(向下游发送数据)上的二进制(4层)过滤器:
1 2 3 4 5 6 7 8 9 |
class WriteFilter { public: /** * 当在此连接上发生数据写入时调用 * @param data 需要写入的,可能已经被修改过的数据 * @param end_stream 当连接启用半关闭语义时,用于提示是否到了最后一字节 */ virtual FilterStatus onWrite(Buffer::Instance& data, bool end_stream) PURE; }; |
连接套接字,表示传递给一个Connection的套接字:
- 对于服务端,该对象表示已经Accept的套接字
- 对于客户端,该对象表示正在连接到远程地址的套接字
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
class ConnectionSocket : public virtual Socket { public: // 返回远程地址 virtual const Address::InstanceConstSharedPtr& remoteAddress() const PURE; // 用于服务器端,恢复原始目的地址 virtual void restoreLocalAddress(const Address::InstanceConstSharedPtr& local_address) PURE; // 设置远程地址 virtual void setRemoteAddress(const Address::InstanceConstSharedPtr& remote_address) PURE; // 原始目的地址是否被恢复 virtual bool localAddressRestored() const PURE; // 设置传输协议,例如RAW_BUFFER, TLS virtual void setDetectedTransportProtocol(absl::string_view protocol) PURE; // 返回传输协议 virtual absl::string_view detectedTransportProtocol() const PURE; // 设置请求的应用协议,例如ALPN in TLS virtual void setRequestedApplicationProtocols(const std::vector<absl::string_view>& protocol) PURE; // 返回请求的应用协议 virtual const std::vector<std::string>& requestedApplicationProtocols() const PURE; // 设置请求的服务器名称 virtual void setRequestedServerName(absl::string_view server_name) PURE; // 返回请求的服务器名称 virtual absl::string_view requestedServerName() const PURE; }; |
传输套接字,负责实际的读写,也进行某些数据转换(例如TLS):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
class TransportSocket { public: // 连接对象调用此方法依次,初始化传输套接字的回调 virtual void setTransportSocketCallbacks(TransportSocketCallbacks& callbacks) PURE; // 由网络级协商选择的协议 virtual std::string protocol() const PURE; // 套接字是否已经被flush和close virtual bool canFlushClose() PURE; // 关闭传输套接字 virtual void closeSocket(Network::ConnectionEvent event) PURE; // 读取到缓冲 virtual IoResult doRead(Buffer::Instance& buffer) PURE; /** * 将缓冲写入底层套接字 * @param buffer 缓冲 * @param end_stream 提示是否是流的终点,如果true则缓冲中所有数据都被写出去,连接变成半关闭 */ virtual IoResult doWrite(Buffer::Instance& buffer, bool end_stream) PURE; // 底层传输建立后回调 virtual void onConnected() PURE; // 如果当前是SSL连接,则返回Ssl::Connection,否则返回nullptr virtual const Ssl::Connection* ssl() const PURE; }; |
传输套接字使用此回调集,和Connection通信:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
class TransportSocketCallbacks { public: // 返回关联到连接的IO句柄,从此局部可以得到连接套接字的FD virtual IoHandle& ioHandle() PURE; virtual const IoHandle& ioHandle() const PURE; // 返回关联的连接 virtual Network::Connection& connection() PURE; // 是否读缓冲应该被排干(drain,也就是调用过滤器链进行处理),用于强制配置的读缓冲大小限制 virtual bool shouldDrainReadBuffer() PURE; // 将读缓冲标记为可(被事件循环)读 virtual void setReadBufferReady() PURE; // 发起(Raise)一个连接事件到Connection对象,TLS使用此方法告知握手完成 virtual void raiseEvent(ConnectionEvent event) PURE; }; |
该接口表示原始的连接,它实现了FilterManager接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
class Connection : public Event::DeferredDeletable, public FilterManager { public: // 状态枚举 enum class State { Open, Closing, Closed }; // 连接发送字节后的回调 typedef std::function<void(uint64_t bytes_sent)> BytesSentCb; // 注册当此连接上发生事件后执行的回调 virtual void addConnectionCallbacks(ConnectionCallbacks& cb) PURE; // 注册每当bytes被写入底层TransportSocket后执行的回调 virtual void addBytesSentCallback(BytesSentCb cb) PURE; // 为此连接启用半关闭语义,从一个已经被对端半关闭的连接上进行读操作,不会关闭连接 virtual void enableHalfClose(bool enabled) PURE; // 关闭连接 virtual void close(ConnectionCloseType type) PURE; // 返回分发器 virtual Event::Dispatcher& dispatcher() PURE; // 返回唯一性的本地连接ID virtual uint64_t id() const PURE; // 返回网络级协商选择的下一个使用的协议 virtual std::string nextProtocol() const PURE; // 为连接启用/禁用NO_DELAY virtual void noDelay(bool enable) PURE; // 启禁针对此连接的套接字读。当重新启用读时,如果输入缓冲有内容,会通过过滤器链分发 virtual void readDisable(bool disable) PURE; // 当禁用套接字读后,Envoy是否应当检测TCP连接关闭。默认对新连接来说,检测 virtual void detectEarlyCloseWhenReadDisabled(bool should_detect) PURE; // 读操作是否启用 virtual bool readEnabled() const PURE; // 返回远程地址 virtual const Network::Address::InstanceConstSharedPtr& remoteAddress() const PURE; // 返回本地地址,对于客户端连接来说,即原始地址;对于服务器连接来说 // 是本地的目的地址 // 对于服务器连接来说,此地址可能和代理的监听地址不一样,因为下游连接可能被重定向,或者代理在透明模式下运行 virtual const Network::Address::InstanceConstSharedPtr& localAddress() const PURE; // 更新连接状态,出于性能的考虑,最终一致 virtual void setConnectionStats(const ConnectionStats& stats) PURE; // 如果该连接是SSL,则返回SSL连接数据;否则返回nullptr virtual const Ssl::Connection* ssl() const PURE; // 返回服务器名称,对于TLS来说即SNI virtual absl::string_view requestedServerName() const PURE; // 返回连接状态 virtual State state() const PURE; /** * 写入数据到连接,数据将经过过滤器链 * @param data 需要写入的数据 * @param end_stream 如果为true,则提示此为最后一次写操作,导致连接半关闭。必须enableHalfClose(true)才能传入true */ virtual void write(Buffer::Instance& data, bool end_stream) PURE; // 设置该连接的缓冲区的软限制 // 对于读缓冲,限制处理流水线在flush到下一个stage前能缓冲的最大字节数 // 对于写缓冲,设置水位。如果缓冲了足够的数据,触发onAboveWriteBufferHighWatermark调用 virtual void setBufferLimits(uint32_t limit) PURE; // 获得软限制 virtual uint32_t bufferLimit() const PURE; // 本地地址是否被还原为原始目的地址 virtual bool localAddressRestored() const PURE; // 连接当前是否高于高水位 virtual bool aboveHighWatermark() const PURE; // 获取此连接的套接字选项 virtual const ConnectionSocket::OptionsSharedPtr& socketOptions() const PURE; // 获取关联到此连接的StreamInfo对象。StreamInfo典型用于日志目的 // 每个过滤器都可以通过StreamInfo.FilterState来添加特定的信息 // 在此上下文中每个连接对应一个StreamInfo。而对于HTTP连接管理器,每个请求对应一个StreamInfo virtual StreamInfo::StreamInfo& streamInfo() PURE; virtual const StreamInfo::StreamInfo& streamInfo() const PURE; // 设置延迟连接关闭的超时 virtual void setDelayedCloseTimeout(std::chrono::milliseconds timeout) PURE; virtual std::chrono::milliseconds delayedCloseTimeout() const PURE; } |
L4连接上发生的事件的回调:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
class ConnectionCallbacks { public: virtual ~ConnectionCallbacks() {} // ConnectionEvent的回调 virtual void onEvent(ConnectionEvent event) PURE; // 当连接的写缓冲超过高水位后调用 virtual void onAboveWriteBufferHighWatermark() PURE; // 当连接的写缓冲,从超过高水位变为低于低水位后调用 virtual void onBelowWriteBufferLowWatermark() PURE; }; |
表示某个连接处理器所(通过ActiveListener)拥有的活动的连接。ActiveConnection引用一个Connection、一个ActiveListener。
表示可以拥有多个流(Stream)的HTTP客户端/服务器连接:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
class Connection { public: // 分发入站的请求数据 virtual void dispatch(Buffer::Instance& data) PURE; // 给对端提示以go away,从此时开始,不能创建新的流 virtual void goAway() PURE; // 返回连接的协议 virtual Protocol protocol() PURE; // 给对端提示以shutdown notice,对端不应该在发送任何新的流,但是对于已经达到的流u,不会被重置 virtual void shutdownNotice() PURE; // HTTP编解码器是否有数据需要写入,但是由于协议的原因(例如窗口更新),无法完成 virtual bool wantsToWrite() PURE; // 当底层的Network::Connection超过高水位后,调用此方法 virtual void onUnderlyingConnectionAboveWriteBufferHighWatermark() PURE; // 当底层的Network::Connection超过高水位后,然后由低于低水位后调用此方法 virtual void onUnderlyingConnectionBelowWriteBufferLowWatermark() PURE; }; |
HTTP连接级别的回调:
1 2 3 4 5 |
class ConnectionCallbacks { public: // 对端提示go away时触发此回调,不允许创建新流 virtual void onGoAway() PURE; }; |
服务器端连接:
1 |
class ServerConnection : public virtual Connection {}; |
HTTP连接管理器ConnectionManagerImpl.codec字段是ServerConnection类型,后者承担读取到的请求数据的分发(Dispatch,给HTTP解析器)职责。
继承ConnectionCallbacks回调,并添加方法:
1 2 3 4 5 6 7 8 9 10 |
class ServerConnectionCallbacks : public virtual ConnectionCallbacks { public: /** * 当对端初始化一个新的请求流后触发此回调 * @param response_encoder 提供用于创建响应的编码器,请求、响应由同一流对象管理 * @param is_internally_created 提示此流是流客户端创建,还是由Envoy自己创建(例如内部重定向) */ virtual StreamDecoder& newStream(StreamEncoder& response_encoder, bool is_internally_created = false) PURE; }; |
HTTP流解码器,可以解码下游发来的请求:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
class StreamDecoder { public: virtual ~StreamDecoder() {} // 处理解码后的100-Continue头的map virtual void decode100ContinueHeaders(HeaderMapPtr&& headers) PURE; // 处理解码后的头 virtual void decodeHeaders(HeaderMapPtr&& headers, bool end_stream) PURE; // 处理解码后的数据帧 virtual void decodeData(Buffer::Instance& data, bool end_stream) PURE; // 处理解码后的尾帧 virtual void decodeTrailers(HeaderMapPtr&& trailers) PURE; // 处理解码后的元数据 virtual void decodeMetadata(MetadataMapPtr&& metadata_map) PURE; }; |
这里这里的decode有歧义:
- decoded,表示经由http_parser解析,结构化为C++对象 —— HTTP语境
- decode,调用Envoy的流解码器处理那些C++对象 —— Envoy语境
HTTP流编码器,可以编码需要发送给下游的应答,接口类似于StreamDecoder。
针对HTTP流的回调:
1 2 3 4 5 6 7 8 9 10 11 |
class StreamCallbacks { public: // 对端重置了流后调用,参数是重置原因 virtual void onResetStream(StreamResetReason reason) PURE; // 当一个流(HTTP2),或者流发向的连接(HTTP1)超过高水位后调用 virtual void onAboveWriteBufferHighWatermark() PURE; // 当一个流(HTTP2),或者流发向的连接(HTTP1)从超过高水位降到低于低水位后调用 virtual void onBelowWriteBufferLowWatermark() PURE; } |
表示连接上的单个HTTP流,实现了StreamDecoder、StreamCallbacks、FilterChainFactoryCallbacks接口。
HTTP连接管理器提供给过滤器链工厂的回调集,依赖于此回调工厂能够以应用程序特定的方式构建过滤器链:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
class FilterChainFactoryCallbacks { public: // 添加读取流数据时使用的解码器 virtual void addStreamDecoderFilter(Http::StreamDecoderFilterSharedPtr filter) PURE; // 添加写入流数据时使用的编码器 virtual void addStreamEncoderFilter(Http::StreamEncoderFilterSharedPtr filter) PURE; // 添加读写编解码器 virtual void addStreamFilter(Http::StreamFilterSharedPtr filter) PURE; // 添加访问日志处理器,在流被销毁时调用 virtual void addAccessLogHandler(AccessLog::InstanceSharedPtr handler) PURE; } |
流编码/解码过滤器的基类:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
class StreamFilterBase { public: /** * 在过滤器被销毁前调用此方法,这可能发生在 * 1、正常的流(下游+上游)完成后 * 2、发生重置后 * 每个过滤器负责确保在此方法的上下文中,所有异步事件被清理完毕。这些异步事件包括定时器、网络调用等 * * 不在析构函数中进行清理而使用onDestroy钩子的原因和Envoy的延迟删除模型有关。此模型规避了Stack unwind * 有关的复杂性。在onDestroy之后,过滤器不得调用编码/解码过滤器回调 */ virtual void onDestroy() PURE; }; |
流解码过滤器,继承StreamFilterBase:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
class StreamDecoderFilter : public StreamFilterBase { public: // 解码请求头 virtual FilterHeadersStatus decodeHeaders(HeaderMap& headers, bool end_stream) PURE; // 解码数据帧 virtual FilterDataStatus decodeData(Buffer::Instance& data, bool end_stream) PURE; // 解码请求尾 virtual FilterTrailersStatus decodeTrailers(HeaderMap& trailers) PURE; // 设置此解码过滤器的过滤器回调 virtual void setDecoderFilterCallbacks(StreamDecoderFilterCallbacks& callbacks) PURE; } |
传递给所有(读/写)过滤器的回调函数集,用于写响应数据、和底层流交互:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
class StreamFilterCallbacks { public: // 获取L4网络连接 virtual const Network::Connection* connection() PURE; // 返回线程本地的Dispatcher,从此分发器来分配定时器 virtual Event::Dispatcher& dispatcher() PURE; // 将底层的流进行重置 virtual void resetStream() PURE; // 返回当前请求使用的路由 // 实现应当能够进行路由缓存,避免反复查找。如果过滤器修改了请求头,则路由可能需要改变,此时应当调用clearRouteCache() // 未来可能会允许过滤器对路由条目进行覆盖 virtual Router::RouteConstSharedPtr route() PURE; // 返回被缓存的路由条目的上游集群信息(clusterInfo)。该方法用于避免在过滤器链中进行反复的查找,同时 // 确保当路由被picked/repicked后能提供clusterInfo的一致性视图 virtual Upstream::ClusterInfoConstSharedPtr clusterInfo() PURE; // 为当前请求清除路由缓存,如果过滤器修改了请求头,并且此修改可能影响选路,则必须调用该方法 virtual void clearRouteCache() PURE; // 返回用于日志目的的流唯一标识 virtual uint64_t streamId() PURE; // 返回用于日志目的的StreamInfo virtual StreamInfo::StreamInfo& streamInfo() PURE; // 返回追踪用的当前追踪上下文 virtual Tracing::Span& activeSpan() PURE; // 返回追踪配置 virtual const Tracing::Config& tracingConfig() PURE; }; |
继承StreamFilterCallbacks,添加专用于解码(读)过滤器的回调:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks { public: /** * 使用缓冲的头,以及请求体,继续迭代过滤器链。该方法仅仅在以下情况之一才会调用 * 1、先前的过滤器在decodeHeaders()后返回StopIteration * 2、先前的过滤器在decodeData()后返回StopIterationAndBuffer, StopIterationAndWatermark 或 StopIterationNoBuffer * * HTTP连接管理器会分发请求头、缓冲的请求体给过滤器链中的下一个过滤器 * * 如果请求没有完成,当前过滤器仍然会继续接受decodeData()调用,并且必须返回适当的的状态码 * */ virtual void continueDecoding() PURE; // 返回当前过滤器、或者链中先前过滤器缓冲的数据。如果尚未缓冲任何内容,返回nullpt virtual const Buffer::Instance* decodingBuffer() PURE; /** * 添加解码处理后的、缓冲的请求体数据。在某些高级用例中,decodeData()返回StopIterationAndBuffer不能满足 * 需要,需要调用此方法: * * 1) 对于header-only请求需要被转换为包含请求体的请求,可以在 decodeHeaders() 回调中调用此方法,添加请求体 * 后续过滤器会依次接收调用decodeHeaders(..., false)、decodeData(..., true)。在直接迭代、停止后继续迭代 * 场景下,都可以使用 * * * 2) 如果某个过滤器希望在end_stream=true的情况下,在一个数据回调中查看所有缓冲的数据,可以调用该方法,以立即缓冲数据 * 避免同时处理已缓冲数据、以及当前回调产生的数据 * * 3) 如果某个过滤器在调用后续过滤器时,需要添加额外的缓冲请求体数据 * * 4) 如果在decodeTrailers()回调中需要添加额外的数据。可以在前述回调的上下文中调用此方法 * 所有后续过滤器会依次接受decodeData(..., false)、decodeTrailers()调用 * * 在其它场景下调用此方法是错误 * * @param data Buffer::Instance 添加需要被解码的数据 * @param streaming_filter boolean 提示该过滤器是流式处理还是缓冲了完整请求体 */ virtual void addDecodedData(Buffer::Instance& data, bool streaming_filter) PURE; /** * 添加解码后的请求尾。只能在end_stream=true时在decodeData中调用 * 在decodeData中调用时,请求尾映射被初始化为空map并以引用的方式返回 * 该方法最多调用一次 * * @return 返回新的空请求尾map */ virtual HeaderMap& addDecodedTrailers() PURE; /* 基于其它的状态码、请求体,生成一个Envoy本地的响应并发送给下游 * 如果是gRPC请求,则本地响应编码为gRPC响应,HTTP状态码置为200。从参数生成grpc-status、grpc-message * * @param response_code HTTP状态码 * @param body_text HTTP请求体,以text/plain发送或者编码在grpc-message头中 * @param modify_headers 可选的回调函数,用于修改响应头 * @param grpc_status gRPC状态码,覆盖通过httpToGrpcStatus推导出的gRPC状态码 */ virtual void sendLocalReply(Code response_code, absl::string_view body_text, std::function<void(HeaderMap& headers)> modify_headers, const absl::optional<Grpc::Status::GrpcStatus> grpc_status) PURE; /** * 编码100-Continue响应头。该头不在encodeHeaders中处理,因为大部分情况下Envoy用户和过滤器 * 不希望代理100-Continue,而是直接吐出,可以忽略多次编码响应头encodeHeaders()的复杂性 * * @param headers supplies 需要编码的头 */ virtual void encode100ContinueHeaders(HeaderMapPtr&& headers) PURE; /** * 编码响应头。HTTP连接管理器会自动探测一些不发给下游的伪头 * * @param headers 需要编码的头 * @param end_stream 这是不是一个header-only的request/response */ virtual void encodeHeaders(HeaderMapPtr&& headers, bool end_stream) PURE; /** * 编码响应数据 * @param data 需要编码的数据 * @param end_stream 提示这是不是最后一个数据帧 */ virtual void encodeData(Buffer::Instance& data, bool end_stream) PURE; /** * 编码响应尾数据,隐含意味着流的结束 * @param trailers supplies 需要编码的尾 */ virtual void encodeTrailers(HeaderMapPtr&& trailers) PURE; /** * 编码元数据 * * @param metadata_map supplies 需要编码的元数据的unique_ptr */ virtual void encodeMetadata(MetadataMapPtr&& metadata_map) PURE; /** * 当解码过滤器的缓冲,或者过滤器需要发送数据到的那些缓冲,超越高水位后调用 * * 对于路由过滤器这样的HTTP过滤器,会使用多个缓冲(codec、connection...),该方法可能被调用多次 * 这些过滤器应当负责,在对应的缓冲被排干后,以等同次数调用低水位回调connection etc.) */ virtual void onDecoderFilterAboveWriteBufferHighWatermark() PURE; /** * 当解码过滤器的缓冲,或者过滤器需要发送数据到的那些缓冲,超越高水位后,后降低到低于低水位后调用 */ virtual void onDecoderFilterBelowWriteBufferLowWatermark() PURE; /** * 需要订阅下游流、下游连接上的水位事件的过滤器,调用此方法 * 订阅后,对于每个outstanding backed up buffer,过滤器的回调都被调用 */ virtual void addDownstreamWatermarkCallbacks(DownstreamWatermarkCallbacks& callbacks) PURE; /** * 需要停止订阅下游流、下游连接上的水位事件的过滤器,调用此方法 * 在DownstreamWatermarkCallbacks的回调函数的栈下面调用此方法不安全 */ virtual void removeDownstreamWatermarkCallbacks(DownstreamWatermarkCallbacks& callbacks) PURE; /** * 用于改变解码过滤器的缓冲区大小 * * @param limit 新的缓冲大小 */ virtual void setDecoderBufferLimit(uint32_t limit) PURE; // 返回解码过滤器缓冲大小,0表示无限制u virtual uint32_t decoderBufferLimit() PURE; // 将当前流看作是新的,就好像它的所有头都是刚到达一样 // 如果操作成功,会导致创建新的过滤器链,并且上游请求可能和原始的下游流关联 // 如果操作失败,并且下面列出的前置条件不满足,则调用者负责处理和终止原始流 // // 前置条件 // - 流必须已经被完整的读取 // - 流必须没有请求体 // // 注意HTTP消毒器不会针对这种重新创建的流进行操作,它假设消毒已经完成 virtual bool recreateStream() PURE; } |
表示活动的解码过滤器,继承ActiveStreamFilterBase,实现了StreamFilterCallbacks、StreamDecoderFilterCallbacks,也就是说,过滤器和过滤器回调是一体的。
该对象持有一个StreamDecoderFilter对象。ActiveStream通过字段 std::list<ActiveStreamDecoderFilterPtr> decoder_filters_来引用这种对象。
多个地方存在命名为ActiveRequest的结构,表示一个活动的HTTP1请求。
例如,作为Http::Http1::ServerConnectionImpl的私有成员:
1 2 3 4 5 6 7 8 9 10 11 12 |
struct ActiveRequest { // 构造请求对象时,必须传入响应编码器 ActiveRequest(ConnectionImpl& connection) : response_encoder_(connection) {} HeaderString request_url_; // 请求解码器 StreamDecoder* request_decoder_{}; // 响应编码器 ResponseStreamEncoderImpl response_encoder_; // 请求的处理是否已经完毕 bool remote_complete_{}; }; |
作为Http::CodecClient的私有成员:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
struct ActiveRequest : LinkedObject<ActiveRequest>, public Event::DeferredDeletable, public StreamCallbacks, public StreamDecoderWrapper { ActiveRequest(CodecClient& parent, StreamDecoder& inner) : StreamDecoderWrapper(inner), parent_(parent) {} // 流回调函数 void onResetStream(StreamResetReason reason) override { parent_.onReset(*this, reason); } void onAboveWriteBufferHighWatermark() override {} void onBelowWriteBufferLowWatermark() override {} void onPreDecodeComplete() override { parent_.responseDecodeComplete(*this); } void onDecodeComplete() override {} // 编码器 StreamEncoder* encoder_{}; CodecClient& parent_; } |
Envoy代理的需要创建哪些监听器,由Bootstrap配置 + xDS响应共同决定,本文不讨论细节,可以参考Istio Pilot与Envoy的交互机制解读。
如果某个监听器配置了bind_to_port(默认情况下virtual 15001配置了),则会调用libevent的API,注册套接字监听事件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
ListenerImpl::ListenerImpl(Event::DispatcherImpl& dispatcher, Socket& socket, ListenerCallbacks& cb, bool bind_to_port, bool hand_off_restored_destination_connections) : BaseListenerImpl(dispatcher, socket), cb_(cb), hand_off_restored_destination_connections_(hand_off_restored_destination_connections), listener_(nullptr) { if (bind_to_port) { // 注册监听 setupServerSocket(dispatcher, socket); } } void ListenerImpl::setupServerSocket(Event::DispatcherImpl& dispatcher, Socket& socket) { // 重置 CSmartPtr<evconnlistener, evconnlistener_free> ListenerPtr引用为新的evconnlistener listener_.reset( // libevent的base 当前对象方法 套接字的文件描述符 evconnlistener_new(&dispatcher.base(), listenCallback, this, 0, -1, socket.ioHandle().fd())); if (!listener_) { throw CreateListenerException( fmt::format("cannot listen on socket: {}", socket.localAddress()->asString())); } if (!Network::Socket::applyOptions(socket.options(), socket, envoy::api::v2::core::SocketOption::STATE_LISTENING)) { throw CreateListenerException(fmt::format("cannot set post-listen socket option on socket: {}", socket.localAddress()->asString())); } evconnlistener_set_error_cb(listener_.get(), errorCallback); } |
Envoy默认会在15001端口上监听,当流量到达(通常是通过其它端口重定向到15001)时,Envoy的DispatcherImpl循环得到通知,并通过libevent回调Envoy::Network::ListenerImpl::listenCallback方法,该方法是一切新连接处理的起点:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
void ListenerImpl::listenCallback(evconnlistener*, evutil_socket_t fd, sockaddr* remote_addr, int remote_addr_len, void* arg) { // 传递的参数是监听器对象 ListenerImpl* listener = static_cast<ListenerImpl*>(arg); // IoSocketHandle为IO操作的抽象接口 IoHandlePtr io_handle = std::make_unique<IoSocketHandle>(fd); // 如果监听器在ANY地址(0.0.0.0)上监听,从新套接字上获取本地地址 const Address::InstanceConstSharedPtr& local_address = // 获取远程地址 const Address::InstanceConstSharedPtr& remote_address = (remote_addr->sa_family == AF_UNIX) ? Address::peerAddressFromFd(io_handle->fd()) : Address::addressFromSockAddr(*reinterpret_cast<const sockaddr_storage*>(remote_addr), remote_addr_len, local_address->ip()->version() == Address::IpVersion::v6); // 调用监听器的onAccept回调 listener->cb_.onAccept( std::make_unique<AcceptedSocketImpl>(std::move(io_handle), local_address, remote_address), listener->hand_off_restored_destination_connections_); } |
回调ConnectionHandlerImpl::ActiveListener::onAccept的逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
// 此回调在新连接创建后调用 // socket 需要移动给被调用者的套接字对象 // redirected 如果套接字是第一次被其它监听器接受,并且随后被重定向给一个新的监听器时,为true // 接收者监听器不得再次重定向 void ConnectionHandlerImpl::ActiveListener::onAccept( Network::ConnectionSocketPtr&& socket, bool hand_off_restored_destination_connections) { // 代表当前正在处理的套接字对象 auto active_socket = std::make_unique<ActiveSocket>(*this, std::move(socket), hand_off_restored_destination_connections); // 构建出过监听器过滤器链 config_.filterChainFactory().createListenerFilterChain(*active_socket); // 开始迭代过滤器链 active_socket->continueFilterChain(true); // 如果监听器过滤器链没有迭代完毕,则active_socket暂存到sockets_列表里 // 防止active_socket因为超出作用域而被自动删除 if (active_socket->iter_ != active_socket->accept_filters_.end()) { active_socket->startTimer(); active_socket->moveIntoListBack(std::move(active_socket), sockets_); } } |
ActiveSocket实现了Network::ListenerFilterManager:
1 2 3 4 5 6 |
class ListenerFilterManager { public: virtual ~ListenerFilterManager() {} // 为监听器添加一个过滤器,过滤器以FIFO顺序被调用 virtual void addAcceptFilter(ListenerFilterPtr&& filter) PURE; }; |
ListenerImpl的createListenerFilterChain方法支持为ListenerFilterManager提供过滤器链:
1 2 3 |
bool ListenerImpl::createListenerFilterChain(Network::ListenerFilterManager& manager) { return Configuration::FilterChainUtility::buildFilterChain(manager, listener_filter_factories_); } |
listener_filter_factories_在ListenerImpl初始化阶段创建,它是Network::ListenerFilterFactoryCb的迭代器,表示当前监听器启用的所有监听器过滤器的工厂回调的集合。调用ListenerFilterFactoryCb可以将过滤器安装到ListenerFilterManager,也就是ActiveSocket上:
1 2 3 4 5 6 7 |
bool FilterChainUtility::buildFilterChain( Network::ListenerFilterManager& filter_manager, const std::vector<Network::ListenerFilterFactoryCb>& factories) { for (const Network::ListenerFilterFactoryCb& factory : factories) { factory(filter_manager); } return true; } |
默认情况下,监听器virtual(15001端口)只配置一个监听器过滤器OriginalDstFilter,它的工厂如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
class OriginalDstConfigFactory : public Server::Configuration::NamedListenerFilterConfigFactory { public: // 此即工厂回调的工厂 Network::ListenerFilterFactoryCb createFilterFactoryFromProto(const Protobuf::Message&, Server::Configuration::ListenerFactoryContext&) override { return [](Network::ListenerFilterManager& filter_manager) -> void { // 上段代码的factory(filter_manager)调用的是此Lambda // 简单的创建OriginalDstFilter并添加到管理器 filter_manager.addAcceptFilter(std::make_unique<OriginalDstFilter>()); }; } ProtobufTypes::MessagePtr createEmptyConfigProto() override { return std::make_unique<Envoy::ProtobufWkt::Empty>(); } std::string name() override { return ListenerFilterNames::get().OriginalDst; } }; |
当一个过滤器返回FilterStatus::StopIteration以终止过滤器迭代,那么:
- 如果希望继续遍历后续过滤器链,以true参数调用下面的方法
- 如果过滤器执行失败,需要关闭连接,以false参数调用下面的方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
void ConnectionHandlerImpl::ActiveSocket::continueFilterChain(bool success) { // 开始/继续迭代 if (success) { if (iter_ == accept_filters_.end()) { iter_ = accept_filters_.begin(); } else { iter_ = std::next(iter_); } // 从当前过滤器迭代到监听器过滤器集的尾部 for (; iter_ != accept_filters_.end(); iter_++) { // 调用监听器过滤器的onAccept方法,this就是ActievSocket对象 Network::FilterStatus status = (*iter_)->onAccept(*this); // 上一个过滤器提示,应当停止迭代 if (status == Network::FilterStatus::StopIteration) { // 则本次过滤器迭代终止。上一个过滤器负责在未来重启迭代 return; } } // 所有监听器过滤器都执行成功 // 检查套接字是否需要重定向给其它监听器 ActiveListener* new_listener = nullptr; // OriginalDstFilter会导致下面的分支执行 if (hand_off_restored_destination_connections_ && socket_->localAddressRestored()) { // 找到匹配原始目的地址的那个监听器 new_listener = listener_.parent_.findActiveListenerByAddress(*socket_->localAddress()); } if (new_listener != nullptr) { // 将由Iptables重定向到当前监听器的连接转交给匹配原始目的地址的监听器处理 // 同时传递hand_off_restored_destination_connections=false,防止再次重定向 new_listener->onAccept(std::move(socket_), false); } else { // 非重定向连接,或者重定向连接的接收者监听器 if (socket_->detectedTransportProtocol().empty()) { // 设置默认传输协议 socket_->setDetectedTransportProtocol( Extensions::TransportSockets::TransportSocketNames::get().RawBuffer); } // 在此监听器上创建新的连接对象 listener_.newConnection(std::move(socket_)); } } // 过滤器执行完毕,如果ActiveSocket已经linked,则unlink并删除 if (inserted()) { unlink(); } } |
该监听器过滤器的onAccept方法的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
Network::FilterStatus OriginalDstFilter::onAccept(Network::ListenerFilterCallbacks& cb) { ENVOY_LOG(debug, "original_dst: New connection accepted"); Network::ConnectionSocket& socket = cb.socket(); const Network::Address::Instance& local_address = *socket.localAddress(); // 通过系统调用os_syscalls.getsockopt(fd, SOL_IP, SO_ORIGINAL_DST, &orig_addr, &addr_len)获取原始目的IP if (local_address.type() == Network::Address::Type::Ip) { // 我们的例子中,原始目的IP地址为127.0.0.1:9898 Network::Address::InstanceConstSharedPtr original_local_address = getOriginalDst(socket.ioHandle().fd()); // 即使对于use_original_dst设置为true的监听器(也就是当前监听器),仍然能够接收不是由iptables重定向的连接 // 如果连接不是被重定向的,那么getOriginalDst()的返回值和当前套接字的本地地址(Envoy代理端)相同 // 这种情况下,当前监听器直接处理连接,而不会转交(hand off)给其它监听器 if (original_local_address) { // 修改local_address_,并设置local_address_restored_为true socket.restoreLocalAddress(original_local_address); } } // 总是继续迭代监听器过滤器链 return Network::FilterStatus::Continue; } |
监听器过滤器的处理是以ActiveSocket为中心的,套接字请求接受后,连接的处理则以ActiveListener为中心。
执行完监听器过滤器后,ActiveSocket调用ActiveListener.newConnection,开始连接的处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
void ConnectionHandlerImpl::ActiveListener::newConnection(Network::ConnectionSocketPtr&& socket) { // 首先,查找匹配此套接字的过滤器链 const auto filter_chain = config_.filterChainManager().findFilterChain(*socket); if (filter_chain == nullptr) { // 找不到过滤器,记录统计信息,关闭套接字,结束处理 ENVOY_LOG_TO_LOGGER(parent_.logger_, debug, "closing connection: no matching filter chain found"); stats_.no_filter_chain_match_.inc(); socket->close(); return; } // 创建一个传输套接字,此套接字负责实际的读写操作 // 具体工厂和协议有关,默认RawBufferSocketFactory,创建RawBufferSocket auto transport_socket = filter_chain->transportSocketFactory().createTransportSocket(nullptr); // 创建连接对象,设置为connected Network::ConnectionPtr new_connection = parent_.dispatcher_.createServerConnection(std::move(socket), std::move(transport_socket)); // 设置缓冲区大小 new_connection->setBufferLimits(config_.perConnectionBufferLimitBytes()); // 写过滤器的顺序可能需要倒置 new_connection->setWriteFilterOrder(config_.reverseWriteFilterOrder()); // 为连接初始化过滤器链 const bool empty_filter_chain = !config_.filterChainFactory().createNetworkFilterChain( *new_connection, filter_chain->networkFilterFactories()); // 如果初始化过滤器链失败,则关闭连接 if (empty_filter_chain) { ENVOY_CONN_LOG_TO_LOGGER(parent_.logger_, debug, "closing connection: no filters", *new_connection); new_connection->close(Network::ConnectionCloseType::NoFlush); return;新连接处理的起点 } // 监听器的新连接回调 onNewConnection(std::move(new_connection)); } |
在ActiveListener::newConnection期间,调用config_.filterChainManager().findFilterChain来查找匹配连接的过滤器链配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
const Network::FilterChain* ListenerImpl::findFilterChain(const Network::ConnectionSocket& socket) const { // 本地地址(恢复到原始目的地址后的) const auto& address = socket.localAddress(); // 根据目的端口匹配 if (address->type() == Network::Address::Type::Ip) { const auto port_match = destination_ports_map_.find(address->ip()->port()); if (port_match != destination_ports_map_.end()) { return findFilterChainForDestinationIP(*port_match->second.second, socket); } } // 缺省匹配 const auto port_match = destination_ports_map_.find(0); if (port_match != destination_ports_map_.end()) { return findFilterChainForDestinationIP(*port_match->second.second, socket); } return nullptr; } |
在ActiveListener::newConnection期间,调用config_.filterChainFactory().createNetworkFilterChain()为连接实例化过滤器链:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
bool ListenerImpl::createNetworkFilterChain( Network::Connection& connection, const std::vector<Network::FilterFactoryCb>& filter_factories) { return Configuration::FilterChainUtility::buildFilterChain(connection, filter_factories); } // 和构建监听器过滤器时的逻辑一样,遍历过滤器工厂,传入Connection调用之 bool FilterChainUtility::buildFilterChain(Network::FilterManager& filter_manager, const std::vector<Network::FilterFactoryCb>& factories) { for (const Network::FilterFactoryCb& factory : factories) { factory(filter_manager); } // 初始化所有读过滤器,也就是调用每个过滤器的onNewConnection return filter_manager.initializeReadFilters(); } |
当ActiveListener为新连接准备好过滤器链后,会调用自身的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
void ConnectionHandlerImpl::ActiveListener::onNewConnection( Network::ConnectionPtr&& new_connection) { ENVOY_CONN_LOG_TO_LOGGER(parent_.logger_, debug, "new connection", *new_connection); // 如果新连接的状态不是已经关闭 if (new_connection->state() != Network::Connection::State::Closed) { ActiveConnectionPtr active_connection( new ActiveConnection(*this, std::move(new_connection), parent_.dispatcher_.timeSystem())); // 存放到当前ActiveListener的字段中,防止析构 active_connection->moveIntoList(std::move(active_connection), connections_); // 将新连接加入到连接处理器中。注意C++ 11中++是原子操作 parent_.num_connections_++; } // 否则,新连接将在此立即析构 } |
对于HTTP协议, 处理TCP连接的逻辑仍然使用。
在实例化过滤器链时,HTTP连接会有一个过滤器 —— HTTP连接管理器(ConnectionManagerImpl),它的工厂如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
Network::FilterFactoryCb HttpConnectionManagerFilterConfigFactory::createFilterFactoryFromProtoTyped( const envoy::config::filter::network::http_connection_manager::v2::HttpConnectionManager& roto_config, Server::Configuration::FactoryContext& context) { // 线程本地的一个缓存提供者,每500ms为当前线程更新日期信息 std::shared_ptr<Http::TlsCachingDateProviderImpl> date_provider = context.singletonManager().getTyped<Http::TlsCachingDateProviderImpl>( SINGLETON_MANAGER_REGISTERED_NAME(date_provider), [&context] { return std::make_shared<Http::TlsCachingDateProviderImpl>(context.dispatcher(), context.threadLocal()); }); // 此管理器维护RouteConfigProvider,后者提供路由信息 std::shared_ptr<Router::RouteConfigProviderManager> route_config_provider_manager = context.singletonManager().getTyped<Router::RouteConfigProviderManager>( SINGLETON_MANAGER_REGISTERED_NAME(route_config_provider_manager), [&context] { return std::make_shared<Router::RouteConfigProviderManagerImpl>(context.admin()); }); // 连接管理器的配置 std::shared_ptr<HttpConnectionManagerConfig> filter_config(new HttpConnectionManagerConfig( proto_config, context, *date_provider, *route_config_provider_manager)); // 此Lambda捕获了上面的共享指针,因此避免了引用计数清零 // 此Lambda即HTTP连接管理器的L4过滤器工厂 return [route_config_provider_manager, filter_config, &context, date_provider](Network::FilterManager& filter_manager) -> void { // 为Connection添加一个读过滤器ConnectionManagerImpl filter_manager.addReadFilter(Network::ReadFilterSharedPtr{new Http::ConnectionManagerImpl( *filter_config, context.drainDecision(), context.random(), context.httpContext(), context.runtime(), context.localInfo(), context.clusterManager(), &context.overloadManager(), context.dispatcher().timeSystem())}); }; } |
ConnectionManagerImpl的构造函数如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
ConnectionManagerImpl::ConnectionManagerImpl(ConnectionManagerConfig& config, const Network::DrainDecision& drain_close, Runtime::RandomGenerator& random_generator, Http::Context& http_context, Runtime::Loader& runtime, const LocalInfo::LocalInfo& local_info, Upstream::ClusterManager& cluster_manager, Server::OverloadManager* overload_manager, Event::TimeSystem& time_system) // ConnectionManagerConfig 连接管理器的配置 // ConnectionManagerStats 统计指标 : config_(config), stats_(config_.stats()), // 连接持续的时长 conn_length_(new Stats::Timespan(stats_.named_.downstream_cx_length_ms_, time_system)), // Network::DrainDecision,给出连接是否应该被Drain并关闭 // 随机数生成器 HTTP上下文,每个服务器一个,提供Tracer等信息 drain_close_(drain_close), random_generator_(random_generator), http_context_(http_context), // 能从磁盘读取Envoy运行时快照 本地环境信息 集群管理器 runtime_(runtime), local_info_(local_info), cluster_manager_(cluster_manager), // ConnectionManagerListenerStats 连接管理器监听器统计信息 listener_stats_(config_.listenerStats()), // 过载管理,停止接受请求、禁止Keepalive overload_stop_accepting_requests_ref_( overload_manager ? overload_manager->getThreadLocalOverloadState().getState( Server::OverloadActionNames::get().StopAcceptingRequests) : Server::OverloadManager::getInactiveState()), overload_disable_keepalive_ref_( overload_manager ? overload_manager->getThreadLocalOverloadState().getState( Server::OverloadActionNames::get().DisableHttpKeepAlive) : Server::OverloadManager::getInactiveState()), // 授时和定时器管理 time_system_(time_system) {} |
实际负责连接的ActiveListener,会调用自己的newConnection创建新Connection对象:
1 2 |
Network::ConnectionPtr new_connection = parent_.dispatcher_.createServerConnection(std::move(socket), std::move(transport_socket)); |
可以看到,创建服务器端连接的职责委托给ConnectionHandle.Dispatcher对象,连接套接字、传输套接字的所有权都被转移:
1 2 3 4 5 6 |
Network::ConnectionPtr DispatcherImpl::createServerConnection(Network::ConnectionSocketPtr&& socket, Network::TransportSocketPtr&& transport_socket) { ASSERT(isThreadSafe()); return std::make_unique<Network::ConnectionImpl>(*this, std::move(socket), std::move(transport_socket), true); } |
可以看到连接套接字、传输套接字的所有权继续转移,给ConnectionImpl的构造函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPtr&& socket, TransportSocketPtr&& transport_socket, bool connected) // 传输套接字 连接套接字 : transport_socket_(std::move(transport_socket)), socket_(std::move(socket)), // 过滤器管理器 流信息(日志用) filter_manager_(*this, *this), stream_info_(dispatcher.timeSystem()), // 创建写缓冲 write_buffer_( // 高低水位回调 dispatcher.getWatermarkFactory().create([this]() -> void { this->onLowWatermark(); }, [this]() -> void { this->onHighWatermark(); })), dispatcher_(dispatcher), id_(next_global_id_++) { // 如果连接套接字的fd不可用,认为发生OOM,让进程崩掉 RELEASE_ASSERT(ioHandle().fd() != -1, ""); // 设置为已连接 if (!connected) { connecting_ = true; } // 边缘触发,注册读写事件监听器 file_event_ = dispatcher_.createFileEvent( // 传输套接字的描述符 ioHandle().fd(), [this](uint32_t events) -> void { onFileEvent(events); }, Event::FileTriggerType::Edge, Event::FileReadyType::Read | Event::FileReadyType::Write); // 注册传输套接字回调 transport_socket_->setTransportSocketCallbacks(*this); } |
可以看到,当读写事件到达时,libevent会回调ConnectionImpl::onFileEvent
当发生可读、可写事件时,ConnectionImpl::onFileEvent被调用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
void ConnectionImpl::onFileEvent(uint32_t events) { // 期望连接状态为Connected,否则认为是错误,需要关闭套接字 if (immediate_error_event_ != ConnectionEvent::Connected) { if (bind_error_) { // 绑定失败 if (connection_stats_ && connection_stats_->bind_errors_) { connection_stats_->bind_errors_->inc(); } } else { // 其它错误 ENVOY_CONN_LOG(debug, "raising immediate error", *this); } // 关闭套接字并退出 closeSocket(immediate_error_event_); return; } // 关闭事件 if (events & Event::FileReadyType::Closed) { // 过早关闭(early close)和读操作绝不需要同时发生 ASSERT(!(events & Event::FileReadyType::Read)); // 关闭套接字 ENVOY_CONN_LOG(debug, "remote early close", *this); closeSocket(ConnectionEvent::RemoteClose); return; } // 可写事件 if (events & Event::FileReadyType::Write) { onWriteReady(); } // 可读事件,由于写事件回调可能会关闭套接字(导致fd为-1),因此做个判断 if (ioHandle().fd() != -1 && (events & Event::FileReadyType::Read)) { onReadReady(); } } |
- 尝试循环读取,根据读取结果设置Post操作
- 处理读取到的数据
- 执行后操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
void ConnectionImpl::onReadReady() { ENVOY_CONN_LOG(trace, "read ready", *this); // 断言已经连接成功 ASSERT(!connecting_); // 调用传输套接字执行循环的读操作,直到没有更多数据可读,或者出错 IoResult result = transport_socket_->doRead(read_buffer_); // 实际读取的字节数 uint64_t new_buffer_size = read_buffer_.length(); // 更新指标 updateReadBufferStats(result.bytes_processed_, new_buffer_size); // 如果到达流的终点(对端关闭),但是不支持半关闭语义 // 则后操作设置为关闭 if ((!enable_half_close_ && result.end_stream_read_)) { result.end_stream_read_ = false; result.action_ = PostIoAction::Close; } // 如果到达流终点了,或者读取的字节数不为零 read_end_stream_ |= result.end_stream_read_; if (result.bytes_processed_ != 0 || result.end_stream_read_) { // 处理读取的数据 onRead(new_buffer_size); } // 如果后操作应当关闭连接,或者两边都进入半关闭状态(一方关闭发送通道后,仍可接受另一方发送过来的数据) if (result.action_ == PostIoAction::Close || bothSidesHalfClosed()) { // 则关闭套接字 closeSocket(ConnectionEvent::RemoteClose); } } |
传输套接字的真实类型取决于传输协议(transport_protocol),默认协议是raw_buffer,对应RawBufferSocket:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
IoResult RawBufferSocket::doRead(Buffer::Instance& buffer) { // IO操作之后应当执行的操作,枚举Close/KeepOpen PostIoAction action = PostIoAction::KeepOpen; uint64_t bytes_read = 0; bool end_stream = false; // 循环读取 do { // 尝试读取最多16K,这个16K是随便取的值 Api::SysCallIntResult result = buffer.read(callbacks_->ioHandle().fd(), 16384); ENVOY_CONN_LOG(trace, "read returns: {}", callbacks_->connection(), result.rc_); // 依据系统调用返回码决定进一步操作 if (result.rc_ == 0) { // 对端关闭 end_stream = true; break; } else if (result.rc_ == -1) { // 远程错误(可能是没有数据可读,读完了) ENVOY_CONN_LOG(trace, "read error: {}", callbacks_->connection(), result.errno_); if (result.errno_ != EAGAIN) { // 错误号不是11(Try again)后操作设置为关闭 action = PostIoAction::Close; } break; } else { // 否则,返回码是实际读取的字节数 bytes_read += result.rc_; // 如果缓冲区超过限制 if (callbacks_->shouldDrainReadBuffer()) { callbacks_->setReadBufferReady(); break; } } } while (true); return {action, bytes_read, end_stream}; } |
循环读取了可用的数据到缓冲区后, ConnectionImpl会调用自己的onRead方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
void ConnectionImpl::onRead(uint64_t read_buffer_size) { // 连接不可读则返回 if (!read_enabled_) { return; } // 缓冲区为空,同时没有读取到流终点 if (read_buffer_size == 0 && !read_end_stream_) { return; } if (read_end_stream_) { // 针对原始套接字的read()调用,在EOF首次(可能是对端半关闭导致)发生后,总会返回0。这里要进行判断,以免重复处理 if (read_end_stream_raised_) { ASSERT(read_buffer_size == 0); return; } // 防止重复处理 read_end_stream_raised_ = true; } // 转交给过滤器管理器,过滤器管理器就是Connection本身 filter_manager_.onRead(); } |
如果有数据需要处理,则调用过滤器管理器的onRead方法:
1 2 3 4 5 6 |
void FilterManagerImpl::onRead() { // 断言上行过滤器(读取下游发来的数据)不为空 ASSERT(!upstream_filters_.empty()); // 传入nullptr,则从过滤器链的头部开始遍历 onContinueReading(nullptr); } |
这里的过滤器链遍历逻辑,和监听器过滤器链遍历很类似:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
void FilterManagerImpl::onContinueReading(ActiveReadFilter* filter) { // 如果不设置上一个迭代的过滤器,则从头开始,否则,从下一个开始 std::list<ActiveReadFilterPtr>::iterator entry; if (!filter) { entry = upstream_filters_.begin(); } else { entry = std::next(filter->entry()); } // 遍历过滤器 for (; entry != upstream_filters_.end(); entry++) { // 延迟初始化,如果过滤器尚未初始化,则调用其onNewConnection if (!(*entry)->initialized_) { (*entry)->initialized_ = true; FilterStatus status = (*entry)->filter_->onNewConnection(); if (status == FilterStatus::StopIteration) { return; } } // 获取先前的读缓冲区 BufferSource::StreamBuffer read_buffer = buffer_source_.getReadBuffer(); // 不管是可读、还是EOF,都要调用过滤器 if (read_buffer.buffer.length() > 0 || read_buffer.end_stream) { FilterStatus status = (*entry)->filter_->onData(read_buffer.buffer, read_buffer.end_stream); if (status == FilterStatus::StopIteration) { // 任何一个过滤器都可以终止迭代 return; } } } } |
过滤器链上的过滤器会被依次的调用。
对于L7连接,需要调用的网络过滤器通常就是ConnectionManagerImpl:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
Network::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data, bool) { // 如果编解码器没有创建,则创建之 if (!codec_) { // 编解码器的类型是ServerConnection codec_ = config_.createCodec(read_callbacks_->connection(), data, *this); // 更新监控指标 if (codec_->protocol() == Protocol::Http2) { stats_.named_.downstream_cx_http2_total_.inc(); stats_.named_.downstream_cx_http2_active_.inc(); } else { stats_.named_.downstream_cx_http1_total_.inc(); stats_.named_.downstream_cx_http1_active_.inc(); } } bool redispatch; do { redispatch = false; try { // 尝试进行报文分发 codec_->dispatch(data); } catch (const CodecProtocolException& e) { // 分发出现错误 // 执行到这里,HTTP/1.1编解码器已经发送400状态码,HTTP/2编解码器已经发送GOAWAY ENVOY_CONN_LOG(debug, "dispatch error: {}", read_callbacks_->connection(), e.what()); stats_.named_.downstream_cx_protocol_error_.inc(); // 当出现协议错误的情况下,连接上的所有流都需要重置 resetAllStreams(); // 刷出写缓冲区、延迟关闭 read_callbacks_->connection().close(Network::ConnectionCloseType::FlushWriteAndDelay); return Network::FilterStatus::StopIteration; } // 处理入站数据可能会导致出站数据的释放,这里再次检查 // 看此连接是否可以在未决编解码数据发送后优雅的关闭 // 调用Network::ReadFilterCallbacks->connection().close(Network::ConnectionCloseType::FlushWriteAndDelay) checkForDeferredClose(); // 对于HTTP/1编解码器来说,它会在单个消息完成后,暂停分发 if (codec_->protocol() != Protocol::Http2) { // 如果没有额外流并且还有更多数据,执行重分发 if (read_callbacks_->connection().state() == Network::Connection::State::Open && data.length() > 0 && streams_.empty()) { redispatch = true; } // 如果仅有单个已经完成请求处理但未应答的非WebSockert流,则暂停套接字读,以apply back pressure if (!streams_.empty() && streams_.front()->state_.remote_complete_) { read_callbacks_->connection().readDisable(true); } } } while (redispatch); return Network::FilterStatus::StopIteration; } |
注意ConnectionManagerImpl总是会终止网络过滤器的迭代过程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
Http::ServerConnectionPtr HttpConnectionManagerConfig::createCodec(Network::Connection& connection, const Buffer::Instance& data, Http::ServerConnectionCallbacks& callbacks) { // 根据协议类型创建适当的HTTP编解码器 switch (codec_type_) { case CodecType::HTTP1: return Http::ServerConnectionPtr{ new Http::Http1::ServerConnectionImpl(connection, callbacks, http1_settings_)}; case CodecType::HTTP2: return Http::ServerConnectionPtr{new Http::Http2::ServerConnectionImpl( connection, callbacks, context_.scope(), http2_settings_, maxRequestHeadersKb())}; case CodecType::AUTO: return Http::ConnectionManagerUtility::autoCreateCodec(connection, data, callbacks, context_.scope(), http1_settings_, http2_settings_, maxRequestHeadersKb()); } NOT_REACHED_GCOVR_EXCL_LINE; } |
默认情况下,走的是CodecType::AUTO分支:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
ServerConnectionPtr ConnectionManagerUtility::autoCreateCodec( Network::Connection& connection, const Buffer::Instance& data, ServerConnectionCallbacks& callbacks, Stats::Scope& scope, const Http1Settings& http1_settings, const Http2Settings& http2_settings, const uint32_t max_request_headers_kb) { // 基于协议探测+应用层协议协商(ALPN)来确定下一个L7协议 // Http2::ALPN_STRING == "h2",是HTTP/2在ALPN中的代号 if (determineNextProtocol(connection, data) == Http2::ALPN_STRING) { // 使用HTTP/2协议 return ServerConnectionPtr{new Http2::ServerConnectionImpl( connection, callbacks, scope, http2_settings, max_request_headers_kb)}; } else { // 使用HTTP/1协议 return ServerConnectionPtr{ new Http1::ServerConnectionImpl(connection, callbacks, http1_settings)}; } } |
HTTP协议的版本不同,则ServerConnection的实际类型不同,对于HTTP/1来说,调用Http1::ServerConnectionImpl的构造函数:
1 2 3 4 |
ServerConnectionImpl::ServerConnectionImpl(Network::Connection& connection, ServerConnectionCallbacks& callbacks, Http1Settings settings) : ConnectionImpl(connection, HTTP_REQUEST), callbacks_(callbacks), codec_settings_(settings) {} |
这个函数没什么好说的,它的初始化列表中创建了ConnectionImpl,这是http::Connection的实现:
1 2 3 4 5 6 7 8 9 10 11 |
ConnectionImpl::ConnectionImpl(Network::Connection& connection, http_parser_type type) // L2 Connection对象 支持水位的缓冲 低水位回调 : connection_(connection), output_buffer_([&]() -> void { this->onBelowLowWatermark(); }, // 高水位回调 [&]() -> void { this->onAboveHighWatermark(); }) { // 设置水位,低水位为入参的1/2,高水位为入参 output_buffer_.setWatermarks(connection.bufferLimit()); // 初始化HTTP报文解析器 http_parser_init(&parser_, type); parser_.data = this; } |
http_parser_init来自Node.js项目:
1 2 3 4 5 6 7 8 9 10 |
请求头void http_parser_init (http_parser *parser, enum http_parser_type t) { void *data = parser->data; memset(parser, 0, sizeof(*parser)); parser->data = data; parser->type = t; parser->state = (t == HTTP_REQUEST ? s_start_req : (t == HTTP_RESPONSE ? s_start_res : s_start_req_or_res)); parser->http_errno = HPE_OK; } |
此解析器比较严格,如果你的应用程序的HTTP报文头不符合规范可能导致无法解析。
HTTP连接管理器会调用ServerConnection的dispatch方法进行数据分发,后者从http1::ConnectionImpl继承的dispatch逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
void ConnectionImpl::dispatch(Buffer::Instance& data) { ENVOY_CONN_LOG(trace, "parsing {} bytes", connection_, data.length()); // 是否可以直接分发,仅对于Upgrade请求返回true if (maybeDirectDispatch(data)) { return; } // 总是尝试将解析器从暂停中恢复 http_parser_pause(&parser_, 0); ssize_t total_parsed = 0; if (data.length() > 0) { // 获取原始缓冲切片,第一个参数是切片数组,第二个为数组大小,返回值是实际需要的切片数 // 第一次调用就是为了获得实际需要的切片数 uint64_t num_slices = data.getRawSlices(nullptr, 0); // #define STACK_ARRAY(var, type, num) StackArray<type> var(::alloca(sizeof(type) * num), num) // 在栈上创建数组变量slices STACK_ARRAY(slices, Buffer::RawSlice, num_slices); // 将带解析数据分到切片中 data.getRawSlices(slices.begin(), num_slices); // 逐个处理切片 for (const Buffer::RawSlice& slice : slices) { // 获取切片的裸数据,传递给HTTP解析器 total_parsed += dispatchSlice(static_cast<const char*>(slice.mem_), slice.len_); } } else { dispatchSlice(nullptr, 0); } // 解析完毕,分发完毕,对应的Envoy解码也完毕 ENVOY_CONN_LOG(trace, "parsed {} bytes", connection_, total_parsed); // 排干已经解析的数据 data.drain(total_parsed); // 如果Upgrade请求已经被处理,并且存在: // 1、请求体数据 // 2、或者early upgrade载荷 // 需要发送,则发送之 maybeDirectDispatch(data); } |
从上面的代码我们看到,HTTP请求数据是划分为切片,逐个切片进行解析的:
1 2 3 4 5 6 7 8 9 10 11 12 |
// 切片内容示例 // GET /healthz HTTP/1.1\r\nUser-Agent: curl/7.35.0\r\nAccept: */*\r\nHost: podinfo-canary.default.svc.k8s.gmem.cc\r\n\r\n size_t ConnectionImpl::dispatchSlice(const char* slice, size_t len) { ssize_t rc = http_parser_execute(&parser_, &settings_, slice, len); // 解析失败则抛出异常 if (HTTP_PARSER_ERRNO(&parser_) != HPE_OK && HTTP_PARSER_ERRNO(&parser_) != HPE_PAUSED) { sendProtocolError(); throw CodecProtocolException("http/1.1 protocol error: " + std::string(http_errno_name(HTTP_PARSER_ERRNO(&parser_)))); } return rc; } |
注意:由于HTTP1不支持多路复用,因此请求解析结果信息都是以Http::Http1::ConnectionImpl的实例变量的形式存放的。
http_parser_execute的实现细节我们不去深究,这里主要关注一下settings_,其类型为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
struct http_parser_settings { // 在解析了HTTP报文的各个部分之后,执行对应的回调 http_cb on_message_begin; http_data_cb on_url; http_data_cb on_status; http_data_cb on_header_field; http_data_cb on_header_value; http_cb on_headers_complete; http_data_cb on_body; http_cb on_message_complete; // 调用on_chunk_header时当前chunk的长度存放在 parser->content_length http_cb on_chunk_header; http_cb on_chunk_complete; }; |
Envoy提供的settings_变量如下,注意ConnectionImpl对象调用了HTTP解析器,并且把自身传递给parser.data:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
http_parser_settings ConnectionImpl::settings_{ [](http_parser* parser) -> int { static_cast<ConnectionImpl*>(parser->data)->onMessageBeginBase(); return 0; }, [](http_parser* parser, const char* at, size_t length) -> int { static_cast<ConnectionImpl*>(parser->data)->onUrl(at, length); return 0; }, nullptr, // on_status [](http_parser* parser, const char* at, size_t length) -> int { static_cast<ConnectionImpl*>(parser->data)->onHeaderField(at, length); return 0; }, [](http_parser* parser, const char* at, size_t length) -> int { static_cast<ConnectionImpl*>(parser->data)->onHeaderValue(at, length); return 0; }, [](http_parser* parser) -> int { return static_cast<ConnectionImpl*>(parser->data)->onHeadersCompleteBase(); }, [](http_parser* parser, const char* at, size_t length) -> int { static_cast<ConnectionImpl*>(parser->data)->onBody(at, length); return 0; }, [](http_parser* parser) -> int { static_cast<ConnectionImpl*>(parser->data)->onMessageCompleteBase(); return 0; }, nullptr, // on_chunk_header nullptr // on_chunk_complete }; |
最初被回调的是onMessageBeginBase方法,表示开始解析HTTP报文了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
void ConnectionImpl::onMessageBeginBase() { ENVOY_CONN_LOG(trace, "message begin", connection_); ASSERT(!current_header_map_); // HeaderMapImpl是为性能高度优化的Http::HeaderMap实现,尽量避免拷贝和内存分配 current_header_map_ = std::make_unique<HeaderMapImpl>(); // 解析状态,Field / Value / Done header_parsing_state_ = HeaderParsingState::Field; onMessageBegin(); } void ServerConnectionImpl::onMessageBegin() { if (!resetStreamCalled()) { // 如果没有进行流重置,则初始化当前ActiveRequest对象 ASSERT(!active_request_); active_request_ = std::make_unique<ActiveRequest>(*this); // StreamDecoder ResponseStreamEncoderImpl active_request_->request_decoder_ = &callbacks_.newStream(active_request_->response_encoder_); } } |
解析出URL路径后,回调:
1 2 3 4 5 |
void ServerConnectionImpl::onUrl(const char* data, size_t length) { if (active_request_) { active_request_->request_url_.append(data, length); } } |
为请求设置请求URL的路径部分,例如 /healthz。
解析完每个请求头后,依次回调onHeaderField、onHeaderValue方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
void ConnectionImpl::onHeaderField(const char* data, size_t length) { if (header_parsing_state_ == HeaderParsingState::Done) { // 忽略 trailers return; } // 解析请求值后,下面的判断为true if (header_parsing_state_ == HeaderParsingState::Value) { // 完成上一个请求头的处理 completeLastHeader(); } // 暂存到一个缓冲区中 current_header_field_.append(data, length); } void ConnectionImpl::onHeaderValue(const char* data, size_t length) { if (header_parsing_state_ == HeaderParsingState::Done) { // 忽略 trailers return; } // 设置头解析状态 header_parsing_state_ = HeaderParsingState::Value; // 暂存到一个缓冲区中 current_header_value_.append(data, length); } |
在解析完最后一个请求头后会执行:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
void ConnectionImpl::completeLastHeader() { if (!current_header_field_.empty()) { toLowerTable().toLowerCase(current_header_field_.buffer(), current_header_field_.size()); // 存储到请求头映射中 current_header_map_->addViaMove(std::move(current_header_field_), std::move(current_header_value_)); } // 设置头解析状态 header_parsing_state_ = HeaderParsingState::Field; // 由于std::move的移动语义 HeaderString 变成一个空壳子 ASSERT(current_header_field_.empty()); ASSERT(current_header_value_.empty()); } |
完成全部请求头的处理后,回调onHeadersCompleteBase:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
int ConnectionImpl::onHeadersCompleteBase() { // 将最后一个请求头加入映射 completeLastHeader(); if (!(parser_.http_major == 1 && parser_.http_minor == 1)) { // 如果不是HTTP/1.1,则设置协议 protocol_ = Protocol::Http10; } if (Utility::isUpgrade(*current_header_map_)) { // 根据请求头判定是否客户端在请求升级协议 handling_upgrade_ = true; } // 移动请求头映射 int rc = onHeadersComplete(std::move(current_header_map_)); current_header_map_.reset(); // 设置请求头解析状态 header_parsing_state_ = HeaderParsingState::Done; // 返回2,提示http_parser不去期望请求体和更多的信息 return handling_upgrade_ ? 2 : rc; } int ServerConnectionImpl::onHeadersComplete(HeaderMapImplPtr&& headers) { // 需要处理响应比请求完成发生更早的情况,这种情况可能由上层代码导致 if (active_request_) { const char* method_string = http_method_str(static_cast<http_method>(parser_.method)); // 如果请求使用HEAD方法,则给与响应编码器以提示,便于它正确设置内容长度、传输编码等头字段 active_request_->response_encoder_.isResponseToHeadRequest(parser_.method == HTTP_HEAD); // 当前CONNECT方法是不支持的,但是http_parser_parse_url需要知晓CONNECT handlePath(*headers, parser_.method); ASSERT(active_request_->request_url_.empty()); // 添加Method头 headers->insertMethod().value(method_string, strlen(method_string)); // 判断请求体是否存在,这里使用新的RFC语义来判断 —— content-length头、hunked transfer-encoding头存在 // 意味着请求体存在 —— 而不是基于HTTP方法判断 // 如果没有请求体,延迟对StreamDecoder.decodeHeaders()的调用,直到http解析器flush(回调onMessageComplete) if (parser_.flags & F_CHUNKED || (parser_.content_length > 0 && parser_.content_length != ULLONG_MAX) || handling_upgrade_) { // 没有请求体,立即解码请求头 active_request_->request_decoder_->decodeHeaders(std::move(headers), false); // If the connection has been closed (or is closing) after decoding headers, pause the parser // so we return control to the caller. if (connection_.state() != Network::Connection::State::Open) { http_parser_pause(&parser_, 1); } } else { // 移动以便延迟解码请求头 deferred_end_stream_headers_ = std::move(headers); } } return 0; } |
完成整个请求处理后,回调onMessageCompleteBase:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
void ConnectionImpl::onMessageCompleteBase() { if (handling_upgrade_) { // 如果当前是Upgrade请求则不调用onMessageComplete // Upgrade载荷将作为流的体看待 ASSERT(!deferred_end_stream_headers_); // 暂停解析 http_parser_pause(&parser_, 1); return; } onMessageComplete(); } void ServerConnectionImpl::onMessageComplete() { if (active_request_) { Buffer::OwnedImpl buffer; // 提示请求端消息处理完毕 active_request_->remote_complete_ = true; // 如果延迟了请求头解码,这里进行解码 if (deferred_end_stream_headers_) { active_request_->request_decoder_->decodeHeaders(std::move(deferred_end_stream_headers_), true); deferred_end_stream_headers_.reset(); } else { // 否则,解码数据 active_request_->request_decoder_->decodeData(buffer, true); } } // 总是暂停HTTP解析器,这样调用者同时只能处理单个请求,从而施加反向压力(apply back pressure) // 调用者需要检测缓冲中有更多的数据,并进行再次分发 http_parser_pause(&parser_, 1); } |
经过上一节的分析,我们了解到,当HTTP解析器处理完请求后,会调用ServerConnectionImpl::onMessageComplete回调,该回调则会调用ActiveStream(实现了StreamDecoder)进行请求解码。
这个请求解码是Envoy上下文的,它会执行Envoy的核心代理逻辑 —— 遍历HTTP过滤器链、进行路由选择:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 |
// 该函数的逻辑顺序很复杂,但也很重要 // // 我们希望在选路之前做尽量少的工作,并且创建一个过滤器链来最大化需要定制过滤器行为—— 例如注册访问日志器 ——的请求数量 // 要达成目标,需要在以下几个事项之间进行权衡: // 1、对无效请求进行合法性检查,因为无效请求可能因为没有完整的头信息而无法选路 // 2、检查服务器错误响应(连接关闭、HEAD请求...)所需要的状态 // 3、过滤器对请求本身的、可能影响选路的修改 // void ConnectionManagerImpl::ActiveStream::decodeHeaders(HeaderMapPtr&& headers, bool end_stream) { // 移动请求头为ActiveStream的字段 request_headers_ = std::move(headers); if (Http::Headers::get().MethodValues.Head == request_headers_->Method()->value().c_str()) { // 判断是否HEAD请求 is_head_request_ = true; } ENVOY_STREAM_LOG(debug, "request headers complete (end_stream={}):\n{}", *this, end_stream, *request_headers_); // 如果请求仅有请求头(header-only,没有体),则在此可以结束解码流程 // 如果我们将请求转换为header-only,则一旦后续的decodeData/decodeTrailers被调用则当前流就被标记为完成 // 下面的方法设置remote_complete_ = end_stream maybeEndDecode(end_stream); // 如果过载了,只要解码了请求头,就丢弃请求 // 连接管理器是为当前L4连接服务的,它是一个网络过滤器。当出现过载后,其overload_stop_accepting_requests_ref_ == Active if (connection_manager_.overload_stop_accepting_requests_ref_ == Server::OverloadActionState::Active) { // 在此特殊分支下,不去创建过滤器链 —— 如果存在内存过载风险更重要的是避免内存分配,而非创建过滤器 // 标记为过滤器已创建 state_.created_filter_chain_ = true; connection_manager_.stats_.named_.downstream_rq_overload_close_.inc(); // 由Envoy直接应答下游 sendLocalReply(Grpc::Common::hasGrpcContentType(*request_headers_), Http::Code::ServiceUnavailable, "envoy overloaded", nullptr, is_head_request_, absl::nullopt); return; } // 是否Envoy需要代理Expect: 100-Continue if (!connection_manager_.config_.proxy100Continue() && request_headers_->Expect() && request_headers_->Expect()->value() == Headers::get().ExpectValues._100Continue.c_str()) { // 执行到这里意味着Envoy在处理100-Continue,跳过过滤器链,直接发送100-Continue给编码器 // 100-Continue用于客户端在发送POST数据给服务器前,征询服务器情况,看服务器是否处理POST的数据, // 如果不处理,客户端则不上传POST数据,如果处理,则POST上传数据。在现实应用中,通常在POST大数据时, // 才会使用100-continue协议 // 服务器端的行为应该是:返回100-Continue表示自己能够处理POST数据,或者错误码 // 执行一些统计指标收集 chargeStats(continueHeader()); // 执行响应编码 response_encoder_->encode100ContinueHeaders(continueHeader()); // 移除Expect头,防止在上游再次处理 request_headers_->removeExpect(); } // 从请求头中读取UserAgent —— 针对特定user agent的统计指标 connection_manager_.user_agent_.initializeFromHeaders( *request_headers_, connection_manager_.stats_.prefix_, connection_manager_.stats_.scope_); // 确保codec版本(HTTP协议版本)是支持的 Protocol protocol = connection_manager_.codec_->protocol(); if (protocol == Protocol::Http10) { // 这种情况下,HTTP/1.x中除了1.1都可以 stream_info_.protocol(protocol); if (!connection_manager_.config_.http1Settings().accept_http_10_) { // 如果配置中没有显式支持HTTP/1.0,发送Envoy本地响应Upgrade Required sendLocalReply(false, Code::UpgradeRequired, "", nullptr, is_head_request_, absl::nullopt); return; } else { // HTTP/1.0 默认不支持连接复用,除非请求头指定Keep-Alive,需要保证连接关闭 state_.saw_connection_close_ = true; if (request_headers_->Connection() && absl::EqualsIgnoreCase(request_headers_->Connection()->value().getStringView(), Http::Headers::get().ConnectionValues.KeepAlive)) { state_.saw_connection_close_ = false; } } } // 如果缺少Host头 if (!request_headers_->Host()) { if ((protocol == Protocol::Http10) && !connection_manager_.config_.http1Settings().default_host_for_http_10_.empty()) { // 当前是HTTP10且配置了缺省Host头,则设置此头 request_headers_->insertHost().value(connection_manager_.config_.http1Settings().default_host_for_http_10_); } else { // 非HTTP10,必须有Host头,对于HTTP11来说Host头重命名为:authority // Envoy本地应答 sendLocalReply(Grpc::Common::hasGrpcContentType(*request_headers_), Code::BadRequest, "", nullptr, is_head_request_, absl::nullopt); return; } } // 处理请求头部过长的情况 ASSERT(connection_manager_.config_.maxRequestHeadersKb() > 0); if (request_headers_->byteSize() > (connection_manager_.config_.maxRequestHeadersKb() * 1024)) { sendLocalReply(Grpc::Common::hasGrpcContentType(*request_headers_), Code::RequestHeaderFieldsTooLarge, "", nullptr, is_head_request_, absl::nullopt); return; } // 当前在应用层,我们仅仅支持相对路径。在这里预期codec已经把路径打散成片 // 注意:目前HTTP11 codec仅在allow_absolute_url标记启用的情况下才进行打散操作 // 我们也会检查:path头,因为CONNECT请求没有URL路径,而当前不支持CONNECT请求 if (!request_headers_->Path() || request_headers_->Path()->value().c_str()[0] != '/') { connection_manager_.stats_.named_.downstream_rq_non_relative_path_.inc(); sendLocalReply(Grpc::Common::hasGrpcContentType(*request_headers_), Code::NotFound, "", nullptr, is_head_request_, absl::nullopt); return; } // 对于HTTP11,如果请求头Connection: Close,表示不启用keep-Alive if (protocol == Protocol::Http11 && request_headers_->Connection() && absl::EqualsIgnoreCase(request_headers_->Connection()->value().getStringView(), Http::Headers::get().ConnectionValues.Close)) { // 那么意味着客户端已经关闭连接 state_.saw_connection_close_ = true; } // 如果当前请求不是内部重定向 if (!state_.is_internally_created_) { // Only sanitize headers on first pass. // 根据配置、请求头来修改下游连接的远程地址(客户端地址) // 日志目的 stream_info_.setDownstreamRemoteAddress(ConnectionManagerUtility::mutateRequestHeaders( *request_headers_, connection_manager_.read_callbacks_->connection(), connection_manager_.config_, *snapped_route_config_, connection_manager_.random_generator_, connection_manager_.runtime_, connection_manager_.local_info_)); } ASSERT(stream_info_.downstreamRemoteAddress() != nullptr); ASSERT(!cached_route_); // 刷新缓存的路由(条目),可能设置cached_cluster_info_ —— 目标上游集群信息,意味着选路可能完成 refreshCachedRoute(); const bool upgrade_rejected = createFilterChain() == false; // TODO 如果在准备过滤器迭代时,发现链中没有任何过滤器,连接管理器应该返回404,当前实现时不返回响应 if (protocol == Protocol::Http11 && cached_route_.value()) { if (upgrade_rejected) { // 当前路由不支持升级,因此发送Envoy本地响应 connection_manager_.stats_.named_.downstream_rq_ws_on_non_ws_route_.inc(); sendLocalReply(Grpc::Common::hasGrpcContentType(*request_headers_), Code::Forbidden, "", nullptr, is_head_request_, absl::nullopt); return; } // 允许WebSocket请求穿过启用了WebSocket支持的路由 } // 如果有路由,且路由配置了idle超时 if (cached_route_.value()) { const Router::RouteEntry* route_entry = cached_route_.value()->routeEntry(); if (route_entry != nullptr && route_entry->idleTimeout()) { idle_timeout_ms_ = route_entry->idleTimeout().value(); if (idle_timeout_ms_.count()) { // 如果流超时定时器没创建,则创建之 if (stream_idle_timer_ == nullptr) { stream_idle_timer_ = connection_manager_.read_callbacks_->connection().dispatcher().createTimer( [this]() -> void { onIdleTimeout(); }); } } else if (stream_idle_timer_ != nullptr) { // 如果存在流超时定时器,但是路由的idle超时为0,则禁用定时器 stream_idle_timer_ = nullptr; } } } // 进行请求追踪 if (connection_manager_.config_.tracingConfig()) { traceRequest(); } // 解码请求头 decodeHeaders(nullptr, *request_headers_, end_stream); // 重置超时定时器 resetIdleTimer(); } |
请求头的解码逻辑位于decodeHeaders方法中,上面的方法传入的第一个参数是nullptr,表示从头开始迭代过滤器链:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
void ConnectionManagerImpl::ActiveStream::decodeHeaders(ActiveStreamDecoderFilter* filter, HeaderMap& headers, bool end_stream) { // 从头,或者从指定过滤器开始迭代 std::list<ActiveStreamDecoderFilterPtr>::iterator entry; std::list<ActiveStreamDecoderFilterPtr>::iterator continue_data_entry = decoder_filters_.end(); if (!filter) { entry = decoder_filters_.begin(); } else { entry = std::next(filter->entry()); } // 遍历剩下的过滤器 for (; entry != decoder_filters_.end(); entry++) { ASSERT(!(state_.filter_call_state_ & FilterCallState::DecodeHeaders)); // 设置状态位 state_.filter_call_state_ |= FilterCallState::DecodeHeaders; (*entry)->end_stream_ = // 仅仅解码请求头,或者传入end_stream=true(表示这是header-only的请求) decoding_headers_only_ || (end_stream && continue_data_entry == decoder_filters_.end()); // 调用过滤器来解码请求头,返回的状态决定后续流程走向 FilterHeadersStatus status = (*entry)->decodeHeaders(headers, (*entry)->end_stream_); // ContinueAndEndStream表示继续迭代后续过滤器,但是忽略后续的请求体、尾 —— 这意味着创建header-only请求/应答 ASSERT(!(status == FilterHeadersStatus::ContinueAndEndStream && (*entry)->end_stream_)); // 清空状态位 state_.filter_call_state_ &= ~FilterCallState::DecodeHeaders; ENVOY_STREAM_LOG(trace, "decode headers called: filter={} status={}", *this, static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status)); // 处理请求头的回调被调用后的通用处理逻辑: // 根据status设置ActiveStream的一些字段,例如stopped_、headers_only、headers_continued_ // 只有返回true,才可能继续迭代 if (!(*entry)->commonHandleAfterHeadersCallback(status, decoding_headers_only_) && std::next(entry) != decoder_filters_.end()) { // 如果当前不是最后一个过滤器,停止迭代。否则,还需要继续处理先前过滤器添加了体的情况 return; } // 这里处理特殊的情况:我们使用header-only请求,但是前面的过滤器填充了请求体 // 这意味着不能在内联迭代(inline iteration)阶段再向后面的过滤器传递end_stream = true了 if (end_stream && buffered_request_data_ && continue_data_entry == decoder_filters_.end()) { // 设置下一个执行的过滤器(为当前过滤器) continue_data_entry = entry; } } if (continue_data_entry != decoder_filters_.end()) { // 从当前过滤器继续迭代,调用continueDecoding()以防再调用decodeHeaders() ASSERT(buffered_request_data_); // 仿冒stopped_ = true,因为continueDecoding() 期望之 (*continue_data_entry)->stopped_ = true; // 使用缓冲的请求头、体数据继续迭代 (*continue_data_entry)->continueDecoding(); } if (end_stream) { // 解除超时过滤器 disarmRequestTimeout(); } } |
单个过滤器解码请求头的逻辑由ActiveStreamDecoderFilter.decodeHeaders提供:
1 2 3 4 |
FilterHeadersStatus decodeHeaders(HeaderMap& headers, bool end_stream) { is_grpc_request_ = Grpc::Common::hasGrpcContentType(headers); return handle_->decodeHeaders(headers, end_stream); } |
可以看到,它只是判断一下是否gRPC请求,然后就转交给 StreamDecoderFilter handle_,这个handle是一个个具体的HTTP过滤器。
HTTP过滤器可能对请求头进行任意的操作,例如修改某个头,最终它会返回下面的枚举值之一:
1 2 3 4 5 6 7 8 |
enum class FilterHeadersStatus { // 继续迭代下一个过滤器 Continue, // 不再迭代后续过滤器 StopIteration, // 继续迭代下一个过滤器,但是不忽略报文体、尾,也就是创建header-only的请求/响应 ContinueAndEndStream }; |
返回值会影响如何进行后续的过滤器迭代。
此方法考虑了协议升级的情况:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
bool ConnectionManagerImpl::ActiveStream::createFilterChain() { // 过滤器链已经创建则返回,HTTP过滤器链只有一个(相对于单个HTTP连接管理器),而不像网络过滤器,可以有多个 if (state_.created_filter_chain_) { return false; } bool upgrade_rejected = false; // 升级的目标协议 auto upgrade = request_headers_ ? request_headers_->Upgrade() : nullptr; // 标记为过滤器已创建 state_.created_filter_chain_ = true; if (upgrade != nullptr) { // 需要进行协议升级判断 const Router::RouteEntry::UpgradeMap* upgrade_map = nullptr; // 设置UpgradeMap,包含路由条目支持的升级协议信息 if (cached_route_.has_value() && cached_route_.value() && cached_route_.value()->routeEntry()) { upgrade_map = &cached_route_.value()->routeEntry()->upgradeMap(); } // 创建升级的过滤器链 if (connection_manager_.config_.filterFactory().createUpgradeFilterChain( upgrade->value().c_str(), upgrade_map, *this)) { state_.successful_upgrade_ = true; connection_manager_.stats_.named_.downstream_cx_upgrades_total_.inc(); connection_manager_.stats_.named_.downstream_cx_upgrades_active_.inc(); return true; } else { upgrade_rejected = true; // 失败,退化为默认过滤器链,调用者将会发送Envoy本地响应提示升级失败 } } // 创建默认过滤器链 connection_manager_.config_.filterFactory().createFilterChain(*this); return !upgrade_rejected; } |
默认过滤器链在下面的方法中创建:
1 2 3 4 5 |
void HttpConnectionManagerConfig::createFilterChain(Http::FilterChainFactoryCallbacks& callbacks) { for (const Http::FilterFactoryCb& factory : filter_factories_) { factory(callbacks); } } |
可以看到,和网络过滤器一样的模式,调用各过滤器提供的工厂,传输FilterChainFactoryCallbacks。
最后一个HTTP过滤器通常都是Envoy::Router::Filter,此过滤器决定如何转发下游请求给上游集群。毕竟Envoy只是个代理,它不负责实质性的请求处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
Http::FilterHeadersStatus Filter::decodeHeaders(Http::HeaderMap& headers, bool end_stream) { // 确保向上游发送的出站请求需要的HTTP/2头都存在 ASSERT(headers.Path()); ASSERT(headers.Method()); ASSERT(headers.Host()); // 来自下游的头 downstream_headers_ = &headers; // 是否为gRPC请求 grpc_request_ = Grpc::Common::hasGrpcContentType(headers); // 增加rq_total计数 config_.stats_.rq_total_.inc(); // 获取路由 route_ = callbacks_->route(); if (!route_) { // 增加no_route计数 config_.stats_.no_route_.inc(); ENVOY_STREAM_LOG(debug, "no cluster match for URL '{}'", *callbacks_, headers.Path()->value().c_str()); // 记录没有路由这一情况到StreamInfo callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::NoRouteFound); // 设置本地响应 callbacks_->sendLocalReply(Http::Code::NotFound, "", nullptr, absl::nullopt); return Http::FilterHeadersStatus::StopIteration; } // 如果有请求的直接响应,则返回直接响应,否则返回nullptr // 直接响应即Envoy自己生成的响应,而非代理上游集群的 const auto* direct_response = route_->directResponseEntry(); if (direct_response != nullptr) { config_.stats_.rq_direct_response_.inc(); // 重写Path头 direct_response->rewritePathHeader(headers, !config_.suppress_envoy_headers_); // 发送本地响应 callbacks_->sendLocalReply( // 使用直接响应的头、体 direct_response->responseCode(), direct_response->responseBody(), // 修改响应头的Lambda [this, direct_response, &request_headers = headers](Http::HeaderMap& response_headers) -> void { // 基于请求头得到重定向路径 const auto new_path = direct_response->newPath(request_headers); if (!new_path.empty()) { // 添加头 response_headers.addReferenceKey(Http::Headers::get().Location, new_path); } // 在转发之前,进行可能是破坏性的响应头转换,例如添加/删除头 // 只能在获取所有初始响应头后调用单次 direct_response->finalizeResponseHeaders(response_headers, callbacks_->streamInfo()); }, absl::nullopt); // Router过滤器总是停止迭代 return Http::FilterHeadersStatus::StopIteration; } // 匹配请求的路由条目 route_entry_ = route_->routeEntry(); // 从集群管理器cm_中后去路由条目提供的山有集群名称,例如 outbound|9898||podinfo-canary.default.svc.k8s.gmem.cc Upstream::ThreadLocalCluster* cluster = config_.cm_.get(route_entry_->clusterName()); if (!cluster) { // 找不到集群 config_.stats_.no_cluster_.inc(); ENVOY_STREAM_LOG(debug, "unknown cluster '{}'", *callbacks_, route_entry_->clusterName()); // 记录错误并进行本地应答 callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::NoRouteFound); callbacks_->sendLocalReply(route_entry_->clusterNotFoundResponseCode(), "", nullptr, absl::nullopt); return Http::FilterHeadersStatus::StopIteration; } // 从线程本地的cluster获得ClusterInfo对象,此对象可以安全的超越ThreadLocalCluster的生命周期存在 cluster_ = cluster->info(); // 虚拟上游集群,根据请求路径确定 request_vcluster_ = route_entry_->virtualCluster(headers); ENVOY_STREAM_LOG(debug, "cluster '{}' match for URL '{}'", *callbacks_, route_entry_->clusterName(), headers.Path()->value().c_str()); // 上游集群的统计指标的备选前缀 const Http::HeaderEntry* request_alt_name = headers.EnvoyUpstreamAltStatName(); if (request_alt_name) { alt_stat_prefix_ = std::string(request_alt_name->value().c_str()) + "."; headers.removeEnvoyUpstreamAltStatName(); } // 看看是不是应该立即杀死一定比例的、此集群的流量 // maintenanceMode()返回集群是否出于维护模式,出于此模式则不应该作为路由的目标,过滤器 // 可以根据自己的需要来处理此调用的返回值。此方法的实现可能引入某种随机性,不会每次返回一致的值 if (cluster_->maintenanceMode()) { // 上游服务器资源溢出,流需要被重置 callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamOverflow); chargeUpstreamCode(Http::Code::ServiceUnavailable, nullptr, true); // 进行本地应答 callbacks_->sendLocalReply(Http::Code::ServiceUnavailable, "maintenance mode", [this](Http::HeaderMap& headers) { if (!config_.suppress_envoy_headers_) { // 添加Envoy特殊响应头 headers.insertEnvoyOverloaded().value( Http::Headers::get().EnvoyOverloadedValues.True); } }, absl::nullopt); cluster_->stats().upstream_rq_maintenance_mode_.inc(); return Http::FilterHeadersStatus::StopIteration; } // 获取上游集群的连接池 Http::ConnectionPool::Instance* conn_pool = getConnPool(); if (!conn_pool) { // 如果无法得到/创建线程池,所有该集群没有任何可用(健康的)端点 // 发送本地响应 sendNoHealthyUpstreamResponse(); return Http::FilterHeadersStatus::StopIteration; } /* 开始向上游集群的主机发送请求 */ // 根据路由配置和请求头来决定实际使用的请求超时时间。请求头中的超时优先级更高 timeout_ = FilterUtility::finalTimeout(*route_entry_, headers, !config_.suppress_envoy_headers_, grpc_request_); // 如果请求头x-envoy-upstream-rq-timeout-alt-response存在,则在请求上游超时后 if (headers.EnvoyUpstreamRequestTimeoutAltResponse()) { // 设置响应码 timeout_response_code_ = Http::Code::NoContent; // 同时移除x-envoy-upstream-rq-timeout-alt-response头 headers.removeEnvoyUpstreamRequestTimeoutAltResponse(); } // 如果此RouteEntry所属的虚拟主机的配置要求在上游请求中添加x-envoy-attempt-count头,则添加之 include_attempt_count_ = route_entry_->includeAttemptCount(); if (include_attempt_count_) { headers.insertEnvoyAttemptCount().value(attempt_count_); } // 将当前Span的追踪上下文注入到请求头 callbacks_->activeSpan().injectContext(headers); // 在转发请求前,进行可能是销毁性的请求头转换 —— 例如URL重写、添加额外的头、删除头 // 此方法必须仅在转发前调用单次 route_entry_->finalizeRequestHeaders(headers, callbacks_->streamInfo(), !config_.suppress_envoy_headers_); // 设置Scheme头,HTTP和HTTPS FilterUtility::setUpstreamScheme(headers, *cluster_); // 重试状态 retry_state_ = createRetryState(route_entry_->retryPolicy(), headers, *cluster_, config_.runtime_, config_.random_, callbacks_->dispatcher(), route_entry_->priority()); // 请求是否应该被shadow(镜像) do_shadowing_ = FilterUtility::shouldShadow(route_entry_->shadowPolicy(), config_.runtime_, callbacks_->streamId()); ENVOY_STREAM_LOG(debug, "router decoding headers:\n{}", *callbacks_, headers); // 上游请求对象 upstream_request_ = std::make_unique<UpstreamRequest>(*this, *conn_pool); // 上游请求不会在本地走过滤器链,下面的方法仅仅是 // 1、调用conn_pool_.newStream()创建新的流 // 2、将新的流赋值给UpstreamRequest.conn_pool_stream_handle_变量 upstream_request_->encodeHeaders(end_stream); if (end_stream) { // 执行此回调,用于上游请求以异步发送的,这里不代表上游请求处理完毕 // 在Dispatcher上注册超时定时器,在上游请求执行超时后回调onResponseTimeout onRequestComplete(); } return Http::FilterHeadersStatus::StopIteration; } |
如果选择的路由的上游集群没有健康的端点,则会调用:
1 2 3 4 5 |
void Filter::sendNoHealthyUpstreamResponse() { callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::NoHealthyUpstream); chargeUpstreamCode(Http::Code::ServiceUnavailable, nullptr, false); callbacks_->sendLocalReply(Http::Code::ServiceUnavailable, "no healthy upstream", nullptr, absl::nullopt); } |
给与客户端503响应,响应体设置为 no healthy upstream 。
Envoy和上游集群主机的交互,是通过连接池进行的。每个上游主机对应一个连接池对象,根据协议和配置的不同,连接池中维持的连接数量也不同。对于HTTP/2协议,由于多路复用的关系,不考虑套接字选项的情况下,池中总是只有单个连接。
路由过滤器会调用getConnPool()来获取连接池:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
Http::ConnectionPool::Instance* Filter::getConnPool() { // 获取集群支持的特性,位域字段 auto features = cluster_->features(); // 根据上游集群的配置、下游连接的类型来决定使用什么协议 // 根据运行时配置,集群可能将HTTP2降级为HTTP1 Http::Protocol protocol; if (features & Upstream::ClusterInfo::Features::USE_DOWNSTREAM_PROTOCOL) { // 如果使用下游的协议 protocol = callbacks_->streamInfo().protocol().value(); } else { // 否则,如果上游支持HTTP2则使用之,不支持则HTTP11 protocol = (features & Upstream::ClusterInfo::Features::HTTP2) ? Http::Protocol::Http2 : Http::Protocol::Http11; } // cm_是集群管理器 return config_.cm_.httpConnPoolForCluster(route_entry_->clusterName(), route_entry_->priority(), protocol, this); } |
连接池的管理,实际上由集群管理器负责:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
Http::ConnectionPool::Instance* ClusterManagerImpl::httpConnPoolForCluster(const std::string& cluster, ResourcePriority priority, Http::Protocol protocol, LoadBalancerContext* context) { // 获取线程本地的集群管理器对象 ThreadLocalClusterManagerImpl& cluster_manager = tls_->getTyped<ThreadLocalClusterManagerImpl>(); // 根据名称查找上游集群 auto entry = cluster_manager.thread_local_clusters_.find(cluster); if (entry == cluster_manager.thread_local_clusters_.end()) { return nullptr; } // 委托给上游集群 return entry->second->connPool(priority, protocol, context); } |
获取连接池的工作进一步委托给上游集群(ClusterEntry):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
Http::ConnectionPool::Instance* ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::connPool( ResourcePriority priority, Http::Protocol protocol, LoadBalancerContext* context) { // 根据负载均衡上下文(就是Router这个过滤器),也就是负载均衡策略,来决定使用哪个主机的连接池 HostConstSharedPtr host = lb_->chooseHost(context); if (!host) { ENVOY_LOG(debug, "no healthy host for HTTP connection pool"); cluster_info_->stats().upstream_cx_none_healthy_.inc(); return nullptr; } // 从下游连接继承套接字选项 std::vector<uint8_t> hash_key = {uint8_t(protocol), uint8_t(priority)}; // 基于下游套接字选项来计算连接池的哈希键。以便基于套接字选项来控制连接池,让不同选项的连接不池化在一起 bool have_options = false; if (context && context->downstreamConnection()) { const Network::ConnectionSocket::OptionsSharedPtr& options = context->downstreamConnection()->socketOptions(); if (options) {超时 for (const auto& option : *options) { have_options = true; option->hashKey(hash_key); } } } // 获取单个主机的连接池容器 ConnPoolsContainer& container = *parent_.getHttpConnPoolsContainer(host, true); // 根据套接字选项的哈希,从连接池容器中获得连接池 Http::ConnectionPool::Instance& pool = container.pools_->getPool(hash_key, [&]() { return parent_.parent_.factory_.allocateConnPool( parent_.thread_local_dispatcher_, host, priority, protocol, have_options ? context->downstreamConnection()->socketOptions() : nullptr); }); return &pool; } |
获得可用的连接池对象后,Router过滤器会创建UpstreamRequest ,并调用它的encodeHeaders方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
// encodeHeaders不需要变量过滤器链,因为过滤器链是为下游连接服务的 void Filter::UpstreamRequest::encodeHeaders(bool end_stream) { ASSERT(!encode_complete_); encode_complete_ = end_stream; // 创建一个新的流,并赋值给UpstreamRequest.conn_pool_stream_handle_ // 注意UpstreamRequest实现了StreamDecoder,能够解码上游响应 Http::ConnectionPool::Cancellable* handle = conn_pool_.newStream(*this, *this); if (handle) { // 可能在newStream()调用期间发生reset,这种情况下handle为nullptr conn_pool_stream_handle_ = handle; } } |
连接池的newStream方法创建一个连接到某个上游主机的新的流:
1 2 3 4 5 6 7 8 9 10 |
/** * 在连接池上创建一个新的流 * @param response_decoder 响应解码器 —— 对于上游请求,Router过滤器需要对其返回的应答进行解码 * @param cb 当连接准备好和失败时执行的回调,如果有可用的连接/出现立即的失败,这些回调可能在当前方法的上下文中直接调用 * 这种情况下,此函数返回nullptr * @return Cancellable* 如果池中没有可用的连接,上述cb不会被立即调用,该方法会返回一个Cancellable类型的handle * 调用者可以使用该句柄来取消请求 * 注意:一旦任何回调函数被调用,则句柄不再有效。要取消请求,必须将流重置 */ virtual Cancellable* newStream(Http::StreamDecoder& response_decoder, Callbacks& callbacks) PURE; |
上述方法的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
ConnectionPool::Cancellable* ConnPoolImpl::newStream(StreamDecoder& response_decoder, ConnectionPool::Callbacks& callbacks) { // 统计指标收集 host_->cluster().stats().upstream_rq_total_.inc(); host_->stats().rq_total_.inc(); if (!ready_clients_.empty()) { // 如果有可用的客户端,则取出一个放到不可用列表中 ready_clients_.front()->moveBetweenLists(ready_clients_, busy_clients_); ENVOY_CONN_LOG(debug, "using existing connection", *busy_clients_.front()->codec_client_); // 然后将请求关联到客户端 attachRequestToClient(*busy_clients_.front(), response_decoder, callbacks); return nullptr; } // ResourceManager非完全一致的同步最大连接数、未决请求等信息 // 是否可以创建新的请求 if (host_->cluster().resourceManager(priority_).pendingRequests().canCreate()) { // 是否可以创建新的连接 bool can_create_connection = host_->cluster().resourceManager(priority_).connections().canCreate(); if (!can_create_connection) { // 连接总数超标 host_->cluster().stats().upstream_cx_overflow_.inc(); } // 如果池中根本没有连接,则立即创建一个防止饥饿 if ((ready_clients_.size() == 0 && busy_clients_.size() == 0) || can_create_connection) { // 创建新的客户端ActiveClient // 将其放入busy_clients_列表 createNewConnection(); } // 创建请求并排队 return newPendingRequest(response_decoder, callbacks); } else { // 超过允许的未决请求的最大数量 ENVOY_LOG(debug, "max pending requests overflow"); callbacks.onPoolFailure(ConnectionPool::PoolFailureReason::Overflow, nullptr); host_->cluster().stats().upstream_rq_pending_overflow_.inc(); return nullptr; } } |
可以看到,如果连接池有空闲的HTTP客户端,则将UpstreamRequest关联到一个空闲连接:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
void ConnPoolImpl::attachRequestToClient(ActiveClient& client, StreamDecoder& response_decoder, ConnectionPool::Callbacks& callbacks) { ASSERT(!client.stream_wrapper_); // 将UpstreamRequest+ActiveClient封装为流编解码包装器 client.stream_wrapper_ = std::make_unique<StreamWrapper>(response_decoder, client); // 回调onPoolReady:当连接池中有连接能够处理上游请求时执行 callbacks.onPoolReady(*client.stream_wrapper_, client.real_host_description_); } // StreamWrapper的构造函数: ConnPoolImpl::StreamWrapper::StreamWrapper(StreamDecoder& response_decoder, ActiveClient& parent) // CodecClient支持多种HTTP协议类型下的多路流、底层连接的管理 : StreamEncoderWrapper(parent.codec_client_->newStream(*this)), StreamDecoderWrapper(response_decoder), parent_(parent) { // 添加回调 StreamEncoderWrapper::inner_.getStream().addCallbacks(*this); } // 底层请求流 StreamEncoder& CodecClient::newStream(StreamDecoder& response_decoder) { // response_decoder即UpstreamRequest ActiveRequestPtr request(new ActiveRequest(*this, response_decoder)); // 创建出站请求流 request->encoder_ = &codec_->newStream(*request); request->encoder_->getStream().addCallbacks(*request); request->moveIntoList(std::move(request), active_requests_); disableIdleTimer(); return *active_requests_.front()->encoder_; } StreamEncoder& ClientConnectionImpl::newStream(StreamDecoder& response_decoder) { if (resetStreamCalled()) { throw CodecClientException("cannot create new streams after calling reset"); } // 为连接启用读 while (!connection_.readEnabled()) { connection_.readDisable(false); } request_encoder_ = std::make_unique<RequestStreamEncoderImpl>(*this); // 将UpstreamRequest纳入未决响应列表 pending_responses_.emplace_back(&response_decoder); return *request_encoder_; } |
反之,如果连接池没有空闲HTTP客户端,则创建PendingRequest并排队:
1 2 3 4 5 6 7 8 |
ConnectionPool::Cancellable* ConnPoolImplBase::newPendingRequest(StreamDecoder& decoder, ConnectionPool::Callbacks& callbacks) { ENVOY_LOG(debug, "queueing request due to no available connections"); // 创建PendingRequest PendingRequestPtr pending_request(new PendingRequest(*this, decoder, callbacks)); // 加入pending_requests_列表,然后返回 pending_request->moveIntoList(std::move(pending_request), pending_requests_); return pending_requests_.front().get(); } |
排队的请求会在以后,因为某种事件而关联到可用连接。例如新的针对上游主机的L4连接建立后:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
// Envoy::Http::Http1::ConnPoolImpl::attachRequestToClient conn_pool.cc:66 // Envoy::Http::Http1::ConnPoolImpl::processIdleClient conn_pool.cc:238 client.stream_wrapper_.reset(); if (pending_requests_.empty() || delay) { // 没有未决请求,将客户端加入空闲列表 client.moveBetweenLists(busy_clients_, ready_clients_); } else { // 绑定请求到客户端 attachRequestToClient(client, pending_requests_.back()->decoder_, pending_requests_.back()->callbacks_); pending_requests_.pop_back(); } // Envoy::Http::Http1::ConnPoolImpl::onConnectionEvent conn_pool.cc:183 if (event == Network::ConnectionEvent::Connected) { conn_connect_ms_->complete(); // 有空闲客户端了,处理之 processIdleClient(client, false); } // Envoy::Http::Http1::ConnPoolImpl::ActiveClient::onEvent conn_pool.h:89 void onEvent(Network::ConnectionEvent event) override { parent_.onConnectionEvent(*this, event); } // Envoy::Network::ConnectionImpl::raiseEvent connection_impl.cc:329 void ConnectionImpl::raiseEvent(ConnectionEvent event) { for (ConnectionCallbacks* callback : callbacks_) { callback->onEvent(event); } if (state() == State::Open && event == ConnectionEvent::Connected && write_buffer_->length() > 0) { onWriteReady(); } } // Envoy::Network::RawBufferSocket::onConnected raw_buffer_socket.cc:83 void RawBufferSocket::onConnected() { callbacks_->raiseEvent(ConnectionEvent::Connected); } // Envoy::Network::ConnectionImpl::onWriteReady connection_impl.cc:519 if (error == 0) { ENVOY_CONN_LOG(debug, "connected", *this); connecting_ = false; transport_socket_->onConnected(); ... // Envoy::Network::ConnectionImpl::onFileEvent connection_impl.cc:467 |
到这里为止,我们还没搞清楚,针对上游主机的请求到底是何时、由谁发出去的。实际上这是在Router过滤器的onPoolReady回调中进行的。
不管请求是异步还是同步的关联到HTTP客户端(attachRequestToClient),都会触发onPoolReady。此回调会真正发出请求:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
void Filter::UpstreamRequest::onPoolReady(Http::StreamEncoder& request_encoder, Upstream::HostDescriptionConstSharedPtr host) { ENVOY_STREAM_LOG(debug, "pool ready", *parent_.callbacks_); // 设置UpstreamRequest.upstream_host_ = host // 调用UpstreamRequest、Router的StreamInfo.onUpstreamHostSelected() onUpstreamHostSelected(host); request_encoder.getStream().addCallbacks(*this); // 创建per-try的定时器。per_try_timeout_字段被设置为已启用的定时器 setupPerTryTimeout(); conn_pool_stream_handle_ = nullptr; // 将StreamWrapper设置为请求编码器 setRequestEncoder(request_encoder); calling_encode_headers_ = true; if (parent_.route_entry_->autoHostRewrite() && !host->hostname().empty()) { // 如果当前路由条目设置了自动头重写,则使用目标上游主机的名称来覆盖请求头 parent_.downstream_headers_->Host()->value(host->hostname()); } // 注入传递当前追踪需要的头 if (span_ != nullptr) { span_->injectContext(*parent_.downstream_headers_); } // 日志用途信息 stream_info_.onFirstUpstreamTxByteSent(); parent_.callbacks_->streamInfo().onFirstUpstreamTxByteSent(); // 进行请求头编码,调用StreamEncoderWrapper,后者装饰一个StreamEncoder的实现RequestStreamEncoderImpl request_encoder.encodeHeaders(*parent_.downstream_headers_, !buffered_request_body_ && encode_complete_ && !encode_trailers_); calling_encode_headers_ = false; // 在encodeHeaders()调用过程中可能发生RESET,这里需要进行测试,尽管是非常边缘的情况 // 例如对于HTTP/2 codec,当帧由于某种原因无法编码的情况下就会出现RESET —— 比如头过大,超过64K if (deferred_reset_reason_) { // 重置回调 onResetStream(deferred_reset_reason_.value()); } else { // 编码请求体 if (buffered_request_body_) { stream_info_.addBytesSent(buffered_request_body_->length()); request_encoder.encodeData(*buffered_request_body_, encode_complete_ && !encode_trailers_); } // 编码请求尾 if (encode_trailers_) { request_encoder.encodeTrailers(*parent_.downstream_trailers_); } // 记录日志用的流信息 if (encode_complete_) { stream_info_.onLastUpstreamTxByteSent(); parent_.callbacks_->streamInfo().onLastUpstreamTxByteSent(); } } } |
上游请求的编码逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
void RequestStreamEncoderImpl::encodeHeaders(const HeaderMap& headers, bool end_stream) { // Method、Path头必须存在 const HeaderEntry* method = headers.Method(); const HeaderEntry* path = headers.Path(); if (!method || !path) { throw CodecClientException(":method and :path must be specified"); } // 如果是HEAD请求 if (method->value() == Headers::get().MethodValues.Head.c_str()) { head_request_ = true; } // 如果是HEAD请求,则设置pending_response.head_request_ = true connection_.onEncodeHeaders(headers); // 写入报文最前面的部分 connection_.reserveBuffer(std::max(4096U, path->value().size() + 4096)); connection_.copyToBuffer(method->value().c_str(), method->value().size()); connection_.addCharToBuffer(' '); connection_.copyToBuffer(path->value().c_str(), path->value().size()); connection_.copyToBuffer(REQUEST_POSTFIX, sizeof(REQUEST_POSTFIX) - 1); // 写入请求头部分,包括写入一些额外的头 StreamEncoderImpl::encodeHeaders(headers, end_stream); } |
可以看到,上游请求的编码,是不走HTTP过滤器链的。
那么,上游的响应又是如何接收到的呢?在newStream方法调用createNewConnection创建新客户端时,对应的L4连接也会被创建 —— libevent事件回调会被注册:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
void ConnPoolImpl::createNewConnection() { ActiveClientPtr client(new ActiveClient(*this)); client->moveIntoList(std::move(client), busy_clients_); } // ActiveClient的构造函数会创建L4连接 ConnPoolImpl::ActiveClient::ActiveClient(ConnPoolImpl& parent) : parent_(parent), // 连接到服务器端的超时回调 connect_timer_(parent_.dispatcher_.createTimer([this]() -> void { onConnectTimeout(); })), remaining_requests_(parent_.host_->cluster().maxRequestsPerConnection()) { // ... // 调用HostImpl.createConnection() Upstream::Host::CreateConnectionData data = parent_.host_->createConnection(parent_.dispatcher_, parent_.socket_options_, nullptr); } Host::CreateConnectionData HostImpl::createConnection( Event::Dispatcher& dispatcher, const Network::ConnectionSocket::OptionsSharedPtr& options, Network::TransportSocketOptionsSharedPtr transport_socket_options) const { // 创建L4客户端连接 return {createConnection(dispatcher, *cluster_, address_, options, transport_socket_options), shared_from_this()}; } |
响应就是通过libevent回调传递,其网络层的处理路径和处理下游请求时是完全一样的——不管是读下游请求还是上游响应,L4过滤器的onData都会被调用,在onContinueReading方法中进行报文的读取。
对于HTTP1来说,当报文头读取完毕后,Http::Http1::ClientConnectionImpl::onHeadersComplete被回调,它会转调PendingResponse.decoder.decodeHeaders方法,后者进而调用UpstreamRequest::decodeHeaders:
1 2 3 4 5 6 7 8 9 10 11 12 |
void Filter::UpstreamRequest::decodeHeaders(Http::HeaderMapPtr&& headers, bool end_stream) { stream_info_.onFirstUpstreamRxByteReceived(); parent_.callbacks_->streamInfo().onFirstUpstreamRxByteReceived(); maybeEndDecode(end_stream); // 读取头 upstream_headers_ = headers.get(); // 获取响应码 const uint64_t response_code = Http::Utility::getResponseStatus(*headers); stream_info_.response_code_ = static_cast<uint32_t>(response_code); // 调用Router parent_.onUpstreamHeaders(response_code, std::move(headers), end_stream); } |
UpstreamRequest.parent_就是Router过滤器,其onUpstreamHeaders的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
void Filter::onUpstreamHeaders(const uint64_t response_code, Http::HeaderMapPtr&& headers, bool end_stream) { ENVOY_STREAM_LOG(debug, "upstream headers complete: end_stream={}", *callbacks_, end_stream); // 异常检测信息收集,为上游主机添加一个状态码 upstream_request_->upstream_host_->outlierDetector().putHttpResponseCode(response_code); // 健康检查快速失败标记 x-envoy-immediate-health-check-fail,可能通过管理端点设置 if (headers->EnvoyImmediateHealthCheckFail() != nullptr) { // 设置上游主机健康状态 upstream_request_->upstream_host_->healthChecker().setUnhealthy(); } // 重试相关的处理 if (retry_state_) { // onHostAttempted:当针对一个主机的请求尝试失败了,并且可以进行下一个尝试时,调用此回调 retry_state_->onHostAttempted(upstream_request_->upstream_host_); // 判断是否应该进行重试,如果是,调用回调,也就是doRetry() RetryStatus retry_status = retry_state_->shouldRetry( headers.get(), absl::optional<Http::StreamResetReason>(), [this]() -> void { doRetry(); }); // 捕获上游主机,因为后面的setupRetry()调用会清除upstream_request_ const auto upstream_host = upstream_request_->upstream_host_; if (retry_status == RetryStatus::Yes && setupRetry(end_stream)) { // 重试 Http::CodeStats& code_stats = httpContext().codeStats(); code_stats.chargeBasicResponseStat(cluster_->statsScope(), "retry.", static_cast<Http::Code>(response_code)); upstream_host->stats().rq_error_.inc(); return; } else if (retry_status == RetryStatus::NoOverflow) { // 上游过载 callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamOverflow); } else if (retry_status == RetryStatus::NoRetryLimitExceeded) { // 达到最大重试次数 callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamRetryLimitExceeded); } // 由于end_stream为false时不会调用cleanup(),保证重试定时器被销毁 retry_state_.reset(); } // 处理重定向 if (static_cast<Http::Code>(response_code) == Http::Code::Found && route_entry_->internalRedirectAction() == InternalRedirectAction::Handle && setupRedirect(*headers)) { return; // If the redirect could not be handled, fail open and let it pass to the // next downstream. } // 处理响应头x-envoy-upstream-service-time if (DateUtil::timePointValid(downstream_request_complete_time_)) { Event::Dispatcher& dispatcher = callbacks_->dispatcher(); MonotonicTime response_received_time = dispatcher.timeSystem().monotonicTime(); std::chrono::milliseconds ms = std::chrono::duration_cast<std::chrono::milliseconds>( response_received_time - downstream_request_complete_time_); if (!config_.suppress_envoy_headers_) { headers->insertEnvoyUpstreamServiceTime().value(ms.count()); } } // 根据响应头来设置此上游主机是否金丝雀版本 upstream_request_->upstream_canary_ = (headers->EnvoyUpstreamCanary() && headers->EnvoyUpstreamCanary()->value() == "true") || upstream_request_->upstream_host_->canary(); chargeUpstreamCode(response_code, *headers, upstream_request_->upstream_host_, false); // 处理非500响应头,主要是进行一些指标的收集 if (!Http::CodeUtility::is5xx(response_code)) { handleNon5xxResponseHeaders(*headers, end_stream); } // downstream_set_cookies_为需要添加到上游响应头中的Cookies for (const auto& header_value : downstream_set_cookies_) { headers->addReferenceKey(Http::Headers::get().SetCookie, header_value); } // 对响应头进行一系列最后处理: // 添加一系列用户定义的响应头,按照顺序: route-action-level、route-level、virtual host level、route-action-level route_entry_->finalizeResponseHeaders(*headers, callbacks_->streamInfo()); downstream_response_started_ = true; if (end_stream) { onUpstreamComplete(); } // 开始向下游发送响应头,这个是要走过滤器链的 callbacks_->encodeHeaders(std::move(headers), end_stream); } |
可以看到,上游响应的解码,也是不走HTTP过滤器链的。
另外需要注意,不管是下游请求、上游响应,都会经由http_parser回调L7连接的on***Complete方法,不同之处是,对于下游请求来说L7连接的实现是ServerConnectionImpl,而对于上游响应来说L7连接的实现是ClientConnectionImpl。
上游响应头处理完毕后,响应体回调onMessageComplete很快执行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
void ClientConnectionImpl::onMessageComplete() { ENVOY_CONN_LOG(trace, "message complete", connection_); if (ignore_message_complete_for_100_continue_) { ignore_message_complete_for_100_continue_ = false; return; } if (!pending_responses_.empty()) { // 取出未决响应,注意这里是HTTP11,每个连接上同时只会有一个未决响应 PendingResponse response = pending_responses_.front(); pending_responses_.pop_front(); if (deferred_end_stream_headers_) { // 解码响应头 response.decoder_->decodeHeaders(std::move(deferred_end_stream_headers_), true); deferred_end_stream_headers_.reset(); } else { // 解码响应体 Buffer::OwnedImpl buffer; response.decoder_->decodeData(buffer, true); } } } |
response.decoder就是UpstreamRequest,其decodeData方法会调用Router过滤器的onUpstreamData,这类似于读取响应头时调用onUpstreamHeaders,类似的、可能被调用的其它回调包括onUpstreamTrailers、onUpstreamMetadata。
Router过滤器不负责真正的发送上游请求,这是由连接池异步进行的。它调用upstream_request_的encodeHeaders后,立即回调onRequestComplete,后者注册了定时器来处理请求超时:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
void Filter::onRequestComplete() { downstream_end_stream_ = true; // 获取事件分发器 Event::Dispatcher& dispatcher = callbacks_->dispatcher(); downstream_request_complete_time_ = dispatcher.timeSystem().monotonicTime(); // 有可能我们得到一个立即的RESET,因此这里判断上游请求是否为空 if (upstream_request_) { maybeDoShadowing(); // 如果配置了超时,则注册定时器,回调为onResponseTimeout if (timeout_.global_timeout_.count() > 0) { response_timeout_ = dispatcher.createTimer([this]() -> void { onResponseTimeout(); }); response_timeout_->enableTimer(timeout_.global_timeout_); } } } |
如果上游请求超时,下面的函数被调用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
void Filter::onResponseTimeout() { ENVOY_STREAM_LOG(debug, "upstream timeout", *callbacks_); cluster_->stats().upstream_rq_timeout_.inc(); // 可能在执行上游请求重试backoff期间发生超时,那时是没有上游请求的。这种情况下仿冒一个RESET if (upstream_request_) { if (upstream_request_->upstream_host_) { upstream_request_->upstream_host_->stats().rq_timeout_.inc(); } // 请求已经处理,不能取消,必须重置流 upstream_request_->resetStream(); } // 触发上游重置,重置的原因有Reset, GlobalTimeout, PerTryTimeout几种,这里是GlobalTimeout onUpstreamReset(UpstreamResetType::GlobalTimeout, absl::optional<Http::StreamResetReason>()); } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
void Filter::onUpstreamReset(UpstreamResetType type, const absl::optional<Http::StreamResetReason>& reset_reason) { // 全局性超时 ASSERT(type == UpstreamResetType::GlobalTimeout || upstream_request_); // 上游重置 if (type == UpstreamResetType::Reset) { ENVOY_STREAM_LOG(debug, "upstream reset", *callbacks_); } Upstream::HostDescriptionConstSharedPtr upstream_host; if (upstream_request_) { upstream_host = upstream_request_->upstream_host_; if (upstream_host) { // 为上游主机的断路检测器提供信息,如果是RESET,则记录503,否则记录504(网关超时) upstream_host->outlierDetector().putHttpResponseCode( enumToInt(type == UpstreamResetType::Reset ? Http::Code::ServiceUnavailable : timeout_response_code_)); } } // 全局超时时不会重试,已经开始响应处理后也不会重试 if (type != UpstreamResetType::GlobalTimeout && !downstream_response_started_ && retry_state_) { // 回调retry modifiers if (upstream_host != nullptr) { retry_state_->onHostAttempted(upstream_host); } // 判断是否需要重试 RetryStatus retry_status = retry_state_->shouldRetry(nullptr, reset_reason, [this]() -> void { doRetry(); }); if (retry_status == RetryStatus::Yes && setupRetry(true)) { // 需要重试 if (upstream_host) { upstream_host->stats().rq_error_.inc(); } return; // 不应该重试 } else if (retry_status == RetryStatus::NoOverflow) { callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamOverflow); } else if (retry_status == RetryStatus::NoRetryLimitExceeded) { callbacks_->streamInfo().setResponseFlag( StreamInfo::ResponseFlag::UpstreamRetryLimitExceeded); } } // 如果尚未向下游发送任何信息,则发送具有适当响应码的响应;否则仅仅是重置响应 if (downstream_response_started_) { if (upstream_request_ != nullptr && upstream_request_->grpc_rq_success_deferred_) { upstream_request_->upstream_host_->stats().rq_error_.inc(); } // 删除重试定时器 cleanup(); callbacks_->resetStream(); } else { cleanup(); Http::Code code; const char* body; if (type == UpstreamResetType::GlobalTimeout || type == UpstreamResetType::PerTryTimeout) { callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamRequestTimeout); code = timeout_response_code_; body = code == Http::Code::GatewayTimeout ? "upstream request timeout" : ""; } else { StreamInfo::ResponseFlag response_flags = streamResetReasonToResponseFlag(reset_reason.value()); callbacks_->streamInfo().setResponseFlag(response_flags); code = Http::Code::ServiceUnavailable; body = "upstream connect error or disconnect/reset before headers"; } const bool dropped = reset_reason && reset_reason.value() == Http::StreamResetReason::Overflow; chargeUpstreamCode(code, upstream_host, dropped); // 如果有非5xx响应,却仍然被后端重置,或者在响应开始前超时,作为一个错误看待 if (upstream_host != nullptr && !Http::CodeUtility::is5xx(enumToInt(code))) { upstream_host->stats().rq_error_.inc(); } // 发送本地响应 callbacks_->sendLocalReply(code, body, [dropped, this](Http::HeaderMap& headers) { if (dropped && !config_.suppress_envoy_headers_) { headers.insertEnvoyOverloaded().value( Http::Headers::get().EnvoyOverloadedValues.True); } }, absl::nullopt); } } |
在Envoy的HTTP解码过滤器处理下游请求的过程中,可能由于多种原因(通常是异常情况),立即应答下游客户端,而不向上游转发请求。此时会调用ActiveStreamDecoderFilter,或者直接调用ActiveStream的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
void ConnectionManagerImpl::ActiveStream::sendLocalReply( bool is_grpc_request, Code code, absl::string_view body, const std::function<void(HeaderMap& headers)>& modify_headers, bool is_head_request, const absl::optional<Grpc::Status::GrpcStatus> grpc_status) { // 断言当前流的响应头尚未设置 ASSERT(response_headers_ == nullptr); // 对于过早错误的处理,尽可能尝试创建出过滤器链,以便记录访问日志 if (!state_.created_filter_chain_) { createFilterChain(); } // 调用此工具函数 Utility::sendLocalReply(is_grpc_request, // 编码响应头的回调 [this, modify_headers](HeaderMapPtr&& headers, bool end_stream) -> void { if (modify_headers != nullptr) { // 转发sendLocalReply的入参 modify_headers(*headers); } // 移动响应头 response_headers_ = std::move(headers); // 编码响应头 encodeHeaders(nullptr, *response_headers_, end_stream); }, // 编码响应体的回调 [this](Buffer::Instance& data, bool end_stream) -> void { // 编码响应体 encodeData(nullptr, data, end_stream); }, // 被销毁?重置 响应码 响应体 gRPC状态码 提示是否header-only state_.destroyed_, code, body, grpc_status, is_head_request); } |
上面代码调用的工具函数实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
void Utility::sendLocalReply( bool is_grpc, std::function<void(HeaderMapPtr&& headers, bool end_stream)> encode_headers, std::function<void(Buffer::Instance& data, bool end_stream)> encode_data, const bool& is_reset, Code response_code, absl::string_view body_text, const absl::optional<Grpc::Status::GrpcStatus> grpc_status, bool is_head_request) { // encode_headers()调用可能重置流,但是在调用它之前,不能是已重置状态 ASSERT(!is_reset); // 如果请求是gRPC,则返回trailers-only的响应 if (is_grpc) { // 处理gRPC协议的响应头 HeaderMapPtr response_headers{new HeaderMapImpl{ {Headers::get().Status, std::to_string(enumToInt(Code::OK))}, {Headers::get().ContentType, Headers::get().ContentTypeValues.Grpc}, // gRPC状态码作为响应头 {Headers::get().GrpcStatus, std::to_string( enumToInt(grpc_status ? grpc_status.value() : Grpc::Utility::httpToGrpcStatus(enumToInt(response_code))))}}}; if (!body_text.empty() && !is_head_request) { // 如果提供了响应体,则编码为gRPC消息 response_headers->insertGrpcMessage().value(body_text); } encode_headers(std::move(response_headers), true); // 编码响应头 return; } // 处理非gRPC协议的响应头 HeaderMapPtr response_headers{ new HeaderMapImpl{{Headers::get().Status, std::to_string(enumToInt(response_code))}}}; if (!body_text.empty()) { response_headers->insertContentLength().value(body_text.size()); response_headers->insertContentType().value(Headers::get().ContentTypeValues.Text); } // 对于header-only响应,编码完头即返回 if (is_head_request) { encode_headers(std::move(response_headers), true); return; } // 否则,如果响应体不为空,则编码头后,再编码体 encode_headers(std::move(response_headers), body_text.empty()); // encode_headers()) 调用可能修改了is_reset,因此再次测试: if (!body_text.empty() && !is_reset) { // OwnedImpl封装一个分配的evbuffer,evbuffer用于libevent的缓冲网络I/O的缓冲区的处理 Buffer::OwnedImpl buffer(body_text); encode_data(buffer, true); } } |
响应头编码由HTTP连接管理器的ActiveStream::encodeHeaders方法完成:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
void ConnectionManagerImpl::ActiveStream::encodeHeaders(ActiveStreamEncoderFilter* filter, HeaderMap& headers, bool end_stream) { // 重置per-stream的空闲定时器,也就是重新计时 resetIdleTimer(); // 解除请求超时报警 disarmRequestTimeout(); // 设置 state_.local_complete_ = end_stream,并开始迭代过滤器链 std::list<ActiveStreamEncoderFilterPtr>::iterator entry = commonEncodePrefix(filter, end_stream); // 在何处开始encodeData()调用 std::list<ActiveStreamEncoderFilterPtr>::iterator continue_data_entry = encoder_filters_.end(); for (; entry != encoder_filters_.end(); entry++) { // 设置过滤器调用状态为正在编码响应头 ASSERT(!(state_.filter_call_state_ & FilterCallState::EncodeHeaders)); state_.filter_call_state_ |= FilterCallState::EncodeHeaders; // 设置过滤器的end_stream,如果header-only,或者传入end_stream==true // end_stream意味着后面没有响应体需要处理 (*entry)->end_stream_ = encoding_headers_only_ || (end_stream && continue_data_entry == encoder_filters_.end()); // 调用过滤器进行编码 FilterHeadersStatus status = (*entry)->handle_->encodeHeaders(headers, (*entry)->end_stream_); // 重置过滤器调用状态 state_.filter_call_state_ &= ~FilterCallState::EncodeHeaders; ENVOY_STREAM_LOG(trace, "encode headers called: filter={} status={}", *this, static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status)); // 根据上一个过滤器的处理结果决定是否需要继续迭代 const auto continue_iteration = (*entry)->commonHandleAfterHeadersCallback(status, encoding_heade刷出rs_only_); // 对于header-only应答,标记为local_complete_ // 这样可以保证不会在doEndStream()中重置下游请求 if (encoding_headers_only_) { state_.local_complete_ = true; } // 不继续迭代,也不会执行后面的基本响应头 if (!continue_iteration) { return; } // 这里处理使用header-only响应,但是某个过滤器添加了响应体的情况 // 不能传递end_stream = true给后续的过滤器 if (end_stream && buffered_response_data_ && continue_data_entry == encoder_filters_.end()) { continue_data_entry = entry; } } // 基本响应头 // 设置Date头 connection_manager_.config_.dateProvider().setDateHeader(headers); // 设置Server头 // 使用setReference()是安全的,因为serverName()在监听器的生命周期内不会改变 headers.insertServer().value().setReference(connection_manager_.config_.serverName()); // 如果是Upgrade请求,且没有响应体,则设置Content-Length头为0 // 否则,移除Connection头 // 移除Transfer=Encoding头 // 如果请求头中设置了Envoy强制跟踪头(x-envoy-force-trace),且存在request-id,则在响应头中设置request-id // 移除KeepAlive头 // 移除ProxyConnection头 // 根据需要添加内容到Via头 ConnectionManagerUtility::mutateResponseHeaders(headers, request_headers_.get(), connection_manager_.config_.via()); // 如果当前应当drain/close连接,在编码响应头块之前发送go away帧 if (connection_manager_.drain_state_ == DrainState::NotDraining && // drainClose如果连接应当被drain和close返回true // 如果监听器正处于draing状态(原因可能是健康检查、热重启)。此方法的返回值由监听器本地、全局DrainManager共同决定 // local_drain_manager_->drainClose() || parent_.server_.drainManager().drainClose() connection_manager_.drain_close_.drainClose()) { // 对于HTTP/1.1请求来说不做什么实质性的事情,仅仅让L4连接有额外的时间和后续请求竞争 // 此方法在HTTP/1.1和HTTP/2之间保持逻辑一致 connection_manager_.startDrainSequence(); connection_manager_.stats_.named_.downstream_cx_drain_close_.inc(); ENVOY_STREAM_LOG(debug, "drain closing connection", *this); } // 由于Connection: Close头,的原因,设置DrainState为Closing if (connection_manager_.drain_state_ == DrainState::NotDraining && state_.saw_connection_close_) { ENVOY_STREAM_LOG(debug, "closing connection due to connection close header", *this); connection_manager_.drain_state_ = DrainState::Closing; } // 由于过载,且配置了在过载后禁用Keepalive,设置DrainState为Closing if (connection_manager_.drain_state_ == DrainState::NotDraining && connection_manager_.overload_disable_keepalive_ref_ == Server::OverloadActionState::Active) { ENVOY_STREAM_LOG(debug, "disabling keepalive due to envoy overload", *this); connection_manager_.drain_state_ = DrainState::Closing; connection_manager_.stats_.named_.downstream_cx_overload_disable_keepalive_.inc(); } // 如果准备在对端尚未完成的情况下销毁流,同时连接不支持多路分发(非HTTP2),设置DrainState为Closing if (!state_.remote_complete_) { if (connection_manager_.codec_->protocol() != Protocol::Http2) { connection_manager_.drain_state_ = DrainState::Closing; } connection_manager_.stats_.named_.downstream_rq_response_before_rq_complete_.inc(); } // DrainState被置为Closing,且当前不是HTTP2 if (connection_manager_.drain_state_ == DrainState::Closing && connection_manager_.codec_->protocol() != Protocol::Http2) { // 如果不是Upgrade请求,则设置Connection:Close响应头 // 关于Connection: close,如果出现在: // 1、请求头,表示它希望服务器在发送应答消息后关闭连接 // 2、响应头,表示服务器会在发送应答消息后关闭连接,如果请求头是Connection: Keep-Alive则同时意味着服务器不支持连接重用 if (!Utility::isUpgrade(headers)) { headers.insertConnection().value().setReference(Headers::get().ConnectionValues.Close); } } // 分布式追踪相关处理 // 关于x-envoy-decorator-operation头: // 1、如果入站请求提供了此头,则应该覆盖在由追踪系统生成的server span中本地定义的operation(span)名 // 2、如果出站响应存在此头,则应该覆盖任何本地定义的client span的operation(span)名 if (connection_manager_.config_.tracingConfig()) { if (connection_manager_.config_.tracingConfig()->operation_name_ == Tracing::OperationName::Ingress) { // 对于ingress(inbound)响应 // 如果请求头没有指定x-envoy-decorator-operation,则使用decorator的operation name作为x-envoy-decorator-operation响应头 if (decorated_operation_) { headers.insertEnvoyDecoratorOperation().value(*decorated_operation_); } } else if (connection_manager_.config_.tracingConfig()->operation_name_ == Tracing::OperationName::Egress) { // 对于egress(outbound)响应 const HeaderEntry* resp_operation_override = headers.EnvoyDecoratorOperation(); // 如果已经提供x-envoy-decorator-operation,则覆盖当前Spance的operation值 if (resp_operation_override) { if (!resp_operation_override->value().empty() && active_span_) { active_span_->setOperation(resp_operation_override->value().c_str()); } // 移除x-envoy-decorator-operation头,防止传播给服务 headers.removeEnvoyDecoratorOperation(); } } } // 进行统计指标收集 chargeStats(headers); stream_info_.onFirstDownstreamTxByteSent(); // 现在实际完成基于codec的响应头编码,生成、刷出响应。如果end_stream则endEncode() response_encoder_->encodeHeaders( headers, encoding_headers_only_ || (end_stream && continue_data_entry == encoder_filters_.end())); if (continue_data_entry != encoder_filters_.end()) { // 调用之前中止迭代的过滤器的continueEncoding()方法,此方法不会重复调用encodeHeaders() // 仿冒的设置stopped_ since=true,原因是continueEncoding()要求如此 ASSERT(buffered_response_data_); (*continue_data_entry)->stopped_ = true; (*continue_data_entry)->continueEncoding(); } else { // 对于header-only响应 —— 不管是过滤器将其转换为header-only还是上游仅仅返回headers,结束响应编码的处理 maybeEndEncode(encoding_headers_only_ || end_stream); } } void ConnectionManagerImpl::ActiveStream::maybeEndEncode(bool end_stream) { if (end_stream) { // 应当接受响应编码的处理 // 增加日志信息 stream_info_.onLastDownstreamTxByteSent(); // 结束span request_response_timespan_->complete(); // 处理由于上游响应或者reset导致应当结束的流 connection_manager_.doEndStream(*this); } } |
每个过滤器的 encodeHeaders(Http::HeaderMap& headers, bool)方法会被调用,返回的Http::FilterHeadersStatus会影响响应头编码的后续处理流程。
如果响应体缓冲区不为空,则需要在编码响应头后,继续处理响应体。响应体缓冲区的内容可能是由上游服务提供,也可能是由某个过滤器写入和修改。
响应体编码由HTTP连接管理器的ActiveStream::encodeData方法完成:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
void ConnectionManagerImpl::ActiveStream::encodeData(ActiveStreamEncoderFilter* filter, Buffer::Instance& data, bool end_stream) { // 和编码响应头时一样,重置空闲定时器 resetIdleTimer(); // 如果先前已经设置此状态,则直接返回 if (encoding_headers_only_) { return; } // 产生编码过滤器的列表 std::list<ActiveStreamEncoderFilterPtr>::iterator entry = commonEncodePrefix(filter, end_stream); // 在何处开始encodeTrailers调用 auto trailers_added_entry = encoder_filters_.end(); // 是否在响应体编码之前,响应尾已经存在了 const bool trailers_exists_at_start = response_trailers_ != nullptr; for (; entry != encoder_filters_.end(); entry++) { // 如果任何一个过滤器的end_stream_被标记,则意味着这个以及后续的过滤器不应该处理数据 if ((*entry)->end_stream_) { return; } ASSERT(!(state_.filter_call_state_ & FilterCallState::EncodeData)); // 设置过滤器调用状态 state_.filter_call_state_ |= FilterCallState::EncodeData; if (end_stream) { // 最后一个数据帧 state_.filter_call_state_ |= FilterCallState::LastDataFrame; } // 检查response_trailers_,应对前面的过滤器的encodeData()方法调用addEncodedTrailers()的情况 // 如果前面的过滤器添加了响应尾,则通知当前、后续过滤器,流处理尚不能结束 (*entry)->end_stream_ = end_stream && !response_trailers_; // 调用过滤器进行响应体编码 FilterDataStatus status = (*entry)->handle_->encodeData(data, (*entry)->end_stream_); // 重置过滤器调用状态 state_.filter_call_state_ &= ~FilterCallState::EncodeData; if (end_stream) { state_.filter_call_state_ &= ~FilterCallState::LastDataFrame; } ENVOY_STREAM_LOG(trace, "encode data called: filter={} status={}", *this, static_cast<const void*>((*entry).get()), static_cast<uint64_t>(status)); // 迭代前没有没有响应尾,但是 现在有响应尾(某过滤器添加) if (!trailers_exists_at_start && response_trailers_ && trailers_added_entry == encoder_filters_.end()) { // 这设置为当前过滤器 trailers_added_entry = entry; } // 消息体回调通用处理逻辑 if (!(*entry)->commonHandleAfterDataCallback(status, data, state_.encoder_filters_streaming_)) { return; } } ENVOY_STREAM_LOG(trace, "encoding data via codec (size={} end_stream={})", *this, data.length(), end_stream); // 日志信息 stream_info_.addBytesSent(data.length()); // 如果在encodeData期间添加了响应尾,则需要触发decodeTrailers,让过滤器有机会处理这些尾数据 if (trailers_added_entry != encoder_filters_.end()) { response_encoder_->encodeData(data, false); encodeTrailers(trailers_added_entry->get(), *response_trailers_); } else { // 调用StreamEncoder进行实际的响应体写入,并刷出 response_encoder_->encodeData(data, end_stream); maybeEndEncode(end_stream); } } |
每个过滤器的 encodeData(Buffer::Instance&, bool)方法会被调用,返回的Http::FilterHeadersStatus会影响响应体编码的后续处理流程。
Istio使用的不是原版的Envoy,它在项目istio/proxy中对Envoy进行了扩展,并在构建时引用Envoy的某个特定Commit Id,构建出完整的、增强的Envoy二进制文件。
Istio对Envoy做的增强主要是引入若干自定义过滤器,Mixer的客户端功能就是依赖于过滤器实现的。
如果要创建完全本地的调试环境,你需要签出两个项目并构建:
1 2 3 4 5 |
# istio git clone https://github.com/istio/istio.git # istio/proxy git clone https://github.com/istio/proxy.git |
通过上面的项目,启动Pilot Discovery、Pilot Agent、Mixer三个程序。Mixer的启动方式前文已经有说明,Pilot Discovery、Agent的启动方式可以参考Istio Pilot与Envoy的交互机制解读一文,需要注意的是,必须把binaryPath参数设置为istio/proxy项目构建出的envoy的路径。
1 |
pilot proxy sidecar ... --binaryPath=/home/alex/CPP/projects/clion/istio/proxy/bazel-bin/src/envoy/envoy |
从istio/proxy构建出envoy时,注意保留调试符号。
在Pilot Agent启动后,它会产生一个Envoy子进程。你可以用GDB连接到该进程,并在GDB控制台设置源码目录:
1 2 3 |
directory /home/alex/CPP/lib/libevent/2.1.8-stable directory /home/alex/CPP/projects/clion/istio/proxy directory /home/alex/CPP/projects/clion/istio/proxy/bazel-proxy |
然后暂停程序执行,确保源码路径已经匹配上。
执行下面的命令,获取本地运行的Envoy代理的配置:
1 |
curl http://127.0.0.1:15000/config_dump |
可以看到,监听器virtual的端口是15001。假设我们想访问podinfo-canary.default.svc.k8s.gmem.cc在9898端口提供的服务,来了解Envoy代理的行为,可以先设置Iptables规则:
1 2 |
# 针对lo接口的请求不走PREROUTING链 iptables -t nat -A OUTPUT -p tcp -o lo --dport 9898 -j REDIRECT --to-port 15001 |
然后,发起请求:
1 |
curl -H 'Host: podinfo-canary.default.svc.k8s.gmem.cc' http://127.0.0.1:9898/healthz |
此请求会触发Envoy的处理流程,包括对Mixer的L4、L7过滤器的调用。
Mixer过滤器处理HTTP请求头的逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
FilterHeadersStatus Filter::decodeHeaders(HeaderMap& headers, bool) { ENVOY_LOG(debug, "Called Mixer::Filter : {}", __func__); request_total_size_ += headers.byteSize(); // 配置,包含目的服务信息 ::istio::control::http::Controller::PerRouteConfig config; auto route = decoder_callbacks_->route(); if (route) { ReadPerRouteConfig(route->routeEntry(), &config); } // control是每个线程对应一个的控制对象 // controller是Mixer控制器,以MixerFitlerConfig为参数,执行任务来保证对HTTP/TCP请求的控制 // RequestHandler handler_,请求处理器,支持对Mixer服务器发起CHECK/REPORT调用 handler_ = control_.controller()->CreateRequestHandler(config); state_ = Calling; initiating_call_ = true; // CheckData用于抽取额外的HTTP数据,供Mixer Check使用 —— 它持有HeaderMap、Envoy Metadata、网络连接等信息 CheckData check_data(headers, decoder_callbacks_->streamInfo().dynamicMetadata(), decoder_callbacks_->connection()); // HeaderUpdate用Istio属性来更新HTTP请求头 Utils::HeaderUpdate header_update(&headers); headers_ = &headers; // Check调用相关逻辑: // 1、从客户端代理中抽取转发的属性 // 2、从请求中抽取属性 // 3、从配置中抽取属性 // 4、如果有必要,将一部分属性转发给下游 // 5、执行Check调用 cancel_check_ = handler_->Check( &check_data, &header_update, // TransportCheckFunc 用于异步发起Check调用 control_.GetCheckTransport(decoder_callbacks_->activeSpan()), // CheckDoneFunc 用于异步调用完成后处理CheckResponse [this](const CheckResponseInfo& info) { completeCheck(info); }); initiating_call_ = false; if (state_ == Complete) { return FilterHeadersStatus::Continue; } ENVOY_LOG(debug, "Called Mixer::Filter : {} Stop", __func__); return FilterHeadersStatus::StopIteration; } |
从上面的代码我们可以看到,Mixer过滤器在处理下游请求头期间,会异步的发起Check调用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
CancelFunc RequestHandlerImpl::Check(CheckData* check_data, HeaderUpdate* header_update, TransportCheckFunc transport, CheckDoneFunc on_done) { // 添加转发的属性 AddForwardAttributes(check_data); // 移除Istio属性 x-istio-attributes header_update->RemoveIstioAttributes(); // 注入一个包含静态转发属性的头 service_context_->InjectForwardedAttributes(header_update); if (!service_context_->enable_mixer_check()) { // 如果没有启动Check功能,直接以OK响应回调CheckDoneFunc CheckResponseInfo check_response_info; check_response_info.response_status = Status::OK; on_done(check_response_info); return nullptr; } // 添加Check相关属性 AddCheckAttributes(check_data); // 根据Quota配置添加quota需求 service_context_->AddQuotas(&request_context_); // 异步发送Check调用 return service_context_->client_context()->SendCheck(transport, on_done, &request_context_); } |
此异步调用完成后,回调:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
void Filter::completeCheck(const CheckResponseInfo& info) { auto status = info.response_status; ENVOY_LOG(debug, "Called Mixer::Filter : check complete {}", status.ToString()); // 流已经被重置,停止回调 if (state_ == Responded) { return; } route_directive_ = info.route_directive; Utils::CheckResponseInfoToStreamInfo(info, decoder_callbacks_->streamInfo()); // 处理来自路由指令的直接响应 if (route_directive_.direct_response_code() != 0) { int status_code = route_directive_.direct_response_code(); ENVOY_LOG(debug, "Mixer::Filter direct response {}", status_code); state_ = Responded; decoder_callbacks_->sendLocalReply( Code(status_code), route_directive_.direct_response_body(), [this](HeaderMap& headers) { UpdateHeaders(headers, route_directive_.response_header_operations()); }, absl::nullopt); return; } // 如果状态不是OK,即使没有直接响应,也sendLocalReply if (!status.ok()) { state_ = Responded; int status_code = ::istio::utils::StatusHttpCode(status.error_code()); decoder_callbacks_->sendLocalReply(Code(status_code), status.ToString(), nullptr, absl::nullopt); return; } // 将状态置为完成 state_ = Complete; // 更新请求头 if (nullptr != headers_) { UpdateHeaders(*headers_, route_directive_.request_header_operations()); headers_ = nullptr; if (route_directive_.request_header_operations().size() > 0) { decoder_callbacks_->clearRouteCache(); } } if (!initiating_call_) { decoder_callbacks_->continueDecoding(); } } |
Report调用是延迟触发的,Mixer过滤器实现了Envoy::AccessLog::Instance(访问记录器),Report调用作为log方法逻辑的一部分。
Envoy在处理请求之后,可能会延迟的删除一些对象:
1 2 3 |
DispatcherImpl::DispatcherImpl(Buffer::WatermarkFactoryPtr&& factory, Api::Api& api) : deferred_delete_timer_(createTimer([this]() -> void { clearDeferredDeleteList(); })), // 延迟删除定时器 |
代表当前请求流的ActiveStream对象就是这样延迟删除的,删除时其析构函数被调用:
1 2 3 4 5 6 7 8 9 |
ConnectionManagerImpl::ActiveStream::~ActiveStream() { // ... // 遍历所有日志访问记录器 for (const AccessLog::InstanceSharedPtr& access_log : connection_manager_.config_.accessLogs()) { access_log->log(request_headers_.get(), response_headers_.get(), response_trailers_.get(), stream_info_); } // ... } |
可以看到,在ActiveStream析构时会调用所有访问日志记录器,包括Envoy::Http::Mixer::Filter::log:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
void Filter::log(const HeaderMap* request_headers, const HeaderMap* response_headers, const HeaderMap* response_trailers, const StreamInfo::StreamInfo& stream_info) { ENVOY_LOG(debug, "Called Mixer::Filter : {}", __func__); if (!handler_) { if (request_headers == nullptr) { return; } // 可能因为请求被其它过滤器拒绝,Mixer过滤器没调用,因此handler尚未初始化 ::istio::control::http::Controller::PerRouteConfig config; ReadPerRouteConfig(stream_info.routeEntry(), &config); handler_ = control_.controller()->CreateRequestHandler(config); } // 如果没有调用check,则check属性没被抽取 CheckData check_data(*request_headers, stream_info.dynamicMetadata(), decoder_callbacks_->connection()); // ReportData提供接口,抽取HTTP属性,供Mixer Report调用使用 ReportData report_data(response_headers, response_trailers, stream_info, request_total_size_); handler_->Report(&check_data, &report_data); } |
Report方法的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
void RequestHandlerImpl::Report(CheckData* check_data, ReportData* report_data) { if (!service_context_->enable_mixer_report()) { return; } // 添加转发的属性 AddForwardAttributes(check_data); // 添加Check属性 AddCheckAttributes(check_data); AttributesBuilder builder(&request_context_); // 抽取Report属性 builder.ExtractReportAttributes(report_data); // 发送Report请求 service_context_->client_context()->SendReport(request_context_); } |
发送Report请求的工作最终委托给::istio::mixerclient::MixerClient:
1 2 3 |
void ClientContextBase::SendReport(const RequestContext& request) { mixer_client_->Report(*request.attributes); } |
MixerClient包含批量处理的逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
void MixerClientImpl::Report(const Attributes &attributes) { report_batch_->Report(attributes); } void ReportBatch::Report(const Attributes& request) { std::lock_guard<std::mutex> lock(mutex_); ++total_report_calls_; // 添加请求、压缩 batch_compressor_->Add(request); // 如果超过批量限制,立即Report if (batch_compressor_->size() >= options_.max_batch_entries) { FlushWithLock(); } else { // 否则,延迟发送 if (batch_compressor_->size() == 1 && timer_create_) { if (!timer_) { timer_ = timer_create_([this]() { Flush(); }); } timer_->Start(options_.max_batch_time_ms); } } } |
MixerClient通过rRPC向Mixer服务器发送的是属性(Attributes),过滤器在调用MixerClient之前,会进行属性的抽取。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
void AttributesBuilder::ExtractReportAttributes(ReportData *report_data) { utils::AttributesBuilder builder(request_->attributes); std::string dest_ip; int dest_port; // 抽取 destination.ip if (report_data->GetDestinationIpPort(&dest_ip, &dest_port)) { if (!builder.HasAttribute(utils::AttributeName::kDestinationIp)) { builder.AddBytes(utils::AttributeName::kDestinationIp, dest_ip); } if (!builder.HasAttribute(utils::AttributeName::kDestinationPort)) { builder.AddInt64(utils::AttributeName::kDestinationPort, dest_port); } } std::string uid; // 抽取 destination.uid if (report_data->GetDestinationUID(&uid)) { builder.AddString(utils::AttributeName::kDestinationUID, uid); } // 抽取 response.headers 所有响应头作为一个属性 std::map<std::string, std::string> headers = report_data->GetResponseHeaders(); builder.AddStringMap(utils::AttributeName::kResponseHeaders, headers); // 抽取 response.time builder.AddTimestamp(utils::AttributeName::kResponseTime, std::chrono::system_clock::now()); ReportData::ReportInfo info; report_data->GetReportInfo(&info); // 抽取 request.size builder.AddInt64(utils::AttributeName::kRequestBodySize, info.request_body_size); // 抽取 response.size builder.AddInt64(utils::AttributeName::kResponseBodySize, info.response_body_size); // 抽取 request.total_size builder.AddInt64(utils::AttributeName::kRequestTotalSize, info.request_total_size); // 抽取 response.total_size builder.AddInt64(utils::AttributeName::kResponseTotalSize, info.response_total_size); // 抽取 response.duration builder.AddDuration(utils::AttributeName::kResponseDuration, info.duration); // 抽取check属性 if (!request_->check_status.ok()) { // 抽取 response.code builder.AddInt64(utils::AttributeName::kResponseCode, utils::StatusHttpCode(request_->check_status.error_code())); // 抽取 check.error_code builder.AddInt64(utils::AttributeName::kCheckErrorCode, request_->check_status.error_code()); // 抽取 check.error_message builder.AddString(utils::AttributeName::kCheckErrorMessage, request_->check_status.ToString()); } else { builder.AddInt64(utils::AttributeName::kResponseCode, info.response_code); } ReportData::GrpcStatus grpc_status; if (report_data->GetGrpcStatus(&grpc_status)) { // 抽取 response.grpc_status builder.AddString(utils::AttributeName::kResponseGrpcStatus, grpc_status.status); // 抽取 response.grpc_message builder.AddString(utils::AttributeName::kResponseGrpcMessage, grpc_status.message); } builder.AddString(utils::AttributeName::kContextProxyErrorCode, info.response_flags); ReportData::RbacReportInfo rbac_info; if (report_data->GetRbacReportInfo(&rbac_info)) { if (!rbac_info.permissive_resp_code.empty()) { // 抽取 context.proxy_error_code builder.AddString(utils::AttributeName::kRbacPermissiveResponseCode, rbac_info.permissive_resp_code); } if (!rbac_info.permissive_policy_id.empty()) { // 抽取 rbac.permissive.effective_policy_id" builder.AddString(utils::AttributeName::kRbacPermissivePolicyId, rbac_info.permissive_policy_id); } } builder.FlattenMapOfStringToStruct(report_data->GetDynamicFilterState()); } |
Leave a Reply