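Galaxy Study Notes
Galaxy is a network component of TKEStack that provides Overlay/Underlay container networking for TKE clusters. One of its features is floating IP (elastic IP) support: a Pod keeps its IP address even when it drifts to another node because its original node went down.
Galaxy provides four kinds of networks and lets you configure the network mode for each workload individually.
Overlay network. This is the default mode, based on Flannel's VXLAN or Host Gateway (routing) scheme. Containers on the same node communicate without a bridge; packets are forwarded directly by host routes. Cross-node container traffic is either VXLAN-encapsulated or routed directly, and inter-node routes are recorded in etcd. Pros: simple and reliable, good performance, and support for network policies.
A TKEStack installed by tke-installer configures Galaxy in Overlay mode by default. The CNI configuration is: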
{
  "type": "galaxy-sdn",
  "capabilities": {"portMappings": true},
  "cniVersion": "0.2.0"
}
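In this mode:
- Flannel allocates a subnet on each Kubelet node and saves it in etcd and in the local file /run/flannel/subnet.env
- Kubelet invokes Galaxy's CNI plugin galaxy-sdn, which calls the local Galaxy process over a Unix domain socket (UDS)
- The Galaxy process in turn calls the Flannel CNI, which parses the subnet information in /run/flannel/subnet.env
- The Flannel CNI then calls:
- the Bridge CNI or Veth CNI to configure networking for the Pod
- host-lo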
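The subnet file mentioned in the steps above is a list of KEY=VALUE lines. The following is a minimal sketch of reading it, not Galaxy's or Flannel's actual code: `parseSubnetEnv` is a hypothetical helper, and the sample values are made up for illustration.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// parseSubnetEnv parses KEY=VALUE lines such as those Flannel writes to
// /run/flannel/subnet.env. Blank lines and lines without '=' are skipped.
// Hypothetical helper for illustration only.
func parseSubnetEnv(content string) map[string]string {
	env := make(map[string]string)
	scanner := bufio.NewScanner(strings.NewReader(content))
	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())
		key, value, ok := strings.Cut(line, "=")
		if !ok || key == "" {
			continue
		}
		env[key] = value
	}
	return env
}

func main() {
	// Sample content; the addresses and MTU are illustrative assumptions.
	sample := "FLANNEL_NETWORK=10.244.0.0/16\n" +
		"FLANNEL_SUBNET=10.244.1.1/24\n" +
		"FLANNEL_MTU=1450\n" +
		"FLANNEL_IPMASQ=true\n"
	env := parseSubnetEnv(sample)
	fmt.Println(env["FLANNEL_SUBNET"], env["FLANNEL_MTU"])
}
```

A CNI plugin would use the node subnet from this file to pick Pod addresses; here the sketch only shows the parsing step.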
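Architecture diagram:
Underlay network. In this mode container IPs are provided by the host network, and containers and hosts can route to each other directly, which gives better performance. Layer-2 connectivity between containers and hosts is supported through Linux Bridge, MacVlan, IPVlan and SR-IOV; choose among them according to your workload and hardware environment.
To use the Galaxy Underlay network you must enable the Galaxy-ipam component, which allocates IPs to Pods and also provides the floating IP capability.
NAT. Uses the Kubernetes hostPort setting to map container ports to host ports. If hostPort is not specified, Galaxy picks a random mapping.
Host. Uses the Kubernetes HostNetwork mode, so the Pod uses the physical network directly.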
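Galaxy consists of three kinds of components.
Galaxy. Runs as a DaemonSet on every node and configures container networking by calling various CNI plugins.
Galaxy delegates the actual CNI work to other CNI plugins; in other words, it acts as a decorator. Galaxy supports any standard CNI plugin and also ships several built-in ones.
Galaxy-ipam. A Kubernetes scheduler plugin. kube-scheduler calls Galaxy-ipam over HTTP to configure and manage floating IPs.
Because a Pod's IP address can only change when the Pod is rescheduled, implementing this component as a scheduler plugin is a natural fit.
CNI plugins. Galaxy supports any standard CNI plugin, so you can use it as a CNI framework, much like multus-cni.
Galaxy also includes a set of built-in CNI plugins: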
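Plugin | Description |
SDN CNI | Entry point of CNI when the Overlay network is used. It calls the Galaxy daemon over UDS and forwards all CNI arguments supplied by Kubelet |
Veth CNI | Used in the Overlay network. Creates a veth pair connecting the container and host namespaces. It gets the Pod IP from the ipam plugin |
underlay-veth CNI | Used in the Underlay network. Creates a veth pair connecting the container and host namespaces. It does not create a bridge; packets are forwarded using host routing rules |
Vlan CNI | Used in the Underlay network. Creates a veth pair connecting the container to a bridge/macvlan/ipvlan device on the host. It supports configuring a VLAN for the Pod, and obtains the IP address either from CNI arguments, for example ipinfos=[{"ip":"192.168.0.68/26","vlan":2,"gateway":"192.168.0.65"}], or from the result of an ipam CNI plugin |
SRIOV CNI | Used in the Underlay network. Uses the SR-IOV capability of an Ethernet Server Adapter to create a VF device and move it into the container's network namespace |
TKE route ENI CNI | For Tencent Cloud |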
go get -d tkestack.io/galaxy
cd $GOPATH/src/tkestack.io/galaxy
# Build all binaries
make
# Build specific binaries
make BINS="galaxy galaxy-ipam"
make BINS="galaxy-ipam"
# Build all images
make image
# Build a specific image
make image BINS="galaxy-ipam"
# Build for a specific architecture
make image.multiarch PLATFORMS="linux_arm64"
The Galaxy manifest can be downloaded from GitHub. Its default content is:
---
apiVersion: rbac.authorization.k8s.io/v1
# kubernetes versions before 1.8.0 should use rbac.authorization.k8s.io/v1beta1
kind: ClusterRole
metadata:
  name: galaxy
rules:
- apiGroups: [""]
  resources:
  - pods
  - namespaces
  - nodes
  - pods/binding
  verbs: ["list", "watch", "get", "patch", "create", "update"]
- apiGroups: ["apps", "extensions"]
  resources:
  - statefulsets
  - deployments
  verbs: ["list", "watch"]
- apiGroups: [""]
  resources:
  - configmaps
  - endpoints
  - events
  verbs: ["get", "list", "watch", "update", "create", "patch"]
- apiGroups: ["galaxy.k8s.io"]
  resources:
  - pools
  - floatingips
  verbs: ["get", "list", "watch", "update", "create", "patch", "delete"]
- apiGroups: ["apiextensions.k8s.io"]
  resources:
  - customresourcedefinitions
  verbs:
  - "*"
- apiGroups: ["networking.k8s.io"]
  resources:
  - networkpolicies
  verbs: ["get", "list", "watch"]
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: galaxy
  namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
# kubernetes versions before 1.8.0 should use rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
  name: galaxy
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: galaxy
subjects:
- kind: ServiceAccount
  name: galaxy
  namespace: kube-system
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  labels:
    app: galaxy
  name: galaxy
  namespace: kube-system
spec:
  selector:
    matchLabels:
      app: galaxy
  template:
    metadata:
      labels:
        app: galaxy
    spec:
      priorityClassName: system-node-critical
      serviceAccountName: galaxy
      hostNetwork: true
      hostPID: true
      containers:
      - image: tkestack/galaxy:v1.0.7
        command: ["/bin/sh"]
        # Entrypoint script:
        #   Copy the Galaxy CNI config from the ConfigMap to the host
        #     cp -p /etc/galaxy/cni/00-galaxy.conf /etc/cni/net.d/;
        #   Copy the CNI plugin binaries to the host
        #     cp -p /opt/cni/galaxy/bin/galaxy-sdn /opt/cni/galaxy/bin/loopback /opt/cni/bin/;
        #   Start the Galaxy daemon (the --route-eni flag is needed on Tencent Cloud)
        #     /usr/bin/galaxy --logtostderr=true --v=3 --route-eni
        # qcloud galaxy should run with --route-eni
        args: ["-c", "cp -p /etc/galaxy/cni/00-galaxy.conf /etc/cni/net.d/; cp -p /opt/cni/galaxy/bin/galaxy-sdn /opt/cni/galaxy/bin/loopback /opt/cni/bin/; /usr/bin/galaxy --logtostderr=true --v=3 --route-eni"]
        # private-cloud should run without --route-eni
        # args: ["-c", "cp -p /etc/galaxy/cni/00-galaxy.conf /etc/cni/net.d/; cp -p /opt/cni/galaxy/bin/galaxy-sdn /opt/cni/galaxy/bin/loopback /opt/cni/bin/; /usr/bin/galaxy --logtostderr=true --v=3"]
        imagePullPolicy: Always
        env:
        - name: MY_NODE_NAME
          valueFrom:
            fieldRef:
              fieldPath: spec.nodeName
        - name: DOCKER_HOST
          value: unix:///host/run/docker.sock
        name: galaxy
        resources:
          requests:
            cpu: 100m
            memory: 200Mi
        securityContext:
          privileged: true
        volumeMounts:
        - name: galaxy-run
          mountPath: /var/run/galaxy/
        - name: flannel-run
          mountPath: /run/flannel
        - name: galaxy-etc
          mountPath: /etc/galaxy
        - name: cni-config
          mountPath: /etc/cni/net.d/
        - name: cni-bin
          mountPath: /opt/cni/bin
        - name: cni-etc
          mountPath: /etc/galaxy/cni
        - name: cni-state
          mountPath: /var/lib/cni
        - name: docker-sock
          mountPath: /host/run/
        - name: tz-config
          mountPath: /etc/localtime
      terminationGracePeriodSeconds: 30
      tolerations:
      - operator: Exists
      volumes:
      - name: galaxy-run
        hostPath:
          path: /var/run/galaxy
      - name: flannel-run
        hostPath:
          path: /run/flannel
      - configMap:
          defaultMode: 420
          name: galaxy-etc
        name: galaxy-etc
      - name: cni-config
        hostPath:
          path: /etc/cni/net.d/
      - name: cni-bin
        hostPath:
          path: /opt/cni/bin
      - name: cni-state
        hostPath:
          path: /var/lib/cni
      - configMap:
          defaultMode: 420
          name: cni-etc
        name: cni-etc
      - name: docker-sock
        # in case of docker restart, /run/docker.sock may change, we have to mount the /run directory
        hostPath:
          path: /run/
      - name: tz-config
        hostPath:
          path: /etc/localtime
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: galaxy-etc
  namespace: kube-system
data:
  # Galaxy configuration file
  # update network card name in "galaxy-k8s-vlan" and "galaxy-k8s-sriov" if necessary
  # update vf_num in "galaxy-k8s-sriov" according to demand
  # update ENIIPNetwork to tke-route-eni if running on qcloud
  galaxy.json: |
    {
      "NetworkConf":[
        {"name":"tke-route-eni","type":"tke-route-eni","eni":"eth1","routeTable":1},
        {"name":"galaxy-flannel","type":"galaxy-flannel","delegate":{"type":"galaxy-veth"},"subnetFile":"/run/flannel/subnet.env"},
        {"name":"galaxy-k8s-vlan","type":"galaxy-k8s-vlan","device":"eth1","default_bridge_name":"br0"},
        {"name":"galaxy-k8s-sriov","type":"galaxy-k8s-sriov","device":"eth1","vf_num":10}
      ],
      "DefaultNetworks": ["galaxy-flannel"],
      "ENIIPNetwork": "galaxy-k8s-vlan"
    }
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: cni-etc
  namespace: kube-system
data:
  # CNI network configuration
  00-galaxy.conf: |
    {
      "name": "galaxy-sdn",
      "type": "galaxy-sdn",
      "capabilities": {"portMappings": true},
      "cniVersion": "0.2.0"
    }
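Adjust the manifest to match your actual environment and the network mode you use.
Make sure Kubelet runs with the command-line flags --network-plugin=cni --cni-bin-dir=/opt/cni/bin/, then restart Kubelet.
The Overlay network mode depends on Flannel; see the Flannel study notes.
To configure the default networks (DefaultNetworks), edit galaxy-etc in the Galaxy manifest above: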
{
  // Configuration of every supported network mode
  "NetworkConf":[
    {
      "name":"tke-route-eni",   // If name is empty, Galaxy assumes it equals type
      "type":"tke-route-eni",
      "eni":"eth1",
      "routeTable":1
    },
    {
      "name":"galaxy-flannel",
      // This plugin is simply the Flannel CNI under a different name
      "type":"galaxy-flannel",
      "delegate":{
        "type":"galaxy-veth"
      },
      "subnetFile":"/run/flannel/subnet.env"
    },
    {
      "name":"galaxy-k8s-vlan",
      "type":"galaxy-k8s-vlan",
      "device":"eth1",
      "default_bridge_name":"br0"
    },
    {
      "name":"galaxy-k8s-sriov",
      "type":"galaxy-k8s-sriov",
      "device":"eth1",
      "vf_num":10
    },
    {
      "name":"galaxy-underlay-veth",
      "type":"galaxy-underlay-veth",
      "device":"eth1"
    }
  ],
  // Pods use these networks by default; more than one network may be listed
  "DefaultNetworks":[
    "galaxy-flannel"   // Use the Flannel-based Overlay network
  ],
  // If a Pod requests a Tencent Cloud elastic network interface (ENI) and has no
  // k8s.v1.cni.cncf.io/networks annotation, this network is used by default. This
  // avoids having to annotate every Pod that needs the Underlay network
  "ENIIPNetwork":"galaxy-k8s-vlan"
}
You can set the following annotation on a Pod:
k8s.v1.cni.cncf.io/networks: galaxy-flannel,galaxy-k8s-sriov
to tell Galaxy which networks to configure for it.
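The selection rule described here (use the annotation if present, otherwise fall back to DefaultNetworks) can be sketched as follows. This is an illustration under assumptions, not Galaxy's real code: `resolveNetworks` is a hypothetical helper, and it ignores the ENIIPNetwork fallback.

```go
package main

import (
	"fmt"
	"strings"
)

// Annotation key taken from the text above.
const networksAnnotation = "k8s.v1.cni.cncf.io/networks"

// resolveNetworks returns the network names listed in the Pod annotation,
// or the defaults when the annotation is absent or empty.
// Hypothetical helper; Galaxy's actual selection logic may differ.
func resolveNetworks(annotations map[string]string, defaults []string) []string {
	var names []string
	for _, name := range strings.Split(annotations[networksAnnotation], ",") {
		if name = strings.TrimSpace(name); name != "" {
			names = append(names, name)
		}
	}
	if len(names) == 0 {
		return defaults
	}
	return names
}

func main() {
	defaults := []string{"galaxy-flannel"}
	annotated := map[string]string{networksAnnotation: "galaxy-flannel,galaxy-k8s-sriov"}
	fmt.Println(resolveNetworks(annotated, defaults))
	fmt.Println(resolveNetworks(map[string]string{}, defaults))
}
```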
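Galaxy can coexist with other CNI plugins. One caveat: the configuration files of other plugins in the --network-conf-dir directory must not sort lexicographically before 00-galaxy.conf. Otherwise Kubelet will call the other CNI plugin ahead of the Galaxy CNI plugin, which is probably not what you want.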
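The ordering caveat can be checked with a few lines of Go. This sketch assumes, as the paragraph above implies, that the configuration file sorting first by name wins; `firstConfig` and the file names other than 00-galaxy.conf are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// firstConfig returns the file name that sorts first lexicographically,
// or "" for an empty list. The input slice is left unmodified.
func firstConfig(names []string) string {
	if len(names) == 0 {
		return ""
	}
	sorted := append([]string(nil), names...)
	sort.Strings(sorted)
	return sorted[0]
}

func main() {
	// 10-other.conflist sorts after 00-galaxy.conf, so Galaxy stays first.
	fmt.Println(firstConfig([]string{"10-other.conflist", "00-galaxy.conf"}))
	// 00-aaa.conf sorts before 00-galaxy.conf and would take precedence.
	fmt.Println(firstConfig([]string{"00-galaxy.conf", "00-aaa.conf"}))
}
```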
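--alsologtostderr                   # Log to files and to standard error
--bridge-nf-call-iptables           # Whether to set bridge-nf-call-iptables, which makes packets forwarded by a bridge pass through iptables rules. Default true
--cni-paths stringSlice             # Additional CNI paths besides those received from kubelet. Default /opt/cni/galaxy/bin
--flannel-allocated-ip-dir string   # Where the Flannel CNI plugin stores allocated IP addresses. Default /var/lib/cni/networks
--flannel-gc-interval duration      # Interval of Flannel network garbage collection. Default 10s
--gc-dirs string                    # Directories to clean up; file names in them contain container IDs
                                    # Default /var/lib/cni/flannel,/var/lib/cni/galaxy,/var/lib/cni/galaxy/port
--hostname-override string          # Overrides the kubelet hostname. If set, Galaxy uses it to get the node object from the API Server
--ip-forward                        # Whether to enable IP forwarding
--json-config-path string           # Path to the Galaxy configuration file. Default /etc/galaxy/galaxy.json
--kubeconfig string                 # Path to the kubeconfig file
--log-backtrace-at traceLocation    # Print a stack trace when logging hits line file:N. Default :0
--log-dir string                    # Log output directory
--log-flush-frequency duration      # Log flush interval. Default 5s
--logtostderr                       # Log to standard error instead of files
--master string                     # Address and port of the API Server
--network-conf-dir string           # Directory of additional CNI network configuration files. Default /etc/cni/net.d/
--network-policy                    # Enable network policy support
--route-eni                         # Whether to enable Tencent Cloud route-eni
--stderrthreshold severity          # Logs at or above this severity go to standard error. Default 2
-v, --v Level                       # Log verbosity level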
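The Underlay network must be used together with Galaxy-ipam, which allocates and releases IP addresses for Pods:
- Plan the Underlay IP ranges for the container network (take care to avoid conflicts with the IPs of other hosts on the physical network) and put them in the ConfigMap floatingip-config
- When a Pod is scheduled, kube-scheduler calls Galaxy-ipam in its filter/priority/bind methods
- Galaxy-ipam checks whether the Pod has a reserved IP. If it does, Galaxy-ipam marks as valid only the nodes in the subnets where that IP is available; otherwise all nodes are marked valid. During binding, Galaxy-ipam allocates an IP and writes it into the Pod annotations
- Galaxy reads the IP from the Pod annotations and passes it as an argument to the CNI plugin, which configures the Pod IP
Galaxy's floating IP capability is provided by Galaxy-ipam, a Kubernetes Scheduler Extender. The Kubernetes scheduler calls Galaxy-ipam, which influences its filtering and binding phases. Floating IPs must be used together with the Underlay network.
The overall workflow diagram:
The Galaxy-ipam manifest can be downloaded from GitHub. Its default content is: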
apiVersion: v1
kind: Service
metadata:
  name: galaxy-ipam
  namespace: kube-system
  labels:
    app: galaxy-ipam
spec:
  type: NodePort
  ports:
  - name: scheduler-port
    port: 9040
    targetPort: 9040
    nodePort: 32760
    protocol: TCP
  - name: api-port
    port: 9041
    targetPort: 9041
    nodePort: 32761
    protocol: TCP
  selector:
    app: galaxy-ipam
---
apiVersion: rbac.authorization.k8s.io/v1
# kubernetes versions before 1.8.0 should use rbac.authorization.k8s.io/v1beta1
kind: ClusterRole
metadata:
  name: galaxy-ipam
rules:
- apiGroups: [""]
  resources:
  - pods
  - namespaces
  - nodes
  - pods/binding
  verbs: ["list", "watch", "get", "patch", "create"]
- apiGroups: ["apps", "extensions"]
  resources:
  - statefulsets
  - deployments
  verbs: ["list", "watch"]
- apiGroups: [""]
  resources:
  - configmaps
  - endpoints
  - events
  verbs: ["get", "list", "watch", "update", "create", "patch"]
- apiGroups: ["galaxy.k8s.io"]
  resources:
  - pools
  - floatingips
  verbs: ["get", "list", "watch", "update", "create", "patch", "delete"]
- apiGroups: ["apiextensions.k8s.io"]
  resources:
  - customresourcedefinitions
  verbs:
  - "*"
- apiGroups: ["apps.tkestack.io"]
  resources:
  - tapps
  verbs: ["list", "watch"]
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: galaxy-ipam
  namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
# kubernetes versions before 1.8.0 should use rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
  name: galaxy-ipam
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: galaxy-ipam
subjects:
- kind: ServiceAccount
  name: galaxy-ipam
  namespace: kube-system
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: galaxy-ipam
  name: galaxy-ipam
  namespace: kube-system
spec:
  replicas: 1
  selector:
    matchLabels:
      app: galaxy-ipam
  template:
    metadata:
      labels:
        app: galaxy-ipam
    spec:
      priorityClassName: system-cluster-critical
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
          - labelSelector:
              matchExpressions:
              - key: app
                operator: In
                values:
                - galaxy-ipam
            topologyKey: "kubernetes.io/hostname"
      serviceAccountName: galaxy-ipam
      hostNetwork: true
      dnsPolicy: ClusterFirstWithHostNet
      containers:
      - image: tkestack/galaxy-ipam:v1.0.7
        args:
        - --logtostderr=true
        - --profiling
        - --v=3
        - --config=/etc/galaxy/galaxy-ipam.json
        # This service exposes a port for the scheduler to call
        - --port=9040
        - --api-port=9041
        - --leader-elect
        command:
        - /usr/bin/galaxy-ipam
        ports:
        - containerPort: 9040
        - containerPort: 9041
        imagePullPolicy: Always
        name: galaxy-ipam
        resources:
          requests:
            cpu: 100m
            memory: 200Mi
        volumeMounts:
        - name: kube-config
          mountPath: /etc/kubernetes/
        - name: galaxy-ipam-log
          mountPath: /data/galaxy-ipam/logs
        - name: galaxy-ipam-etc
          mountPath: /etc/galaxy
        - name: tz-config
          mountPath: /etc/localtime
      terminationGracePeriodSeconds: 30
      tolerations:
      - effect: NoSchedule
        key: node-role.kubernetes.io/master
        operator: Exists
      volumes:
      - name: kube-config
        hostPath:
          path: /etc/kubernetes/
      - name: galaxy-ipam-log
        emptyDir: {}
      - configMap:
          defaultMode: 420
          name: galaxy-ipam-etc
        name: galaxy-ipam-etc
      - name: tz-config
        hostPath:
          path: /etc/localtime
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: galaxy-ipam-etc
  namespace: kube-system
data:
  # Configuration file.
  # If you are not using Tencent Cloud elastic network interfaces (ENI),
  # remove the "cloudProviderGrpcAddr" line below.
  galaxy-ipam.json: |
    {
      "schedule_plugin": {
        "cloudProviderGrpcAddr": "127.0.0.2:80"
      }
    }
You need to configure a scheduling policy for the Kubernetes scheduler. The policy can now be stored in a ConfigMap:
cat <<EOF | kubectl create -f -
apiVersion: v1
kind: ConfigMap
metadata:
  name: scheduler-policy
  namespace: kube-system
data:
  # The extender below is called over HTTP
  policy.cfg: |
    {
      "kind": "Policy",
      "apiVersion": "v1",
      "extenders": [
        {
          "urlPrefix": "http://127.0.0.1:9040/v1",
          "httpTimeout": 10000000000,
          "filterVerb": "filter",
          "BindVerb": "bind",
          "weight": 1,
          "enableHttps": false,
          "managedResources": [
            {
              "name": "tke.cloud.tencent.com/eni-ip",
              "ignoredByScheduler": false
            }
          ]
        }
      ]
    }
EOF
Then add the command-line flag --policy-configmap=scheduler-policy to the scheduler.
On bare metal you can use the following floating IP configuration:
kind: ConfigMap
apiVersion: v1
metadata:
  name: floatingip-config
  namespace: kube-system
data:
  # nodeSubnets: CIDRs of the nodes; nodes inside these ranges can run Pods with floating IPs
  # ips:         IP address ranges that can be allocated to Pods
  # subnet:      subnet of the Pod IPs
  # gateway:     gateway of the Pod IPs
  # vlan:        VLAN of the Pod IPs. Set it when Pod IPs and node IPs are not in the same
  #              VLAN, and make sure the switch port the node connects to is a trunk port
  floatingips: |
    [
      {
        "nodeSubnets": ["10.0.0.0/16"],
        "ips": ["10.0.70.2~10.0.70.241"],
        "subnet": "10.0.70.0/24",
        "gateway": "10.0.70.1",
        "vlan": 1024
      }
    ]
One nodeSubnet can map to multiple Pod subnets, for example:
// A Pod running on a node in 10.49.28.0/26 can get an IP from 10.0.80.2~10.0.80.4 or 10.0.81.2~10.0.81.4
// A Pod running on a node in 10.49.29.0/24 can only get an IP from 10.0.80.0/24
[{
  "nodeSubnets": ["10.49.28.0/26", "10.49.29.0/24"],
  "ips": ["10.0.80.2~10.0.80.4"],
  "subnet": "10.0.80.0/24",
  "gateway": "10.0.80.1"
}, {
  "nodeSubnets": ["10.49.28.0/26"],
  "ips": ["10.0.81.2~10.0.81.4"],
  "subnet": "10.0.81.0/24",
  "gateway": "10.0.81.1",
  "vlan": 3
}]
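To make the mapping concrete, the sketch below looks up which Pod IP ranges apply to a given node IP, using the two entries from the example above. The `floatingIPConf` type and `rangesForNode` function are hypothetical names for illustration; they are not Galaxy-ipam's own types.

```go
package main

import (
	"fmt"
	"net"
)

// floatingIPConf holds the fields of one floatingips entry that matter here.
// Hypothetical type for illustration.
type floatingIPConf struct {
	NodeSubnets []string
	IPs         []string
}

// rangesForNode returns the Pod IP ranges of every entry whose nodeSubnets
// contain the given node IP.
func rangesForNode(nodeIP string, confs []floatingIPConf) ([]string, error) {
	ip := net.ParseIP(nodeIP)
	if ip == nil {
		return nil, fmt.Errorf("invalid node IP %q", nodeIP)
	}
	var ranges []string
	for _, c := range confs {
		for _, cidr := range c.NodeSubnets {
			_, subnet, err := net.ParseCIDR(cidr)
			if err != nil {
				return nil, err
			}
			if subnet.Contains(ip) {
				ranges = append(ranges, c.IPs...)
				break // one matching nodeSubnet per entry is enough
			}
		}
	}
	return ranges, nil
}

func main() {
	confs := []floatingIPConf{
		{NodeSubnets: []string{"10.49.28.0/26", "10.49.29.0/24"}, IPs: []string{"10.0.80.2~10.0.80.4"}},
		{NodeSubnets: []string{"10.49.28.0/26"}, IPs: []string{"10.0.81.2~10.0.81.4"}},
	}
	for _, node := range []string{"10.49.28.10", "10.49.29.10"} {
		ranges, err := rangesForNode(node, confs)
		fmt.Println(node, ranges, err)
	}
}
```

The node addresses 10.49.28.10 and 10.49.29.10 are made-up examples chosen to fall inside the two node subnets.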
As long as the allocatable IP ranges do not overlap, multiple nodeSubnets can share the same Pod subnet:
[{
  "routableSubnet": "10.180.1.2/32",
  "ips": ["10.180.154.2~10.180.154.3"],
  "subnet": "10.180.154.0/24",
  "gateway": "10.180.154.1",
  "vlan": 3
}, {
  "routableSubnet": "10.180.1.3/32",
  "ips": ["10.180.154.7~10.180.154.8"],
  "subnet": "10.180.154.0/24",
  "gateway": "10.180.154.1",
  "vlan": 3
}]
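The non-overlap condition can be verified mechanically. The sketch below parses the first~last range notation used above and tests two ranges for overlap. It handles IPv4 only, and `ipRange`, `parseRange` and `overlaps` are hypothetical helpers rather than Galaxy-ipam code.

```go
package main

import (
	"bytes"
	"fmt"
	"net"
	"strings"
)

// ipRange is an inclusive IPv4 range.
type ipRange struct{ first, last net.IP }

// parseRange parses the "first~last" notation used in the floatingips config.
func parseRange(s string) (ipRange, error) {
	lo, hi, ok := strings.Cut(s, "~")
	if !ok {
		return ipRange{}, fmt.Errorf("range %q is not in first~last form", s)
	}
	first, last := net.ParseIP(lo).To4(), net.ParseIP(hi).To4()
	if first == nil || last == nil || bytes.Compare(first, last) > 0 {
		return ipRange{}, fmt.Errorf("invalid range %q", s)
	}
	return ipRange{first, last}, nil
}

// overlaps reports whether two inclusive ranges share at least one address.
func overlaps(a, b ipRange) bool {
	return bytes.Compare(a.first, b.last) <= 0 && bytes.Compare(b.first, a.last) <= 0
}

func main() {
	a, _ := parseRange("10.180.154.2~10.180.154.3")
	b, _ := parseRange("10.180.154.7~10.180.154.8")
	fmt.Println("overlap:", overlaps(a, b))
}
```

Comparing the 4-byte forms with bytes.Compare works because IPv4 addresses order the same way as their big-endian bytes.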
To reserve an IP so that it is never allocated, create a FloatingIP resource like this:
apiVersion: galaxy.k8s.io/v1alpha1
kind: FloatingIP
metadata:
  # The name is the IP to reserve
  name: 10.0.0.1
  labels:
    # This label is no longer needed since 1.0.8
    ipType: internalIP
    # This label must be kept
    reserved: this-is-not-for-pods
spec:
  key: pool__reserved-for-node_
  policy: 2
Floating IPs are currently supported only for workloads created by Deployments and StatefulSets.
apiVersion: apps/v1
kind: Deployment
...
spec:
  ...
  template:
    metadata:
      annotations:
        # Required if the default network is not galaxy-k8s-vlan
        k8s.v1.cni.cncf.io/networks: galaxy-k8s-vlan
        # IP release policy:
        #   empty / not set: release the IP as soon as the Pod stops
        #   immutable: release the IP only when the Deployment / StatefulSet is deleted or scaled down
        #   never: never release the IP, even if the Deployment / StatefulSet is deleted. A later
        #          Deployment / StatefulSet with the same name reuses the allocated IPs
        k8s.v1.cni.galaxy.io/release-policy: immutable
      creationTimestamp: null
      labels:
        k8s-app: nnn
        qcloud-app: nnn
    spec:
      containers:
      - image: nginx
        imagePullPolicy: Always
        name: nnn
        resources:
          limits:
            cpu: 500m
            memory: 1Gi
            # Extended resource limit; setting it on one container is enough
            tke.cloud.tencent.com/eni-ip: "1"
          requests:
            cpu: 250m
            memory: 256Mi
            tke.cloud.tencent.com/eni-ip: "1"
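The three release policies in the annotation comments can be summarized as a small decision function. This is a sketch of the rules as described in these notes, not Galaxy-ipam's implementation; the `event` type and `shouldRelease` function are hypothetical.

```go
package main

import "fmt"

// event is what happened to the Pod that owns the floating IP.
type event int

const (
	podStopped      event = iota // Pod stopped or was rescheduled; the workload still exists
	scaledDown                   // the workload was scaled down and no longer needs this Pod
	workloadDeleted              // the Deployment / StatefulSet itself was deleted
)

// shouldRelease applies the release policy taken from the
// k8s.v1.cni.galaxy.io/release-policy annotation.
func shouldRelease(policy string, ev event) (bool, error) {
	switch policy {
	case "":
		// Default: the IP is released as soon as the Pod stops.
		return true, nil
	case "immutable":
		// Kept across restarts and rescheduling; released on scale-down or deletion.
		return ev == scaledDown || ev == workloadDeleted, nil
	case "never":
		// Kept even after the workload is deleted.
		return false, nil
	default:
		return false, fmt.Errorf("unknown release policy %q", policy)
	}
}

func main() {
	for _, policy := range []string{"", "immutable", "never"} {
		release, _ := shouldRelease(policy, podStopped)
		fmt.Printf("policy=%q, Pod stopped, release=%v\n", policy, release)
	}
}
```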
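In the resulting Pod, the annotation k8s.v1.cni.galaxy.io/args carries its IP address and related information. The CNI plugin uses this information to set the IP address and routes in the container's network namespace: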
apiVersion: v1
kind: Pod
metadata:
  name: nnn-7df5984746-58hjm
  annotations:
    k8s.v1.cni.cncf.io/networks: galaxy-k8s-vlan
    k8s.v1.cni.galaxy.io/args: '{"common":{"ipinfos":[{"ip":"192.168.64.202/24",
      "vlan":0,"gateway":"192.168.64.1","routable_subnet":"172.21.64.0/20"}]}}'
    k8s.v1.cni.galaxy.io/release-policy: immutable
  ...
spec:
  ...
status:
  ...
  hostIP: 172.21.64.15
  phase: Running
  podIP: 192.168.64.202
  podIPs:
  - ip: 192.168.64.202
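The value of the k8s.v1.cni.galaxy.io/args annotation is plain JSON, so it can be decoded with encoding/json. The struct definitions below are assumptions that simply mirror the field names visible in the annotation above; they are not Galaxy's own types.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ipInfo mirrors one element of "ipinfos" as seen in the annotation.
// Hypothetical type; field names come from the JSON shown above.
type ipInfo struct {
	IP             string `json:"ip"`
	Vlan           uint16 `json:"vlan"`
	Gateway        string `json:"gateway"`
	RoutableSubnet string `json:"routable_subnet"`
}

// cniArgs mirrors the top-level structure of the annotation value.
type cniArgs struct {
	Common struct {
		IPInfos []ipInfo `json:"ipinfos"`
	} `json:"common"`
}

// parseArgsAnnotation extracts the IP information from the annotation value.
func parseArgsAnnotation(value string) ([]ipInfo, error) {
	var args cniArgs
	if err := json.Unmarshal([]byte(value), &args); err != nil {
		return nil, fmt.Errorf("failed to parse annotation: %w", err)
	}
	if len(args.Common.IPInfos) == 0 {
		return nil, fmt.Errorf("annotation carries no ipinfos")
	}
	return args.Common.IPInfos, nil
}

func main() {
	value := `{"common":{"ipinfos":[{"ip":"192.168.64.202/24","vlan":0,` +
		`"gateway":"192.168.64.1","routable_subnet":"172.21.64.0/20"}]}}`
	infos, err := parseArgsAnnotation(value)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(infos[0].IP, infos[0].Gateway, infos[0].Vlan)
}
```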
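Setting the annotation tke.cloud.tencent.com/eni-ip-pool on Deployments lets several Deployments share one IP pool.
When an IP pool is used, the default IP release policy is never, unless another policy is specified through the annotation k8s.v1.cni.galaxy.io/release-policy.
By default the size of an IP pool grows with the replica count of the Deployment/StatefulSet. You can also create a pool with a fixed size: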
apiVersion: galaxy.k8s.io/v1alpha1
kind: Pool
metadata:
  name: example-pool
size: 4
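When creating an IP pool through the HTTP API, you can set preAllocateIP=true to pre-allocate IPs for the pool. Pre-allocation is not possible when the CR is created with kubectl.
The FloatingIP CR stores a floating IP together with the workload it is bound to: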
apiVersion: galaxy.k8s.io/v1alpha1
kind: FloatingIP
metadata:
  creationTimestamp: "2020-03-04T08:28:15Z"
  generation: 1
  labels:
    ipType: internalIP
  # The IP address
  name: 192.168.64.202
  resourceVersion: "2744910"
  selfLink: /apis/galaxy.k8s.io/v1alpha1/floatingips/192.168.64.202
  uid: b5d55f27-4548-44c7-b8ad-570814b55026
spec:
  attribute: '{"NodeName":"172.21.64.15"}'
  # The workload this IP is bound to
  key: dp_default_nnn_nnn-7df5984746-58hjm
  policy: 1
  subnet: 172.21.64.0/20
  updateTime: "2020-03-04T08:28:15Z"
Galaxy-ipam exposes an API described with Swagger 2.0. When galaxy-ipam is started with the command-line option --swagger, it serves the API description at the following URL:
http://${galaxy-ipam-ip}:9041/apidocs.json/v1
// Query the IP addresses allocated to the StatefulSet "sts" in the default namespace
// curl 'http://192.168.30.7:9041/v1/ip?appName=sts&appType=statefulsets&namespace=default'
{
  "last": true,
  "totalElements": 2,
  "totalPages": 1,
  "first": true,
  "numberOfElements": 2,
  "size": 10,
  "number": 0,
  "content": [
    {
      "ip": "10.0.0.112",
      "namespace": "default",
      "appName": "sts",
      "podName": "sts-0",
      "policy": 2,
      "appType": "statefulset",
      "updateTime": "2020-05-29T11:11:44.633383558Z",
      "status": "Deleted",
      "releasable": true
    },
    {
      "ip": "10.0.0.174",
      "namespace": "default",
      "appName": "sts",
      "podName": "sts-1",
      "policy": 2,
      "appType": "statefulset",
      "updateTime": "2020-05-29T11:11:45.132450117Z",
      "status": "Deleted",
      "releasable": true
    }
  ]
}
// curl -X POST -H "Content-type: application/json" -d '
// { "ips": [
//   {"ip":"10.0.0.112", "appName":"sts", "appType":"statefulset", "podName":"sts-0", "namespace":"default"},
//   {"ip":"10.0.0.174", "appName":"sts", "appType":"statefulset", "podName":"sts-1", "namespace":"default"}
// ]}
// '
// http://192.168.30.7:9041/v1/ip
{
  "code": 200,
  "message": ""
}
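The default update strategy of a Deployment is StrategyType=RollingUpdate with 25% max unavailable and 25% max surge. This means that during a rolling update the number of Pods may exceed the replica count by up to 25%. Combined with floating IPs, this can lead to a deadlock:
- Suppose the Deployment has 3 replicas. A single unavailable replica already exceeds the 25% limit, so the Deployment controller has to create a new Pod first and delete an old Pod only after the new one is ready
- If the floating IP release policy is immutable or never, the new Pod has to reuse the IP of an old Pod, but that old Pod has not been deleted yet
As a result, the new Pod gets stuck in the scheduling phase.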
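The arithmetic behind this deadlock is worth spelling out. Kubernetes converts a percentage maxSurge to an absolute number by rounding up and a percentage maxUnavailable by rounding down. The sketch below reproduces that calculation; `rollingLimits` is a hypothetical helper, not a Kubernetes API.

```go
package main

import "fmt"

// rollingLimits converts percentage-based rolling update settings into
// absolute Pod counts: maxSurge rounds up, maxUnavailable rounds down.
func rollingLimits(replicas, surgePct, unavailablePct int) (maxSurge, maxUnavailable int) {
	maxSurge = (replicas*surgePct + 99) / 100 // ceiling division
	maxUnavailable = replicas * unavailablePct / 100
	return maxSurge, maxUnavailable
}

func main() {
	surge, unavailable := rollingLimits(3, 25, 25)
	fmt.Printf("replicas=3: maxSurge=%d maxUnavailable=%d\n", surge, unavailable)
}
```

With 3 replicas this gives maxSurge=1 and maxUnavailable=0: no old Pod may be removed before a new one is ready, yet the new Pod cannot obtain an IP until an old Pod is gone.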
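A Kubernetes NodePort Service exposes a group of Pods outside the cluster through host ports. However, it cannot give access to the port of each individual Pod of a StatefulSet.
With the Kubernetes hostPort setting, two Pods scheduled onto the same node would run into a port conflict: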
spec:
  containers:
  - image: ...
    ports:
    - containerPort: 9040
      hostPort: 9040
Galaxy provides a feature that performs random port mapping:
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: hello
  name: hello
spec:
  replicas: 1
  selector:
    matchLabels:
      app: hello
  template:
    metadata:
      labels:
        app: hello
      annotations:
        # Setting this annotation on the Pod is all that is needed
        tkestack.io/portmapping: ""
    spec:
      containers:
      - image: ...
        ports:
        - containerPort: 9040
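Galaxy then uses iptables to map a random host port to each container port of the Pod, and writes the chosen ports back into the tkestack.io/portmapping annotation.
With the default TKEStack configuration (a Flannel-based Overlay network in VXLAN mode), each node has a single CNI configuration: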
{
  "type": "galaxy-sdn",
  "capabilities": {"portMappings": true},
  "cniVersion": "0.2.0"
}
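Let us first look at what this CNI plugin does and how it interacts with Flannel. All of Galaxy's built-in CNI plugins live under the github.com/tkestack/galaxy/cni directory. The entry point of the galaxy-sdn plugin is: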
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net"
	"net/http"
	"os"
	"strings"

	"github.com/containernetworking/cni/pkg/skel"
	// Uses the 0.2.0 version of the CNI result types
	t020 "github.com/containernetworking/cni/pkg/types/020"
	"github.com/containernetworking/cni/pkg/version"
	galaxyapi "tkestack.io/galaxy/pkg/api/galaxy"
	"tkestack.io/galaxy/pkg/api/galaxy/private"
)

type cniPlugin struct {
	socketPath string
}

func NewCNIPlugin(socketPath string) *cniPlugin {
	return &cniPlugin{socketPath: socketPath}
}

// Create and fill a CNIRequest with this plugin's environment and stdin which
// contain the CNI variables and configuration
func newCNIRequest(args *skel.CmdArgs) *galaxyapi.CNIRequest {
	envMap := make(map[string]string)
	for _, item := range os.Environ() {
		idx := strings.Index(item, "=")
		if idx > 0 {
			envMap[strings.TrimSpace(item[:idx])] = item[idx+1:]
		}
	}

	return &galaxyapi.CNIRequest{
		Env:    envMap,
		Config: args.StdinData,
	}
}

// Call the Galaxy daemon
func (p *cniPlugin) doCNI(url string, req *galaxyapi.CNIRequest) ([]byte, error) {
	data, err := json.Marshal(req)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal CNI request %v: %v", req, err)
	}
	// Connect over a Unix domain socket
	client := &http.Client{
		Transport: &http.Transport{
			Dial: func(proto, addr string) (net.Conn, error) {
				return net.Dial("unix", p.socketPath)
			},
		},
	}

	resp, err := client.Post(url, "application/json", bytes.NewReader(data))
	if err != nil {
		return nil, fmt.Errorf("failed to send CNI request: %v", err)
	}
	defer resp.Body.Close() // nolint: errcheck

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("failed to read CNI result: %v", err)
	}

	if resp.StatusCode != 200 {
		return nil, fmt.Errorf("galaxy returns: %s", string(body))
	}

	return body, nil
}

// Simply forwards the CNI request to the Galaxy daemon
func (p *cniPlugin) CmdAdd(args *skel.CmdArgs) (*t020.Result, error) {
	// Because the transport is a UDS, the hostname part of the URL is a dummy value;
	// the call just hits the /cni endpoint
	body, err := p.doCNI("http://dummy/cni", newCNIRequest(args))
	if err != nil {
		return nil, err
	}

	result := &t020.Result{}
	if err := json.Unmarshal(body, result); err != nil {
		return nil, fmt.Errorf("failed to unmarshal response '%s': %v", string(body), err)
	}

	return result, nil
}

// Send the ADD command environment and config to the CNI server, printing
// the IPAM result to stdout when called as a CNI plugin
func (p *cniPlugin) skelCmdAdd(args *skel.CmdArgs) error {
	result, err := p.CmdAdd(args)
	if err != nil {
		return err
	}
	return result.Print()
}

// Send the DEL command environment and config to the CNI server
func (p *cniPlugin) CmdDel(args *skel.CmdArgs) error {
	_, err := p.doCNI("http://dummy/cni", newCNIRequest(args))
	return err
}

// Built on the skel framework
func main() {
	// Pass in the path of the UDS socket
	p := NewCNIPlugin(private.GalaxySocketPath)
	skel.PluginMain(p.skelCmdAdd, p.CmdDel, version.Legacy)
}
As you can see, galaxy-sdn is just an empty shell: all the work is delegated to the Galaxy daemon.
Overall flow of the VLAN CNI plugin:
func init() {
	// this ensures that main runs only on main thread (thread group leader).
	// since namespace ops (unshare, setns) are done for a single thread, we
	// must ensure that the goroutine does not jump from OS thread to thread
	runtime.LockOSThread()
	_, pANet, _ = net.ParseCIDR("10.0.0.0/8")
	_, pBNet, _ = net.ParseCIDR("172.16.0.0/12")
	_, pCNet, _ = net.ParseCIDR("192.168.0.0/16")
}

func main() {
	d = &vlan.VlanDriver{}
	skel.PluginMain(cmdAdd, cmdDel, version.Legacy)
}

func cmdAdd(args *skel.CmdArgs) error {
	// Load the CNI configuration
	conf, err := d.LoadConf(args.StdinData)
	if err != nil {
		return err
	}
	// First try to get IPInfo from args; if that fails, call a third-party IPAM plugin to allocate an IP
	vlanIds, results, err := ipam.Allocate(conf.IPAM.Type, args)
	if err != nil {
		return err
	}
	// Decide whether the default bridge should be created
	if d.DisableDefaultBridge == nil {
		defaultTrue := true
		d.DisableDefaultBridge = &defaultTrue
		for i := range vlanIds {
			if vlanIds[i] == 0 {
				*d.DisableDefaultBridge = false
			}
		}
	}
	// Initialize the VLAN driver: configure the bridge on the host if necessary,
	// or do the MACVlan/IPVlan-specific setup
	if err := d.Init(); err != nil {
		return fmt.Errorf("failed to setup bridge %v", err)
	}
	result020s, err := resultConvert(results)
	if err != nil {
		return err
	}
	// Configure the host network
	if err := setupNetwork(result020s, vlanIds, args); err != nil {
		return err
	}
	result020s[0].DNS = conf.DNS
	return result020s[0].Print()
}

func cmdDel(args *skel.CmdArgs) error {
	// Delete all veth pairs
	if err := utils.DeleteAllVeth(args.Netns); err != nil {
		return err
	}
	conf, err := d.LoadConf(args.StdinData)
	if err != nil {
		return err
	}
	// Release the IP address
	return ipam.Release(conf.IPAM.Type, args)
}
VlanDriver is the core of this CNI plugin.
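type VlanDriver struct {
	//FIXME add a file lock cause we are running multiple processes?
	*NetConf
	// Both the physical interface and VLAN sub-interfaces live in the host namespace
	// Device index of the physical (parent) interface of all VLAN interfaces, e.g. eth1
	vlanParentIndex int
	// If VLAN is enabled, the index of the sub-interface of the current VLAN;
	// otherwise the index of the physical interface
	DeviceIndex int
}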
Both the ADD and DEL commands call the following method to load the CNI configuration:
const (
	VlanPrefix    = "vlan"
	BridgePrefix  = "docker"
	DefaultBridge = "docker"
	// IPVLAN has two modes, L2 and L3
	DefaultIPVlanMode = "l3"
)

func (d *VlanDriver) LoadConf(bytes []byte) (*NetConf, error) {
	conf := &NetConf{}
	if err := json.Unmarshal(bytes, conf); err != nil {
		return nil, fmt.Errorf("failed to load netconf: %v", err)
	}
	if conf.DefaultBridgeName == "" {
		conf.DefaultBridgeName = DefaultBridge
	}
	if conf.BridgeNamePrefix == "" {
		conf.BridgeNamePrefix = BridgePrefix
	}
	if conf.VlanNamePrefix == "" {
		conf.VlanNamePrefix = VlanPrefix
	}
	if conf.IpVlanMode == "" {
		conf.IpVlanMode = DefaultIPVlanMode
	}
	d.NetConf = conf
	return conf, nil
}
The CNI configuration is defined as follows:
type NetConf struct {
	// Standard CNI network configuration
	types.NetConf
	// Name of the device that holds the IDC IP address, e.g. eth1 or eth1.12 (a VLAN sub-interface)
	Device string `json:"device"`
	// Candidate devices used when Device is empty; the first one that exists is taken
	Devices []string `json:"devices"`
	// Switch implementation: macvlan, bridge (default), pure (avoids creating unnecessary bridges)
	Switch string `json:"switch"`
	// IPVLAN mode: l2, l3 (default), l3s
	IpVlanMode string `json:"ipvlan_mode"`
	// Do not create the default bridge
	DisableDefaultBridge *bool `json:"disable_default_bridge"`
	// Name of the default bridge
	DefaultBridgeName string `json:"default_bridge_name"`
	// Prefix of bridge names
	BridgeNamePrefix string `json:"bridge_name_prefix"`
	// Prefix of VLAN interface names
	VlanNamePrefix string `json:"vlan_name_prefix"`
	// Whether to send gratuitous ARP requests
	GratuitousArpRequest bool `json:"gratuitous_arp_request"`
	// MTU to set
	MTU int `json:"mtu"`
}
After loading the CNI configuration, the ADD command first calls IPAM to allocate an IP address:
func Allocate(ipamType string, args *skel.CmdArgs) ([]uint16, []types.Result, error) {
	var (
		vlanId uint16
		err    error
	)
	// Parse CNI args in key1=val1;key2=val2 form. They come from the Pod annotation
	// k8s.v1.cni.galaxy.io/args. When floating IPs are used, Galaxy IPAM sets this
	// annotation and puts the IP address into the ipinfos field
	kvMap, err := cniutil.ParseCNIArgs(args.Args)
	if err != nil {
		return nil, nil, err
	}
	var results []types.Result
	var vlanIDs []uint16
	// Read the ipinfos field
	if ipInfoStr := kvMap[constant.IPInfosKey]; ipInfoStr != "" {
		// Parse the IP information
		var ipInfos []constant.IPInfo
		if err := json.Unmarshal([]byte(ipInfoStr), &ipInfos); err != nil {
			return nil, nil, fmt.Errorf("failed to unmarshal ipInfo from args %q: %v", args.Args, err)
		}
		if len(ipInfos) == 0 {
			return nil, nil, fmt.Errorf("empty ipInfos")
		}
		for j := range ipInfos {
			results = append(results, cniutil.IPInfoToResult(&ipInfos[j]))
			vlanIDs = append(vlanIDs, ipInfos[j].Vlan)
		}
		// "Allocate" the IP address written in the Pod annotation as-is
		return vlanIDs, results, nil
	}
	// Without IP information in the Pod annotation an IPAM plugin must be called,
	// so ipamType must not be empty
	if ipamType == "" {
		return nil, nil, fmt.Errorf("neither ipInfo from cni args nor ipam type from netconf")
	}
	// Call the IPAM plugin
	generalResult, err := ipam.ExecAdd(ipamType, args.StdinData)
	if err != nil {
		return nil, nil, err
	}
	result, err := t020.GetResult(generalResult)
	if err != nil {
		return nil, nil, err
	}
	if result.IP4 == nil {
		return nil, nil, fmt.Errorf("IPAM plugin returned missing IPv4 config")
	}
	return append(vlanIDs, vlanId), append(results, generalResult), err
}
When the floating IP feature of Galaxy IPAM is used, the code above picks up the already allocated IP information and returns it directly.
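The key1=val1;key2=val2 argument format that Allocate relies on can be parsed in a few lines. The sketch below is an independent illustration, not the real cniutil.ParseCNIArgs; `parseCNIArgs` is a hypothetical stand-in, and the sample string reuses the ipinfos value from the plugin table earlier in these notes.

```go
package main

import (
	"fmt"
	"strings"
)

// parseCNIArgs splits "key1=val1;key2=val2" into a map. Each pair is cut at
// the first '=', so values may themselves contain '=' characters.
// Hypothetical stand-in for illustration only.
func parseCNIArgs(args string) (map[string]string, error) {
	kv := make(map[string]string)
	for _, pair := range strings.Split(args, ";") {
		if pair == "" {
			continue
		}
		key, value, ok := strings.Cut(pair, "=")
		if !ok || key == "" {
			return nil, fmt.Errorf("invalid CNI arg %q", pair)
		}
		kv[key] = value
	}
	return kv, nil
}

func main() {
	args := `IgnoreUnknown=1;ipinfos=[{"ip":"192.168.0.68/26","vlan":2,"gateway":"192.168.0.65"}]`
	kv, err := parseCNIArgs(args)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(kv["ipinfos"])
}
```

A value that contains ';' would break this simple split, so the sketch assumes values are free of semicolons.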
Once the IP address has been allocated or obtained, VlanDriver is initialized:
func (d *VlanDriver) Init() error {
	var (
		device netlink.Link
		err    error
	)
	// This driver needs a parent interface on the host
	if d.Device != "" {
		device, err = netlink.LinkByName(d.Device)
	} else {
		if len(d.Devices) == 0 {
			return fmt.Errorf("a device is needed to use vlan plugin")
		}
		for _, devName := range d.Devices {
			device, err = netlink.LinkByName(devName)
			if err == nil {
				break
			}
		}
	}
	if err != nil {
		return fmt.Errorf("Error getting device %s: %v", d.Device, err)
	}
	// The default MTU is taken from the parent interface
	if d.MTU == 0 {
		d.MTU = device.Attrs().MTU
	}
	d.DeviceIndex = device.Attrs().Index
	d.vlanParentIndex = device.Attrs().Index
	// If the given device is a VLAN sub-interface, use the index of its parent interface
	if device.Type() == "vlan" {
		//A vlan device
		d.vlanParentIndex = device.Attrs().ParentIndex
		//glog.Infof("root device %s is a vlan device, parent index %d", d.Device, d.vlanParentIndex)
	}
	if d.IPVlanMode() {
		switch d.GetIPVlanMode() {
		case netlink.IPVLAN_MODE_L3S, netlink.IPVLAN_MODE_L3:
			// Allow binding to non-local addresses (the purpose is not clear to me yet)
			return utils.EnableNonlocalBind()
		default:
			return nil
		}
	} else if d.MacVlanMode() {
		return nil
	} else if d.PureMode() {
		if err := d.initPureModeArgs(); err != nil {
			return err
		}
		return utils.EnableNonlocalBind()
	}
	if d.DisableDefaultBridge != nil && *d.DisableDefaultBridge {
		return nil
	}
	// The default bridge needs to be created
	// Get the IP addresses of the parent interface
	v4Addr, err := netlink.AddrList(device, netlink.FAMILY_V4)
	if err != nil {
		return fmt.Errorf("Errror getting ipv4 address %v", err)
	}
	filteredAddr := network.FilterLoopbackAddr(v4Addr)
	if len(filteredAddr) == 0 {
		// The parent interface has no IP address, so the address should already have
		// been moved to the bridge. Check that the bridge device exists
		bri, err := netlink.LinkByName(d.DefaultBridgeName)
		if err != nil {
			return fmt.Errorf("Error getting bri device %s: %v", d.DefaultBridgeName, err)
		}
		if bri.Attrs().Index != device.Attrs().MasterIndex {
			return fmt.Errorf("No available address found on device %s", d.Device)
		}
	} else {
		// Otherwise the bridge presumably does not exist yet; initialize it
		if err := d.initVlanBridgeDevice(device, filteredAddr); err != nil {
			return err
		}
	}
	return nil
}

// Without MacVlan/IPVlan the driver relies on a bridge
func (d *VlanDriver) initVlanBridgeDevice(device netlink.Link, filteredAddr []netlink.Addr) error {
	// Create the bridge
	bri, err := getOrCreateBridge(d.DefaultBridgeName, device.Attrs().HardwareAddr)
	if err != nil {
		return err
	}
	// Bring the bridge up
	if err := netlink.LinkSetUp(bri); err != nil {
		return fmt.Errorf("failed to set up bridge device %s: %v", d.DefaultBridgeName, err)
	}
	// List the routes of the parent interface
	rs, err := netlink.RouteList(device, nl.FAMILY_V4)
	if err != nil {
		return fmt.Errorf("failed to list route of device %s", device.Attrs().Name)
	}
	defer func() {
		if err != nil {
			// On error the routes have to be added back
			for i := range rs {
				_ = netlink.RouteAdd(&rs[i])
			}
		}
	}()
	// Move the IP addresses and routes of the parent interface onto the bridge
	err = d.moveAddrAndRoute(device, bri, filteredAddr, rs)
	if err != nil {
		return err
	}
	return nil
}
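As the code shows, in MacVlan or IPVlan mode the driver does not manage any bridge on the host (not needing a bridge is one of the advantages of MacVlan/IPVlan). Otherwise, unless the default bridge is disabled, it creates a bridge and moves the parent interface's IP addresses and routes onto it.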
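The branch order of `Init()` can be condensed into a small decision function. This is only a sketch for reading along: the `initAction` labels, the function name `decideInitAction`, and the string values used for the switch mode (`"macvlan"`, `"ipvlan"`, `"pure"`) are illustrative assumptions, not identifiers taken from Galaxy.

```go
package main

import "fmt"

// initAction is a hypothetical label for the work Init() ends up doing.
type initAction string

const (
	actionNothing      initAction = "nothing"
	actionNonlocalBind initAction = "enable-nonlocal-bind"
	actionVerifyBridge initAction = "verify-existing-bridge"
	actionCreateBridge initAction = "create-bridge-and-move-addresses"
)

// decideInitAction mirrors the order of the branches in VlanDriver.Init().
// The mode strings are assumptions made for this sketch.
func decideInitAction(switchMode, ipvlanMode string, disableDefaultBridge, parentHasIPv4 bool) initAction {
	switch switchMode {
	case "ipvlan":
		// Only the L3/L3S modes enable non-local bind.
		if ipvlanMode == "l3" || ipvlanMode == "l3s" {
			return actionNonlocalBind
		}
		return actionNothing
	case "macvlan":
		return actionNothing
	case "pure":
		return actionNonlocalBind
	}
	// Bridge mode from here on.
	if disableDefaultBridge {
		return actionNothing
	}
	if !parentHasIPv4 {
		// The address was presumably moved to the bridge by an earlier run.
		return actionVerifyBridge
	}
	return actionCreateBridge
}

func main() {
	fmt.Println(decideInitAction("ipvlan", "l2", false, true))
	fmt.Println(decideInitAction("bridge", "", false, true))
	fmt.Println(decideInitAction("bridge", "", false, false))
}
```

The third call corresponds to a restart of Galaxy on a node where the bridge already owns the parent interface's address.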
Finally, the following function sets up the network:
```go
func setupNetwork(result020s []*t020.Result, vlanIds []uint16, args *skel.CmdArgs) error {
	if d.MacVlanMode() {
		// MacVlan mode
		if err := setupMacvlan(result020s[0], vlanIds[0], args); err != nil {
			return err
		}
	} else if d.IPVlanMode() {
		// IPVlan mode
		if err := setupIPVlan(result020s[0], vlanIds[0], args); err != nil {
			return err
		}
	} else {
		// Bridge mode
		ifName := args.IfName
		if err := setupVlanDevice(result020s, vlanIds, args); err != nil {
			return err
		}
		args.IfName = ifName
	}
	// Send a gratuitous ARP so the switch learns that the floating IP has moved to this node
	if d.PureMode() {
		_ = utils.SendGratuitousARP(d.Device, result020s[0].IP4.IP.IP.String(), "", d.GratuitousArpRequest)
	}
	return nil
}
```
With MacVlan, the setup proceeds as follows:
```go
func setupMacvlan(result *t020.Result, vlanId uint16, args *skel.CmdArgs) error {
	if err := d.MaybeCreateVlanDevice(vlanId); err != nil {
		return err
	}
	// Connect the host and the container through MacVlan
	if err := utils.MacVlanConnectsHostWithContainer(result, args, d.DeviceIndex, d.MTU); err != nil {
		return err
	}
	// Announce the address from inside the namespace
	_ = utils.SendGratuitousARP(args.IfName, result.IP4.IP.IP.String(), args.Netns, d.GratuitousArpRequest)
	return nil
}

func MacVlanConnectsHostWithContainer(result *t020.Result, args *skel.CmdArgs, parent int, mtu int) error {
	var err error
	// Create the MacVlan device
	macVlan := &netlink.Macvlan{
		Mode: netlink.MACVLAN_MODE_BRIDGE,
		LinkAttrs: netlink.LinkAttrs{
			Name:        HostMacVlanName(args.ContainerID),
			MTU:         mtu,
			ParentIndex: parent, // the parent may be a physical interface or a VLAN sub-interface
		}}
	// Add the interface
	if err := netlink.LinkAdd(macVlan); err != nil {
		return err
	}
	// On error, the interface must be deleted again
	defer func() {
		if err != nil {
			netlink.LinkDel(macVlan)
		}
	}()
	// Configure the sandbox; see below for details
	if err = configSboxDevice(result, args, macVlan); err != nil {
		return err
	}
	return nil
}
```
With IPVlan, the setup proceeds as follows:
```go
func setupIPVlan(result *t020.Result, vlanId uint16, args *skel.CmdArgs) error {
	// Create the VLAN device if necessary
	if err := d.MaybeCreateVlanDevice(vlanId); err != nil {
		return err
	}
	// Connect the host and container networks. The parent of the IPVLAN device
	// may be a VLAN sub-interface (in the host namespace)
	if err := utils.IPVlanConnectsHostWithContainer(result, args, d.DeviceIndex, d.GetIPVlanMode(), d.MTU); err != nil {
		return err
	}
	// In l3 mode, all traffic between the connected containers is proxied through
	// the IPVlan parent interface on the host.
	// In our environment (VMs running on OpenStack) Pods could not ping the layer-2 gateway until
	//   arping -c 2 -U -I eth0 <pod-ip>
	// was run, which is equivalent to the code below.
	// A gratuitous ARP does not keep the ARP cache valid forever; the underlying switch
	// (virtualized by OpenStack) must be configured to associate the Pod IP with the VM's MAC address
	if d.IpVlanMode == "l3" || d.IpVlanMode == "l3s" {
		_ = utils.SendGratuitousARP(d.Device, result.IP4.IP.IP.String(), "", d.GratuitousArpRequest)
		return nil
	}
	// Announce the address from inside the network namespace
	_ = utils.SendGratuitousARP(args.IfName, result.IP4.IP.IP.String(), args.Netns, d.GratuitousArpRequest)
	return nil
}

// Details of creating the VLAN device
func (d *VlanDriver) MaybeCreateVlanDevice(vlanId uint16) error {
	// Nothing to do when VLAN support is not in use
	if vlanId == 0 {
		return nil
	}
	_, err := d.getOrCreateVlanDevice(vlanId)
	return err
}

func (d *VlanDriver) getOrCreateVlanDevice(vlanId uint16) (netlink.Link, error) {
	// Look for an existing VLAN device by parent interface and VLAN ID
	link, err := d.getVlanIfExist(vlanId)
	if err != nil || link != nil {
		if link != nil {
			// Record the index of the VLAN device
			d.DeviceIndex = link.Attrs().Index
		}
		return link, err
	}
	vlanIfName := fmt.Sprintf("%s%d", d.VlanNamePrefix, vlanId)
	// Get the VLAN device; if it does not exist, create it through the callback below
	vlan, err := getOrCreateDevice(vlanIfName, func(name string) error {
		// Create the device: name (e.g. vlan100), parent interface index, VLAN ID
		vlanIf := &netlink.Vlan{LinkAttrs: netlink.LinkAttrs{Name: vlanIfName, ParentIndex: d.vlanParentIndex},
			VlanId: (int)(vlanId)}
		// Add the device
		if err := netlink.LinkAdd(vlanIf); err != nil {
			return fmt.Errorf("Failed to add vlan device %s: %v", vlanIfName, err)
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	// Bring it up
	if err := netlink.LinkSetUp(vlan); err != nil {
		return nil, fmt.Errorf("Failed to set up vlan device %s: %v", vlanIfName, err)
	}
	// Update the index of the VLAN sub-interface
	d.DeviceIndex = vlan.Attrs().Index
	return vlan, nil
}

func (d *VlanDriver) getVlanIfExist(vlanId uint16) (netlink.Link, error) {
	links, err := netlink.LinkList()
	if err != nil {
		return nil, err
	}
	for _, link := range links {
		// Walk the interface list looking for a link of type vlan whose VLAN ID and parent match.
		// This implies that each VLAN needs only one sub-interface, shared by all containers on that VLAN
		if link.Type() == "vlan" {
			if vlan, ok := link.(*netlink.Vlan); !ok {
				return nil, fmt.Errorf("vlan device type case error: %T", link)
			} else {
				if vlan.VlanId == int(vlanId) && vlan.ParentIndex == d.vlanParentIndex {
					return link, nil
				}
			}
		}
	}
	return nil, nil
}

// Details of connecting the host and the container through IPVLAN

// Configure the network sandbox
func configSboxDevice(result *t020.Result, args *skel.CmdArgs, sbox netlink.Link) error {
	// The interface should be brought down before its MAC address is changed
	if err := netlink.LinkSetDown(sbox); err != nil {
		return fmt.Errorf("could not set link down for container interface %q: %v", sbox.Attrs().Name, err)
	}
	if sbox.Type() != "ipvlan" {
		// IPVlan needs no MAC address because it must match the parent interface
		if err := netlink.LinkSetHardwareAddr(sbox, GenerateMACFromIP(result.IP4.IP.IP)); err != nil {
			return fmt.Errorf("could not set mac address for container interface %q: %v", sbox.Attrs().Name, err)
		}
	}
	// Open the container's network namespace
	netns, err := ns.GetNS(args.Netns)
	if err != nil {
		return fmt.Errorf("failed to open netns %q: %v", args.Netns, err)
	}
	// Always close the namespace handle
	defer netns.Close() // nolint: errcheck
	// Move the device into the network namespace
	if err = netlink.LinkSetNsFd(sbox, int(netns.Fd())); err != nil {
		return fmt.Errorf("failed to move sbox device %q to netns: %v", sbox.Attrs().Name, err)
	}
	// Configure it inside the network namespace
	return netns.Do(func(_ ns.NetNS) error {
		// Rename: the IPVLAN interface becomes the container's eth0 directly, like one end of a veth pair
		if err := netlink.LinkSetName(sbox, args.IfName); err != nil {
			return fmt.Errorf("failed to rename sbox device %q to %q: %v", sbox.Attrs().Name, args.IfName, err)
		}
		// If the container or the host has several network devices, rp_filter (reverse path filtering)
		// must be disabled: ARP requests (broadcasts) from the host may carry an unpredictable source IP,
		// and with reverse path filtering enabled those packets may be dropped
		if err := DisableRpFilter(args.IfName); err != nil {
			return fmt.Errorf("failed disable rp_filter to dev %s: %v", args.IfName, err)
		}
		// Disable rp_filter for all interfaces
		if err := DisableRpFilter("all"); err != nil {
			return fmt.Errorf("failed disable rp_filter to all: %v", err)
		}
		// Configure the container interface: add the IP address and routes to it
		return cniutil.ConfigureIface(args.IfName, result)
	})
}

func ConfigureIface(ifName string, res *t020.Result) error {
	// The interface should already exist
	link, err := netlink.LinkByName(ifName)
	if err != nil {
		return fmt.Errorf("failed to lookup %q: %v", ifName, err)
	}
	// Bring it up
	if err := netlink.LinkSetUp(link); err != nil {
		return fmt.Errorf("failed to set %q UP: %v", ifName, err)
	}
	// TODO(eyakubovich): IPv6
	addr := &netlink.Addr{IPNet: &res.IP4.IP, Label: ""}
	// Add the IP address
	if err = netlink.AddrAdd(link, addr); err != nil {
		return fmt.Errorf("failed to add IP addr to %q: %v", ifName, err)
	}
	// Apply each IPv4 route
	for _, r := range res.IP4.Routes {
		gw := r.GW
		if gw == nil {
			gw = res.IP4.Gateway
		}
		if err = ip.AddRoute(&r.Dst, gw, link); err != nil {
			// we skip over duplicate routes as we assume the first one wins
			if !os.IsExist(err) {
				return fmt.Errorf("failed to add route '%v via %v dev %v': %v", r.Dst, gw, ifName, err)
			}
		}
	}
	return nil
}
```
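The DEL command is fairly simple:
- Delete all veth pairs in the container's network namespace
- Release the IP by calling the IPAM plugin; if no IPAM plugin is configured, nothing is done. Galaxy IPAM is not called here: it always reclaims IP addresses itself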
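The galaxy-veth-host plugin solves the problem that, in IPVlan L2 mode, a Pod cannot reach the host IP or Service IPs. To use the plugin, Galaxy has to be configured as follows: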
```json
{
  "NetworkConf":[
    {"name":"tke-route-eni","type":"tke-route-eni","eni":"eth1","routeTable":1},
    {"name":"galaxy-flannel","type":"galaxy-flannel", "delegate":{"type":"galaxy-veth"},"subnetFile":"/run/flannel/subnet.env"},
    {"name":"galaxy-k8s-vlan","type":"galaxy-k8s-vlan", "device":"eth0", "switch":"ipvlan", "ipvlan_mode":"l2", "mtu": 1500},
    {"name":"galaxy-k8s-sriov","type": "galaxy-k8s-sriov", "device": "eth0", "vf_num": 10},
    // newly added
    {
      "name":"galaxy-veth-host","type": "galaxy-veth-host",
      // the Kubernetes service CIDR
      "serviceCidr": "11.1.252.0/22",
      // the host NIC
      "hostInterface": "eth0",
      // name of the interface this plugin creates inside the container
      "containerInterface": "veth0",
      // whether to SNAT
      "ipMasq": true
    }
  ],
  "DefaultNetworks": ["galaxy-flannel"]
}
```
The Pod annotation has to request both networks:
```yaml
annotations:
  k8s.v1.cni.cncf.io/networks: "galaxy-k8s-vlan,galaxy-veth-host"
```
Code walkthrough:
```go
package main

import (
	"encoding/json"
	"fmt"
	"math"
	"math/rand"
	"net"
	"os"
	"runtime"
	"sort"
	"strconv"
	"time"

	"github.com/containernetworking/cni/pkg/skel"
	"github.com/containernetworking/cni/pkg/types"
	"github.com/containernetworking/cni/pkg/types/current"
	"github.com/containernetworking/cni/pkg/version"
	"github.com/containernetworking/plugins/pkg/ip"
	"github.com/containernetworking/plugins/pkg/ns"
	"github.com/containernetworking/plugins/pkg/utils"
	"github.com/containernetworking/plugins/pkg/utils/sysctl"
	"github.com/coreos/go-iptables/iptables"
	"github.com/j-keck/arping"
	"github.com/vishvananda/netlink"
)

// constants for full jitter backoff in milliseconds, and for nodeport marks
const (
	maxSleep             = 10000 // 10.00s
	baseSleep            = 20    // 0.02
	RPFilterTemplate     = "net.ipv4.conf.%s.rp_filter"
	podRulePriority      = 1024
	nodePortRulePriority = 512
)

func init() {
	// this ensures that main runs only on main thread (thread group leader).
	// since namespace ops (unshare, setns) are done for a single thread, we
	// must ensure that the goroutine does not jump from OS thread to thread
	runtime.LockOSThread()
}

// PluginConf is whatever you expect your configuration json to be. This is whatever
// is passed in on stdin. Your plugin may wish to expose its functionality via
// runtime args, see CONVENTIONS.md in the CNI spec.
type PluginConf struct {
	types.NetConf

	// This is the previous result, when called in the context of a chained
	// plugin. Because this plugin supports multiple versions, we'll have to
	// parse this in two passes. If your plugin is not chained, this can be
	// removed (though you may wish to error if a non-chainable plugin is
	// chained.
	// If you need to modify the result before returning it, you will need
	// to actually convert it to a concrete versioned struct.
	RawPrevResult *map[string]interface{} `json:"prevResult"`
	PrevResult    *current.Result         `json:"-"`

	IPMasq             bool   `json:"ipMasq"`
	HostInterface      string `json:"hostInterface"`
	ServiceCidr        string `json:"serviceCidr"`
	ContainerInterface string `json:"containerInterface"`
	MTU                int    `json:"mtu"`
	TableStart         int    `json:"routeTableStart"`
	NodePortMark       int    `json:"nodePortMark"`
	NodePorts          string `json:"nodePorts"`
}

// parseConfig parses the supplied configuration (and prevResult) from stdin.
func parseConfig(stdin []byte) (*PluginConf, error) {
	conf := PluginConf{}

	if err := json.Unmarshal(stdin, &conf); err != nil {
		return nil, fmt.Errorf("failed to parse network configuration: %v", err)
	}

	// Parse previous result.
	if conf.RawPrevResult != nil {
		resultBytes, err := json.Marshal(conf.RawPrevResult)
		if err != nil {
			return nil, fmt.Errorf("could not serialize prevResult: %v", err)
		}
		res, err := version.NewResult(conf.CNIVersion, resultBytes)
		if err != nil {
			return nil, fmt.Errorf("could not parse prevResult: %v", err)
		}
		conf.RawPrevResult = nil
		conf.PrevResult, err = current.NewResultFromResult(res)
		if err != nil {
			return nil, fmt.Errorf("could not convert result to current version: %v", err)
		}
	}
	// End previous result parsing

	if conf.HostInterface == "" {
		return nil, fmt.Errorf("hostInterface must be specified")
	}

	if conf.ContainerInterface == "" {
		return nil, fmt.Errorf("containerInterface must be specified")
	}

	if conf.NodePorts == "" {
		conf.NodePorts = "30000:32767"
	}

	if conf.NodePortMark == 0 {
		conf.NodePortMark = 0x2000
	}

	// start using tables by default at 256
	if conf.TableStart == 0 {
		conf.TableStart = 256
	}

	return &conf, nil
}

func cmdAdd(args *skel.CmdArgs) error {
	conf, err := parseConfig(args.StdinData)
	if err != nil {
		return err
	}

	// Must be invoked as part of a plugin chain, after a preceding plugin
	if conf.PrevResult == nil {
		return fmt.Errorf("must be called as chained plugin")
	}

	// Collect the container IPs from the previous plugin's result
	containerIPs := make([]net.IP, 0, len(conf.PrevResult.IPs))
	if conf.CNIVersion != "0.3.0" && conf.CNIVersion != "0.3.1" {
		for _, ip := range conf.PrevResult.IPs {
			containerIPs = append(containerIPs, ip.Address.IP)
		}
	} else {
		for _, ip := range conf.PrevResult.IPs {
			if ip.Interface == nil {
				continue
			}
			intIdx := *ip.Interface
			if intIdx >= 0 && intIdx < len(conf.PrevResult.Interfaces) && conf.PrevResult.Interfaces[intIdx].Name != args.IfName {
				continue
			}
			containerIPs = append(containerIPs, ip.Address.IP)
		}
	}
	if len(containerIPs) == 0 {
		return fmt.Errorf("got no container IPs")
	}

	// Look up the host network interface
	iface, err := netlink.LinkByName(conf.HostInterface)
	if err != nil {
		return fmt.Errorf("failed to lookup %q: %v", conf.HostInterface, err)
	}

	// List the addresses of the host interface
	hostAddrs, err := netlink.AddrList(iface, netlink.FAMILY_ALL)
	if err != nil || len(hostAddrs) == 0 {
		return fmt.Errorf("failed to get host IP addresses for %q: %v", iface, err)
	}

	netns, err := ns.GetNS(args.Netns)
	if err != nil {
		return fmt.Errorf("failed to open netns %q: %v", args.Netns, err)
	}
	defer netns.Close()

	containerIPV4 := false
	containerIPV6 := false
	for _, ipc := range containerIPs {
		if ipc.To4() != nil {
			containerIPV4 = true
		} else {
			containerIPV6 = true
		}
	}

	// Create the veth pair and configure the container side:
	// 1. If masquerading is enabled, SNAT traffic that leaves through args.IfName
	// 2. Add routes to every host IP via the veth
	// 3. Add a route to the Kubernetes service CIDR via the veth, with the first host IP as gateway
	hostInterface, _, err := setupContainerVeth(netns, conf.ServiceCidr, conf.ContainerInterface, conf.MTU,
		hostAddrs, conf.IPMasq, containerIPV4, containerIPV6, args.IfName, conf.PrevResult)
	if err != nil {
		return err
	}

	// Configure the host side of the veth pair
	if err = setupHostVeth(hostInterface.Name, hostAddrs, conf.IPMasq, conf.TableStart, conf.PrevResult); err != nil {
		return err
	}

	if conf.IPMasq {
		// Enable IP forwarding on the host side
		err := enableForwarding(containerIPV4, containerIPV6)
		if err != nil {
			return err
		}

		chain := utils.FormatChainName(conf.Name, args.ContainerID)
		comment := utils.FormatComment(conf.Name, args.ContainerID)
		for _, ipc := range containerIPs {
			addrBits := 128
			if ipc.To4() != nil {
				addrBits = 32
			}
			// SNAT traffic that originates from the container IP
			if err = ip.SetupIPMasq(&net.IPNet{IP: ipc, Mask: net.CIDRMask(addrBits, addrBits)}, chain, comment); err != nil {
				return err
			}
		}
	}

	// Install the iptables rules related to NodePort
	if err = setupNodePortRule(conf.HostInterface, conf.NodePorts, conf.NodePortMark); err != nil {
		return err
	}

	// Pass through the result for the next plugin
	return types.PrintResult(conf.PrevResult, conf.CNIVersion)
}

func setupContainerVeth(netns ns.NetNS, serviceCidr string, ifName string, mtu int,
	hostAddrs []netlink.Addr, masq, containerIPV4, containerIPV6 bool, k8sIfName string,
	pr *current.Result) (*current.Interface, *current.Interface, error) {
	hostInterface := &current.Interface{}
	containerInterface := &current.Interface{}
	// Run inside the container's network namespace (netns)
	err := netns.Do(func(hostNS ns.NetNS) error {
		// Create the veth pair; one end is placed in hostNS automatically
		// (netns.Do supplies this variable; it is the initial network namespace)
		hostVeth, contVeth0, err := ip.SetupVeth(ifName, mtu, hostNS)
		if err != nil {
			return err
		}
		hostInterface.Name = hostVeth.Name
		hostInterface.Mac = hostVeth.HardwareAddr.String()
		containerInterface.Name = contVeth0.Name
		// ip.SetupVeth does not retrieve the MAC address of the veth peer (the second
		// interface), so it has to be looked up here
		containerNetlinkIface, _ := netlink.LinkByName(contVeth0.Name)
		containerInterface.Mac = containerNetlinkIface.Attrs().HardwareAddr.String()
		containerInterface.Sandbox = netns.Path()

		// Add the two interfaces created by this CNI call to the Result
		pr.Interfaces = append(pr.Interfaces, hostInterface, containerInterface)

		// Only the index is used below; would the Link object above not do? (open question)
		contVeth, err := net.InterfaceByName(ifName)
		if err != nil {
			return fmt.Errorf("failed to look up %q: %v", ifName, err)
		}

		if masq {
			// Enable IP forwarding for IPv4/IPv6
			err := enableForwarding(containerIPV4, containerIPV6)
			if err != nil {
				return err
			}

			// If the egress interface is k8sIfName (the container's main interface, usually eth0,
			// supplied by kubelet when it invokes galaxy-sdn), apply source NAT
			err = setupSNAT(k8sIfName, "kube-proxy SNAT")
			if err != nil {
				return fmt.Errorf("failed to enable SNAT on %q: %v", k8sIfName, err)
			}
		}

		// Add a route entry for each address of the host interface
		for _, ipc := range hostAddrs {
			addrBits := 128
			if ipc.IP.To4() != nil {
				addrBits = 32
			}

			// All of these addresses are routed out through the new veth
			err := netlink.RouteAdd(&netlink.Route{
				LinkIndex: contVeth.Index, // the newly added veth
				Scope:     netlink.SCOPE_LINK,
				Dst: &net.IPNet{
					IP:   ipc.IP,
					Mask: net.CIDRMask(addrBits, addrBits),
				},
			})

			if err != nil {
				return fmt.Errorf("failed to add host route dst %v: %v", ipc.IP, err)
			}
		}

		_, serviceNet, err := net.ParseCIDR(serviceCidr)
		if err != nil {
			return fmt.Errorf("failed to parse service cidr :%v", err)
		}

		// Route the Kubernetes service CIDR out through the new veth, with the first host IP as gateway
		err = netlink.RouteAdd(&netlink.Route{
			LinkIndex: contVeth.Index,
			Scope:     netlink.SCOPE_UNIVERSE,
			Dst:       serviceNet,
			// Packets are handed to the first host IP / interface. The host end of the veth is not
			// attached to any other interface; host interfaces can still act as the next hop
			// because they live in the same network namespace as the host end of the veth
			Gw: hostAddrs[0].IP,
		})
		if err != nil {
			return fmt.Errorf("failed to add service cidr route %v: %v", hostAddrs[0].IP, err)
		}

		// Send a gratuitous ARP for every IPv4 address
		for _, ipc := range pr.IPs {
			if ipc.Version == "4" {
				_ = arping.GratuitousArpOverIface(ipc.Address.IP, *contVeth)
			}
		}

		return nil
	})
	if err != nil {
		return nil, nil, err
	}
	return hostInterface, containerInterface, nil
}

func enableForwarding(ipv4 bool, ipv6 bool) error {
	if ipv4 {
		err := ip.EnableIP4Forward()
		if err != nil {
			return fmt.Errorf("Could not enable IPv6 forwarding: %v", err)
		}
	}
	if ipv6 {
		err := ip.EnableIP6Forward()
		if err != nil {
			return fmt.Errorf("Could not enable IPv6 forwarding: %v", err)
		}
	}
	return nil
}

func setupSNAT(ifName string, comment string) error {
	ipt, err := iptables.NewWithProtocol(iptables.ProtocolIPv4)
	if err != nil {
		return fmt.Errorf("failed to locate iptables: %v", err)
	}
	rulespec := []string{"-o", ifName, "-j", "MASQUERADE"}
	//if ipt.HasRandomFully() {
	//	rulespec = append(rulespec, "--random-fully")
	//}
	rulespec = append(rulespec, "-m", "comment", "--comment", comment)
	return ipt.AppendUnique("nat", "POSTROUTING", rulespec...)
}

func setupHostVeth(vethName string, hostAddrs []netlink.Addr, masq bool, tableStart int, result *current.Result) error {
	// no IPs to route
	if len(result.IPs) == 0 {
		return nil
	}

	// lookup by name as interface ids might have changed
	veth, err := net.InterfaceByName(vethName)
	if err != nil {
		return fmt.Errorf("failed to lookup %q: %v", vethName, err)
	}

	// Route every container IP out through the veth
	for _, ipc := range result.IPs {
		addrBits := 128
		if ipc.Address.IP.To4() != nil {
			addrBits = 32
		}

		err := netlink.RouteAdd(&netlink.Route{
			LinkIndex: veth.Index,
			Scope:     netlink.SCOPE_LINK,
			Dst: &net.IPNet{
				IP:   ipc.Address.IP,
				Mask: net.CIDRMask(addrBits, addrBits),
			},
		})

		if err != nil {
			return fmt.Errorf("failed to add host route dst %v: %v", ipc.Address.IP, err)
		}
	}

	// Configure policy routing for traffic that comes from the Pod and is destined for the VPC
	err = addPolicyRules(veth, result.IPs[0], result.Routes, tableStart)
	if err != nil {
		return fmt.Errorf("failed to add policy rules: %v", err)
	}

	// Send a gratuitous ARP for every IPv4 address on the host side
	for _, ipc := range hostAddrs {
		if ipc.IP.To4() != nil {
			_ = arping.GratuitousArpOverIface(ipc.IP, *veth)
		}
	}

	return nil
}

func addPolicyRules(veth *net.Interface, ipc *current.IPConfig, routes []*types.Route, tableStart int) error {
	table := -1

	// Sort the routes taken from the previous plugin's result
	sort.Slice(routes, func(i, j int) bool {
		return routes[i].Dst.String() < routes[j].Dst.String()
	})

	// Try up to 10 times to write the routes into a free table slot
	for i := 0; i < 10 && table == -1; i++ {
		var err error
		// Find a free routing table
		table, err = findFreeTable(tableStart + rand.Intn(1000))
		if err != nil {
			return err
		}

		// Add all routes to that table
		for _, route := range routes {
			err := netlink.RouteAdd(&netlink.Route{
				LinkIndex: veth.Index,     // egress is the host-side veth
				Dst:       &route.Dst,     // destinations are routes discovered by the previous CNI call; they may come from the cloud provider (VPC)
				Gw:        ipc.Address.IP, // the first IP of the Result is used as gateway
				Table:     table,          // written into this table
			})
			if err != nil {
				table = -1
				break
			}
		}

		if table == -1 {
			// failed to add routes so sleep and try again on a different table
			wait := time.Duration(rand.Intn(int(math.Min(maxSleep,
				baseSleep*math.Pow(2, float64(i)))))) * time.Millisecond
			fmt.Fprintf(os.Stderr, "route table collision, retrying in %v\n", wait)
			time.Sleep(wait)
		}
	}

	// ensure we have a route table selected
	if table == -1 {
		return fmt.Errorf("failed to add routes to a free table")
	}

	// Create the policy rule
	rule := netlink.NewRule()
	// Matches traffic entering from the host end of the veth, i.e. coming from the Pod
	rule.IifName = veth.Name
	rule.Table = table
	rule.Priority = podRulePriority

	err := netlink.RuleAdd(rule)
	if err != nil {
		return fmt.Errorf("failed to add policy rule %v: %v", rule, err)
	}

	return nil
}

func findFreeTable(start int) (int, error) {
	allocatedTableIDs := make(map[int]bool)
	// Walk all IPv4/IPv6 policy rules
	for _, family := range []int{netlink.FAMILY_V4, netlink.FAMILY_V6} {
		rules, err := netlink.RuleList(family)
		if err != nil {
			return -1, err
		}
		for _, rule := range rules {
			// Collect every table slot that is already in use
			allocatedTableIDs[rule.Table] = true
		}
	}
	// Find the first free slot
	for i := start; i < math.MaxUint32; i++ {
		if !allocatedTableIDs[i] {
			return i, nil
		}
	}
	return -1, fmt.Errorf("failed to find free route table")
}

func setupNodePortRule(ifName string, nodePorts string, nodePortMark int) error {
	ipt, err := iptables.NewWithProtocol(iptables.ProtocolIPv4)
	if err != nil {
		return fmt.Errorf("failed to locate iptables: %v", err)
	}

	// Make sure NodePort traffic is marked correctly
	if err := ipt.AppendUnique("mangle", "PREROUTING", "-i", ifName, "-p", "tcp", "--dport", nodePorts,
		"-j", "CONNMARK", "--set-mark", strconv.Itoa(nodePortMark), "-m", "comment", "--comment", "NodePort Mark"); err != nil {
		return err
	}
	if err := ipt.AppendUnique("mangle", "PREROUTING", "-i", ifName, "-p", "udp", "--dport", nodePorts,
		"-j", "CONNMARK", "--set-mark", strconv.Itoa(nodePortMark), "-m", "comment", "--comment", "NodePort Mark"); err != nil {
		return err
	}
	if err := ipt.AppendUnique("mangle", "PREROUTING", "-i", "veth+", "-j", "CONNMARK", "--restore-mark",
		"-m", "comment", "--comment", "NodePort Mark"); err != nil {
		return err
	}

	// Enable loose (non-strict) RP filter on the host interface
	_, err = sysctl.Sysctl(fmt.Sprintf(RPFilterTemplate, ifName), "2")
	if err != nil {
		return fmt.Errorf("failed to set RP filter to loose for interface %q: %v", ifName, err)
	}

	// Add a policy rule for traffic marked as NodePort
	rule := netlink.NewRule()
	rule.Mark = nodePortMark
	rule.Table = 254 // main table
	rule.Priority = nodePortRulePriority

	exists := false
	rules, err := netlink.RuleList(netlink.FAMILY_V4)
	if err != nil {
		return fmt.Errorf("Unable to retrive IP rules %v", err)
	}

	for _, r := range rules {
		if r.Table == rule.Table && r.Mark == rule.Mark && r.Priority == rule.Priority {
			exists = true
			break
		}
	}
	if !exists {
		err := netlink.RuleAdd(rule)
		if err != nil {
			return fmt.Errorf("failed to add policy rule %v: %v", rule, err)
		}
	}

	return nil
}

// cmdDel is called for DELETE requests
func cmdDel(args *skel.CmdArgs) error {
	conf, err := parseConfig(args.StdinData)
	if err != nil {
		return err
	}

	if args.Netns == "" {
		return nil
	}

	// The network namespace exists; clean up
	var ipnets []netlink.Addr
	vethPeerIndex := -1
	_ = ns.WithNetNSPath(args.Netns, func(_ ns.NetNS) error {
		var err error

		if conf.IPMasq {
			iface, err := netlink.LinkByName(args.IfName)
			if err != nil {
				if err.Error() == "Link not found" {
					return ip.ErrLinkNotFound
				}
				return fmt.Errorf("failed to lookup %q: %v", args.IfName, err)
			}
			// Get the IP addresses of args.IfName (usually eth0)
			ipnets, err = netlink.AddrList(iface, netlink.FAMILY_ALL)
			if err != nil || len(ipnets) == 0 {
				return fmt.Errorf("failed to get IP addresses for %q: %v", args.IfName, err)
			}
		}

		vethIface, err := netlink.LinkByName(conf.ContainerInterface)
		if err != nil && err != ip.ErrLinkNotFound {
			return err
		}
		vethPeerIndex, _ = netlink.VethPeerIndex(&netlink.Veth{LinkAttrs: *vethIface.Attrs()})
		return nil
	})

	if conf.IPMasq {
		chain := utils.FormatChainName(conf.Name, args.ContainerID)
		comment := utils.FormatComment(conf.Name, args.ContainerID)
		for _, ipn := range ipnets {
			addrBits := 128
			if ipn.IP.To4() != nil {
				addrBits = 32
			}

			_ = ip.TeardownIPMasq(&net.IPNet{IP: ipn.IP, Mask: net.CIDRMask(addrBits, addrBits)}, chain, comment)
		}

		if vethPeerIndex != -1 {
			link, err := netlink.LinkByIndex(vethPeerIndex)
			if err != nil {
				return nil
			}

			rule := netlink.NewRule()
			rule.IifName = link.Attrs().Name
			// ignore errors as we might be called multiple times
			_ = netlink.RuleDel(rule)
			_ = netlink.LinkDel(link)
		}
	}

	return nil
}

func main() {
	rand.Seed(time.Now().UnixNano())
	skel.PluginMain(cmdAdd, cmdDel, version.All)
}
```
The Galaxy daemon is provided by the galaxy DaemonSet and runs on every Kubernetes node. Its entry point is:
```go
package main

import (
	"math/rand"
	"time"

	"github.com/spf13/pflag"
	"k8s.io/component-base/cli/flag"
	"k8s.io/component-base/logs"
	glog "k8s.io/klog"
	"tkestack.io/galaxy/pkg/galaxy"
	"tkestack.io/galaxy/pkg/signal"
	"tkestack.io/galaxy/pkg/utils/ldflags"
)

func main() {
	// Seed the random number generator
	rand.Seed(time.Now().UTC().UnixNano())
	// Create the Galaxy struct
	galaxy := galaxy.NewGalaxy()
	galaxy.AddFlags(pflag.CommandLine)
	flag.InitFlags()
	logs.InitLogs()
	defer logs.FlushLogs()
	ldflags.PrintAndExitIfRequested()
	// Start Galaxy
	if err := galaxy.Start(); err != nil {
		glog.Fatalf("Error start galaxy: %v", err)
	}
	// Wait for a signal, then stop Galaxy
	signal.BlockSignalHandler(func() {
		if err := galaxy.Stop(); err != nil {
			glog.Errorf("Error stop galaxy: %v", err)
		}
	})
}
```
The Galaxy struct is defined as follows:
```go
type Galaxy struct {
	// Corresponds to the main Galaxy configuration file
	JsonConf
	// Corresponds to the command-line options
	*options.ServerRunOptions
	quitChan  chan struct{}
	dockerCli *docker.DockerInterface
	netConf   map[string]map[string]interface{}
	// Handles port mapping
	pmhandler *portmapping.PortMappingHandler
	client    kubernetes.Interface
	pm        *policy.PolicyManager
}

type JsonConf struct {
	NetworkConf     []map[string]interface{}
	DefaultNetworks []string
	ENIIPNetwork    string
}
```
Initialization:
```go
func NewGalaxy() *Galaxy {
	g := &Galaxy{
		ServerRunOptions: options.NewServerRunOptions(),
		quitChan:         make(chan struct{}),
		netConf:          map[string]map[string]interface{}{},
	}
	return g
}

func (g *Galaxy) Init() error {
	if g.JsonConfigPath == "" {
		return fmt.Errorf("json config is required")
	}
	data, err := ioutil.ReadFile(g.JsonConfigPath)
	if err != nil {
		return fmt.Errorf("read json config: %v", err)
	}
	// Parse the main configuration file
	if err := json.Unmarshal(data, &g.JsonConf); err != nil {
		return fmt.Errorf("bad config %s: %v", string(data), err)
	}
	glog.Infof("Json Config: %s", string(data))
	// Basic validation of the configuration
	if err := g.checkNetworkConf(); err != nil {
		return err
	}
	// Access to the local Docker daemon is required
	dockerClient, err := docker.NewDockerInterface()
	if err != nil {
		return err
	}
	g.dockerCli = dockerClient
	g.pmhandler = portmapping.New("")
	return nil
}
```
Startup:
```go
func (g *Galaxy) Start() error {
	// Initialize the Galaxy configuration
	if err := g.Init(); err != nil {
		return err
	}
	// Create the Kubernetes client
	g.initk8sClient()
	// Start the Flannel garbage collector
	gc.NewFlannelGC(g.dockerCli, g.quitChan, g.cleanIPtables).Run()
	// Enable or disable bridge-nf-call-iptables, which controls whether packets
	// forwarded by a bridge are filtered by iptables
	kernel.BridgeNFCallIptables(g.quitChan, g.BridgeNFCallIptables)
	// Configure IP forwarding
	kernel.IPForward(g.quitChan, g.IPForward)
	// At startup, walk the Pods already on this node and set up port mapping for those that need it
	if err := g.setupIPtables(); err != nil {
		return err
	}
	if g.NetworkPolicy {
		g.pm = policy.New(g.client, g.quitChan)
		go wait.Until(g.pm.Run, 3*time.Minute, g.quitChan)
	}
	if g.RouteENI {
		// Using Tencent Cloud ENI requires reverse path filtering to be turned off
		kernel.DisableRPFilter(g.quitChan)
		eni.SetupENIs(g.quitChan)
	}
	// Listen on the UDS, start the HTTP server, and register routes
	return g.StartServer()
}
```
The g.StartServer() call at the end of the code above invokes the following method to register routes:
```go
func (g *Galaxy) installHandlers() {
	ws := new(restful.WebService)
	ws.Route(ws.GET("/cni").To(g.cni))
	ws.Route(ws.POST("/cni").To(g.cni))
	restful.Add(ws)
}

// Handles CNI requests forwarded by the CNI plugin
func (g *Galaxy) cni(r *restful.Request, w *restful.Response) {
	data, err := ioutil.ReadAll(r.Request.Body)
	if err != nil {
		glog.Warningf("bad request %v", err)
		http.Error(w, fmt.Sprintf("err read body %v", err), http.StatusBadRequest)
		return
	}
	defer r.Request.Body.Close() // nolint: errcheck
	// Convert the request to a CNIRequest, then to a PodRequest
	req, err := galaxyapi.CniRequestToPodRequest(data)
	if err != nil {
		glog.Warningf("bad request %v", err)
		http.Error(w, fmt.Sprintf("%v", err), http.StatusBadRequest)
		return
	}
	req.Path = strings.TrimRight(fmt.Sprintf("%s:%s", req.Path, strings.Join(g.CNIPaths, ":")), ":")
	// Handle the CNI request
	result, err := g.requestFunc(req)
	if err != nil {
		http.Error(w, fmt.Sprintf("%v", err), http.StatusInternalServerError)
	} else {
		// Empty response JSON means success with no body
		w.Header().Set("Content-Type", "application/json")
		if _, err := w.Write(result); err != nil {
			glog.Warningf("Error writing %s HTTP response: %v", req.Command, err)
		}
	}
}
```
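On the other side of this handler, the galaxy-sdn plugin posts its request to `/cni` over a Unix domain socket. With Go's standard library this can be done by giving `http.Transport` a custom dialer. The sketch below shows the technique against a stand-in server on a temporary socket; the type `cniRequest`, the helpers `newUnixClient`, `postCNI`, and `demo`, the placeholder host name `galaxy`, and the socket path are all assumptions for illustration, not Galaxy's actual client code.

```go
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net"
	"net/http"
	"os"
	"path/filepath"
)

// cniRequest is a local copy of the payload shape used for this sketch.
type cniRequest struct {
	Env    map[string]string `json:"env,omitempty"`
	Config []byte            `json:"config,omitempty"`
}

// newUnixClient returns an HTTP client whose connections all go to socketPath.
func newUnixClient(socketPath string) *http.Client {
	return &http.Client{Transport: &http.Transport{
		DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
			var d net.Dialer
			return d.DialContext(ctx, "unix", socketPath)
		},
	}}
}

// postCNI sends one request to the /cni route and returns the response body.
func postCNI(c *http.Client, req cniRequest) ([]byte, error) {
	body, err := json.Marshal(req)
	if err != nil {
		return nil, err
	}
	// The host part of the URL is a placeholder; the dialer decides where to connect.
	resp, err := c.Post("http://galaxy/cni", "application/json", bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	data, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("server returned %d: %s", resp.StatusCode, data)
	}
	return data, nil
}

// demo starts a stand-in server on a temporary socket and calls it once.
func demo() (string, error) {
	dir, err := os.MkdirTemp("", "galaxy-demo")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)
	sock := filepath.Join(dir, "galaxy.sock")
	ln, err := net.Listen("unix", sock)
	if err != nil {
		return "", err
	}
	defer ln.Close()
	mux := http.NewServeMux()
	mux.HandleFunc("/cni", func(w http.ResponseWriter, r *http.Request) {
		var req cniRequest
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		fmt.Fprintf(w, "got %s", req.Env["CNI_COMMAND"])
	})
	go http.Serve(ln, mux)
	out, err := postCNI(newUnixClient(sock), cniRequest{Env: map[string]string{"CNI_COMMAND": "ADD"}})
	return string(out), err
}

func main() {
	out, err := demo()
	fmt.Println(out, err)
}
```

Using a socket file rather than a TCP port keeps the endpoint reachable only from processes on the same node that can access the file.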
When the galaxy-sdn CNI plugin calls the /cni endpoint of the Galaxy daemon, Galaxy converts the request from a CNIRequest:
```go
// Request sent to the Galaxy by the Galaxy SDN CNI plugin
type CNIRequest struct {
	// CNI environment variables, like CNI_COMMAND and CNI_NETNS
	Env map[string]string `json:"env,omitempty"`
	// CNI configuration passed via stdin to the CNI plugin
	Config []byte `json:"config,omitempty"`
}
```
into a PodRequest:
type PodRequest struct {
    // The CNI command to execute
    Command string
    // From the CNI_ARGS environment variable
    PodNamespace string
    // From the CNI_ARGS environment variable
    PodName string
    // kubernetes pod ports
    Ports []k8s.Port
    // Channel that receives the result of the operation
    Result chan *PodResult
    // CNI_IFNAME, CNI_COMMAND, CNI_ARGS... passed through environment variables
    *skel.CmdArgs
    // Galaxy delegates the real work to other CNI plugins; these are the arguments
    // passed to them, keyed as: plugin type -> argument name -> argument value
    ExtendedCNIArgs map[string]map[string]json.RawMessage
}

// Result of processing a request
type PodResult struct {
    Response []byte
    Err      error
}
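The conversion can be illustrated with a minimal sketch. It assumes the usual Kubelet convention of passing the Pod identity in CNI_ARGS as K8S_POD_NAMESPACE and K8S_POD_NAME; the type podIdentity and the function parseCNIEnv are hypothetical names for illustration, not Galaxy's actual CniRequestToPodRequest implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// podIdentity is a hypothetical, trimmed-down stand-in for PodRequest.
type podIdentity struct {
	Command      string
	PodNamespace string
	PodName      string
}

// parseCNIEnv extracts the command and the Pod identity from CNI environment variables.
func parseCNIEnv(env map[string]string) (*podIdentity, error) {
	id := &podIdentity{Command: env["CNI_COMMAND"]}
	for _, kv := range strings.Split(env["CNI_ARGS"], ";") {
		parts := strings.SplitN(kv, "=", 2)
		if len(parts) != 2 {
			continue // skip empty or malformed entries
		}
		switch parts[0] {
		case "K8S_POD_NAMESPACE":
			id.PodNamespace = parts[1]
		case "K8S_POD_NAME":
			id.PodName = parts[1]
		}
	}
	if id.PodName == "" || id.PodNamespace == "" {
		return nil, fmt.Errorf("CNI_ARGS %q lacks pod name or namespace", env["CNI_ARGS"])
	}
	return id, nil
}

func main() {
	id, err := parseCNIEnv(map[string]string{
		"CNI_COMMAND": "ADD",
		"CNI_ARGS":    "IgnoreUnknown=1;K8S_POD_NAMESPACE=default;K8S_POD_NAME=web-0",
	})
	fmt.Println(id, err)
}
```

Unknown keys such as IgnoreUnknown are simply skipped, which keeps the sketch tolerant of extra arguments appended later in the pipeline.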
It then calls requestFunc to process the converted CNI request:
func (g *Galaxy) requestFunc(req *galaxyapi.PodRequest) (data []byte, err error) {
    start := time.Now()
    glog.Infof("%v, %s+", req, start.Format(time.StampMicro))
    // Handle the ADD command
    if req.Command == cniutil.COMMAND_ADD {
        defer func() {
            glog.Infof("%v, data %s, err %v, %s-", req, string(data), err, start.Format(time.StampMicro))
        }()
        var pod *corev1.Pod
        // Look up the Pod
        pod, err = g.getPod(req.PodName, req.PodNamespace)
        if err != nil {
            return
        }
        result, err1 := g.cmdAdd(req, pod)
        if err1 != nil {
            err = err1
            return
        } else {
            // Convert the result format
            result020, err2 := convertResult(result)
            if err2 != nil {
                err = err2
            } else {
                data, err = json.Marshal(result)
                if err != nil {
                    return
                }
                // On success, set up port mappings. Recall that at startup this is
                // also done for every Pod already on the node
                err = g.setupPortMapping(req, req.ContainerID, result020, pod)
                if err != nil {
                    g.cleanupPortMapping(req)
                    return
                }
                pod.Status.PodIP = result020.IP4.IP.IP.String()
                // Apply network policy
                if g.pm != nil {
                    if err := g.pm.SyncPodChains(pod); err != nil {
                        glog.Warning(err)
                    }
                    g.pm.SyncPodIPInIPSet(pod, true)
                }
            }
        }
    } else if req.Command == cniutil.COMMAND_DEL {
        defer glog.Infof("%v err %v, %s-", req, err, start.Format(time.StampMicro))
        err = cniutil.CmdDel(req.CmdArgs, -1)
        if err == nil {
            err = g.cleanupPortMapping(req)
        }
    } else {
        err = fmt.Errorf("unknown command %s", req.Command)
    }
    return
}
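For ADD, requestFunc looks up the Pod, passes it along as an argument, and calls cmdAdd. Note that because the CNI version in use is old, only the ADD and DEL commands are supported.
The ADD command is handled as follows: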
func (g *Galaxy) cmdAdd(req *galaxyapi.PodRequest, pod *corev1.Pod) (types.Result, error) {
    // Build the Pod's network requirements from its annotations and special spec fields
    networkInfos, err := g.resolveNetworks(req, pod)
    if err != nil {
        return nil, err
    }
    return cniutil.CmdAdd(req.CmdArgs, networkInfos)
}

func CmdAdd(cmdArgs *skel.CmdArgs, networkInfos []*NetworkInfo) (types.Result, error) {
    // Each NetworkInfo represents one network the Pod needs to join
    if len(networkInfos) == 0 {
        return nil, fmt.Errorf("No network info returned")
    }
    // Save the network requirements as JSON in /var/lib/cni/galaxy/$ContainerID
    if err := saveNetworkInfo(cmdArgs.ContainerID, networkInfos); err != nil {
        return nil, fmt.Errorf("Error save network info %v for %s: %v", networkInfos, cmdArgs.ContainerID, err)
    }
    var (
        err error
        // Result of the previous network
        result types.Result
    )
    // Process and satisfy each network requirement
    for idx, networkInfo := range networkInfos {
        // Append the CNI arguments from the k8s.v1.cni.galaxy.io/args annotation
        cmdArgs.Args = strings.TrimRight(fmt.Sprintf("%s;%s", cmdArgs.Args, BuildCNIArgs(networkInfo.Args)), ";")
        if result != nil {
            networkInfo.Conf["prevResult"] = result
        }
        // Delegate to the ADD command of the concrete CNI plugin
        result, err = DelegateAdd(networkInfo.Conf, cmdArgs, networkInfo.IfName)
        if err != nil {
            // On failure, delete every CNI network created so far
            glog.Errorf("fail to add network %s: %v, begin to rollback and delete it", networkInfo.Args, err)
            // Starting at idx and going backwards, call DEL for each network requirement
            delErr := CmdDel(cmdArgs, idx)
            glog.Warningf("fail to delete cni in rollback %v", delErr)
            return nil, fmt.Errorf("fail to establish network %s:%v", networkInfo.Args, err)
        }
    }
    if err != nil {
        return nil, err
    }
    return result, nil
}
Each network requirement is delegated to the corresponding CNI plugin:
// Each plugin's NetConf was read from confdir=/etc/cni/net.d/ earlier, in resolveNetworks
func DelegateAdd(netconf map[string]interface{}, args *skel.CmdArgs, ifName string) (types.Result, error) {
    netconfBytes, err := json.Marshal(netconf)
    if err != nil {
        return nil, fmt.Errorf("error serializing delegate netconf: %v", err)
    }
    typ, err := getNetworkType(netconf)
    if err != nil {
        return nil, err
    }
    pluginPath, err := invoke.FindInPath(typ, strings.Split(args.Path, ":"))
    if err != nil {
        return nil, err
    }
    // Invoke the plugin by executing its binary
    glog.Infof("delegate add %s args %s conf %s", args.ContainerID, args.Args, string(netconfBytes))
    return invoke.ExecPluginWithResult(pluginPath, netconfBytes, &invoke.Args{
        Command:       "ADD",
        ContainerID:   args.ContainerID,
        NetNS:         args.Netns,
        PluginArgsStr: args.Args,
        IfName:        ifName,
        Path:          args.Path,
    })
}
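To summarize:
- A workload's network requirements (which CNI networks it should join) can be configured through annotations and the Pod spec
- Galaxy reads the Pod's annotations and spec to build the list of networks
- Galaxy walks the list of CNI networks and invokes the ADD command of each corresponding CNI plugin in turn, by executing the plugin binary
- If an error occurs along the way, it invokes the DEL command of the plugins that were already added, in reverse order
Items 3 and 4 are essentially the capability that NetworkList (network configuration lists) provides in newer CNI versions.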
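The add-then-roll-back behaviour in items 3 and 4 can be sketched in a few lines. This is only an illustration under simplifying assumptions: the plugin type and the addAll function are hypothetical, and real plugins are external binaries rather than in-process values. As in CmdDel(cmdArgs, idx) above, the rollback starts at the failing index.

```go
package main

import (
	"errors"
	"fmt"
)

// plugin is a hypothetical stand-in for an external CNI plugin binary.
type plugin struct {
	name string
	fail bool // when true, ADD fails
}

func (p plugin) add(log *[]string) error {
	if p.fail {
		return errors.New(p.name + ": add failed")
	}
	*log = append(*log, "ADD "+p.name)
	return nil
}

func (p plugin) del(log *[]string) {
	*log = append(*log, "DEL "+p.name)
}

// addAll calls ADD on each plugin in order. On failure it calls DEL in reverse
// order, from the failing index back to the first plugin, then reports the error.
func addAll(plugins []plugin, log *[]string) error {
	for idx, p := range plugins {
		if err := p.add(log); err != nil {
			for i := idx; i >= 0; i-- {
				plugins[i].del(log)
			}
			return fmt.Errorf("fail to establish network: %w", err)
		}
	}
	return nil
}

func main() {
	var log []string
	err := addAll([]plugin{{name: "a"}, {name: "b", fail: true}, {name: "c"}}, &log)
	fmt.Println(log, err)
}
```

Calling DEL on the plugin that just failed is deliberate: a plugin may have created part of its state before returning an error, and DEL is expected to tolerate missing resources.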
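Like the Galaxy daemon, Galaxy IPAM is an HTTP server. The former is called only by the local CNI plugin galaxy-sdn, so it listens on a UDS (Unix domain socket); Galaxy IPAM is called by the scheduler, so it listens on TCP.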
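To make the UDS side concrete, here is a minimal sketch of an HTTP server on a Unix domain socket and a client that dials it, using only the Go standard library. The helper names, the socket path, and the echo handler are assumptions for illustration; Galaxy itself uses go-restful and its own socket path.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net"
	"net/http"
	"os"
	"path/filepath"
	"strings"
)

// serveOnUnixSocket starts an HTTP server on a Unix domain socket.
func serveOnUnixSocket(path string, handler http.Handler) (net.Listener, error) {
	_ = os.Remove(path) // remove a stale socket file, if any
	l, err := net.Listen("unix", path)
	if err != nil {
		return nil, err
	}
	go http.Serve(l, handler) // nolint: errcheck
	return l, nil
}

// postOverUnixSocket sends a POST through the socket; the URL host is ignored.
func postOverUnixSocket(socket, urlPath, body string) (string, error) {
	client := &http.Client{Transport: &http.Transport{
		DisableKeepAlives: true,
		DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
			var d net.Dialer
			return d.DialContext(ctx, "unix", socket)
		},
	}}
	resp, err := client.Post("http://unix"+urlPath, "application/json", strings.NewReader(body))
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	data, err := io.ReadAll(resp.Body)
	return string(data), err
}

// demoRoundTrip serves an echo handler on /cni and posts "ping" to it.
func demoRoundTrip() (string, error) {
	dir, err := os.MkdirTemp("", "uds-demo")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)
	socket := filepath.Join(dir, "demo.sock")
	mux := http.NewServeMux()
	mux.HandleFunc("/cni", func(w http.ResponseWriter, r *http.Request) {
		data, _ := io.ReadAll(r.Body)
		fmt.Fprintf(w, "echo: %s", data)
	})
	l, err := serveOnUnixSocket(socket, mux)
	if err != nil {
		return "", err
	}
	defer l.Close()
	return postOverUnixSocket(socket, "/cni", "ping")
}

func main() {
	fmt.Println(demoRoundTrip())
}
```

A socket file is reachable only from the same host and can be protected with file permissions, which fits a daemon that should serve nothing but the node-local CNI plugin.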
The entry point is:
func main() {
    // initialize rand seed
    rand.Seed(time.Now().UTC().UnixNano())

    s := server.NewServer()
    // add command line args
    s.AddFlags(pflag.CommandLine)

    flag.InitFlags()
    logs.InitLogs()
    defer logs.FlushLogs()

    // if checking version, print it and exit
    ldflags.PrintAndExitIfRequested()

    if err := s.Start(); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err) // nolint: errcheck
        os.Exit(1)
    }
}
Starting the Galaxy IPAM server:
func (s *Server) Start() error {
    if err := s.init(); err != nil {
        return fmt.Errorf("init server: %v", err)
    }
    // Start the InformerFactory to watch Kubernetes resource changes
    s.StartInformers(s.stopChan)
    // Leader election for multi-instance deployments
    if s.LeaderElection.LeaderElect && s.leaderElectionConfig != nil {
        leaderelection.RunOrDie(context.Background(), *s.leaderElectionConfig)
        return nil
    }
    return s.Run()
}

// Galaxy IPAM initialization
func (s *Server) init() error {
    // Read the configuration file
    if options.JsonConfigPath == "" {
        return fmt.Errorf("json config is required")
    }
    data, err := ioutil.ReadFile(options.JsonConfigPath)
    if err != nil {
        return fmt.Errorf("read json config: %v", err)
    }
    if err := json.Unmarshal(data, &s.JsonConf); err != nil {
        return fmt.Errorf("bad config %s: %v", string(data), err)
    }
    // Initialize the Kubernetes client and the IPAMContext
    s.initk8sClient()
    // The floating IP plugin implements the PodWatcher interface and can listen
    // to Pod lifecycle events
    s.plugin, err = schedulerplugin.NewFloatingIPPlugin(s.SchedulePluginConf, s.IPAMContext)
    if err != nil {
        return err
    }
    // Call the floating IP plugin whenever a Pod event arrives
    s.PodInformer.Informer().AddEventHandler(eventhandler.NewPodEventHandler(s.plugin))
    return nil
}

func (s *Server) Run() error {
    // Initialize the floating IP plugin
    if err := s.plugin.Init(); err != nil {
        return err
    }
    // Run the floating IP plugin
    s.plugin.Run(s.stopChan)
    // Register the API routes
    go s.startAPIServer()
    // Register the routes called by the scheduler: /v1/filter, /v1/priority, /v1/bind
    s.startServer()
    return nil
}
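As the initialization flow above shows, running the Galaxy IPAM server initializes and runs s.plugin, whose type is FloatingIPPlugin.
It is a Pod event listener that implements this interface: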
type PodWatcher interface {
    AddPod(pod *corev1.Pod) error
    UpdatePod(oldPod, newPod *corev1.Pod) error
    DeletePod(pod *corev1.Pod) error
}
These methods are called when a Pod event occurs.
Its initialization logic is:
// The context carries the various clients, listers, and informers
func NewFloatingIPPlugin(conf Conf, ctx *context.IPAMContext) (*FloatingIPPlugin, error) {
    conf.validate()
    glog.Infof("floating ip config: %v", conf)
    plugin := &FloatingIPPlugin{
        nodeSubnet:  make(map[string]*net.IPNet),
        IPAMContext: ctx,
        conf:        &conf,
        unreleased:  make(chan *releaseEvent, 50000),
        // Hashed lock pools: a lock can be requested for each key
        dpLockPool:  keymutex.NewHashed(500000),
        podLockPool: keymutex.NewHashed(500000),
        crdKey:      NewCrdKey(ctx.ExtensionLister),
        crdCache:    crd.NewCrdCache(ctx.DynamicClient, ctx.ExtensionLister, 0),
    }
    // Initialize the IPAM
    plugin.ipam = floatingip.NewCrdIPAM(ctx.GalaxyClient, plugin.FIPInformer)
    if conf.CloudProviderGRPCAddr != "" {
        plugin.cloudProvider = cloudprovider.NewGRPCCloudProvider(conf.CloudProviderGRPCAddr)
    }
    return plugin, nil
}

func (p *FloatingIPPlugin) Init() error {
    if len(p.conf.FloatingIPs) > 0 {
        // Initialize the IP pool
        if err := p.ipam.ConfigurePool(p.conf.FloatingIPs); err != nil {
            return err
        }
    } else {
        // No floating IPs in the configuration file, so fetch them from the ConfigMap
        glog.Infof("empty floatingips from config, fetching from configmap")
        if err := wait.PollInfinite(time.Second, func() (done bool, err error) {
            updated, err := p.updateConfigMap()
            if err != nil {
                glog.Warning(err)
            }
            return updated, nil
        }); err != nil {
            return fmt.Errorf("failed to get floatingip config from configmap: %v", err)
        }
    }
    glog.Infof("plugin init done")
    return nil
}
The logic of its Run method has three main parts:
func (p *FloatingIPPlugin) Run(stop chan struct{}) {
    if len(p.conf.FloatingIPs) == 0 {
        go wait.Until(func() {
            // 1. Periodically refresh the ConfigMap floatingip-config to get the latest
            //    floating IP configuration, and call ipam.ConfigurePool to configure the pool
            if _, err := p.updateConfigMap(); err != nil {
                glog.Warning(err)
            }
        }, time.Minute, stop)
    }
    go wait.Until(func() {
        // 2. Periodically sync Pod state and release IPs where necessary
        if err := p.resyncPod(); err != nil {
            glog.Warningf("resync pod: %v", err)
        }
        p.syncPodIPsIntoDB()
    }, time.Duration(p.conf.ResyncInterval)*time.Minute, stop)
    for i := 0; i < 5; i++ {
        // 3. Read IP release events from the channel and unbind the IP from the Pod
        go p.loop(stop)
    }
}

func (p *FloatingIPPlugin) updateConfigMap() (bool, error) {
    // Fetch the ConfigMap floatingip-config
    cm, err := p.Client.CoreV1().ConfigMaps(p.conf.ConfigMapNamespace).Get(p.conf.ConfigMapName, v1.GetOptions{})
    if err != nil {
        return false, fmt.Errorf("failed to get floatingip configmap %s_%s: %v", p.conf.ConfigMapName,
            p.conf.ConfigMapNamespace, err)
    }
    val, ok := cm.Data[p.conf.FloatingIPKey]
    if !ok {
        return false, fmt.Errorf("configmap %s_%s doesn't have a key floatingips", p.conf.ConfigMapName,
            p.conf.ConfigMapNamespace)
    }
    var updated bool
    // Call the IPAM to update the IP pool
    if updated, err = p.ensureIPAMConf(&p.lastIPConf, val); err != nil {
        return false, err
    }
    defer func() {
        if !updated {
            return
        }
        // When the floating IP configuration changes, the cached per-node subnet
        // information is cleared
        p.nodeSubnetLock.Lock()
        defer p.nodeSubnetLock.Unlock()
        p.nodeSubnet = map[string]*net.IPNet{}
    }()
    return true, nil
}

// Release the IPs of the following Pods:
// 1. Deleted Pods whose owner TAPP no longer exists
// 2. Deleted Pods whose owner StatefulSet/Deployment still exists, but whose IP is
//    not configured to stay unchanged
// 3. Deleted Pods whose owner Deployment no longer needs that many IPs
// 4. Deleted Pods whose index is not below the replica count of the owner StatefulSet
// 5. Pods that are not deleted but have been evicted
func (p *FloatingIPPlugin) resyncPod() error {
    glog.V(4).Infof("resync pods+")
    defer glog.V(4).Infof("resync pods-")
    resyncMeta := &resyncMeta{}
    if err := p.fetchChecklist(resyncMeta); err != nil {
        return err
    }
    p.resyncAllocatedIPs(resyncMeta)
    return nil
}

// Fetch and handle pending release events
func (p *FloatingIPPlugin) loop(stop chan struct{}) {
    for {
        select {
        case <-stop:
            return
        case event := <-p.unreleased:
            go func(event *releaseEvent) {
                // Unbind the IP address from the Pod
                if err := p.unbind(event.pod); err != nil {
                    event.retryTimes++
                    if event.retryTimes > 3 {
                        // leave it to resync to protect chan from explosion
                        glog.Errorf("abort unbind for pod %s, retried %d times: %v", util.PodName(event.pod),
                            event.retryTimes, err)
                    } else {
                        glog.Warningf("unbind pod %s failed for %d times: %v", util.PodName(event.pod),
                            event.retryTimes, err)
                        // backoff time if required
                        time.Sleep(100 * time.Millisecond * time.Duration(event.retryTimes))
                        p.unreleased <- event
                    }
                }
            }(event)
        }
    }
}
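The bounded-retry pattern in loop can be isolated into a small, self-contained sketch. It is a simplification under stated assumptions: a single worker requeues in place instead of spawning a goroutine per event, the backoff is shortened to milliseconds, and worker, runDemo, and the done channel are hypothetical names added for the demonstration.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// releaseEvent mirrors the shape used above: a pod plus a retry counter.
type releaseEvent struct {
	pod        string
	retryTimes int
}

const maxRetries = 3

// worker drains the queue until stop is closed. A failed event is requeued after
// a short backoff until it has failed more than maxRetries times; finished or
// abandoned events are reported on done.
func worker(stop <-chan struct{}, queue chan *releaseEvent, unbind func(string) error, done chan<- *releaseEvent) {
	for {
		select {
		case <-stop:
			return
		case ev := <-queue:
			if err := unbind(ev.pod); err == nil {
				done <- ev
				continue
			}
			ev.retryTimes++
			if ev.retryTimes > maxRetries {
				done <- ev // give up; a periodic resync would pick it up later
				continue
			}
			time.Sleep(time.Millisecond * time.Duration(ev.retryTimes))
			queue <- ev
		}
	}
}

// runDemo releases one pod whose unbind fails `failures` times before succeeding.
// It returns the number of unbind attempts and the final retry counter.
func runDemo(failures int) (attempts, retries int) {
	stop := make(chan struct{})
	defer close(stop)
	queue := make(chan *releaseEvent, 8)
	done := make(chan *releaseEvent, 1)
	unbind := func(string) error {
		attempts++
		if attempts <= failures {
			return errors.New("transient failure")
		}
		return nil
	}
	go worker(stop, queue, unbind, done)
	queue <- &releaseEvent{pod: "default/web-0"}
	ev := <-done
	return attempts, ev.retryTimes
}

func main() {
	attempts, retries := runDemo(2)
	fmt.Println(attempts, retries)
}
```

Capping the retries keeps a permanently failing event from circulating in the channel forever; the periodic resync described in step 2 acts as the safety net for whatever is abandoned here.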
When a Pod event is observed, the plugin may need to sync the Pod IP with the IPAM, or produce a pending release event:
// AddPod does nothing
func (p *FloatingIPPlugin) AddPod(pod *corev1.Pod) error {
    return nil
}

// On Pod update, sync the Pod IP with the IPAM
func (p *FloatingIPPlugin) UpdatePod(oldPod, newPod *corev1.Pod) error {
    if !p.hasResourceName(&newPod.Spec) {
        return nil
    }
    // The previous pod.Status.Phase was not terminal (Failed/Succeeded) but the new one is.
    // The Pod has finished running and its IP must be released. This is typical for Job Pods
    if !finished(oldPod) && finished(newPod) {
        // Deployments will leave evicted pods
        // If it's a evicted one, release its ip
        glog.Infof("release ip from %s_%s, phase %s", newPod.Name, newPod.Namespace, string(newPod.Status.Phase))
        p.unreleased <- &releaseEvent{pod: newPod}
        return nil
    }
    // Sync the Pod IP
    if err := p.syncPodIP(newPod); err != nil {
        glog.Warningf("failed to sync pod ip: %v", err)
    }
    return nil
}

func (p *FloatingIPPlugin) DeletePod(pod *corev1.Pod) error {
    if !p.hasResourceName(&pod.Spec) {
        return nil
    }
    glog.Infof("handle pod delete event: %s_%s", pod.Name, pod.Namespace)
    // After a Pod is deleted, produce a pending IP release event
    p.unreleased <- &releaseEvent{pod: pod}
    return nil
}
The logic for syncing the Pod IP with the IPAM:
func (p *FloatingIPPlugin) syncPodIP(pod *corev1.Pod) error {
    // Only Pods that have reached the Running phase are synced
    if pod.Status.Phase != corev1.PodRunning {
        return nil
    }
    // Syncing means: if the Pod has an ipinfos annotation and the IP is not yet
    // allocated in the pool, allocate that IP to the Pod in the pool
    if pod.Annotations == nil {
        return nil
    }
    // A neat idiom: lockPod is called immediately and returns an unlock function,
    // which runs when syncPodIP returns. It keeps other goroutines from operating
    // on this Pod in the meantime
    defer p.lockPod(pod.Name, pod.Namespace)()
    keyObj, err := util.FormatKey(pod)
    if err != nil {
        glog.V(5).Infof("sync pod %s/%s ip formatKey with error %v", pod.Namespace, pod.Name, err)
        return nil
    }
    cniArgs, err := constant.UnmarshalCniArgs(pod.Annotations[constant.ExtendedCNIArgsAnnotation])
    if err != nil {
        return err
    }
    ipInfos := cniArgs.Common.IPInfos
    for i := range ipInfos {
        if ipInfos[i].IP == nil || ipInfos[i].IP.IP == nil {
            continue
        }
        // For every IP address in the annotation, call the IPAM's AllocateSpecificIP to allocate it
        if err := p.syncIP(keyObj.KeyInDB, ipInfos[i].IP.IP, pod); err != nil {
            glog.Warningf("sync pod %s ip %s: %v", keyObj.KeyInDB, ipInfos[i].IP.IP.String(), err)
        }
    }
    return nil
}
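The `defer p.lockPod(...)()` idiom is worth a closer look: the argument expression of a defer statement is evaluated immediately, so the lock is taken right away and only the returned unlock function is deferred. Below is a minimal sketch of that pattern. The keyLocker type is a hypothetical simplification that keeps one mutex per key in a map, whereas the plugin uses a hashed lock pool (keymutex.NewHashed).

```go
package main

import (
	"fmt"
	"sync"
)

// keyLocker hands out one mutex per key (hypothetical simplification).
type keyLocker struct {
	mu    sync.Mutex
	locks map[string]*sync.Mutex
}

func newKeyLocker() *keyLocker {
	return &keyLocker{locks: make(map[string]*sync.Mutex)}
}

// lock acquires the mutex for key and returns the function that releases it.
func (k *keyLocker) lock(key string) func() {
	k.mu.Lock()
	m, ok := k.locks[key]
	if !ok {
		m = &sync.Mutex{}
		k.locks[key] = m
	}
	k.mu.Unlock()
	m.Lock()
	return m.Unlock
}

// countConcurrently increments a shared counter from n goroutines, all guarded
// by the same key, and returns the final value.
func countConcurrently(n int) int {
	locker := newKeyLocker()
	counter := 0
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// lock() runs now; the returned unlock runs when this function returns.
			defer locker.lock("default/web-0")()
			counter++
		}()
	}
	wg.Wait()
	return counter
}

func main() {
	fmt.Println(countConcurrently(100))
}
```

Forgetting the trailing `()` would defer the call to lock itself, so the mutex would be acquired at function exit and never released, which is the usual pitfall with this idiom.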
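To summarize the responsibilities of FloatingIPPlugin: it mainly watches for ConfigMap and Pod changes. When the ConfigMap changes it updates the IPAM configuration, and when a Pod changes it updates the IP allocation records in the IPAM pool.
FloatingIPPlugin also provides the core scheduling algorithm, namely the Filter, Prioritize, and Bind methods, which we look at later.
The FloatingIPPlugin.ipam field is the core of IP address allocation and management. Its interface is: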
type IPAM interface {
    // Initialize the IP pool
    ConfigurePool([]*FloatingIPPool) error
    // Release the IPs whose keys match the map; return maps of allocated and unallocated IPs
    ReleaseIPs(map[string]string) (map[string]string, map[string]string, error)
    // Allocate the specified IP to a Pod
    AllocateSpecificIP(string, net.IP, Attr) error
    // Allocate an IP in the subnet
    AllocateInSubnet(string, *net.IPNet, Attr) (net.IP, error)
    // Allocate one IP in each of the given IP ranges
    AllocateInSubnetsAndIPRange(string, *net.IPNet, [][]nets.IPRange, Attr) ([]net.IP, error)
    // Allocate an IP in the specified subnet under the specified key
    AllocateInSubnetWithKey(oldK, newK, subnet string, attr Attr) error
    // Reserve an IP
    ReserveIP(oldK, newK string, attr Attr) (bool, error)
    // Release an IP
    Release(string, net.IP) error
    // Return the first match for the key
    First(string) (*FloatingIPInfo, error)
    // Convert an IP address to a floating IP object
    ByIP(net.IP) (FloatingIP, error)
    // Query by prefix
    ByPrefix(string) ([]*FloatingIPInfo, error)
    // Query by keyword
    ByKeyword(string) ([]FloatingIP, error)
    // Return the subnet of a node
    NodeSubnet(net.IP) *net.IPNet
}
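Galaxy IPAM integrates with the Kubernetes scheduler as a Scheduler Extender webhook, that is, it exposes several HTTP endpoints for the scheduler to call.
These endpoints are implemented as methods of the Galaxy IPAM Server and are registered in startServer():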
// Filter the matching nodes
func (s *Server) filter(request *restful.Request, response *restful.Response) {
    // Standard Kubernetes API: ExtenderArgs holds the Pod being scheduled and the
    // list of candidate nodes (already filtered by the scheduler)
    args := new(schedulerapi.ExtenderArgs)
    if err := request.ReadEntity(&args); err != nil {
        glog.Error(err)
        _ = response.WriteError(http.StatusInternalServerError, err)
        return
    }
    glog.V(5).Infof("POST filter %v", *args)
    start := time.Now()
    glog.V(3).Infof("filtering %s_%s, start at %d+", args.Pod.Name, args.Pod.Namespace, start.UnixNano())
    // Call FloatingIPPlugin
    filteredNodes, failedNodesMap, err := s.plugin.Filter(&args.Pod, args.Nodes.Items)
    glog.V(3).Infof("filtering %s_%s, start at %d-", args.Pod.Name, args.Pod.Namespace, start.UnixNano())
    args.Nodes.Items = filteredNodes
    errStr := ""
    if err != nil {
        errStr = err.Error()
    }
    _ = response.WriteEntity(schedulerapi.ExtenderFilterResult{
        // Nodes the Pod can be scheduled to
        Nodes: args.Nodes,
        // Nodes the Pod cannot be scheduled to, with the reasons
        FailedNodes: failedNodesMap,
        // Error message
        Error: errStr,
    })
}

// Node priority (scoring)
func (s *Server) priority(request *restful.Request, response *restful.Response) {
    args := new(schedulerapi.ExtenderArgs)
    if err := request.ReadEntity(&args); err != nil {
        glog.Error(err)
        _ = response.WriteError(http.StatusInternalServerError, err)
        return
    }
    glog.V(5).Infof("POST priority %v", *args)
    // Call FloatingIPPlugin
    hostPriorityList, err := s.plugin.Prioritize(&args.Pod, args.Nodes.Items)
    if err != nil {
        glog.Warningf("prioritize err: %v", err)
    }
    _ = response.WriteEntity(*hostPriorityList)
}

// Bind the Pod to a node
func (s *Server) bind(request *restful.Request, response *restful.Response) {
    args := new(schedulerapi.ExtenderBindingArgs)
    if err := request.ReadEntity(&args); err != nil {
        glog.Error(err)
        _ = response.WriteError(http.StatusInternalServerError, err)
        return
    }
    glog.V(5).Infof("POST bind %v", *args)
    start := time.Now()
    glog.V(3).Infof("binding %s_%s to %s, start at %d+", args.PodName, args.PodNamespace, args.Node, start.UnixNano())
    // Call FloatingIPPlugin
    err := s.plugin.Bind(args)
    glog.V(3).Infof("binding %s_%s to %s, start at %d-", args.PodName, args.PodNamespace, args.Node, start.UnixNano())
    var result schedulerapi.ExtenderBindingResult
    if err != nil {
        glog.Warningf("bind err: %v", err)
        result.Error = err.Error()
    }
    _ = response.WriteEntity(result)
}
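The shape of such an extender endpoint can be sketched with the standard library alone. The types filterArgs and filterResult below are deliberately simplified, hypothetical stand-ins for the scheduler's ExtenderArgs and ExtenderFilterResult, and the node-b rule is invented for the demonstration. The sketch shows only the HTTP plumbing: decode the arguments, call a filter function, encode the result.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// Simplified, hypothetical request and response payloads.
type filterArgs struct {
	PodName   string   `json:"podName"`
	NodeNames []string `json:"nodeNames"`
}

type filterResult struct {
	NodeNames   []string          `json:"nodeNames"`
	FailedNodes map[string]string `json:"failedNodes"`
	Error       string            `json:"error,omitempty"`
}

type filterFunc func(pod string, nodes []string) ([]string, map[string]string, error)

// filterHandler wraps a filter function with JSON decoding and encoding.
func filterHandler(f filterFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		var args filterArgs
		if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		kept, failed, err := f(args.PodName, args.NodeNames)
		res := filterResult{NodeNames: kept, FailedNodes: failed}
		if err != nil {
			res.Error = err.Error()
		}
		w.Header().Set("Content-Type", "application/json")
		_ = json.NewEncoder(w).Encode(res)
	}
}

// callFilter starts a test server whose filter rejects "node-b" and queries it.
func callFilter(nodes []string) (filterResult, error) {
	rejectB := func(_ string, nodes []string) ([]string, map[string]string, error) {
		kept, failed := []string{}, map[string]string{}
		for _, n := range nodes {
			if n == "node-b" {
				failed[n] = "no floating IP left"
			} else {
				kept = append(kept, n)
			}
		}
		return kept, failed, nil
	}
	srv := httptest.NewServer(filterHandler(rejectB))
	defer srv.Close()
	body, err := json.Marshal(filterArgs{PodName: "web-0", NodeNames: nodes})
	if err != nil {
		return filterResult{}, err
	}
	resp, err := http.Post(srv.URL+"/v1/filter", "application/json", bytes.NewReader(body))
	if err != nil {
		return filterResult{}, err
	}
	defer resp.Body.Close()
	var res filterResult
	err = json.NewDecoder(resp.Body).Decode(&res)
	return res, err
}

func main() {
	fmt.Println(callFilter([]string{"node-a", "node-b"}))
}
```

As in the handlers above, a filtering error travels inside the response body rather than as an HTTP error status, so the scheduler can tell a malformed request apart from a scheduling decision.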
These methods only take care of the HTTP handling. The core logic lives in the FloatingIPPlugin methods mentioned earlier:
// Nodes without an available IP are marked as failedNodes
// If the Pod does not need a floating IP, failedNodes is empty
func (p *FloatingIPPlugin) Filter(pod *corev1.Pod, nodes []corev1.Node) (
    []corev1.Node, schedulerapi.FailedNodesMap, error) {
    start := time.Now()
    failedNodesMap := schedulerapi.FailedNodesMap{}
    // A Pod without a request for the custom resource tke.cloud.tencent.com/eni-ip
    // is considered not to need a floating IP
    if !p.hasResourceName(&pod.Spec) {
        return nodes, failedNodesMap, nil
    }
    filteredNodes := []corev1.Node{}
    // Start filtering; operations on this Pod are locked in the meantime
    defer p.lockPod(pod.Name, pod.Namespace)()
    // Read the k8s.v1.cni.galaxy.io/args annotation to get the subnets the Pod should join
    subnetSet, err := p.getSubnet(pod)
    if err != nil {
        return filteredNodes, failedNodesMap, err
    }
    // Walk all nodes
    for i := range nodes {
        nodeName := nodes[i].Name
        // Get the subnet the node belongs to
        subnet, err := p.getNodeSubnet(&nodes[i])
        if err != nil {
            failedNodesMap[nodes[i].Name] = err.Error()
            continue
        }
        // Keep the node if it can allocate from a subnet the Pod should join; otherwise drop it
        if subnetSet.Has(subnet.String()) {
            filteredNodes = append(filteredNodes, nodes[i])
        } else {
            failedNodesMap[nodeName] = "FloatingIPPlugin:NoFIPLeft"
        }
    }
    if glog.V(5) {
        nodeNames := make([]string, len(filteredNodes))
        for i := range filteredNodes {
            nodeNames[i] = filteredNodes[i].Name
        }
        glog.V(5).Infof("filtered nodes %v failed nodes %v for %s_%s", nodeNames, failedNodesMap,
            pod.Namespace, pod.Name)
    }
    metrics.ScheduleLatency.WithLabelValues("filter").Observe(time.Since(start).Seconds())
    return filteredNodes, failedNodesMap, nil
}

// Scoring is currently an empty implementation
func (p *FloatingIPPlugin) Prioritize(pod *corev1.Pod, nodes []corev1.Node) (*schedulerapi.HostPriorityList, error) {
    list := &schedulerapi.HostPriorityList{}
    if !p.hasResourceName(&pod.Spec) {
        return list, nil
    }
    //TODO
    return list, nil
}

// Bind a new floating IP to the Pod, or reuse an existing one
func (p *FloatingIPPlugin) Bind(args *schedulerapi.ExtenderBindingArgs) error {
    start := time.Now()
    pod, err := p.PodLister.Pods(args.PodNamespace).Get(args.PodName)
    if err != nil {
        return fmt.Errorf("failed to find pod %s: %w", util.Join(args.PodName, args.PodNamespace), err)
    }
    if !p.hasResourceName(&pod.Spec) {
        return fmt.Errorf("pod which doesn't want floatingip have been sent to plugin")
    }
    defer p.lockPod(pod.Name, pod.Namespace)()
    // Generate a key for the Pod, built from the pool to join, the Pod namespace,
    // the Pod name, and the name of the owning StatefulSet/Deployment
    keyObj, err := util.FormatKey(pod)
    if err != nil {
        return err
    }
    // Allocate IPs to the Pod
    cniArgs, err := p.allocateIP(keyObj.KeyInDB, args.Node, pod)
    if err != nil {
        return err
    }
    data, err := json.Marshal(cniArgs)
    if err != nil {
        return fmt.Errorf("marshal cni args %v: %v", *cniArgs, err)
    }
    // Add the k8s.v1.cni.galaxy.io/args annotation
    bindAnnotation := map[string]string{constant.ExtendedCNIArgsAnnotation: string(data)}
    var err1 error
    if err := wait.PollImmediate(time.Millisecond*500, 3*time.Second, func() (bool, error) {
        // Perform the binding by creating the binding subresource for the Pod
        if err := p.Client.CoreV1().Pods(args.PodNamespace).Bind(&corev1.Binding{
            ObjectMeta: v1.ObjectMeta{Namespace: args.PodNamespace, Name: args.PodName, UID: args.PodUID,
                Annotations: bindAnnotation},
            Target: corev1.ObjectReference{
                Kind: "Node",
                Name: args.Node,
            },
        }); err != nil {
            err1 = err
            if apierrors.IsNotFound(err) {
                // The Pod no longer exists, so stop polling
                return false, err
            }
            return false, nil
        }
        glog.Infof("bind pod %s to %s with %s", keyObj.KeyInDB, args.Node, string(data))
        return true, nil
    }); err != nil {
        if apierrors.IsNotFound(err1) {
            // The Pod disappeared during binding, meaning someone deleted it.
            // The IP that was already allocated must be reclaimed
            glog.Infof("binding returns not found for pod %s, putting it into unreleased chan", keyObj.KeyInDB)
            // attach ip annotation
            p.unreleased <- &releaseEvent{pod: pod}
        }
        // If fails to update, depending on resync to update
        return fmt.Errorf("update pod %s: %w", keyObj.KeyInDB, err1)
    }
    metrics.ScheduleLatency.WithLabelValues("bind").Observe(time.Since(start).Seconds())
    return nil
}
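The node-selection rule inside Filter boils down to a subnet membership test. The following sketch restates it with plain strings and the standard net package. The node type, the helper names, and the way a node's subnet is derived from its IP are assumptions for illustration; the plugin resolves node subnets through its IPAM and caches them.

```go
package main

import (
	"fmt"
	"net"
)

// node is a hypothetical, minimal view of a cluster node.
type node struct {
	Name string
	IP   string
}

// mustParseCIDRs parses the configured floating IP subnets.
func mustParseCIDRs(cidrs ...string) []*net.IPNet {
	var out []*net.IPNet
	for _, c := range cidrs {
		_, n, err := net.ParseCIDR(c)
		if err != nil {
			panic(err)
		}
		out = append(out, n)
	}
	return out
}

// nodeSubnet returns the configured subnet that contains the node IP, or nil.
func nodeSubnet(ip string, pools []*net.IPNet) *net.IPNet {
	parsed := net.ParseIP(ip)
	for _, p := range pools {
		if p.Contains(parsed) {
			return p
		}
	}
	return nil
}

// filterNodes keeps the nodes whose subnet is one the Pod may join and records
// a reason for every rejected node.
func filterNodes(nodes []node, pools []*net.IPNet, podSubnets map[string]bool) ([]string, map[string]string) {
	kept := []string{}
	failed := map[string]string{}
	for _, n := range nodes {
		subnet := nodeSubnet(n.IP, pools)
		switch {
		case subnet == nil:
			failed[n.Name] = "no floating IP subnet covers node " + n.IP
		case podSubnets[subnet.String()]:
			kept = append(kept, n.Name)
		default:
			failed[n.Name] = "FloatingIPPlugin:NoFIPLeft"
		}
	}
	return kept, failed
}

func main() {
	pools := mustParseCIDRs("10.0.1.0/24", "10.0.2.0/24")
	nodes := []node{{"n1", "10.0.1.5"}, {"n2", "10.0.2.5"}, {"n3", "192.168.0.1"}}
	kept, failed := filterNodes(nodes, pools, map[string]bool{"10.0.1.0/24": true})
	fmt.Println(kept, failed)
}
```

This is also why a floating IP can only follow a Pod to nodes in the same subnet: an address from 10.0.1.0/24 is not routable on a node that sits in 10.0.2.0/24.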
Let's look more closely at how IPs are allocated during the Bind phase:
func (p *FloatingIPPlugin) allocateIP(key string, nodeName string, pod *corev1.Pod) (*constant.CniArgs, error) {
    cniArgs, err := getPodCniArgs(pod)
    if err != nil {
        return nil, err
    }
    ipranges := cniArgs.RequestIPRange
getPodCniArgs reads the k8s.v1.cni.galaxy.io/args annotation into this structure:
type CniArgs struct {
    // The user can specify the address ranges from which IPs are allocated
    RequestIPRange [][]nets.IPRange `json:"request_ip_range,omitempty"`
    // These arguments are eventually passed to the underlying CNI plugins
    // in key1=val1;key2=val2 format
    Common CommonCniArgs `json:"common"`
}
allocateIP uses the key to look up the IPs already allocated to it, one entry per iprange:
    ipInfos, err := p.ipam.ByKeyAndIPRanges(key, ipranges)
    if err != nil {
        return nil, fmt.Errorf("failed to query floating ip by key %s: %v", key, err)
    }
    if len(ipranges) == 0 && len(ipInfos) > 0 {
        // reuse only one if requesting only one ip
        ipInfos = ipInfos[:1]
    }
    // The ipranges that do not have an allocated IP yet
    var unallocatedIPRange [][]nets.IPRange
    reservedIPs := sets.NewString()
    for i := range ipInfos {
        if ipInfos[i] == nil {
            // Not allocated
            unallocatedIPRange = append(unallocatedIPRange, ipranges[i])
        } else {
            // Already allocated to the current key and must be kept
            reservedIPs.Insert(ipInfos[i].IP.String())
        }
    }
    policy := parseReleasePolicy(&pod.ObjectMeta)
    attr := floatingip.Attr{Policy: policy, NodeName: nodeName, Uid: string(pod.UID)}
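Next, the ipInfos returned by the IPAM are checked. If a recorded PodUid differs from the current pod.GetUID(), the previous Pod with the same key may not have been deleted yet, so allocateIP returns an error (and Bind fails) until the delete event arrives: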
    for _, ipInfo := range ipInfos {
        // check if uid mismatch, if we delete a statefulset/tapp and creates a same name statefulset/tapp immediately,
        // galaxy-ipam may receive bind event for new pod early than deleting event for old pod
        if ipInfo != nil && ipInfo.PodUid != "" && ipInfo.PodUid != string(pod.GetUID()) {
            return nil, fmt.Errorf("waiting for delete event of %s before reuse this ip", key)
        }
    }
If any IPs still need to be allocated, the IPAM is called to allocate them:
    if len(unallocatedIPRange) > 0 || len(ipInfos) == 0 {
        // The subnet the node belongs to
        subnet, err := p.queryNodeSubnet(nodeName)
        if err != nil {
            return nil, err
        }
        // Perform the allocation
        if _, err := p.ipam.AllocateInSubnetsAndIPRange(key, subnet, unallocatedIPRange, attr); err != nil {
            return nil, err
        }
        // Refresh the IP information
        ipInfos, err = p.ipam.ByKeyAndIPRanges(key, ipranges)
        if err != nil {
            return nil, fmt.Errorf("failed to query floating ip by key %s: %v", key, err)
        }
    }
For IPs that are being reused, the attributes stored in the IPAM are updated:
    for _, ipInfo := range ipInfos {
        //...
        if reservedIPs.Has(ipInfo.IP.String()) {
            glog.Infof("%s reused %s, updating attr to %v", key, ipInfo.IPInfo.IP.String(), attr)
            if err := p.ipam.UpdateAttr(key, ipInfo.IPInfo.IP.IP, attr); err != nil {
                return nil, fmt.Errorf("failed to update floating ip release policy: %v", err)
            }
        }
    }
Finally, the result is written back for the Pod annotation:
    // Newly allocated IP addresses
    var allocatedIPs []string
    // All IP addresses
    var ret []constant.IPInfo
    for _, ipInfo := range ipInfos {
        if !reservedIPs.Has(ipInfo.IP.String()) {
            allocatedIPs = append(allocatedIPs, ipInfo.IP.String())
        }
        ret = append(ret, ipInfo.IPInfo)
    }
    glog.Infof("%s reused ips %v, allocated ips %v, attr %v", key, reservedIPs.List(), allocatedIPs, attr)
    // Write the result back
    cniArgs.Common.IPInfos = ret
    return &cniArgs, nil
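The reuse-or-allocate decision that allocateIP makes can be reduced to a toy in-memory model, which also shows why a floating IP survives rescheduling: the allocation is recorded against a key derived from the workload, not against the node. Everything below (memIPAM, its methods, the key string) is a hypothetical illustration; the real IPAM stores allocations in CRDs and also handles subnets, ranges, and release policies.

```go
package main

import (
	"errors"
	"fmt"
)

// memIPAM is a toy in-memory allocator keyed by pod key.
type memIPAM struct {
	free  []string          // unallocated IPs, handed out in order
	byKey map[string]string // pod key -> allocated IP
}

func newMemIPAM(ips ...string) *memIPAM {
	return &memIPAM{free: ips, byKey: make(map[string]string)}
}

// bind returns the IP already recorded for key, or allocates a new one.
func (m *memIPAM) bind(key string) (ip string, reused bool, err error) {
	if ip, ok := m.byKey[key]; ok {
		return ip, true, nil
	}
	if len(m.free) == 0 {
		return "", false, errors.New("no floating IP left")
	}
	ip = m.free[0]
	m.free = m.free[1:]
	m.byKey[key] = ip
	return ip, false, nil
}

// release returns the IP recorded for key to the free list.
func (m *memIPAM) release(key string) {
	if ip, ok := m.byKey[key]; ok {
		delete(m.byKey, key)
		m.free = append(m.free, ip)
	}
}

func main() {
	ipam := newMemIPAM("10.0.1.2", "10.0.1.3")
	key := "sts_default_web_web-0" // hypothetical key format
	fmt.Println(ipam.bind(key))    // first scheduling allocates
	fmt.Println(ipam.bind(key))    // rescheduling reuses the same IP
}
```

In this model, whether a deleted Pod's key triggers release immediately or keeps its record is exactly what the release policy passed in attr would decide.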
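When a Pod is created with a request for the custom resource tke.cloud.tencent.com/eni-ip in its spec, the scheduler extension treats it as needing a floating IP.
Because floatingip-config can define several floating IP subnets, a Pod should use annotations to state what it needs:
- k8s.v1.cni.galaxy.io/args declares which subnet it needs to join and which IP address range it requests
- tke.cloud.tencent.com/eni-ip-pool declares which IP pool it uses
After creation, the Pod first enters the scheduling phase. Galaxy IPAM, acting as a Kubernetes scheduler extender, allocates IPs as needed and writes the result into the Pod's annotations.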
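When the Kubelet starts the Pod on the node, it invokes the CNI plugin galaxy-sdn, which in turn calls the Galaxy daemon. The daemon then executes underlying CNI plugins such as galaxy-k8s-vlan, converting the key/value pairs in the Pod annotation k8s.v1.cni.galaxy.io/args into CNI arguments of the form key1=val1;key2=val2.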
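The galaxy-k8s-vlan plugin calls ipam.Allocate to obtain the IP address information from the CNI arguments and convert it into a Result in CNI 0.2.0 format, then calls setupNetwork to configure the IP address on the container's network interface.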