Kuberentes客户端编程
1 2 3 4 5 6 7 8 9 |
import ( "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/kubernetes" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/api/errors" ) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
var ( masterURL string kubeConfig string ) func main() { flag.StringVar(&masterURL, "master-url", "", "URL of kubernetes master") flag.StringVar(&kubeConfig, "kube-config", "", "Kubernetes configuration file location") flag.Parse() // 构建配置信息 cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeConfig) if err != nil { log.Fatalf("Invalid arguments: %s", err.Error()) } // 创建API集 clientset, err := kubernetes.NewForConfig(cfg) if err != nil { log.Fatalf("Failed to connect to api server: %s", err.Error()) } } |
在Pod中访问API Server,可以使用集群授予的ServiceAccount作为凭证。参考如下代码:
1 2 3 |
"k8s.io/client-go/rest" cfg, err = rest.InClusterConfig() // 不在K8S内部调用此方法会报错 |
1 2 3 4 |
// 获取一个Deployment dep, _ := clientset.ExtensionsV1beta1().Deployments(namespace).Get("mysql", metav1.GetOptions{}) // 获取一个Pod pod, _ := clientset.CoreV1().Pods(namespace).Get("mysql-54fb6b686b-sv8ql", metav1.GetOptions{}) |
通过标签选择器列出:
1 2 3 4 5 6 7 |
// labels.Set是map[string]string的typedef,进行cast以调用其方法 labelset := labels.Set(dep.Spec.Selector.MatchLabels) // 使用标签选择器列出Deployment下的Pods pods, _ := clientset.CoreV1().Pods(namespace).List(metav1.ListOptions{ // String方法的返回值就是ParseSelector支持的格式,示意:application=mysql,tier=middleware LabelSelector: labelset.String(), }) |
可以持续监控目标资源的变化:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
podApi := clientset.CoreV1().Pods(namespace) watchApi, _ := podApi.Watch(metav1.ListOptions{ // 和list操作使用一个Options结构 // 被监控资源的选择器 LabelSelector: labelset.String(), // 持续监控被监控资源的变化,包括add,update,remove事件 Watch: true, // 持续监控进行的最大时间 TimeoutSeconds: &timeout, // 持续监控进行的最大次数 Limit: 1024, }) var debuggee string for e := range watchApi.ResultChan() { pod := e.Object.(*v1.Pod) if e.Type == watch.Added { debuggee = pod.Name } else if pod.Name == debuggee && e.Type == watch.Modified && pod.Status.Phase == v1.PodRunning { os.Exit(0) } } |
下面是修改Deployment环境变量定义的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
// 修改K8S资源时一定要注意赋值引用而非拷贝 container := &dep.Spec.Template.Spec.Containers[0] debugEnvDefined := false for idx, _ := range container.Env { envar := &container.Env[idx] if envar.Name == debugVar { debugEnvDefined = true envar.Value = debugMode break } } if !debugEnvDefined { container.Env = append(container.Env, v1.EnvVar{debugVar, debugMode, nil}) } json, _ := json.MarshalIndent(container.Env, "", "\t") fmt.Printf("Updated envrioment variables of container %v:\n%v", container.Name, string(json)) depApi.Update(dep) // 更新之后,Deployment管理的Pod会自动重新创建 |
下面代码片断示意了如何创建一个完整的Deployment对象:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
primaryDep = &appsv1.Deployment{ // 对象元数据部分 ObjectMeta: metav1.ObjectMeta{ Name: primaryName, Labels: canaryDep.Labels, Namespace: cd.Namespace, // 指向其OwnerReference,也就是管理此对象的对象,在这个例子中是cd,一个CRD OwnerReferences: []metav1.OwnerReference{ *metav1.NewControllerRef(cd, schema.GroupVersionKind{ Group: flaggerv1.SchemeGroupVersion.Group, Version: flaggerv1.SchemeGroupVersion.Version, Kind: flaggerv1.CanaryKind, }), }, }, // 对象规格部分 Spec: appsv1.DeploymentSpec{ ProgressDeadlineSeconds: canaryDep.Spec.ProgressDeadlineSeconds, MinReadySeconds: canaryDep.Spec.MinReadySeconds, RevisionHistoryLimit: canaryDep.Spec.RevisionHistoryLimit, Replicas: canaryDep.Spec.Replicas, Strategy: canaryDep.Spec.Strategy, Selector: &metav1.LabelSelector{ MatchLabels: map[string]string{ "app": primaryName, }, }, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{"app": primaryName}, Annotations: annotations, }, Spec: ..., }, }, } |
1 2 3 4 5 6 |
// 获得*v1beta1.Scale scale, _ := depApi.GetScale("mysql", metav1.GetOptions{}) replicas := scale.Spec.Replicas // 缩容为0 scale.Spec.Replicas = 0 depApi.UpdateScale("mysql", scale) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
package main import ( "fmt" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" _ "k8s.io/client-go/plugin/pkg/client/auth" "k8s.io/client-go/tools/clientcmd" ) var ( context = "" ) func main() { // 连接到默认Kubernetes Context config, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig( clientcmd.NewDefaultClientConfigLoadingRules(), &clientcmd.ConfigOverrides{CurrentContext: context}).ClientConfig() if err != nil { panic(err.Error()) } // 创建动态客户端 dynamicClient, err := dynamic.NewForConfig(config) if err != nil { panic(err.Error()) } // 创建目标资源的GroupVersionResource GVR virtualServiceGVR := schema.GroupVersionResource{ Group: "networking.istio.io", Version: "v1alpha3", Resource: "virtualservices", } // 获取默认命名空间的,对应GVR的资源列表 virtualServices, err := dynamicClient.Resource(virtualServiceGVR).Namespace("default").List(metav1.ListOptions{}) if err != nil { panic(err.Error()) } // 遍历 for index, virtualService := range virtualServices.Items { fmt.Printf("VirtualService %d: %s\n", index+1, virtualService.GetName()) } } |
动态客户端的CRUD接口如下:
1 2 3 4 5 6 7 8 9 10 11 |
type ResourceInterface interface { Create(obj *unstructured.Unstructured, options metav1.CreateOptions, subresources ...string) (*unstructured.Unstructured, error) Update(obj *unstructured.Unstructured, options metav1.UpdateOptions, subresources ...string) (*unstructured.Unstructured, error) UpdateStatus(obj *unstructured.Unstructured, options metav1.UpdateOptions) (*unstructured.Unstructured, error) Delete(name string, options *metav1.DeleteOptions, subresources ...string) error DeleteCollection(options *metav1.DeleteOptions, listOptions metav1.ListOptions) error Get(name string, options metav1.GetOptions, subresources ...string) (*unstructured.Unstructured, error) List(opts metav1.ListOptions) (*unstructured.UnstructuredList, error) Watch(opts metav1.ListOptions) (watch.Interface, error) Patch(name string, pt types.PatchType, data []byte, options metav1.PatchOptions, subresources ...string) (*unstructured.Unstructured, error) } |
可以看到动态资源被封装在 unstructured.Unstructured结构中。你可以使用类似下面的代码将其转换为静态类型:
1 2 3 4 5 6 |
import "k8s.io/apimachinery/pkg/runtime" var obj runtime.Unstructured pod = new(corev1api.Pod) runtime.DefaultUnstructuredConverter.FromUnstructured(obj.UnstructuredContent(), pod) |
发现客户端用于获取API Server能支持的API组、版本、资源列表:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
discoveryClient := discovery.NewDiscoveryClientForConfigOrDie(mgr.GetConfig()) arrayOfApiList, err := discoveryClient.ServerResources() if err != nil { panic(err) } r.apiVersionKind = make(map[string]struct{}, 0) for _, apiList := range arrayOfApiList { for _, api := range apiList.APIResources { // 是否命名空间内资源 if api.Namespaced { continue } // GVK apiVersionKind[fmt.Sprintf("%s/%s", apiList.GroupVersion, api.Kind)] = struct{}{} } } |
某些控制器具有Status.ObservedGeneration字段,来表示控制器最后一次操作所针对的资源的版本。
Generation字段表示资源最后一次被修改的版本,用于乐观并发控制。
当Status.ObservedGeneration比Generation小的时候,说明控制器还没有应用最新的资源,应当等待控制器完成后,执行更进一步操作:
1 2 3 4 5 |
if deployment.Generation <= deployment.Status.ObservedGeneration { // do something }else{ return fmt.Errorf("waiting for rollout to finish: observed deployment generation less then desired generation") } |
1 2 3 |
import ref "k8s.io/client-go/tools/reference" // 创建ObjectReference ref, err := ref.GetReference(recorder.scheme, object) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
func (i *impl) ExecInPod(namespace, podName, container string, streamOptions remotecommand.StreamOptions, cmd string, args ...string) error { execReq := i.clientset.CoreV1().RESTClient().Post(). Resource("pods"). Name(podName). Namespace(namespace). SubResource("exec") command := []string{cmd} command = append(command, args...) execReq.VersionedParams(&corev1.PodExecOptions{ Command: command, Container: container, Stdin: streamOptions.Stdin != nil, Stdout: streamOptions.Stdout != nil, Stderr: streamOptions.Stderr != nil, TTY: streamOptions.Tty, }, scheme.ParameterCodec) exec, err := remotecommand.NewSPDYExecutor(i.restConfig, "POST", execReq.URL()) if err != nil { return fmt.Errorf("error while creating executor: %v", err) } err = exec.Stream(streamOptions) if err != nil { return fmt.Errorf("error in stream: %v", err) } else { return nil } } |
表示集群中某时某处发生的一个事件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
type Event struct { metav1.TypeMeta metav1.ObjectMeta // 此事件牵涉到的对象 InvolvedObject ObjectReference // 简短的、机器可理解的,对象变为当前状态的原因 Reason string // 人类可读的,对象变为当前状态的说明 Message string // 报告此事件的K8S组件 Source EventSource // 此事件首次被记录的时间 FirstTimestamp metav1.Time // 此事件最后一次出现的时间 LastTimestamp metav1.Time // 此事件发生的次数 Count int32 // 事件的类型Normal, Warning Type string // 事件第一次被观察的时间 EventTime metav1.MicroTime // 此事件所属的事件序列,如果为单独事件则此字段为nil Series *EventSeries // 针对目标对象的什么操作被执行/失败 Action string // 次要相关的对象 Related *ObjectReference // 报告此事件的控制器 ReportingController string // 报告此事件的控制器实例 ReportingInstance string } |
此结构表示事件的来源,什么组件(例如kubelet,某个operator)、什么节点产生了此事件:
1 2 3 4 |
type EventSource struct { Component string Host string } |
表示一组连续发生的事件的集合:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
type EventSeries struct { // 到最后一次心跳为止,事件发生的次数 Count int32 // 最后一次观察到事件的时间 LastObservedTime metav1.MicroTime // 事件序列的状态 State EventSeriesState } type EventSeriesState string const ( // 正在进行 EventSeriesStateOngoing EventSeriesState = "Ongoing" // 完成 EventSeriesStateFinished EventSeriesState = "Finished" // 未知 EventSeriesStateUnknown EventSeriesState = "Unknown" ) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
import "k8s.io/client-go/tools/record" // 创建事件广播器 eventBroadcaster := record.NewBroadcaster() // 将从广播器接收到的事件发送给日志记录器,进行日志记录 eventBroadcaster.StartLogging(logger.Named("event-broadcaster").Debugf) // 将事件持久化,默认实现是存放到K8S,可以通过kubectl describe查看事件 eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{ Interface: kubeClient.CoreV1().Events(""), }) // 日志记录器,能够产生事件,并发送给广播器处理 eventRecorder := eventBroadcaster.NewRecorder( // 事件源,填写组件名称,例如flagger、kubelet scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) // 产生一个事件 c.eventRecorder.Event(r, corev1.EventTypeWarning, "Synced", fmt.Sprintf(template, args...)) |
来自 k8s.io/apimachinery/pkg/util/clock的clock包,可以注入真实/仿冒的时钟到任何需要基于时间执行特定逻辑的代码中。
1 2 3 4 5 6 7 8 |
type Clock interface { Now() time.Time // 返回当前时间 Since(time.Time) time.Duration // 从指定时间点到现在,过了多久 After(time.Duration) <-chan time.Time // 在指定时间之后,通道可读 NewTimer(time.Duration) Timer // 返回一个定时器 Sleep(time.Duration) // 导致当前线程睡眠 NewTicker(time.Duration) Ticker // 返回一个重复定时器,每隔一段时间其通道可读 } |
创建一个时钟:
1 |
c := clock.RealClock{} |
创建一个重复定时器:
1 |
ticker := c.clock.NewTicker(updatePeriod) |
每过一定间隔执行逻辑:
1 2 3 4 5 6 7 |
defer ticker.Stop() for { select { case <-ticker.C(): // ... } } |
来自 k8s.io/apimachinery/pkg/util/wait的wait包,提供监听/轮询某个条件是否达成的功能。
立即开始周期性的尝试调用条件函数,直到此函数返回true、出错,后者到达超时:
1 2 3 4 5 6 |
// 尝试间隔 尝试(总计)超时 func PollImmediate(interval, timeout time.Duration, condition ConditionFunc) error {} // 示例 wait.PollImmediate(npdo.APIServerWaitInterval, npdo.APIServerWaitTimeout, func() (done bool, err error) { return false, nil }) |
此函数被调用时,不经等待即立即开始第一次调用条件函数。
执行一个调用,如果错误是冲突(API Server乐观并发控制),则重试:
1 2 3 |
func RetryOnConflict(backoff wait.Backoff, fn func() error) error { return OnError(backoff, errors.IsConflict, fn) } |
有时候,更新资源会接收到错误:the object has been modified; please apply your changes to the latest version and try again。这是由于你提交的资源的resourceVersion过低导致 —— 在你获取资源修改、提交之前,资源已经被别人更新过。
需要注意:
- Scale、Status子资源的版本号,和主资源是一致的
- 单次API调用可能导致多次resourceVersion变更
下面是一段示意代码,说明如何处理这种错误:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
depApi := clientset.ExtensionsV1beta1().Deployments(namespace) // 第一次修改:缩容到0 scale, _ := depApi.GetScale(deployment, metav1.GetOptions{}) replicas := scale.Spec.Replicas scale.Spec.Replicas = 0 // 从调用的返回值中可以获取最新的资源版本 scale, _ = depApi.UpdateScale(deployment, scale) // 最低资源版本 minVer := scale.ResourceVersion // 第二次修改:变更Deployment规格、扩容到原副本数 for { // 反复尝试直到成功 dep, err := depApi.Get(deployment, metav1.GetOptions{ // 要求获取的资源的版本必须大于等于指定的版本 ResourceVersion: minVer, }) dep.Spec.Replicas = &replicas _, err = depApi.Update(dep) if err != nil { serr := err.(*errors.StatusError) if serr.ErrStatus.Code == 409 { time.Sleep(time.Second) minVer = dep.ResourceVersion continue } log.Fatalf("Failed to update deployment %s/%s: %s", namespace, deployment, err.Error()) } break } |
Leave a Reply