Writing a Kubernetes-Style API Server

20 Aug 2021

By Alex
/ in PaaS
/ tags K8S
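Background

A while ago I was asked to build a tool that would run inside K8S. The requirement was a good fit for the controller pattern, so I naturally started building it on kubebuilder. When I talked to the team that provides the K8S environment, however, it turned out that they do not allow workloads to call the control plane API. What then?

The quickest fix is to run our own kube-apiserver + etcd. That is too heavy for us, though: kube-apiserver carries many features we do not need, and they consume too many resources. So I went looking for a lighter option.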

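The apiserver library

kubernetes/apiserver is synced from the staging/src/k8s.io/apiserver directory of the main kubernetes source tree. It provides the libraries needed to build a K8S-style API server. Many projects depend on it, including kube-apiserver, kube-aggregator and service-catalog.

The main purpose of the apiserver library is to build the Extension API Server used in API Aggregation. Its features include:

  1. Delegating authn/authz to the main kube-apiserver
  2. kubectl-compatible API discovery
  3. An admission control chain
  4. Versioned API types

K8S ships a sample, kubernetes/sample-apiserver, but it depends on the main kube-apiserver, even if you use neither authn/authz nor API aggregation. You have to point it at a main kube-apiserver with --kubeconfig, and the SharedInformer in the sample connects to the main kube-apiserver to access K8S resources.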

Analysis of sample-apiserver

Obviously we cannot depend on the main kube-apiserver in any way. This section walks through the sample-apiserver code to see what would need to change.

Entry point
Go
func main() {
    logs.InitLogs()
    defer logs.FlushLogs()
 
    stopCh := genericapiserver.SetupSignalHandler()
    // Initialize the server options
    options := server.NewWardleServerOptions(os.Stdout, os.Stderr)
    // Start the server
    cmd := server.NewCommandStartWardleServer(options, stopCh)
    cmd.Flags().AddGoFlagSet(flag.CommandLine)
    if err := cmd.Execute(); err != nil {
        klog.Fatal(err)
    }
}
Server options
Go
type WardleServerOptions struct {
    RecommendedOptions *genericoptions.RecommendedOptions
 
    SharedInformerFactory informers.SharedInformerFactory
    StdOut                io.Writer
    StdErr                io.Writer
}
 
func NewWardleServerOptions(out, errOut io.Writer) *WardleServerOptions {
    o := &WardleServerOptions{
        RecommendedOptions: genericoptions.NewRecommendedOptions(
            // By default, data is stored under /registry/wardle.example.com in Etcd
            defaultEtcdPathPrefix,
            // Use the legacy codec for wardle.example.com/v1alpha1
            apiserver.Codecs.LegacyCodec(v1alpha1.SchemeGroupVersion),
            // Process information of the API server
            genericoptions.NewProcessInfo("wardle-apiserver", "wardle"),
        ),
 
        StdOut: out,
        StdErr: errOut,
    }
    // Objects of the wardle.example.com group are written to Etcd as v1alpha1
    o.RecommendedOptions.Etcd.StorageConfig.EncodeVersioner =
        runtime.NewMultiGroupVersioner(v1alpha1.SchemeGroupVersion,
            schema.GroupKind{Group: v1alpha1.GroupName})
    return o
}

As you can see, the core of the options is genericoptions.RecommendedOptions. As the name suggests, it provides the "recommended" options for running an apiserver:

Go
type RecommendedOptions struct {
    // Etcd-related configuration
    Etcd           *EtcdOptions
    // HTTPS options such as listen address and certificates. Also creates and sets up the loopback-only rest.Config
    SecureServing  *SecureServingOptionsWithLoopback
    // authn options
    Authentication *DelegatingAuthenticationOptions
    // authz options
    Authorization  *DelegatingAuthorizationOptions
    // Audit options
    Audit          *AuditOptions
    // Enables profiling and contention profiling
    Features       *FeatureOptions
    // Core API options: location of the kubeconfig file for the main kube-apiserver
    CoreAPI        *CoreAPIOptions
 
    // Feature gates
    FeatureGate featuregate.FeatureGate
    // Called after ApplyTo has run on all of the options above. The returned PluginInitializers are passed to Admission.ApplyTo
    ExtraAdmissionInitializers func(c *server.RecommendedConfig) ([]admission.PluginInitializer, error)
    Admission                  *AdmissionOptions
    // Information about the server process
    ProcessInfo *ProcessInfo
    // Webhook options
    Webhook     *WebhookOptions
    // Controls the server's outbound traffic
    EgressSelector *EgressSelectorOptions
}

Recommended option values can be obtained from genericoptions.NewRecommendedOptions(). RecommendedOptions can also take option values from command-line flags:

Go
func (o *RecommendedOptions) AddFlags(fs *pflag.FlagSet) {}
Preparing the server

The server is implemented as a cobra.Command. First, RecommendedOptions is bound to command-line flags.

Go
func NewCommandStartWardleServer(defaults *WardleServerOptions, stopCh <-chan struct{}) *cobra.Command {
    o := *defaults
    cmd := &cobra.Command{
        Short: "Launch a wardle API server",
        Long:  "Launch a wardle API server",
        RunE: func(c *cobra.Command, args []string) error {
            // ...
        },
    }
 
    flags := cmd.Flags()
    // Register the options as command-line flags
    o.RecommendedOptions.AddFlags(flags)
    utilfeature.DefaultMutableFeatureGate.AddFlag(flags)
 
    return cmd
}

Next, cmd.Execute() is called, which in turn invokes the RunE function above:

Go
if err := o.Complete(); err != nil {
    return err
}
if err := o.Validate(args); err != nil {
    return err
}
if err := o.RunWardleServer(stopCh); err != nil {
    return err
}
return nil

The Complete method simply registers an admission controller; see below.

The Validate method asks RecommendedOptions to validate the options (which by now include the user-supplied command-line flags):

Go
func (o WardleServerOptions) Validate(args []string) error {
    errors := []error{}
    // Validation yields a slice of errors
    errors = append(errors, o.RecommendedOptions.Validate()...)
    // Merge them into a single error
    return utilerrors.NewAggregate(errors)
}
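
The collect-then-merge approach used by Validate can be tried out with the standard library alone. The following is a minimal sketch, assuming Go 1.20+ and using errors.Join as a stand-in for utilerrors.NewAggregate; the demoOptions type and its checks are hypothetical and not part of sample-apiserver.

```go
package main

import (
	"errors"
	"fmt"
)

// demoOptions is a hypothetical options struct, not part of sample-apiserver.
type demoOptions struct {
	EtcdPrefix string
	Port       int
}

// validate returns every problem it finds instead of stopping at the first one.
func (o demoOptions) validate() []error {
	var errs []error
	if o.EtcdPrefix == "" {
		errs = append(errs, errors.New("etcd prefix must not be empty"))
	}
	if o.Port <= 0 || o.Port > 65535 {
		errs = append(errs, fmt.Errorf("invalid port %d", o.Port))
	}
	return errs
}

// Validate merges the slice into one error. errors.Join returns nil
// when there is nothing to join, so valid options yield a nil error.
func (o demoOptions) Validate() error {
	return errors.Join(o.validate()...)
}

func main() {
	good := demoOptions{EtcdPrefix: "/registry/demo", Port: 443}
	bad := demoOptions{Port: 70000}
	fmt.Println(good.Validate())
	fmt.Println(bad.Validate())
}
```

Reporting all problems at once saves the user from fixing flags one failed start at a time.
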
Starting the server

The RunWardleServer method starts the API server. It covers the whole flow: converting server options (Option) into server configuration (Config), instantiating the APIServer from the configuration, and running it:

Go
func (o WardleServerOptions) RunWardleServer(stopCh <-chan struct{}) error {
    // Convert the options into a config
    config, err := o.Config()
    if err != nil {
        return err
    }
    // Convert the config into a CompletedConfig and instantiate the APIServer
    server, err := config.Complete().New()
    if err != nil {
        return err
    }
 
    // Register a hook that runs after the API server has started
    server.GenericAPIServer.AddPostStartHookOrDie("start-sample-server-informers", func(context genericapiserver.PostStartHookContext) error {
        config.GenericConfig.SharedInformerFactory.Start(context.StopCh)
        o.SharedInformerFactory.Start(context.StopCh)
        return nil
    })
 
    // PrepareRun readies the APIServer, Run starts it
    return server.GenericAPIServer.PrepareRun().Run(stopCh)
}
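
The options, config, completed config, server chain is a general pattern that can be reproduced without any K8S dependency. Below is a minimal sketch; every type in it is a hypothetical stand-in for the corresponding apiserver type, and only the shape of the flow is meant to match.

```go
package main

import (
	"errors"
	"fmt"
)

// All types below are hypothetical stand-ins for the apiserver types.

// Options holds raw, user-facing settings (typically filled from flags).
type Options struct {
	BindPort int
}

// Config is the validated, internal form of the options.
type Config struct {
	Port    int
	Version string
}

// completedConfig is unexported, so a Server can only be built
// from a Config that went through Complete().
type completedConfig struct{ *Config }

// Server is the running component built from a completed config.
type Server struct {
	Addr    string
	Version string
}

// Config converts options into configuration.
func (o Options) Config() (*Config, error) {
	if o.BindPort <= 0 {
		return nil, errors.New("bind port must be positive")
	}
	return &Config{Port: o.BindPort}, nil
}

// Complete fills in defaults that can be derived from existing fields.
func (c *Config) Complete() completedConfig {
	if c.Version == "" {
		c.Version = "1.0"
	}
	return completedConfig{c}
}

// New instantiates the server from a completed configuration.
func (c completedConfig) New() (*Server, error) {
	return &Server{Addr: fmt.Sprintf(":%d", c.Port), Version: c.Version}, nil
}

func main() {
	cfg, err := Options{BindPort: 8443}.Config()
	if err != nil {
		panic(err)
	}
	srv, err := cfg.Complete().New()
	if err != nil {
		panic(err)
	}
	fmt.Println(srv.Addr, srv.Version)
}
```

Keeping completedConfig unexported is the design choice worth noting: callers outside the package cannot skip the Complete step.
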
Server configuration

The API server cannot use the options directly; they must first be converted into an apiserver.Config:

Go
func (o *WardleServerOptions) Config() (*apiserver.Config, error) {
    // The options are adjusted a little further here
 
    // Check whether the certificates can be read; if not, try to generate self-signed ones
    if err := o.RecommendedOptions.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", nil, []net.IP{net.ParseIP("127.0.0.1")}); err != nil {
        return nil, fmt.Errorf("error creating self-signed certificates: %v", err)
    }
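    // Decide whether paging is supported based on the feature gate
    o.RecommendedOptions.Etcd.StorageConfig.Paging = utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
    // Set the extra admission initializers (see the Admission controller section)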
    o.RecommendedOptions.ExtraAdmissionInitializers = func(c *genericapiserver.RecommendedConfig) ([]admission.PluginInitializer, error) {
        // ...
    }
 
    // Create the recommended config
    serverConfig := genericapiserver.NewRecommendedConfig(apiserver.Codecs)
    // Expose the OpenAPI endpoint
    serverConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(
        // Generated code
        sampleopenapi.GetOpenAPIDefinitions, openapi.NewDefinitionNamer(apiserver.Scheme))
    serverConfig.OpenAPIConfig.Info.Title = "Wardle"
    serverConfig.OpenAPIConfig.Info.Version = "0.1"
 
    // Apply RecommendedOptions to RecommendedConfig
    if err := o.RecommendedOptions.ApplyTo(serverConfig); err != nil {
        return nil, err
    }
 
    // The config consists of two parts: the GenericConfig and your own extra configuration
    config := &apiserver.Config{
        GenericConfig: serverConfig,
        ExtraConfig:   apiserver.ExtraConfig{},
    }
    return config, nil
}

As the code above shows, RecommendedConfig is the core of the configuration:

Go
type RecommendedConfig struct {
    // Structure used to configure the GenericAPIServer
    Config
 
    // SharedInformerFactory provides shared informers for K8S resources.
    // It is set by RecommendedOptions.CoreAPI.ApplyTo; by default the informers use the in-cluster ClientConfig
    SharedInformerFactory informers.SharedInformerFactory
 
    // Set by RecommendedOptions.CoreAPI.ApplyTo and used by the informers
    ClientConfig *restclient.Config
}
 
type Config struct {
    SecureServing *SecureServingInfo
    Authentication AuthenticationInfo
    Authorization AuthorizationInfo
    // Privileged ClientConfig for local (loopback) use; PostStartHooks rely on it
    LoopbackClientConfig *restclient.Config
    // ...
}

Calling ApplyTo passes the options in RecommendedOptions on to RecommendedConfig:

Go
func (o *RecommendedOptions) ApplyTo(config *server.RecommendedConfig) error {
    // Calls config.AddHealthChecks to add the Etcd health check
    // and sets config.RESTOptionsGetter
    if err := o.Etcd.ApplyTo(&config.Config); err != nil {
        return err
    }
    // Creates config.Listener
    // and initializes config.Cert, config.CipherSuites, config.SNICerts and so on
    if err := o.SecureServing.ApplyTo(&config.Config.SecureServing, &config.Config.LoopbackClientConfig); err != nil {
        return err
    }
    // Initialize the authentication config and obtain the matching K8S client interface
    if err := o.Authentication.ApplyTo(&config.Config.Authentication, config.SecureServing, config.OpenAPIConfig); err != nil {
        return err
    }
    // Initialize the authorization config and obtain the matching K8S client interface
    if err := o.Authorization.ApplyTo(&config.Config.Authorization); err != nil {
        return err
    }
    if err := o.Audit.ApplyTo(&config.Config, config.ClientConfig, config.SharedInformerFactory, o.ProcessInfo, o.Webhook); err != nil {
        return err
    }
    if err := o.Features.ApplyTo(&config.Config); err != nil {
        return err
    }
    // Load the kubeconfig from a file or fall back to the in-cluster config; provides config.ClientConfig and config.SharedInformerFactory
    if err := o.CoreAPI.ApplyTo(config); err != nil {
        return err
    }
    // Call the admission initializer function
    if initializers, err := o.ExtraAdmissionInitializers(config); err != nil {
        return err
    // Initialize the admission controllers one by one
    } else if err := o.Admission.ApplyTo(&config.Config, config.SharedInformerFactory, config.ClientConfig, o.FeatureGate, initializers...); err != nil {
        return err
    }
    if err := o.EgressSelector.ApplyTo(&config.Config); err != nil {
        return err
    }
    if feature.DefaultFeatureGate.Enabled(features.APIPriorityAndFairness) {
        config.FlowControl = utilflowcontrol.New(
            config.SharedInformerFactory,
            kubernetes.NewForConfigOrDie(config.ClientConfig).FlowcontrolV1alpha1(),
            config.MaxRequestsInFlight+config.MaxMutatingRequestsInFlight,
            config.RequestTimeout/4,
        )
    }
    return nil
}

As you can see, building the server configuration depends heavily on the main kube-apiserver. These dependencies are what prevent sample-apiserver from running independently of K8S.

RecommendedConfig still has to be turned into a CompletedConfig by its Complete method:

Go
// server, err := config.Complete().New()
 
func (cfg *Config) Complete() CompletedConfig {
    c := completedConfig{
        cfg.GenericConfig.Complete(),
        &cfg.ExtraConfig,
    }
 
    c.GenericConfig.Version = &version.Info{
        Major: "1",
        Minor: "0",
    }
 
    return CompletedConfig{&c}
}
 
// Complete fills in missing but required configuration that can be derived from the existing configuration
func (c *RecommendedConfig) Complete() CompletedConfig {
    return c.Config.Complete(c.SharedInformerFactory)
}
Instantiating the APIServer

WardleServer is instantiated from the CompletedConfig:

Go
func (c completedConfig) New() (*WardleServer, error) {
    // Create the GenericAPIServer.
    // The name (first argument) is used to tell servers apart in log output;
    // the DelegationTarget (second argument) is used to compose API servers.
    genericServer, err := c.GenericConfig.New("sample-apiserver", genericapiserver.NewEmptyDelegate())
    if err != nil {
        return nil, err
    }
 
    s := &WardleServer{
        GenericAPIServer: genericServer,
    }

The New() call above creates the core GenericAPIServer:

Go
func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
    // Sanity checks
    if c.Serializer == nil {
        return nil, fmt.Errorf("Genericapiserver.New() called with config.Serializer == nil")
    }
    if c.LoopbackClientConfig == nil {
        return nil, fmt.Errorf("Genericapiserver.New() called with config.LoopbackClientConfig == nil")
    }
    if c.EquivalentResourceRegistry == nil {
        return nil, fmt.Errorf("Genericapiserver.New() called with config.EquivalentResourceRegistry == nil")
    }
 
    handlerChainBuilder := func(handler http.Handler) http.Handler {
        return c.BuildHandlerChainFunc(handler, c.Config)
    }
    // Build the request handler
    apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
 
    // Create the GenericAPIServer; many fields come straight from completedConfig
    s := &GenericAPIServer{
        discoveryAddresses:         c.DiscoveryAddresses,
        LoopbackClientConfig:       c.LoopbackClientConfig,
        legacyAPIGroupPrefixes:     c.LegacyAPIGroupPrefixes,
        admissionControl:           c.AdmissionControl,
        Serializer:                 c.Serializer,
        AuditBackend:               c.AuditBackend,
        Authorizer:                 c.Authorization.Authorizer,
        delegationTarget:           delegationTarget,
        EquivalentResourceRegistry: c.EquivalentResourceRegistry,
        HandlerChainWaitGroup:      c.HandlerChainWaitGroup,
 
        minRequestTimeout:     time.Duration(c.MinRequestTimeout) * time.Second,
        ShutdownTimeout:       c.RequestTimeout,
        ShutdownDelayDuration: c.ShutdownDelayDuration,
        SecureServingInfo:     c.SecureServing,
        ExternalAddress:       c.ExternalAddress,
 
        Handler: apiServerHandler,
 
        listedPathProvider: apiServerHandler,
 
        openAPIConfig:           c.OpenAPIConfig,
        skipOpenAPIInstallation: c.SkipOpenAPIInstallation,
 
        postStartHooks:         map[string]postStartHookEntry{},
        preShutdownHooks:       map[string]preShutdownHookEntry{},
        disabledPostStartHooks: c.DisabledPostStartHooks,
 
        healthzChecks:    c.HealthzChecks,
        livezChecks:      c.LivezChecks,
        readyzChecks:     c.ReadyzChecks,
        readinessStopCh:  make(chan struct{}),
        livezGracePeriod: c.LivezGracePeriod,
 
        DiscoveryGroupManager: discovery.NewRootAPIsHandler(c.DiscoveryAddresses, c.Serializer),
 
        maxRequestBodyBytes: c.MaxRequestBodyBytes,
        livezClock:          clock.RealClock{},
    }
 
    // ...
 
    // Add the lifecycle hooks of the delegationTarget
    for k, v := range delegationTarget.PostStartHooks() {
        s.postStartHooks[k] = v
    }
    for k, v := range delegationTarget.PreShutdownHooks() {
        s.preShutdownHooks[k] = v
    }
 
    // Add the preconfigured hooks
    for name, preconfiguredPostStartHook := range c.PostStartHooks {
        if err := s.AddPostStartHook(name, preconfiguredPostStartHook.hook); err != nil {
            return nil, err
        }
    }
 
    // If the config contains a SharedInformerFactory and no hook that starts it has been registered yet,
    // register a PostStart hook to start it
    genericApiServerHookName := "generic-apiserver-start-informers"
    if c.SharedInformerFactory != nil {
        if !s.isPostStartHookRegistered(genericApiServerHookName) {
            err := s.AddPostStartHook(genericApiServerHookName, func(context PostStartHookContext) error {
                c.SharedInformerFactory.Start(context.StopCh)
                return nil
            })
            if err != nil {
                return nil, err
            }
            // TODO: Once we get rid of /healthz consider changing this to post-start-hook.
            err = s.addReadyzChecks(healthz.NewInformerSyncHealthz(c.SharedInformerFactory))
            if err != nil {
                return nil, err
            }
        }
    }
 
    const priorityAndFairnessConfigConsumerHookName = "priority-and-fairness-config-consumer"
    if s.isPostStartHookRegistered(priorityAndFairnessConfigConsumerHookName) {
    } else if c.FlowControl != nil {
        err := s.AddPostStartHook(priorityAndFairnessConfigConsumerHookName, func(context PostStartHookContext) error {
            go c.FlowControl.Run(context.StopCh)
            return nil
        })
        if err != nil {
            return nil, err
        }
        // TODO(yue9944882): plumb pre-shutdown-hook for request-management system?
    } else {
        klog.V(3).Infof("Not requested to run hook %s", priorityAndFairnessConfigConsumerHookName)
    }
 
    // Add the health checks of the delegationTarget
    for _, delegateCheck := range delegationTarget.HealthzChecks() {
        skip := false
        for _, existingCheck := range c.HealthzChecks {
            if existingCheck.Name() == delegateCheck.Name() {
                skip = true
                break
            }
        }
        if skip {
            continue
        }
        s.AddHealthChecks(delegateCheck)
    }
 
    s.listedPathProvider = routes.ListedPathProviders{s.listedPathProvider, delegationTarget}
 
    // Install profiling, metrics, and the list of paths shown under URL / (listedPathProvider)
    installAPI(s, c.Config)
 
    // use the UnprotectedHandler from the delegation target to ensure that we don't attempt to double authenticator, authorize,
    // or some other part of the filter chain in delegation cases.
    if delegationTarget.UnprotectedHandler() == nil && c.EnableIndex {
        s.Handler.NonGoRestfulMux.NotFoundHandler(routes.IndexLister{
            StatusCode:   http.StatusNotFound,
            PathProvider: s.listedPathProvider,
        })
    }
 
    return s, nil
}

GenericAPIServer.Handler is the handler for HTTP requests; it is analyzed in the request handling section below.
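
A large part of New() is bookkeeping for named hooks: copy the delegate's hooks, add preconfigured ones, and register a default hook only if nobody has registered it already. The following is a minimal sketch of such a registry; hookFunc and hookServer are hypothetical simplifications, and unlike a real server the sketch runs hooks serially in name order to keep the output deterministic.

```go
package main

import (
	"fmt"
	"sort"
)

// hookFunc and hookServer are hypothetical simplifications of the
// post-start hook machinery.
type hookFunc func(stopCh <-chan struct{}) error

type hookServer struct {
	postStartHooks map[string]hookFunc
}

func newHookServer() *hookServer {
	return &hookServer{postStartHooks: map[string]hookFunc{}}
}

// AddPostStartHook registers a named hook and rejects duplicates.
func (s *hookServer) AddPostStartHook(name string, h hookFunc) error {
	if name == "" || h == nil {
		return fmt.Errorf("hook needs a name and a function")
	}
	if _, exists := s.postStartHooks[name]; exists {
		return fmt.Errorf("hook %q already registered", name)
	}
	s.postStartHooks[name] = h
	return nil
}

// isRegistered reports whether a hook with this name exists.
func (s *hookServer) isRegistered(name string) bool {
	_, ok := s.postStartHooks[name]
	return ok
}

// RunPostStartHooks runs the hooks in name order and returns the names that ran.
func (s *hookServer) RunPostStartHooks(stopCh <-chan struct{}) ([]string, error) {
	names := make([]string, 0, len(s.postStartHooks))
	for name := range s.postStartHooks {
		names = append(names, name)
	}
	sort.Strings(names)
	for _, name := range names {
		if err := s.postStartHooks[name](stopCh); err != nil {
			return nil, fmt.Errorf("hook %q failed: %w", name, err)
		}
	}
	return names, nil
}

func main() {
	s := newHookServer()
	noop := func(<-chan struct{}) error { return nil }
	_ = s.AddPostStartHook("start-informers", noop)
	// Register the default hook only if nobody registered it already.
	if !s.isRegistered("start-informers") {
		_ = s.AddPostStartHook("start-informers", noop)
	}
	ran, err := s.RunPostStartHooks(make(chan struct{}))
	fmt.Println(ran, err)
}
```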

Installing the APIGroup

After the GenericAPIServer is instantiated, the APIGroup is installed:

Go
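    // Create the APIGroupInfo: information about a group of APIs, including the registered APIs (Scheme),
    // how to encode and decode them (Codec), and how to parse query parameters (ParameterCodec)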
    apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(wardle.GroupName, Scheme, metav1.ParameterCodec, Codecs)
 
    // Mapping from resource to rest.Storage
    v1alpha1storage := map[string]rest.Storage{}
    v1alpha1storage["flunders"] = wardleregistry.RESTInPeace(flunderstorage.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter))
    v1alpha1storage["fischers"] = wardleregistry.RESTInPeace(fischerstorage.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter))
    // Each of the two versions gets its own mapping
    apiGroupInfo.VersionedResourcesStorageMap["v1alpha1"] = v1alpha1storage
 
    v1beta1storage := map[string]rest.Storage{}
    v1beta1storage["flunders"] = wardleregistry.RESTInPeace(flunderstorage.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter))
    apiGroupInfo.VersionedResourcesStorageMap["v1beta1"] = v1beta1storage
    if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
        return nil, err
    }
 
    return s, nil
}
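
The nested map filled in above (version, then resource, then storage) is what determines which REST endpoints exist. A minimal sketch of that data shape follows, assuming the usual /apis/<group>/<version>/<resource> layout and ignoring namespaces; groupInfo, storage and demoStore are hypothetical stand-ins for APIGroupInfo and rest.Storage.

```go
package main

import (
	"fmt"
	"sort"
)

// storage is a hypothetical placeholder for rest.Storage.
type storage interface{ New() any }

type demoStore struct{}

func (demoStore) New() any { return &struct{ Name string }{} }

// groupInfo mirrors the shape of APIGroupInfo: version -> resource -> storage.
type groupInfo struct {
	Group    string
	Versions map[string]map[string]storage
}

// paths lists the collection URLs the group would serve, sorted.
func (g groupInfo) paths() []string {
	var out []string
	for version, resources := range g.Versions {
		for resource := range resources {
			out = append(out, fmt.Sprintf("/apis/%s/%s/%s", g.Group, version, resource))
		}
	}
	sort.Strings(out)
	return out
}

func main() {
	g := groupInfo{
		Group: "wardle.example.com",
		Versions: map[string]map[string]storage{
			"v1alpha1": {"flunders": demoStore{}, "fischers": demoStore{}},
			"v1beta1":  {"flunders": demoStore{}},
		},
	}
	for _, p := range g.paths() {
		fmt.Println(p)
	}
}
```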

The registry.REST returned by flunderstorage.NewREST and its siblings embeds genericregistry.Store, which implements more than just rest.Storage:

Go
type Storage interface {
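    // Once the request data has been placed in the object created by this method, Create/Update can be called to persist it.
    // Must return a pointer type suitable for Codec.DecodeInto([]byte, runtime.Object)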
    New() runtime.Object
}

It also implements rest.StandardStorage:

Go
type StandardStorage interface {
    Getter
    Lister
    CreaterUpdater
    GracefulDeleter
    CollectionDeleter
    Watcher
}

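Implementing these interfaces means registry.REST supports create, read, update, delete and Watch on API objects. More details follow in the request handling section below.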
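
StandardStorage is built by embedding small single-method interfaces. The sketch below shows the same composition technique with an in-memory map instead of Etcd; all names in it are hypothetical simplifications, and Lister, Watcher and friends are left out for brevity.

```go
package main

import (
	"errors"
	"fmt"
)

// object is a hypothetical, minimal API object.
type object struct{ Name, Spec string }

// Small single-purpose interfaces, composed the way StandardStorage
// composes Getter, Lister and so on.
type getter interface {
	Get(name string) (object, error)
}
type creater interface {
	Create(obj object) error
}
type deleter interface {
	Delete(name string) error
}
type standardStorage interface {
	getter
	creater
	deleter
}

var errNotFound = errors.New("not found")

// memStore keeps objects in a map instead of Etcd.
type memStore struct{ items map[string]object }

// Compile-time check that memStore satisfies the composed interface.
var _ standardStorage = (*memStore)(nil)

func (m *memStore) Get(name string) (object, error) {
	obj, ok := m.items[name]
	if !ok {
		return object{}, errNotFound
	}
	return obj, nil
}

func (m *memStore) Create(obj object) error {
	if _, exists := m.items[obj.Name]; exists {
		return fmt.Errorf("%q already exists", obj.Name)
	}
	m.items[obj.Name] = obj
	return nil
}

func (m *memStore) Delete(name string) error {
	if _, ok := m.items[name]; !ok {
		return errNotFound
	}
	delete(m.items, name)
	return nil
}

func main() {
	var s standardStorage = &memStore{items: map[string]object{}}
	_ = s.Create(object{Name: "f1", Spec: "demo"})
	obj, err := s.Get("f1")
	fmt.Println(obj.Name, err)
	// A caller that only needs reads can ask for the narrower interface.
	var g getter = s
	_, err = g.Get("missing")
	fmt.Println(err)
}
```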

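Starting the APIServer

As the code in the "Starting the server" section shows, once the options have been converted into a config, the config completed, and the APIServer instantiated from it, a two-step startup sequence runs.

First comes PrepareRun, which performs operations that must happen after the APIs have been installed (during instantiation):

Go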
func (s *GenericAPIServer) PrepareRun() preparedGenericAPIServer {
    s.delegationTarget.PrepareRun()
    // Install the OpenAPI handler
    if s.openAPIConfig != nil && !s.skipOpenAPIInstallation {
        s.OpenAPIVersionedService, s.StaticOpenAPISpec = routes.OpenAPI{
            Config: s.openAPIConfig,
        }.Install(s.Handler.GoRestfulContainer, s.Handler.NonGoRestfulMux)
    }
    // Install the health check handlers
    s.installHealthz()
    s.installLivez()
    err := s.addReadyzShutdownCheck(s.readinessStopCh)
    if err != nil {
        klog.Errorf("Failed to install readyz shutdown check %s", err)
    }
    s.installReadyz()
 
    // Register a pre-shutdown hook for the audit backend
    if s.AuditBackend != nil {
        err := s.AddPreShutdownHook("audit-backend", func() error {
            s.AuditBackend.Shutdown()
            return nil
        })
        if err != nil {
            klog.Errorf("Failed to add pre-shutdown hook for audit-backend %s", err)
        }
    }
 
    return preparedGenericAPIServer{s}
}

Then comes Run, which starts the APIServer:

Go
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
    delayedStopCh := make(chan struct{})
    go func() {
        defer close(delayedStopCh)
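        // Shutdown signal received
        <-stopCh
        // Once shutdown has been triggered, /readyz must start returning errors immediately
        close(s.readinessStopCh)
        // Sleep for ShutdownDelayDuration before closing the server, giving load balancers a window to detect the /readyz state and stop sending requests here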
        time.Sleep(s.ShutdownDelayDuration)
    }()
 
    // Run the server
    err := s.NonBlockingRun(delayedStopCh)
    if err != nil {
        return err
    }
 
    // Shutdown signal received
    <-stopCh
 
    // Run the pre-shutdown hooks
    err = s.RunPreShutdownHooks()
    if err != nil {
        return err
    }
 
    // Wait for the delayed stop signal
    <-delayedStopCh
 
    // Wait for in-flight requests to finish, then shut down
    s.HandlerChainWaitGroup.Wait()
 
    return nil
}
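
The ordering in Run (fail readiness first, keep serving for a grace period, then drain) can be reproduced with plain channels. The following is a minimal sketch under that reading of the code; the run function, its event names and the drain callback are hypothetical, and the event log only exists to make the order visible.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// run imitates the shutdown ordering with plain channels and returns
// the sequence of events it went through. All names are hypothetical.
func run(stopCh <-chan struct{}, shutdownDelay time.Duration, drain func()) []string {
	var events []string
	readinessStopCh := make(chan struct{})
	delayedStopCh := make(chan struct{})
	go func() {
		defer close(delayedStopCh)
		<-stopCh
		close(readinessStopCh)    // readiness checks would start failing here
		time.Sleep(shutdownDelay) // give load balancers time to notice
	}()

	events = append(events, "serving")
	<-stopCh
	<-readinessStopCh
	events = append(events, "not ready")
	events = append(events, "pre-shutdown hooks")
	<-delayedStopCh // the listener would be closed once this fires
	events = append(events, "listener closed")
	drain() // wait for in-flight requests
	events = append(events, "drained")
	return events
}

func main() {
	stopCh := make(chan struct{})
	var inFlight sync.WaitGroup
	inFlight.Add(1)
	go func() { // one request that finishes shortly after shutdown begins
		defer inFlight.Done()
		<-stopCh
		time.Sleep(5 * time.Millisecond)
	}()
	go func() {
		time.Sleep(10 * time.Millisecond)
		close(stopCh) // stands in for a termination signal
	}()
	for _, e := range run(stopCh, 20*time.Millisecond, inFlight.Wait) {
		fmt.Println(e)
	}
}
```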

NonBlockingRun is the core of starting the APIServer. It starts an HTTPS server:

Go
func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) error {
    // This channel ensures the HTTP server shuts down gracefully without losing audit events
    auditStopCh := make(chan struct{})
 
    // Start the audit backend first, while no requests can come in yet
    if s.AuditBackend != nil {
        if err := s.AuditBackend.Run(auditStopCh); err != nil {
            return fmt.Errorf("failed to run the audit backend: %v", err)
        }
    }
 
    // The channel below is used to clean up the listener on error
    internalStopCh := make(chan struct{})
    var stoppedCh <-chan struct{}
    if s.SecureServingInfo != nil && s.Handler != nil {
        var err error
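        // Start the HTTPS server; this only fails on certificate errors or when the internal listen call fails.
        // The server loop runs in its own goroutine.
        // Note that s.Handler is accessible to us here, so setting up a plain HTTP (non-HTTPS) server should be easy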
        stoppedCh, err = s.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh)
        if err != nil {
            close(internalStopCh)
            close(auditStopCh)
            return err
        }
    }
 
    // Cleanup
    go func() {
        <-stopCh
        close(internalStopCh)
        if stoppedCh != nil {
            <-stoppedCh
        }
        s.HandlerChainWaitGroup.Wait()
        close(auditStopCh)
    }()
 
    // Run the post-start hooks
    s.RunPostStartHooks(stopCh)
 
    if _, err := systemd.SdNotify(true, "READY=1\n"); err != nil {
        klog.Errorf("Unable to send systemd daemon successful start message: %v\n", err)
    }
 
    return nil
}
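
The remark in the code about s.Handler being accessible rests on a simple fact: anything that implements http.Handler can be exercised or served without TLS. The sketch below illustrates this with the standard library only; apiHandler is a hypothetical stand-in for GenericAPIServer.Handler, and whether the real handler behaves well outside the secure serving path is an assumption that would need checking.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// apiHandler is a hypothetical stand-in for GenericAPIServer.Handler.
func apiHandler() http.Handler {
	mux := http.NewServeMux()
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "ok")
	})
	return mux
}

// probe sends one GET request straight to the handler, without TLS
// and without opening a listener, and returns status code and body.
func probe(h http.Handler, path string) (int, string) {
	req := httptest.NewRequest(http.MethodGet, path, nil)
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code, rec.Body.String()
}

func main() {
	code, body := probe(apiHandler(), "/healthz")
	fmt.Println(code, body)
	// Serving the same handler over plain HTTP would be a one-liner:
	//   http.ListenAndServe("127.0.0.1:8080", apiHandler())
}
```
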
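Registering types with the Scheme

The apis/wardle package and its subpackages define the API of the wardle.example.com group.

register.go in the wardle package defines the group, along with functions that derive a GroupKind or GroupResource from the GV:

Go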
const GroupName = "wardle.example.com"
// runtime.APIVersionInternal (the Version below) is the constant to use when there is no formal version
var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: runtime.APIVersionInternal}
 
func Kind(kind string) schema.GroupKind {
    return SchemeGroupVersion.WithKind(kind).GroupKind()
}
 
func Resource(resource string) schema.GroupResource {
    return SchemeGroupVersion.WithResource(resource).GroupResource()
}

An API package usually exposes an AddToScheme variable for registering its API with a given scheme:

Go
var (
    SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
    AddToScheme   = SchemeBuilder.AddToScheme
)
 
func addKnownTypes(scheme *runtime.Scheme) error {
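    // The heart of API registration: add the API types of this GV
    scheme.AddKnownTypes(SchemeGroupVersion,
        &Flunder{}, // must be a pointer; the type name obtained through Go reflection is used as the Kind when encoding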
        &FlunderList{},
        &Fischer{},
        &FischerList{},
    )
    return nil
}
 
// A SchemeBuilder is really just a slice
type SchemeBuilder []func(*Scheme) error
// NewSchemeBuilder accepts any number of callbacks and passes them to SchemeBuilder.Register
func NewSchemeBuilder(funcs ...func(*Scheme) error) SchemeBuilder {
    var sb SchemeBuilder
    sb.Register(funcs...)
    return sb
}
// Register simply appends to the slice
func (sb *SchemeBuilder) Register(funcs ...func(*Scheme) error) {
    for _, f := range funcs {
        *sb = append(*sb, f)
    }
}
 
// AddToScheme simply iterates over the callbacks; Go methods can be passed around as values
func (sb *SchemeBuilder) AddToScheme(s *Scheme) error {
    for _, f := range *sb {
        if err := f(s); err != nil {
            return err
        }
    }
    return nil
}
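
The whole registration mechanism is small enough to rebuild in miniature. Below is a minimal sketch in which a map from kind name to reflect.Type plays the role of the scheme; the scheme and schemeBuilder types here are hypothetical and far simpler than runtime.Scheme (no group, no version, no conversion).

```go
package main

import (
	"fmt"
	"reflect"
)

// scheme is a hypothetical, much reduced stand-in for runtime.Scheme:
// it maps a kind name to the Go type registered for it.
type scheme struct{ kinds map[string]reflect.Type }

func newScheme() *scheme { return &scheme{kinds: map[string]reflect.Type{}} }

// addKnownTypes expects pointers to structs; the struct's type name
// becomes the kind. Passing a non-pointer would panic in Elem().
func (s *scheme) addKnownTypes(objs ...any) {
	for _, obj := range objs {
		t := reflect.TypeOf(obj).Elem()
		s.kinds[t.Name()] = t
	}
}

// newObject creates a fresh instance for a registered kind.
func (s *scheme) newObject(kind string) (any, error) {
	t, ok := s.kinds[kind]
	if !ok {
		return nil, fmt.Errorf("kind %q is not registered", kind)
	}
	return reflect.New(t).Interface(), nil
}

// schemeBuilder is a slice of registration callbacks.
type schemeBuilder []func(*scheme) error

func (sb *schemeBuilder) register(funcs ...func(*scheme) error) {
	*sb = append(*sb, funcs...)
}

func (sb *schemeBuilder) addToScheme(s *scheme) error {
	for _, f := range *sb {
		if err := f(s); err != nil {
			return err
		}
	}
	return nil
}

type Flunder struct{ Name string }
type Fischer struct{ Name string }

func main() {
	var sb schemeBuilder
	sb.register(func(s *scheme) error {
		s.addKnownTypes(&Flunder{}, &Fischer{})
		return nil
	})
	s := newScheme()
	if err := sb.addToScheme(s); err != nil {
		panic(err)
	}
	obj, err := s.newObject("Flunder")
	fmt.Printf("%T %v\n", obj, err)
}
```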

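types.go in the wardle package defines the Go structs for the API types.

The subpackages v1alpha1 and v1beta1 define two versions of the API. They contain variables and functions similar to those of the wardle package (GroupName, SchemeGroupVersion, SchemeBuilder, AddToScheme and so on) plus the corresponding API type structs. They also contain generated functions that convert to and from the APIVersionInternal version.

Back in the wardle package, the Install function registers the APIVersionInternal, v1alpha1 and v1beta1 versions with the scheme:

Go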
func Install(scheme *runtime.Scheme) {
    utilruntime.Must(wardle.AddToScheme(scheme))
    utilruntime.Must(v1beta1.AddToScheme(scheme))
    utilruntime.Must(v1alpha1.AddToScheme(scheme))
    utilruntime.Must(scheme.SetVersionPriority(v1beta1.SchemeGroupVersion, v1alpha1.SchemeGroupVersion))
}

In the apiserver package, init calls the Install function above:

Go
var (
    // API registry
    Scheme = runtime.NewScheme()
    // Codec factory
    Codecs = serializer.NewCodecFactory(Scheme)
)
 
func init() {
    // Install the APIs provided by sample-apiserver
    install.Install(Scheme)
 
    // Register k8s.io/apimachinery/pkg/apis/meta/v1 with the v1 group version. Why?
    // we need to add the options to empty v1
    // TODO fix the server code to avoid this
    metav1.AddToGroupVersion(Scheme, schema.GroupVersion{Version: "v1"})
 
    // TODO: keep the generic API server from wanting this
    unversioned := schema.GroupVersion{Group: "", Version: "v1"}
    Scheme.AddUnversionedTypes(unversioned,
        &metav1.Status{},
        &metav1.APIVersions{},
        &metav1.APIGroupList{},
        &metav1.APIGroup{},
        &metav1.APIResourceList{},
    )
}

Codecs is the factory for API resource codecs. RecommendedOptions needs it:

Go
        // ...
        RecommendedOptions: genericoptions.NewRecommendedOptions(
            defaultEtcdPathPrefix,
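            // Get the LegacyCodec (a runtime.Codec) for the v1alpha1 version.
            // A LegacyCodec encodes to the given API version and decodes (from any supported source) to the internal form.
            // This codec always encodes to JSON.
            //
            // LegacyCodec is deprecated: client and server should negotiate a serializer by MIME type
            // and call CodecForVersions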
            apiserver.Codecs.LegacyCodec(v1alpha1.SchemeGroupVersion),
            genericoptions.NewProcessInfo("wardle-apiserver", "wardle"),
        ),
 
func NewRecommendedOptions(prefix string, codec runtime.Codec, processInfo *ProcessInfo) *RecommendedOptions {
    return &RecommendedOptions{
        // Here, however, the runtime.Codec converts API objects to JSON for storage in Etcd, not for sending to clients
        Etcd:           NewEtcdOptions(storagebackend.NewDefaultConfig(prefix, codec)),
    // ...

APIGroupInfo needs this factory as well:

Go
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(wardle.GroupName, Scheme, metav1.ParameterCodec, Codecs)
 
func NewDefaultAPIGroupInfo(group string, scheme *runtime.Scheme, parameterCodec runtime.ParameterCodec, codecs serializer.CodecFactory) APIGroupInfo {
    return APIGroupInfo{
        // ...
        Scheme:                 scheme,
        ParameterCodec:         parameterCodec,
        NegotiatedSerializer:   codecs,
    }
}

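During APIServer instantiation, the Scheme is passed to APIGroupInfo, which establishes the mapping between resources and Go types.

Admission controller

sample-apiserver also demonstrates how to integrate your own admission controller into the API server.

An admission initializer has to be registered in the options:

Go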
    o.RecommendedOptions.ExtraAdmissionInitializers = func(c *genericapiserver.RecommendedConfig) ([]admission.PluginInitializer, error) {
        client, err := clientset.NewForConfig(c.LoopbackClientConfig)
        if err != nil {
            return nil, err
        }
        informerFactory := informers.NewSharedInformerFactory(client, c.LoopbackClientConfig.Timeout)
        o.SharedInformerFactory = informerFactory
        // wardleinitializer.New builds the initializer
        return []admission.PluginInitializer{wardleinitializer.New(informerFactory)}, nil
    }

The initializer is a function that runs when the options are converted into the config:

Go
    // Call the function above
    if initializers, err := o.ExtraAdmissionInitializers(config); err != nil {
        return err
    // Initialize the admission controllers
    } else if err := o.Admission.ApplyTo(&config.Config, config.SharedInformerFactory, config.ClientConfig, o.FeatureGate, initializers...); err != nil {
        return err
    }

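The ApplyTo method above assembles a chain of admission initializers and calls them one by one to initialize every admission controller.

In addition, a PostStart hook starts the SharedInformerFactory that the admission controller depends on:

Go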
    server.GenericAPIServer.AddPostStartHookOrDie("start-sample-server-informers", func(context genericapiserver.PostStartHookContext) error {
        // InformerFactory for the main kube-apiserver; it does not appear to be of much use here
        config.GenericConfig.SharedInformerFactory.Start(context.StopCh)
        // InformerFactory for the secondary API server (this one); the admission controller needs it
        o.SharedInformerFactory.Start(context.StopCh)
        return nil
    })

Let us look at the admission-related code in sample-apiserver.

The pkg/admission/wardleinitializer package holds the admission initializer. It injects an InformerFactory into any admission controller that implements WantsInternalWardleInformerFactory:

Go
type pluginInitializer struct {
    informers informers.SharedInformerFactory
}
 
var _ admission.PluginInitializer = pluginInitializer{}
 
// Called from the ExtraAdmissionInitializers function
func New(informers informers.SharedInformerFactory) pluginInitializer {
    return pluginInitializer{
        informers: informers,
    }
}
 
// Called from o.Admission.ApplyTo
func (i pluginInitializer) Initialize(plugin admission.Interface) {
    if wants, ok := plugin.(WantsInternalWardleInformerFactory); ok {
        wants.SetInternalWardleInformerFactory(i.informers)
    }
}
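
The initializer relies on an optional interface: a plugin opts in to injection by implementing a setter, and the type assertion skips everything else. The following is a minimal sketch of that technique; every name in it is hypothetical and only the shape mirrors the wardleinitializer code.

```go
package main

import "fmt"

// All names below are hypothetical; they mirror the shape of the
// initializer pattern, not the real admission package.

type informerFactory struct{ name string }

// plugin is the minimal interface every admission plugin satisfies.
type plugin interface{ Handles(op string) bool }

// wantsInformerFactory is an optional interface: plugins that need a
// factory implement it, others simply do not.
type wantsInformerFactory interface {
	SetInformerFactory(*informerFactory)
}

type initializer struct{ factory *informerFactory }

// Initialize injects the factory only into plugins that ask for it.
func (i initializer) Initialize(p plugin) {
	if wants, ok := p.(wantsInformerFactory); ok {
		wants.SetInformerFactory(i.factory)
	}
}

// banPlugin needs the factory, so it implements the setter.
type banPlugin struct{ factory *informerFactory }

func (b *banPlugin) Handles(op string) bool                { return op == "CREATE" }
func (b *banPlugin) SetInformerFactory(f *informerFactory) { b.factory = f }

// plainPlugin has no dependencies and is left untouched.
type plainPlugin struct{}

func (plainPlugin) Handles(string) bool { return true }

func main() {
	ini := initializer{factory: &informerFactory{name: "wardle"}}
	ban := &banPlugin{}
	for _, p := range []plugin{ban, plainPlugin{}} {
		ini.Initialize(p)
	}
	fmt.Println(ban.factory.name)
}
```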

The pkg/admission/plugin/banflunder package contains the sample admission controller BanFlunder. The function:

Go
func Register(plugins *admission.Plugins) {
    plugins.Register("BanFlunder", func(config io.Reader) (admission.Interface, error) {
        return New()
    })
}

is called very early in the program's life to register the admission controller with the API server:

Go
func (o *WardleServerOptions) Complete() error {
    // Register the plugin
    banflunder.Register(o.RecommendedOptions.Admission.Plugins)
 
    // Configure the plugin order
    o.RecommendedOptions.Admission.RecommendedPluginOrder = append(o.RecommendedOptions.Admission.RecommendedPluginOrder, "BanFlunder")
 
    return nil
}

The core of an admission controller is the Admit function, which can mutate or reject an API Server request:

Go
func (d *DisallowFlunder) Admit(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) error {
    // Only interested in one specific API kind
    if a.GetKind().GroupKind() != wardle.Kind("Flunder") {
        return nil
    }
 
    if !d.WaitForReady() {
        return admission.NewForbidden(a, fmt.Errorf("not yet ready to handle request"))
    }
 
    // Used to access the object metadata
    metaAccessor, err := meta.Accessor(a.GetObject())
    if err != nil {
        return err
    }
    flunderName := metaAccessor.GetName()
 
    fischers, err := d.lister.List(labels.Everything())
    if err != nil {
        return err
    }
 
    for _, fischer := range fischers {
        for _, disallowedFlunder := range fischer.DisallowedFlunders {
            if flunderName == disallowedFlunder {
                return errors.NewForbidden(
                    a.GetResource().GroupResource(),
                    a.GetName(),
                    // Reject the request
                    fmt.Errorf("this name may not be used, please change the resource name"),
                )
            }
        }
    }
    return nil
}
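Request handling flow

From the analysis above we know that create, read, update and delete operations on a resource are handled by registry.REST (an empty shell; the real work is done by genericregistry.Store). So how does an HTTP request reach it?

Recall the code that adds resources to APIGroupInfo: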

Go
//              resource name
v1alpha1storage["flunders"] = wardleregistry.RESTInPeace(
    // Create the registry.REST
    flunderstorage.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter),
)
 
 
func NewREST(scheme *runtime.Scheme, optsGetter generic.RESTOptionsGetter) (*registry.REST, error) {
    // The strategy, see below
    strategy := NewStrategy(scheme)
 
    store := &genericregistry.Store{
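        // Function that instantiates the resource
        NewFunc:                  func() runtime.Object {
            // Every CRUD operation involves creating this struct, so a breakpoint here intercepts all requests
            return &wardle.Flunder{}
        },
        // Function that instantiates the resource list
        NewListFunc:              func() runtime.Object { return &wardle.FlunderList{} },
        // Predicate that decides whether an object matches the given label and field selectors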
        PredicateFunc: func(label labels.Selector, field fields.Selector) storage.SelectionPredicate {
            return storage.SelectionPredicate{
                Label: label,
                Field: field,
                GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
                    apiserver, ok := obj.(*wardle.Flunder)
                    if !ok {
                        return nil, nil, fmt.Errorf("given object is not a Flunder")
                    }
                    return apiserver.ObjectMeta.Labels, SelectableFields(apiserver), nil
                },
            }
        },
        // Plural name of the resource, used when the context lacks the required request info
        DefaultQualifiedResource: wardle.Resource("flunders"),
        // Strategies for create, update and delete
        CreateStrategy: strategy,
        UpdateStrategy: strategy,
        DeleteStrategy: strategy,
    }
    options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: GetAttrs}
    // Fill in default fields
    if err := store.CompleteWithOptions(options); err != nil {
        return nil, err
    }
    return &registry.REST{store}, nil
}

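Creating a genericregistry.Store requires two pieces of information:

  1. Scheme, which provides the mapping between Go types and GVKs. The Kind is obtained by reflection from the type name of the Go struct
  2. generic.RESTOptionsGetter

RESTOptionsGetter is used to obtain RESTOptions: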

Go
type RESTOptionsGetter interface {
    GetRESTOptions(resource schema.GroupResource) (RESTOptions, error)
}

RESTOptions holds information about storage, although that responsibility seems to have little to do with its name:

Go
type RESTOptions struct {
    // Configuration needed to create a storage backend
    StorageConfig *storagebackend.Config
    // A function that provides the storage.Interface and the factory.DestroyFunc
    Decorator     StorageDecorator
 
    EnableGarbageCollection bool
    DeleteCollectionWorkers int
    ResourcePrefix          string
    CountMetricPollPeriod   time.Duration
}
 
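// storagebackend.Config
type Config struct {
    // Type of the storage backend; defaults to etcd3
    Type string
    // Prefix prepended to all keys passed to storage.Interface methods; corresponds to the etcd storage prefix
    Prefix string
    // Connection information for the etcd server
    Transport TransportConfig
    // Hints whether the API server should support paging
    Paging bool
    // Handles (de)serialization
    Codec runtime.Codec
    // Given the kinds an object might belong to, outputs the GVK it is converted to before being persisted to etcd
    EncodeVersioner runtime.GroupVersioner
    // Transforms the stored value before it is persisted to etcd
    Transformer value.Transformer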
 
    CompactionInterval time.Duration
    CountMetricPollPeriod time.Duration
    LeaseManagerConfig etcd3.LeaseManagerConfig
}
 
 
// Builds a storage.Interface from the functions passed in
type StorageDecorator func(
    config *storagebackend.Config,
    resourcePrefix string,
    keyFunc func(obj runtime.Object) (string, error),
    newFunc func() runtime.Object,
    newListFunc func() runtime.Object,
    getAttrsFunc storage.AttrFunc,
    trigger storage.IndexerFuncs,
    indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error)
// CRUD
type Interface interface {
    Versioner() Versioner
    Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error
    Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions, validateDeletion ValidateObjectFunc) error
    Watch(ctx context.Context, key string, resourceVersion string, p SelectionPredicate) (watch.Interface, error)
    WatchList(ctx context.Context, key string, resourceVersion string, p SelectionPredicate) (watch.Interface, error)
    Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error
    GetToList(ctx context.Context, key string, resourceVersion string, p SelectionPredicate, listObj runtime.Object) error
    List(ctx context.Context, key string, resourceVersion string, p SelectionPredicate, listObj runtime.Object) error
    GuaranteedUpdate(
        ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
        precondtions *Preconditions, tryUpdate UpdateFunc, suggestion ...runtime.Object) error
    Count(key string) (int64, error)
}
// Destroys, in one go, all resources used by the storage that Create() returned
type DestroyFunc func()

Note that storagebackend.Config is coupled to etcd, so supporting a different storage backend means intervening at an earlier point.

genericregistry.Store also contains three Strategy fields:

Go
type Store struct {
    // ...
    CreateStrategy rest.RESTCreateStrategy
    UpdateStrategy rest.RESTUpdateStrategy
    DeleteStrategy rest.RESTDeleteStrategy
    // ...
}
 
type RESTCreateStrategy interface {
    runtime.ObjectTyper
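    // Used to generate names
    names.NameGenerator
    // Whether the object must live in a namespace
    NamespaceScoped() bool
    // Called before Validate and Canonicalize to normalize the object,
    // e.g. remove fields that are not persisted, or sort order-insensitive list fields.
    // It must not remove fields whose presence would be a validation error
    PrepareForCreate(ctx context.Context, obj runtime.Object)
    // Validation; called after the object's default fields have been filled in
    Validate(ctx context.Context, obj runtime.Object) field.ErrorList
    // Called after validation and before persisting. Normalizes the object; usually implemented as a type check or an empty function
    Canonicalize(obj runtime.Object)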
}
 
type RESTUpdateStrategy interface {
    runtime.ObjectTyper
    NamespaceScoped() bool
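    // Whether the object may be created by a PUT request
    AllowCreateOnUpdate() bool
    // Prepare for the update
    PrepareForUpdate(ctx context.Context, obj, old runtime.Object)
    // Validation
    ValidateUpdate(ctx context.Context, obj, old runtime.Object) field.ErrorList
    // Canonicalization
    Canonicalize(obj runtime.Object)
    // Whether an unconditional update is allowed when the object specifies no resource version, i.e. regardless of the latest resource version (optimistic concurrency control disabled)
    AllowUnconditionalUpdate() bool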
}
 
type RESTDeleteStrategy interface {
    runtime.ObjectTyper
}
 
type ObjectTyper interface {
    // Returns the possible GVKs of the object
    ObjectKinds(Object) ([]schema.GroupVersionKind, bool, error)
    // Whether the Scheme supports the given GVK
    Recognizes(gvk schema.GroupVersionKind) bool
}

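As we can see, a strategy influences how create, update and delete behave: it can generate object names, validate objects, and even modify them. Let's look at the strategy implementation that sample-apiserver provides: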

Go
func NewStrategy(typer runtime.ObjectTyper) flunderStrategy {
    // Simple naming strategy: returns the requested base name plus a random 5-character alphanumeric suffix
    return flunderStrategy{typer, names.SimpleNameGenerator}
}
 
type flunderStrategy struct {
    runtime.ObjectTyper
    names.NameGenerator
}
 
func (flunderStrategy) NamespaceScoped() bool {
    return true
}
 
func (flunderStrategy) PrepareForCreate(ctx context.Context, obj runtime.Object) {
}
 
func (flunderStrategy) PrepareForUpdate(ctx context.Context, obj, old runtime.Object) {
}
 
func (flunderStrategy) Validate(ctx context.Context, obj runtime.Object) field.ErrorList {
    flunder := obj.(*wardle.Flunder)
    return validation.ValidateFlunder(flunder)
}
 
func (flunderStrategy) AllowCreateOnUpdate() bool {
    return false
}
 
func (flunderStrategy) AllowUnconditionalUpdate() bool {
    return false
}
 
func (flunderStrategy) Canonicalize(obj runtime.Object) {
}
 
func (flunderStrategy) ValidateUpdate(ctx context.Context, obj, old runtime.Object) field.ErrorList {
    return field.ErrorList{}
}
 
 
 
package validation
 
// ValidateFlunder validates a Flunder.
func ValidateFlunder(f *wardle.Flunder) field.ErrorList {
    allErrs := field.ErrorList{}
 
    allErrs = append(allErrs, ValidateFlunderSpec(&f.Spec, field.NewPath("spec"))...)
 
    return allErrs
}
 
// ValidateFlunderSpec validates a FlunderSpec.
func ValidateFlunderSpec(s *wardle.FlunderSpec, fldPath *field.Path) field.ErrorList {
    allErrs := field.ErrorList{}
 
    if len(s.FlunderReference) != 0 && len(s.FischerReference) != 0 {
        allErrs = append(allErrs, field.Invalid(fldPath.Child("fischerReference"), s.FischerReference, "cannot be set with flunderReference at the same time"))
    } else if len(s.FlunderReference) != 0 && s.ReferenceType != wardle.FlunderReferenceType {
        allErrs = append(allErrs, field.Invalid(fldPath.Child("flunderReference"), s.FlunderReference, "cannot be set if referenceType is not Flunder"))
    } else if len(s.FischerReference) != 0 && s.ReferenceType != wardle.FischerReferenceType {
        allErrs = append(allErrs, field.Invalid(fldPath.Child("fischerReference"), s.FischerReference, "cannot be set if referenceType is not Fischer"))
    } else if len(s.FischerReference) == 0 && s.ReferenceType == wardle.FischerReferenceType {
        allErrs = append(allErrs, field.Invalid(fldPath.Child("fischerReference"), s.FischerReference, "cannot be empty if referenceType is Fischer"))
    } else if len(s.FlunderReference) == 0 && s.ReferenceType == wardle.FlunderReferenceType {
        allErrs = append(allErrs, field.Invalid(fldPath.Child("flunderReference"), s.FlunderReference, "cannot be empty if referenceType is Flunder"))
    }
 
    if len(s.ReferenceType) != 0 && s.ReferenceType != wardle.FischerReferenceType && s.ReferenceType != wardle.FlunderReferenceType {
        allErrs = append(allErrs, field.Invalid(fldPath.Child("referenceType"), s.ReferenceType, "must be Flunder or Fischer"))
    }
 
    return allErrs
}

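At this point we can guess how genericregistry.Store handles a Create request:

  1. Read the request body and call NewFunc to deserialize it into a runtime.Object
  2. Call PredicateFunc to decide whether the object can be handled
  3. Call CreateStrategy to validate and canonicalize the object
  4. Use RESTOptions to store it in etcd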

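So how does the request get here, and what are the details of the processing above? We have already located the key code paths, so the complete flow is easy to trace with breakpoints.

When the GenericAPIServer analyzed above is instantiated, its Handler field is created like this: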

Go
apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
 
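// Builder for the handler chain. Note that the input and output types are the same: handlers wrap one another layer by layer instead of forming a linked list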
type HandlerChainBuilderFn func(apiHandler http.Handler) http.Handler
 
func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer,
        handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
 
    // Mux for paths not served by go-restful (go-restful is a framework for building REST-style web services)
    nonGoRestfulMux := mux.NewPathRecorderMux(name)
    if notFoundHandler != nil {
        nonGoRestfulMux.NotFoundHandler(notFoundHandler)
    }
 
    // Container: a collection of WebServices
    gorestfulContainer := restful.NewContainer()
    // The container holds a ServeMux used to multiplex HTTP requests
    gorestfulContainer.ServeMux = http.NewServeMux()
    // Router
    gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
    // Panic handler
    gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
        logStackOnRecover(s, panicReason, httpWriter)
    })
    // Error handler
    gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
        serviceErrorHandler(s, serviceErr, request, response)
    })
 
    director := director{
        name:               name,
        goRestfulContainer: gorestfulContainer,
        nonGoRestfulMux:    nonGoRestfulMux,
    }
 
    return &APIServerHandler{
        // Build the handler chain; note that the director is passed in
        FullHandlerChain:   handlerChainBuilder(director),
        GoRestfulContainer: gorestfulContainer,
        NonGoRestfulMux:    nonGoRestfulMux,
        Director:           director,
    }
}
 
 
type APIServerHandler struct {
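    // The handler chain; the interface is the standard one from the http package:
    // type Handler interface {
    //     ServeHTTP(ResponseWriter, *Request)
    // }
    // It organizes a series of filters and calls the Director once the request has passed through them
    FullHandlerChain http.Handler
    // All registered APIs are handled by this container.
    // InstallAPIs uses this field; other servers should not access it directly
    GoRestfulContainer *restful.Container
    // The last handler in the chain. This type wraps a mux object and records which URL paths have been registered
    NonGoRestfulMux *mux.PathRecorderMux
 
    // Director handles fall-through and proxying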
    Director http.Handler
}

This Handler implements http.Handler and simply delegates requests to FullHandlerChain:

Go
func (a *APIServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    a.FullHandlerChain.ServeHTTP(w, r)
}

The function above is the entry point for all HTTP request handling.

FullHandlerChain is obtained by calling handlerChainBuilder():

Go
var c completedConfig
handlerChainBuilder := func(handler http.Handler) http.Handler {
    return c.BuildHandlerChainFunc(handler, c.Config)
}
apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())

completedConfig.BuildHandlerChainFunc in turn comes from the Config it embeds:

Go
serverConfig := genericapiserver.NewRecommendedConfig(apiserver.Codecs)
func NewRecommendedConfig(codecs serializer.CodecFactory) *RecommendedConfig {
    return &RecommendedConfig{
        Config: *NewConfig(codecs),
    }
}
func NewConfig(codecs serializer.CodecFactory) *Config {
    defaultHealthChecks := []healthz.HealthChecker{healthz.PingHealthz, healthz.LogHealthz}
    return &Config{
        Serializer:                  codecs,
        BuildHandlerChainFunc:       DefaultBuildHandlerChain,
        // ...
    }
}
 
func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
    // Innermost layer: the apiHandler passed in, i.e. the director from above
    // Authorization
    handler := genericapifilters.WithAuthorization(apiHandler, c.Authorization.Authorizer, c.Serializer)
    // Flow control (request rate limiting)
    if c.FlowControl != nil {
        handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl)
    } else {
        handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
    }
    // Impersonation
    handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer)
    // Auditing
    handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyChecker, c.LongRunningFunc)
    // Handle authentication failures
    failedHandler := genericapifilters.Unauthorized(c.Serializer, c.Authentication.SupportsBasicAuth)
    failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyChecker)
    // Authentication
    handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
    // Handle CORS requests
    handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
    // Timeout handling
    handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc, c.RequestTimeout)
    // Add all non-long-running requests to a wait group, used for graceful shutdown
    handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
    // Attach RequestInfo to the Context
    handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
    // HTTP/2 GOAWAY handling
    if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && c.GoawayChance > 0 {
        handler = genericfilters.WithProbabilisticGoaway(handler, c.GoawayChance)
    }
    // Set the Cache-Control header to "no-cache, private", because all servers are protected by authn/authz
    handler = genericapifilters.WithCacheControl(handler)
    // Panic recovery
    handler = genericfilters.WithPanicRecovery(handler)
    return handler
}

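We can clearly see the many filters in the default handler chain, and that handlers wrap one another layer by layer rather than forming a linked list.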

Finally, let's trace the request handling from the start, beginning with the filters in the handler chain:

Go
func (a *APIServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    a.FullHandlerChain.ServeHTTP(w, r)
}
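// FullHandlerChain is of this type
type HandlerFunc func(ResponseWriter, *Request)
// A neat design: the function type itself implements the
// type Handler interface {
//    ServeHTTP(ResponseWriter, *Request)
// }
// interface, and the implementation simply calls the function it wraps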
func (f HandlerFunc) ServeHTTP(w ResponseWriter, r *Request) {
    f(w, r)
}
 
// Enter the outermost layer of the handler chain
func withPanicRecovery(handler http.Handler, crashHandler func(http.ResponseWriter, *http.Request, interface{})) http.Handler {
    handler = httplog.WithLogging(handler, httplog.DefaultStacktracePred)
    //                      the handler function
    return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
        defer runtime.HandleCrash(func(err interface{}) {
            crashHandler(w, req, err)
        })
 
        // Forward to the inner handler, which is captured by the closure
        handler.ServeHTTP(w, req)
    })
}
 
// Several intermediate handlers omitted...
 
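// This handler deserves attention: it stores request information in the context so that inner handlers can use it directly.
// The information includes:
type RequestInfo struct {
    // Whether the request targets a resource or subresource
    IsResourceRequest bool
    // The path part of the URL
    Path string
    // The lowercase HTTP verb
    Verb string
    // API prefix
    APIPrefix  string
    // API group
    APIGroup   string
    // API version
    APIVersion string
    // Namespace
    Namespace  string
    // Resource type name, usually the lowercase plural form rather than the Kind
    Resource string
    // The requested subresource. A subresource is another resource scoped to its parent and may have a different Kind.
    // For example /pods has resource "pods" and Kind "Pod",
    //      /pods/foo/status has resource "pods", subresource "status" and Kind "Pod",
    // whereas /pods/foo/binding has resource "pods", subresource "binding" and Kind "Binding"
    Subresource string
    // Empty for some verbs. If the request directly indicates a name (not in the request body), it is filled in here
    Name string
    // Parts are the path parts for the request, always starting with /{resource}/{name}
    Parts []string
}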
}
func WithRequestInfo(handler http.Handler, resolver request.RequestInfoResolver) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
        ctx := req.Context()
        info, err := resolver.NewRequestInfo(req)
        if err != nil {
            responsewriters.InternalError(w, req, fmt.Errorf("failed to create RequestInfo: %v", err))
            return
        }
 
        req = req.WithContext(request.WithRequestInfo(ctx, info))
 
        handler.ServeHTTP(w, req)
    })
}
 
// Several intermediate handlers omitted...

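After passing through all the filters, the request reaches the last link of the handler chain, the director. As its name suggests, it is responsible for dispatching requests overall: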

Go
func (d director) ServeHTTP(w http.ResponseWriter, req *http.Request) {
    path := req.URL.Path
 
    // Iterate over all registered WebServices to see whether one is responsible for the current URL path
    for _, ws := range d.goRestfulContainer.RegisteredWebServices() {
        switch {
        case ws.RootPath() == "/apis":
            // The URL paths /apis and /apis/ need special handling. Normally they would go to nonGoRestfulMux,
            // but when discovery is enabled they must be handled directly by goRestfulContainer
            if path == "/apis" || path == "/apis/" {
                klog.V(5).Infof("%v: %v %q satisfied by gorestful with webservice %v", d.name, req.Method, path, ws.RootPath())
                d.goRestfulContainer.Dispatch(w, req)
                return
            }
        // If the prefix matches
        case strings.HasPrefix(path, ws.RootPath()):
            if len(path) == len(ws.RootPath()) || path[len(ws.RootPath())] == '/' {
                klog.V(5).Infof("%v: %v %q satisfied by gorestful with webservice %v", d.name, req.Method, path, ws.RootPath())
                // don't use servemux here because gorestful servemuxes get messed up when removing webservices
                // TODO fix gorestful, remove TPRs, or stop using gorestful
                d.goRestfulContainer.Dispatch(w, req)
                return
            }
        }
    }
 
    // No match found; skip the gorestful container
    d.nonGoRestfulMux.ServeHTTP(w, req)
}

Requests that are not for API resources are handled by PathRecorderMux:

Go
func (m *PathRecorderMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    m.mux.Load().(*pathHandler).ServeHTTP(w, r)
}
func (h *pathHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    // pathToHandler records the handlers for all exact-match paths
    //   0 = /healthz/etcd -> k8s.io/apiserver/pkg/server/healthz.adaptCheckToHandler.func1
    //   1 = /livez/etcd -> k8s.io/apiserver/pkg/server/healthz.adaptCheckToHandler.func1
    //   2 = /metrics -> k8s.io/apiserver/pkg/server/routes.MetricsWithReset.Install.func1
    //   3 = /readyz/shutdown -> k8s.io/apiserver/pkg/server/healthz.adaptCheckToHandler.func1
    //   4 = /readyz/ping -> k8s.io/apiserver/pkg/server/healthz.adaptCheckToHandler.func1
    //   5 = /debug/pprof/profile -> net/http/pprof.Profile
    //   6 = /healthz/log -> k8s.io/apiserver/pkg/server/healthz.adaptCheckToHandler.func1
    //   7 = /livez/log -> k8s.io/apiserver/pkg/server/healthz.adaptCheckToHandler.func1
    //   8 = /healthz/ping -> k8s.io/apiserver/pkg/server/healthz.adaptCheckToHandler.func1
    //   9 = / ->
    //   10 = /healthz -> k8s.io/apiserver/pkg/endpoints/metrics.InstrumentHandlerFunc.func1
    //   11 = /debug/flags -> k8s.io/apiserver/pkg/server/routes.DebugFlags.Index-fm
    //   12 = /livez/ping -> k8s.io/apiserver/pkg/server/healthz.adaptCheckToHandler.func1
    //   13 = /readyz/log -> k8s.io/apiserver/pkg/server/healthz.adaptCheckToHandler.func1
    //   14 = /debug/pprof -> k8s.io/apiserver/pkg/server/routes.redirectTo.func1
    //   15 = /debug/pprof/trace -> net/http/pprof.Trace
    //   16 = /debug/flags/v -> k8s.io/apiserver/pkg/server/routes.StringFlagPutHandler.func1
    //   17 = /readyz/poststarthook/start-sample-server-informers -> k8s.io/apiserver/pkg/server/healthz.adaptCheckToHandler.func1
    //   18 = /livez/poststarthook/start-sample-server-informers -> k8s.io/apiserver/pkg/server/healthz.adaptCheckToHandler.func1
    //   19 = /openapi/v2 -> github.com/NYTimes/gziphandler.NewGzipLevelAndMinSize.func1.1
    //   20 = /healthz/poststarthook/start-sample-server-informers -> k8s.io/apiserver/pkg/server/healthz.adaptCheckToHandler.func1
    //   21 = /readyz/etcd -> k8s.io/apiserver/pkg/server/healthz.adaptCheckToHandler.func1
    //   22 = /index.html ->
    //   23 = /debug/pprof/symbol -> net/http/pprof.Symbol
    //   24 = /livez -> k8s.io/apiserver/pkg/endpoints/metrics.InstrumentHandlerFunc.func1
    //   25 = /readyz -> k8s.io/apiserver/pkg/endpoints/metrics.InstrumentHandlerFunc.func1
    if exactHandler, ok := h.pathToHandler[r.URL.Path]; ok {
        klog.V(5).Infof("%v: %q satisfied by exact match", h.muxName, r.URL.Path)
        exactHandler.ServeHTTP(w, r)
        return
    }
 
    // Handlers matched by path prefix; by default these are /debug/flags/ and /debug/pprof/
    for _, prefixHandler := range h.prefixHandlers {
        if strings.HasPrefix(r.URL.Path, prefixHandler.prefix) {
            klog.V(5).Infof("%v: %q satisfied by prefix %v", h.muxName, r.URL.Path, prefixHandler.prefix)
            prefixHandler.handler.ServeHTTP(w, r)
            return
        }
    }
 
    // No handler found: 404
    klog.V(5).Infof("%v: %q satisfied by NotFoundHandler", h.muxName, r.URL.Path)
    h.notFoundHandler.ServeHTTP(w, r)
}

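API resource requests, for example the path /apis/wardle.example.com/v1beta1, are handled by go-restful. We will skip the internals of the go-restful framework and instead dig into the APIGroup installation mentioned above, since the go-restful Handler of every resource is registered by it: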

Go
func (s *GenericAPIServer) InstallAPIGroup(apiGroupInfo *APIGroupInfo) error {
    return s.InstallAPIGroups(apiGroupInfo)
}
func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error {
    // Iterate over the API groups
    for _, apiGroupInfo := range apiGroupInfos {
        //          install API resources   the constant /apis
        if err := s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels); err != nil {
            return fmt.Errorf("unable to install api resources: %v", err)
        }
 
        // setup discovery
        // Install the version handler.
        // Add a handler at /apis/<groupName> to enumerate all versions supported by this group.
        apiVersionsForDiscovery := []metav1.GroupVersionForDiscovery{}
        for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
            // Check the config to make sure that we elide versions that don't have any resources
            if len(apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version]) == 0 {
                continue
            }
            apiVersionsForDiscovery = append(apiVersionsForDiscovery, metav1.GroupVersionForDiscovery{
                GroupVersion: groupVersion.String(),
                Version:      groupVersion.Version,
            })
        }
        preferredVersionForDiscovery := metav1.GroupVersionForDiscovery{
            GroupVersion: apiGroupInfo.PrioritizedVersions[0].String(),
            Version:      apiGroupInfo.PrioritizedVersions[0].Version,
        }
        apiGroup := metav1.APIGroup{
            Name:             apiGroupInfo.PrioritizedVersions[0].Group,
            Versions:         apiVersionsForDiscovery,
            PreferredVersion: preferredVersionForDiscovery,
        }
 
        s.DiscoveryGroupManager.AddGroup(apiGroup)
        s.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(s.Serializer, apiGroup).WebService())
    }
    return nil
}
 
// Install API resources
func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, openAPIModels openapiproto.Models) error {
    // Iterate over the versions
    for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
        // Skip versions that have no resources
        if len(apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version]) == 0 {
            klog.Warningf("Skipping API %v because it has no resources.", groupVersion)
            continue
        }
 
        apiGroupVersion := s.getAPIGroupVersion(apiGroupInfo, groupVersion, apiPrefix)
        if apiGroupInfo.OptionsExternalVersion != nil {
            apiGroupVersion.OptionsExternalVersion = apiGroupInfo.OptionsExternalVersion
        }
        apiGroupVersion.OpenAPIModels = openAPIModels
        apiGroupVersion.MaxRequestBodyBytes = s.maxRequestBodyBytes
        // Install as go-restful handlers
        if err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer); err != nil {
            return fmt.Errorf("unable to setup API %v: %v", apiGroupInfo, err)
        }
    }
 
    return nil
}
 
// Register a set of REST handlers (storage, watch, proxy, redirect) into the restful container
func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
    // e.g. /apis/wardle.example.com/v1beta1
    prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
    installer := &APIInstaller{
        group:             g,
        prefix:            prefix,
        minRequestTimeout: g.MinRequestTimeout,
    }
    // Install the API resource handlers
    apiResources, ws, registrationErrors := installer.Install()
    // Install the resource discovery handler
    versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources})
    versionDiscoveryHandler.AddToWebService(ws)
    // Add the WebService to the container
    container.Add(ws)
    return utilerrors.NewAggregate(registrationErrors)
}
func (a *APIInstaller) Install() ([]metav1.APIResource, *restful.WebService, []error) {
    var apiResources []metav1.APIResource
    var errors []error
    // Create a WebService for this specific group/version
    ws := a.newWebService()
 
    // Register the paths in a deterministic (sorted) order to get a deterministic swagger spec.
    paths := make([]string, len(a.group.Storage))
    var i int = 0
    for path := range a.group.Storage {
        paths[i] = path
        i++
    }
    sort.Strings(paths)
    for _, path := range paths {
        // Iterate over the resources and register handlers, e.g. for flunders
        apiResource, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
        if err != nil {
            errors = append(errors, fmt.Errorf("error in registering resource: %s, %v", path, err))
        }
        if apiResource != nil {
            apiResources = append(apiResources, *apiResource)
        }
    }
    return apiResources, ws, errors
}
func (a *APIInstaller) newWebService() *restful.WebService {
    ws := new(restful.WebService)
    // Path template of this WebService
    ws.Path(a.prefix)
    // a.prefix contains "prefix/group/version"
    ws.Doc("API at " + a.prefix)
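    // For backward compatibility, accept requests without a MIME type
    ws.Consumes("*/*")
    // Determine the MIME types supported in responses from the codecs used by the API group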
    //   0 = {string} "application/json"
    //   1 = {string} "application/yaml"
    //   2 = {string} "application/vnd.kubernetes.protobuf"
    mediaTypes, streamMediaTypes := negotiation.MediaTypesForSerializer(a.group.Serializer)
    ws.Produces(append(mediaTypes, streamMediaTypes...)...)
    // e.g. wardle.example.com/v1beta1
    ws.ApiVersion(a.group.GroupVersion.String())
    return ws
}
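// Register resource handlers; the function is very long, so only fragments are shown
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, error) {
    // ...
    // creater is where the core handler logic lives; it comes from rest.Storage, which talks to etcd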
    creater, isCreater := storage.(rest.Creater)
    // ...
    actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
    switch action.Verb {
        case "POST":
            handler = restfulCreateResource(creater, reqScope, admit)
 
            route := ws.POST(action.Path).To(handler).
                Doc(doc).
                Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
                Operation("create"+namespaced+kind+strings.Title(subresource)+operationSuffix).
                Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
                Returns(http.StatusOK, "OK", producedObject).
                // TODO: in some cases, the API may return a v1.Status instead of the versioned object
                // but currently go-restful can't handle multiple different objects being returned.
                Returns(http.StatusCreated, "Created", producedObject).
                Returns(http.StatusAccepted, "Accepted", producedObject).
                Reads(defaultVersionedObject).
                Writes(producedObject)
            if err := AddObjectParams(ws, route, versionedCreateOptions); err != nil {
                return nil, err
            }
            addParams(route, action.Params)
            routes = append(routes, route)
    // ...
}
func restfulCreateResource(r rest.Creater, scope handlers.RequestScope, admit admission.Interface) restful.RouteFunction {
    return func(req *restful.Request, res *restful.Response) {
        handlers.CreateResource(r, &scope, admit)(res.ResponseWriter, req.Request)
    }
}
func CreateResource(r rest.Creater, scope *RequestScope, admission admission.Interface) http.HandlerFunc {
    return createHandler(&namedCreaterAdapter{r}, scope, admission, false)
}
// Core of the create-resource handler
func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Interface, includeName bool) http.HandlerFunc {
    return func(w http.ResponseWriter, req *http.Request) {
        // For performance tracking purposes.
        trace := utiltrace.New("Create", utiltrace.Field{Key: "url", Value: req.URL.Path}, utiltrace.Field{Key: "user-agent", Value: &lazyTruncatedUserAgent{req}}, utiltrace.Field{Key: "client", Value: &lazyClientIP{req}})
        defer trace.LogIfLong(500 * time.Millisecond)
        // Handle dry run
        if isDryRun(req.URL) && !utilfeature.DefaultFeatureGate.Enabled(features.DryRun) {
            scope.err(errors.NewBadRequest("the dryRun feature is disabled"), w, req)
            return
        }
 
        // TODO: we either want to remove timeout or document it (if we document, move timeout out of this function and declare it in api_installer)
        timeout := parseTimeout(req.URL.Query().Get("timeout"))
        // Resolve the namespace and the resource name
        namespace, name, err := scope.Namer.Name(req)
        if err != nil {
            if includeName {
                // name was required, return
                scope.err(err, w, req)
                return
            }
 
            // otherwise attempt to look up the namespace
            namespace, err = scope.Namer.Namespace(req)
            if err != nil {
                scope.err(err, w, req)
                return
            }
        }
 
        ctx, cancel := context.WithTimeout(req.Context(), timeout)
        defer cancel()
        ctx = request.WithNamespace(ctx, namespace)
        // Negotiate the output MIME type
        outputMediaType, _, err := negotiation.NegotiateOutputMediaType(req, scope.Serializer, scope)
        if err != nil {
            scope.err(err, w, req)
            return
        }
 
        gv := scope.Kind.GroupVersion()
        // Negotiate how the input is deserialized
        s, err := negotiation.NegotiateInputSerializer(req, false, scope.Serializer)
        if err != nil {
            scope.err(err, w, req)
            return
        }
        // From the serializer, obtain a decoder that decodes the request into a specific version
        decoder := scope.Serializer.DecoderToVersion(s.Serializer, scope.HubGroupVersion)
 
        // Read the request body
        body, err := limitedReadBody(req, scope.MaxRequestBodyBytes)
        if err != nil {
            scope.err(err, w, req)
            return
        }
 
        options := &metav1.CreateOptions{}
        values := req.URL.Query()
        if err := metainternalversionscheme.ParameterCodec.DecodeParameters(values, scope.MetaGroupVersion, options); err != nil {
            err = errors.NewBadRequest(err.Error())
            scope.err(err, w, req)
            return
        }
        if errs := validation.ValidateCreateOptions(options); len(errs) > 0 {
            err := errors.NewInvalid(schema.GroupKind{Group: metav1.GroupName, Kind: "CreateOptions"}, "", errs)
            scope.err(err, w, req)
            return
        }
        options.TypeMeta.SetGroupVersionKind(metav1.SchemeGroupVersion.WithKind("CreateOptions"))
 
        defaultGVK := scope.Kind
        // Instantiate the Go struct for the resource
        original := r.New()
        trace.Step("About to convert to expected version")
        // Decode the request body into the Go struct
        obj, gvk, err := decoder.Decode(body, &defaultGVK, original)
        if err != nil {
            err = transformDecodeError(scope.Typer, err, original, gvk, body)
            scope.err(err, w, req)
            return
        }
        if gvk.GroupVersion() != gv {
            err = errors.NewBadRequest(fmt.Sprintf("the API version in the data (%s) does not match the expected API version (%v)", gvk.GroupVersion().String(), gv.String()))
            scope.err(err, w, req)
            return
        }
        trace.Step("Conversion done")
 
        // Auditing and admission control
        ae := request.AuditEventFrom(ctx)
        admit = admission.WithAudit(admit, ae)
        audit.LogRequestObject(ae, obj, scope.Resource, scope.Subresource, scope.Serializer)
 
        userInfo, _ := request.UserFrom(ctx)
 
        // On create, get name from new object if unset
        if len(name) == 0 {
            _, name, _ = scope.Namer.ObjectName(obj)
        }
 
        trace.Step("About to store object in database")
        admissionAttributes := admission.NewAttributesRecord(obj, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, options, dryrun.IsDryRun(options.DryRun), userInfo)
        // Build the function that persists the object
        requestFunc := func() (runtime.Object, error) {
            // Call rest.Storage to persist the object
            return r.Create(
                ctx,
                name,
                obj,
                rest.AdmissionToValidateObjectFunc(admit, admissionAttributes, scope),
                options,
            )
        }
        // finishRequest runs the callback asynchronously and handles errors returned for the response
        result, err := finishRequest(timeout, func() (runtime.Object, error) {
            if scope.FieldManager != nil {
                liveObj, err := scope.Creater.New(scope.Kind)
                if err != nil {
                    return nil, fmt.Errorf("failed to create new object (Create for %v): %v", scope.Kind, err)
                }
                obj = scope.FieldManager.UpdateNoErrors(liveObj, obj, managerOrUserAgent(options.FieldManager, req.UserAgent()))
            }
            if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && mutatingAdmission.Handles(admission.Create) {
                if err := mutatingAdmission.Admit(ctx, admissionAttributes, scope); err != nil {
                    return nil, err
                }
            }
            result, err := requestFunc()
            // If the object wasn't committed to storage because it's serialized size was too large,
            // it is safe to remove managedFields (which can be large) and try again.
            if isTooLargeError(err) {
                if accessor, accessorErr := meta.Accessor(obj); accessorErr == nil {
                    accessor.SetManagedFields(nil)
                    result, err = requestFunc()
                }
            }
            return result, err
        })
        if err != nil {
            scope.err(err, w, req)
            return
        }
        trace.Step("Object stored in database")
 
        code := http.StatusCreated
        status, ok := result.(*metav1.Status)
        if ok && err == nil && status.Code == 0 {
            status.Code = int32(code)
        }
 
        transformResponseObject(ctx, scope, trace, req, w, code, outputMediaType, result)
    }
}
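
The handler above hands the storage call to finishRequest, which runs it asynchronously under a timeout. The following is a minimal sketch of that pattern using only the standard library. It is not the library's implementation, which does more than this; finishWithTimeout, errTimeout, demoTimeout, fastOp and slowOp are hypothetical names introduced for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTimeout is a hypothetical sentinel error used only by this sketch.
var errTimeout = errors.New("request timed out")

// demoTimeout is an arbitrary deadline chosen for the demonstration.
const demoTimeout = 200 * time.Millisecond

// finishWithTimeout runs fn in its own goroutine and returns its result,
// or errTimeout if fn has not finished when the deadline expires.
func finishWithTimeout(timeout time.Duration, fn func() (string, error)) (string, error) {
	type outcome struct {
		val string
		err error
	}
	// Buffered so a late fn can still send and exit after we stopped waiting.
	ch := make(chan outcome, 1)
	go func() {
		val, err := fn()
		ch <- outcome{val: val, err: err}
	}()
	select {
	case o := <-ch:
		return o.val, o.err
	case <-time.After(timeout):
		return "", errTimeout
	}
}

// fastOp stands in for a storage call that returns promptly.
func fastOp() (string, error) { return "stored", nil }

// slowOp stands in for a storage call that outlives the deadline.
func slowOp() (string, error) {
	time.Sleep(2 * time.Second)
	return "too late", nil
}

func main() {
	fmt.Println(finishWithTimeout(demoTimeout, fastOp))
	fmt.Println(finishWithTimeout(demoTimeout, slowOp))
}
```

The buffered channel is the important design choice: without it, a callback that finishes after the deadline would block forever on its send and leak a goroutine.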

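To recap the overall request-handling logic:

  1. GenericAPIServer.Handler is simply an http.Handler and can be registered with any HTTP server, so working around the HTTPS restriction should be easy
  2. GenericAPIServer.Handler is a chain of nested handlers: the outer layers are a series of filters, and the innermost one is the director
  3. The director is responsible for overall request dispatch:
    1. Requests that are not for API resources go to nonGoRestfulMux. We can use this extension point to add HTTP endpoints of any shape
    2. Requests for API resources go to gorestfulContainer
  4. In GenericAPIServer.InstallAPIGroup, every supported API group version is registered as a go-restful WebService, and the resources of that version are installed as routes on it
  5. The logic of these WebServices (which depends on rest.Storage) includes:
    1. Decoding the request into the Go struct for the resource
    2. Encoding the Go struct as JSON
    3. Storing the JSON in Etcd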
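
The layering described in this list can be sketched with plain net/http types. This is a simplified illustration, not the library's code: withHeader stands in for the filter layers, the director type here dispatches on a hard-coded "/apis" prefix (an assumption made for brevity), and all names are hypothetical.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// withHeader is a stand-in filter: it wraps a handler and marks the response.
func withHeader(name, value string, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set(name, value)
		next.ServeHTTP(w, r)
	})
}

// director sends API paths to one handler and everything else to another.
type director struct {
	apiHandler    http.Handler
	nonAPIHandler http.Handler
}

func (d director) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if r.URL.Path == "/apis" || strings.HasPrefix(r.URL.Path, "/apis/") {
		d.apiHandler.ServeHTTP(w, r)
		return
	}
	d.nonAPIHandler.ServeHTTP(w, r)
}

// buildChain wraps the director in one filter, mirroring the nesting above.
func buildChain() http.Handler {
	api := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { fmt.Fprint(w, "api") })
	other := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { fmt.Fprint(w, "non-api") })
	return withHeader("X-Filtered", "true", director{apiHandler: api, nonAPIHandler: other})
}

// serve runs one GET request through h and returns the body and the filter header.
func serve(h http.Handler, path string) (string, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Body.String(), rec.Header().Get("X-Filtered")
}

func main() {
	h := buildChain()
	fmt.Println(serve(h, "/apis/wardle.example.com/v1alpha1/flunders"))
	fmt.Println(serve(h, "/custom/endpoint"))
}
```

Because every layer is just an http.Handler, the whole chain can be handed to any HTTP server, which is what makes the HTTP-only setup described later possible.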
sample-apiserver summary

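By analyzing the sample-apiserver code, we now understand the key elements needed to build our own API server.

The core type of an API server is GenericAPIServer, which is produced by the New() method of genericapiserver.CompletedConfig. That in turn is produced by the Complete() method of genericapiserver.RecommendedConfig, and RecommendedConfig is derived from genericoptions.RecommendedOptions. sample-apiserver wraps the Config, Option and Server objects in types of its own; we do not care about those wrappers here.

RecommendedOptions corresponds to the options supplied by the user (plus the so-called recommended options, which make it simpler to use), such as the Etcd address, the Etcd storage prefix, and basic information about the API server. Calling the ApplyTo method of RecommendedOptions derives, from these options, the complete configuration the API server needs. This method can even perform heavy operations such as generating self-signed certificates, rather than simply copying values from Option to Config. RecommendedOptions calls the ApplyTo method of each of its fields in turn, and in this way derives the fields of RecommendedConfig.

The Complete method of RecommendedConfig derives configuration once more, mainly for the OpenAPI-related settings.

The New method of CompletedConfig instantiates GenericAPIServer. The most important logic at this step is installing the API groups. An API group defines how create, read, update and delete are implemented for the APIs in a GroupVersion: it maps each resource of the GroupVersion to a registry.REST, which is able to handle REST-style requests and (by default) stores data in Etcd.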
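
The Options, Config, CompletedConfig, Server pipeline can be illustrated with a small self-contained sketch. All types and fields below are hypothetical stand-ins that only mirror the shape of the real ones; the real ApplyTo, Complete and New do far more work.

```go
package main

import (
	"errors"
	"fmt"
)

// Options holds raw user input; the fields are hypothetical stand-ins.
type Options struct {
	EtcdServers []string
	EtcdPrefix  string
	BindPort    int
}

// Config is the runtime configuration derived from Options.
type Config struct {
	StorageServers []string
	StoragePrefix  string
	ListenAddr     string
	Title          string
}

// CompletedConfig can only be obtained from Complete, so New never sees
// a Config whose defaults are missing.
type CompletedConfig struct {
	cfg Config
}

// Server is the final product of the pipeline.
type Server struct {
	Name       string
	ListenAddr string
	Title      string
	Groups     []string
}

// ApplyTo validates the options and derives configuration from them.
func (o *Options) ApplyTo(c *Config) error {
	if len(o.EtcdServers) == 0 {
		return errors.New("at least one etcd server is required")
	}
	c.StorageServers = append([]string(nil), o.EtcdServers...)
	c.StoragePrefix = o.EtcdPrefix
	c.ListenAddr = fmt.Sprintf(":%d", o.BindPort)
	return nil
}

// Complete fills in defaults that the options did not cover.
func (c *Config) Complete() CompletedConfig {
	if c.Title == "" {
		c.Title = "Generic API Server"
	}
	return CompletedConfig{cfg: *c}
}

// New instantiates the server from a completed configuration.
func (c CompletedConfig) New(name string) (*Server, error) {
	if name == "" {
		return nil, errors.New("server name must not be empty")
	}
	return &Server{Name: name, ListenAddr: c.cfg.ListenAddr, Title: c.cfg.Title}, nil
}

// InstallAPIGroup records a group version; a real server registers routes here.
func (s *Server) InstallAPIGroup(groupVersion string) {
	s.Groups = append(s.Groups, groupVersion)
}

// build wires the stages together in the order described above.
func build(o *Options, name string) (*Server, error) {
	cfg := &Config{}
	if err := o.ApplyTo(cfg); err != nil {
		return nil, err
	}
	return cfg.Complete().New(name)
}

func main() {
	opts := &Options{EtcdServers: []string{"http://127.0.0.1:2379"}, EtcdPrefix: "/demo", BindPort: 6443}
	srv, err := build(opts, "demo-apiserver")
	if err != nil {
		panic(err)
	}
	srv.InstallAPIGroup("tcm.cloud.gmem.cc/v1")
	fmt.Printf("%+v\n", *srv)
}
```

Separating the stages this way means validation errors surface in ApplyTo, defaults are applied exactly once in Complete, and New can assume its input is fully populated.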

GenericAPIServer provides hooks for registering and initializing admission controllers, and further hooks for reacting to lifecycle events of the API server.

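Modifying sample-apiserver
Removing the dependency on kube-apiserver

To run sample-apiserver standalone, three fields of RecommendedOptions must be dealt with: Authentication, Authorization and CoreAPI. Each of them implies a dependency on the main kube-apiserver.

Authentication depends on the main kube-apiserver because it needs access to TokenReviewInterface and to a ConfigMap in kube-system. Authorization depends on it because it needs access to SubjectAccessReviewInterface. CoreAPI directly supplies two fields of Config: ClientConfig and SharedInformerFactory.

Setting these fields to nil removes the dependency on the main kube-apiserver, so sample-apiserver can be started without the following three command-line options: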

--kubeconfig=/home/alex/.kube/config
--authentication-kubeconfig=/home/alex/.kube/config
--authorization-kubeconfig=/home/alex/.kube/config

However, setting CoreAPI to nil causes the error: admission depends on a Kubernetes core API shared informer, it cannot be nil. This tells us that admission controllers cannot be used without the main kube-apiserver, so Admission has to be set to nil as well:

Go
o.RecommendedOptions.Authentication = nil
o.RecommendedOptions.Authorization = nil
o.RecommendedOptions.CoreAPI = nil
o.RecommendedOptions.Admission = nil

After these four fields are cleared, sample-apiserver still crashes in its PostStart hook:

Go
// Panics: this SharedInformerFactory is supplied by the CoreAPI option
config.GenericConfig.SharedInformerFactory.Start(context.StopCh)
// This InformerFactory is used only by admission controllers
o.SharedInformerFactory.Start(context.StopCh)

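For the reasons given in the comments, this PostStart hook no longer serves any purpose. Delete it and the server starts normally.

Using HTTP instead of HTTPS

The default implementation of GenericAPIServer's Run method calls s.SecureServingInfo.Serve, which forces HTTPS:

Go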
stoppedCh, err = s.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh)

Clearly, though, all we need to do to serve plain HTTP is pass s.Handler to our own http.Server.
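
A minimal sketch of that idea, using only the standard library: standInHandler plays the role of s.Handler, and servePlain and get are hypothetical helpers. The sketch binds a free loopback port so that it terminates on its own; a real deployment would set a fixed Addr on the http.Server instead.

```go
package main

import (
	"fmt"
	"io"
	"net"
	"net/http"
)

// standInHandler plays the role of GenericAPIServer.Handler in this sketch.
func standInHandler() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		fmt.Fprint(w, `{"kind":"APIGroupList"}`)
	})
}

// servePlain serves h over unencrypted HTTP on a free loopback port and
// returns the bound address plus a function that stops the server.
func servePlain(h http.Handler) (string, func(), error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", nil, err
	}
	srv := &http.Server{Handler: h}
	go func() { _ = srv.Serve(ln) }() // returns once the server is closed
	return ln.Addr().String(), func() { _ = srv.Close() }, nil
}

// get issues a GET request and returns the status code and body.
func get(url string) (int, string, error) {
	resp, err := http.Get(url)
	if err != nil {
		return 0, "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return resp.StatusCode, string(body), err
}

func main() {
	addr, stop, err := servePlain(standInHandler())
	if err != nil {
		panic(err)
	}
	defer stop()
	fmt.Println(get("http://" + addr + "/apis"))
}
```

Nothing in the handler knows or cares whether TLS is in front of it, which is why swapping the serving layer is enough.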

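Adding arbitrary HTTP endpoints

Our migration tool also exposes some HTTP endpoints that are not Kubernetes-style. How can they be integrated into the API server?

Before starting the server, you can access GenericAPIServer.Handler.NonGoRestfulMux directly. NonGoRestfulMux implements:

Go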
type mux interface {
    Handle(pattern string, handler http.Handler)
}

Call Handle to register a handler for any path.
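
As a rough sketch of such a registration: the code below writes the route installer against the one-method interface shown above and uses http.ServeMux as a stand-in for NonGoRestfulMux, since it has a Handle method with the same signature. The path /migrate/status and its payload are hypothetical.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// mux is the one-method interface quoted above.
type mux interface {
	Handle(pattern string, handler http.Handler)
}

// installExtraRoutes registers non-Kubernetes-style endpoints on any mux.
// The path and the payload are hypothetical.
func installExtraRoutes(m mux) {
	m.Handle("/migrate/status", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		fmt.Fprint(w, `{"phase":"idle"}`)
	}))
}

// newDemoMux uses http.ServeMux as a stand-in for NonGoRestfulMux.
func newDemoMux() *http.ServeMux {
	m := http.NewServeMux()
	installExtraRoutes(m)
	return m
}

// call sends one GET request through h and returns the status code and body.
func call(h http.Handler, path string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	m := newDemoMux()
	fmt.Println(call(m, "/migrate/status"))
	fmt.Println(call(m, "/unknown"))
}
```

Depending only on the small interface keeps installExtraRoutes testable without constructing a full API server.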

apiserver-builder-alpha

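The analysis of the sample-apiserver code shows that building your own API server involves a lot of tedious work. Fortunately, the Kubernetes project provides apiserver-builder-alpha to simplify the process.

apiserver-builder-alpha is a collection of tools and libraries that can:

  1. Generate Go types, controllers (based on controller-runtime), test cases and documentation for new API resources
  2. Build and run the extension control-plane component (the API server) standalone, in Minikube, or in K8S
  3. Make it easier to watch and update resources in controllers
  4. Make it easier to create new resources and subresources
  5. Provide sensible defaults for most settings
Installation

Download the archive, extract it into a directory, and then set the environment variable:

Shell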
export PATH=$HOME/.local/kubernetes/apiserver-builder/bin/:$PATH
Getting started

You need to create a project under $GOPATH and add a boilerplate.go.txt file to it. Then run:

Shell
apiserver-boot init repo --domain cloud.gmem.cc

The command generates the following directory layout:

.
├── bin
├── boilerplate.go.txt
├── BUILD.bazel
├── cmd
│   ├── apiserver
│   │   └── main.go
│   └── manager
│       └── main.go
├── go.mod
├── go.sum
├── pkg
│   ├── apis
│   │   └── doc.go
│   ├── controller
│   │   └── doc.go
│   ├── doc.go
│   ├── openapi
│   │   └── doc.go
│   └── webhook
│       └── webhook.go
├── PROJECT
└── WORKSPAC

cmd/apiserver/main.go is the entry point of the API server:

Go
import "sigs.k8s.io/apiserver-builder-alpha/pkg/cmd/server"
 
func main() {
    version := "v0"
 
    err := server.StartApiServerWithOptions(&server.StartOptions{
        EtcdPath:         "/registry/cloud.gmem.cc",
        //                Does not compile: this function does not exist
        Apis:             apis.GetAllApiBuilders(),
        Openapidefs:      openapi.GetOpenAPIDefinitions,
        Title:            "Api",
        Version:          version,
 
        // TweakConfigFuncs []func(apiServer *apiserver.Config) error
        // FlagConfigFuncs []func(*cobra.Command) error
    })
    if err != nil {
        panic(err)
    }
}

As you can see, apiserver-builder-alpha wraps part of the setup for us.

Run the following command to add a new API resource:

Shell
apiserver-boot create group version resource --group tcm --version v1 --kind Flunder

Finally, the following command starts the API server locally:

Shell
apiserver-boot run local
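Problems

The tool project discussed in this article was originally designed around CRDs, with code generated by kubebuilder. The directory layout produced by kubebuilder is not compatible with that of apiserver-builder.

In addition, the apiserver-builder project is still in alpha, and our testing found that the generated code does not run. To avoid unnecessary trouble, we decided not to use it.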

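Writing the API server

Because apiserver-builder is immature and we had already finished most of the development with kubebuilder, we decided to hand-write a standalone API server that serves plain HTTP, based on what we learned from analyzing sample-apiserver.

kubebuilder does not generate a zz_generated.openapi.go file, because that file is meaningless for CRDs. A standalone API server, however, requires it.

We need to add an annotation to the package that contains the resource types:

api/v1/doc.go
Go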
// +k8s:openapi-gen=true
 
package v1

Then run openapi-gen to generate the file:

Shell
openapi-gen  \
    --input-dirs "k8s.io/apimachinery/pkg/apis/meta/v1,k8s.io/apimachinery/pkg/runtime,k8s.io/apimachinery/pkg/version" \
    --input-dirs cloud.gmem.cc/teleport/api/v1    -p cloud.gmem.cc/teleport/api/v1 -O zz_generated.openapi

Here is the complete quick-and-dirty code:

Go
package main
 
import (
    v1 "cloud.gmem.cc/teleport/api/v1"
    "context"
    "fmt"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/fields"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/apimachinery/pkg/runtime/serializer"
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apimachinery/pkg/util/validation/field"
    "k8s.io/apiserver/pkg/endpoints/openapi"
    "k8s.io/apiserver/pkg/features"
    "k8s.io/apiserver/pkg/registry/generic"
    genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
    "k8s.io/apiserver/pkg/registry/rest"
    genericapiserver "k8s.io/apiserver/pkg/server"
    genericoptions "k8s.io/apiserver/pkg/server/options"
    "k8s.io/apiserver/pkg/storage"
    "k8s.io/apiserver/pkg/storage/names"
    "k8s.io/apiserver/pkg/util/feature"
    utilfeature "k8s.io/apiserver/pkg/util/feature"
    "net"
    "net/http"
    "os"
    "reflect"
    ctrl "sigs.k8s.io/controller-runtime"
)
 
var (
    setupLog = ctrl.Log.WithName("setup")
)
 
func main() {
 
    s := runtime.NewScheme()
    utilruntime.Must(v1.AddToScheme(s))
    gv := v1.GroupVersion
    utilruntime.Must(s.SetVersionPriority(gv))
    metav1.AddToGroupVersion(s, schema.GroupVersion{Version: "v1"})
    unversioned := schema.GroupVersion{Group: "", Version: "v1"}
    s.AddUnversionedTypes(unversioned,
        &metav1.Status{},
        &metav1.APIVersions{},
        &metav1.APIGroupList{},
        &metav1.APIGroup{},
        &metav1.APIResourceList{},
    )
 
    //  An __internal version must be registered, otherwise the following error is reported:
    //  failed to prepare current and previous objects: no kind "Flunder" is registered for the internal version of group "tcm.cloud.gmem.cc" in scheme
    gvi := gv
    gvi.Version = runtime.APIVersionInternal
    s.AddKnownTypes(gvi, &v1.Flunder{}, &v1.FlunderList{})
 
    codecFactory := serializer.NewCodecFactory(s)
    codec := codecFactory.LegacyCodec(gv)
    options := genericoptions.NewRecommendedOptions(
        "/teleport/cloud.gmem.cc",
        codec,
    )
    options.Etcd.StorageConfig.EncodeVersioner = runtime.NewMultiGroupVersioner(gv, schema.GroupKind{Group: gv.Group})
    ips := []net.IP{net.ParseIP("127.0.0.1")}
    if err := options.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", nil, ips); err != nil {
        setupLog.Error(err, "error creating self-signed certificates")
        os.Exit(1)
    }
    options.Etcd.StorageConfig.Paging = utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
    options.Etcd.StorageConfig.Transport.ServerList = []string{"http://etcd.gmem.cc:2379"}
 
    options.Authentication = nil
    options.Authorization = nil
    options.CoreAPI = nil
    options.Admission = nil
    options.SecureServing.BindPort = 6443
 
    config := genericapiserver.NewRecommendedConfig(codecFactory)
    config.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(v1.GetOpenAPIDefinitions,
        openapi.NewDefinitionNamer(s))
    config.OpenAPIConfig.Info.Title = "Teleport"
    config.OpenAPIConfig.Info.Version = "1.0"
 
    feature.DefaultMutableFeatureGate.SetFromMap(map[string]bool{
        string(features.APIPriorityAndFairness): false,
    })
 
    if err := options.ApplyTo(config); err != nil {
        panic(err)
    }
    completedConfig := config.Complete()
    server, err := completedConfig.New("teleport-apiserver", genericapiserver.NewEmptyDelegate())
    if err != nil {
        panic(err)
    }
 
    apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(gv.Group, s, metav1.ParameterCodec, codecFactory)
    v1storage := map[string]rest.Storage{}
    resource := v1.ResourceFlunders
    v1storage[resource] = createStore(
        s,
        gv.WithResource(resource).GroupResource(),
        func() runtime.Object { return &v1.Flunder{} },
        func() runtime.Object { return &v1.FlunderList{} },
        completedConfig.RESTOptionsGetter,
    )
    apiGroupInfo.VersionedResourcesStorageMap[gv.Version] = v1storage
    if err := server.InstallAPIGroups(&apiGroupInfo); err != nil {
        panic(err)
    }
    server.AddPostStartHookOrDie("teleport-post-start", func(context genericapiserver.PostStartHookContext) error {
        return nil
    })
    preparedServer := server.PrepareRun()
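    // Serve over plain HTTP; ListenAndServe only returns on failure
    if err := http.ListenAndServe(":6080", preparedServer.Handler); err != nil {
        panic(err)
    }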
}
 
func createStore(scheme *runtime.Scheme, gr schema.GroupResource, newFunc, newListFunc func() runtime.Object,
    optsGetter generic.RESTOptionsGetter) rest.Storage {
    attrs := func(obj runtime.Object) (labels.Set, fields.Set, error) {
        typ := reflect.TypeOf(newFunc())
        if reflect.TypeOf(obj) != typ {
            // typ is a pointer type, so typ.Name() would be empty; print the type itself
            return nil, nil, fmt.Errorf("given object is not a %v", typ)
        }
        oma := obj.(metav1.ObjectMetaAccessor)
        meta := oma.GetObjectMeta()
        return meta.GetLabels(), fields.Set{
            "metadata.name":      meta.GetName(),
            "metadata.namespace": meta.GetNamespace(),
        }, nil
    }
    s := strategy{
        scheme,
        names.SimpleNameGenerator,
    }
    store := &genericregistry.Store{
        NewFunc:     newFunc,
        NewListFunc: newListFunc,
        PredicateFunc: func(label labels.Selector, field fields.Selector) storage.SelectionPredicate {
            return storage.SelectionPredicate{
                Label:    label,
                Field:    field,
                GetAttrs: attrs,
            }
        },
        DefaultQualifiedResource: gr,
 
        CreateStrategy: s,
        UpdateStrategy: s,
        DeleteStrategy: s,
 
        TableConvertor: rest.NewDefaultTableConvertor(gr),
    }
    options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: attrs}
    if err := store.CompleteWithOptions(options); err != nil {
        panic(err)
    }
    return store
}
 
type strategy struct {
    runtime.ObjectTyper
    names.NameGenerator
}
 
func (strategy) NamespaceScoped() bool {
    return true
}
 
func (strategy) PrepareForCreate(ctx context.Context, obj runtime.Object) {
}
 
func (strategy) PrepareForUpdate(ctx context.Context, obj, old runtime.Object) {
}
 
func (strategy) Validate(ctx context.Context, obj runtime.Object) field.ErrorList {
    return field.ErrorList{}
}
 
func (strategy) AllowCreateOnUpdate() bool {
    return false
}
 
func (strategy) AllowUnconditionalUpdate() bool {
    return false
}
 
func (strategy) Canonicalize(obj runtime.Object) {
}
 
func (strategy) ValidateUpdate(ctx context.Context, obj, old runtime.Object) field.ErrorList {
    return field.ErrorList{}
}
Customizing the storage backend

When installing an APIGroup, we need to specify a storage backend for every resource of every version of every API group:

Go
// Per group
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(wardle.GroupName, Scheme, metav1.ParameterCodec, Codecs)
// Per version
v1alpha1storage := map[string]rest.Storage{}
// One rest.Storage per resource
v1alpha1storage["flunders"] = wardleregistry.RESTInPeace(flunderstorage.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter))
// Attach the per-version map to the group
apiGroupInfo.VersionedResourcesStorageMap["v1alpha1"] = v1alpha1storage
 
// Install the APIGroup
s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo)

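By default, genericregistry.Store is used, which is backed by Etcd. To provide your own storage backend, implement the relevant interfaces.

Note: a storage backend involves many details that need careful handling.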
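
To give a feel for what a storage backend has to provide, here is a deliberately reduced in-memory sketch. It does not implement the real rest.Storage family of interfaces, which work with context.Context and runtime.Object and also cover update, watch and validation; Object, memStore and the error values are hypothetical names used only for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"sync"
)

// Object is a hypothetical, heavily reduced stand-in for an API resource.
type Object struct {
	Namespace string
	Name      string
	Spec      string
}

var (
	errNotFound      = errors.New("not found")
	errAlreadyExists = errors.New("already exists")
)

// memStore keeps objects in a map guarded by a mutex.
type memStore struct {
	mu    sync.RWMutex
	items map[string]Object
}

func newMemStore() *memStore { return &memStore{items: map[string]Object{}} }

// key builds the map key from namespace and name.
func key(namespace, name string) string { return namespace + "/" + name }

// Create stores obj, refusing to overwrite an existing object.
func (s *memStore) Create(obj Object) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	k := key(obj.Namespace, obj.Name)
	if _, ok := s.items[k]; ok {
		return errAlreadyExists
	}
	s.items[k] = obj
	return nil
}

// Get returns the named object or errNotFound.
func (s *memStore) Get(namespace, name string) (Object, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	obj, ok := s.items[key(namespace, name)]
	if !ok {
		return Object{}, errNotFound
	}
	return obj, nil
}

// List returns the objects of one namespace, sorted by name.
func (s *memStore) List(namespace string) []Object {
	s.mu.RLock()
	defer s.mu.RUnlock()
	var out []Object
	for _, obj := range s.items {
		if obj.Namespace == namespace {
			out = append(out, obj)
		}
	}
	sort.Slice(out, func(i, j int) bool { return out[i].Name < out[j].Name })
	return out
}

// Delete removes the named object or returns errNotFound.
func (s *memStore) Delete(namespace, name string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	k := key(namespace, name)
	if _, ok := s.items[k]; !ok {
		return errNotFound
	}
	delete(s.items, k)
	return nil
}

func main() {
	s := newMemStore()
	fmt.Println(s.Create(Object{Namespace: "default", Name: "b", Spec: "2"}))
	fmt.Println(s.Create(Object{Namespace: "default", Name: "a", Spec: "1"}))
	fmt.Println(s.List("default"))
	fmt.Println(s.Delete("default", "a"), s.Delete("default", "a"))
}
```

Even in this reduced form the backend has to decide on key layout, conflict handling and locking, which hints at the details a real implementation must get right.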

File-based storage

Below is an example that stores API resources as YAML files on the file system:

Go
package file
 
import (
    "bytes"
    "context"
    "fmt"
    "io/ioutil"
    "k8s.io/apimachinery/pkg/util/uuid"
    "os"
    "path/filepath"
    "reflect"
    "strings"
    "sync"
 
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/api/meta"
    metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/conversion"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/apimachinery/pkg/watch"
    genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
    "k8s.io/apiserver/pkg/registry/rest"
)
 
var _ rest.StandardStorage = &store{}
var _ rest.Scoper = &store{}
var _ rest.Storage = &store{}
 
// NewStore instantiates a new file storage
func NewStore(groupResource schema.GroupResource, codec runtime.Codec, rootpath string, isNamespaced bool,
    newFunc func() runtime.Object, newListFunc func() runtime.Object, tc rest.TableConvertor) rest.Storage {
    objRoot := filepath.Join(rootpath, groupResource.Group, groupResource.Resource)
    if err := ensureDir(objRoot); err != nil {
        panic(fmt.Sprintf("unable to write data dir: %s", err))
    }
    rest := &store{
        defaultQualifiedResource: groupResource,
        TableConvertor:           tc,
        codec:                    codec,
        objRootPath:              objRoot,
        isNamespaced:             isNamespaced,
        newFunc:                  newFunc,
        newListFunc:              newListFunc,
        watchers:                 make(map[int]*yamlWatch, 10),
    }
    return rest
}
 
type store struct {
    rest.TableConvertor
    codec        runtime.Codec
    objRootPath  string
    isNamespaced bool
 
    muWatchers sync.RWMutex
    watchers   map[int]*yamlWatch
 
    newFunc                  func() runtime.Object
    newListFunc              func() runtime.Object
    defaultQualifiedResource schema.GroupResource
}
 
func (f *store) notifyWatchers(ev watch.Event) {
    f.muWatchers.RLock()
    for _, w := range f.watchers {
        w.ch <- ev
    }
    f.muWatchers.RUnlock()
}
 
func (f *store) New() runtime.Object {
    return f.newFunc()
}
 
func (f *store) NewList() runtime.Object {
    return f.newListFunc()
}
 
func (f *store) NamespaceScoped() bool {
    return f.isNamespaced
}
 
func (f *store) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
    return read(f.codec, f.objectFileName(ctx, name), f.newFunc)
}
 
func (f *store) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
    newListObj := f.NewList()
    v, err := getListPrt(newListObj)
    if err != nil {
        return nil, err
    }
 
    dirname := f.objectDirName(ctx)
    if err := visitDir(dirname, f.newFunc, f.codec, func(path string, obj runtime.Object) {
        appendItem(v, obj)
    }); err != nil {
        return nil, fmt.Errorf("failed walking filepath %v: %v", dirname, err)
    }
    return newListObj, nil
}
 
func (f *store) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc,
    options *metav1.CreateOptions) (runtime.Object, error) {
    if createValidation != nil {
        if err := createValidation(ctx, obj); err != nil {
            return nil, err
        }
    }
    if f.isNamespaced {
        ns, ok := genericapirequest.NamespaceFrom(ctx)
        if !ok {
            return nil, apierrors.NewBadRequest("namespace required")
        }
        if err := ensureDir(filepath.Join(f.objRootPath, ns)); err != nil {
            return nil, err
        }
    }
 
    accessor, err := meta.Accessor(obj)
    if err != nil {
        return nil, err
    }
    if accessor.GetUID() == "" {
        accessor.SetUID(uuid.NewUUID())
    }
 
    name := accessor.GetName()
    filename := f.objectFileName(ctx, name)
    qualifiedResource := f.qualifiedResourceFromContext(ctx)
    if exists(filename) {
        return nil, apierrors.NewAlreadyExists(qualifiedResource, name)
    }
 
    if err := write(f.codec, filename, obj); err != nil {
        return nil, apierrors.NewInternalError(err)
    }
 
    f.notifyWatchers(watch.Event{
        Type:   watch.Added,
        Object: obj,
    })
 
    return obj, nil
}
 
func (f *store) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo,
    createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc,
    forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
    isCreate := false
    oldObj, err := f.Get(ctx, name, nil)
    if err != nil {
        if !forceAllowCreate {
            return nil, false, err
        }
        isCreate = true
    }
 
    if f.isNamespaced {
        // ensures namespace dir
        ns, ok := genericapirequest.NamespaceFrom(ctx)
        if !ok {
            return nil, false, apierrors.NewBadRequest("namespace required")
        }
        if err := ensureDir(filepath.Join(f.objRootPath, ns)); err != nil {
            return nil, false, err
        }
    }
 
    updatedObj, err := objInfo.UpdatedObject(ctx, oldObj)
    if err != nil {
        return nil, false, err
    }
    filename := f.objectFileName(ctx, name)
 
    if isCreate {
        if createValidation != nil {
            if err := createValidation(ctx, updatedObj); err != nil {
                return nil, false, err
            }
        }
        if err := write(f.codec, filename, updatedObj); err != nil {
            return nil, false, err
        }
        f.notifyWatchers(watch.Event{
            Type:   watch.Added,
            Object: updatedObj,
        })
        return updatedObj, true, nil
    }
 
    if updateValidation != nil {
        if err := updateValidation(ctx, updatedObj, oldObj); err != nil {
            return nil, false, err
        }
    }
    if err := write(f.codec, filename, updatedObj); err != nil {
        return nil, false, err
    }
    f.notifyWatchers(watch.Event{
        Type:   watch.Modified,
        Object: updatedObj,
    })
    return updatedObj, false, nil
}
 
func (f *store) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc,
    options *metav1.DeleteOptions) (runtime.Object, bool, error) {
    filename := f.objectFileName(ctx, name)
    qualifiedResource := f.qualifiedResourceFromContext(ctx)
    if !exists(filename) {
        return nil, false, apierrors.NewNotFound(qualifiedResource, name)
    }
 
    oldObj, err := f.Get(ctx, name, nil)
    if err != nil {
        return nil, false, err
    }
    if deleteValidation != nil {
        if err := deleteValidation(ctx, oldObj); err != nil {
            return nil, false, err
        }
    }
 
    if err := os.Remove(filename); err != nil {
        return nil, false, err
    }
    f.notifyWatchers(watch.Event{
        Type:   watch.Deleted,
        Object: oldObj,
    })
    return oldObj, true, nil
}
 
func (f *store) DeleteCollection(ctx context.Context, deleteValidation rest.ValidateObjectFunc,
    options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) (runtime.Object, error) {
    newListObj := f.NewList()
    v, err := getListPrt(newListObj)
    if err != nil {
        return nil, err
    }
    dirname := f.objectDirName(ctx)
    if err := visitDir(dirname, f.newFunc, f.codec, func(path string, obj runtime.Object) {
        _ = os.Remove(path)
        appendItem(v, obj)
    }); err != nil {
        return nil, fmt.Errorf("failed walking filepath %v: %v", dirname, err)
    }
    return newListObj, nil
}
 
func (f *store) objectFileName(ctx context.Context, name string) string {
    if f.isNamespaced {
        // FIXME: return error if namespace is not found
        ns, _ := genericapirequest.NamespaceFrom(ctx)
        return filepath.Join(f.objRootPath, ns, name+".yaml")
    }
    return filepath.Join(f.objRootPath, name+".yaml")
}
 
func (f *store) objectDirName(ctx context.Context) string {
    if f.isNamespaced {
        // FIXME: return error if namespace is not found
        ns, _ := genericapirequest.NamespaceFrom(ctx)
        return filepath.Join(f.objRootPath, ns)
    }
    return filepath.Join(f.objRootPath)
}
 
func write(encoder runtime.Encoder, filepath string, obj runtime.Object) error {
    buf := new(bytes.Buffer)
    if err := encoder.Encode(obj, buf); err != nil {
        return err
    }
    return ioutil.WriteFile(filepath, buf.Bytes(), 0600)
}
 
func read(decoder runtime.Decoder, path string, newFunc func() runtime.Object) (runtime.Object, error) {
    content, err := ioutil.ReadFile(filepath.Clean(path))
    if err != nil {
        return nil, err
    }
    newObj := newFunc()
    decodedObj, _, err := decoder.Decode(content, nil, newObj)
    if err != nil {
        return nil, err
    }
    return decodedObj, nil
}
 
func exists(filepath string) bool {
    _, err := os.Stat(filepath)
    return err == nil
}
 
func ensureDir(dirname string) error {
    if !exists(dirname) {
        return os.MkdirAll(dirname, 0700)
    }
    return nil
}
 
func visitDir(dirname string, newFunc func() runtime.Object, codec runtime.Decoder,
    visitFunc func(string, runtime.Object)) error {
    return filepath.Walk(dirname, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if info.IsDir() {
            return nil
        }
        if !strings.HasSuffix(info.Name(), ".yaml") {
            return nil
        }
        newObj, err := read(codec, path, newFunc)
        if err != nil {
            return err
        }
        visitFunc(path, newObj)
        return nil
    })
}
 
func appendItem(v reflect.Value, obj runtime.Object) {
    v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
}
 
func getListPrt(listObj runtime.Object) (reflect.Value, error) {
    listPtr, err := meta.GetItemsPtr(listObj)
    if err != nil {
        return reflect.Value{}, err
    }
    v, err := conversion.EnforcePtr(listPtr)
    if err != nil || v.Kind() != reflect.Slice {
        return reflect.Value{}, fmt.Errorf("need ptr to slice: %v", err)
    }
    return v, nil
}
 
func (f *store) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
    // On initial watch, send all the existing objects
    list, err := f.List(ctx, options)
    if err != nil {
        return nil, err
    }

    danger := reflect.ValueOf(list).Elem()
    items := danger.FieldByName("Items")

    yw := &yamlWatch{
        f: f,
        // Nothing reads from the channel until Watch returns, so the buffer
        // must be large enough to hold every initial event
        ch: make(chan watch.Event, items.Len()+10),
    }
    for i := 0; i < items.Len(); i++ {
        obj := items.Index(i).Addr().Interface().(runtime.Object)
        yw.ch <- watch.Event{
            Type:   watch.Added,
            Object: obj,
        }
    }

    // Read len(f.watchers) under the lock to avoid a data race with other watchers
    f.muWatchers.Lock()
    yw.id = len(f.watchers)
    f.watchers[yw.id] = yw
    f.muWatchers.Unlock()

    return yw, nil
}
 
type yamlWatch struct {
    f  *store
    id int
    ch chan watch.Event
}
 
func (w *yamlWatch) Stop() {
    w.f.muWatchers.Lock()
    delete(w.f.watchers, w.id)
    w.f.muWatchers.Unlock()
}
 
func (w *yamlWatch) ResultChan() <-chan watch.Event {
    return w.ch
}
 
func (f *store) ConvertToTable(ctx context.Context, object runtime.Object,
    tableOptions runtime.Object) (*metav1.Table, error) {
    return f.TableConvertor.ConvertToTable(ctx, object, tableOptions)
}
func (f *store) qualifiedResourceFromContext(ctx context.Context) schema.GroupResource {
    if info, ok := genericapirequest.RequestInfoFrom(ctx); ok {
        return schema.GroupResource{Group: info.APIGroup, Resource: info.Resource}
    }
    // some implementations access storage directly and thus the context has no RequestInfo
    return f.defaultQualifiedResource
}

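Calling NewStore creates a rest.Storage. As mentioned earlier, a storage backend has many details to take care of. The sample above does not:

  1. Detect resources that are being deleted and respond appropriately to CRUD operations on them
  2. Validate resources. genericregistry.Store does this by calling the strategy
  3. Populate certain metadata fields automatically, including creationTimestamp and selfLink

Handling subresources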

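Suppose a resource in K8S has a status subresource. When a client updates that subresource, the HTTP request it sends looks like this:

PUT /apis/cloud.gmem.cc/v1/namespaces/default/flunders/sample/status

It matches the route:

PUT /apis/cloud.gmem.cc/v1/namespaces/{namespace}/flunders/{name}/status

This route is dedicated to the status subresource and is distinct from the route of the main resource:

PUT /apis/cloud.gmem.cc/v1/namespaces/{namespace}/flunders/{name}

So how does the handling of a main resource differ from that of a subresource, and how can we influence that handling logic?

Registering subresources

When calling InstallAPIGroups, all it takes to support a subresource is adding a rest.Storage under a resource name string that contains a /:

Go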
v1beta1storage["flunders/status"] = wardleregistry.RESTInPeace(flunderstorage.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter))

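You can even reuse the parent resource's rest.Storage directly. The consequence, however, is that a client request to the status endpoint can update either just the status or the entire flunder, which is usually not what you want.

The code above also registers go-restful routes in the APIServer, similar to the ones shown at the beginning of this section.

The code that extracts the path variables namespace and name is:

Go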
pathParams := pathProcessor.ExtractParameters(route, webService, httpRequest.URL.Path)

These two variables identify the resource being operated on. The request is then forwarded to the Update method of the rest.Storage:

Go
func (e *Store) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {}

The name parameter carries the resource name. The rest.Storage has no way of knowing whether it should update (only) the subresource.
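
To make the routing step concrete, here is a minimal sketch that uses only the standard library. matchRoute is a hypothetical helper written for this illustration; it is not the go-restful implementation. It shows that the status request matches only the status route, and that namespace and name are the only values extracted from the path, so the subresource is decided by which route (and therefore which rest.Storage) was selected.

```go
package main

import (
	"fmt"
	"strings"
)

// matchRoute is a hypothetical helper (not the go-restful API). It compares a
// request path with a route template segment by segment and returns the values
// bound to {placeholders}. ok is false when the path does not match.
func matchRoute(template, path string) (params map[string]string, ok bool) {
	t := strings.Split(strings.Trim(template, "/"), "/")
	p := strings.Split(strings.Trim(path, "/"), "/")
	if len(t) != len(p) {
		return nil, false
	}
	params = map[string]string{}
	for i, seg := range t {
		if strings.HasPrefix(seg, "{") && strings.HasSuffix(seg, "}") {
			// A placeholder binds whatever value sits at this position
			params[seg[1:len(seg)-1]] = p[i]
			continue
		}
		if seg != p[i] {
			return nil, false
		}
	}
	return params, true
}

func main() {
	const (
		mainRoute   = "/apis/cloud.gmem.cc/v1/namespaces/{namespace}/flunders/{name}"
		statusRoute = "/apis/cloud.gmem.cc/v1/namespaces/{namespace}/flunders/{name}/status"
	)
	path := "/apis/cloud.gmem.cc/v1/namespaces/default/flunders/sample/status"

	_, ok := matchRoute(mainRoute, path)
	fmt.Println("matches main route:", ok)

	params, ok := matchRoute(statusRoute, path)
	fmt.Println("matches status route:", ok, params["namespace"], params["name"])
}
```

Because the storage registered under "flunders/status" receives the same name argument as the main storage would, any status-only behaviour has to be built into that storage itself.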

Subresource handlers

When the status subresource is updated, we usually want to allow changes to the Status field only. To achieve this, we need to register a separate rest.Storage for the subresource.

Go
package store
 
import (
    "context"
    "k8s.io/apimachinery/pkg/runtime"
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apiserver/pkg/registry/generic/registry"
    "k8s.io/apiserver/pkg/registry/rest"
    "sigs.k8s.io/apiserver-runtime/pkg/builder/resource/util"
)
 
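// CopyStatusFunc copies the status from src to dst
type CopyStatusFunc func(src, dst runtime.Object)

// StatusStore decorates a parent storage and only updates
// the status subresource when updating.
// NOTE: if parentStore is a *registry.Store, its UpdateStrategy is replaced in place,
// so pass a store created for the status subresource, not the one serving the main resource.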
func StatusStore(parentStore rest.StandardStorage, copyStatusFunc CopyStatusFunc) rest.Storage {
    switch pstor := parentStore.(type) {
    case *registry.Store:
        pstor.UpdateStrategy = &statusStrategy{
            RESTUpdateStrategy: pstor.UpdateStrategy,
            copyStatusFunc:     copyStatusFunc,
        }
    }
    return &statusStore{
        StandardStorage: parentStore,
    }
}
 
var _ rest.Getter = &statusStore{}
var _ rest.Updater = &statusStore{}
 
type statusStore struct {
    rest.StandardStorage
}
 
var _ rest.RESTUpdateStrategy = &statusStrategy{}
 
// statusStrategy defines a default Strategy for the status subresource.
type statusStrategy struct {
    rest.RESTUpdateStrategy
    copyStatusFunc CopyStatusFunc
}
 
// PrepareForUpdate copies the status of new onto old, then overwrites new with old,
// so that only the status change sent by the client survives.
func (s *statusStrategy) PrepareForUpdate(ctx context.Context, new, old runtime.Object) {
    s.copyStatusFunc(new, old)
    if err := util.DeepCopy(old, new); err != nil {
        utilruntime.HandleError(err)
    }
}

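genericregistry.Store performs an update inside the callback of an atomic operation, and within that callback it calls the Strategy's PrepareForUpdate method. The statusStore above works by overriding this method so that only the status subresource is changed.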
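
The effect of that override can be sketched without any Kubernetes dependency. Flunder, FlunderSpec, FlunderStatus and prepareForStatusUpdate below are hypothetical stand-ins written for this illustration; they only mimic the idea of keeping the incoming status while discarding every other change.

```go
package main

import "fmt"

// Flunder is a hypothetical resource used only for this illustration.
type Flunder struct {
	Name   string
	Spec   FlunderSpec
	Status FlunderStatus
}

type FlunderSpec struct{ Reference string }
type FlunderStatus struct{ Phase string }

// prepareForStatusUpdate mirrors the idea behind statusStrategy.PrepareForUpdate:
// the incoming object keeps only its Status, everything else comes from the stored object.
func prepareForStatusUpdate(incoming, stored *Flunder) {
	status := incoming.Status
	*incoming = *stored // discard any spec or metadata change sent by the client
	incoming.Status = status
}

func main() {
	stored := &Flunder{Name: "sample", Spec: FlunderSpec{Reference: "a"}, Status: FlunderStatus{Phase: "Pending"}}
	incoming := &Flunder{Name: "sample", Spec: FlunderSpec{Reference: "tampered"}, Status: FlunderStatus{Phase: "Ready"}}

	prepareForStatusUpdate(incoming, stored)
	fmt.Printf("%+v\n", *incoming)
}
```

After the call, the object to be persisted carries the stored Spec ("a") together with the new phase ("Ready"), which is the behaviour a status endpoint is expected to have.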

Multi-versioning

Once your API needs to introduce a breaking change, you should consider supporting multiple versions.

API file layout

Below is a typical directory layout for a multi-version API:

api
├── doc.go
├── fullvpcmigration_types.go
├── 
├── v1
│   ├── conversion.go
│   ├── doc.go
│   ├── fullvpcmigration_types.go
│   ├── register.go
│   ├── zz_generated.conversion.go
│   ├── zz_generated.deepcopy.go
│   └── zz_generated.openapi.go
├── v2
│   ├── doc.go
│   ├── fullvpcmigration_types.go
│   ├── register.go
│   ├── zz_generated.conversion.go
│   ├── zz_generated.deepcopy.go
│   └── zz_generated.openapi.go
└── zz_generated.deepcopy.go

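The root directory of an API group should hold the resource type definitions of the __internal version, and it is advisable to keep them identical to the latest version. (The sample project above has only one group, so the api directory itself serves as the group root.)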

doc.go

This file should provide the package-level comments, for example:

Go
// +k8s:openapi-gen=true
// +groupName=gmem.cc
// +kubebuilder:object:generate=true
 
package api

register.go

This file handles registration with the Scheme. For the __internal version:

Go
package api
 
import (
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/schema"
)
 
const (
    GroupName = "gmem.cc"
)
 
var (
    // GroupVersion is group version used to register these objects
    GroupVersion = schema.GroupVersion{Group: GroupName, Version: runtime.APIVersionInternal}
 
    // SchemeBuilder is used to add go types to the GroupVersionKind scheme
    // Do not use &scheme.Builder{} here: it would register the GVK __internal/WatchEvent for both
    // k8s.io/apimachinery/pkg/apis/meta/v1.WatchEvent and
    // k8s.io/apimachinery/pkg/apis/meta/v1.InternalEvent, which is illegal
    SchemeBuilder = runtime.NewSchemeBuilder()
 
    // AddToScheme adds the types in this group-version to the given scheme.
    AddToScheme = SchemeBuilder.AddToScheme
)
 
// Kind takes an unqualified kind and returns a Group qualified GroupKind
func Kind(kind string) schema.GroupKind {
    return GroupVersion.WithKind(kind).GroupKind()
}
 
// Resource takes an unqualified resource and returns a Group qualified GroupResource
func Resource(resource string) schema.GroupResource {
    return GroupVersion.WithResource(resource).GroupResource()
}

For a regular version:

Go
package v2
 
import (
    "cloud.tencent.com/teleport/api"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/schema"
)
 
var (
    // GroupVersion is group version used to register these objects
    GroupVersion = schema.GroupVersion{Group: api.GroupName, Version: "v2"}
 
    // SchemeBuilder is used to add go types to the GroupVersionKind scheme
    SchemeBuilder = runtime.NewSchemeBuilder(func(scheme *runtime.Scheme) error {
        metav1.AddToGroupVersion(scheme, GroupVersion)
        return nil
    })
    localSchemeBuilder = &SchemeBuilder
 
    // AddToScheme adds the types in this group-version to the given scheme.
    AddToScheme = SchemeBuilder.AddToScheme
)
 
// Kind takes an unqualified kind and returns a Group qualified GroupKind
func Kind(kind string) schema.GroupKind {
    return GroupVersion.WithKind(kind).GroupKind()
}
 
// Resource takes an unqualified resource and returns a Group qualified GroupResource
func Resource(resource string) schema.GroupResource {
    return GroupVersion.WithResource(resource).GroupResource()
}

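As you can see, a regular version has to register certain types from the metav1 package under its own GroupVersion.

zz_generated.openapi.go

These are the OpenAPI definitions that must be generated for every regular version. They must be registered with the API Server; otherwise commands such as kubectl apply fail with a 404 error:

Shell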
$(OPENAPI_GEN)  \
    --input-dirs "k8s.io/apimachinery/pkg/apis/meta/v1,k8s.io/apimachinery/pkg/runtime,k8s.io/apimachinery/pkg/version" \
    --input-dirs cloud.tencent.com/teleport/api/v1 -o ./  -p api/v1 -O zz_generated.openapi
 
$(OPENAPI_GEN)  \
    --input-dirs cloud.tencent.com/teleport/api/v2 -o ./  -p api/v2 -O zz_generated.openapi
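
zz_generated.conversion.go

These are the type conversion functions to and from the __internal version, which must be generated for every regular version. They are registered with the current GroupVersion through the localSchemeBuilder shown above:

Shell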
$(CONVERSION_GEN) -h hack/boilerplate.go.txt --input-dirs cloud.tencent.com/teleport/api/v1 -O zz_generated.conversion
$(CONVERSION_GEN) -h hack/boilerplate.go.txt --input-dirs cloud.tencent.com/teleport/api/v2 -O zz_generated.conversion

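The reason for bumping the API version is, naturally, that the types have changed. A change in the types means that special conversion logic is needed between the old and new versions. Such logic obviously cannot be generated automatically; the conversion code you write by hand belongs in conversion.go.

zz_generated.deepcopy.go

This file contains the deep-copy functions that must be generated for the Go structs of resources in both the __internal version and the regular versions.

About the __internal version

As described earlier, every (version of an) API resource needs a rest.Storage, which directly handles operations such as GET/CREATE/UPDATE/DELETE/WATCH for that version of the resource.

genericregistry.Store, the default rest.Storage implementation backed by Etcd, contains a Cacher. The Cacher serves WATCH/LIST requests from a cache to avoid hitting Etcd too often, and internally it works with the internal version of resources.

The internal version is simply the resource types registered under the special version name __internal. This literal is provided by the constant runtime.APIVersionInternal. We usually register the resource structs in the group's root directory as the __internal version.

Thanks to this internal-version mechanism, the Cacher does not need to keep multiple versions of a resource in memory.

In addition, the resource lifecycle callbacks required by a rest.Storage or its Strategy all receive the __internal version as their arguments. This means:

  1. We do not need to write duplicate callbacks for every version
  2. When moving to multiple versions, the parameters of these callbacks must all be changed to the __internal version

Generating and customizing conversion functions

The Cacher, the lifecycle callbacks, and (as discussed below) kubectl and the storage layer can each freely choose the version they need because API resources of different versions can be converted into one another.

When you copy the code of a v1 resource to create v2, the fully generated conversion functions are sufficient. As soon as you add or modify a field, you need to customize the conversion functions.

Suppose we rename the field FullVPCMigrationSpec.TeamName to Team. We then need:

Go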
// The places where zz_generated.conversion.go fails to compile are the functions you need to implement

func Convert_v1_FullVPCMigrationSpec_To_api_FullVPCMigrationSpec(in *FullVPCMigrationSpec, out *api.FullVPCMigrationSpec, s conversion.Scope) error {
    // Handle here whatever needs manual work because of the field change
    out.Team = in.TeamName
    // Then call the generated function; its name differs from yours only by the auto prefix
    return autoConvert_v1_FullVPCMigrationSpec_To_api_FullVPCMigrationSpec(in, out, s)
}
 
func Convert_api_FullVPCMigrationSpec_To_v1_FullVPCMigrationSpec(in *api.FullVPCMigrationSpec, out *FullVPCMigrationSpec, s conversion.Scope) error {
    out.TeamName = in.Team
    return autoConvert_api_FullVPCMigrationSpec_To_v1_FullVPCMigrationSpec(in, out, s)
}

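The two functions above are the ones missing from the generated conversion code; their absence causes compile errors, so you have to implement them yourself.

The variants with the auto prefix are generated conversion functions that already cover most of the logic. You only need to do the necessary manual handling and then call the auto function.

Note that conversion functions always operate between a specific version and the __internal version. To convert v1 to v2, the object is first converted to __internal and then from __internal to v2. The design is easy to justify: without it, the number of conversion functions would explode as versions are added.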
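
The two-hop conversion can be illustrated with a small, dependency-free sketch. The types SpecV1, SpecInternal and SpecV2 and all functions below are hypothetical simplifications made up for this example; the real functions are the Convert_* functions registered with the Scheme. conversionFuncs compares how many functions are needed with a hub (two per version) and without one (one per ordered pair of versions).

```go
package main

import "fmt"

// Hypothetical, simplified types: v1 uses TeamName, the internal version and v2 use Team.
type SpecV1 struct{ TeamName string }
type SpecInternal struct{ Team string }
type SpecV2 struct{ Team string }

// Each external version only knows how to convert to and from the internal version.
func v1ToInternal(in SpecV1) SpecInternal { return SpecInternal{Team: in.TeamName} }
func internalToV1(in SpecInternal) SpecV1 { return SpecV1{TeamName: in.Team} }
func v2ToInternal(in SpecV2) SpecInternal { return SpecInternal{Team: in.Team} }
func internalToV2(in SpecInternal) SpecV2 { return SpecV2{Team: in.Team} }

// v1ToV2 is not a hand-written conversion; it is just the composition of two hops.
func v1ToV2(in SpecV1) SpecV2 { return internalToV2(v1ToInternal(in)) }

// conversionFuncs returns the number of conversion functions needed for the
// given number of external versions, with and without the internal hub.
func conversionFuncs(versions int) (viaHub, direct int) {
	return 2 * versions, versions * (versions - 1)
}

func main() {
	fmt.Println(v1ToV2(SpecV1{TeamName: "infra"}).Team)

	hub, direct := conversionFuncs(5)
	fmt.Println(hub, direct)
}
```

With five external versions the hub needs 10 functions while direct pairwise conversion would need 20, and the gap widens with every additional version.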

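Conversion code must be registered with the Scheme. The API Server as well as clients such as kubectl or controller-runtime all depend on the Scheme.

How multiple versions are stored

Both storing (serialization) and reading (deserialization) rely on a Codec. A Codec is simply a Serializer:

Go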
package runtime
 
type Serializer interface {
    Encoder
    Decoder
}
 
// Codec is a Serializer that deals with the details of versioning objects. It offers the same
// interface as Serializer, so this is a marker to consumers that care about the version of the objects
// they receive.
type Codec Serializer

A Codec is provided by a CodecFactory, which holds the Scheme:

Go
serializer.NewCodecFactory(scheme)

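As we already know, the Scheme contains the following information:

  1. Which Go struct each Group, Version, Kind maps to
  2. The json tags on the Go structs, which determine the serialized format
  3. How different Versions of the same Group and Kind are converted into one another

A Codec is therefore able to serialize to JSON or other formats and to convert between the Go structs of different versions.

For genericregistry.Store, storing means converting the Go struct of an API resource into JSON or ProtoBuf and saving it to Etcd, which obviously requires a Codec.

Once multi-version support is enabled, you need to pass all versions (prioritizedVersions) to the CodecFactory when creating the Codec:

Go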
prioritizedVersions := []schema.GroupVersion{
    {
        Group: "gmem.cc",
        Version: "v2",
    },
    {
        Group: "gmem.cc",
        Version: "v1",
    },
}
codec := codecFactory.LegacyCodec(prioritizedVersions...)
 
genericOptions.Etcd.StorageConfig.EncodeVersioner = runtime.NewMultiGroupVersioner(schema.GroupVersion{
    Group: "gmem.cc",
    Version: "v2",
})

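In addition, prioritizedVersions determines the preferred format when a resource is stored. For example, fullvpcmigrations has both v1 and v2, so it is stored as v2, whereas jointeamrequests only has v1 and can therefore only be stored as v1.

Note: if a fullvpcmigration already exists in v1 format, the first modification made to it after the above configuration takes effect changes its storage format to v2.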
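
The selection rule described above can be sketched as follows. storageVersion is a hypothetical helper written for this illustration, not a function of k8s.io/apimachinery: it simply returns the first version, in priority order, under which a kind is registered.

```go
package main

import "fmt"

// storageVersion is a hypothetical helper: it returns the first version in
// priority order that the given kind is registered under.
func storageVersion(prioritized []string, registered map[string]bool) (string, bool) {
	for _, v := range prioritized {
		if registered[v] {
			return v, true
		}
	}
	return "", false
}

func main() {
	prioritized := []string{"v2", "v1"}
	// Versions each kind is registered under (mirrors the example in the text)
	kinds := map[string]map[string]bool{
		"FullVPCMigration": {"v1": true, "v2": true},
		"JoinTeamRequest":  {"v1": true},
	}
	for _, kind := range []string{"FullVPCMigration", "JoinTeamRequest"} {
		v, _ := storageVersion(prioritized, kinds[kind])
		fmt.Println(kind, "is stored as", v)
	}
}
```

FullVPCMigration resolves to v2 and JoinTeamRequest falls back to v1, matching the behaviour described for the two resources.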

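OpenAPI for multiple versions

You need to generate OpenAPI definitions for every version. The OpenAPI definitions are just a map, so merging the contents of all versions is all that is needed.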
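
A minimal sketch of such a merge is shown below. Definition is a hypothetical stand-in for the real OpenAPI definition type, and mergeDefinitions is a helper made up for this example; the keys are assumed to be fully qualified Go type names, so entries from different versions do not collide.

```go
package main

import "fmt"

// Definition stands in for an OpenAPI definition; the real type lives in kube-openapi.
type Definition struct{ Description string }

// mergeDefinitions copies every entry of each source map into one result map.
// If the same key appears twice, the later source wins.
func mergeDefinitions(sources ...map[string]Definition) map[string]Definition {
	merged := map[string]Definition{}
	for _, src := range sources {
		for name, def := range src {
			merged[name] = def
		}
	}
	return merged
}

func main() {
	v1Defs := map[string]Definition{
		"cloud.tencent.com/teleport/api/v1.FullVPCMigration": {Description: "v1 schema"},
	}
	v2Defs := map[string]Definition{
		"cloud.tencent.com/teleport/api/v2.FullVPCMigration": {Description: "v2 schema"},
	}
	all := mergeDefinitions(v1Defs, v2Defs)
	fmt.Println(len(all))
}
```

In a real server, the function handed to the OpenAPI configuration would call the generated function of each version and merge the results in the same way.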

Which version the APIServer exposes

The APIServer exposes every registered resource version. It does, however, have a notion of version priority:

Go
apiGroupInfo.PrioritizedVersions = prioritizedVersions

This determines which version kubectl displays by default. The preferred version also appears in the output of the api-resources subcommand:

Shell
# kubectl -s http://127.0.0.1:6080 api-resources  
NAME                    SHORTNAMES   APIVERSION   NAMESPACED   KIND
fullvpcmigrations                    gmem.cc/v2   true         FullVPCMigration
jointeamrequests                     gmem.cc/v1   true         JoinTeamRequest

The kubectl get command shows the preferred version by default, but you can also force a specific version:

Shell
kubectl -s http://127.0.0.1:6080 -n default get fullvpcmigration.v1.gmem.cc
# GET http://127.0.0.1:6080/apis/gmem.cc/v1/namespaces/default/fullvpcmigrations?limit=500
 
kubectl -s http://127.0.0.1:6080 -n default get fullvpcmigration.v2.gmem.cc
# GET http://127.0.0.1:6080/apis/gmem.cc/v2/namespaces/default/fullvpcmigrations?limit=500

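Either way, fullvpcmigrations stored in any version are returned. From the client's point of view, choosing a version is merely choosing a "view" of the resource.

Version selection in controllers

The resource version a controller watches must already be registered in the controller manager's Scheme.

In your Reconcile code you can use any registered version as the container for a Get operation; type conversion happens automatically.

It is advisable to use a regular version only when reading or storing resource state, and the __internal version everywhere else. That way, your controller logic needs very few changes after a version upgrade.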
