Kuberentes客户端编程
| 
					 1 2 3 4 5 6 7 8 9  | 
						import (     "k8s.io/client-go/tools/clientcmd"     "k8s.io/client-go/kubernetes"     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"     "k8s.io/api/core/v1"     "k8s.io/apimachinery/pkg/labels"     "k8s.io/apimachinery/pkg/watch"     "k8s.io/apimachinery/pkg/api/errors" )  | 
					
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20  | 
						var (     masterURL  string     kubeConfig string ) func main() {     flag.StringVar(&masterURL, "master-url", "", "URL of kubernetes master")     flag.StringVar(&kubeConfig, "kube-config", "", "Kubernetes configuration file location")     flag.Parse()     // 构建配置信息     cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeConfig)     if err != nil {         log.Fatalf("Invalid arguments: %s", err.Error())     }     // 创建API集     clientset, err := kubernetes.NewForConfig(cfg)     if err != nil {         log.Fatalf("Failed to connect to api server: %s", err.Error())     } }  | 
					
在Pod中访问API Server,可以使用集群授予的ServiceAccount作为凭证。参考如下代码:
| 
					 1 2 3  | 
						"k8s.io/client-go/rest" cfg, err = rest.InClusterConfig() // 不在K8S内部调用此方法会报错  | 
					
| 
					 1 2 3 4  | 
						// 获取一个Deployment dep, _ := clientset.ExtensionsV1beta1().Deployments(namespace).Get("mysql", metav1.GetOptions{}) // 获取一个Pod pod, _ := clientset.CoreV1().Pods(namespace).Get("mysql-54fb6b686b-sv8ql", metav1.GetOptions{})  | 
					
通过标签选择器列出:
| 
					 1 2 3 4 5 6 7  | 
						// labels.Set是map[string]string的typedef,进行cast以调用其方法 labelset := labels.Set(dep.Spec.Selector.MatchLabels) // 使用标签选择器列出Deployment下的Pods pods, _ := clientset.CoreV1().Pods(namespace).List(metav1.ListOptions{ // String方法的返回值就是ParseSelector支持的格式,示意:application=mysql,tier=middleware LabelSelector: labelset.String(), })  | 
					
可以持续监控目标资源的变化:
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20  | 
						podApi := clientset.CoreV1().Pods(namespace) watchApi, _ := podApi.Watch(metav1.ListOptions{ // 和list操作使用一个Options结构     // 被监控资源的选择器     LabelSelector: labelset.String(),     // 持续监控被监控资源的变化,包括add,update,remove事件     Watch: true,     // 持续监控进行的最大时间     TimeoutSeconds: &timeout,     // 持续监控进行的最大次数     Limit: 1024, }) var debuggee string for e := range watchApi.ResultChan() {     pod := e.Object.(*v1.Pod)     if e.Type == watch.Added {         debuggee = pod.Name     } else if pod.Name == debuggee && e.Type == watch.Modified && pod.Status.Phase == v1.PodRunning {         os.Exit(0)     } }   | 
					
下面是修改Deployment环境变量定义的例子:
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17  | 
						// 修改K8S资源时一定要注意赋值引用而非拷贝 container := &dep.Spec.Template.Spec.Containers[0] debugEnvDefined := false for idx, _ := range container.Env {     envar := &container.Env[idx]     if envar.Name == debugVar {         debugEnvDefined = true         envar.Value = debugMode         break     } } if !debugEnvDefined {     container.Env = append(container.Env, v1.EnvVar{debugVar, debugMode, nil}) } json, _ := json.MarshalIndent(container.Env, "", "\t") fmt.Printf("Updated envrioment variables of container %v:\n%v", container.Name, string(json)) depApi.Update(dep) // 更新之后,Deployment管理的Pod会自动重新创建  | 
					
下面代码片断示意了如何创建一个完整的Deployment对象:
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36  | 
						primaryDep = &appsv1.Deployment{         // 对象元数据部分     ObjectMeta: metav1.ObjectMeta{         Name:      primaryName,         Labels:    canaryDep.Labels,         Namespace: cd.Namespace,                 // 指向其OwnerReference,也就是管理此对象的对象,在这个例子中是cd,一个CRD         OwnerReferences: []metav1.OwnerReference{             *metav1.NewControllerRef(cd, schema.GroupVersionKind{                 Group:   flaggerv1.SchemeGroupVersion.Group,                 Version: flaggerv1.SchemeGroupVersion.Version,                 Kind:    flaggerv1.CanaryKind,             }),         },     },         // 对象规格部分     Spec: appsv1.DeploymentSpec{         ProgressDeadlineSeconds: canaryDep.Spec.ProgressDeadlineSeconds,         MinReadySeconds:         canaryDep.Spec.MinReadySeconds,         RevisionHistoryLimit:    canaryDep.Spec.RevisionHistoryLimit,         Replicas:                canaryDep.Spec.Replicas,         Strategy:                canaryDep.Spec.Strategy,         Selector: &metav1.LabelSelector{             MatchLabels: map[string]string{                 "app": primaryName,             },         },         Template: corev1.PodTemplateSpec{             ObjectMeta: metav1.ObjectMeta{                 Labels:      map[string]string{"app": primaryName},                 Annotations: annotations,             },             Spec: ...,         },     }, }  | 
					
| 
					 1 2 3 4 5 6  | 
						// 获得*v1beta1.Scale scale, _ := depApi.GetScale("mysql", metav1.GetOptions{}) replicas := scale.Spec.Replicas // 缩容为0 scale.Spec.Replicas = 0 depApi.UpdateScale("mysql", scale)  | 
					
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48  | 
						package main import (     "fmt"     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"     "k8s.io/apimachinery/pkg/runtime/schema"     "k8s.io/client-go/dynamic"     _ "k8s.io/client-go/plugin/pkg/client/auth"     "k8s.io/client-go/tools/clientcmd" ) var (     context = "" ) func main() {     //  连接到默认Kubernetes Context     config, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(         clientcmd.NewDefaultClientConfigLoadingRules(),         &clientcmd.ConfigOverrides{CurrentContext: context}).ClientConfig()     if err != nil {         panic(err.Error())     }     // 创建动态客户端     dynamicClient, err := dynamic.NewForConfig(config)     if err != nil {         panic(err.Error())     }     //  创建目标资源的GroupVersionResource GVR     virtualServiceGVR := schema.GroupVersionResource{         Group:    "networking.istio.io",         Version:  "v1alpha3",         Resource: "virtualservices",     }     //  获取默认命名空间的,对应GVR的资源列表     virtualServices, err := dynamicClient.Resource(virtualServiceGVR).Namespace("default").List(metav1.ListOptions{})     if err != nil {         panic(err.Error())     }         // 遍历     for index, virtualService := range virtualServices.Items {         fmt.Printf("VirtualService %d: %s\n", index+1, virtualService.GetName())     } }  | 
					
动态客户端的CRUD接口如下:
| 
					 1 2 3 4 5 6 7 8 9 10 11  | 
						type ResourceInterface interface {     Create(obj *unstructured.Unstructured, options metav1.CreateOptions, subresources ...string) (*unstructured.Unstructured, error)     Update(obj *unstructured.Unstructured, options metav1.UpdateOptions, subresources ...string) (*unstructured.Unstructured, error)     UpdateStatus(obj *unstructured.Unstructured, options metav1.UpdateOptions) (*unstructured.Unstructured, error)     Delete(name string, options *metav1.DeleteOptions, subresources ...string) error     DeleteCollection(options *metav1.DeleteOptions, listOptions metav1.ListOptions) error     Get(name string, options metav1.GetOptions, subresources ...string) (*unstructured.Unstructured, error)     List(opts metav1.ListOptions) (*unstructured.UnstructuredList, error)     Watch(opts metav1.ListOptions) (watch.Interface, error)     Patch(name string, pt types.PatchType, data []byte, options metav1.PatchOptions, subresources ...string) (*unstructured.Unstructured, error) }  | 
					
可以看到动态资源被封装在 unstructured.Unstructured结构中。你可以使用类似下面的代码将其转换为静态类型:
| 
					 1 2 3 4 5 6  | 
						import "k8s.io/apimachinery/pkg/runtime" var obj runtime.Unstructured pod = new(corev1api.Pod) runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), pod)   | 
					
发现客户端用于获取API Server能支持的API组、版本、资源列表:
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16  | 
						discoveryClient := discovery.NewDiscoveryClientForConfigOrDie(mgr.GetConfig()) arrayOfApiList, err := discoveryClient.ServerResources() if err != nil {     panic(err) } r.apiVersionKind = make(map[string]struct{}, 0) for _, apiList := range arrayOfApiList {     for _, api := range apiList.APIResources {         // 是否命名空间内资源         if api.Namespaced {             continue         }         // GVK         apiVersionKind[fmt.Sprintf("%s/%s", apiList.GroupVersion, api.Kind)] = struct{}{}     } }  | 
					
某些控制器具有Status.ObservedGeneration字段,来表示控制器最后一次操作所针对的资源的版本。
Generation字段表示资源最后一次被修改的版本,用于乐观并发控制。
当Status.ObservedGeneration比Generation小的时候,说明控制器还没有应用最新的资源,应当等待控制器完成后,执行更进一步操作:
| 
					 1 2 3 4 5  | 
						if deployment.Generation <= deployment.Status.ObservedGeneration {     // do something }else{     return fmt.Errorf("waiting for rollout to finish: observed deployment generation less then desired generation") }  | 
					
| 
					 1 2 3  | 
						import ref "k8s.io/client-go/tools/reference" // 创建ObjectReference ref, err := ref.GetReference(recorder.scheme, object)  | 
					
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30  | 
						func (i *impl) ExecInPod(namespace, podName, container string, streamOptions remotecommand.StreamOptions, cmd string, args ...string) error {     execReq := i.clientset.CoreV1().RESTClient().Post().         Resource("pods").         Name(podName).         Namespace(namespace).         SubResource("exec")     command := []string{cmd}     command = append(command, args...)     execReq.VersionedParams(&corev1.PodExecOptions{         Command:   command,         Container: container,         Stdin:     streamOptions.Stdin != nil,         Stdout:    streamOptions.Stdout != nil,         Stderr:    streamOptions.Stderr != nil,         TTY:       streamOptions.Tty,     }, scheme.ParameterCodec)     exec, err := remotecommand.NewSPDYExecutor(i.restConfig, "POST", execReq.URL())     if err != nil {         return fmt.Errorf("error while creating executor: %v", err)     }     err = exec.Stream(streamOptions)     if err != nil {         return fmt.Errorf("error in stream: %v", err)     } else {         return nil     } }  | 
					
表示集群中某时某处发生的一个事件:
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46  | 
						type Event struct {     metav1.TypeMeta      metav1.ObjectMeta      // 此事件牵涉到的对象     InvolvedObject ObjectReference      // 简短的、机器可理解的,对象变为当前状态的原因     Reason string      // 人类可读的,对象变为当前状态的说明     Message string      // 报告此事件的K8S组件     Source EventSource      // 此事件首次被记录的时间     FirstTimestamp metav1.Time      // 此事件最后一次出现的时间     LastTimestamp metav1.Time      // 此事件发生的次数     Count int32      // 事件的类型Normal, Warning     Type string      // 事件第一次被观察的时间     EventTime metav1.MicroTime      // 此事件所属的事件序列,如果为单独事件则此字段为nil     Series *EventSeries      // 针对目标对象的什么操作被执行/失败     Action string      // 次要相关的对象     Related *ObjectReference      // 报告此事件的控制器     ReportingController string      // 报告此事件的控制器实例     ReportingInstance string  }  | 
					
此结构表示事件的来源,什么组件(例如kubelet,某个operator)、什么节点产生了此事件:
| 
					 1 2 3 4  | 
						type EventSource struct {     Component string     Host string }  | 
					
表示一组连续发生的事件的集合:
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19  | 
						type EventSeries struct {     // 到最后一次心跳为止,事件发生的次数     Count int32     // 最后一次观察到事件的时间     LastObservedTime metav1.MicroTime     // 事件序列的状态     State EventSeriesState } type EventSeriesState string const (         // 正在进行     EventSeriesStateOngoing  EventSeriesState = "Ongoing"         // 完成     EventSeriesStateFinished EventSeriesState = "Finished"         // 未知     EventSeriesStateUnknown  EventSeriesState = "Unknown" )    | 
					
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17  | 
						import     "k8s.io/client-go/tools/record" // 创建事件广播器 eventBroadcaster := record.NewBroadcaster() // 将从广播器接收到的事件发送给日志记录器,进行日志记录 eventBroadcaster.StartLogging(logger.Named("event-broadcaster").Debugf) // 将事件持久化,默认实现是存放到K8S,可以通过kubectl describe查看事件 eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{     Interface: kubeClient.CoreV1().Events(""), }) // 日志记录器,能够产生事件,并发送给广播器处理 eventRecorder := eventBroadcaster.NewRecorder(                 // 事件源,填写组件名称,例如flagger、kubelet         scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) // 产生一个事件 c.eventRecorder.Event(r, corev1.EventTypeWarning, "Synced", fmt.Sprintf(template, args...))  | 
					
来自 k8s.io/apimachinery/pkg/util/clock的clock包,可以注入真实/仿冒的时钟到任何需要基于时间执行特定逻辑的代码中。
| 
					 1 2 3 4 5 6 7 8  | 
						type Clock interface {     Now() time.Time  // 返回当前时间     Since(time.Time) time.Duration  // 从指定时间点到现在,过了多久     After(time.Duration) <-chan time.Time // 在指定时间之后,通道可读     NewTimer(time.Duration) Timer // 返回一个定时器     Sleep(time.Duration) // 导致当前线程睡眠     NewTicker(time.Duration) Ticker // 返回一个重复定时器,每隔一段时间其通道可读 }  | 
					
创建一个时钟:
| 
					 1  | 
						c := clock.RealClock{}  | 
					
创建一个重复定时器:
| 
					 1  | 
						ticker := c.clock.NewTicker(updatePeriod)  | 
					
每过一定间隔执行逻辑:
| 
					 1 2 3 4 5 6 7  | 
						defer ticker.Stop() for {     select {     case <-ticker.C():         // ...     } }  | 
					
来自 k8s.io/apimachinery/pkg/util/wait的wait包,提供监听/轮询某个条件是否达成的功能。
立即开始周期性的尝试调用条件函数,直到此函数返回true、出错,后者到达超时:
| 
					 1 2 3 4 5 6  | 
						                   // 尝试间隔 尝试(总计)超时 func PollImmediate(interval, timeout time.Duration, condition ConditionFunc) error {} // 示例 wait.PollImmediate(npdo.APIServerWaitInterval, npdo.APIServerWaitTimeout, func() (done bool, err error) {     return false, nil })  | 
					
此函数被调用时,不经等待即立即开始第一次调用条件函数。
执行一个调用,如果错误是冲突(API Server乐观并发控制),则重试:
| 
					 1 2 3  | 
						func RetryOnConflict(backoff wait.Backoff, fn func() error) error {     return OnError(backoff, errors.IsConflict, fn) }  | 
					
有时候,更新资源会接收到错误:the object has been modified; please apply your changes to the latest version and try again。这是由于你提交的资源的resourceVersion过低导致 —— 在你获取资源修改、提交之前,资源已经被别人更新过。
需要注意:
- Scale、Status子资源的版本号,和主资源是一致的
 - 单次API调用可能导致多次resourceVersion变更
 
下面是一段示意代码,说明如何处理这种错误:
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32  | 
						depApi := clientset.ExtensionsV1beta1().Deployments(namespace) // 第一次修改:缩容到0 scale, _ := depApi.GetScale(deployment, metav1.GetOptions{}) replicas := scale.Spec.Replicas scale.Spec.Replicas = 0 // 从调用的返回值中可以获取最新的资源版本 scale, _ = depApi.UpdateScale(deployment, scale) // 最低资源版本 minVer := scale.ResourceVersion // 第二次修改:变更Deployment规格、扩容到原副本数 for {     // 反复尝试直到成功     dep, err := depApi.Get(deployment, metav1.GetOptions{         // 要求获取的资源的版本必须大于等于指定的版本         ResourceVersion: minVer,     })     dep.Spec.Replicas = &replicas     _, err = depApi.Update(dep)     if err != nil {         serr := err.(*errors.StatusError)         if serr.ErrStatus.Code == 409 {             time.Sleep(time.Second)             minVer = dep.ResourceVersion             continue         }         log.Fatalf("Failed to update deployment %s/%s: %s", namespace, deployment, err.Error())     }     break }  | 
					
            
Leave a Reply