Galaxy学习笔记
Galaxy是TKEStack的一个网络组件,支持为TKE集群提供Overlay/Underlay容器网络。Galaxy的一个特性是能够提供浮动IP(弹性IP) —— 即使Pod因为节点宕机而漂移到其它节点,其IP地址也能够保持不变。
Galaxy提供四类网络,支持为每个工作负载单独配置网络模式。
默认模式,基于Flannel的VXLAN或者Host Gateway(路由)方案。同节点容器通信不走网桥,报文直接利用主机路由转发,跨节点容器通信利用VXLAN协议封装或者直接路由,节点间路由记录在Etcd中。优点:简单可靠,性能不错,支持网络策略。
tke-installer默认安装的TKEStack会自动配置Galaxy为Overlay模式,CNI配置:
1 2 3 4 5 |
{ "type": "galaxy-sdn", "capabilities": {"portMappings": true}, "cniVersion": "0.2.0" } |
在该模式下:
- Flannel在每个Kubelet节点上分配一个子网,并将其保存在etcd和本地路径 /run/flannel/subnet.env
- Kubelet调用Galaxy的CNI插件galaxy-sdn,该插件会通过UDS调用本机的Galaxy进程
- Galaxy进程则又调用Flannel CNI,解析/run/flannel/subnet.env中的子网信息
- Flannel CNI会调用:
- Bridge CNI或Veth CNI来为POD配置网络
- 调用host-lo
架构图:
该模式下,容器IP由宿主机网络提供,容器与宿主机可以直接路由,性能更好。支持基于Linux Bridge/MacVlan/IPVlan和SRIOV的容器-宿主机二层联通,可以根据业务场景和硬件环境,具体选择使用哪种网桥。
要使用Galaxy Underlay网络,需要启用Galaxy-ipam组件,该组件为Pod分配IP,浮动IP的能力也由它提供。
利用K8S的hostPort配置,将容器端口映射到宿主机端口。如果不指定hostPort,Galaxy进行随机映射。
利用K8S的HostNetwork模式,直接使用物理网络。
Galaxy有三类组件构成。
以DaemonSet方式运行在每个节点上,通过调用各种CNI插件来配置容器网络。
Galaxy将实际的创建CNI的工作委托给其它CNI插件,也就是说它扮演一个装饰器的角色。Galaxy支持任何标准CNI插件,它也内置了若干CNI插件。
是一个K8S Scheduler插件。kube-scheduler通过HTTP调用Galaxy-ipam,实现浮动IP的配置和管理。
因为仅仅在重新调度Pod的时候才会面临IP地址变化的问题,因此这个组件实现为Scheduler插件就很自然了。
Galaxy支持任何标准的CNI插件,你可以将其用作一个CNI框架,就像multus-cni那样。
Galaxy还包含了一系列内置的CNI插件:
插件 | 说明 |
SDN CNI | 使用Overlay网络时,此插件是CNI的入口。该插件会通过UDS调用Galaxy守护进程,转发Kubelet提供的所有CNI参数 |
Veth CNI | 用于Overlay网络中,创建一个VETH对来连接容器和宿主机命名空间。该插件从ipam插件得到Pod的IP |
underlay-veth CNI | 用于Underlay网络,创建VETH对来连接容器和宿主机命名空间,该插件不会创建网桥,而是使用宿主机路由规则来进行封包转发 |
Vlan CNI |
用于Underlay网络,创建一个VETH对,连接容器和宿主机上的bridge/macvlan/ipvlan设备 此插件支持为Pod配置VLAN,它能够从CNI参数,例如ipinfos=[{"ip":"192.168.0.68/26","vlan":2,"gateway":"192.168.0.65"}] ,或者ipam CNI插件的结果中获得IP地址 |
SRIOV CNI | 用于Underlay网络,利用以太网服务器适配器(Ethernet Server Adapter)的SR-IOV,能够创建VF设备并将其放入容器的网络命名空间 |
TKE route ENI CNI | 用于腾讯云 |
1 2 3 4 5 6 7 8 |
go get -d tkestack.io/galaxy cd $GOPATH/src/tkestack.io/galaxy # 构建所有二进制文件 make # 构建指定二进制文件 make BINS="galxy galxy-ipam" make BINS="galxy-ipam" |
1 2 3 4 5 6 7 8 |
# 构建所有镜像 make image # 构建指定镜像 make image BINS="galxy-ipam" # 为指定体系结构构建 make image.multiarch PLATFORMS="linux_arm64" |
清单文件可以到GitHub下载,默认内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 |
--- apiVersion: rbac.authorization.k8s.io/v1 # kubernetes versions before 1.8.0 should use rbac.authorization.k8s.io/v1beta1 kind: ClusterRole metadata: name: galaxy rules: - apiGroups: [""] resources: - pods - namespaces - nodes - pods/binding verbs: ["list", "watch", "get", "patch", "create", "update"] - apiGroups: ["apps", "extensions"] resources: - statefulsets - deployments verbs: ["list", "watch"] - apiGroups: [""] resources: - configmaps - endpoints - events verbs: ["get", "list", "watch", "update", "create", "patch"] - apiGroups: ["galaxy.k8s.io"] resources: - pools - floatingips verbs: ["get", "list", "watch", "update", "create", "patch", "delete"] - apiGroups: ["apiextensions.k8s.io"] resources: - customresourcedefinitions verbs: - "*" - apiGroups: ["networking.k8s.io"] resources: - networkpolicies verbs: ["get", "list", "watch"] --- apiVersion: v1 kind: ServiceAccount metadata: name: galaxy namespace: kube-system --- apiVersion: rbac.authorization.k8s.io/v1 # kubernetes versions before 1.8.0 should use rbac.authorization.k8s.io/v1beta1 kind: ClusterRoleBinding metadata: name: galaxy roleRef: apiGroup: rbac.authorization.k8s.io kind: ClusterRole name: galaxy subjects: - kind: ServiceAccount name: galaxy namespace: kube-system --- apiVersion: apps/v1 kind: DaemonSet metadata: labels: app: galaxy name: galaxy namespace: kube-system spec: selector: matchLabels: app: galaxy template: metadata: labels: app: galaxy spec: priorityClassName: system-node-critical serviceAccountName: galaxy hostNetwork: true hostPID: true containers: - image: tkestack/galaxy:v1.0.7 command: ["/bin/sh"] # 入口点脚本: # 拷贝来自ConfigMap的Galaxy配置到宿主机 # cp -p /etc/galaxy/cni/00-galaxy.conf /etc/cni/net.d/; # 拷贝CNI插件二进制文件到宿主机 # cp -p /opt/cni/galaxy/bin/galaxy-sdn /opt/cni/galaxy/bin/loopback /opt/cni/bin/; # 启动Galaxy守护进程 腾讯云中运行要这个参数 # /usr/bin/galaxy --logtostderr=true --v=3 --route-eni # qcloud galaxy should run with --route-eni args: ["-c", "cp -p /etc/galaxy/cni/00-galaxy.conf /etc/cni/net.d/; cp -p /opt/cni/galaxy/bin/galaxy-sdn /opt/cni/galaxy/bin/loopback /opt/cni/bin/; /usr/bin/galaxy --logtostderr=true --v=3 --route-eni"] # private-cloud should run without --route-eni # args: ["-c", "cp -p /etc/galaxy/cni/00-galaxy.conf /etc/cni/net.d/; cp -p /opt/cni/galaxy/bin/galaxy-sdn /opt/cni/galaxy/bin/loopback /opt/cni/bin/; /usr/bin/galaxy --logtostderr=true --v=3"] imagePullPolicy: Always env: - name: MY_NODE_NAME valueFrom: fieldRef: fieldPath: spec.nodeName - name: DOCKER_HOST value: unix:///host/run/docker.sock name: galaxy resources: requests: cpu: 100m memory: 200Mi securityContext: privileged: true volumeMounts: - name: galaxy-run mountPath: /var/run/galaxy/ - name: flannel-run mountPath: /run/flannel - name: galaxy-etc mountPath: /etc/galaxy - name: cni-config mountPath: /etc/cni/net.d/ - name: cni-bin mountPath: /opt/cni/bin - name: cni-etc mountPath: /etc/galaxy/cni - name: cni-state mountPath: /var/lib/cni - name: docker-sock mountPath: /host/run/ - name: tz-config mountPath: /etc/localtime terminationGracePeriodSeconds: 30 tolerations: - operator: Exists volumes: - name: galaxy-run hostPath: path: /var/run/galaxy - name: flannel-run hostPath: path: /run/flannel - configMap: defaultMode: 420 name: galaxy-etc name: galaxy-etc - name: cni-config hostPath: path: /etc/cni/net.d/ - name: cni-bin hostPath: path: /opt/cni/bin - name: cni-state hostPath: path: /var/lib/cni - configMap: defaultMode: 420 name: cni-etc name: cni-etc - name: docker-sock # in case of docker restart, /run/docker.sock may change, we have to mount the /run directory hostPath: path: /run/ - name: tz-config hostPath: path: /etc/localtime --- apiVersion: v1 kind: ConfigMap metadata: name: galaxy-etc namespace: kube-system data: # Galaxy配置文件 # update network card name in "galaxy-k8s-vlan" and "galaxy-k8s-sriov" if necessary # update vf_num in "galaxy-k8s-sriov" according to demand # update ENIIPNetwork to tke-route-eni if running on qcloud galaxy.json: | { "NetworkConf":[ {"name":"tke-route-eni","type":"tke-route-eni","eni":"eth1","routeTable":1}, {"name":"galaxy-flannel","type":"galaxy-flannel", "delegate":{"type":"galaxy-veth"},"subnetFile":"/run/flannel/subnet.env"}, {"name":"galaxy-k8s-vlan","type":"galaxy-k8s-vlan", "device":"eth1", "default_bridge_name": "br0"}, {"name":"galaxy-k8s-sriov","type": "galaxy-k8s-sriov", "device": "eth1", "vf_num": 10} ], "DefaultNetworks": ["galaxy-flannel"], "ENIIPNetwork": "galaxy-k8s-vlan" } --- apiVersion: v1 kind: ConfigMap metadata: name: cni-etc namespace: kube-system data: # CNI网络配置 00-galaxy.conf: | { "name": "galaxy-sdn", "type": "galaxy-sdn", "capabilities": {"portMappings": true}, "cniVersion": "0.2.0" } |
根据实际运行环境、使用的网络模式,需要进行调整。
需要保证命令行标记: --network-plugin=cni --cni-bin-dir=/opt/cni/bin/,并重启Kubelet。
这种网络模式依赖于Flannel,参考Flannel学习笔记。
你需要修改上述Galaxy清单中的galaxy-etc,来配置默认网络(DefaultNetworks):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
{ // 所有支持的网络模式的配置 "NetworkConf":[ { "name":"tke-route-eni", // 如果name为空,Galaxy假设它和type相同 "type":"tke-route-eni", "eni":"eth1", "routeTable":1 }, { "name":"galaxy-flannel", // 这个插件就是flannel的CNI,只是被改了名字 "type":"galaxy-flannel", "delegate":{ "type":"galaxy-veth" }, "subnetFile":"/run/flannel/subnet.env" }, { "name":"galaxy-k8s-vlan", "type":"galaxy-k8s-vlan", "device":"eth1", "default_bridge_name":"br0" }, { "name":"galaxy-k8s-sriov", "type":"galaxy-k8s-sriov", "device":"eth1", "vf_num":10 }, { "name":"galaxy-underlay-veth", "type":"galaxy-underlay-veth", "device":"eth1" } ], // Pod默认使用这个网络,注意可以加入多个网络 "DefaultNetworks":[ "galaxy-flannel" // 使用基于Flannel的Overlay网络 ], // 如果Pod需要腾讯云弹性网卡(ENI)而且没有配置k8s.v1.cni.cncf.io/networks注解,则 // 默认使用此网络。这个配置可以避免需要为所有需要Underlay网络的Pod添加注解 "ENIIPNetwork":"galaxy-k8s-vlan" } |
可以为Pod指定注解:
1 |
k8s.v1.cni.cncf.io/networks: galaxy-flannel,galaxy-k8s-sriov |
来提示Galaxy应该为它配置哪些网络。
Galaxy可以和其它CNI插件同时存在。需要注意的一点是,--network-conf-dir目录下其它插件的配置文件,其文件名的字典排序不应该比00-galaxy.conf更小,否则Kubelet会在调用Galaxy CNI插件之前,调用其它CNI插件,这可能不符合预期。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
--alsologtostderr # 记录日志到文件,同时记录到标准错误 --bridge-nf-call-iptables # 是否配置 bridge-nf-call-iptables,启用/禁用对网桥转发的封包被iptables规则过滤,默认true --cni-paths stringSlice # 除了从kubelet接收的,额外的CNI路径,默认/opt/cni/galaxy/bin --flannel-allocated-ip-dir string # Flannel CNI插件在何处存放分配的IP地址,默认/var/lib/cni/networks --flannel-gc-interval duration # 执行Flannel网络垃圾回收的间隔,默认10s --gc-dirs string # 清理哪些目录,这些目录中的文件名包含容器ID # 默认 /var/lib/cni/flannel,/var/lib/cni/galaxy,/var/lib/cni/galaxy/port --hostname-override string # 覆盖kubelet hostname,如果指定该参数,则Galaxy使用它从API Server得到节点对象 --ip-forward # 是否启用IP转发 --json-config-path string # Galaxy配置文件路径,默认/etc/galaxy/galaxy.json --kubeconfig string # Kubelet配置文件 --log-backtrace-at traceLocation # 如果日志在file:N打印,打印栈追踪。默认default :0 --log-dir string # 日志输出目录 --log-flush-frequency duration # 日志刷出间隔,默认5s --logtostderr # 输出到标准错误而非文件 --master string # API Server的地址和端口 --network-conf-dir string # 额外的CNI网络配置文件。默认/etc/cni/net.d/ --network-policy # 启用网络策略支持 --route-eni # 是否启用腾讯云route-eni --stderrthreshold severity # 以上级别的日志打印到标准错误。默认2 -v, --v Level # 日志冗长级别 |
Underlay网络必须配合Galaxy-ipam使用,Galaxy-ipam负责为Pod分配或释放IP地址:
- 你需要规划容器网络中使用的Underlay IP范围(注意和物理网路上其它节点的IP冲突),并且配置到ConfigMap floatingip-config中
- 调度Pod时,kube-scheduler会在filter/priority/bind方法中调用Galaxy-ipam
- Galaxy-ipam检查Pod是否配置了Reserved IP,如果是,则Galaxy-ipam仅将此IP所在的可用子网的节点标记为有效节点,否则所有节点都将被标记为有效节点。在Pod绑定IP期间,Galaxy-ipam分配一个IP并将其写入到Pod annotations中
- Galaxy从Pod的注解中获得IP,并将其作为参数传递给CNI,通过CNI配置Pod IP
Galaxy的浮动IP能力由Galaxy-ipam提供,后者是一个Kubernetes Scheudler Extender,K8S的调度器会调用Galaxy-ipam,影响其filtering/binding的过程。浮动IP必须配合Underlay网络使用。
整体工作流图如下:
清单文件可以到GitHub下载,默认内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
apiVersion: v1 kind: Service metadata: name: galaxy-ipam namespace: kube-system labels: app: galaxy-ipam spec: type: NodePort ports: - name: scheduler-port port: 9040 targetPort: 9040 nodePort: 32760 protocol: TCP - name: api-port port: 9041 targetPort: 9041 nodePort: 32761 protocol: TCP selector: app: galaxy-ipam --- apiVersion: rbac.authorization.k8s.io/v1 # kubernetes versions before 1.8.0 should use rbac.authorization.k8s.io/v1beta1 kind: ClusterRole metadata: name: galaxy-ipam rules: - apiGroups: [""] resources: - pods - namespaces - nodes - pods/binding verbs: ["list", "watch", "get", "patch", "create"] - apiGroups: ["apps", "extensions"] resources: - statefulsets - deployments verbs: ["list", "watch"] - apiGroups: [""] resources: - configmaps - endpoints - events verbs: ["get", "list", "watch", "update", "create", "patch"] - apiGroups: ["galaxy.k8s.io"] resources: - pools - floatingips verbs: ["get", "list", "watch", "update", "create", "patch", "delete"] - apiGroups: ["apiextensions.k8s.io"] resources: - customresourcedefinitions verbs: - "*" - apiGroups: ["apps.tkestack.io"] resources: - tapps verbs: ["list", "watch"] --- apiVersion: v1 kind: ServiceAccount metadata: name: galaxy-ipam namespace: kube-system --- apiVersion: rbac.authorization.k8s.io/v1 # kubernetes versions before 1.8.0 should use rbac.authorization.k8s.io/v1beta1 kind: ClusterRoleBinding metadata: name: galaxy-ipam roleRef: apiGroup: rbac.authorization.k8s.io kind: ClusterRole name: galaxy-ipam subjects: - kind: ServiceAccount name: galaxy-ipam namespace: kube-system --- apiVersion: apps/v1 kind: Deployment metadata: labels: app: galaxy-ipam name: galaxy-ipam namespace: kube-system spec: replicas: 1 selector: matchLabels: app: galaxy-ipam template: metadata: labels: app: galaxy-ipam spec: priorityClassName: system-cluster-critical affinity: podAntiAffinity: requiredDuringSchedulingIgnoredDuringExecution: - labelSelector: matchExpressions: - key: app operator: In values: - galaxy-ipam topologyKey: "kubernetes.io/hostname" serviceAccountName: galaxy-ipam hostNetwork: true dnsPolicy: ClusterFirstWithHostNet containers: - image: tkestack/galaxy-ipam:v1.0.7 args: - --logtostderr=true - --profiling - --v=3 - --config=/etc/galaxy/galaxy-ipam.json # 这个服务暴露一个端口,供Scheduler调用 - --port=9040 - --api-port=9041 - --leader-elect command: - /usr/bin/galaxy-ipam ports: - containerPort: 9040 - containerPort: 9041 imagePullPolicy: Always name: galaxy-ipam resources: requests: cpu: 100m memory: 200Mi volumeMounts: - name: kube-config mountPath: /etc/kubernetes/ - name: galaxy-ipam-log mountPath: /data/galaxy-ipam/logs - name: galaxy-ipam-etc mountPath: /etc/galaxy - name: tz-config mountPath: /etc/localtime terminationGracePeriodSeconds: 30 tolerations: - effect: NoSchedule key: node-role.kubernetes.io/master operator: Exists volumes: - name: kube-config hostPath: path: /etc/kubernetes/ - name: galaxy-ipam-log emptyDir: {} - configMap: defaultMode: 420 name: galaxy-ipam-etc name: galaxy-ipam-etc - name: tz-config hostPath: path: /etc/localtime --- apiVersion: v1 kind: ConfigMap metadata: name: galaxy-ipam-etc namespace: kube-system data: # 配置文件 galaxy-ipam.json: | { "schedule_plugin": { # 如果不是使用腾讯云的弹性网卡(ENI),去掉这一行 "cloudProviderGrpcAddr": "127.0.0.2:80" } } |
你需要为K8S调度器配置调度策略,此策略现在可以放在ConfigMap中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
cat <<EOF | kubectl create -f - apiVersion: v1 kind: ConfigMap metadata: name: scheduler-policy namespace: kube-system data: policy.cfg: | { "kind": "Policy", "apiVersion": "v1", "extenders": [ { // 通过HTTP调用扩展 "urlPrefix": "http://127.0.0.1:9040/v1", "httpTimeout": 10000000000, "filterVerb": "filter", "BindVerb": "bind", "weight": 1, "enableHttps": false, "managedResources": [ { "name": "tke.cloud.tencent.com/eni-ip", "ignoredByScheduler": false } ] } ] } EOF |
然后为调度器添加命令行标记: --policy-configmap=scheduler-policy即可。
运行在裸金属环境下时,可以使用如下配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
kind: ConfigMap apiVersion: v1 metadata: name: floatingip-config namespace: kube-system data: floatingips: | [ { # 节点的CIDR,这个网络范围内的节点可以运行具有浮动IP的Pod "nodeSubnets": ["10.0.0.0/16"], # 可以分配给Pod的IP地址范围 "ips": ["10.0.70.2~10.0.70.241"], # Pod IP子网信息 "subnet":"10.0.70.0/24", # Pod IP网关信息 "gateway":"10.0.70.1", # Pod IP所属VLAN,如果Pod IP和Node IP不在同一VLAN,需要设置该字段,同时 # 确保节点所连接的交换机时一个trunk port "vlan": 1024 } ] |
一个nodeSubnet可以对应多个Pod Subnet,例如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
// 如果Pod运行在10.49.28.0/26,那么它可以具有10.0.80.2~10.0.80.4或者10.0.81.2~10.0.81.4的IP地址 // 如果Pod运行在10.49.29.0/24,则只能具有10.0.80.0/24的IP地址 [{ "nodeSubnets": ["10.49.28.0/26", "10.49.29.0/24"], "ips": ["10.0.80.2~10.0.80.4"], "subnet": "10.0.80.0/24", "gateway": "10.0.80.1" }, { "nodeSubnets": ["10.49.28.0/26"], "ips": ["10.0.81.2~10.0.81.4"], "subnet": "10.0.81.0/24", "gateway": "10.0.81.1", "vlan": 3 }] |
只要可分配的IP地址范围不重叠,多个nodeSubnet可以共享同一个Pod Subnet:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
[{ "routableSubnet": "10.180.1.2/32", "ips": ["10.180.154.2~10.180.154.3"], "subnet": "10.180.154.0/24", "gateway": "10.180.154.1", "vlan": 3 }, { "routableSubnet": "10.180.1.3/32", "ips": ["10.180.154.7~10.180.154.8"], "subnet": "10.180.154.0/24", "gateway": "10.180.154.1", "vlan": 3 }] |
要保留一个IP不被分配,可以使用下面的FloatingIP资源:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
apiVersion: galaxy.k8s.io/v1alpha1 kind: FloatingIP metadata: # 名字是需要保留的IP name: 10.0.0.1 labels: # 从1.0.8下面这个标签不再需要 ipType: internalIP # 这个标签必须保留 reserved: this-is-not-for-pods spec: key: pool__reserved-for-node_ policy: 2 |
目前浮动IP仅仅支持Deployment、StatefulSet产生的工作负载。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
apiVersion: apps/v1 kind: Deployment ... spec: ... template: metadata: annotations: # 如果默认网络不是galaxy-k8s-vlan则必须添加下面的注解 k8s.v1.cni.cncf.io/networks: galaxy-k8s-vlan # IP释放策略: # 为空/不指定:Pod一旦停止,就释放IP # immutable:仅仅在删除、缩容Deployment / StatefulSet的情况下才释放IP # never:即使Deployment / StatefulSet被删除,也不会释放IP。后续的同名Deployment # /StatefulSet会重用已分配的IP k8s.v1.cni.galaxy.io/release-policy: immutable creationTimestamp: null labels: k8s-app: nnn qcloud-app: nnn spec: containers: - image: nginx imagePullPolicy: Always name: nnn resources: limits: cpu: 500m memory: 1Gi # 扩展的资源限制,在某个容器中配置即可 tke.cloud.tencent.com/eni-ip: "1" requests: cpu: 250m memory: 256Mi tke.cloud.tencent.com/eni-ip: "1" |
生成的Pod中,通过注解 k8s.v1.cni.galaxy.io/args指明了它的IP地址等信息。CNI插件也是基于这些信息为容器网络命名空间设置IP地址、路由的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
apiVersion: v1 kind: Pod metadata: name: nnn-7df5984746-58hjm annotations: k8s.v1.cni.cncf.io/networks: galaxy-k8s-vlan k8s.v1.cni.galaxy.io/args: '{"common":{"ipinfos":[{"ip":"192.168.64.202/24", "vlan":0,"gateway":"192.168.64.1","routable_subnet":"172.21.64.0/20"}]}}' k8s.v1.cni.galaxy.io/release-policy: immutable ... spec: ... status: ... hostIP: 172.21.64.15 phase: Running podIP: 192.168.64.202 podIPs: - ip: 192.168.64.202 |
为Deployment设置注解 tke.cloud.tencent.com/eni-ip-pool,可以让多个Deployment共享一个IP池。
使用IP池的情况下,默认的IP释放策略为never,除非通过注解 k8s.v1.cni.galaxy.io/release-policy指定其它策略。
默认情况下,IP池的大小随着Deployment/StatefulSet的副本数量的增加而增大,你可以设置固定尺寸的IP池:
1 2 3 4 5 |
apiVersion: galaxy.k8s.io/v1alpha1 kind: Pool metadata: name: example-pool size: 4 |
通过HTTP接口创建IP池的时候,可以设置preAllocateIP=true来为池预先分配IP,通过kubectl创建CR时无法实现预分配。
该CR保存了浮动IP,及其绑定的工作负载信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
apiVersion: galaxy.k8s.io/v1alpha1 kind: FloatingIP metadata: creationTimestamp: "2020-03-04T08:28:15Z" generation: 1 labels: ipType: internalIP # IP地址 name: 192.168.64.202 resourceVersion: "2744910" selfLink: /apis/galaxy.k8s.io/v1alpha1/floatingips/192.168.64.202 uid: b5d55f27-4548-44c7-b8ad-570814b55026 spec: attribute: '{"NodeName":"172.21.64.15"}' # 绑定的工作负载 key: dp_default_nnn_nnn-7df5984746-58hjm policy: 1 subnet: 172.21.64.0/20 updateTime: "2020-03-04T08:28:15Z" |
Galaxy-ipam提供了基于Swagger 2.0的API,为galaxy-ipam提供命令行选项 --swagger,则它能够在下面的URL展示此API:
http://${galaxy-ipam-ip}:9041/apidocs.json/v1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
// 查询分配给default命名空间中的StatefulSet sts的IP地址 // curl 'http://192.168.30.7:9041/v1/ip?appName=sts&appType=statefulsets&namespace=default' { "last": true, "totalElements": 2, "totalPages": 1, "first": true, "numberOfElements": 2, "size": 10, "number": 0, "content": [ { "ip": "10.0.0.112", "namespace": "default", "appName": "sts", "podName": "sts-0", "policy": 2, "appType": "statefulset", "updateTime": "2020-05-29T11:11:44.633383558Z", "status": "Deleted", "releasable": true }, { "ip": "10.0.0.174", "namespace": "default", "appName": "sts", "podName": "sts-1", "policy": 2, "appType": "statefulset", "updateTime": "2020-05-29T11:11:45.132450117Z", "status": "Deleted", "releasable": true } ] } |
1 2 3 4 5 6 7 8 9 10 11 |
// curl -X POST -H "Content-type: application/json" -d ' // { "ips": [ // { "ip":"10.0.0.112", "appName":"sts", "appType":"statefulset", "podName":"sts-0","namespace":"default"}, // {"ip":"10.0.0.174", "appName":"sts", "appType":"statefulset", "podName":"sts-1", "namespace":"default"} // ]} // ' // http://192.168.30.7:9041/v1/ip { "code": 200, "message": "" } |
Deployment的默认更新策略是StrategyType=RollingUpdate,25% max unavailable, 25% max surge。这意味着这在滚动更新过程中,可能存在超过副本数限制25%的Pod。这可能导致死锁状态:
- 假设Deployment副本数为3,那么有一个副本不可用,就超过25%的限制。Deployment的控制器只能先创建新Pod,然后等它就绪后再删除一个旧Pod
- 如果浮动IP的释放策略被设置为immutable/never,这就意味着新的Pod需要从一个旧Pod那复用IP,然而旧Pod还尚未删除
这会导致新Pod卡斯在调度阶段。
使用K8S的NodePort服务,可以将一组Pod通过宿主机端口暴露到集群外部。但是如果希望访问StatefulSet的每个特定Pod的端口,使用K8S NodePort服务无法实现。
使用K8S的HostPort,则会存在两个Pod调度到同一节点,进而产生端口冲突的问题:
1 2 3 4 5 6 |
spec: containers: - image: ... ports: - containerPort: 9040 hostPort: 9040 |
Galaxy提供了一个功能,可以进行随机的端口映射:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
apiVersion: apps/v1 kind: Deployment metadata: labels: app: hello name: hello spec: replicas: 1 selector: matchLabels: app: hello template: metadata: labels: app: hello annotations: # 为Pod设置此注解即可 tkestack.io/portmapping: "" spec: containers: - image: ... ports: - containerPort: 9040 |
这样Galaxy会利用iptables,随机的映射宿主机的端口到Pod的每个容器端口,并且将此随机端口回填到tkestack.io/portmapping注解中。
在TKEStack的缺省配置(基于Flannel的VXLAN模式的Overlay网络)下,每个节点只有一个CNI配置:
1 2 3 4 5 |
{ "type": "galaxy-sdn", "capabilities": {"portMappings": true}, "cniVersion": "0.2.0" } |
我们先看看这个CNI插件做的什么事情,又是如何和Flannel交互的。Galaxy所有自带的CNI插件都位于github.com/tkestack/galaxy/cni目录下,其中galaxy-sdn插件入口点如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
package main import ( "bytes" "encoding/json" "fmt" "io/ioutil" "net" "net/http" "os" "strings" "github.com/containernetworking/cni/pkg/skel" // 使用0.20版本的CNI t020 "github.com/containernetworking/cni/pkg/types/020" "github.com/containernetworking/cni/pkg/version" galaxyapi "tkestack.io/galaxy/pkg/api/galaxy" "tkestack.io/galaxy/pkg/api/galaxy/private" ) type cniPlugin struct { socketPath string } func NewCNIPlugin(socketPath string) *cniPlugin { return &cniPlugin{socketPath: socketPath} } // Create and fill a CNIRequest with this plugin's environment and stdin which // contain the CNI variables and configuration func newCNIRequest(args *skel.CmdArgs) *galaxyapi.CNIRequest { envMap := make(map[string]string) for _, item := range os.Environ() { idx := strings.Index(item, "=") if idx > 0 { envMap[strings.TrimSpace(item[:idx])] = item[idx+1:] } } return &galaxyapi.CNIRequest{ Env: envMap, Config: args.StdinData, } } // 调用Galaxy守护进程 func (p *cniPlugin) doCNI(url string, req *galaxyapi.CNIRequest) ([]byte, error) { data, err := json.Marshal(req) if err != nil { return nil, fmt.Errorf("failed to marshal CNI request %v: %v", req, err) } // 通过UDS连接 client := &http.Client{ Transport: &http.Transport{ Dial: func(proto, addr string) (net.Conn, error) { return net.Dial("unix", p.socketPath) }, }, } resp, err := client.Post(url, "application/json", bytes.NewReader(data)) if err != nil { return nil, fmt.Errorf("failed to send CNI request: %v", err) } defer resp.Body.Close() // nolint: errcheck body, err := ioutil.ReadAll(resp.Body) if err != nil { return nil, fmt.Errorf("failed to read CNI result: %v", err) } if resp.StatusCode != 200 { return nil, fmt.Errorf("galaxy returns: %s", string(body)) } return body, nil } // 就是将CNI请求转发给Galaxy守护进程处理 func (p *cniPlugin) CmdAdd(args *skel.CmdArgs) (*t020.Result, error) { // 由于通过UDS通信,URL的hostname部分随便写了一个dummy,就是简单的调用/cni这个Endpoint body, err := p.doCNI("http://dummy/cni", newCNIRequest(args)) if err != nil { return nil, err } result := &t020.Result{} if err := json.Unmarshal(body, result); err != nil { return nil, fmt.Errorf("failed to unmarshal response '%s': %v", string(body), err) } return result, nil } // Send the ADD command environment and config to the CNI server, printing // the IPAM result to stdout when called as a CNI plugin func (p *cniPlugin) skelCmdAdd(args *skel.CmdArgs) error { result, err := p.CmdAdd(args) if err != nil { return err } return result.Print() } // Send the DEL command environment and config to the CNI server func (p *cniPlugin) CmdDel(args *skel.CmdArgs) error { _, err := p.doCNI("http://dummy/cni", newCNIRequest(args)) return err } // 使用skel框架开发 func main() { // 传入UDS套接字的路径 p := NewCNIPlugin(private.GalaxySocketPath) skel.PluginMain(p.skelCmdAdd, p.CmdDel, version.Legacy) } |
可以看到galaxy-sdn就是个空壳子,所有工作都是委托给Galaxy守护进程处理的。
整体流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
func init() { // this ensures that main runs only on main thread (thread group leader). // since namespace ops (unshare, setns) are done for a single thread, we // must ensure that the goroutine does not jump from OS thread to thread runtime.LockOSThread() _, pANet, _ = net.ParseCIDR("10.0.0.0/8") _, pBNet, _ = net.ParseCIDR("172.16.0.0/12") _, pCNet, _ = net.ParseCIDR("192.168.0.0/16") } func main() { d = &vlan.VlanDriver{} skel.PluginMain(cmdAdd, cmdDel, version.Legacy) } func cmdAdd(args *skel.CmdArgs) error { // 加载CNI配置文件 conf, err := d.LoadConf(args.StdinData) if err != nil { return err } // 首先尝试从args获得IPInfo,如果失败则调用第三方IPAM插件分配IP vlanIds, results, err := ipam.Allocate(conf.IPAM.Type, args) if err != nil { return err } // 判断是否创建默认网桥 if d.DisableDefaultBridge == nil { defaultTrue := true d.DisableDefaultBridge = &defaultTrue for i := range vlanIds { if vlanIds[i] == 0 { *d.DisableDefaultBridge = false } } } // 初始化VLAN驱动,主要是在必要的情况下对宿主机上的网桥进行配置,或者进行针对MACVlan/IPVlan的配置 if err := d.Init(); err != nil { return fmt.Errorf("failed to setup bridge %v", err) } result020s, err := resultConvert(results) if err != nil { return err } // 配置宿主机网络 if err := setupNetwork(result020s, vlanIds, args); err != nil { return err } result020s[0].DNS = conf.DNS return result020s[0].Print() } func cmdDel(args *skel.CmdArgs) error { // 删除所有VETH对 if err := utils.DeleteAllVeth(args.Netns); err != nil { return err } conf, err := d.LoadConf(args.StdinData) if err != nil { return err } // 是否IP地址 return ipam.Release(conf.IPAM.Type, args) } |
VlanDriver是此CNI的核心。
1 2 3 4 5 6 7 8 9 |
type VlanDriver struct { //FIXME add a file lock cause we are running multiple processes? *NetConf // 不管是物理接口,还是VLAN子接口,都位于宿主机命名空间 // 所有VLAN接口的物理接口(父接口)的设备索引,例如eth1 vlanParentIndex int // 如果启用VLAN,则是当前所属VLAN的子接口的索引。否则是物理接口的索引 DeviceIndex int } |
ADD/DEL命令都需要调用下面的方法加载CNI配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
const ( VlanPrefix = "vlan" BridgePrefix = "docker" DefaultBridge = "docker" // IPVLAN有两种模式L2/L3 DefaultIPVlanMode = "l3" ) func (d *VlanDriver) LoadConf(bytes []byte) (*NetConf, error) { conf := &NetConf{} if err := json.Unmarshal(bytes, conf); err != nil { return nil, fmt.Errorf("failed to load netconf: %v", err) } if conf.DefaultBridgeName == "" { conf.DefaultBridgeName = DefaultBridge } if conf.BridgeNamePrefix == "" { conf.BridgeNamePrefix = BridgePrefix } if conf.VlanNamePrefix == "" { conf.VlanNamePrefix = VlanPrefix } if conf.IpVlanMode == "" { conf.IpVlanMode = DefaultIPVlanMode } d.NetConf = conf return conf, nil } |
CNI配置定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
type NetConf struct { // CNI标准网络配置 types.NetConf // 持有IDC IP地址的设备名,例如eth1或eth1.12(VLAN子接口) Device string `json:"device"` // 上述Device为空的时候,候选设备列表。取第一个存在的设备 Devices []string `json:"devices"` // 交换机实现方式 macvlan, bridge(默认), pure(避免创建不必要的网桥) Switch string `json:"switch"` // IPVLAN模式,可选l2, l3(默认), l3s IpVlanMode string `json:"ipvlan_mode"` // 不去创建默认网桥 DisableDefaultBridge *bool `json:"disable_default_bridge"` // 默认网桥的名字 DefaultBridgeName string `json:"default_bridge_name"` // 网桥名字前缀 BridgeNamePrefix string `json:"bridge_name_prefix"` // VLAN接口名字前缀 VlanNamePrefix string `json:"vlan_name_prefix"` // 是否启用免费ARP GratuitousArpRequest bool `json:"gratuitous_arp_request"` // 设置MTU MTU int `json:"mtu"` } |
ADD命令,加载完CNI配置后,首先是调用IPAM进行IP地址分配:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
func Allocate(ipamType string, args *skel.CmdArgs) ([]uint16, []types.Result, error) { var ( vlanId uint16 err error ) // 解析key1=val1;key2=val2格式的CNI参数。这些参数来自Pod的k8s.v1.cni.galaxy.io/args注解, // 如果使用浮动IP,则Galaxy IPAM会设置该注解,将IP地址填写在ipinfos字段 kvMap, err := cniutil.ParseCNIArgs(args.Args) if err != nil { return nil, nil, err } var results []types.Result var vlanIDs []uint16 // 读取ipinfos字段 if ipInfoStr := kvMap[constant.IPInfosKey]; ipInfoStr != "" { // 解析IP信息 var ipInfos []constant.IPInfo if err := json.Unmarshal([]byte(ipInfoStr), &ipInfos); err != nil { return nil, nil, fmt.Errorf("failed to unmarshal ipInfo from args %q: %v", args.Args, err) } if len(ipInfos) == 0 { return nil, nil, fmt.Errorf("empty ipInfos") } for j := range ipInfos { results = append(results, cniutil.IPInfoToResult(&ipInfos[j])) vlanIDs = append(vlanIDs, ipInfos[j].Vlan) } // 直接“分配”Pod注解里写的IP地址 return vlanIDs, results, nil } // 如果Pod注解里面没有IP地址信息,则需要调用谋者IPAM插件,因此断言ipamType不为空: if ipamType == "" { return nil, nil, fmt.Errorf("neither ipInfo from cni args nor ipam type from netconf") } // 调用IPAM插件 generalResult, err := ipam.ExecAdd(ipamType, args.StdinData) if err != nil { return nil, nil, err } result, err := t020.GetResult(generalResult) if err != nil { return nil, nil, err } if result.IP4 == nil { return nil, nil, fmt.Errorf("IPAM plugin returned missing IPv4 config") } return append(vlanIDs, vlanId), append(results, generalResult), err } |
如果使用Galaxy IPAM提供的浮动IP功能,则上述代码会自动获取已经分配的IP信息并返回。
分配/获取IP地址之后,就执行VlanDriver的初始化:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
func (d *VlanDriver) Init() error { var ( device netlink.Link err error ) // 这个驱动需要指定宿主机上的父接口 if d.Device != "" { device, err = netlink.LinkByName(d.Device) } else { if len(d.Devices) == 0 { return fmt.Errorf("a device is needed to use vlan plugin") } for _, devName := range d.Devices { device, err = netlink.LinkByName(devName) if err == nil { break } } } if err != nil { return fmt.Errorf("Error getting device %s: %v", d.Device, err) } // 默认MTU从父接口上查找 if d.MTU == 0 { d.MTU = device.Attrs().MTU } d.DeviceIndex = device.Attrs().Index d.vlanParentIndex = device.Attrs().Index // 如果指定的设备是VLAN子接口,则使用其父接口的索引 if device.Type() == "vlan" { //A vlan device d.vlanParentIndex = device.Attrs().ParentIndex //glog.Infof("root device %s is a vlan device, parent index %d", d.Device, d.vlanParentIndex) } if d.IPVlanMode() { switch d.GetIPVlanMode() { case netlink.IPVLAN_MODE_L3S, netlink.IPVLAN_MODE_L3: // 允许绑定到非本地地址,什么作用 return utils.EnableNonlocalBind() default: return nil } } else if d.MacVlanMode() { return nil } else if d.PureMode() { if err := d.initPureModeArgs(); err != nil { return err } return utils.EnableNonlocalBind() } if d.DisableDefaultBridge != nil && *d.DisableDefaultBridge { return nil } // 需要创建默认网桥 // 获得父接口的IP地址 v4Addr, err := netlink.AddrList(device, netlink.FAMILY_V4) if err != nil { return fmt.Errorf("Errror getting ipv4 address %v", err) } filteredAddr := network.FilterLoopbackAddr(v4Addr) if len(filteredAddr) == 0 { // 父接口没有IP地址,那么地址应当转给网桥了。检查确保网桥设备存在 bri, err := netlink.LinkByName(d.DefaultBridgeName) if err != nil { return fmt.Errorf("Error getting bri device %s: %v", d.DefaultBridgeName, err) } if bri.Attrs().Index != device.Attrs().MasterIndex { return fmt.Errorf("No available address found on device %s", d.Device) } } else { // 否则,意味着网桥应该还不存在,初始化之 if err := d.initVlanBridgeDevice(device, filteredAddr); err != nil { return err } } return nil } // 不使用MacVlan/IPVlan,则需要依赖网桥 func (d *VlanDriver) initVlanBridgeDevice(device netlink.Link, filteredAddr []netlink.Addr) error { // 创建网桥 bri, err := getOrCreateBridge(d.DefaultBridgeName, device.Attrs().HardwareAddr) if err != nil { return err } // 启动网桥 if err := netlink.LinkSetUp(bri); err != nil { return fmt.Errorf("failed to set up bridge device %s: %v", d.DefaultBridgeName, err) } // 获取父接口路由列表 rs, err := netlink.RouteList(device, nl.FAMILY_V4) if err != nil { return fmt.Errorf("failed to list route of device %s", device.Attrs().Name) } defer func() { if err != nil { // 如果出现错误,则路由需要加回去 for i := range rs { _ = netlink.RouteAdd(&rs[i]) } } }() // 将父接口的IP地址、路由,转移到网桥上 err = d.moveAddrAndRoute(device, bri, filteredAddr, rs) if err != nil { return err } return nil } |
可以看到,如果使用MacVlan或者IPVlan模式,则不会去管理宿主机上的网桥(这也是MacVlan/IPVlan的价值之一,不需要网桥)。否则,会创建网桥并转移走父接口的IP地址、路由。
最后,下面的函数负责配置好网络:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
func setupNetwork(result020s []*t020.Result, vlanIds []uint16, args *skel.CmdArgs) error { if d.MacVlanMode() { // 处理MACVlan模式 if err := setupMacvlan(result020s[0], vlanIds[0], args); err != nil { return err } } else if d.IPVlanMode() { // 处理IPVlan模式 if err := setupIPVlan(result020s[0], vlanIds[0], args); err != nil { return err } } else { // 处理网桥模式 ifName := args.IfName if err := setupVlanDevice(result020s, vlanIds, args); err != nil { return err } args.IfName = ifName } // 发送一个免费ARP,让交换机知道浮动IP漂移到当前节点上了 if d.PureMode() { _ = utils.SendGratuitousARP(d.Device, result020s[0].IP4.IP.IP.String(), "", d.GratuitousArpRequest) } return nil } |
如果使用MACVlan,则配置过程如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
func setupMacvlan(result *t020.Result, vlanId uint16, args *skel.CmdArgs) error { if err := d.MaybeCreateVlanDevice(vlanId); err != nil { return err } // 通过MACVlan连接宿主机和容器 if err := utils.MacVlanConnectsHostWithContainer(result, args, d.DeviceIndex, d.MTU); err != nil { return err } // 在命名空间内进行宣告 _ = utils.SendGratuitousARP(args.IfName, result.IP4.IP.IP.String(), args.Netns, d.GratuitousArpRequest) return nil } func MacVlanConnectsHostWithContainer(result *t020.Result, args *skel.CmdArgs, parent int, mtu int) error { var err error // 创建MACVlan设备 macVlan := &netlink.Macvlan{ Mode: netlink.MACVLAN_MODE_BRIDGE, LinkAttrs: netlink.LinkAttrs{ Name: HostMacVlanName(args.ContainerID), MTU: mtu, ParentIndex: parent, // 父设备可能是物理接口或者VLAN子接口 }} // 添加接口 if err := netlink.LinkAdd(macVlan); err != nil { return err } // 出错时必须接口被删除 defer func() { if err != nil { netlink.LinkDel(macVlan) } }() // 配置沙盒,细节参考下文 if err = configSboxDevice(result, args, macVlan); err != nil { return err } return nil } |
如果使用IPVlan,则配置过程如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
func setupIPVlan(result *t020.Result, vlanId uint16, args *skel.CmdArgs) error { // 如有必要,创建VLAN设备 if err := d.MaybeCreateVlanDevice(vlanId); err != nil { return err } // 连接宿主机和容器网络 IPVLAN的父设备可能是 // VLAN子接口(位于宿主机命名空间) if err := utils.IPVlanConnectsHostWithContainer(result, args, d.DeviceIndex, d.GetIPVlanMode(), d.MTU); err != nil { return err } // l3模式下,所有连接在一起的容器之间的相互通信,都要通过宿主机上的IPVlan父接口进行代理 // 在我们的环境中(部署在OpenStack的虚拟机上),出现Pod无法ping二层网关的情况,需要 // arping -c 2 -U -I eth0 <pod-ip> // 才可以解决,等价于下面的代码 // Gratuitous ARP不能保证ARP缓存永久有效,底层交换机(OpenStack虚拟的)需要进行适当的配置, // 将Pod IP和VM的MAC地址关联 if d.IpVlanMode == "l3" || d.IpVlanMode == "l3s" { _ = utils.SendGratuitousARP(d.Device, result.IP4.IP.IP.String(), "", d.GratuitousArpRequest) return nil } // 在网络命名空间中进行宣告 _ = utils.SendGratuitousARP(args.IfName, result.IP4.IP.IP.String(), args.Netns, d.GratuitousArpRequest) return nil } // 创建VLAN设备的细节 func (d *VlanDriver) MaybeCreateVlanDevice(vlanId uint16) error { // 如果不启用VLAN支持,则不做任何事情 if vlanId == 0 { return nil } _, err := d.getOrCreateVlanDevice(vlanId) return err } func (d *VlanDriver) getOrCreateVlanDevice(vlanId uint16) (netlink.Link, error) { // 根据父接口和VLAN ID查找已存在的VLAN设备 link, err := d.getVlanIfExist(vlanId) if err != nil || link != nil { if link != nil { // 设置VLAN设备索引 d.DeviceIndex = link.Attrs().Index } return link, err } vlanIfName := fmt.Sprintf("%s%d", d.VlanNamePrefix, vlanId) // 获取VLAN设备,如果获取不到则通过后面的回调函数创建 vlan, err := getOrCreateDevice(vlanIfName, func(name string) error { // 创建设备 名字 vlan100 父接口索引 vlanIf := &netlink.Vlan{LinkAttrs: netlink.LinkAttrs{Name: vlanIfName, ParentIndex: d.vlanParentIndex}, // VLAN号 VlanId: (int)(vlanId)} // 添加设备 if err := netlink.LinkAdd(vlanIf); err != nil { return fmt.Errorf("Failed to add vlan device %s: %v", vlanIfName, err) } return nil }) if err != nil { return nil, err } // 设置为UP状态 if err := netlink.LinkSetUp(vlan); err != nil { return nil, fmt.Errorf("Failed to set up vlan device %s: %v", vlanIfName, err) } // 更新VLAN子接口索引号 d.DeviceIndex = vlan.Attrs().Index return vlan, nil } func (d *VlanDriver) getVlanIfExist(vlanId uint16) (netlink.Link, error) { links, err := netlink.LinkList() if err != nil { return nil, err } for _, link := range links { // 遍历网络接口列表,查找VLAN类型的、VLAN ID匹配的、父接口匹配的 // 隐含意味着对于每个VLAN,只需要一个子接口,所有连接到此VLAN的容器使用它 if link.Type() == "vlan" { if vlan, ok := link.(*netlink.Vlan); !ok { return nil, fmt.Errorf("vlan device type case error: %T", link) } else { if vlan.VlanId == int(vlanId) && vlan.ParentIndex == d.vlanParentIndex { return link, nil } } } } return nil, nil } // 通过IPVLAN两宿主机和容器连接起来的细节 // 配置网络沙盒 func configSboxDevice(result *t020.Result, args *skel.CmdArgs, sbox netlink.Link) error { // 配置MAC地址之前,应该将接口DOWN掉 if err := netlink.LinkSetDown(sbox); err != nil { return fmt.Errorf("could not set link down for container interface %q: %v", sbox.Attrs().Name, err) } if sbox.Type() != "ipvlan" { // IPVlan不需要设置MAC地址,因为必须和父接口一致 if err := netlink.LinkSetHardwareAddr(sbox, GenerateMACFromIP(result.IP4.IP.IP)); err != nil { return fmt.Errorf("could not set mac address for container interface %q: %v", sbox.Attrs().Name, err) } } // 获得容器网络命名空间 netns, err := ns.GetNS(args.Netns) if err != nil { return fmt.Errorf("failed to open netns %q: %v", args.Netns, err) } // 总是要关闭命名空间 defer netns.Close() // nolint: errcheck // 移动到网络命名空间中 if err = netlink.LinkSetNsFd(sbox, int(netns.Fd())); err != nil { return fmt.Errorf("failed to move sbox device %q to netns: %v", sbox.Attrs().Name, err) } // 在网络命名空间中配置 return netns.Do(func(_ ns.NetNS) error { // 改名,IPVLAN接口直接变为容器的eth0,就像VETH对的一端那样 if err := netlink.LinkSetName(sbox, args.IfName); err != nil { return fmt.Errorf("failed to rename sbox device %q to %q: %v", sbox.Attrs().Name, args.IfName, err) } // 如果容器、宿主机上有多个网络设备,则需要禁用rp_filter(反向路径过滤) // 因为从宿主机来的ARP请求(广播报文),可能使用不确定的源IP,如果启用反向路径过滤,封包可能被丢弃 if err := DisableRpFilter(args.IfName); err != nil { return fmt.Errorf("failed disable rp_filter to dev %s: %v", args.IfName, err) } // 禁用所有接口的rp_filter if err := DisableRpFilter("all"); err != nil { return fmt.Errorf("failed disable rp_filter to all: %v", err) } // 配置容器网络接口:将IP地址、路由添加到容器网络接口 return cniutil.ConfigureIface(args.IfName, result) }) } func ConfigureIface(ifName string, res *t020.Result) error { // 网络接口应该已经存在 link, err := netlink.LinkByName(ifName) if err != nil { return fmt.Errorf("failed to lookup %q: %v", ifName, err) } // 设置为UP状态 if err := netlink.LinkSetUp(link); err != nil { return fmt.Errorf("failed to set %q UP: %v", ifName, err) } // TODO(eyakubovich): IPv6 addr := &netlink.Addr{IPNet: &res.IP4.IP, Label: ""} // 添加IP地址 if err = netlink.AddrAdd(link, addr); err != nil { return fmt.Errorf("failed to add IP addr to %q: %v", ifName, err) } // 遍历并应用IPV4路由 for _, r := range res.IP4.Routes { gw := r.GW if gw == nil { gw = res.IP4.Gateway } if err = ip.AddRoute(&r.Dst, gw, link); err != nil { // we skip over duplicate routes as we assume the first one wins if !os.IsExist(err) { return fmt.Errorf("failed to add route '%v via %v dev %v': %v", r.Dst, gw, ifName, err) } } } return nil } |
DEL命令比较简单:
- 删除容器网络命名空间中所有VETH对
- 释放IP,就是调用IPAM插件,如果没有插件则什么都不做。Galaxy IPAM不会被调用,它总是自己负责IP地址的回收
该插件用于解决使用IPVlan L2模式下Pod访问不了宿主机IP、Service IP的问题。使用该插件,需要配置Galaxy:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
{ "NetworkConf":[ {"name":"tke-route-eni","type":"tke-route-eni","eni":"eth1","routeTable":1}, {"name":"galaxy-flannel","type":"galaxy-flannel", "delegate":{"type":"galaxy-veth"},"subnetFile":"/run/flannel/subnet.env"}, {"name":"galaxy-k8s-vlan","type":"galaxy-k8s-vlan", "device":"eth0", "switch":"ipvlan", "ipvlan_mode":"l2", "mtu": 1500}, {"name":"galaxy-k8s-sriov","type": "galaxy-k8s-sriov", "device": "eth0", "vf_num": 10}, // 新增 { "name":"galaxy-veth-host","type": "galaxy-veth-host", // 这里配置K8S的服务CIDR 指明宿主机网卡 "serviceCidr": "11.1.252.0/22", "hostInterface": "eth0", // 容器中此插件创建的网卡名称 是否进行SNAT "containerInterface": "veth0", "ipMasq": true } ], "DefaultNetworks": ["galaxy-flannel"] } |
Pod注解需要使用两个网络:
1 2 |
annotations: k8s.v1.cni.cncf.io/networks: "galaxy-k8s-vlan,galaxy-veth-host" |
代码解读:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 |
package main import ( "encoding/json" "fmt" "math" "math/rand" "net" "os" "runtime" "sort" "strconv" "time" "github.com/containernetworking/cni/pkg/skel" "github.com/containernetworking/cni/pkg/types" "github.com/containernetworking/cni/pkg/types/current" "github.com/containernetworking/cni/pkg/version" "github.com/containernetworking/plugins/pkg/ip" "github.com/containernetworking/plugins/pkg/ns" "github.com/containernetworking/plugins/pkg/utils" "github.com/containernetworking/plugins/pkg/utils/sysctl" "github.com/coreos/go-iptables/iptables" "github.com/j-keck/arping" "github.com/vishvananda/netlink" ) // constants for full jitter backoff in milliseconds, and for nodeport marks const ( maxSleep = 10000 // 10.00s baseSleep = 20 // 0.02 RPFilterTemplate = "net.ipv4.conf.%s.rp_filter" podRulePriority = 1024 nodePortRulePriority = 512 ) func init() { // this ensures that main runs only on main thread (thread group leader). // since namespace ops (unshare, setns) are done for a single thread, we // must ensure that the goroutine does not jump from OS thread to thread runtime.LockOSThread() } // PluginConf is whatever you expect your configuration json to be. This is whatever // is passed in on stdin. Your plugin may wish to expose its functionality via // runtime args, see CONVENTIONS.md in the CNI spec. type PluginConf struct { types.NetConf // This is the previous result, when called in the context of a chained // plugin. Because this plugin supports multiple versions, we'll have to // parse this in two passes. If your plugin is not chained, this can be // removed (though you may wish to error if a non-chainable plugin is // chained. // If you need to modify the result before returning it, you will need // to actually convert it to a concrete versioned struct. RawPrevResult *map[string]interface{} `json:"prevResult"` PrevResult *current.Result `json:"-"` IPMasq bool `json:"ipMasq"` HostInterface string `json:"hostInterface"` ServiceCidr string `json:"serviceCidr"` ContainerInterface string `json:"containerInterface"` MTU int `json:"mtu"` TableStart int `json:"routeTableStart"` NodePortMark int `json:"nodePortMark"` NodePorts string `json:"nodePorts"` } // parseConfig parses the supplied configuration (and prevResult) from stdin. func parseConfig(stdin []byte) (*PluginConf, error) { conf := PluginConf{} if err := json.Unmarshal(stdin, &conf); err != nil { return nil, fmt.Errorf("failed to parse network configuration: %v", err) } // Parse previous result. if conf.RawPrevResult != nil { resultBytes, err := json.Marshal(conf.RawPrevResult) if err != nil { return nil, fmt.Errorf("could not serialize prevResult: %v", err) } res, err := version.NewResult(conf.CNIVersion, resultBytes) if err != nil { return nil, fmt.Errorf("could not parse prevResult: %v", err) } conf.RawPrevResult = nil conf.PrevResult, err = current.NewResultFromResult(res) if err != nil { return nil, fmt.Errorf("could not convert result to current version: %v", err) } } // End previous result parsing if conf.HostInterface == "" { return nil, fmt.Errorf("hostInterface must be specified") } if conf.ContainerInterface == "" { return nil, fmt.Errorf("containerInterface must be specified") } if conf.NodePorts == "" { conf.NodePorts = "30000:32767" } if conf.NodePortMark == 0 { conf.NodePortMark = 0x2000 } // start using tables by default at 256 if conf.TableStart == 0 { conf.TableStart = 256 } return &conf, nil } func cmdAdd(args *skel.CmdArgs) error { conf, err := parseConfig(args.StdinData) if err != nil { return err } // 必须作为插件链调用,并且有前置插件 if conf.PrevResult == nil { return fmt.Errorf("must be called as chained plugin") } // 从前序插件的结果中得到容器IP列表 containerIPs := make([]net.IP, 0, len(conf.PrevResult.IPs)) if conf.CNIVersion != "0.3.0" && conf.CNIVersion != "0.3.1" { for _, ip := range conf.PrevResult.IPs { containerIPs = append(containerIPs, ip.Address.IP) } } else { for _, ip := range conf.PrevResult.IPs { if ip.Interface == nil { continue } intIdx := *ip.Interface if intIdx >= 0 && intIdx < len(conf.PrevResult.Interfaces) && conf.PrevResult.Interfaces[intIdx].Name != args.IfName { continue } containerIPs = append(containerIPs, ip.Address.IP) } } if len(containerIPs) == 0 { return fmt.Errorf("got no container IPs") } // 得到宿主机网络接口 iface, err := netlink.LinkByName(conf.HostInterface) if err != nil { return fmt.Errorf("failed to lookup %q: %v", conf.HostInterface, err) } // 得到宿主机网络接口的地址列表 hostAddrs, err := netlink.AddrList(iface, netlink.FAMILY_ALL) if err != nil || len(hostAddrs) == 0 { return fmt.Errorf("failed to get host IP addresses for %q: %v", iface, err) } netns, err := ns.GetNS(args.Netns) if err != nil { return fmt.Errorf("failed to open netns %q: %v", args.Netns, err) } defer netns.Close() containerIPV4 := false containerIPV6 := false for _, ipc := range containerIPs { if ipc.To4() != nil { containerIPV4 = true } else { containerIPV6 = true } } // 创建VETH对,并且在容器端进行配置: // 1. 如果启用了MASQ,则对出口是args.IfName的流量进行SNAT // 2. 为宿主机所有IP添加路由,走veth // 3. 为K8S服务IP添加旅游,则veth,网关设置为第一个宿主机IP hostInterface, _, err := setupContainerVeth(netns, conf.ServiceCidr, conf.ContainerInterface, conf.MTU, hostAddrs, conf.IPMasq, containerIPV4, containerIPV6, args.IfName, conf.PrevResult) if err != nil { return err } // 配置容器的宿主端: if err = setupHostVeth(hostInterface.Name, hostAddrs, conf.IPMasq, conf.TableStart, conf.PrevResult); err != nil { return err } if conf.IPMasq { // 在宿主端启用IP转发 err := enableForwarding(containerIPV4, containerIPV6) if err != nil { return err } chain := utils.FormatChainName(conf.Name, args.ContainerID) comment := utils.FormatComment(conf.Name, args.ContainerID) for _, ipc := range containerIPs { addrBits := 128 if ipc.To4() != nil { addrBits = 32 } // 对来自容器IP的流量进行SNAT if err = ip.SetupIPMasq(&net.IPNet{IP: ipc, Mask: net.CIDRMask(addrBits, addrBits)}, chain, comment); err != nil { return err } } } // 配置NodePort相关的iptables规则 if err = setupNodePortRule(conf.HostInterface, conf.NodePorts, conf.NodePortMark); err != nil { return err } // Pass through the result for the next plugin return types.PrintResult(conf.PrevResult, conf.CNIVersion) } func setupContainerVeth(netns ns.NetNS, serviceCidr string, ifName string, mtu int, hostAddrs []netlink.Addr, masq, containerIPV4, containerIPV6 bool, k8sIfName string, pr *current.Result) (*current.Interface, *current.Interface, error) { hostInterface := ¤t.Interface{} containerInterface := ¤t.Interface{} // 在容器网络命名空间(netns)中执行 err := netns.Do(func(hostNS ns.NetNS) error { // 创建VETH对,一端自动放入hostNS(这个变量netns.Do函数自动提供,为初始网络命名空间) hostVeth, contVeth0, err := ip.SetupVeth(ifName, mtu, hostNS) if err != nil { return err } hostInterface.Name = hostVeth.Name hostInterface.Mac = hostVeth.HardwareAddr.String() containerInterface.Name = contVeth0.Name // ip.SetupVeth函数不会获取VETH对的peer(第二个接口)的MAC地址,因此这里需要执行查询 containerNetlinkIface, _ := netlink.LinkByName(contVeth0.Name) containerInterface.Mac = containerNetlinkIface.Attrs().HardwareAddr.String() containerInterface.Sandbox = netns.Path() // 本次CNI调用产生的两个网络接口,纳入到Result pr.Interfaces = append(pr.Interfaces, hostInterface, containerInterface) // 后面只是用到index属性,用上面Link对象不行么? contVeth, err := net.InterfaceByName(ifName) if err != nil { return fmt.Errorf("failed to look up %q: %v", ifName, err) } if masq { // 支持IPv4/IPv6的IP转发 err := enableForwarding(containerIPV4, containerIPV6) if err != nil { return err } // 如果出口网卡是k8sIfName(容器主接口,通常是eth0,kubelet调用galaxy-sdn时提供) // ,则进行源地址转换 err = setupSNAT(k8sIfName, "kube-proxy SNAT") if err != nil { return fmt.Errorf("failed to enable SNAT on %q: %v", k8sIfName, err) } } // 为宿主机的每个网络接口的地址添加路由条目 for _, ipc := range hostAddrs { addrBits := 128 if ipc.IP.To4() != nil { addrBits = 32 } // 这些地址的出口网卡都设置为 err := netlink.RouteAdd(&netlink.Route{ LinkIndex: contVeth.Index, // 新添加的VETH Scope: netlink.SCOPE_LINK, Dst: &net.IPNet{ IP: ipc.IP, Mask: net.CIDRMask(addrBits, addrBits), }, }) if err != nil { return fmt.Errorf("failed to add host route dst %v: %v", ipc.IP, err) } } _, serviceNet, err := net.ParseCIDR(serviceCidr) if err != nil { return fmt.Errorf("failed to parse service cidr :%v", err) } // 将K8S服务网段的路由设置为:出口网卡新添加的VETH,网关为宿主机第一个IP err = netlink.RouteAdd(&netlink.Route{ LinkIndex: contVeth.Index, Scope: netlink.SCOPE_UNIVERSE, Dst: serviceNet, // 封包发给宿主机第一个IP/网络接口处理。VETH的宿主端没有连接到什么网络接口 // 这些网络接口可以作为下一跳,因为和VETH宿主端属于同一网络命名空间 Gw: hostAddrs[0].IP, }) if err != nil { return fmt.Errorf("failed to add service cidr route %v: %v", hostAddrs[0].IP, err) } // 为所有IPv4地址发送免费ARP for _, ipc := range pr.IPs { if ipc.Version == "4" { _ = arping.GratuitousArpOverIface(ipc.Address.IP, *contVeth) } } return nil }) if err != nil { return nil, nil, err } return hostInterface, containerInterface, nil } func enableForwarding(ipv4 bool, ipv6 bool) error { if ipv4 { err := ip.EnableIP4Forward() if err != nil { return fmt.Errorf("Could not enable IPv6 forwarding: %v", err) } } if ipv6 { err := ip.EnableIP6Forward() if err != nil { return fmt.Errorf("Could not enable IPv6 forwarding: %v", err) } } return nil } func setupSNAT(ifName string, comment string) error { ipt, err := iptables.NewWithProtocol(iptables.ProtocolIPv4) if err != nil { return fmt.Errorf("failed to locate iptables: %v", err) } rulespec := []string{"-o", ifName, "-j", "MASQUERADE"} //if ipt.HasRandomFully() { // rulespec = append(rulespec, "--random-fully") //} rulespec = append(rulespec, "-m", "comment", "--comment", comment) return ipt.AppendUnique("nat", "POSTROUTING", rulespec...) } func setupHostVeth(vethName string, hostAddrs []netlink.Addr, masq bool, tableStart int, result *current.Result) error { // no IPs to route if len(result.IPs) == 0 { return nil } // lookup by name as interface ids might have changed veth, err := net.InterfaceByName(vethName) if err != nil { return fmt.Errorf("failed to lookup %q: %v", vethName, err) } // 对于所有容器IP,出口设置为VETH for _, ipc := range result.IPs { addrBits := 128 if ipc.Address.IP.To4() != nil { addrBits = 32 } err := netlink.RouteAdd(&netlink.Route{ LinkIndex: veth.Index, Scope: netlink.SCOPE_LINK, Dst: &net.IPNet{ IP: ipc.Address.IP, Mask: net.CIDRMask(addrBits, addrBits), }, }) if err != nil { return fmt.Errorf("failed to add host route dst %v: %v", ipc.Address.IP, err) } } // 为来自Pod的,目的地址是VPC的配置策略路由 err = addPolicyRules(veth, result.IPs[0], result.Routes, tableStart) if err != nil { return fmt.Errorf("failed to add policy rules: %v", err) } // 为所有宿主机端的IP发送免费ARP for _, ipc := range hostAddrs { if ipc.IP.To4() != nil { _ = arping.GratuitousArpOverIface(ipc.IP, *veth) } } return nil } func addPolicyRules(veth *net.Interface, ipc *current.IPConfig, routes []*types.Route, tableStart int) error { table := -1 // 对路由,来自先前插件调用的结果,进行排序 sort.Slice(routes, func(i, j int) bool { return routes[i].Dst.String() < routes[j].Dst.String() }) // 尝试最多10次,向空表(table slot)写入路由 for i := 0; i < 10 && table == -1; i++ { var err error // 寻找空白路由表 table, err = findFreeTable(tableStart + rand.Intn(1000)) if err != nil { return err } // 将所有路由添加到路由表 for _, route := range routes { err := netlink.RouteAdd(&netlink.Route{ LinkIndex: veth.Index, // 出口设置为宿主机VETH Dst: &route.Dst, // 目标是先前CNI调用发现的路由,这些路由可能来自云提供商(VPC) Gw: ipc.Address.IP, // 将Result的第一个IP作为网关 Table: table, // 写在这个表中 }) if err != nil { table = -1 break } } if table == -1 { // failed to add routes so sleep and try again on a different table wait := time.Duration(rand.Intn(int(math.Min(maxSleep, baseSleep*math.Pow(2, float64(i)))))) * time.Millisecond fmt.Fprintf(os.Stderr, "route table collision, retrying in %v\n", wait) time.Sleep(wait) } } // ensure we have a route table selected if table == -1 { return fmt.Errorf("failed to add routes to a free table") } // 创建路由策略 rule := netlink.NewRule() // 如果流量来自VETH宿主端,也就是来自Pod rule.IifName = veth.Name rule.Table = table rule.Priority = podRulePriority err := netlink.RuleAdd(rule) if err != nil { return fmt.Errorf("failed to add policy rule %v: %v", rule, err) } return nil } func findFreeTable(start int) (int, error) { allocatedTableIDs := make(map[int]bool) // 遍历所有IPv4/IPv6的路由规则 for _, family := range []int{netlink.FAMILY_V4, netlink.FAMILY_V6} { rules, err := netlink.RuleList(family) if err != nil { return -1, err } for _, rule := range rules { // 收集所有已经占用的table slot allocatedTableIDs[rule.Table] = true } } // 寻找第一个空白的slot for i := start; i < math.MaxUint32; i++ { if !allocatedTableIDs[i] { return i, nil } } return -1, fmt.Errorf("failed to find free route table") } func setupNodePortRule(ifName string, nodePorts string, nodePortMark int) error { ipt, err := iptables.NewWithProtocol(iptables.ProtocolIPv4) if err != nil { return fmt.Errorf("failed to locate iptables: %v", err) } // 确保NodePort流量被正确的标记 if err := ipt.AppendUnique("mangle", "PREROUTING", "-i", ifName, "-p", "tcp", "--dport", nodePorts, "-j", "CONNMARK", "--set-mark", strconv.Itoa(nodePortMark), "-m", "comment", "--comment", "NodePort Mark"); err != nil { return err } if err := ipt.AppendUnique("mangle", "PREROUTING", "-i", ifName, "-p", "udp", "--dport", nodePorts, "-j", "CONNMARK", "--set-mark", strconv.Itoa(nodePortMark), "-m", "comment", "--comment", "NodePort Mark"); err != nil { return err } if err := ipt.AppendUnique("mangle", "PREROUTING", "-i", "veth+", "-j", "CONNMARK", "--restore-mark", "-m", "comment", "--comment", "NodePort Mark"); err != nil { return err } // 在宿主机网络接口上启用非严格的RP filter _, err = sysctl.Sysctl(fmt.Sprintf(RPFilterTemplate, ifName), "2") if err != nil { return fmt.Errorf("failed to set RP filter to loose for interface %q: %v", ifName, err) } // 对于标记为NodePort的流量,添加策略路由 rule := netlink.NewRule() rule.Mark = nodePortMark rule.Table = 254 // main table rule.Priority = nodePortRulePriority exists := false rules, err := netlink.RuleList(netlink.FAMILY_V4) if err != nil { return fmt.Errorf("Unable to retrive IP rules %v", err) } for _, r := range rules { if r.Table == rule.Table && r.Mark == rule.Mark && r.Priority == rule.Priority { exists = true break } } if !exists { err := netlink.RuleAdd(rule) if err != nil { return fmt.Errorf("failed to add policy rule %v: %v", rule, err) } } return nil } // cmdDel is called for DELETE requests func cmdDel(args *skel.CmdArgs) error { conf, err := parseConfig(args.StdinData) if err != nil { return err } if args.Netns == "" { return nil } // 网络命名空间存在,进行清理 var ipnets []netlink.Addr vethPeerIndex := -1 _ = ns.WithNetNSPath(args.Netns, func(_ ns.NetNS) error { var err error if conf.IPMasq { iface, err := netlink.LinkByName(args.IfName) if err != nil { if err.Error() == "Link not found" { return ip.ErrLinkNotFound } return fmt.Errorf("failed to lookup %q: %v", args.IfName, err) } // 从args.IfName(通常是eth0)获取IP地址列表 ipnets, err = netlink.AddrList(iface, netlink.FAMILY_ALL) if err != nil || len(ipnets) == 0 { return fmt.Errorf("failed to get IP addresses for %q: %v", args.IfName, err) } } vethIface, err := netlink.LinkByName(conf.ContainerInterface) if err != nil && err != ip.ErrLinkNotFound { return err } vethPeerIndex, _ = netlink.VethPeerIndex(&netlink.Veth{LinkAttrs: *vethIface.Attrs()}) return nil }) if conf.IPMasq { chain := utils.FormatChainName(conf.Name, args.ContainerID) comment := utils.FormatComment(conf.Name, args.ContainerID) for _, ipn := range ipnets { addrBits := 128 if ipn.IP.To4() != nil { addrBits = 32 } _ = ip.TeardownIPMasq(&net.IPNet{IP: ipn.IP, Mask: net.CIDRMask(addrBits, addrBits)}, chain, comment) } if vethPeerIndex != -1 { link, err := netlink.LinkByIndex(vethPeerIndex) if err != nil { return nil } rule := netlink.NewRule() rule.IifName = link.Attrs().Name // ignore errors as we might be called multiple times _ = netlink.RuleDel(rule) _ = netlink.LinkDel(link) } } return nil } func main() { rand.Seed(time.Now().UnixNano()) skel.PluginMain(cmdAdd, cmdDel, version.All) } |
这个守护进程由DaemonSet galaxy提供,在所有K8S节点上运行,入口点如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
package main import ( "math/rand" "time" "github.com/spf13/pflag" "k8s.io/component-base/cli/flag" "k8s.io/component-base/logs" glog "k8s.io/klog" "tkestack.io/galaxy/pkg/galaxy" "tkestack.io/galaxy/pkg/signal" "tkestack.io/galaxy/pkg/utils/ldflags" ) func main() { // 随机化 rand.Seed(time.Now().UTC().UnixNano()) // 创建Galaxy结构 galaxy := galaxy.NewGalaxy() galaxy.AddFlags(pflag.CommandLine) flag.InitFlags() logs.InitLogs() defer logs.FlushLogs() ldflags.PrintAndExitIfRequested() // 启动Galaxy if err := galaxy.Start(); err != nil { glog.Fatalf("Error start galaxy: %v", err) } // 等待信号,并停止Galaxy signal.BlockSignalHandler(func() { if err := galaxy.Stop(); err != nil { glog.Errorf("Error stop galaxy: %v", err) } }) } |
Galaxy结构的规格如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
type Galaxy struct { // 对应Galaxy主配置文件 JsonConf // 对应命令行选项 *options.ServerRunOptions quitChan chan struct{} dockerCli *docker.DockerInterface netConf map[string]map[string]interface{} // 负责处理端口映射 pmhandler *portmapping.PortMappingHandler client kubernetes.Interface pm *policy.PolicyManager } type JsonConf struct { NetworkConf []map[string]interface{} DefaultNetworks []string ENIIPNetwork string } |
初始化过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
func NewGalaxy() *Galaxy { g := &Galaxy{ ServerRunOptions: options.NewServerRunOptions(), quitChan: make(chan struct{}), netConf: map[string]map[string]interface{}{}, } return g } func (g *Galaxy) Init() error { if g.JsonConfigPath == "" { return fmt.Errorf("json config is required") } data, err := ioutil.ReadFile(g.JsonConfigPath) if err != nil { return fmt.Errorf("read json config: %v", err) } // 解析主配置文件 if err := json.Unmarshal(data, &g.JsonConf); err != nil { return fmt.Errorf("bad config %s: %v", string(data), err) } glog.Infof("Json Config: %s", string(data)) // 配置合法性的简单校验 if err := g.checkNetworkConf(); err != nil { return err } // 需要访问本机Docker dockerClient, err := docker.NewDockerInterface() if err != nil { return err } g.dockerCli = dockerClient g.pmhandler = portmapping.New("") return nil } |
启动过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
func (g *Galaxy) Start() error { // 初始化Galaxy配置 if err := g.Init(); err != nil { return err } // 创建K8S客户端 g.initk8sClient() // 启动Flannel垃圾回收器 gc.NewFlannelGC(g.dockerCli, g.quitChan, g.cleanIPtables).Run() // 启用或禁用bridge-nf-call-iptables,此参数可以控制网桥转发的封包被不被iptables过滤 kernel.BridgeNFCallIptables(g.quitChan, g.BridgeNFCallIptables) // 配置iptables转发 kernel.IPForward(g.quitChan, g.IPForward) // 启动时,遍历节点上已有的Pod,对于需要进行端口映射的,执行端口映射 if err := g.setupIPtables(); err != nil { return err } if g.NetworkPolicy { g.pm = policy.New(g.client, g.quitChan) go wait.Until(g.pm.Run, 3*time.Minute, g.quitChan) } if g.RouteENI { // 使用腾讯云ENI需要关闭反向路径过滤 kernel.DisableRPFilter(g.quitChan) eni.SetupENIs(g.quitChan) } // 在UDS上监听、启动HTTP服务器、注册路由 return g.StartServer() } |
上述代码结尾的g.StartServer()会调用下面的方法注册路由:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
func (g *Galaxy) installHandlers() { ws := new(restful.WebService) ws.Route(ws.GET("/cni").To(g.cni)) ws.Route(ws.POST("/cni").To(g.cni)) restful.Add(ws) } // 处理CNI插件转发来的CNI请求 func (g *Galaxy) cni(r *restful.Request, w *restful.Response) { data, err := ioutil.ReadAll(r.Request.Body) if err != nil { glog.Warningf("bad request %v", err) http.Error(w, fmt.Sprintf("err read body %v", err), http.StatusBadRequest) return } defer r.Request.Body.Close() // nolint: errcheck // 将请求转换为CNIRequest,然后转换为PodRequest req, err := galaxyapi.CniRequestToPodRequest(data) if err != nil { glog.Warningf("bad request %v", err) http.Error(w, fmt.Sprintf("%v", err), http.StatusBadRequest) return } req.Path = strings.TrimRight(fmt.Sprintf("%s:%s", req.Path, strings.Join(g.CNIPaths, ":")), ":") // 处理CNI请求 result, err := g.requestFunc(req) if err != nil { http.Error(w, fmt.Sprintf("%v", err), http.StatusInternalServerError) } else { // Empty response JSON means success with no body w.Header().Set("Content-Type", "application/json") if _, err := w.Write(result); err != nil { glog.Warningf("Error writing %s HTTP response: %v", req.Command, err) } } } |
当CNI插件galaxy-sdn调用Galaxy守护进程的/cni端点时,Galaxy会将请求从:
1 2 3 4 5 6 7 |
// Request sent to the Galaxy by the Galaxy SDN CNI plugin type CNIRequest struct { // CNI environment variables, like CNI_COMMAND and CNI_NETNS Env map[string]string `json:"env,omitempty"` // CNI configuration passed via stdin to the CNI plugin Config []byte `json:"config,omitempty"` } |
转换为PodRequest:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
type PodRequest struct { // 需要执行的CNI命令 Command string // 来自环境变量CNI_ARGS PodNamespace string // 来自环境变量CNI_ARGS PodName string // kubernetes pod ports Ports []k8s.Port // 存放操作结果的通道 Result chan *PodResult // 通过环境变量传来的CNI_IFNAME、CNI_COMMAND、CNI_ARGS... *skel.CmdArgs // Galaxy需要委托其它CNI插件完成工作,这是传递给那些插件的参数 // 插件类型 参数名 参数值 ExtendedCNIArgs map[string]map[string]json.RawMessage } // 请求处理结果 type PodResult struct { Response []byte Err error } |
然后调用requestFunc方法处理转换后的CNI请求:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
func (g *Galaxy) requestFunc(req *galaxyapi.PodRequest) (data []byte, err error) { start := time.Now() glog.Infof("%v, %s+", req, start.Format(time.StampMicro)) // 处理ADD命令 if req.Command == cniutil.COMMAND_ADD { defer func() { glog.Infof("%v, data %s, err %v, %s-", req, string(data), err, start.Format(time.StampMicro)) }() var pod *corev1.Pod // 查找Pod pod, err = g.getPod(req.PodName, req.PodNamespace) if err != nil { return } result, err1 := g.cmdAdd(req, pod) if err1 != nil { err = err1 return } else { // 转换结果格式 result020, err2 := convertResult(result) if err2 != nil { err = err2 } else { data, err = json.Marshal(result) if err != nil { return } // 如果处理成功,则执行端口映射,回顾一下启动的时候会对所有本节点现存的Pod进行端口映射 err = g.setupPortMapping(req, req.ContainerID, result020, pod) if err != nil { g.cleanupPortMapping(req) return } pod.Status.PodIP = result020.IP4.IP.IP.String() // 处理网络策略 if g.pm != nil { if err := g.pm.SyncPodChains(pod); err != nil { glog.Warning(err) } g.pm.SyncPodIPInIPSet(pod, true) } } } } else if req.Command == cniutil.COMMAND_DEL { defer glog.Infof("%v err %v, %s-", req, err, start.Format(time.StampMicro)) err = cniutil.CmdDel(req.CmdArgs, -1) if err == nil { err = g.cleanupPortMapping(req) } } else { err = fmt.Errorf("unknown command %s", req.Command) } return } |
requestFunc方法就是查询出Pod,将其作为参数的一部分,再调用 cmdAdd方法。需要注意,由于CNI版本很低,仅仅支持ADD/DEL两个命令。
ADD命令的处理逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |