Writing a Kubernetes-Style API Server
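A while ago I was asked to build a tool that runs inside K8S. The requirement was a natural fit for the controller pattern, so I started developing it with kubebuilder. While talking to the team that provides the K8S environment, however, I learned that workloads are not allowed to call control-plane APIs. So what now?
The quickest workaround is to run our own kube-apiserver + etcd. That is too heavy for us, though: kube-apiserver carries many features we do not need, and they consume too many resources. So I went looking for a lighter option.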
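kubernetes/apiserver is synced from the staging/src/k8s.io/apiserver directory of the main kubernetes source tree. It provides the libraries needed to build a K8S-style API server. Many projects depend on it, including kube-apiserver, kube-aggregator, and service-catalog.
The apiserver library is mainly intended for building the Extension API Server used in API Aggregation. Its features include:
- Delegating authn/authz to the main kube-apiserver
- kubectl-compatible API discovery
- An admission control chain
- Versioned API types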
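K8S provides an example, kubernetes/sample-apiserver, but it depends on the main kube-apiserver, even when you use neither authn/authz nor API aggregation. You have to point it at a main kube-apiserver with --kubeconfig, because the SharedInformer in the example connects to the main kube-apiserver to access K8S resources.
We obviously cannot depend on the main kube-apiserver in any way, so let's walk through the sample-apiserver code and see what needs to change.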
```go
func main() {
	logs.InitLogs()
	defer logs.FlushLogs()

	stopCh := genericapiserver.SetupSignalHandler()
	// Initialize the server options
	options := server.NewWardleServerOptions(os.Stdout, os.Stderr)
	// Build the start command and run the server
	cmd := server.NewCommandStartWardleServer(options, stopCh)
	cmd.Flags().AddGoFlagSet(flag.CommandLine)
	if err := cmd.Execute(); err != nil {
		klog.Fatal(err)
	}
}
```
```go
type WardleServerOptions struct {
	RecommendedOptions *genericoptions.RecommendedOptions

	SharedInformerFactory informers.SharedInformerFactory
	StdOut                io.Writer
	StdErr                io.Writer
}

func NewWardleServerOptions(out, errOut io.Writer) *WardleServerOptions {
	o := &WardleServerOptions{
		RecommendedOptions: genericoptions.NewRecommendedOptions(
			// By default, data is stored under /registry/wardle.example.com in etcd
			defaultEtcdPathPrefix,
			// Use the legacy codec for wardle.example.com/v1alpha1
			apiserver.Codecs.LegacyCodec(v1alpha1.SchemeGroupVersion),
			// Process information of the API server
			genericoptions.NewProcessInfo("wardle-apiserver", "wardle"),
		),

		StdOut: out,
		StdErr: errOut,
	}
	// Objects of the wardle.example.com group are encoded as v1alpha1 before being stored in etcd
	o.RecommendedOptions.Etcd.StorageConfig.EncodeVersioner = runtime.NewMultiGroupVersioner(v1alpha1.SchemeGroupVersion, schema.GroupKind{Group: v1alpha1.GroupName})
	return o
}
```
As you can see, the core of the options is genericoptions.RecommendedOptions. As the name suggests, it provides the "recommended" options for running an apiserver:
```go
type RecommendedOptions struct {
	// etcd-related configuration
	Etcd *EtcdOptions
	// HTTPS options such as the listen address and certificates.
	// Also creates and sets up the rest.Config dedicated to loopback access
	SecureServing *SecureServingOptionsWithLoopback
	// authn options
	Authentication *DelegatingAuthenticationOptions
	// authz options
	Authorization *DelegatingAuthorizationOptions
	// Audit options
	Audit *AuditOptions
	// Enables profiling and contention profiling
	Features *FeatureOptions
	// Core API options: the location of the kubeconfig for the main kube-apiserver
	CoreAPI *CoreAPIOptions

	// Feature gates
	FeatureGate featuregate.FeatureGate
	// Called after ApplyTo of all the options above has run. The returned
	// PluginInitializers are passed on to Admission.ApplyTo
	ExtraAdmissionInitializers func(c *server.RecommendedConfig) ([]admission.PluginInitializer, error)
	Admission                  *AdmissionOptions
	// Provides information about the server process
	ProcessInfo *ProcessInfo
	// Webhook options
	Webhook *WebhookOptions
	// Controls the server's outbound traffic
	EgressSelector *EgressSelectorOptions
}
```
The recommended option values are supplied by genericoptions.NewRecommendedOptions(). RecommendedOptions can also read option values from command-line flags:
```go
func (o *RecommendedOptions) AddFlags(fs *pflag.FlagSet) {}
```
The server is implemented as a cobra.Command. First, RecommendedOptions is bound to the command-line flags.
```go
func NewCommandStartWardleServer(defaults *WardleServerOptions, stopCh <-chan struct{}) *cobra.Command {
	o := *defaults
	cmd := &cobra.Command{
		Short: "Launch a wardle API server",
		Long:  "Launch a wardle API server",
		RunE: func(c *cobra.Command, args []string) error {
			// ...
		},
	}

	flags := cmd.Flags()
	// Expose the options as command-line flags
	o.RecommendedOptions.AddFlags(flags)
	utilfeature.DefaultMutableFeatureGate.AddFlag(flags)

	return cmd
}
```
Then cmd.Execute() is called, which in turn invokes the RunE function above:
```go
if err := o.Complete(); err != nil {
	return err
}
if err := o.Validate(args); err != nil {
	return err
}
if err := o.RunWardleServer(stopCh); err != nil {
	return err
}
return nil
```
The Complete method simply registers an Admission controller; see below.
The Validate method asks RecommendedOptions to validate the options, which by now include the flags supplied by the user:
```go
func (o WardleServerOptions) Validate(args []string) error {
	errors := []error{}
	// Each validation returns a slice of errors
	errors = append(errors, o.RecommendedOptions.Validate()...)
	// Merge them into a single error
	return utilerrors.NewAggregate(errors)
}
```
The RunWardleServer method starts the API server. It covers the whole flow: converting the server options (Options) into a server configuration (Config), instantiating the APIServer from that configuration, and running it:
```go
func (o WardleServerOptions) RunWardleServer(stopCh <-chan struct{}) error {
	// Convert options into a config
	config, err := o.Config()
	if err != nil {
		return err
	}

	// Turn the config into a CompletedConfig and instantiate the APIServer
	server, err := config.Complete().New()
	if err != nil {
		return err
	}

	// Register a hook that runs after the API server has started
	server.GenericAPIServer.AddPostStartHookOrDie("start-sample-server-informers", func(context genericapiserver.PostStartHookContext) error {
		config.GenericConfig.SharedInformerFactory.Start(context.StopCh)
		o.SharedInformerFactory.Start(context.StopCh)
		return nil
	})

	// Prepare to run, then run the APIServer
	return server.GenericAPIServer.PrepareRun().Run(stopCh)
}
```
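The Options → Config → CompletedConfig → Server chain is easier to see in miniature. The sketch below uses only hypothetical types (`options`, `config`, `completedConfig`, `server`) that mimic the shape of the flow; none of them come from the apiserver library.

```go
package main

import (
	"errors"
	"fmt"
)

// options holds raw, user-facing settings (think: command-line flags).
type options struct{ bindPort int }

// config holds what the server actually consumes.
type config struct {
	bindPort int
	version  string
}

// completedConfig wraps a config whose missing fields have been filled in.
// Only this type has a constructor method for the server.
type completedConfig struct{ *config }

type server struct{ addr, version string }

func (o options) validate() error {
	if o.bindPort <= 0 || o.bindPort > 65535 {
		return errors.New("bind port out of range")
	}
	return nil
}

func (o options) toConfig() *config { return &config{bindPort: o.bindPort} }

// complete fills in values that can be derived from what is already set.
func (c *config) complete() completedConfig {
	if c.version == "" {
		c.version = "1.0"
	}
	return completedConfig{c}
}

func (c completedConfig) newServer() *server {
	return &server{addr: fmt.Sprintf(":%d", c.bindPort), version: c.version}
}

// build strings the steps together in the same order as RunWardleServer.
func build(o options) (*server, error) {
	if err := o.validate(); err != nil {
		return nil, err
	}
	return o.toConfig().complete().newServer(), nil
}

func main() {
	s, err := build(options{bindPort: 8443})
	fmt.Println(s.addr, s.version, err)
	_, err = build(options{})
	fmt.Println(err)
}
```

Putting the constructor on the completed type means a server cannot be built from a half-initialized config by accident, which appears to be the motivation for the separate Complete step.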
The API server cannot consume the options directly; they have to be converted into an apiserver.Config first:
```go
func (o *WardleServerOptions) Config() (*apiserver.Config, error) {
	// The options are adjusted a little further here
	// Check whether the certificates can be read; if not, try to generate self-signed ones
	if err := o.RecommendedOptions.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", nil, []net.IP{net.ParseIP("127.0.0.1")}); err != nil {
		return nil, fmt.Errorf("error creating self-signed certificates: %v", err)
	}
	// Decide whether to support paging based on the feature gate
	o.RecommendedOptions.Etcd.StorageConfig.Paging = utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
	// Register the extra Admission initializers (shown later)
	o.RecommendedOptions.ExtraAdmissionInitializers = func(c *genericapiserver.RecommendedConfig) ([]admission.PluginInitializer, error) {
		// ...
	}

	// Create the recommended config
	serverConfig := genericapiserver.NewRecommendedConfig(apiserver.Codecs)

	// Expose the OpenAPI endpoint
	serverConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(
		// Generated code
		sampleopenapi.GetOpenAPIDefinitions, openapi.NewDefinitionNamer(apiserver.Scheme))
	serverConfig.OpenAPIConfig.Info.Title = "Wardle"
	serverConfig.OpenAPIConfig.Info.Version = "0.1"

	// Apply RecommendedOptions to RecommendedConfig
	if err := o.RecommendedOptions.ApplyTo(serverConfig); err != nil {
		return nil, err
	}

	// The config consists of the GenericConfig plus your own extra configuration
	config := &apiserver.Config{
		GenericConfig: serverConfig,
		ExtraConfig:   apiserver.ExtraConfig{},
	}
	return config, nil
}
```
The code above shows that RecommendedConfig is the core of the configuration:
```go
type RecommendedConfig struct {
	// The structure used to configure a GenericAPIServer
	Config

	// SharedInformerFactory provides shared informers for K8S resources.
	// It is set by RecommendedOptions.CoreAPI.ApplyTo; by default the informers use the in-cluster ClientConfig
	SharedInformerFactory informers.SharedInformerFactory

	// Set by RecommendedOptions.CoreAPI.ApplyTo and used by the informers
	ClientConfig *restclient.Config
}

type Config struct {
	SecureServing  *SecureServingInfo
	Authentication AuthenticationInfo
	Authorization  AuthorizationInfo
	// Privileged ClientConfig for local (loopback) use; PostStartHooks rely on it
	LoopbackClientConfig *restclient.Config
	// ...
}
```
Calling ApplyTo transfers the settings in RecommendedOptions to RecommendedConfig:
```go
func (o *RecommendedOptions) ApplyTo(config *server.RecommendedConfig) error {
	// Calls config.AddHealthChecks to add the etcd health check
	// Sets config.RESTOptionsGetter
	if err := o.Etcd.ApplyTo(&config.Config); err != nil {
		return err
	}
	// Creates config.Listener
	// Initializes config.Cert, config.CipherSuites, config.SNICerts and so on
	if err := o.SecureServing.ApplyTo(&config.Config.SecureServing, &config.Config.LoopbackClientConfig); err != nil {
		return err
	}
	// Initializes the authentication config and obtains the K8S client it needs
	if err := o.Authentication.ApplyTo(&config.Config.Authentication, config.SecureServing, config.OpenAPIConfig); err != nil {
		return err
	}
	// Initializes the authorization config and obtains the K8S client it needs
	if err := o.Authorization.ApplyTo(&config.Config.Authorization); err != nil {
		return err
	}
	if err := o.Audit.ApplyTo(&config.Config, config.ClientConfig, config.SharedInformerFactory, o.ProcessInfo, o.Webhook); err != nil {
		return err
	}
	if err := o.Features.ApplyTo(&config.Config); err != nil {
		return err
	}
	// Loads the kubeconfig from a file or falls back to the in-cluster config,
	// then provides config.ClientConfig and config.SharedInformerFactory
	if err := o.CoreAPI.ApplyTo(config); err != nil {
		return err
	}
	// Call the extra Admission initializers
	if initializers, err := o.ExtraAdmissionInitializers(config); err != nil {
		return err
		// Initialize the Admission controllers one by one
	} else if err := o.Admission.ApplyTo(&config.Config, config.SharedInformerFactory, config.ClientConfig, o.FeatureGate, initializers...); err != nil {
		return err
	}
	if err := o.EgressSelector.ApplyTo(&config.Config); err != nil {
		return err
	}
	if feature.DefaultFeatureGate.Enabled(features.APIPriorityAndFairness) {
		config.FlowControl = utilflowcontrol.New(
			config.SharedInformerFactory,
			kubernetes.NewForConfigOrDie(config.ClientConfig).FlowcontrolV1alpha1(),
			config.MaxRequestsInFlight+config.MaxMutatingRequestsInFlight,
			config.RequestTimeout/4,
		)
	}
	return nil
}
```
So the step that builds the server configuration depends heavily on the main kube-apiserver: authentication, authorization, CoreAPI, Admission, and flow control all obtain a client or informer that talks to it. These dependencies are why sample-apiserver cannot run on its own outside K8S.
RecommendedConfig still has to be turned into a CompletedConfig through the Complete method:
```go
// server, err := config.Complete().New()
func (cfg *Config) Complete() CompletedConfig {
	c := completedConfig{
		cfg.GenericConfig.Complete(),
		&cfg.ExtraConfig,
	}

	c.GenericConfig.Version = &version.Info{
		Major: "1",
		Minor: "0",
	}

	return CompletedConfig{&c}
}

// Complete fills in missing, required configuration that can be derived from what is already set
func (c *RecommendedConfig) Complete() CompletedConfig {
	return c.Config.Complete(c.SharedInformerFactory)
}
```
The WardleServer is instantiated from the CompletedConfig (the rest of this function is shown further below):
```go
func (c completedConfig) New() (*WardleServer, error) {
	// Create the GenericAPIServer
	// The name is used to tell servers apart in log output
	// The DelegationTarget is used to compose API servers
	genericServer, err := c.GenericConfig.New("sample-apiserver", genericapiserver.NewEmptyDelegate())
	if err != nil {
		return nil, err
	}

	s := &WardleServer{
		GenericAPIServer: genericServer,
	}
```
The GenericConfig.New() call above creates the core GenericAPIServer:
```go
func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
	// Sanity checks
	if c.Serializer == nil {
		return nil, fmt.Errorf("Genericapiserver.New() called with config.Serializer == nil")
	}
	if c.LoopbackClientConfig == nil {
		return nil, fmt.Errorf("Genericapiserver.New() called with config.LoopbackClientConfig == nil")
	}
	if c.EquivalentResourceRegistry == nil {
		return nil, fmt.Errorf("Genericapiserver.New() called with config.EquivalentResourceRegistry == nil")
	}

	handlerChainBuilder := func(handler http.Handler) http.Handler {
		return c.BuildHandlerChainFunc(handler, c.Config)
	}
	// Build the request handler
	apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())

	// Create the GenericAPIServer; many fields come straight from completedConfig
	s := &GenericAPIServer{
		discoveryAddresses:         c.DiscoveryAddresses,
		LoopbackClientConfig:       c.LoopbackClientConfig,
		legacyAPIGroupPrefixes:     c.LegacyAPIGroupPrefixes,
		admissionControl:           c.AdmissionControl,
		Serializer:                 c.Serializer,
		AuditBackend:               c.AuditBackend,
		Authorizer:                 c.Authorization.Authorizer,
		delegationTarget:           delegationTarget,
		EquivalentResourceRegistry: c.EquivalentResourceRegistry,
		HandlerChainWaitGroup:      c.HandlerChainWaitGroup,

		minRequestTimeout:     time.Duration(c.MinRequestTimeout) * time.Second,
		ShutdownTimeout:       c.RequestTimeout,
		ShutdownDelayDuration: c.ShutdownDelayDuration,
		SecureServingInfo:     c.SecureServing,
		ExternalAddress:       c.ExternalAddress,

		Handler: apiServerHandler,

		listedPathProvider: apiServerHandler,

		openAPIConfig:           c.OpenAPIConfig,
		skipOpenAPIInstallation: c.SkipOpenAPIInstallation,

		postStartHooks:         map[string]postStartHookEntry{},
		preShutdownHooks:       map[string]preShutdownHookEntry{},
		disabledPostStartHooks: c.DisabledPostStartHooks,

		healthzChecks:    c.HealthzChecks,
		livezChecks:      c.LivezChecks,
		readyzChecks:     c.ReadyzChecks,
		readinessStopCh:  make(chan struct{}),
		livezGracePeriod: c.LivezGracePeriod,

		DiscoveryGroupManager: discovery.NewRootAPIsHandler(c.DiscoveryAddresses, c.Serializer),

		maxRequestBodyBytes: c.MaxRequestBodyBytes,
		livezClock:          clock.RealClock{},
	}

	// ...

	// Add the lifecycle hooks of the delegationTarget
	for k, v := range delegationTarget.PostStartHooks() {
		s.postStartHooks[k] = v
	}

	for k, v := range delegationTarget.PreShutdownHooks() {
		s.preShutdownHooks[k] = v
	}

	// Add the preconfigured hooks
	for name, preconfiguredPostStartHook := range c.PostStartHooks {
		if err := s.AddPostStartHook(name, preconfiguredPostStartHook.hook); err != nil {
			return nil, err
		}
	}

	// If the config contains a SharedInformerFactory and no hook has been registered to start it,
	// register a PostStart hook that does
	genericApiServerHookName := "generic-apiserver-start-informers"
	if c.SharedInformerFactory != nil {
		if !s.isPostStartHookRegistered(genericApiServerHookName) {
			err := s.AddPostStartHook(genericApiServerHookName, func(context PostStartHookContext) error {
				c.SharedInformerFactory.Start(context.StopCh)
				return nil
			})
			if err != nil {
				return nil, err
			}
			// TODO: Once we get rid of /healthz consider changing this to post-start-hook.
			err = s.addReadyzChecks(healthz.NewInformerSyncHealthz(c.SharedInformerFactory))
			if err != nil {
				return nil, err
			}
		}
	}

	const priorityAndFairnessConfigConsumerHookName = "priority-and-fairness-config-consumer"
	if s.isPostStartHookRegistered(priorityAndFairnessConfigConsumerHookName) {
	} else if c.FlowControl != nil {
		err := s.AddPostStartHook(priorityAndFairnessConfigConsumerHookName, func(context PostStartHookContext) error {
			go c.FlowControl.Run(context.StopCh)
			return nil
		})
		if err != nil {
			return nil, err
		}
		// TODO(yue9944882): plumb pre-shutdown-hook for request-management system?
	} else {
		klog.V(3).Infof("Not requested to run hook %s", priorityAndFairnessConfigConsumerHookName)
	}

	// Add the health checks of the delegationTarget
	for _, delegateCheck := range delegationTarget.HealthzChecks() {
		skip := false
		for _, existingCheck := range c.HealthzChecks {
			if existingCheck.Name() == delegateCheck.Name() {
				skip = true
				break
			}
		}
		if skip {
			continue
		}
		s.AddHealthChecks(delegateCheck)
	}

	s.listedPathProvider = routes.ListedPathProviders{s.listedPathProvider, delegationTarget}

	// Install profiling, metrics, and the path list shown under URL / (listedPathProvider)
	installAPI(s, c.Config)

	// use the UnprotectedHandler from the delegation target to ensure that we don't attempt to double authenticator, authorize,
	// or some other part of the filter chain in delegation cases.
	if delegationTarget.UnprotectedHandler() == nil && c.EnableIndex {
		s.Handler.NonGoRestfulMux.NotFoundHandler(routes.IndexLister{
			StatusCode:   http.StatusNotFound,
			PathProvider: s.listedPathProvider,
		})
	}

	return s, nil
}
```
GenericAPIServer.Handler is the handler for HTTP requests; we analyze it in the request handling section below.
After the GenericAPIServer has been instantiated, the APIGroup is installed:
```go
	// Create the APIGroupInfo, which holds everything about a group of APIs: the registered
	// API types (Scheme), how to encode and decode them (Codecs),
	// and how to parse query parameters (ParameterCodec)
	apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(wardle.GroupName, Scheme, metav1.ParameterCodec, Codecs)

	// Mapping from resource name to rest.Storage
	v1alpha1storage := map[string]rest.Storage{}
	v1alpha1storage["flunders"] = wardleregistry.RESTInPeace(flunderstorage.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter))
	v1alpha1storage["fischers"] = wardleregistry.RESTInPeace(fischerstorage.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter))
	// Each of the two versions gets its own map
	apiGroupInfo.VersionedResourcesStorageMap["v1alpha1"] = v1alpha1storage

	v1beta1storage := map[string]rest.Storage{}
	v1beta1storage["flunders"] = wardleregistry.RESTInPeace(flunderstorage.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter))
	apiGroupInfo.VersionedResourcesStorageMap["v1beta1"] = v1beta1storage

	if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
		return nil, err
	}

	return s, nil
}
```
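The registry.REST returned by flunderstorage.NewREST and similar functions embeds a genericregistry.Store, which implements more than just rest.Storage:
```go
type Storage interface {
	// New returns an empty object. Once the request data has been decoded into it,
	// Create/Update can be called to persist it.
	// It must return a pointer type suitable for Codec.DecodeInto([]byte, runtime.Object)
	New() runtime.Object
}
```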
It also implements rest.StandardStorage:
```go
type StandardStorage interface {
	Getter
	Lister
	CreaterUpdater
	GracefulDeleter
	CollectionDeleter
	Watcher
}
```
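Implementing these interfaces means that registry.REST supports create, read, update, delete, and watch for API objects. We look at the details in the request handling section below.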
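The reason the set of implemented interfaces matters is that the supported operations of a storage object can be discovered with type assertions. The sketch below illustrates that idea with hypothetical miniature interfaces (`storage`, `getter`, `lister`, `watcher`); it is not the library's installer code.

```go
package main

import "fmt"

// Hypothetical miniatures of rest.Storage, rest.Getter, rest.Lister and rest.Watcher.
type storage interface{ New() any }
type getter interface{ Get(name string) (any, error) }
type lister interface{ List() ([]any, error) }
type watcher interface{ Watch() (<-chan any, error) }

type flunder struct{ Name string }

// flunderStore supports get and list, but deliberately not watch.
type flunderStore struct{ items map[string]*flunder }

func (s *flunderStore) New() any { return &flunder{} }

func (s *flunderStore) Get(name string) (any, error) {
	f, ok := s.items[name]
	if !ok {
		return nil, fmt.Errorf("flunder %q not found", name)
	}
	return f, nil
}

func (s *flunderStore) List() ([]any, error) {
	out := make([]any, 0, len(s.items))
	for _, f := range s.items {
		out = append(out, f)
	}
	return out, nil
}

// supportedVerbs derives the verbs purely from the interfaces a storage satisfies.
func supportedVerbs(s storage) []string {
	var verbs []string
	if _, ok := s.(getter); ok {
		verbs = append(verbs, "get")
	}
	if _, ok := s.(lister); ok {
		verbs = append(verbs, "list")
	}
	if _, ok := s.(watcher); ok {
		verbs = append(verbs, "watch")
	}
	return verbs
}

func main() {
	fmt.Println(supportedVerbs(&flunderStore{}))
}
```

A storage that embeds a full-featured store gets every verb for free, while a hand-written one exposes only what it implements.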
As the code in the server startup section shows, once the options have been converted into a config, the config has been completed, and the APIServer has been instantiated from it, the startup logic runs in two steps.
The first is PrepareRun, which performs operations that must happen after the APIs have been installed (installation happens during instantiation):
```go
func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
	s.delegationTarget.PrepareRun()
	// Install the OpenAPI handler
	if s.openAPIConfig != nil && !s.skipOpenAPIInstallation {
		s.OpenAPIVersionedService, s.StaticOpenAPISpec = routes.OpenAPI{
			Config: s.openAPIConfig,
		}.Install(s.Handler.GoRestfulContainer, s.Handler.NonGoRestfulMux)
	}
	// Install the health check handlers
	s.installHealthz()
	s.installLivez()
	err := s.addReadyzShutdownCheck(s.readinessStopCh)
	if err != nil {
		klog.Errorf("Failed to install readyz shutdown check %s", err)
	}
	s.installReadyz()

	// Register a pre-shutdown hook for the audit backend
	if s.AuditBackend != nil {
		err := s.AddPreShutdownHook("audit-backend", func() error {
			s.AuditBackend.Shutdown()
			return nil
		})
		if err != nil {
			klog.Errorf("Failed to add pre-shutdown hook for audit-backend %s", err)
		}
	}

	return preparedGenericAPIServer{s}
}
```
The second is Run, which starts the APIServer:
```go
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
	delayedStopCh := make(chan struct{})

	go func() {
		defer close(delayedStopCh)
		// The stop signal has arrived
		<-stopCh

		// As soon as shutdown is triggered, /readyz must start returning an error
		close(s.readinessStopCh)

		// Sleep for ShutdownDelayDuration before stopping the server. This gives load balancers
		// a window to notice the /readyz status and stop sending requests to this server
		time.Sleep(s.ShutdownDelayDuration)
	}()

	// Run the server
	err := s.NonBlockingRun(delayedStopCh)
	if err != nil {
		return err
	}

	// The stop signal has arrived
	<-stopCh

	// Run the pre-shutdown hooks
	err = s.RunPreShutdownHooks()
	if err != nil {
		return err
	}

	// Wait for the delayed stop signal
	<-delayedStopCh

	// Wait for in-flight requests to finish, then shut down
	s.HandlerChainWaitGroup.Wait()

	return nil
}
```
NonBlockingRun is the heart of the APIServer startup. It starts an HTTPS server:
```go
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error {
	// This channel ensures that the HTTP server shuts down gracefully without losing audit events
	auditStopCh := make(chan struct{})

	// Start the audit backend first; no request can come in at this point
	if s.AuditBackend != nil {
		if err := s.AuditBackend.Run(auditStopCh); err != nil {
			return fmt.Errorf("failed to run the audit backend: %v", err)
		}
	}

	// This channel is used to clean up the listener on error
	internalStopCh := make(chan struct{})
	var stoppedCh <-chan struct{}
	if s.SecureServingInfo != nil && s.Handler != nil {
		var err error
		// Start the HTTPS server. This only fails on certificate errors or when the internal listen call fails.
		// The server loop runs in its own goroutine.
		// Note that s.Handler is accessible to us, so setting up a plain HTTP (non-HTTPS) server should be easy
		stoppedCh, err = s.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh)
		if err != nil {
			close(internalStopCh)
			close(auditStopCh)
			return err
		}
	}

	// Cleanup
	go func() {
		<-stopCh
		close(internalStopCh)
		if stoppedCh != nil {
			<-stoppedCh
		}
		s.HandlerChainWaitGroup.Wait()
		close(auditStopCh)
	}()

	// Post-start hooks
	s.RunPostStartHooks(stopCh)

	if _, err := systemd.SdNotify(true, "READY=1\n"); err != nil {
		klog.Errorf("Unable to send systemd daemon successful start message: %v\n", err)
	}

	return nil
}
```
The apis/wardle package and its subpackages define the APIs of the wardle.example.com group.
register.go in the wardle package defines the group, plus helper functions that derive a GroupKind or GroupResource from the group version:
```go
const GroupName = "wardle.example.com"

// runtime.APIVersionInternal is the version used for the internal types, which have no formal version
var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: runtime.APIVersionInternal}

func Kind(kind string) schema.GroupKind {
	return SchemeGroupVersion.WithKind(kind).GroupKind()
}

func Resource(resource string) schema.GroupResource {
	return SchemeGroupVersion.WithResource(resource).GroupResource()
}
```
API packages usually provide an AddToScheme variable that registers their APIs with a given scheme:
```go
var (
	SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
	AddToScheme   = SchemeBuilder.AddToScheme
)

func addKnownTypes(scheme *runtime.Scheme) error {
	// The core of API registration: add the API types of this GV
	scheme.AddKnownTypes(SchemeGroupVersion,
		&Flunder{}, // Must be a pointer; the type name obtained via reflection is used as the Kind when encoding
		&FlunderList{},
		&Fischer{},
		&FischerList{},
	)
	return nil
}

// A SchemeBuilder is really just a slice
type SchemeBuilder []func(*Scheme) error

// NewSchemeBuilder accepts any number of callbacks and passes them to SchemeBuilder.Register
func NewSchemeBuilder(funcs ...func(*Scheme) error) SchemeBuilder {
	var sb SchemeBuilder
	sb.Register(funcs...)
	return sb
}

// Register simply appends to the slice
func (sb *SchemeBuilder) Register(funcs ...func(*Scheme) error) {
	for _, f := range funcs {
		*sb = append(*sb, f)
	}
}

// AddToScheme simply iterates over the callbacks; a Go method value can be passed around like a variable
func (sb *SchemeBuilder) AddToScheme(s *Scheme) error {
	for _, f := range *sb {
		if err := f(s); err != nil {
			return err
		}
	}
	return nil
}
```
types.go in the wardle package defines the Go structs for the API types.
The subpackages v1alpha1 and v1beta1 define two versions of the API. Like the wardle package, they contain GroupName, SchemeGroupVersion, SchemeBuilder, AddToScheme and similar variables and functions, along with the structs for their API types. They also contain generated functions that convert to and from the APIVersionInternal version of the API.
Back in the wardle package, the Install function registers the APIVersionInternal, v1alpha1, and v1beta1 versions with a scheme:
```go
func Install(scheme *runtime.Scheme) {
	utilruntime.Must(wardle.AddToScheme(scheme))
	utilruntime.Must(v1beta1.AddToScheme(scheme))
	utilruntime.Must(v1alpha1.AddToScheme(scheme))
	utilruntime.Must(scheme.SetVersionPriority(v1beta1.SchemeGroupVersion, v1alpha1.SchemeGroupVersion))
}
```
The apiserver package calls this Install function in its init:
```go
var (
	// The API registry
	Scheme = runtime.NewScheme()
	// The codec factory
	Codecs = serializer.NewCodecFactory(Scheme)
)

func init() {
	// Install the APIs provided by sample-apiserver
	install.Install(Scheme)

	// Registers "k8s.io/apimachinery/pkg/apis/meta/v1" under the v1 group version. Why?
	// we need to add the options to empty v1
	// TODO fix the server code to avoid this
	metav1.AddToGroupVersion(Scheme, schema.GroupVersion{Version: "v1"})

	// TODO: keep the generic API server from wanting this
	unversioned := schema.GroupVersion{Group: "", Version: "v1"}
	Scheme.AddUnversionedTypes(unversioned,
		&metav1.Status{},
		&metav1.APIVersions{},
		&metav1.APIGroupList{},
		&metav1.APIGroup{},
		&metav1.APIResourceList{},
	)
}
```
Codecs is the factory for API resource codecs. RecommendedOptions needs it:
```go
	// ...
	RecommendedOptions: genericoptions.NewRecommendedOptions(
		defaultEtcdPathPrefix,
		// Obtain the LegacyCodec (a runtime.Codec) for v1alpha1.
		// A LegacyCodec encodes to the given API version and decodes (from any supported source) to the internal form.
		// This codec always encodes to JSON.
		//
		// The LegacyCodec method is deprecated: clients and servers should negotiate a serializer
		// by MIME type and call CodecForVersions
		apiserver.Codecs.LegacyCodec(v1alpha1.SchemeGroupVersion),
		genericoptions.NewProcessInfo("wardle-apiserver", "wardle"),
	),

func NewRecommendedOptions(prefix string, codec runtime.Codec, processInfo *ProcessInfo) *RecommendedOptions {
	return &RecommendedOptions{
		// Here, however, the runtime.Codec converts API objects to JSON for storage in etcd, not for sending to clients
		Etcd: NewEtcdOptions(storagebackend.NewDefaultConfig(prefix, codec)),
		// ...
```
APIGroupInfo needs the factory as well:
```go
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(wardle.GroupName, Scheme, metav1.ParameterCodec, Codecs)

func NewDefaultAPIGroupInfo(group string, scheme *runtime.Scheme, parameterCodec runtime.ParameterCodec, codecs serializer.CodecFactory) APIGroupInfo {
	return APIGroupInfo{
		// ...
		Scheme:               scheme,
		ParameterCodec:       parameterCodec,
		NegotiatedSerializer: codecs,
	}
}
```
While the APIServer is being instantiated, the Scheme is passed to the APIGroupInfo, which establishes the mapping between resources and Go types.
sample-apiserver also demonstrates how to plug your own Admission controller into the API server.
In the options, an Admission initializer has to be registered:
```go
o.RecommendedOptions.ExtraAdmissionInitializers = func(c *genericapiserver.RecommendedConfig) ([]admission.PluginInitializer, error) {
	client, err := clientset.NewForConfig(c.LoopbackClientConfig)
	if err != nil {
		return nil, err
	}
	informerFactory := informers.NewSharedInformerFactory(client, c.LoopbackClientConfig.Timeout)
	o.SharedInformerFactory = informerFactory
	// The plugin initializer
	return []admission.PluginInitializer{wardleinitializer.New(informerFactory)}, nil
}
```
The initializer is a function that runs while the options are being converted into a config:
```go
// Call the function above
if initializers, err := o.ExtraAdmissionInitializers(config); err != nil {
	return err
	// Initialize the Admission controllers
} else if err := o.Admission.ApplyTo(&config.Config, config.SharedInformerFactory, config.ClientConfig, o.FeatureGate, initializers...); err != nil {
	return err
}
```
The ApplyTo method above assembles a chain of Admission initializers and calls them one by one to initialize every Admission controller.
In addition, the PostStart hook starts the SharedInformerFactory that the Admission controller depends on:
```go
server.GenericAPIServer.AddPostStartHookOrDie("start-sample-server-informers", func(context genericapiserver.PostStartHookContext) error {
	// InformerFactory for the main kube-apiserver; it does not seem to be of much use here
	config.GenericConfig.SharedInformerFactory.Start(context.StopCh)
	// InformerFactory for this (secondary) API server, required by the Admission controller
	o.SharedInformerFactory.Start(context.StopCh)
	return nil
})
```
Let's look at the Admission-related code in sample-apiserver.
The pkg/admission/wardleinitializer package holds the Admission initializer. It injects an InformerFactory into any Admission controller that implements WantsInternalWardleInformerFactory:
```go
type pluginInitializer struct {
	informers informers.SharedInformerFactory
}

var _ admission.PluginInitializer = pluginInitializer{}

// Called from the ExtraAdmissionInitializers function
func New(informers informers.SharedInformerFactory) pluginInitializer {
	return pluginInitializer{
		informers: informers,
	}
}

// Called from o.Admission.ApplyTo
func (i pluginInitializer) Initialize(plugin admission.Interface) {
	if wants, ok := plugin.(WantsInternalWardleInformerFactory); ok {
		wants.SetInternalWardleInformerFactory(i.informers)
	}
}
```
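This "Wants" convention is a small form of dependency injection: the initializer offers a dependency to every plugin, and only plugins that implement the matching setter interface receive it. A minimal sketch with hypothetical types (`informerFactory`, `plugin`, `wantsInformerFactory`, `banFlunder`, `alwaysAdmit`) might look like this:

```go
package main

import "fmt"

// informerFactory is a hypothetical placeholder for a shared informer factory.
type informerFactory struct{ name string }

// plugin is a hypothetical miniature of admission.Interface.
type plugin interface{ Handles(op string) bool }

// wantsInformerFactory is implemented only by plugins that need the dependency.
type wantsInformerFactory interface {
	SetInformerFactory(f *informerFactory)
}

type initializer struct{ informers *informerFactory }

// Initialize injects the factory if, and only if, the plugin asks for it.
func (i initializer) Initialize(p plugin) {
	if wants, ok := p.(wantsInformerFactory); ok {
		wants.SetInformerFactory(i.informers)
	}
}

// banFlunder needs the factory.
type banFlunder struct{ informers *informerFactory }

func (b *banFlunder) Handles(op string) bool                { return op == "CREATE" }
func (b *banFlunder) SetInformerFactory(f *informerFactory) { b.informers = f }

// alwaysAdmit does not, and is left untouched.
type alwaysAdmit struct{}

func (alwaysAdmit) Handles(string) bool { return true }

func main() {
	init := initializer{informers: &informerFactory{name: "wardle"}}
	ban := &banFlunder{}
	for _, p := range []plugin{ban, alwaysAdmit{}} {
		init.Initialize(p)
	}
	fmt.Println(ban.informers.name)
}
```

The plugin interface itself stays minimal, and each new kind of dependency only requires a new setter interface plus an initializer that knows about it.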
The pkg/admission/plugin/banflunder package contains the Admission controller BanFlunder. The function:
```go
func Register(plugins *admission.Plugins) {
	plugins.Register("BanFlunder", func(config io.Reader) (admission.Interface, error) {
		return New()
	})
}
```
is called very early in the program's life to register the Admission controller with the API server:
```go
func (o *WardleServerOptions) Complete() error {
	// Register the plugin
	banflunder.Register(o.RecommendedOptions.Admission.Plugins)

	// Configure the plugin order
	o.RecommendedOptions.Admission.RecommendedPluginOrder = append(o.RecommendedOptions.Admission.RecommendedPluginOrder, "BanFlunder")

	return nil
}
```
The core of an Admission controller is its Admit function, which can modify or reject a request to the API server:
```go
func (d *DisallowFlunder) Admit(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) error {
	// We only care about one particular kind of API object
	if a.GetKind().GroupKind() != wardle.Kind("Flunder") {
		return nil
	}

	if !d.WaitForReady() {
		return admission.NewForbidden(a, fmt.Errorf("not yet ready to handle request"))
	}

	// Used to read the object's metadata
	metaAccessor, err := meta.Accessor(a.GetObject())
	if err != nil {
		return err
	}
	flunderName := metaAccessor.GetName()

	fischers, err := d.lister.List(labels.Everything())
	if err != nil {
		return err
	}

	for _, fischer := range fischers {
		for _, disallowedFlunder := range fischer.DisallowedFlunders {
			if flunderName == disallowedFlunder {
				return errors.NewForbidden(
					a.GetResource().GroupResource(),
					a.GetName(),
					// Reject the request
					fmt.Errorf("this name may not be used, please change the resource name"),
				)
			}
		}
	}
	return nil
}
```
From the analysis so far we know that create, read, update, and delete of resources are handled by registry.REST (an empty shell; the real work is done by genericregistry.Store). So how do HTTP requests reach it?
Let's revisit the code that adds resources to the APIGroupInfo:
```go
// The resource name
v1alpha1storage["flunders"] = wardleregistry.RESTInPeace(
	// Create the registry.REST
	flunderstorage.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter),
)

func NewREST(scheme *runtime.Scheme, optsGetter generic.RESTOptionsGetter) (*registry.REST, error) {
	// The strategy; see below
	strategy := NewStrategy(scheme)

	store := &genericregistry.Store{
		// Function that instantiates the resource
		NewFunc: func() runtime.Object {
			// Every CRUD operation creates this struct, so a breakpoint here intercepts all requests
			return &wardle.Flunder{}
		},
		// Function that instantiates the resource list
		NewListFunc: func() runtime.Object { return &wardle.FlunderList{} },
		// Builds the predicate that matches objects against label and field selectors
		PredicateFunc: func(label labels.Selector, field fields.Selector) storage.SelectionPredicate {
			return storage.SelectionPredicate{
				Label: label,
				Field: field,
				GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
					apiserver, ok := obj.(*wardle.Flunder)
					if !ok {
						return nil, nil, fmt.Errorf("given object is not a Flunder")
					}
					return apiserver.ObjectMeta.Labels, SelectableFields(apiserver), nil
				},
			}
		},
		// Plural resource name, used when the context lacks the required request information
		DefaultQualifiedResource: wardle.Resource("flunders"),
		// Strategies for create, update, and delete
		CreateStrategy: strategy,
		UpdateStrategy: strategy,
		DeleteStrategy: strategy,
	}
	options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: GetAttrs}
	// Fill in the default fields
	if err := store.CompleteWithOptions(options); err != nil {
		return nil, err
	}
	return &registry.REST{store}, nil
}
```
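Creating a genericregistry.Store requires two pieces of information:
- The Scheme, which provides the mapping between Go types and GVKs. The Kind is obtained by reflection from the name of the Go struct type
- A generic.RESTOptionsGetter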
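To make the first point concrete, here is a toy registry that derives the Kind from the Go type name via reflection and can later create a fresh object for a GVK. The `scheme`, `groupVersion`, and `gvk` types are hypothetical simplifications written for this sketch, not the apimachinery implementation.

```go
package main

import (
	"fmt"
	"reflect"
)

type groupVersion struct{ Group, Version string }
type gvk struct{ Group, Version, Kind string }

// scheme is a hypothetical, minimal type registry.
type scheme struct{ types map[gvk]reflect.Type }

// addKnownTypes registers pointer-to-struct values; the struct's type name becomes the Kind.
func (s *scheme) addKnownTypes(gv groupVersion, objs ...any) {
	if s.types == nil {
		s.types = map[gvk]reflect.Type{}
	}
	for _, obj := range objs {
		t := reflect.TypeOf(obj)
		if t.Kind() != reflect.Ptr || t.Elem().Kind() != reflect.Struct {
			panic("all types must be pointers to structs")
		}
		t = t.Elem()
		s.types[gvk{gv.Group, gv.Version, t.Name()}] = t
	}
}

// newObject returns a pointer to a new zero value of the type registered for k.
func (s *scheme) newObject(k gvk) (any, bool) {
	t, ok := s.types[k]
	if !ok {
		return nil, false
	}
	return reflect.New(t).Interface(), true
}

type Flunder struct{ Name string }
type FlunderList struct{ Items []Flunder }

func main() {
	s := &scheme{}
	gv := groupVersion{Group: "wardle.example.com", Version: "v1alpha1"}
	s.addKnownTypes(gv, &Flunder{}, &FlunderList{})

	obj, ok := s.newObject(gvk{gv.Group, gv.Version, "Flunder"})
	fmt.Printf("%T %v\n", obj, ok)
}
```

Passing pointers at registration time is what lets the registry hand back values that a decoder can fill in.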
The RESTOptionsGetter is used to obtain RESTOptions:
```go
type RESTOptionsGetter interface {
	GetRESTOptions(resource schema.GroupResource) (RESTOptions, error)
}
```
RESTOptions carries information about storage, even though the name hardly suggests that responsibility:
```go
type RESTOptions struct {
	// Configuration needed to create a storage backend
	StorageConfig *storagebackend.Config
	// A function that provides a storage.Interface and a factory.DestroyFunc
	Decorator StorageDecorator

	EnableGarbageCollection bool
	DeleteCollectionWorkers int
	ResourcePrefix          string
	CountMetricPollPeriod   time.Duration
}

// storagebackend.Config
type Config struct {
	// Type of the storage backend; defaults to etcd3
	Type string
	// Prefix passed to all methods of storage.Interface; corresponds to the etcd key prefix
	Prefix string
	// Information for connecting to the etcd servers
	Transport TransportConfig
	// Tells the APIServer whether it should support paging
	Paging bool
	// Handles (de)serialization
	Codec runtime.Codec
	// Determines the GVK an object is converted to before it is persisted to etcd
	EncodeVersioner runtime.GroupVersioner
	// Transforms values before they are persisted to etcd
	Transformer value.Transformer

	CompactionInterval    time.Duration
	CountMetricPollPeriod time.Duration
	LeaseManagerConfig    etcd3.LeaseManagerConfig
}

// Builds a storage.Interface from the functions passed in
type StorageDecorator func(
	config *storagebackend.Config,
	resourcePrefix string,
	keyFunc func(obj runtime.Object) (string, error),
	newFunc func() runtime.Object,
	newListFunc func() runtime.Object,
	getAttrsFunc storage.AttrFunc,
	trigger storage.IndexerFuncs,
	indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error)

// CRUD
type Interface interface {
	Versioner() Versioner
	Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error
	Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions, validateDeletion ValidateObjectFunc) error
	Watch(ctx context.Context, key string, resourceVersion string, p SelectionPredicate) (watch.Interface, error)
	WatchList(ctx context.Context, key string, resourceVersion string, p SelectionPredicate) (watch.Interface, error)
	Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error
	GetToList(ctx context.Context, key string, resourceVersion string, p SelectionPredicate, listObj runtime.Object) error
	List(ctx context.Context, key string, resourceVersion string, p SelectionPredicate, listObj runtime.Object) error
	GuaranteedUpdate(
		ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
		preconditions *Preconditions, tryUpdate UpdateFunc, suggestion ...runtime.Object) error
	Count(key string) (int64, error)
}

// Destroys, in one go, every object that was created by Create() and is used by the current Storage
type DestroyFunc func()
```
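Note that storagebackend.Config is coupled to etcd here (Transport, LeaseManagerConfig). To support a different storage backend, we therefore have to step in at an earlier point.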
genericregistry.Store also contains three Strategy fields:
```go
type Store struct {
	// ...
	CreateStrategy rest.RESTCreateStrategy
	UpdateStrategy rest.RESTUpdateStrategy
	DeleteStrategy rest.RESTDeleteStrategy
	// ...
}

type RESTCreateStrategy interface {
	runtime.ObjectTyper
	// Generates names
	names.NameGenerator
	// Whether the object must live in a namespace
	NamespaceScoped() bool
	// Called before Validate and Canonicalize to normalize the object,
	// e.g. to drop fields that should not be persisted or to sort order-insensitive list fields.
	// It must not remove fields whose presence would be a validation error
	PrepareForCreate(ctx context.Context, obj runtime.Object)
	// Validation; called after the object's default fields have been filled in
	Validate(ctx context.Context, obj runtime.Object) field.ErrorList
	// Called after validation and before persisting. Canonicalizes the object;
	// usually implemented as a type check or an empty function
	Canonicalize(obj runtime.Object)
}

type RESTUpdateStrategy interface {
	runtime.ObjectTyper
	NamespaceScoped() bool
	// Whether the object may be created by a PUT request
	AllowCreateOnUpdate() bool
	// Prepare for the update
	PrepareForUpdate(ctx context.Context, obj, old runtime.Object)
	// Validation
	ValidateUpdate(ctx context.Context, obj, old runtime.Object) field.ErrorList
	// Canonicalization
	Canonicalize(obj runtime.Object)
	// Whether an unconditional update is allowed when the object specifies no resource version,
	// i.e. an update regardless of the latest resource version (optimistic concurrency control disabled)
	AllowUnconditionalUpdate() bool
}

type RESTDeleteStrategy interface {
	runtime.ObjectTyper
}

type ObjectTyper interface {
	// Returns the possible GVKs of the object
	ObjectKinds(Object) ([]schema.GroupVersionKind, bool, error)
	// Whether the Scheme recognizes the given GVK
	Recognizes(gvk schema.GroupVersionKind) bool
}
```
As you can see, strategies influence create, update, and delete behavior: they can generate object names, validate objects, and even modify them. Here is the strategy implementation that sample-apiserver provides:
    func NewStrategy(typer runtime.ObjectTyper) flunderStrategy {
        // Simple naming strategy: the requested base name plus a random 5-character alphanumeric suffix
        return flunderStrategy{typer, names.SimpleNameGenerator}
    }

    type flunderStrategy struct {
        runtime.ObjectTyper
        names.NameGenerator
    }

    func (flunderStrategy) NamespaceScoped() bool {
        return true
    }

    func (flunderStrategy) PrepareForCreate(ctx context.Context, obj runtime.Object) {
    }

    func (flunderStrategy) PrepareForUpdate(ctx context.Context, obj, old runtime.Object) {
    }

    func (flunderStrategy) Validate(ctx context.Context, obj runtime.Object) field.ErrorList {
        flunder := obj.(*wardle.Flunder)
        return validation.ValidateFlunder(flunder)
    }

    func (flunderStrategy) AllowCreateOnUpdate() bool {
        return false
    }

    func (flunderStrategy) AllowUnconditionalUpdate() bool {
        return false
    }

    func (flunderStrategy) Canonicalize(obj runtime.Object) {
    }

    func (flunderStrategy) ValidateUpdate(ctx context.Context, obj, old runtime.Object) field.ErrorList {
        return field.ErrorList{}
    }

    package validation

    // ValidateFlunder validates a Flunder.
    func ValidateFlunder(f *wardle.Flunder) field.ErrorList {
        allErrs := field.ErrorList{}

        allErrs = append(allErrs, ValidateFlunderSpec(&f.Spec, field.NewPath("spec"))...)

        return allErrs
    }

    // ValidateFlunderSpec validates a FlunderSpec.
    func ValidateFlunderSpec(s *wardle.FlunderSpec, fldPath *field.Path) field.ErrorList {
        allErrs := field.ErrorList{}

        if len(s.FlunderReference) != 0 && len(s.FischerReference) != 0 {
            allErrs = append(allErrs, field.Invalid(fldPath.Child("fischerReference"), s.FischerReference, "cannot be set with flunderReference at the same time"))
        } else if len(s.FlunderReference) != 0 && s.ReferenceType != wardle.FlunderReferenceType {
            allErrs = append(allErrs, field.Invalid(fldPath.Child("flunderReference"), s.FlunderReference, "cannot be set if referenceType is not Flunder"))
        } else if len(s.FischerReference) != 0 && s.ReferenceType != wardle.FischerReferenceType {
            allErrs = append(allErrs, field.Invalid(fldPath.Child("fischerReference"), s.FischerReference, "cannot be set if referenceType is not Fischer"))
        } else if len(s.FischerReference) == 0 && s.ReferenceType == wardle.FischerReferenceType {
            allErrs = append(allErrs, field.Invalid(fldPath.Child("fischerReference"), s.FischerReference, "cannot be empty if referenceType is Fischer"))
        } else if len(s.FlunderReference) == 0 && s.ReferenceType == wardle.FlunderReferenceType {
            allErrs = append(allErrs, field.Invalid(fldPath.Child("flunderReference"), s.FlunderReference, "cannot be empty if referenceType is Flunder"))
        }

        if len(s.ReferenceType) != 0 && s.ReferenceType != wardle.FischerReferenceType && s.ReferenceType != wardle.FlunderReferenceType {
            allErrs = append(allErrs, field.Invalid(fldPath.Child("referenceType"), s.ReferenceType, "must be Flunder or Fischer"))
        }

        return allErrs
    }
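At this point we can guess how genericregistry.Store handles a Create request:
- Read the request body and call NewFunc to deserialize it into a runtime.Object
- Call PredicateFunc to decide whether the object can be handled
- Call CreateStrategy to validate and normalize the object
- Use RESTOptions to store it in Etcd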
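The guessed steps above can be sketched with the standard library alone. Everything in this sketch (the `Flunder` stand-in, `createStrategy`, `memoryStore`, the key layout) is a hypothetical simplification rather than the genericregistry.Store API, and the PredicateFunc step is left out. It only mirrors the order decode, prepare, validate, canonicalize, persist.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

// Flunder is a hypothetical stand-in for the real resource type.
type Flunder struct {
	Name      string `json:"name"`
	Reference string `json:"reference"`
}

// createStrategy mirrors the shape of RESTCreateStrategy in simplified form.
type createStrategy interface {
	PrepareForCreate(obj *Flunder)
	Validate(obj *Flunder) []error
	Canonicalize(obj *Flunder)
}

type flunderStrategy struct{}

func (flunderStrategy) PrepareForCreate(obj *Flunder) {}

func (flunderStrategy) Validate(obj *Flunder) []error {
	if obj.Name == "" {
		return []error{errors.New("name: required")}
	}
	return nil
}

// Canonicalize trims whitespace purely for illustration.
func (flunderStrategy) Canonicalize(obj *Flunder) {
	obj.Reference = strings.TrimSpace(obj.Reference)
}

// memoryStore stands in for the Etcd-backed storage.
type memoryStore map[string][]byte

// create runs the pipeline: decode, prepare, validate, canonicalize, persist.
func create(body []byte, s createStrategy, store memoryStore) (*Flunder, error) {
	obj := &Flunder{} // plays the role of NewFunc
	if err := json.Unmarshal(body, obj); err != nil {
		return nil, fmt.Errorf("decode: %w", err)
	}
	s.PrepareForCreate(obj)
	if errs := s.Validate(obj); len(errs) > 0 {
		return nil, fmt.Errorf("validation failed: %v", errs)
	}
	s.Canonicalize(obj)

	key := "/registry/flunders/" + obj.Name // hypothetical key layout
	if _, exists := store[key]; exists {
		return nil, fmt.Errorf("%s already exists", key)
	}
	data, err := json.Marshal(obj)
	if err != nil {
		return nil, err
	}
	store[key] = data
	return obj, nil
}

func main() {
	store := memoryStore{}
	obj, err := create([]byte(`{"name":"a","reference":" foo "}`), flunderStrategy{}, store)
	fmt.Println(obj, err, len(store))
}
```

Keeping the strategy behind an interface is what lets one generic store serve every resource type: only the strategy knows what a valid object looks like.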
So how does a request get here, and what do these steps look like in detail? We have already located the key code paths above, so the complete flow is easy to trace with breakpoints.
During the instantiation of GenericAPIServer analyzed above, its Handler field is created like this:
    apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())

    // Builder for the handler chain. Note that the input and output types are the same:
    // the chain is built by wrapping layer upon layer, not as a linked list
    type HandlerChainBuilderFn func(apiHandler http.Handler) http.Handler

    func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
        // Configure go-restful, a framework for building REST-style WebServices
        nonGoRestfulMux := mux.NewPathRecorderMux(name)
        if notFoundHandler != nil {
            nonGoRestfulMux.NotFoundHandler(notFoundHandler)
        }

        // A container is a collection of WebServices
        gorestfulContainer := restful.NewContainer()
        // The container holds a ServeMux used to multiplex HTTP requests
        gorestfulContainer.ServeMux = http.NewServeMux()
        // Router
        gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
        // Panic handler
        gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
            logStackOnRecover(s, panicReason, httpWriter)
        })
        // Error handler
        gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
            serviceErrorHandler(s, serviceErr, request, response)
        })

        director := director{
            name:               name,
            goRestfulContainer: gorestfulContainer,
            nonGoRestfulMux:    nonGoRestfulMux,
        }

        return &APIServerHandler{
            // Build the handler chain; note that the director is passed in
            FullHandlerChain:   handlerChainBuilder(director),
            GoRestfulContainer: gorestfulContainer,
            NonGoRestfulMux:    nonGoRestfulMux,
            Director:           director,
        }
    }

    type APIServerHandler struct {
        // The handler chain; the interface is the standard one from package http:
        //    type Handler interface {
        //        ServeHTTP(ResponseWriter, *Request)
        //    }
        // It organizes a series of filters and calls the Director once a request has passed through them
        FullHandlerChain http.Handler
        // All registered APIs are handled by this container
        // InstallAPIs uses this field; other servers should not access it directly
        GoRestfulContainer *restful.Container
        // The last handler in the chain. This type wraps a mux object and records which URL paths were registered
        NonGoRestfulMux *mux.PathRecorderMux
        // Director handles fall through and proxy
        Director http.Handler
    }
This Handler implements http.Handler and simply delegates each request to FullHandlerChain:
    func (a *APIServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
        a.FullHandlerChain.ServeHTTP(w, r)
    }
The function above is the entry point for all HTTP request handling.
FullHandlerChain is the result of calling handlerChainBuilder():
    var c completedConfig

    handlerChainBuilder := func(handler http.Handler) http.Handler {
        return c.BuildHandlerChainFunc(handler, c.Config)
    }
    apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
completedConfig.BuildHandlerChainFunc in turn comes from the Config it embeds:
    serverConfig := genericapiserver.NewRecommendedConfig(apiserver.Codecs)

    func NewRecommendedConfig(codecs serializer.CodecFactory) *RecommendedConfig {
        return &RecommendedConfig{
            Config: *NewConfig(codecs),
        }
    }

    func NewConfig(codecs serializer.CodecFactory) *Config {
        defaultHealthChecks := []healthz.HealthChecker{healthz.PingHealthz, healthz.LogHealthz}
        return &Config{
            Serializer:            codecs,
            BuildHandlerChainFunc: DefaultBuildHandlerChain,
            // ...
        }
    }

    func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
        // Innermost layer: the apiHandler passed in, i.e. the director from above
        // Authorization
        handler := genericapifilters.WithAuthorization(apiHandler, c.Authorization.Authorizer, c.Serializer)
        // Request rate control
        if c.FlowControl != nil {
            handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl)
        } else {
            handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
        }
        // Impersonation
        handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer)
        // Auditing
        handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyChecker, c.LongRunningFunc)
        // Handle authentication failures
        failedHandler := genericapifilters.Unauthorized(c.Serializer, c.Authentication.SupportsBasicAuth)
        failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyChecker)
        // Authentication
        handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
        // Handle CORS requests
        handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
        // Timeout handling
        handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc, c.RequestTimeout)
        // All long-running requests are added to a wait group, used for graceful shutdown
        handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
        // Attach RequestInfo to the Context
        handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
        // HTTP GOAWAY handling
        if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && c.GoawayChance > 0 {
            handler = genericfilters.WithProbabilisticGoaway(handler, c.GoawayChance)
        }
        // Set the Cache-Control header to "no-cache, private", because all servers are protected by authn/authz
        handler = genericapifilters.WithCacheControl(handler)
        // Panic recovery
        handler = genericfilters.WithPanicRecovery(handler)
        return handler
    }
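We can clearly see the large number of filters in the default handler chain, and that the handlers wrap one another layer by layer rather than forming a linked list.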
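One consequence of the wrapping style is worth spelling out: the filter added last is the first to see a request. The sketch below shows this with the standard library only. The `filter` helper and the four filter names are illustrative stand-ins, not the real genericfilters functions.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// filter wraps next with a named step that records when it runs.
func filter(name string, trace *[]string, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		*trace = append(*trace, name)
		next.ServeHTTP(w, r)
	})
}

// buildChain wraps inner in the same style as DefaultBuildHandlerChain:
// every assignment wraps the previous handler, so the LAST filter added
// is the FIRST one a request passes through.
func buildChain(inner http.Handler, trace *[]string) http.Handler {
	h := filter("authorization", trace, inner)
	h = filter("authentication", trace, h)
	h = filter("requestinfo", trace, h)
	h = filter("panicrecovery", trace, h)
	return h
}

// runOnce sends one request through the chain and returns the visit order.
func runOnce() []string {
	var trace []string
	director := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		trace = append(trace, "director")
	})
	chain := buildChain(director, &trace)
	chain.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest(http.MethodGet, "/apis", nil))
	return trace
}

func main() {
	fmt.Println(runOnce()) // prints [panicrecovery requestinfo authentication authorization director]
}
```

Read bottom-up, the source order of DefaultBuildHandlerChain is therefore the runtime order: panic recovery runs first and authorization runs last, right before the director.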
Finally, let's trace request handling from the beginning, starting with the filters in the handler chain:
    func (a *APIServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
        a.FullHandlerChain.ServeHTTP(w, r)
    }

    // FullHandlerChain is of this type
    type HandlerFunc func(ResponseWriter, *Request)

    // A neat design: it implements the interface
    //    type Handler interface {
    //        ServeHTTP(ResponseWriter, *Request)
    //    }
    // but the implementation simply delegates to the function the type is defined on
    func (f HandlerFunc) ServeHTTP(w ResponseWriter, r *Request) {
        f(w, r)
    }

    // Entering the outermost layer of the handler chain
    func withPanicRecovery(handler http.Handler, crashHandler func(http.ResponseWriter, *http.Request, interface{})) http.Handler {
        handler = httplog.WithLogging(handler, httplog.DefaultStacktracePred)
        // Handler function
        return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
            defer runtime.HandleCrash(func(err interface{}) {
                crashHandler(w, req, err)
            })

            // Forward to the inner handler, which is captured by the closure
            handler.ServeHTTP(w, req)
        })
    }

    // Several intermediate handlers omitted...

    // This handler deserves attention: it stores request information in the context, where inner handlers can use it directly
    // The information includes:
    type RequestInfo struct {
        // Whether the request targets a resource or subresource
        IsResourceRequest bool
        // Path portion of the URL
        Path string
        // Lowercase HTTP verb
        Verb string
        // API prefix
        APIPrefix string
        // API group
        APIGroup string
        // API version
        APIVersion string
        // Namespace
        Namespace string
        // Resource type name, usually the lowercase plural form rather than the Kind
        Resource string
        // The requested subresource. A subresource is another resource scoped to a parent resource and may have a different Kind
        // For example /pods has resource "pods" and Kind "Pod"
        // /pods/foo/status has resource "pods", subresource "status", and Kind "Pod"
        // whereas /pods/foo/binding has resource "pods", subresource "binding", and Kind "Binding"
        Subresource string
        // Empty for some resources. If the request indicates a name (not in the request body), it is filled in here
        Name string
        // Parts are the path parts for the request, always starting with /{resource}/{name}
        Parts []string
    }

    func WithRequestInfo(handler http.Handler, resolver request.RequestInfoResolver) http.Handler {
        return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
            ctx := req.Context()
            info, err := resolver.NewRequestInfo(req)
            if err != nil {
                responsewriters.InternalError(w, req, fmt.Errorf("failed to create RequestInfo: %v", err))
                return
            }

            req = req.WithContext(request.WithRequestInfo(ctx, info))

            handler.ServeHTTP(w, req)
        })
    }

    // Several intermediate handlers omitted...
After passing through all the filters we arrive at the last link of the handler chain, the director. As its name suggests, it is responsible for overall request dispatch:
    func (d director) ServeHTTP(w http.ResponseWriter, req *http.Request) {
        path := req.URL.Path

        // Walk all registered WebServices to see whether one is responsible for the current URL path
        for _, ws := range d.goRestfulContainer.RegisteredWebServices() {
            switch {
            case ws.RootPath() == "/apis":
                // The URL paths /apis and /apis/ need special handling. Normally they would go to nonGoRestfulMux,
                // but when discovery is enabled they must be handled directly by goRestfulContainer
                if path == "/apis" || path == "/apis/" {
                    klog.V(5).Infof("%v: %v %q satisfied by gorestful with webservice %v", d.name, req.Method, path, ws.RootPath())
                    d.goRestfulContainer.Dispatch(w, req)
                    return
                }

            // If the prefix matches
            case strings.HasPrefix(path, ws.RootPath()):
                if len(path) == len(ws.RootPath()) || path[len(ws.RootPath())] == '/' {
                    klog.V(5).Infof("%v: %v %q satisfied by gorestful with webservice %v", d.name, req.Method, path, ws.RootPath())
                    // don't use servemux here because gorestful servemuxes get messed up when removing webservices
                    // TODO fix gorestful, remove TPRs, or stop using gorestful
                    d.goRestfulContainer.Dispatch(w, req)
                    return
                }
            }
        }

        // No match found; skip the gorestful container
        d.nonGoRestfulMux.ServeHTTP(w, req)
    }
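The prefix check in the director is subtler than a plain `strings.HasPrefix`: the path must either equal the WebService root or continue with a `/`, so a sibling path that merely shares a prefix is not captured. A minimal sketch of just that rule follows. `ownedBy` and `dispatch` are hypothetical helper names, and the special case for `/apis` is deliberately left out.

```go
package main

import (
	"fmt"
	"strings"
)

// ownedBy reports whether path belongs to a WebService rooted at root,
// using the same boundary check as the director: the path must equal the
// root or continue with a '/' right after it.
func ownedBy(path, root string) bool {
	if !strings.HasPrefix(path, root) {
		return false
	}
	return len(path) == len(root) || path[len(root)] == '/'
}

// dispatch returns "gorestful" when any root owns the path and "mux" otherwise.
func dispatch(path string, roots []string) string {
	for _, root := range roots {
		if ownedBy(path, root) {
			return "gorestful"
		}
	}
	return "mux"
}

func main() {
	roots := []string{"/apis/wardle.example.com/v1alpha1"}
	for _, p := range []string{
		"/apis/wardle.example.com/v1alpha1/flunders",
		"/apis/wardle.example.com/v1alpha1beta",
		"/healthz",
	} {
		fmt.Printf("%s -> %s\n", p, dispatch(p, roots))
	}
}
```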
Requests that do not target API resources are handled by PathRecorderMux:
    func (m *PathRecorderMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
        m.mux.Load().(*pathHandler).ServeHTTP(w, r)
    }

    func (h *pathHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
        // pathToHandler records the handlers for all exact-match paths
        // 0 = /healthz/etcd -> k8s.io/apiserver/pkg/server/healthz.adaptCheckToHandler.func1
        // 1 = /livez/etcd -> k8s.io/apiserver/pkg/server/healthz.adaptCheckToHandler.func1
        // 2 = /metrics -> k8s.io/apiserver/pkg/server/routes.MetricsWithReset.Install.func1
        // 3 = /readyz/shutdown -> k8s.io/apiserver/pkg/server/healthz.adaptCheckToHandler.func1
        // 4 = /readyz/ping -> k8s.io/apiserver/pkg/server/healthz.adaptCheckToHandler.func1
        // 5 = /debug/pprof/profile -> net/http/pprof.Profile
        // 6 = /healthz/log -> k8s.io/apiserver/pkg/server/healthz.adaptCheckToHandler.func1
        // 7 = /livez/log -> k8s.io/apiserver/pkg/server/healthz.adaptCheckToHandler.func1
        // 8 = /healthz/ping -> k8s.io/apiserver/pkg/server/healthz.adaptCheckToHandler.func1
        // 9 = / ->
        // 10 = /healthz -> k8s.io/apiserver/pkg/endpoints/metrics.InstrumentHandlerFunc.func1
        // 11 = /debug/flags -> k8s.io/apiserver/pkg/server/routes.DebugFlags.Index-fm
        // 12 = /livez/ping -> k8s.io/apiserver/pkg/server/healthz.adaptCheckToHandler.func1
        // 13 = /readyz/log -> k8s.io/apiserver/pkg/server/healthz.adaptCheckToHandler.func1
        // 14 = /debug/pprof -> k8s.io/apiserver/pkg/server/routes.redirectTo.func1
        // 15 = /debug/pprof/trace -> net/http/pprof.Trace
        // 16 = /debug/flags/v -> k8s.io/apiserver/pkg/server/routes.StringFlagPutHandler.func1
        // 17 = /readyz/poststarthook/start-sample-server-informers -> k8s.io/apiserver/pkg/server/healthz.adaptCheckToHandler.func1
        // 18 = /livez/poststarthook/start-sample-server-informers -> k8s.io/apiserver/pkg/server/healthz.adaptCheckToHandler.func1
        // 19 = /openapi/v2 -> github.com/NYTimes/gziphandler.NewGzipLevelAndMinSize.func1.1
        // 20 = /healthz/poststarthook/start-sample-server-informers -> k8s.io/apiserver/pkg/server/healthz.adaptCheckToHandler.func1
        // 21 = /readyz/etcd -> k8s.io/apiserver/pkg/server/healthz.adaptCheckToHandler.func1
        // 22 = /index.html ->
        // 23 = /debug/pprof/symbol -> net/http/pprof.Symbol
        // 24 = /livez -> k8s.io/apiserver/pkg/endpoints/metrics.InstrumentHandlerFunc.func1
        // 25 = /readyz -> k8s.io/apiserver/pkg/endpoints/metrics.InstrumentHandlerFunc.func1
        if exactHandler, ok := h.pathToHandler[r.URL.Path]; ok {
            klog.V(5).Infof("%v: %q satisfied by exact match", h.muxName, r.URL.Path)
            exactHandler.ServeHTTP(w, r)
            return
        }

        // Handlers for prefix-matched paths; by default /debug/flags/ and /debug/pprof/
        for _, prefixHandler := range h.prefixHandlers {
            if strings.HasPrefix(r.URL.Path, prefixHandler.prefix) {
                klog.V(5).Infof("%v: %q satisfied by prefix %v", h.muxName, r.URL.Path, prefixHandler.prefix)
                prefixHandler.handler.ServeHTTP(w, r)
                return
            }
        }

        // No handler found: 404
        klog.V(5).Infof("%v: %q satisfied by NotFoundHandler", h.muxName, r.URL.Path)
        h.notFoundHandler.ServeHTTP(w, r)
    }
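To experiment with this lookup order outside the API server, the same three-stage logic (exact match, then prefix match, then not-found) can be reproduced with the standard library. The sketch below is a simplified stand-in for the unexported pathHandler, without logging or the atomic swap. The registered paths and the `text`, `newSketchMux`, and `get` helpers are chosen only for illustration.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

type prefixHandler struct {
	prefix  string
	handler http.Handler
}

// pathHandler is a simplified stand-in for the unexported type of the same name.
type pathHandler struct {
	pathToHandler   map[string]http.Handler
	prefixHandlers  []prefixHandler
	notFoundHandler http.Handler
}

// ServeHTTP tries an exact match first, then prefixes, then falls back to 404.
func (h *pathHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if exact, ok := h.pathToHandler[r.URL.Path]; ok {
		exact.ServeHTTP(w, r)
		return
	}
	for _, p := range h.prefixHandlers {
		if strings.HasPrefix(r.URL.Path, p.prefix) {
			p.handler.ServeHTTP(w, r)
			return
		}
	}
	h.notFoundHandler.ServeHTTP(w, r)
}

// text returns a handler that writes a fixed body.
func text(s string) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { fmt.Fprint(w, s) })
}

// newSketchMux registers one exact path and one prefix, for illustration.
func newSketchMux() *pathHandler {
	return &pathHandler{
		pathToHandler:   map[string]http.Handler{"/healthz": text("ok")},
		prefixHandlers:  []prefixHandler{{prefix: "/debug/pprof/", handler: text("pprof")}},
		notFoundHandler: http.NotFoundHandler(),
	}
}

// get sends an in-memory GET request and returns status code and body.
func get(h http.Handler, path string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	h := newSketchMux()
	for _, p := range []string{"/healthz", "/debug/pprof/heap", "/nope"} {
		code, body := get(h, p)
		fmt.Printf("%s -> %d %q\n", p, code, body)
	}
}
```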
API resource requests, for example to the path /apis/wardle.example.com/v1beta1, are handled by go-restful. We will ignore the internals of the go-restful framework and dig into the APIGroup installation mentioned earlier, which is what registers the go-restful handler for every resource:
    func (s *GenericAPIServer) InstallAPIGroup(apiGroupInfo *APIGroupInfo) error {
        return s.InstallAPIGroups(apiGroupInfo)
    }

    func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error {
        // Iterate over the API groups
        for _, apiGroupInfo := range apiGroupInfos {
            // Install the API resources; APIGroupPrefix is the constant /apis
            if err := s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels); err != nil {
                return fmt.Errorf("unable to install api resources: %v", err)
            }

            // setup discovery
            // Install the version handler.
            // Add a handler at /apis/<groupName> to enumerate all versions supported by this group.
            apiVersionsForDiscovery := []metav1.GroupVersionForDiscovery{}
            for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
                // Check the config to make sure that we elide versions that don't have any resources
                if len(apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version]) == 0 {
                    continue
                }
                apiVersionsForDiscovery = append(apiVersionsForDiscovery, metav1.GroupVersionForDiscovery{
                    GroupVersion: groupVersion.String(),
                    Version:      groupVersion.Version,
                })
            }
            preferredVersionForDiscovery := metav1.GroupVersionForDiscovery{
                GroupVersion: apiGroupInfo.PrioritizedVersions[0].String(),
                Version:      apiGroupInfo.PrioritizedVersions[0].Version,
            }
            apiGroup := metav1.APIGroup{
                Name:             apiGroupInfo.PrioritizedVersions[0].Group,
                Versions:         apiVersionsForDiscovery,
                PreferredVersion: preferredVersionForDiscovery,
            }

            s.DiscoveryGroupManager.AddGroup(apiGroup)
            s.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(s.Serializer, apiGroup).WebService())
        }
        return nil
    }

    // Install the API resources
    func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, openAPIModels openapiproto.Models) error {
        // Iterate over the versions
        for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
            // Skip group versions that have no resources
            if len(apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version]) == 0 {
                klog.Warningf("Skipping API %v because it has no resources.", groupVersion)
                continue
            }

            apiGroupVersion := s.getAPIGroupVersion(apiGroupInfo, groupVersion, apiPrefix)
            if apiGroupInfo.OptionsExternalVersion != nil {
                apiGroupVersion.OptionsExternalVersion = apiGroupInfo.OptionsExternalVersion
            }
            apiGroupVersion.OpenAPIModels = openAPIModels
            apiGroupVersion.MaxRequestBodyBytes = s.maxRequestBodyBytes
            // Install as go-restful handlers
            if err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer); err != nil {
                return fmt.Errorf("unable to setup API %v: %v", apiGroupInfo, err)
            }
        }

        return nil
    }

    // Registers a set of REST handlers (storage, watch, proxy, redirect) into the restful container
    func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
        // For example /apis/wardle.example.com/v1beta1
        prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
        installer := &APIInstaller{
            group:             g,
            prefix:            prefix,
            minRequestTimeout: g.MinRequestTimeout,
        }

        // Install the API resource handlers
        apiResources, ws, registrationErrors := installer.Install()
        // Install the resource discovery handler
        versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources})
        versionDiscoveryHandler.AddToWebService(ws)
        // Add the WebService to the container
        container.Add(ws)
        return utilerrors.NewAggregate(registrationErrors)
    }

    func (a *APIInstaller) Install() ([]metav1.APIResource, *restful.WebService, []error) {
        var apiResources []metav1.APIResource
        var errors []error
        // Create a WebService for one specific GV
        ws := a.newWebService()

        // Register the paths in a deterministic (sorted) order to get a deterministic swagger spec.
        paths := make([]string, len(a.group.Storage))
        var i int = 0
        for path := range a.group.Storage {
            paths[i] = path
            i++
        }
        sort.Strings(paths)
        for _, path := range paths {
            // Iterate over the resources and register handlers, e.g. flunders
            apiResource, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
            if err != nil {
                errors = append(errors, fmt.Errorf("error in registering resource: %s, %v", path, err))
            }
            if apiResource != nil {
                apiResources = append(apiResources, *apiResource)
            }
        }
        return apiResources, ws, errors
    }

    func (a *APIInstaller) newWebService() *restful.WebService {
        ws := new(restful.WebService)
        // Path template of this WebService
        ws.Path(a.prefix)
        // a.prefix contains "prefix/group/version"
        ws.Doc("API at " + a.prefix)
        // For backward compatibility, accept requests without a MIME type
        ws.Consumes("*/*")
        // Determine the MIME types supported in responses from the codecs used by the API group
        // 0 = {string} "application/json"
        // 1 = {string} "application/yaml"
        // 2 = {string} "application/vnd.kubernetes.protobuf"
        mediaTypes, streamMediaTypes := negotiation.MediaTypesForSerializer(a.group.Serializer)
        ws.Produces(append(mediaTypes, streamMediaTypes...)...)
        // For example wardle.example.com/v1beta1
        ws.ApiVersion(a.group.GroupVersion.String())

        return ws
    }

    // Registers the resource handlers. Too long to quote in full; only fragments are shown
    func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, error) {
        // ...
        // creater is where the handler's core logic lives; it comes from rest.Storage and talks to Etcd
        creater, isCreater := storage.(rest.Creater)
        // ...
        actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
        switch action.Verb {
        case "POST":
            handler = restfulCreateResource(creater, reqScope, admit)
            route := ws.POST(action.Path).To(handler).
                Doc(doc).
                Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
                Operation("create"+namespaced+kind+strings.Title(subresource)+operationSuffix).
                Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
                Returns(http.StatusOK, "OK", producedObject).
                // TODO: in some cases, the API may return a v1.Status instead of the versioned object
                // but currently go-restful can't handle multiple different objects being returned.
                Returns(http.StatusCreated, "Created", producedObject).
                Returns(http.StatusAccepted, "Accepted", producedObject).
                Reads(defaultVersionedObject).
                Writes(producedObject)
            if err := AddObjectParams(ws, route, versionedCreateOptions); err != nil {
                return nil, err
            }
            addParams(route, action.Params)
            routes = append(routes, route)
        // ...
    }

    func restfulCreateResource(r rest.Creater, scope handlers.RequestScope, admit admission.Interface) restful.RouteFunction {
        return func(req *restful.Request, res *restful.Response) {
            handlers.CreateResource(r, &scope, admit)(res.ResponseWriter, req.Request)
        }
    }

    func CreateResource(r rest.Creater, scope *RequestScope, admission admission.Interface) http.HandlerFunc {
        return createHandler(&namedCreaterAdapter{r}, scope, admission, false)
    }

    // Core of the create-resource handler
    func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Interface, includeName bool) http.HandlerFunc {
        return func(w http.ResponseWriter, req *http.Request) {
            // For performance tracking purposes.
            trace := utiltrace.New("Create", utiltrace.Field{Key: "url", Value: req.URL.Path}, utiltrace.Field{Key: "user-agent", Value: &lazyTruncatedUserAgent{req}}, utiltrace.Field{Key: "client", Value: &lazyClientIP{req}})
            defer trace.LogIfLong(500 * time.Millisecond)
            // Handle dry run
            if isDryRun(req.URL) && !utilfeature.DefaultFeatureGate.Enabled(features.DryRun) {
                scope.err(errors.NewBadRequest("the dryRun feature is disabled"), w, req)
                return
            }

            // TODO: we either want to remove timeout or document it (if we document, move timeout out of this function and declare it in api_installer)
            timeout := parseTimeout(req.URL.Query().Get("timeout"))
            // Namespace and resource name handling
            namespace, name, err := scope.Namer.Name(req)
            if err != nil {
                if includeName {
                    // name was required, return
                    scope.err(err, w, req)
                    return
                }

                // otherwise attempt to look up the namespace
                namespace, err = scope.Namer.Namespace(req)
                if err != nil {
                    scope.err(err, w, req)
                    return
                }
            }

            ctx, cancel := context.WithTimeout(req.Context(), timeout)
            defer cancel()
            ctx = request.WithNamespace(ctx, namespace)
            // Negotiate the output MIME type
            outputMediaType, _, err := negotiation.NegotiateOutputMediaType(req, scope.Serializer, scope)
            if err != nil {
                scope.err(err, w, req)
                return
            }

            gv := scope.Kind.GroupVersion()
            // Negotiate how the input is deserialized
            s, err := negotiation.NegotiateInputSerializer(req, false, scope.Serializer)
            if err != nil {
                scope.err(err, w, req)
                return
            }
            // Obtain from the serializer a decoder that decodes the request into a specific version
            decoder := scope.Serializer.DecoderToVersion(s.Serializer, scope.HubGroupVersion)
            // Read the request body
            body, err := limitedReadBody(req, scope.MaxRequestBodyBytes)
            if err != nil {
                scope.err(err, w, req)
                return
            }

            options := &metav1.CreateOptions{}
            values := req.URL.Query()
            if err := metainternalversionscheme.ParameterCodec.DecodeParameters(values, scope.MetaGroupVersion, options); err != nil {
                err = errors.NewBadRequest(err.Error())
                scope.err(err, w, req)
                return
            }
            if errs := validation.ValidateCreateOptions(options); len(errs) > 0 {
                err := errors.NewInvalid(schema.GroupKind{Group: metav1.GroupName, Kind: "CreateOptions"}, "", errs)
                scope.err(err, w, req)
                return
            }
            options.TypeMeta.SetGroupVersionKind(metav1.SchemeGroupVersion.WithKind("CreateOptions"))

            defaultGVK := scope.Kind
            // Instantiate the Go struct for the resource
            original := r.New()
            trace.Step("About to convert to expected version")
            // Decode the request body into the Go struct
            obj, gvk, err := decoder.Decode(body, &defaultGVK, original)
            if err != nil {
                err = transformDecodeError(scope.Typer, err, original, gvk, body)
                scope.err(err, w, req)
                return
            }
            if gvk.GroupVersion() != gv {
                err = errors.NewBadRequest(fmt.Sprintf("the API version in the data (%s) does not match the expected API version (%v)", gvk.GroupVersion().String(), gv.String()))
                scope.err(err, w, req)
                return
            }
            trace.Step("Conversion done")

            // Auditing and admission control
            ae := request.AuditEventFrom(ctx)
            admit = admission.WithAudit(admit, ae)
            audit.LogRequestObject(ae, obj, scope.Resource, scope.Subresource, scope.Serializer)

            userInfo, _ := request.UserFrom(ctx)

            // On create, get name from new object if unset
            if len(name) == 0 {
                _, name, _ = scope.Namer.ObjectName(obj)
            }

            trace.Step("About to store object in database")
            admissionAttributes := admission.NewAttributesRecord(obj, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, options, dryrun.IsDryRun(options.DryRun), userInfo)
            // Build the function that persists the object
            requestFunc := func() (runtime.Object, error) {
                // Call rest.Storage to persist the object
                return r.Create(
                    ctx,
                    name,
                    obj,
                    rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope),
                    options,
                )
            }
            // finishRequest can run the callback asynchronously and handles errors returned in the response
            result, err := finishRequest(timeout, func() (runtime.Object, error) {
                if scope.FieldManager != nil {
                    liveObj, err := scope.Creater.New(scope.Kind)
                    if err != nil {
                        return nil, fmt.Errorf("failed to create new object (Create for %v): %v", scope.Kind, err)
                    }
                    obj = scope.FieldManager.UpdateNoErrors(liveObj, obj, managerOrUserAgent(options.FieldManager, req.UserAgent()))
                }
                if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && mutatingAdmission.Handles(admission.Create) {
                    if err := mutatingAdmission.Admit(ctx, admissionAttributes, scope); err != nil {
                        return nil, err
                    }
                }
                result, err := requestFunc()
                // If the object wasn't committed to storage because it's serialized size was too large,
                // it is safe to remove managedFields (which can be large) and try again.
                if isTooLargeError(err) {
                    if accessor, accessorErr := meta.Accessor(obj); accessorErr == nil {
                        accessor.SetManagedFields(nil)
                        result, err = requestFunc()
                    }
                }
                return result, err
            })
            if err != nil {
                scope.err(err, w, req)
                return
            }
            trace.Step("Object stored in database")

            code := http.StatusCreated
            status, ok := result.(*metav1.Status)
            if ok && err == nil && status.Code == 0 {
                status.Code = int32(code)
            }

            transformResponseObject(ctx, scope, trace, req, w, code, outputMediaType, result)
        }
    }
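To recap the overall request-handling logic:
- GenericAPIServer.Handler is an http.Handler and can be registered with any HTTP server, so getting around the HTTPS restriction should be easy
- GenericAPIServer.Handler is a chain of handlers wrapped layer upon layer: a series of filters on the outside and the director at the core
- The director is responsible for overall request dispatch:
  - Requests that do not target API resources go to nonGoRestfulMux. We can use this extension point to add HTTP endpoints of any shape
  - API resource requests go to gorestfulContainer
- In GenericAPIServer.InstallAPIGroup, all versions of all supported API resources are registered as go-restful WebServices
- The logic of these WebServices (which relies on rest.Storage) includes:
  - Decoding the request into the Go struct for the resource
  - Encoding the Go struct as JSON
  - Storing the JSON in Etcd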
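Analyzing the sample-apiserver code has shown us the key elements of building our own API server.
The core type of the API server is GenericAPIServer, which is produced by the New() method of genericapiserver.CompletedConfig. The latter is produced by the Complete() method of genericapiserver.RecommendedConfig, and RecommendedConfig in turn is derived from genericoptions.RecommendedOptions. sample-apiserver wraps the Config, Option, and Server objects in a layer of its own; we do not care about these wrappers.
RecommendedOptions corresponds to the options supplied by the user (plus the so-called recommended options, which reduce complexity of use), such as the Etcd address, the Etcd storage prefix, and basic information about the API server. Calling the ApplyTo method of RecommendedOptions derives, from those options, the complete configuration the API server needs. This method even performs heavyweight operations such as generating self-signed certificates, rather than merely copying values from Option to Config. RecommendedOptions calls the ApplyTo method of each of its fields in turn, which derives the fields of RecommendedConfig.
The Complete method of RecommendedConfig derives configuration once more, mostly OpenAPI-related settings.
The New method of CompletedConfig instantiates GenericAPIServer. The most important logic in this step is installing the API groups. An API group defines how create, read, update, and delete are implemented for the APIs in a GroupVersion: it maps each resource in the GroupVersion to a registry.REST, which can handle REST-style requests and (by default) stores to Etcd.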
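The Options, Config, CompletedConfig, Server progression is easier to keep in mind as a miniature. The following sketch uses hypothetical, heavily reduced types (`Options`, `Config`, `CompletedConfig`, `Server`, `build`) that only imitate the shape of the real ones: ApplyTo derives configuration from options, Complete fills in defaults, and New produces the server on which API groups are installed.

```go
package main

import (
	"errors"
	"fmt"
)

// Options holds user-facing settings (a miniature of RecommendedOptions).
type Options struct {
	EtcdServers []string
	EtcdPrefix  string
}

// Config holds derived configuration (a miniature of RecommendedConfig).
type Config struct {
	Servers       []string
	StoragePrefix string
	OpenAPITitle  string
}

// CompletedConfig marks a Config whose defaults have been filled in.
type CompletedConfig struct{ *Config }

// Server is a miniature of GenericAPIServer.
type Server struct {
	Name   string
	Groups []string
	cfg    CompletedConfig
}

// ApplyTo derives configuration from the options and validates them.
func (o *Options) ApplyTo(c *Config) error {
	if len(o.EtcdServers) == 0 {
		return errors.New("at least one etcd server is required")
	}
	c.Servers = append([]string(nil), o.EtcdServers...)
	c.StoragePrefix = o.EtcdPrefix
	return nil
}

// Complete fills in anything still missing (here only an OpenAPI title).
func (c *Config) Complete() CompletedConfig {
	if c.OpenAPITitle == "" {
		c.OpenAPITitle = "Generic API Server"
	}
	return CompletedConfig{c}
}

// New instantiates the server from a completed configuration.
func (c CompletedConfig) New(name string) *Server {
	return &Server{Name: name, cfg: c}
}

// InstallAPIGroup records a group; the real method registers REST handlers.
func (s *Server) InstallAPIGroup(group string) {
	s.Groups = append(s.Groups, group)
}

// build runs the whole pipeline in order.
func build(o *Options) (*Server, error) {
	cfg := &Config{}
	if err := o.ApplyTo(cfg); err != nil {
		return nil, err
	}
	srv := cfg.Complete().New("wardle-apiserver")
	srv.InstallAPIGroup("wardle.example.com")
	return srv, nil
}

func main() {
	srv, err := build(&Options{
		EtcdServers: []string{"127.0.0.1:2379"},
		EtcdPrefix:  "/registry/wardle.example.com",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(srv.Name, srv.Groups, srv.cfg.StoragePrefix, srv.cfg.OpenAPITitle)
}
```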
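GenericAPIServer provides hooks for registering and initializing admission controllers, as well as hooks for reacting to API server lifecycle events.
To run sample-apiserver standalone, three fields of RecommendedOptions must be dealt with: Authentication, Authorization, and CoreAPI. Each of them implies a dependency on the main kube-apiserver.
Authentication depends on the main kube-apiserver because it needs TokenReviewInterface and reads a ConfigMap in kube-system. Authorization depends on it because it needs SubjectAccessReviewInterface. CoreAPI directly supplies two fields of Config: ClientConfig and SharedInformerFactory.
Setting these fields to nil removes the dependency on the main kube-apiserver, so sample-apiserver can be started without these three command-line options: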
--kubeconfig=/home/alex/.kube/config
--authentication-kubeconfig=/home/alex/.kube/config
--authorization-kubeconfig=/home/alex/.kube/config
However, setting CoreAPI to nil produces the error: admission depends on a Kubernetes core API shared informer, it cannot be nil. This tells us that admission controllers cannot be used without depending on the main kube-apiserver, so Admission has to be set to nil as well:
    o.RecommendedOptions.Authentication = nil
    o.RecommendedOptions.Authorization = nil
    o.RecommendedOptions.CoreAPI = nil
    o.RecommendedOptions.Admission = nil
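Setting an option group to nil does not crash ApplyTo because Go allows calling a method on a nil pointer receiver, and a method can check for that and return early. The sketch below shows the pattern with hypothetical miniature types. It assumes, rather than quotes, that the real option types guard against a nil receiver in a similar way.

```go
package main

import "fmt"

// Config is a hypothetical miniature of the server configuration.
type Config struct {
	Authenticator string
	Informers     string
}

type AuthenticationOptions struct{ Kubeconfig string }

// ApplyTo is safe to call on a nil receiver: a nil option group means
// "feature disabled", so nothing is written to the config.
func (o *AuthenticationOptions) ApplyTo(c *Config) error {
	if o == nil {
		return nil
	}
	c.Authenticator = "delegated via " + o.Kubeconfig
	return nil
}

type CoreAPIOptions struct{ Kubeconfig string }

func (o *CoreAPIOptions) ApplyTo(c *Config) error {
	if o == nil {
		return nil
	}
	c.Informers = "informers via " + o.Kubeconfig
	return nil
}

// RecommendedOptions applies each option group in turn, nil or not.
type RecommendedOptions struct {
	Authentication *AuthenticationOptions
	CoreAPI        *CoreAPIOptions
}

func (o *RecommendedOptions) ApplyTo(c *Config) error {
	if err := o.Authentication.ApplyTo(c); err != nil {
		return err
	}
	return o.CoreAPI.ApplyTo(c)
}

func main() {
	opts := &RecommendedOptions{
		Authentication: &AuthenticationOptions{Kubeconfig: "kubeconfig"},
		CoreAPI:        &CoreAPIOptions{Kubeconfig: "kubeconfig"},
	}
	// Standalone mode: drop everything that needs the main kube-apiserver.
	opts.Authentication = nil
	opts.CoreAPI = nil

	cfg := &Config{}
	err := opts.ApplyTo(cfg)
	fmt.Printf("%+v %v\n", *cfg, err)
}
```

The flip side is visible in the PostStart crash described next: code that dereferences a field the skipped option group would have populated has no such guard.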
After clearing these four fields, sample-apiserver still crashes in a PostStart hook:
    // Panics: this SharedInformerFactory is provided by the CoreAPI options
    config.GenericConfig.SharedInformerFactory.Start(context.StopCh)
    // Only admission controllers use this InformerFactory
    o.SharedInformerFactory.Start(context.StopCh)
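For the reasons given in the comments, this PostStart hook no longer serves any purpose; delete it and the server starts normally.
The default implementation of GenericAPIServer's Run method calls s.SecureServingInfo.Serve, which forces HTTPS:
    stoppedCh, err = s.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh)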
Clearly, though, we only need to pass s.Handler to our own http.Server to use plain HTTP.
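Our migration tool also exposes some HTTP endpoints that are not Kubernetes-style. How do we integrate them into the API server?
Before starting the server, you can access GenericAPIServer.Handler.NonGoRestfulMux directly. NonGoRestfulMux implements:
    type mux interface {
        Handle(pattern string, handler http.Handler)
    }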
Calling Handle registers a handler for any path.
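Analyzing the sample-apiserver code showed that building your own API server involves a lot of tedious work. Fortunately, K8S provides apiserver-builder-alpha to simplify the process.
apiserver-builder-alpha is a collection of tools and libraries that can:
- Create Go types, controllers (based on controller-runtime), test cases, and documentation for new API resources
- Build and run extension control plane components (the API server), either standalone, in Minikube, or in K8S
- Make it easier to watch and update resources in controllers
- Make it easier to create new resources and subresources
- Provide sensible defaults for most settings
Download the archive, extract it into a directory, then set the environment variable:
    export PATH=$HOME/.local/kubernetes/apiserver-builder/bin/:$PATH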
You need to create a project under $GOPATH and add a boilerplate.go.txt file to it, then run:
    apiserver-boot init repo --domain cloud.gmem.cc
The command generates the following directory structure:
    .
    ├── bin
    ├── boilerplate.go.txt
    ├── BUILD.bazel
    ├── cmd
    │   ├── apiserver
    │   │   └── main.go
    │   └── manager
    │       └── main.go
    ├── go.mod
    ├── go.sum
    ├── pkg
    │   ├── apis
    │   │   └── doc.go
    │   ├── controller
    │   │   └── doc.go
    │   ├── doc.go
    │   ├── openapi
    │   │   └── doc.go
    │   └── webhook
    │       └── webhook.go
    ├── PROJECT
    └── WORKSPAC
cmd/apiserver/main.go is the entry point of the API server:
    import "sigs.k8s.io/apiserver-builder-alpha/pkg/cmd/server"

    func main() {
        version := "v0"

        err := server.StartApiServerWithOptions(&server.StartOptions{
            EtcdPath: "/registry/cloud.gmem.cc",
            // Does not run: this function does not exist
            Apis:        apis.GetAllApiBuilders(),
            Openapidefs: openapi.GetOpenAPIDefinitions,
            Title:       "Api",
            Version:     version,

            // TweakConfigFuncs []func(apiServer *apiserver.Config) error
            // FlagConfigFuncs []func(*cobra.Command) error
        })
        if err != nil {
            panic(err)
        }
    }
可以看到apiserver-builder-alpha进行了一些封装。
Run the following command to add a new API resource:
    apiserver-boot create group version resource --group tcm --version v1 --kind Flunder
Finally, run the following command to start the APIServer locally:
    apiserver-boot run local
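The tool discussed in this article was originally designed around CRDs, with code generated by kubebuilder. The directory layout produced by kubebuilder is not compatible with that of apiserver-builder.
In addition, apiserver-builder is still in the alpha stage, and in our tests the generated code did not run. To avoid unnecessary trouble, we decided not to use it.
Because apiserver-builder is immature and we had already completed most of the development with kubebuilder, we decided to hand-write a standalone APIServer that serves over HTTP, based on what we learned from analyzing sample-apiserver.
kubebuilder does not generate the zz_generated.openapi.go file, because that file is meaningless for CRDs. A standalone API Server, however, requires it.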
We need to add an annotation to the package that contains the resource types:
    // +k8s:openapi-gen=true

    package v1
Then invoke openapi-gen to generate the file:
    openapi-gen \
      --input-dirs "k8s.io/apimachinery/pkg/apis/meta/v1,k8s.io/apimachinery/pkg/runtime,k8s.io/apimachinery/pkg/version" \
      --input-dirs cloud.gmem.cc/teleport/api/v1 -p cloud.gmem.cc/teleport/api/v1 -O zz_generated.openapi
Below is the complete quick-and-dirty code:
    package main

    import (
        "context"
        "fmt"
        "net"
        "net/http"
        "os"
        "reflect"

        v1 "cloud.gmem.cc/teleport/api/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/fields"
        "k8s.io/apimachinery/pkg/labels"
        "k8s.io/apimachinery/pkg/runtime"
        "k8s.io/apimachinery/pkg/runtime/schema"
        "k8s.io/apimachinery/pkg/runtime/serializer"
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
        "k8s.io/apimachinery/pkg/util/validation/field"
        "k8s.io/apiserver/pkg/endpoints/openapi"
        "k8s.io/apiserver/pkg/features"
        "k8s.io/apiserver/pkg/registry/generic"
        genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
        "k8s.io/apiserver/pkg/registry/rest"
        genericapiserver "k8s.io/apiserver/pkg/server"
        genericoptions "k8s.io/apiserver/pkg/server/options"
        "k8s.io/apiserver/pkg/storage"
        "k8s.io/apiserver/pkg/storage/names"
        utilfeature "k8s.io/apiserver/pkg/util/feature"
        ctrl "sigs.k8s.io/controller-runtime"
    )

    var (
        setupLog = ctrl.Log.WithName("setup")
    )

    func main() {
        // Build the Scheme and register the versioned types
        s := runtime.NewScheme()
        utilruntime.Must(v1.AddToScheme(s))
        gv := v1.GroupVersion
        utilruntime.Must(s.SetVersionPriority(gv))
        metav1.AddToGroupVersion(s, schema.GroupVersion{Version: "v1"})
        unversioned := schema.GroupVersion{Group: "", Version: "v1"}
        s.AddUnversionedTypes(unversioned,
            &metav1.Status{},
            &metav1.APIVersions{},
            &metav1.APIGroupList{},
            &metav1.APIGroup{},
            &metav1.APIResourceList{},
        )

        // An __internal version must be registered, otherwise the server reports:
        // failed to prepare current and previous objects: no kind "Flunder" is registered for the internal version of group "tcm.cloud.gmem.cc" in scheme
        gvi := gv
        gvi.Version = runtime.APIVersionInternal
        s.AddKnownTypes(gvi, &v1.Flunder{}, &v1.FlunderList{})

        codecFactory := serializer.NewCodecFactory(s)
        codec := codecFactory.LegacyCodec(gv)
        options := genericoptions.NewRecommendedOptions(
            "/teleport/cloud.gmem.cc",
            codec,
        )

        options.Etcd.StorageConfig.EncodeVersioner = runtime.NewMultiGroupVersioner(gv, schema.GroupKind{Group: gv.Group})

        ips := []net.IP{net.ParseIP("127.0.0.1")}
        if err := options.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", nil, ips); err != nil {
            setupLog.Error(err, "error creating self-signed certificates")
            os.Exit(1)
        }

        options.Etcd.StorageConfig.Paging = utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
        options.Etcd.StorageConfig.Transport.ServerList = []string{"http://etcd.gmem.cc:2379"}

        // Drop everything that depends on the main kube-apiserver
        options.Authentication = nil
        options.Authorization = nil
        options.CoreAPI = nil
        options.Admission = nil

        options.SecureServing.BindPort = 6443

        config := genericapiserver.NewRecommendedConfig(codecFactory)
        config.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(v1.GetOpenAPIDefinitions, openapi.NewDefinitionNamer(s))
        config.OpenAPIConfig.Info.Title = "Teleport"
        config.OpenAPIConfig.Info.Version = "1.0"

        if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(map[string]bool{
            string(features.APIPriorityAndFairness): false,
        }); err != nil {
            panic(err)
        }

        if err := options.ApplyTo(config); err != nil {
            panic(err)
        }

        completedConfig := config.Complete()
        server, err := completedConfig.New("teleport-apiserver", genericapiserver.NewEmptyDelegate())
        if err != nil {
            panic(err)
        }

        // Install the API group with one storage per resource
        apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(gv.Group, s, metav1.ParameterCodec, codecFactory)
        v1storage := map[string]rest.Storage{}
        resource := v1.ResourceFlunders
        v1storage[resource] = createStore(
            s,
            gv.WithResource(resource).GroupResource(),
            func() runtime.Object { return &v1.Flunder{} },
            func() runtime.Object { return &v1.FlunderList{} },
            completedConfig.RESTOptionsGetter,
        )
        apiGroupInfo.VersionedResourcesStorageMap[gv.Version] = v1storage
        if err := server.InstallAPIGroups(&apiGroupInfo); err != nil {
            panic(err)
        }

        server.AddPostStartHookOrDie("teleport-post-start", func(hookCtx genericapiserver.PostStartHookContext) error {
            return nil
        })
        preparedServer := server.PrepareRun()

        // Serve the prepared handler over plain HTTP instead of calling Run
        if err := http.ListenAndServe(":6080", preparedServer.Handler); err != nil {
            setupLog.Error(err, "HTTP server stopped")
            os.Exit(1)
        }
    }

    func createStore(scheme *runtime.Scheme, gr schema.GroupResource, newFunc, newListFunc func() runtime.Object,
        optsGetter generic.RESTOptionsGetter) rest.Storage {
        // attrs returns the labels and selectable fields of an object
        attrs := func(obj runtime.Object) (labels.Set, fields.Set, error) {
            typ := reflect.TypeOf(newFunc())
            if reflect.TypeOf(obj) != typ {
                return nil, nil, fmt.Errorf("given object is not a %s", typ.Name())
            }
            oma := obj.(metav1.ObjectMetaAccessor)
            meta := oma.GetObjectMeta()
            return meta.GetLabels(), fields.Set{
                "metadata.name":      meta.GetName(),
                "metadata.namespace": meta.GetNamespace(),
            }, nil
        }
        s := strategy{
            scheme,
            names.SimpleNameGenerator,
        }
        store := &genericregistry.Store{
            NewFunc:     newFunc,
            NewListFunc: newListFunc,
            PredicateFunc: func(label labels.Selector, field fields.Selector) storage.SelectionPredicate {
                return storage.SelectionPredicate{
                    Label:    label,
                    Field:    field,
                    GetAttrs: attrs,
                }
            },
            DefaultQualifiedResource: gr,

            CreateStrategy: s,
            UpdateStrategy: s,
            DeleteStrategy: s,

            TableConvertor: rest.NewDefaultTableConvertor(gr),
        }
        options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: attrs}
        if err := store.CompleteWithOptions(options); err != nil {
            panic(err)
        }
        return store
    }

    // strategy is a no-op create/update/delete strategy
    type strategy struct {
        runtime.ObjectTyper
        names.NameGenerator
    }

    func (strategy) NamespaceScoped() bool {
        return true
    }

    func (strategy) PrepareForCreate(ctx context.Context, obj runtime.Object) {
    }

    func (strategy) PrepareForUpdate(ctx context.Context, obj, old runtime.Object) {
    }

    func (strategy) Validate(ctx context.Context, obj runtime.Object) field.ErrorList {
        return field.ErrorList{}
    }

    func (strategy) AllowCreateOnUpdate() bool {
        return false
    }

    func (strategy) AllowUnconditionalUpdate() bool {
        return false
    }

    func (strategy) Canonicalize(obj runtime.Object) {
    }

    func (strategy) ValidateUpdate(ctx context.Context, obj, old runtime.Object) field.ErrorList {
        return field.ErrorList{}
    }
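When installing an APIGroup, we need to specify a storage backend for every resource of every version of every API group:
    // One per group
    apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(wardle.GroupName, Scheme, metav1.ParameterCodec, Codecs)
    // One per version
    v1alpha1storage := map[string]rest.Storage{}
    // One rest.Storage per resource
    v1alpha1storage["flunders"] = wardleregistry.RESTInPeace(flunderstorage.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter))
    apiGroupInfo.VersionedResourcesStorageMap["v1alpha1"] = v1alpha1storage
    // Install the APIGroup
    s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo)
By default the backend is genericregistry.Store, which talks to Etcd. To provide your own storage backend, implement the relevant interfaces.
Note: a storage backend has many details to take care of.
Below is an example that stores API resources on the file system as YAML: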
    package file

    import (
        "bytes"
        "context"
        "fmt"
        "io/ioutil"
        "os"
        "path/filepath"
        "reflect"
        "strings"
        "sync"

        apierrors "k8s.io/apimachinery/pkg/api/errors"
        "k8s.io/apimachinery/pkg/api/meta"
        metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/conversion"
        "k8s.io/apimachinery/pkg/runtime"
        "k8s.io/apimachinery/pkg/runtime/schema"
        "k8s.io/apimachinery/pkg/util/uuid"
        "k8s.io/apimachinery/pkg/watch"
        genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
        "k8s.io/apiserver/pkg/registry/rest"
    )

    var _ rest.StandardStorage = &store{}
    var _ rest.Scoper = &store{}
    var _ rest.Storage = &store{}

    // NewStore instantiates a new file storage
    func NewStore(groupResource schema.GroupResource, codec runtime.Codec, rootpath string, isNamespaced bool,
        newFunc func() runtime.Object, newListFunc func() runtime.Object, tc rest.TableConvertor) rest.Storage {
        objRoot := filepath.Join(rootpath, groupResource.Group, groupResource.Resource)
        if err := ensureDir(objRoot); err != nil {
            panic(fmt.Sprintf("unable to write data dir: %s", err))
        }
        return &store{
            defaultQualifiedResource: groupResource,
            TableConvertor:           tc,
            codec:                    codec,
            objRootPath:              objRoot,
            isNamespaced:             isNamespaced,
            newFunc:                  newFunc,
            newListFunc:              newListFunc,
            watchers:                 make(map[int]*yamlWatch, 10),
        }
    }

    type store struct {
        rest.TableConvertor
        codec        runtime.Codec
        objRootPath  string
        isNamespaced bool

        muWatchers sync.RWMutex
        watchers   map[int]*yamlWatch

        newFunc                  func() runtime.Object
        newListFunc              func() runtime.Object
        defaultQualifiedResource schema.GroupResource
    }

    // notifyWatchers fans an event out to every registered watcher.
    // NOTE: the send blocks if a watcher's channel buffer is full.
    func (f *store) notifyWatchers(ev watch.Event) {
        f.muWatchers.RLock()
        for _, w := range f.watchers {
            w.ch <- ev
        }
        f.muWatchers.RUnlock()
    }

    func (f *store) New() runtime.Object {
        return f.newFunc()
    }

    func (f *store) NewList() runtime.Object {
        return f.newListFunc()
    }

    func (f *store) NamespaceScoped() bool {
        return f.isNamespaced
    }

    func (f *store) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
        return read(f.codec, f.objectFileName(ctx, name), f.newFunc)
    }

    func (f *store) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
        newListObj := f.NewList()
        v, err := getListPrt(newListObj)
        if err != nil {
            return nil, err
        }

        dirname := f.objectDirName(ctx)
        if err := visitDir(dirname, f.newFunc, f.codec, func(path string, obj runtime.Object) {
            appendItem(v, obj)
        }); err != nil {
            return nil, fmt.Errorf("failed walking filepath %v", dirname)
        }
        return newListObj, nil
    }

    func (f *store) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc,
        options *metav1.CreateOptions) (runtime.Object, error) {
        if createValidation != nil {
            if err := createValidation(ctx, obj); err != nil {
                return nil, err
            }
        }
        if f.isNamespaced {
            // Make sure the namespace directory exists
            ns, ok := genericapirequest.NamespaceFrom(ctx)
            if !ok {
                return nil, apierrors.NewBadRequest("namespace required")
            }
            if err := ensureDir(filepath.Join(f.objRootPath, ns)); err != nil {
                return nil, err
            }
        }

        accessor, err := meta.Accessor(obj)
        if err != nil {
            return nil, err
        }
        if accessor.GetUID() == "" {
            accessor.SetUID(uuid.NewUUID())
        }

        name := accessor.GetName()
        filename := f.objectFileName(ctx, name)
        qualifiedResource := f.qualifiedResourceFromContext(ctx)
        if exists(filename) {
            return nil, apierrors.NewAlreadyExists(qualifiedResource, name)
        }

        if err := write(f.codec, filename, obj); err != nil {
            return nil, apierrors.NewInternalError(err)
        }

        f.notifyWatchers(watch.Event{
            Type:   watch.Added,
            Object: obj,
        })

        return obj, nil
    }

    func (f *store) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo,
        createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc,
        forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
        isCreate := false
        oldObj, err := f.Get(ctx, name, nil)
        if err != nil {
            if !forceAllowCreate {
                return nil, false, err
            }
            isCreate = true
        }

        if f.isNamespaced {
            // Make sure the namespace directory exists
            ns, ok := genericapirequest.NamespaceFrom(ctx)
            if !ok {
                return nil, false, apierrors.NewBadRequest("namespace required")
            }
            if err := ensureDir(filepath.Join(f.objRootPath, ns)); err != nil {
                return nil, false, err
            }
        }

        updatedObj, err := objInfo.UpdatedObject(ctx, oldObj)
        if err != nil {
            return nil, false, err
        }
        filename := f.objectFileName(ctx, name)

        if isCreate {
            if createValidation != nil {
                if err := createValidation(ctx, updatedObj); err != nil {
                    return nil, false, err
                }
            }
            if err := write(f.codec, filename, updatedObj); err != nil {
                return nil, false, err
            }
            f.notifyWatchers(watch.Event{
                Type:   watch.Added,
                Object: updatedObj,
            })
            return updatedObj, true, nil
        }

        if updateValidation != nil {
            if err := updateValidation(ctx, updatedObj, oldObj); err != nil {
                return nil, false, err
            }
        }
        if err := write(f.codec, filename, updatedObj); err != nil {
            return nil, false, err
        }
        f.notifyWatchers(watch.Event{
            Type:   watch.Modified,
            Object: updatedObj,
        })
        return updatedObj, false, nil
    }

    func (f *store) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc,
        options *metav1.DeleteOptions) (runtime.Object, bool, error) {
        filename := f.objectFileName(ctx, name)
        qualifiedResource := f.qualifiedResourceFromContext(ctx)
        if !exists(filename) {
            return nil, false, apierrors.NewNotFound(qualifiedResource, name)
        }

        oldObj, err := f.Get(ctx, name, nil)
        if err != nil {
            return nil, false, err
        }
        if deleteValidation != nil {
            if err := deleteValidation(ctx, oldObj); err != nil {
                return nil, false, err
            }
        }

        if err := os.Remove(filename); err != nil {
            return nil, false, err
        }
        f.notifyWatchers(watch.Event{
            Type:   watch.Deleted,
            Object: oldObj,
        })
        return oldObj, true, nil
    }

    func (f *store) DeleteCollection(ctx context.Context, deleteValidation rest.ValidateObjectFunc,
        options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) (runtime.Object, error) {
        newListObj := f.NewList()
        v, err := getListPrt(newListObj)
        if err != nil {
            return nil, err
        }
        dirname := f.objectDirName(ctx)
        if err := visitDir(dirname, f.newFunc, f.codec, func(path string, obj runtime.Object) {
            _ = os.Remove(path)
            appendItem(v, obj)
        }); err != nil {
            return nil, fmt.Errorf("failed walking filepath %v", dirname)
        }
        return newListObj, nil
    }

    func (f *store) objectFileName(ctx context.Context, name string) string {
        if f.isNamespaced {
            // FIXME: return error if namespace is not found
            ns, _ := genericapirequest.NamespaceFrom(ctx)
            return filepath.Join(f.objRootPath, ns, name+".yaml")
        }
        return filepath.Join(f.objRootPath, name+".yaml")
    }

    func (f *store) objectDirName(ctx context.Context) string {
        if f.isNamespaced {
            // FIXME: return error if namespace is not found
            ns, _ := genericapirequest.NamespaceFrom(ctx)
            return filepath.Join(f.objRootPath, ns)
        }
        return filepath.Join(f.objRootPath)
    }

    // write encodes obj with the codec and saves it to path
    func write(encoder runtime.Encoder, path string, obj runtime.Object) error {
        buf := new(bytes.Buffer)
        if err := encoder.Encode(obj, buf); err != nil {
            return err
        }
        return ioutil.WriteFile(path, buf.Bytes(), 0600)
    }

    // read loads the file at path and decodes it into a new object
    func read(decoder runtime.Decoder, path string, newFunc func() runtime.Object) (runtime.Object, error) {
        content, err := ioutil.ReadFile(filepath.Clean(path))
        if err != nil {
            return nil, err
        }
        newObj := newFunc()
        decodedObj, _, err := decoder.Decode(content, nil, newObj)
        if err != nil {
            return nil, err
        }
        return decodedObj, nil
    }

    func exists(path string) bool {
        _, err := os.Stat(path)
        return err == nil
    }

    func ensureDir(dirname string) error {
        if !exists(dirname) {
            return os.MkdirAll(dirname, 0700)
        }
        return nil
    }

    // visitDir decodes every .yaml file under dirname and passes it to visitFunc
    func visitDir(dirname string, newFunc func() runtime.Object, codec runtime.Decoder,
        visitFunc func(string, runtime.Object)) error {
        return filepath.Walk(dirname, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if info.IsDir() {
                return nil
            }
            if !strings.HasSuffix(info.Name(), ".yaml") {
                return nil
            }
            newObj, err := read(codec, path, newFunc)
            if err != nil {
                return err
            }
            visitFunc(path, newObj)
            return nil
        })
    }

    func appendItem(v reflect.Value, obj runtime.Object) {
        v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
    }

    func getListPrt(listObj runtime.Object) (reflect.Value, error) {
        listPtr, err := meta.GetItemsPtr(listObj)
        if err != nil {
            return reflect.Value{}, err
        }
        v, err := conversion.EnforcePtr(listPtr)
        if err != nil || v.Kind() != reflect.Slice {
            return reflect.Value{}, fmt.Errorf("need ptr to slice: %v", err)
        }
        return v, nil
    }

    func (f *store) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
        // NOTE: len(f.watchers) is read without the lock and can collide with a
        // live ID after a watcher has been removed; a monotonic counter would be safer.
        yw := &yamlWatch{
            id: len(f.watchers),
            f:  f,
            ch: make(chan watch.Event, 10),
        }
        // On initial watch, send all the existing objects
        list, err := f.List(ctx, options)
        if err != nil {
            return nil, err
        }

        danger := reflect.ValueOf(list).Elem()
        items := danger.FieldByName("Items")

        // NOTE: nobody reads yw.ch yet, so this loop blocks once more than
        // 10 objects exist (the channel buffer size).
        for i := 0; i < items.Len(); i++ {
            obj := items.Index(i).Addr().Interface().(runtime.Object)
            yw.ch <- watch.Event{
                Type:   watch.Added,
                Object: obj,
            }
        }

        f.muWatchers.Lock()
        f.watchers[yw.id] = yw
        f.muWatchers.Unlock()

        return yw, nil
    }

    type yamlWatch struct {
        f  *store
        id int
        ch chan watch.Event
    }

    func (w *yamlWatch) Stop() {
        w.f.muWatchers.Lock()
        delete(w.f.watchers, w.id)
        w.f.muWatchers.Unlock()
    }

    func (w *yamlWatch) ResultChan() <-chan watch.Event {
        return w.ch
    }

    func (f *store) ConvertToTable(ctx context.Context, object runtime.Object,
        tableOptions runtime.Object) (*metav1.Table, error) {
        return f.TableConvertor.ConvertToTable(ctx, object, tableOptions)
    }

    func (f *store) qualifiedResourceFromContext(ctx context.Context) schema.GroupResource {
        if info, ok := genericapirequest.RequestInfoFrom(ctx); ok {
            return schema.GroupResource{Group: info.APIGroup, Resource: info.Resource}
        }
        // some implementations access storage directly and thus the context has no RequestInfo
        return f.defaultQualifiedResource
    }
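Calling NewStore creates a rest.Storage. As mentioned above, a storage backend has many details to handle. The example above does not:
- Detect resources that are being deleted and respond appropriately during CRUD operations
- Validate resources. genericregistry.Store does this by calling the strategy
- Automatically populate certain metadata fields, such as creationTimestamp and selfLink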
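To make the last two gaps concrete, here is a minimal sketch of a create path that validates the object and then fills a server-owned field. It deliberately uses a simplified struct instead of the real API types. The names flunder, validateFlunder, prepareForCreate, and demoTime are hypothetical, and the validation rule is only an example.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// flunder is a simplified stand-in for an API object with ObjectMeta.
type flunder struct {
	Name              string
	CreationTimestamp time.Time
}

// demoTime is a fixed instant used by the demo so the output is stable.
var demoTime = time.Date(2021, time.January, 1, 0, 0, 0, 0, time.UTC)

// validateFlunder plays the role of the strategy's validation hook.
func validateFlunder(f *flunder) error {
	if f.Name == "" {
		return errors.New("metadata.name is required")
	}
	return nil
}

// prepareForCreate rejects invalid objects and then overwrites the
// server-owned creation timestamp, ignoring whatever the client sent.
func prepareForCreate(f *flunder, now time.Time) error {
	if err := validateFlunder(f); err != nil {
		return err
	}
	f.CreationTimestamp = now.UTC()
	return nil
}

func main() {
	f := &flunder{Name: "sample"}
	if err := prepareForCreate(f, demoTime); err != nil {
		fmt.Println("rejected:", err)
		return
	}
	fmt.Println("created at", f.CreationTimestamp.Format(time.RFC3339))
}
```

A file-backed store would call something like prepareForCreate at the top of Create, before writing the YAML file.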
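Suppose a Kubernetes resource has a status subresource. When a client updates the status subresource, it sends an HTTP request of the form:
PUT /apis/cloud.gmem.cc/v1/namespaces/default/flunders/sample/status
It matches the route:
PUT /apis/cloud.gmem.cc/v1/namespaces/{namespace}/flunders/{name}/status
This route is dedicated to the status subresource and differs from the route of the main resource:
PUT /apis/cloud.gmem.cc/v1/namespaces/{namespace}/flunders/{name}
So how does the handling of a main resource differ from that of a subresource, and how can we influence that handling logic?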
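When calling InstallAPIGroups, all you need to do to support a subresource is add a rest.Storage under a resource name that contains a /:
    v1beta1storage["flunders/status"] = wardleregistry.RESTInPeace(flunderstorage.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter))
You can even reuse the parent resource's rest.Storage directly. The consequence, however, is that a client request to the status endpoint can update the entire flunder rather than only its status, which is usually not what you want.
The code above also causes go-restful routes like the ones shown at the beginning of this section to be registered in the APIServer.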
The code that extracts the path variables namespace and name is:
    pathParams := pathProcessor.ExtractParameters(route, webService, httpRequest.URL.Path)
These two variables identify the resource being operated on. The request is then forwarded to the Update method of rest.Storage:
    func (e *Store) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {}
The name parameter carries the name of the resource. The rest.Storage itself has no way of knowing whether it should update (only) the subresource.
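The relationship between the URL and the storage map key can be modeled with a few lines of standard-library code. This is a simplified model under stated assumptions: the real installer registers one go-restful route per map key rather than parsing paths at request time, and the names target, parseNamespacedPath, and storageKey are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// target describes what a namespaced resource URL points at.
type target struct {
	Namespace, Resource, Name, Subresource string
}

// parseNamespacedPath splits paths of the form
// /apis/{group}/{version}/namespaces/{namespace}/{resource}/{name}[/{subresource}].
func parseNamespacedPath(path string) (target, bool) {
	parts := strings.Split(strings.Trim(path, "/"), "/")
	if len(parts) < 7 || len(parts) > 8 || parts[0] != "apis" || parts[3] != "namespaces" {
		return target{}, false
	}
	t := target{Namespace: parts[4], Resource: parts[5], Name: parts[6]}
	if len(parts) == 8 {
		t.Subresource = parts[7]
	}
	return t, true
}

// storageKey returns the key under which the matching rest.Storage would be
// registered: "flunders" for the main resource, "flunders/status" for status.
func (t target) storageKey() string {
	if t.Subresource == "" {
		return t.Resource
	}
	return t.Resource + "/" + t.Subresource
}

func main() {
	for _, p := range []string{
		"/apis/cloud.gmem.cc/v1/namespaces/default/flunders/sample",
		"/apis/cloud.gmem.cc/v1/namespaces/default/flunders/sample/status",
	} {
		t, _ := parseNamespacedPath(p)
		fmt.Printf("name=%s key=%s\n", t.Name, t.storageKey())
	}
}
```

Both requests carry the same name, sample. Only the storage key differs, which is why the distinction has to be made by registering different storages rather than inside Update.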
When updating the status subresource, we usually want to allow changes to the Status field only. To achieve this, we need to register a separate rest.Storage for the subresource.
    package store

    import (
        "context"

        "k8s.io/apimachinery/pkg/runtime"
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
        "k8s.io/apiserver/pkg/registry/generic/registry"
        "k8s.io/apiserver/pkg/registry/rest"
        "sigs.k8s.io/apiserver-runtime/pkg/builder/resource/util"
    )

    // CopyStatusFunc copies the status from src to dst
    type CopyStatusFunc func(src, dst runtime.Object)

    // StatusStore decorates a parent storage so that updates only change
    // the status subresource.
    // NOTE: it replaces the UpdateStrategy of the store passed in, so pass a
    // store instance that is not shared with the main resource.
    func StatusStore(parentStore rest.StandardStorage, copyStatusFunc CopyStatusFunc) rest.Storage {
        switch pstor := parentStore.(type) {
        case *registry.Store:
            pstor.UpdateStrategy = &statusStrategy{
                RESTUpdateStrategy: pstor.UpdateStrategy,
                copyStatusFunc:     copyStatusFunc,
            }
        }
        return &statusStore{
            StandardStorage: parentStore,
        }
    }

    var _ rest.Getter = &statusStore{}
    var _ rest.Updater = &statusStore{}

    type statusStore struct {
        rest.StandardStorage
    }

    var _ rest.RESTUpdateStrategy = &statusStrategy{}

    // statusStrategy defines a default Strategy for the status subresource.
    type statusStrategy struct {
        rest.RESTUpdateStrategy
        copyStatusFunc CopyStatusFunc
    }

    // PrepareForUpdate copies the status of the incoming object onto the stored
    // one, then overwrites the incoming object with the result, so that only
    // the status change survives.
    func (s *statusStrategy) PrepareForUpdate(ctx context.Context, new, old runtime.Object) {
        s.copyStatusFunc(new, old)
        if err := util.DeepCopy(old, new); err != nil {
            utilruntime.HandleError(err)
        }
    }
genericregistry.Store performs an update inside the callback of an atomic operation. In that callback it calls the Strategy's PrepareForUpdate method. The statusStore above works by overriding this method so that only the status subresource changes.
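The effect of such a PrepareForUpdate can be shown without any Kubernetes dependency. The sketch below uses a simplified object and restores every non-status field from the stored copy. The type and function names are hypothetical, and it illustrates the idea rather than the exact mechanics of the code above.

```go
package main

import "fmt"

type flunderSpec struct{ Reference string }

type flunderStatus struct{ Phase string }

// flunder is a simplified stand-in for an API object with spec and status.
type flunder struct {
	Name   string
	Spec   flunderSpec
	Status flunderStatus
}

// prepareForStatusUpdate keeps only the status of the incoming object and
// takes everything else from the stored object.
func prepareForStatusUpdate(updated, stored *flunder) {
	status := updated.Status
	*updated = *stored
	updated.Status = status
}

func main() {
	stored := &flunder{Name: "sample", Spec: flunderSpec{Reference: "a"}, Status: flunderStatus{Phase: "Pending"}}
	// The client tries to change both spec and status through the status endpoint.
	updated := &flunder{Name: "sample", Spec: flunderSpec{Reference: "b"}, Status: flunderStatus{Phase: "Ready"}}

	prepareForStatusUpdate(updated, stored)
	fmt.Printf("spec=%s status=%s\n", updated.Spec.Reference, updated.Status.Phase)
}
```

The spec change is silently dropped and only the new phase is persisted, which is the behavior expected of a status endpoint.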
When your API needs to introduce breaking changes, you should consider supporting multiple versions.
Below is a typical directory layout for a multi-version API:
    api
    ├── doc.go
    ├── fullvpcmigration_types.go
    ├──
    ├── v1
    │   ├── conversion.go
    │   ├── doc.go
    │   ├── fullvpcmigration_types.go
    │   ├── register.go
    │   ├── zz_generated.conversion.go
    │   ├── zz_generated.deepcopy.go
    │   └── zz_generated.openapi.go
    ├── v2
    │   ├── doc.go
    │   ├── fullvpcmigration_types.go
    │   ├── register.go
    │   ├── zz_generated.conversion.go
    │   ├── zz_generated.deepcopy.go
    │   └── zz_generated.openapi.go
    └── zz_generated.deepcopy.go
The root directory of the API group (the sample project has only one group, so the api directory itself serves as the group root) should hold the resource struct definitions of the __internal version. It is advisable to keep their content identical to the latest version.
This file (doc.go) should carry the package-level annotations, for example:
    // +k8s:openapi-gen=true
    // +groupName=gmem.cc
    // +kubebuilder:object:generate=true

    package api
This file (register.go) registers the types with the Scheme. For the __internal version:
    package api

    import (
        "k8s.io/apimachinery/pkg/runtime"
        "k8s.io/apimachinery/pkg/runtime/schema"
    )

    const (
        GroupName = "gmem.cc"
    )

    var (
        // GroupVersion is group version used to register these objects
        GroupVersion = schema.GroupVersion{Group: GroupName, Version: runtime.APIVersionInternal}

        // SchemeBuilder is used to add go types to the GroupVersionKind scheme.
        // Do not use &scheme.Builder{} here. Otherwise the kind __internal/WatchEvent
        // would be registered twice, to k8s.io/apimachinery/pkg/apis/meta/v1.WatchEvent and
        // k8s.io/apimachinery/pkg/apis/meta/v1.InternalEvent, which is illegal.
        SchemeBuilder = runtime.NewSchemeBuilder()

        // AddToScheme adds the types in this group-version to the given scheme.
        AddToScheme = SchemeBuilder.AddToScheme
    )

    // Kind takes an unqualified kind and returns a Group qualified GroupKind
    func Kind(kind string) schema.GroupKind {
        return GroupVersion.WithKind(kind).GroupKind()
    }

    // Resource takes an unqualified resource and returns a Group qualified GroupResource
    func Resource(resource string) schema.GroupResource {
        return GroupVersion.WithResource(resource).GroupResource()
    }
For an ordinary version:
    package v2

    import (
        "cloud.tencent.com/teleport/api"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/runtime"
        "k8s.io/apimachinery/pkg/runtime/schema"
    )

    var (
        // GroupVersion is group version used to register these objects
        GroupVersion = schema.GroupVersion{Group: api.GroupName, Version: "v2"}

        // SchemeBuilder is used to add go types to the GroupVersionKind scheme
        SchemeBuilder = runtime.NewSchemeBuilder(func(scheme *runtime.Scheme) error {
            metav1.AddToGroupVersion(scheme, GroupVersion)
            return nil
        })
        localSchemeBuilder = &SchemeBuilder

        // AddToScheme adds the types in this group-version to the given scheme.
        AddToScheme = SchemeBuilder.AddToScheme
    )

    // Kind takes an unqualified kind and returns a Group qualified GroupKind
    func Kind(kind string) schema.GroupKind {
        return GroupVersion.WithKind(kind).GroupKind()
    }

    // Resource takes an unqualified resource and returns a Group qualified GroupResource
    func Resource(resource string) schema.GroupResource {
        return GroupVersion.WithResource(resource).GroupResource()
    }
As you can see, an ordinary version needs to register certain structs from the metav1 package under its own GroupVersion.
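This file (zz_generated.openapi.go) holds the OpenAPI definitions that must be generated for every ordinary version. These definitions must be registered with the API Server. Otherwise commands such as kubectl apply fail with a 404 error:
    $(OPENAPI_GEN) \
      --input-dirs "k8s.io/apimachinery/pkg/apis/meta/v1,k8s.io/apimachinery/pkg/runtime,k8s.io/apimachinery/pkg/version" \
      --input-dirs cloud.tencent.com/teleport/api/v1 -o ./ -p api/v1 -O zz_generated.openapi

    $(OPENAPI_GEN) \
      --input-dirs cloud.tencent.com/teleport/api/v2 -o ./ -p api/v2 -O zz_generated.openapi
This file (zz_generated.conversion.go) holds the conversion functions to and from the __internal version, which must be generated for every ordinary version. The conversion functions are registered for the current GroupVersion through the localSchemeBuilder shown above:
    $(CONVERSION_GEN) -h hack/boilerplate.go.txt --input-dirs cloud.tencent.com/teleport/api/v1 -O zz_generated.conversion
    $(CONVERSION_GEN) -h hack/boilerplate.go.txt --input-dirs cloud.tencent.com/teleport/api/v2 -O zz_generated.conversion
The reason to bump an API version is, naturally, that the structure has changed. A structural change means that special conversion logic is needed between the old and new versions. Such logic obviously cannot be generated automatically. The conversion code you write by hand should go in conversion.go.
This file (zz_generated.deepcopy.go) holds the deep-copy functions, which must be generated for the Go structs of resources in both the __internal version and the ordinary versions.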
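As mentioned earlier, every API resource (version) needs a rest.Storage, which directly handles operations such as GET/CREATE/UPDATE/DELETE/WATCH for that version of the resource.
genericregistry.Store, the default rest.Storage implementation backed by Etcd, contains a Cacher internally. The Cacher serves WATCH/LIST requests from a cache to avoid hitting Etcd too often. Internally, the Cacher works with the internal version of the resource.
The internal version is the resource registered under the special version name __internal. The literal __internal is provided by the constant runtime.APIVersionInternal. We usually register the resource structs in the group's root directory as the __internal version.
With this internal-version mechanism, the Cacher does not need to keep several versions of a resource in memory.
In addition, the resource lifecycle callbacks required by rest.Storage or its Strategy all take the __internal version as their parameters. This means that:
- We do not need to write duplicate callbacks for each version
- When moving to multiple versions, the parameters of these callbacks must all be changed to the __internal version
The Cacher, the lifecycle callbacks, and, as discussed below, kubectl and the storage layer can each freely choose the version they need because API resources can be converted between versions.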
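When you copy the v1 resource code to create v2, the fully generated conversion functions are sufficient at first. Once you add or modify a field, you need to customize the conversion functions.
Suppose we rename the FullVPCMigrationSpec.TeamName field to Team. We then need:
    // The places where zz_generated.conversion.go fails to compile are the functions you need to implement
    func Convert_v1_FullVPCMigrationSpec_To_api_FullVPCMigrationSpec(in *FullVPCMigrationSpec, out *api.FullVPCMigrationSpec, s conversion.Scope) error {
        // Handle here whatever needs manual treatment because of the field change
        out.Team = in.TeamName
        // Then call the generated function, whose name differs from yours only by the auto prefix
        return autoConvert_v1_FullVPCMigrationSpec_To_api_FullVPCMigrationSpec(in, out, s)
    }

    func Convert_api_FullVPCMigrationSpec_To_v1_FullVPCMigrationSpec(in *api.FullVPCMigrationSpec, out *FullVPCMigrationSpec, s conversion.Scope) error {
        out.TeamName = in.Team
        return autoConvert_api_FullVPCMigrationSpec_To_v1_FullVPCMigrationSpec(in, out, s)
    }
The two functions above are the ones missing from the generated conversion code, which is what causes the compile errors. You have to implement them yourself.
The variants with the auto prefix are generated conversion functions that do most of the work. You only need to handle the parts that require manual treatment and then call the auto function.
Note that conversion always happens between a specific version and the __internal version. To convert v1 to v2, you first convert to __internal and then from __internal to v2. The design is easy to understand: without it, the number of conversion functions would explode as versions are added.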
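The hub-and-spoke arrangement can be sketched with plain structs. The types specV1, specInternal, and specV2 are simplified stand-ins for the real versioned structs, and the two counting helpers are only there to show why the design scales. All names are hypothetical.

```go
package main

import "fmt"

// Simplified stand-ins for one resource in three versions.
type specV1 struct{ TeamName string }
type specInternal struct{ Team string }
type specV2 struct{ Team string }

// Each external version only knows how to convert to and from the hub.
func convertV1ToInternal(in specV1) specInternal { return specInternal{Team: in.TeamName} }
func convertInternalToV1(in specInternal) specV1 { return specV1{TeamName: in.Team} }
func convertV2ToInternal(in specV2) specInternal { return specInternal{Team: in.Team} }
func convertInternalToV2(in specInternal) specV2 { return specV2{Team: in.Team} }

// convertV1ToV2 goes through the internal version; there is no direct v1->v2 function.
func convertV1ToV2(in specV1) specV2 {
	return convertInternalToV2(convertV1ToInternal(in))
}

// hubFunctionCount is the number of conversion functions needed with a hub:
// one "to internal" and one "from internal" per external version.
func hubFunctionCount(versions int) int { return 2 * versions }

// pairwiseFunctionCount is the number needed if every ordered pair of
// external versions had its own function.
func pairwiseFunctionCount(versions int) int { return versions * (versions - 1) }

func main() {
	fmt.Println(convertV1ToV2(specV1{TeamName: "infra"}).Team)
	fmt.Println(hubFunctionCount(5), pairwiseFunctionCount(5))
}
```

With a hub the count grows linearly with the number of versions, while direct pairwise conversion grows quadratically.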
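Conversion code must be registered with the Scheme. The API Server, as well as clients such as kubectl and controller-runtime, all rely on the Scheme.
Both storing (serialization) and reading (deserialization) rely on a Codec. A Codec is simply a Serializer:
    package runtime

    type Serializer interface {
        Encoder
        Decoder
    }

    // Codec is a Serializer that deals with the details of versioning objects. It offers the same
    // interface as Serializer, so this is a marker to consumers that care about the version of the objects
    // they receive.
    type Codec Serializer
A Codec is provided by a CodecFactory, which holds the Scheme:
    serializer.NewCodecFactory(scheme)
As we already know, the Scheme contains the following information:
- Which Go struct each Group, Version, and Kind maps to
- The Go structs carry json tags, which determine the serialized form of each struct
- How different Versions of the same Group and Kind are converted to one another
A Codec is therefore able to serialize to JSON or other formats and to convert between Go structs of different versions.
For genericregistry.Store, storing means converting the Go struct of an API resource into JSON or Protobuf and saving it to Etcd, which obviously requires a Codec.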
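As a rough mental model, a versioning codec is "decode to the internal type, then encode as the requested version". The toy below does exactly that with encoding/json. It is not how the apimachinery codec is implemented. The wire structs, the JSON field names teamName and team, and the function names are assumptions made for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

const (
	apiV1 = "gmem.cc/v1"
	apiV2 = "gmem.cc/v2"
)

// internalMigration plays the role of the __internal type.
type internalMigration struct{ Team string }

// wireV1 and wireV2 are the serialized shapes of the two external versions.
type wireV1 struct {
	APIVersion string `json:"apiVersion"`
	TeamName   string `json:"teamName"`
}

type wireV2 struct {
	APIVersion string `json:"apiVersion"`
	Team       string `json:"team"`
}

// decode reads either version and converts it to the internal type.
func decode(data []byte) (internalMigration, error) {
	var head struct {
		APIVersion string `json:"apiVersion"`
	}
	if err := json.Unmarshal(data, &head); err != nil {
		return internalMigration{}, err
	}
	switch head.APIVersion {
	case apiV1:
		var w wireV1
		err := json.Unmarshal(data, &w)
		return internalMigration{Team: w.TeamName}, err
	case apiV2:
		var w wireV2
		err := json.Unmarshal(data, &w)
		return internalMigration{Team: w.Team}, err
	}
	return internalMigration{}, fmt.Errorf("unsupported apiVersion %q", head.APIVersion)
}

// encode converts the internal type to the requested version and serializes it.
func encode(obj internalMigration, apiVersion string) ([]byte, error) {
	switch apiVersion {
	case apiV1:
		return json.Marshal(wireV1{APIVersion: apiV1, TeamName: obj.Team})
	case apiV2:
		return json.Marshal(wireV2{APIVersion: apiV2, Team: obj.Team})
	}
	return nil, fmt.Errorf("unsupported apiVersion %q", apiVersion)
}

// recode re-serializes stored data in the target version.
func recode(stored, targetVersion string) (string, error) {
	obj, err := decode([]byte(stored))
	if err != nil {
		return "", err
	}
	out, err := encode(obj, targetVersion)
	return string(out), err
}

func main() {
	out, err := recode(`{"apiVersion":"gmem.cc/v1","teamName":"infra"}`, apiV2)
	fmt.Println(out, err)
}
```

The same two steps are what allow an object written as v1 to be read back, and later rewritten, as v2.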
Once multi-version support is enabled, you need to pass all versions (prioritizedVersions) to the CodecFactory when creating the Codec:
    prioritizedVersions := []schema.GroupVersion{
        {
            Group:   "gmem.cc",
            Version: "v2",
        },
        {
            Group:   "gmem.cc",
            Version: "v1",
        },
    }
    codec := codecFactory.LegacyCodec(prioritizedVersions...)
    genericOptions.Etcd.StorageConfig.EncodeVersioner = runtime.NewMultiGroupVersioner(schema.GroupVersion{
        Group:   "gmem.cc",
        Version: "v2",
    })
prioritizedVersions also determines the preferred format when a resource is stored. For example, fullvpcmigrations has versions v1 and v2, so v2 is used for storage. jointeamrequests has only v1, so it can only be stored as v1.
Note: if a fullvpcmigration already exists in v1, the first modification made after this configuration takes effect changes its stored format to v2.
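The selection rule described above can be written down as a small function. This models the observable behavior described in this article, not the internals of LegacyCodec or NewMultiGroupVersioner, and the function name storageVersion is hypothetical.

```go
package main

import "fmt"

// storageVersion returns the first version in priority order that the
// resource actually has, or false if none matches.
func storageVersion(prioritized []string, available []string) (string, bool) {
	has := make(map[string]bool, len(available))
	for _, v := range available {
		has[v] = true
	}
	for _, v := range prioritized {
		if has[v] {
			return v, true
		}
	}
	return "", false
}

func main() {
	prioritized := []string{"v2", "v1"}

	v, _ := storageVersion(prioritized, []string{"v1", "v2"})
	fmt.Println("fullvpcmigrations stored as", v)

	v, _ = storageVersion(prioritized, []string{"v1"})
	fmt.Println("jointeamrequests stored as", v)
}
```

Because the rule is applied on every write, an object that was stored as v1 is rewritten as v2 the next time it is updated.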
You need to generate OpenAPI definitions for every version. The OpenAPI definitions are just a map, so merging the contents of all versions is enough.
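A minimal sketch of such a merge. In the real server the maps returned by the generated GetOpenAPIDefinitions functions have the type map[string]common.OpenAPIDefinition from k8s.io/kube-openapi. Here a small stand-in type named definition is used so that the example has no external dependencies, and the keys are made up.

```go
package main

import "fmt"

// definition is a stand-in for the generated OpenAPI definition type.
type definition struct{ Description string }

// mergeDefinitions copies all entries into one map. Later sources win when
// the same key appears more than once, which is harmless for shared
// definitions that are identical in every version.
func mergeDefinitions(sources ...map[string]definition) map[string]definition {
	merged := map[string]definition{}
	for _, src := range sources {
		for key, def := range src {
			merged[key] = def
		}
	}
	return merged
}

func main() {
	v1Defs := map[string]definition{
		"example/api/v1.FullVPCMigration": {Description: "v1"},
		"example/meta/v1.ObjectMeta":      {Description: "shared"},
	}
	v2Defs := map[string]definition{
		"example/api/v2.FullVPCMigration": {Description: "v2"},
	}
	fmt.Println(len(mergeDefinitions(v1Defs, v2Defs)))
}
```

The merged map is what a single definitions callback would return to the OpenAPI configuration of the server.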
The APIServer exposes all registered resource versions. It does, however, have a notion of version priority:
    apiGroupInfo.PrioritizedVersions = prioritizedVersions
This determines which version kubectl displays by default. The preferred version also appears in the output of the api-resources subcommand:
    # kubectl -s http://127.0.0.1:6080 api-resources
    NAME                SHORTNAMES   APIVERSION   NAMESPACED   KIND
    fullvpcmigrations                gmem.cc/v2   true         FullVPCMigration
    jointeamrequests                 gmem.cc/v1   true         JoinTeamRequest
The kubectl get command shows the preferred version by default, but you can also force a specific version:
    kubectl -s http://127.0.0.1:6080 -n default get fullvpcmigration.v1.gmem.cc
    # GET http://127.0.0.1:6080/apis/gmem.cc/v1/namespaces/default/fullvpcmigrations?limit=500

    kubectl -s http://127.0.0.1:6080 -n default get fullvpcmigration.v2.gmem.cc
    # GET http://127.0.0.1:6080/apis/gmem.cc/v2/namespaces/default/fullvpcmigrations?limit=500
Either way, fullvpcmigrations stored in any version are returned. From the client's point of view, choosing a version merely selects a "view" of the resource.
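The resource version that a controller watches must already be registered in the controller manager's Scheme.
In your Reconcile code you can use any registered version as the container for a Get operation. Type conversion happens automatically.
It is advisable to use an ordinary version only when reading or storing resource state and to use the __internal version everywhere else. That way your controller logic needs very few changes after a version upgrade.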