Galaxy Study Notes

1 Aug 2020

By Alex
/ in PaaS
/ tags CNI, K8S
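Introduction

Galaxy is a network component of TKEStack that provides Overlay/Underlay container networking for TKE clusters. One of its features is floating IPs (elastic IPs): even if a Pod drifts to another node because its node went down, its IP address stays the same.

Network types

Galaxy provides four kinds of network, and the network mode can be configured separately for each workload.

Overlay network

The default mode, based on Flannel's VXLAN or Host Gateway (routing) scheme. Containers on the same node communicate without going through a bridge: packets are forwarded directly via host routes. Cross-node container traffic is either encapsulated with VXLAN or routed directly, and the inter-node routes are stored in etcd. Advantages: simple and reliable, good performance, supports network policy.

A TKEStack installed by tke-installer automatically configures Galaxy in Overlay mode. The CNI configuration is:

JSON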
{
  "type": "galaxy-sdn",
  "capabilities": {"portMappings": true},
  "cniVersion": "0.2.0"
} 

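In this mode:

  1. Flannel allocates a subnet for each Kubelet node and saves it in etcd and in the local file /run/flannel/subnet.env
  2. Kubelet invokes Galaxy's CNI plugin, galaxy-sdn, which calls the local Galaxy process over a Unix domain socket (UDS)
  3. The Galaxy process in turn invokes the Flannel CNI, which parses the subnet information in /run/flannel/subnet.env
  4. The Flannel CNI then calls:
    1. The Bridge CNI or Veth CNI to configure networking for the Pod
    2. host-lo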
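
Step 3 above relies on reading /run/flannel/subnet.env. As a minimal sketch, assuming the file holds plain KEY=VALUE lines with Flannel's usual keys (FLANNEL_NETWORK, FLANNEL_SUBNET, FLANNEL_MTU, FLANNEL_IPMASQ), the parsing could look like the following. The function name parseSubnetEnv and the sample addresses are made up for illustration and are not taken from the Galaxy or Flannel source.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// parseSubnetEnv reads KEY=VALUE lines, the layout assumed here for
// subnet.env, and returns them as a map. Blank lines, comment lines and
// lines without '=' are skipped.
func parseSubnetEnv(content string) map[string]string {
	env := make(map[string]string)
	scanner := bufio.NewScanner(strings.NewReader(content))
	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}
		parts := strings.SplitN(line, "=", 2)
		if len(parts) != 2 {
			continue
		}
		env[strings.TrimSpace(parts[0])] = strings.TrimSpace(parts[1])
	}
	return env
}

func main() {
	// Sample content; the addresses are invented for this example.
	sample := "FLANNEL_NETWORK=10.244.0.0/16\n" +
		"FLANNEL_SUBNET=10.244.1.1/24\n" +
		"FLANNEL_MTU=1450\n" +
		"FLANNEL_IPMASQ=true\n"
	env := parseSubnetEnv(sample)
	fmt.Println("subnet:", env["FLANNEL_SUBNET"], "mtu:", env["FLANNEL_MTU"])
}
```

The per-node subnet (FLANNEL_SUBNET) is the value from which Pod IPs on that node would be drawn.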

Architecture diagram:

galaxy-overlay

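Underlay network

In this mode container IPs are provided by the host network, and containers and hosts can route to each other directly, which gives better performance. Layer-2 connectivity between containers and hosts is supported through Linux Bridge, MacVlan, IPVlan and SR-IOV; choose among them according to your workload and hardware environment.

To use the Galaxy Underlay network you need to enable the Galaxy-ipam component. It allocates IPs to Pods and also provides the floating IP capability.

NAT

Uses the K8S hostPort setting to map container ports to host ports. If hostPort is not specified, Galaxy picks a random mapping.

Host

Uses the K8S HostNetwork mode, so the Pod uses the physical network directly.

Architecture

Galaxy consists of three kinds of components.

Galaxy

Runs on every node as a DaemonSet and configures container networking by invoking various CNI plugins.

CNI plugins

Galaxy delegates the actual work of setting up container networks to other CNI plugins; in other words, it plays the role of a decorator. Galaxy supports any standard CNI plugin and also ships with several built-in ones.

Galaxy IPAM

A K8S scheduler plugin. kube-scheduler calls Galaxy-ipam over HTTP to configure and manage floating IPs.

Since an IP address can only change when a Pod is rescheduled, implementing this component as a scheduler plugin is a natural choice.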

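Supported CNI plugins

Galaxy supports any standard CNI plugin, so you can use it as a CNI framework, much like multus-cni.

Galaxy also includes a set of built-in CNI plugins:

Plugin | Description
SDN CNI | The CNI entry point when the Overlay network is used. It calls the Galaxy daemon over a UDS and forwards all CNI arguments supplied by Kubelet
Veth CNI | Used in the Overlay network. Creates a veth pair connecting the container and host namespaces. It gets the Pod IP from the ipam plugin
underlay-veth CNI | Used in the Underlay network. Creates a veth pair connecting the container and host namespaces. It does not create a bridge and relies on host routing rules to forward packets
Vlan CNI | Used in the Underlay network. Creates a veth pair connecting the container to a bridge/macvlan/ipvlan device on the host. It supports configuring a VLAN for the Pod and can take the IP address either from CNI arguments, such as ipinfos=[{"ip":"192.168.0.68/26","vlan":2,"gateway":"192.168.0.65"}], or from the result of an ipam CNI plugin
SRIOV CNI | Used in the Underlay network. Uses the SR-IOV capability of an Ethernet Server Adapter to create a VF device and move it into the container's network namespace
TKE route ENI CNI | Used on Tencent Cloud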
Basics
Building Galaxy
Binaries
Shell
go get -d tkestack.io/galaxy
cd $GOPATH/src/tkestack.io/galaxy

# Build all binaries
make
# Build specific binaries
make BINS="galaxy galaxy-ipam"
make BINS="galaxy-ipam"
Images
Shell
# Build all images
make image

# Build a specific image
make image BINS="galaxy-ipam"

# Build for a specific architecture
make image.multiarch PLATFORMS="linux_arm64"
Using Galaxy

The manifest can be downloaded from GitHub. Its default content is:

YAML
---
apiVersion: rbac.authorization.k8s.io/v1
# kubernetes versions before 1.8.0 should use rbac.authorization.k8s.io/v1beta1
kind: ClusterRole
metadata:
  name: galaxy
rules:
- apiGroups: [""]
  resources:
  - pods
  - namespaces
  - nodes
  - pods/binding
  verbs: ["list", "watch", "get", "patch", "create", "update"]
- apiGroups: ["apps", "extensions"]
  resources:
  - statefulsets
  - deployments
  verbs: ["list", "watch"]
- apiGroups: [""]
  resources:
  - configmaps
  - endpoints
  - events
  verbs: ["get", "list", "watch", "update", "create", "patch"]
- apiGroups: ["galaxy.k8s.io"]
  resources:
  - pools
  - floatingips
  verbs: ["get", "list", "watch", "update", "create", "patch", "delete"]
- apiGroups: ["apiextensions.k8s.io"]
  resources:
  - customresourcedefinitions
  verbs:
  - "*"
- apiGroups: ["networking.k8s.io"]
  resources:
  - networkpolicies
  verbs: ["get", "list", "watch"]
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: galaxy
  namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
# kubernetes versions before 1.8.0 should use rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
  name: galaxy
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: galaxy
subjects:
  - kind: ServiceAccount
    name: galaxy
    namespace: kube-system
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  labels:
    app: galaxy
  name: galaxy
  namespace: kube-system
spec:
  selector:
    matchLabels:
      app: galaxy
  template:
    metadata:
      labels:
        app: galaxy
    spec:
      priorityClassName: system-node-critical
      serviceAccountName: galaxy
      hostNetwork: true
      hostPID: true
      containers:
      - image: tkestack/galaxy:v1.0.7
        command: ["/bin/sh"]
# Entrypoint script:
#   Copy the Galaxy CNI configuration from the ConfigMap to the host
#   cp -p /etc/galaxy/cni/00-galaxy.conf /etc/cni/net.d/;
#   Copy the CNI plugin binaries to the host
#   cp -p /opt/cni/galaxy/bin/galaxy-sdn /opt/cni/galaxy/bin/loopback /opt/cni/bin/;
#   Start the Galaxy daemon (--route-eni is required when running on Tencent Cloud)
#   /usr/bin/galaxy --logtostderr=true --v=3 --route-eni
      # qcloud galaxy should run with --route-eni
        args: ["-c", "cp -p /etc/galaxy/cni/00-galaxy.conf /etc/cni/net.d/; cp -p /opt/cni/galaxy/bin/galaxy-sdn /opt/cni/galaxy/bin/loopback /opt/cni/bin/; /usr/bin/galaxy --logtostderr=true --v=3 --route-eni"]
      # private-cloud should run without --route-eni
      # args: ["-c", "cp -p /etc/galaxy/cni/00-galaxy.conf /etc/cni/net.d/; cp -p /opt/cni/galaxy/bin/galaxy-sdn /opt/cni/galaxy/bin/loopback /opt/cni/bin/; /usr/bin/galaxy --logtostderr=true --v=3"]
        imagePullPolicy: Always
        env:
          - name: MY_NODE_NAME
            valueFrom:
              fieldRef:
                fieldPath: spec.nodeName
          - name: DOCKER_HOST
            value: unix:///host/run/docker.sock
        name: galaxy
        resources:
          requests:
            cpu: 100m
            memory: 200Mi
        securityContext:
          privileged: true
        volumeMounts:
        - name: galaxy-run
          mountPath: /var/run/galaxy/
        - name: flannel-run
          mountPath: /run/flannel
        - name: galaxy-etc
          mountPath: /etc/galaxy
        - name: cni-config
          mountPath: /etc/cni/net.d/
        - name: cni-bin
          mountPath: /opt/cni/bin
        - name: cni-etc
          mountPath: /etc/galaxy/cni
        - name: cni-state
          mountPath: /var/lib/cni
        - name: docker-sock
          mountPath: /host/run/
        - name: tz-config
          mountPath: /etc/localtime
      terminationGracePeriodSeconds: 30
      tolerations:
      - operator: Exists
      volumes:
      - name: galaxy-run
        hostPath:
          path: /var/run/galaxy
      - name: flannel-run
        hostPath:
          path: /run/flannel
      - configMap:
          defaultMode: 420
          name: galaxy-etc
        name: galaxy-etc
      - name: cni-config
        hostPath:
          path: /etc/cni/net.d/
      - name: cni-bin
        hostPath:
          path: /opt/cni/bin
      - name: cni-state
        hostPath:
          path: /var/lib/cni
      - configMap:
          defaultMode: 420
          name: cni-etc
        name: cni-etc
      - name: docker-sock
        # in case of docker restart, /run/docker.sock may change, we have to mount the /run directory
        hostPath:
          path: /run/
      - name: tz-config
        hostPath:
          path: /etc/localtime
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: galaxy-etc
  namespace: kube-system
data:
  # Galaxy configuration file
  # update network card name in "galaxy-k8s-vlan" and "galaxy-k8s-sriov" if necessary
  # update vf_num in "galaxy-k8s-sriov" according to demand
  # update ENIIPNetwork to tke-route-eni if running on qcloud
  galaxy.json: |
    {
      "NetworkConf":[
        {"name":"tke-route-eni","type":"tke-route-eni","eni":"eth1","routeTable":1},
        {"name":"galaxy-flannel","type":"galaxy-flannel", "delegate":{"type":"galaxy-veth"},"subnetFile":"/run/flannel/subnet.env"},
        {"name":"galaxy-k8s-vlan","type":"galaxy-k8s-vlan", "device":"eth1", "default_bridge_name": "br0"},
        {"name":"galaxy-k8s-sriov","type": "galaxy-k8s-sriov", "device": "eth1", "vf_num": 10}
      ],
      "DefaultNetworks": ["galaxy-flannel"],
      "ENIIPNetwork": "galaxy-k8s-vlan"
    }
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: cni-etc
  namespace: kube-system
data:
  # CNI network configuration
  00-galaxy.conf: |
    {
      "name": "galaxy-sdn",
      "type": "galaxy-sdn",
      "capabilities": {"portMappings": true},
      "cniVersion": "0.2.0"
    }

Adjust it to match your actual environment and the network mode you use.

Configuring Kubelet

Make sure Kubelet runs with the command-line flags --network-plugin=cni --cni-bin-dir=/opt/cni/bin/, then restart Kubelet.

Overlay network
Installing Flannel

This network mode depends on Flannel; see the Flannel study notes.

Configuring Galaxy
Global configuration

To configure the default networks (DefaultNetworks), edit the galaxy-etc ConfigMap in the Galaxy manifest above:

JSON
{
  // Configuration of every supported network mode
  "NetworkConf":[
    {
      "name":"tke-route-eni", // If name is empty, Galaxy assumes it is the same as type
      "type":"tke-route-eni",
      "eni":"eth1",
      "routeTable":1
    },
    {
      "name":"galaxy-flannel", // This plugin is simply the Flannel CNI under a different name
      "type":"galaxy-flannel",
      "delegate":{
        "type":"galaxy-veth"
      },
      "subnetFile":"/run/flannel/subnet.env"
    },
    {
      "name":"galaxy-k8s-vlan",
      "type":"galaxy-k8s-vlan",
      "device":"eth1",
      "default_bridge_name":"br0"
    },
    {
      "name":"galaxy-k8s-sriov",
      "type":"galaxy-k8s-sriov",
      "device":"eth1",
      "vf_num":10
    },
    {
      "name":"galaxy-underlay-veth",
      "type":"galaxy-underlay-veth",
      "device":"eth1"
    }
  ],
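  // Pods use these networks by default; note that more than one network can be listed
  "DefaultNetworks":[
    "galaxy-flannel" // Use the Flannel-based Overlay network
  ],
  // If a Pod requests a Tencent Cloud elastic network interface (ENI) and has no
  // k8s.v1.cni.cncf.io/networks annotation, this network is used by default. This avoids
  // having to annotate every Pod that needs an Underlay network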
  "ENIIPNetwork":"galaxy-k8s-vlan"
}
Pod configuration

You can set the following annotation on a Pod:

YAML
k8s.v1.cni.cncf.io/networks: galaxy-flannel,galaxy-k8s-sriov

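This annotation tells Galaxy which networks to configure for the Pod.

Coexisting with other CNI plugins

Galaxy can coexist with other CNI plugins. One caveat: the configuration files of other plugins in the --network-conf-dir directory must not sort lexicographically before 00-galaxy.conf. Otherwise Kubelet would invoke those plugins before the Galaxy CNI plugin, which may not be what you expect.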
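
The ordering rule can be checked with a few lines of Go. This is only a sketch of the idea, assuming the configuration file that sorts first by name is the one that wins. The helper firstConf and every file name other than 00-galaxy.conf are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// firstConf returns the file name that sorts first lexicographically,
// or "" when the list is empty. The input slice is not modified.
func firstConf(names []string) string {
	if len(names) == 0 {
		return ""
	}
	sorted := append([]string(nil), names...)
	sort.Strings(sorted)
	return sorted[0]
}

func main() {
	// Galaxy's file sorts first, so it would be picked.
	fmt.Println(firstConf([]string{"10-flannel.conflist", "00-galaxy.conf"})) // 00-galaxy.conf
	// A file named 00-aaa.conf sorts before 00-galaxy.conf and would shadow it.
	fmt.Println(firstConf([]string{"00-galaxy.conf", "00-aaa.conf"})) // 00-aaa.conf
}
```

Running a check like this against the real directory listing before installing another plugin is a cheap way to catch a shadowed Galaxy configuration.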

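Galaxy command-line flags
Shell
--alsologtostderr                   # Log to files and to standard error
--bridge-nf-call-iptables           # Whether to set bridge-nf-call-iptables, which controls whether packets forwarded by a bridge are filtered by iptables rules; default true
--cni-paths stringSlice             # Additional CNI paths besides those received from kubelet; default /opt/cni/galaxy/bin
--flannel-allocated-ip-dir string   # Where the Flannel CNI plugin stores allocated IP addresses; default /var/lib/cni/networks
--flannel-gc-interval duration      # Interval between Flannel network garbage collections; default 10s
--gc-dirs string                    # Directories to clean up; file names in them contain container IDs
                                    # Default /var/lib/cni/flannel,/var/lib/cni/galaxy,/var/lib/cni/galaxy/port
--hostname-override string          # Overrides the kubelet hostname; if set, Galaxy uses it to fetch the node object from the API server
--ip-forward                        # Whether to enable IP forwarding
--json-config-path string           # Path of the Galaxy configuration file; default /etc/galaxy/galaxy.json
--kubeconfig string                 # Path of the kubeconfig file
--log-backtrace-at traceLocation    # Emit a stack trace when logging hits file:N; default :0
--log-dir string                    # Directory for log files
--log-flush-frequency duration      # Interval between log flushes; default 5s
--logtostderr                       # Log to standard error instead of files
--master string                     # Address and port of the API server
--network-conf-dir string           # Directory of additional CNI network configuration files; default /etc/cni/net.d/
--network-policy                    # Enable network policy support
--route-eni                         # Whether to enable Tencent Cloud route-eni
--stderrthreshold severity          # Logs at or above this severity go to standard error; default 2
-v, --v Level                       # Log verbosity level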
Underlay network
How it works

The Underlay network must be used together with Galaxy-ipam, which allocates and releases IP addresses for Pods:

  1. Plan the Underlay IP range used by the container network (taking care to avoid conflicts with the IPs of other hosts on the physical network) and put it in the floatingip-config ConfigMap
  2. When scheduling a Pod, kube-scheduler calls Galaxy-ipam in its filter/priority/bind phases
  3. Galaxy-ipam checks whether the Pod has a reserved IP. If it does, Galaxy-ipam marks as valid only the nodes in the subnet where that IP is available; otherwise it marks all nodes as valid. While the Pod is being bound, Galaxy-ipam allocates an IP and writes it into the Pod's annotations
  4. Galaxy reads the IP from the Pod's annotations and passes it as an argument to the CNI plugin, which configures the Pod IP

Galaxy's floating IP capability is provided by Galaxy-ipam, a Kubernetes Scheduler Extender: the K8S scheduler calls Galaxy-ipam, which influences its filtering and binding steps. Floating IPs must be used together with the Underlay network.

The overall workflow is shown below:

galaxy-ipam

Installing Galaxy-ipam

The manifest can be downloaded from GitHub. Its default content is:

YAML
apiVersion: v1
kind: Service
metadata:
  name: galaxy-ipam
  namespace: kube-system
  labels:
    app: galaxy-ipam
spec:
  type: NodePort
  ports:
  - name: scheduler-port
    port: 9040
    targetPort: 9040
    nodePort: 32760
    protocol: TCP
  - name: api-port
    port: 9041
    targetPort: 9041
    nodePort: 32761
    protocol: TCP
  selector:
    app: galaxy-ipam
---
apiVersion: rbac.authorization.k8s.io/v1
# kubernetes versions before 1.8.0 should use rbac.authorization.k8s.io/v1beta1
kind: ClusterRole
metadata:
  name: galaxy-ipam
rules:
- apiGroups: [""]
  resources:
  - pods
  - namespaces
  - nodes
  - pods/binding
  verbs: ["list", "watch", "get", "patch", "create"]
- apiGroups: ["apps", "extensions"]
  resources:
  - statefulsets
  - deployments
  verbs: ["list", "watch"]
- apiGroups: [""]
  resources:
  - configmaps
  - endpoints
  - events
  verbs: ["get", "list", "watch", "update", "create", "patch"]
- apiGroups: ["galaxy.k8s.io"]
  resources:
  - pools
  - floatingips
  verbs: ["get", "list", "watch", "update", "create", "patch", "delete"]
- apiGroups: ["apiextensions.k8s.io"]
  resources:
  - customresourcedefinitions
  verbs:
  - "*"
- apiGroups: ["apps.tkestack.io"]
  resources:
  - tapps
  verbs: ["list", "watch"]
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: galaxy-ipam
  namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
# kubernetes versions before 1.8.0 should use rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
  name: galaxy-ipam
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: galaxy-ipam
subjects:
  - kind: ServiceAccount
    name: galaxy-ipam
    namespace: kube-system
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: galaxy-ipam
  name: galaxy-ipam
  namespace: kube-system
spec:
  replicas: 1
  selector:
    matchLabels:
      app: galaxy-ipam
  template:
    metadata:
      labels:
        app: galaxy-ipam
    spec:
      priorityClassName: system-cluster-critical
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
          - labelSelector:
              matchExpressions:
              - key: app
                operator: In
                values:
                - galaxy-ipam
            topologyKey: "kubernetes.io/hostname"
      serviceAccountName: galaxy-ipam
      hostNetwork: true
      dnsPolicy: ClusterFirstWithHostNet
      containers:
      - image: tkestack/galaxy-ipam:v1.0.7
        args:
          - --logtostderr=true
          - --profiling
          - --v=3
          - --config=/etc/galaxy/galaxy-ipam.json
          # The service exposes this port for the scheduler to call
          - --port=9040
          - --api-port=9041
          - --leader-elect
        command:
          - /usr/bin/galaxy-ipam
        ports:
          - containerPort: 9040
          - containerPort: 9041
        imagePullPolicy: Always
        name: galaxy-ipam
        resources:
          requests:
            cpu: 100m
            memory: 200Mi
        volumeMounts:
        - name: kube-config
          mountPath: /etc/kubernetes/
        - name: galaxy-ipam-log
          mountPath: /data/galaxy-ipam/logs
        - name: galaxy-ipam-etc
          mountPath: /etc/galaxy
        - name: tz-config
          mountPath: /etc/localtime
      terminationGracePeriodSeconds: 30
      tolerations:
        - effect: NoSchedule
          key: node-role.kubernetes.io/master
          operator: Exists
      volumes:
      - name: kube-config
        hostPath:
          path: /etc/kubernetes/
      - name: galaxy-ipam-log
        emptyDir: {}
      - configMap:
          defaultMode: 420
          name: galaxy-ipam-etc
        name: galaxy-ipam-etc
      - name: tz-config
        hostPath:
          path: /etc/localtime
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: galaxy-ipam-etc
  namespace: kube-system
data:
  # Configuration file
  galaxy-ipam.json: |
    {
      "schedule_plugin": {
        # Remove this line if you are not using Tencent Cloud elastic network interfaces (ENI)
        "cloudProviderGrpcAddr": "127.0.0.2:80"
      }
    }
Configuring kube-scheduler

You need to configure a scheduling policy for the K8S scheduler. The policy can now be stored in a ConfigMap:

/etc/kubernetes/scheduler-policy-config.json
JSON
cat <<EOF | kubectl create -f -
apiVersion: v1
kind: ConfigMap
metadata:
  name: scheduler-policy
  namespace: kube-system
data:
  policy.cfg: |
    {
      "kind": "Policy",
      "apiVersion": "v1",
      "extenders": [
        {
          // Call the extender over HTTP
          "urlPrefix": "http://127.0.0.1:9040/v1",
          "httpTimeout": 10000000000,
          "filterVerb": "filter",
          "BindVerb": "bind",
          "weight": 1,
          "enableHttps": false,
          "managedResources": [
            {
              "name": "tke.cloud.tencent.com/eni-ip",
              "ignoredByScheduler": false
            }
          ]
        }
      ]
    }
EOF

Then add the command-line flag --policy-configmap=scheduler-policy to the scheduler.

Configuring floating IPs
floatingip-config

On bare metal, you can use a configuration like the following:

YAML
kind: ConfigMap
apiVersion: v1
metadata:
  name: floatingip-config
  namespace: kube-system
data:
  floatingips: |
    [
        {
            # CIDR of the nodes; nodes in this range can run Pods with floating IPs
            "nodeSubnets": ["10.0.0.0/16"],
            # Range of IP addresses that can be allocated to Pods
            "ips": ["10.0.70.2~10.0.70.241"],
            # Subnet of the Pod IPs
            "subnet":"10.0.70.0/24",
            # Gateway of the Pod IPs
            "gateway":"10.0.70.1",
            # VLAN of the Pod IPs. Set this field if Pod IPs and Node IPs are not in the same VLAN,
            # and make sure the switch port the node connects to is a trunk port
            "vlan": 1024
        }
    ]

A single nodeSubnet can map to several Pod subnets, for example:

JSON
// A Pod running in 10.49.28.0/26 can get an IP from 10.0.80.2~10.0.80.4 or from 10.0.81.2~10.0.81.4
// A Pod running in 10.49.29.0/24 can only get an IP from 10.0.80.0/24
[{
    "nodeSubnets": ["10.49.28.0/26", "10.49.29.0/24"],
    "ips": ["10.0.80.2~10.0.80.4"],
    "subnet": "10.0.80.0/24",
    "gateway": "10.0.80.1"
}, {
    "nodeSubnets": ["10.49.28.0/26"],
    "ips": ["10.0.81.2~10.0.81.4"],
    "subnet": "10.0.81.0/24",
    "gateway": "10.0.81.1",
    "vlan": 3
}]
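
To make the mapping concrete, here is a minimal sketch that decodes the configuration above and lists the Pod subnets available to a node with a given IP. The struct and the function podSubnetsForNode are illustrative names that mirror the JSON keys shown here. They are not Galaxy-ipam's own types, and the matching rule (a node qualifies when its IP falls inside one of the nodeSubnets) is an assumption drawn from the comments in the example.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net"
)

// floatingIPConf mirrors the JSON keys used in the floatingips examples.
type floatingIPConf struct {
	NodeSubnets []string `json:"nodeSubnets"`
	IPs         []string `json:"ips"`
	Subnet      string   `json:"subnet"`
	Gateway     string   `json:"gateway"`
	Vlan        int      `json:"vlan,omitempty"`
}

// sample is the two-entry configuration from the text.
const sample = `[{
    "nodeSubnets": ["10.49.28.0/26", "10.49.29.0/24"],
    "ips": ["10.0.80.2~10.0.80.4"],
    "subnet": "10.0.80.0/24",
    "gateway": "10.0.80.1"
}, {
    "nodeSubnets": ["10.49.28.0/26"],
    "ips": ["10.0.81.2~10.0.81.4"],
    "subnet": "10.0.81.0/24",
    "gateway": "10.0.81.1",
    "vlan": 3
}]`

// podSubnetsForNode returns the Pod subnets whose nodeSubnets contain nodeIP.
func podSubnetsForNode(rawConf, nodeIP string) ([]string, error) {
	var confs []floatingIPConf
	if err := json.Unmarshal([]byte(rawConf), &confs); err != nil {
		return nil, fmt.Errorf("invalid floatingips config: %v", err)
	}
	ip := net.ParseIP(nodeIP)
	if ip == nil {
		return nil, fmt.Errorf("invalid node IP %q", nodeIP)
	}
	var subnets []string
	for _, c := range confs {
		for _, cidr := range c.NodeSubnets {
			_, ipNet, err := net.ParseCIDR(cidr)
			if err != nil {
				return nil, fmt.Errorf("invalid nodeSubnet %q: %v", cidr, err)
			}
			if ipNet.Contains(ip) {
				subnets = append(subnets, c.Subnet)
				break // one match per entry is enough
			}
		}
	}
	return subnets, nil
}

func main() {
	for _, node := range []string{"10.49.28.10", "10.49.29.5"} {
		subnets, err := podSubnetsForNode(sample, node)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(node, "->", subnets)
	}
}
```

A node in 10.49.28.0/26 matches both entries, while a node in 10.49.29.0/24 matches only the first, which is what the comments in the example describe.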

Several nodeSubnets can share one Pod subnet as long as their allocatable IP ranges do not overlap:

JSON
[{
    "routableSubnet": "10.180.1.2/32",
    "ips": ["10.180.154.2~10.180.154.3"],
    "subnet": "10.180.154.0/24",
    "gateway": "10.180.154.1",
    "vlan": 3
}, {
    "routableSubnet": "10.180.1.3/32",
    "ips": ["10.180.154.7~10.180.154.8"],
    "subnet": "10.180.154.0/24",
    "gateway": "10.180.154.1",
    "vlan": 3
}]
Reserving IPs

To reserve an IP so that it is never allocated, create a FloatingIP resource like this:

YAML
apiVersion: galaxy.k8s.io/v1alpha1
kind: FloatingIP
metadata:
  # The name is the IP to reserve
  name: 10.0.0.1
  labels:
    # This label is no longer needed from 1.0.8 on
    ipType: internalIP
    # This label must be kept
    reserved: this-is-not-for-pods
spec:
  key: pool__reserved-for-node_
  policy: 2
Deployment configuration

Floating IPs currently support only workloads created by a Deployment or StatefulSet.

YAML
apiVersion: apps/v1
kind: Deployment
...
spec:
  ...
  template:
    metadata:
      annotations:
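        # This annotation is required if the default network is not galaxy-k8s-vlan
        k8s.v1.cni.cncf.io/networks: galaxy-k8s-vlan
        # IP release policy:
        #    empty/unset: the IP is released as soon as the Pod stops
        #    immutable: the IP is released only when the Deployment / StatefulSet is deleted or scaled down
        #    never: the IP is not released even if the Deployment / StatefulSet is deleted. A later
        #           Deployment / StatefulSet with the same name reuses the allocated IPs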
        k8s.v1.cni.galaxy.io/release-policy: immutable
      creationTimestamp: null
      labels:
        k8s-app: nnn
        qcloud-app: nnn
    spec:
      containers:
      - image: nginx
        imagePullPolicy: Always
        name: nnn
        resources:
          limits:
            cpu: 500m
            memory: 1Gi
            # Extended resource limit; setting it on one container is enough
            tke.cloud.tencent.com/eni-ip: "1"
          requests:
            cpu: 250m
            memory: 256Mi
            tke.cloud.tencent.com/eni-ip: "1"

In the resulting Pod, the annotation k8s.v1.cni.galaxy.io/args carries its IP address and related information. The CNI plugin uses this information to set the IP address and routes in the container's network namespace:

YAML
apiVersion: v1
kind: Pod
metadata:
  name: nnn-7df5984746-58hjm
  annotations:
    k8s.v1.cni.cncf.io/networks: galaxy-k8s-vlan
    k8s.v1.cni.galaxy.io/args: '{"common":{"ipinfos":[{"ip":"192.168.64.202/24",
        "vlan":0,"gateway":"192.168.64.1","routable_subnet":"172.21.64.0/20"}]}}'
    k8s.v1.cni.galaxy.io/release-policy: immutable
...
spec:
...
status:
...
  hostIP: 172.21.64.15
  phase: Running
  podIP: 192.168.64.202
  podIPs:
  - ip: 192.168.64.202
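
The value of k8s.v1.cni.galaxy.io/args is plain JSON, so it is easy to inspect. The sketch below decodes the annotation value shown above. The struct types and parseGalaxyArgs are hypothetical names that simply mirror the keys visible in the annotation; Galaxy's own types may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ipInfo mirrors one element of "ipinfos" in the annotation value.
type ipInfo struct {
	IP             string `json:"ip"`
	Vlan           int    `json:"vlan"`
	Gateway        string `json:"gateway"`
	RoutableSubnet string `json:"routable_subnet"`
}

// galaxyArgs mirrors the outer {"common":{"ipinfos":[...]}} structure.
type galaxyArgs struct {
	Common struct {
		IPInfos []ipInfo `json:"ipinfos"`
	} `json:"common"`
}

// parseGalaxyArgs extracts the IP information from the annotation value.
func parseGalaxyArgs(annotation string) ([]ipInfo, error) {
	var args galaxyArgs
	if err := json.Unmarshal([]byte(annotation), &args); err != nil {
		return nil, fmt.Errorf("failed to parse annotation: %v", err)
	}
	return args.Common.IPInfos, nil
}

func main() {
	// Annotation value taken from the Pod shown above.
	annotation := `{"common":{"ipinfos":[{"ip":"192.168.64.202/24",
        "vlan":0,"gateway":"192.168.64.1","routable_subnet":"172.21.64.0/20"}]}}`
	infos, err := parseGalaxyArgs(annotation)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, info := range infos {
		fmt.Printf("ip=%s gateway=%s vlan=%d\n", info.IP, info.Gateway, info.Vlan)
	}
}
```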
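Galaxy-ipam CRDs
Pool

Setting the annotation tke.cloud.tencent.com/eni-ip-pool on Deployments lets several Deployments share one IP pool.

When an IP pool is used, the default IP release policy is never, unless another policy is set through the annotation k8s.v1.cni.galaxy.io/release-policy.

By default the pool grows as the replica count of the Deployment/StatefulSet increases. You can also create a pool with a fixed size:

YAML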
apiVersion: galaxy.k8s.io/v1alpha1
kind: Pool
metadata:
  name: example-pool
size: 4

When creating a pool through the HTTP API, you can set preAllocateIP=true to pre-allocate IPs for it. Pre-allocation is not possible when the CR is created with kubectl.

FloatingIP

This CR stores a floating IP together with the workload it is bound to:

YAML
apiVersion: galaxy.k8s.io/v1alpha1
kind: FloatingIP
metadata:
  creationTimestamp: "2020-03-04T08:28:15Z"
  generation: 1
  labels:
    ipType: internalIP
  # IP address
  name: 192.168.64.202
  resourceVersion: "2744910"
  selfLink: /apis/galaxy.k8s.io/v1alpha1/floatingips/192.168.64.202
  uid: b5d55f27-4548-44c7-b8ad-570814b55026
spec:
  attribute: '{"NodeName":"172.21.64.15"}'
  # The workload it is bound to
  key: dp_default_nnn_nnn-7df5984746-58hjm
  policy: 1
  subnet: 172.21.64.0/20
  updateTime: "2020-03-04T08:28:15Z" 
Galaxy-ipam HTTP API

Galaxy-ipam provides a Swagger 2.0 API. When galaxy-ipam is started with the --swagger command-line option, it serves the API description at this URL:

http://${galaxy-ipam-ip}:9041/apidocs.json/v1

Querying allocated IPs
JSON
// Query the IP addresses allocated to the StatefulSet sts in the default namespace
// curl 'http://192.168.30.7:9041/v1/ip?appName=sts&appType=statefulsets&namespace=default'
{
"last": true,
"totalElements": 2,
"totalPages": 1,
"first": true,
"numberOfElements": 2,
"size": 10,
"number": 0,
"content": [
  {
   "ip": "10.0.0.112",
   "namespace": "default",
   "appName": "sts",
   "podName": "sts-0",
   "policy": 2,
   "appType": "statefulset",
   "updateTime": "2020-05-29T11:11:44.633383558Z",
   "status": "Deleted",
   "releasable": true
  },
  {
   "ip": "10.0.0.174",
   "namespace": "default",
   "appName": "sts",
   "podName": "sts-1",
   "policy": 2,
   "appType": "statefulset",
   "updateTime": "2020-05-29T11:11:45.132450117Z",
   "status": "Deleted",
   "releasable": true
  }
]
}
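
For callers that prefer Go to curl, the query can be sketched as follows. It only builds the URL shown in the curl command and decodes a few fields of the response above; it does not send a request. The names ipQueryURL, ipPage and allocatedIPs are hypothetical, and only fields that appear in the sample response are used.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/url"
)

// ipQueryURL builds the /v1/ip query URL used in the curl example.
func ipQueryURL(base, appName, appType, namespace string) string {
	q := url.Values{}
	q.Set("appName", appName)
	q.Set("appType", appType)
	q.Set("namespace", namespace)
	return base + "/v1/ip?" + q.Encode() // Encode sorts parameters by key
}

// ipPage holds the subset of the response fields used in this sketch.
type ipPage struct {
	TotalElements int `json:"totalElements"`
	Content       []struct {
		IP      string `json:"ip"`
		PodName string `json:"podName"`
		Status  string `json:"status"`
	} `json:"content"`
}

// allocatedIPs decodes a response body and returns a podName -> IP map.
func allocatedIPs(body string) (map[string]string, error) {
	var page ipPage
	if err := json.Unmarshal([]byte(body), &page); err != nil {
		return nil, fmt.Errorf("failed to decode response: %v", err)
	}
	ips := make(map[string]string, len(page.Content))
	for _, item := range page.Content {
		ips[item.PodName] = item.IP
	}
	return ips, nil
}

func main() {
	fmt.Println(ipQueryURL("http://192.168.30.7:9041", "sts", "statefulsets", "default"))

	// A trimmed copy of the sample response from the text.
	body := `{"totalElements": 2, "content": [
	  {"ip": "10.0.0.112", "podName": "sts-0", "status": "Deleted"},
	  {"ip": "10.0.0.174", "podName": "sts-1", "status": "Deleted"}]}`
	ips, err := allocatedIPs(body)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(ips["sts-0"], ips["sts-1"])
}
```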
Releasing IP addresses
JSON
// curl -X POST -H "Content-type: application/json" -d '
//   { "ips": [
//     { "ip":"10.0.0.112", "appName":"sts", "appType":"statefulset",  "podName":"sts-0","namespace":"default"},
//     {"ip":"10.0.0.174", "appName":"sts", "appType":"statefulset", "podName":"sts-1", "namespace":"default"}
//   ]}
// '
// http://192.168.30.7:9041/v1/ip
{
"code": 200,
"message": ""
}
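FAQ
About rolling updates

A Deployment's default update strategy is StrategyType=RollingUpdate with 25% max unavailable and 25% max surge. This means that during a rolling update the number of Pods may exceed the replica count by up to 25%. This can lead to a deadlock:

  1. Suppose the Deployment has 3 replicas. A single unavailable replica already exceeds the 25% limit, so the Deployment controller has to create a new Pod first and delete an old Pod only after the new one is ready
  2. If the floating IP release policy is immutable or never, the new Pod has to reuse the IP of an old Pod, but that old Pod has not been deleted yet

As a result, the new Pod gets stuck in the scheduling phase.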
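
The arithmetic behind step 1 can be written out. The sketch below assumes the usual Kubernetes rounding for percentage values: maxSurge rounds up and maxUnavailable rounds down. The function rollingBounds is a hypothetical helper, not part of any Kubernetes API.

```go
package main

import "fmt"

// rollingBounds resolves percentage-based maxSurge and maxUnavailable for a
// given replica count: maxSurge rounds up, maxUnavailable rounds down.
func rollingBounds(replicas, surgePct, unavailablePct int) (maxSurge, maxUnavailable int) {
	maxSurge = (replicas*surgePct + 99) / 100       // ceiling division
	maxUnavailable = replicas * unavailablePct / 100 // floor division
	return maxSurge, maxUnavailable
}

func main() {
	for _, replicas := range []int{3, 4, 10} {
		surge, unavailable := rollingBounds(replicas, 25, 25)
		fmt.Printf("replicas=%d maxSurge=%d maxUnavailable=%d\n", replicas, surge, unavailable)
	}
	// replicas=3 maxSurge=1 maxUnavailable=0
	// replicas=4 maxSurge=1 maxUnavailable=1
	// replicas=10 maxSurge=3 maxUnavailable=2
}
```

With 3 replicas no old Pod may be taken down before a new one is ready, which is exactly the situation where a new Pod waits for an IP that an old Pod still holds. With 4 or more replicas maxUnavailable is at least 1, so an old Pod can be deleted first and free its IP.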

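Port mapping

A K8S NodePort Service exposes a group of Pods outside the cluster through host ports. It cannot, however, give access to a port on each individual Pod of a StatefulSet.

With K8S HostPort, two Pods scheduled onto the same node would conflict over the port:

YAML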
    spec:
      containers:
      - image: ...
        ports:
          - containerPort: 9040
            hostPort: 9040

Galaxy provides a feature for random port mapping:

YAML
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: hello
  name: hello
spec:
  replicas: 1
  selector:
    matchLabels:
      app: hello
  template:
    metadata:
      labels:
        app: hello
      annotations:
        # Just set this annotation on the Pod
        tkestack.io/portmapping: ""
    spec:
      containers:
      - image: ...
        ports:
          - containerPort: 9040

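Galaxy then uses iptables to map a random host port to each container port of the Pod and writes the chosen ports back into the tkestack.io/portmapping annotation.

Source code walkthrough
CNI
galaxy-sdn

With TKEStack's default configuration (a Flannel-based Overlay network in VXLAN mode), each node has a single CNI configuration:

/etc/cni/net.d/00-galaxy.conf
JSON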
{
  "type": "galaxy-sdn",
  "capabilities": {"portMappings": true},
  "cniVersion": "0.2.0"
}

Let us first look at what this CNI plugin does and how it interacts with Flannel. All of Galaxy's built-in CNI plugins live under the github.com/tkestack/galaxy/cni directory. The entry point of the galaxy-sdn plugin is:

cni/sdn/sdn.go
Go
package main
 
import (
    "bytes"
    "encoding/json"
    "fmt"
    "io/ioutil"
    "net"
    "net/http"
    "os"
    "strings"
 
    "github.com/containernetworking/cni/pkg/skel"
    // Use the CNI 0.2.0 result types
    t020 "github.com/containernetworking/cni/pkg/types/020"
    "github.com/containernetworking/cni/pkg/version"
    galaxyapi "tkestack.io/galaxy/pkg/api/galaxy"
    "tkestack.io/galaxy/pkg/api/galaxy/private"
)
 
type cniPlugin struct {
    socketPath string
}
 
func NewCNIPlugin(socketPath string) *cniPlugin {
    return &cniPlugin{socketPath: socketPath}
}
 
// Create and fill a CNIRequest with this plugin's environment and stdin which
// contain the CNI variables and configuration
func newCNIRequest(args *skel.CmdArgs) *galaxyapi.CNIRequest {
    envMap := make(map[string]string)
    for _, item := range os.Environ() {
        idx := strings.Index(item, "=")
        if idx > 0 {
            envMap[strings.TrimSpace(item[:idx])] = item[idx+1:]
        }
    }
 
    return &galaxyapi.CNIRequest{
        Env:    envMap,
        Config: args.StdinData,
    }
}
 
// Call the Galaxy daemon
func (p *cniPlugin) doCNI(url string, req *galaxyapi.CNIRequest) ([]byte, error) {
    data, err := json.Marshal(req)
    if err != nil {
        return nil, fmt.Errorf("failed to marshal CNI request %v: %v", req, err)
    }
    // Connect over the UDS
    client := &http.Client{
        Transport: &http.Transport{
            Dial: func(proto, addr string) (net.Conn, error) {
                return net.Dial("unix", p.socketPath)
            },
        },
    }
 
    resp, err := client.Post(url, "application/json", bytes.NewReader(data))
    if err != nil {
        return nil, fmt.Errorf("failed to send CNI request: %v", err)
    }
    defer resp.Body.Close() // nolint: errcheck
 
    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return nil, fmt.Errorf("failed to read CNI result: %v", err)
    }
 
    if resp.StatusCode != 200 {
        return nil, fmt.Errorf("galaxy returns: %s", string(body))
    }
 
    return body, nil
}
 
// Simply forwards the CNI request to the Galaxy daemon
func (p *cniPlugin) CmdAdd(args *skel.CmdArgs) (*t020.Result, error) {
    // Because we talk over a UDS, the hostname in the URL is just a dummy; this simply calls the /cni endpoint
    body, err := p.doCNI("http://dummy/cni", newCNIRequest(args))
    if err != nil {
        return nil, err
    }
 
    result := &t020.Result{}
    if err := json.Unmarshal(body, result); err != nil {
        return nil, fmt.Errorf("failed to unmarshal response '%s': %v", string(body), err)
    }
 
    return result, nil
}
 
// Send the ADD command environment and config to the CNI server, printing
// the IPAM result to stdout when called as a CNI plugin
func (p *cniPlugin) skelCmdAdd(args *skel.CmdArgs) error {
    result, err := p.CmdAdd(args)
    if err != nil {
        return err
    }
    return result.Print()
}
 
// Send the DEL command environment and config to the CNI server
func (p *cniPlugin) CmdDel(args *skel.CmdArgs) error {
    _, err := p.doCNI("http://dummy/cni", newCNIRequest(args))
    return err
}
 
// Built with the skel framework
func main() {
    // Pass in the path of the UDS socket
    p := NewCNIPlugin(private.GalaxySocketPath)
    skel.PluginMain(p.skelCmdAdd, p.CmdDel, version.Legacy)
}

As you can see, galaxy-sdn is just an empty shell: all the work is delegated to the Galaxy daemon.
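
To try the HTTP-over-UDS pattern without a running Galaxy daemon, the following self-contained sketch starts a toy server on a temporary Unix socket and posts to it the same way galaxy-sdn does. The echo server is only a stand-in for the daemon, and serveEcho, postOverUDS and roundTrip are hypothetical names. The client uses DialContext where the plugin above uses the older Dial field.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net"
	"net/http"
	"os"
	"path/filepath"
	"strings"
)

// serveEcho starts a toy HTTP server on a Unix socket that echoes the request
// body back from /cni. It stands in for the Galaxy daemon in this sketch.
func serveEcho(socketPath string) (net.Listener, error) {
	ln, err := net.Listen("unix", socketPath)
	if err != nil {
		return nil, err
	}
	mux := http.NewServeMux()
	mux.HandleFunc("/cni", func(w http.ResponseWriter, r *http.Request) {
		_, _ = io.Copy(w, r.Body)
	})
	go func() { _ = http.Serve(ln, mux) }() // returns once ln is closed
	return ln, nil
}

// postOverUDS sends an HTTP POST over the Unix socket at socketPath. The host
// part of the URL is a placeholder; only the socket path decides the target.
func postOverUDS(socketPath, body string) (string, error) {
	client := &http.Client{
		Transport: &http.Transport{
			DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
				var d net.Dialer
				return d.DialContext(ctx, "unix", socketPath)
			},
		},
	}
	defer client.CloseIdleConnections()
	resp, err := client.Post("http://dummy/cni", "application/json", strings.NewReader(body))
	if err != nil {
		return "", fmt.Errorf("failed to send request: %v", err)
	}
	defer resp.Body.Close()
	data, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", fmt.Errorf("failed to read response: %v", err)
	}
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("server returned %d: %s", resp.StatusCode, data)
	}
	return string(data), nil
}

// roundTrip wires both halves together on a socket in a temporary directory.
func roundTrip(body string) (string, error) {
	dir, err := os.MkdirTemp("", "uds")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)
	sock := filepath.Join(dir, "demo.sock")
	ln, err := serveEcho(sock)
	if err != nil {
		return "", err
	}
	defer ln.Close()
	return postOverUDS(sock, body)
}

func main() {
	out, err := roundTrip(`{"hello":"cni"}`)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(out)
}
```

Using a Unix socket keeps the daemon's endpoint off the network, so only processes on the same host with access to the socket file can reach it.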

galaxy-k8s-vlan

Overall flow:

Go
func init() {
    // this ensures that main runs only on main thread (thread group leader).
    // since namespace ops (unshare, setns) are done for a single thread, we
    // must ensure that the goroutine does not jump from OS thread to thread
    runtime.LockOSThread()
    _, pANet, _ = net.ParseCIDR("10.0.0.0/8")
    _, pBNet, _ = net.ParseCIDR("172.16.0.0/12")
    _, pCNet, _ = net.ParseCIDR("192.168.0.0/16")
}
 
func main() {
    d = &vlan.VlanDriver{}
    skel.PluginMain(cmdAdd, cmdDel, version.Legacy)
}
 
func cmdAdd(args *skel.CmdArgs) error {
    // Load the CNI configuration
    conf, err := d.LoadConf(args.StdinData)
    if err != nil {
        return err
    }
    // First try to get IPInfo from args; if that fails, call a third-party IPAM plugin to allocate an IP
    vlanIds, results, err := ipam.Allocate(conf.IPAM.Type, args)
    if err != nil {
        return err
    }
    // Decide whether to create the default bridge
    if d.DisableDefaultBridge == nil {
        defaultTrue := true
        d.DisableDefaultBridge = &defaultTrue
        for i := range vlanIds {
            if vlanIds[i] == 0 {
                *d.DisableDefaultBridge = false
            }
        }
    }
    // Initialize the VLAN driver: configure the host bridge where necessary, or set up MACVlan/IPVlan
    if err := d.Init(); err != nil {
        return fmt.Errorf("failed to setup bridge %v", err)
    }
    result020s, err := resultConvert(results)
    if err != nil {
        return err
    }
    // Configure the network on the host
    if err := setupNetwork(result020s, vlanIds, args); err != nil {
        return err
    }
    result020s[0].DNS = conf.DNS
    return result020s[0].Print()
}
 
func cmdDel(args *skel.CmdArgs) error {
    // Delete all veth pairs
    if err := utils.DeleteAllVeth(args.Netns); err != nil {
        return err
    }
    conf, err := d.LoadConf(args.StdinData)
    if err != nil {
        return err
    }
    // Release the IP address
    return ipam.Release(conf.IPAM.Type, args)
}

VlanDriver is the core of this CNI plugin.

Go
type VlanDriver struct {
    //FIXME add a file lock cause we are running multiple processes?
    *NetConf
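    // Both the physical interface and the VLAN sub-interfaces live in the host namespace
    // Device index of the physical (parent) interface of all VLAN interfaces, e.g. eth1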
    vlanParentIndex int
    // If VLAN is enabled, the index of the sub-interface of the current VLAN; otherwise the index of the physical interface
    DeviceIndex int
}

Both the ADD and DEL commands call the following method to load the CNI configuration:

Go
const (
    VlanPrefix        = "vlan"
    BridgePrefix      = "docker"
    DefaultBridge     = "docker"
    // IPVLAN supports L2 and L3 modes; L3 is the default
    DefaultIPVlanMode = "l3"
)
 
func (d *VlanDriver) LoadConf(bytes []byte) (*NetConf, error) {
    conf := &NetConf{}
    if err := json.Unmarshal(bytes, conf); err != nil {
        return nil, fmt.Errorf("failed to load netconf: %v", err)
    }
    if conf.DefaultBridgeName == "" {
        conf.DefaultBridgeName = DefaultBridge
    }
    if conf.BridgeNamePrefix == "" {
        conf.BridgeNamePrefix = BridgePrefix
    }
    if conf.VlanNamePrefix == "" {
        conf.VlanNamePrefix = VlanPrefix
    }
    if conf.IpVlanMode == "" {
        conf.IpVlanMode = DefaultIPVlanMode
    }
 
    d.NetConf = conf
    return conf, nil
}

The CNI configuration is defined as follows:

Go
type NetConf struct {
    // Standard CNI network configuration
    types.NetConf
    // Name of the device that holds the IDC IP address, e.g. eth1 or eth1.12 (a VLAN sub-interface)
    Device string `json:"device"`
    // Candidate device list used when Device is empty; the first device that exists is chosen
    Devices []string `json:"devices"`
    // Switch implementation: macvlan, ipvlan, bridge (default), or pure (avoids creating unnecessary bridges)
    Switch string `json:"switch"`
    // IPVLAN mode: l2, l3 (default), or l3s
    IpVlanMode string `json:"ipvlan_mode"`
 
    // Do not create the default bridge
    DisableDefaultBridge *bool `json:"disable_default_bridge"`
    // Name of the default bridge
    DefaultBridgeName string `json:"default_bridge_name"`
    // Prefix for bridge names
    BridgeNamePrefix string `json:"bridge_name_prefix"`
    // Prefix for VLAN interface names
    VlanNamePrefix string `json:"vlan_name_prefix"`
    // Whether to enable gratuitous ARP
    GratuitousArpRequest bool `json:"gratuitous_arp_request"`
    // MTU to set
    MTU int `json:"mtu"`
}
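
To make the defaulting behaviour concrete, here is a small self-contained sketch that unmarshals a configuration and applies the same defaults as LoadConf. The `netConf` type is a trimmed, illustrative copy of the struct above (the embedded types.NetConf is left out), and `loadConf` is a hypothetical stand-in rather than the plugin's own method.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// netConf is a trimmed, illustrative copy of the NetConf fields shown above.
type netConf struct {
	Device               string   `json:"device"`
	Devices              []string `json:"devices"`
	Switch               string   `json:"switch"`
	IpVlanMode           string   `json:"ipvlan_mode"`
	DisableDefaultBridge *bool    `json:"disable_default_bridge"`
	DefaultBridgeName    string   `json:"default_bridge_name"`
	BridgeNamePrefix     string   `json:"bridge_name_prefix"`
	VlanNamePrefix       string   `json:"vlan_name_prefix"`
	MTU                  int      `json:"mtu"`
}

// loadConf parses the JSON and fills in the defaults used by LoadConf.
func loadConf(data []byte) (*netConf, error) {
	conf := &netConf{}
	if err := json.Unmarshal(data, conf); err != nil {
		return nil, fmt.Errorf("failed to load netconf: %v", err)
	}
	if conf.DefaultBridgeName == "" {
		conf.DefaultBridgeName = "docker"
	}
	if conf.BridgeNamePrefix == "" {
		conf.BridgeNamePrefix = "docker"
	}
	if conf.VlanNamePrefix == "" {
		conf.VlanNamePrefix = "vlan"
	}
	if conf.IpVlanMode == "" {
		conf.IpVlanMode = "l3"
	}
	return conf, nil
}

func main() {
	conf, err := loadConf([]byte(`{"device":"eth0","switch":"ipvlan","ipvlan_mode":"l2","mtu":1500}`))
	if err != nil {
		panic(err)
	}
	// DisableDefaultBridge stays nil when the key is absent from the JSON.
	fmt.Println(conf.Switch, conf.IpVlanMode, conf.VlanNamePrefix, conf.DisableDefaultBridge == nil)
}
```

Declaring DisableDefaultBridge as a pointer is what lets cmdAdd tell "not set" (nil) apart from an explicit false, and only compute a value from the VLAN IDs in the former case.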

For the ADD command, once the CNI configuration has been loaded, the first step is to call IPAM to allocate an IP address:

Go
func Allocate(ipamType string, args *skel.CmdArgs) ([]uint16, []types.Result, error) {
    var (
        vlanId uint16
        err    error
    )
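    // Parse CNI args in the key1=val1;key2=val2 format. These args come from the Pod annotation k8s.v1.cni.galaxy.io/args.
    // When a floating IP is used, Galaxy IPAM sets this annotation and writes the IP address into the ipinfos field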
    kvMap, err := cniutil.ParseCNIArgs(args.Args)
    if err != nil {
        return nil, nil, err
    }
    var results []types.Result
    var vlanIDs []uint16
    // Read the ipinfos field
    if ipInfoStr := kvMap[constant.IPInfosKey]; ipInfoStr != "" {
        // Parse the IP information
        var ipInfos []constant.IPInfo
        if err := json.Unmarshal([]byte(ipInfoStr), &ipInfos); err != nil {
            return nil, nil, fmt.Errorf("failed to unmarshal ipInfo from args %q: %v", args.Args, err)
        }
        if len(ipInfos) == 0 {
            return nil, nil, fmt.Errorf("empty ipInfos")
        }
        for j := range ipInfos {
            results = append(results, cniutil.IPInfoToResult(&ipInfos[j]))
            vlanIDs = append(vlanIDs, ipInfos[j].Vlan)
        }
        // Simply "allocate" the IP address written in the Pod annotation
        return vlanIDs, results, nil
    }
    // If the Pod annotation carries no IP information, some IPAM plugin must be called, so ipamType must not be empty:
    if ipamType == "" {
        return nil, nil, fmt.Errorf("neither ipInfo from cni args nor ipam type from netconf")
    }
    // Call the IPAM plugin
    generalResult, err := ipam.ExecAdd(ipamType, args.StdinData)
    if err != nil {
        return nil, nil, err
    }
    result, err := t020.GetResult(generalResult)
    if err != nil {
        return nil, nil, err
    }
    if result.IP4 == nil {
        return nil, nil, fmt.Errorf("IPAM plugin returned missing IPv4 config")
    }
    return append(vlanIDs, vlanId), append(results, generalResult), err
}

If the floating IP feature provided by Galaxy IPAM is in use, the code above simply picks up the already-allocated IP information and returns it.
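
The args-first path can be sketched in isolation as follows. This is only an approximation under stated assumptions: `parseCNIArgs` and `ipInfosFromArgs` are hypothetical helpers, and the JSON field names of `ipInfo` (ip, vlan, gateway) are guesses made for illustration; only the ipinfos key and the key1=val1;key2=val2 format come from the code above.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// ipInfo mirrors the idea of constant.IPInfo; the field names are assumptions.
type ipInfo struct {
	IP      string `json:"ip"`
	Vlan    uint16 `json:"vlan"`
	Gateway string `json:"gateway"`
}

// parseCNIArgs splits "key1=val1;key2=val2" into a map. Only the first "="
// of each pair separates key from value.
func parseCNIArgs(args string) (map[string]string, error) {
	kv := make(map[string]string)
	for _, pair := range strings.Split(args, ";") {
		if pair == "" {
			continue
		}
		parts := strings.SplitN(pair, "=", 2)
		if len(parts) != 2 {
			return nil, fmt.Errorf("invalid CNI arg %q", pair)
		}
		kv[parts[0]] = parts[1]
	}
	return kv, nil
}

// ipInfosFromArgs returns the pre-assigned IPs, or nil when the ipinfos key
// is absent (the caller would then fall back to an IPAM plugin).
func ipInfosFromArgs(args string) ([]ipInfo, error) {
	kv, err := parseCNIArgs(args)
	if err != nil {
		return nil, err
	}
	raw := kv["ipinfos"]
	if raw == "" {
		return nil, nil
	}
	var infos []ipInfo
	if err := json.Unmarshal([]byte(raw), &infos); err != nil {
		return nil, fmt.Errorf("failed to unmarshal ipInfo from args %q: %v", args, err)
	}
	if len(infos) == 0 {
		return nil, fmt.Errorf("empty ipInfos")
	}
	return infos, nil
}

func main() {
	args := `K8S_POD_NAME=demo;ipinfos=[{"ip":"10.0.0.2/24","vlan":2,"gateway":"10.0.0.1"}]`
	infos, err := ipInfosFromArgs(args)
	fmt.Println(infos, err)
}
```

A nil result with a nil error is the signal to fall through to the IPAM plugin, which is why the original code insists on a non-empty ipamType at that point.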

Once the IP address has been allocated or retrieved, VlanDriver is initialized:

Go
func (d *VlanDriver) Init() error {
    var (
        device netlink.Link
        err    error
    )
    // This driver requires a parent interface on the host to be specified
    if d.Device != "" {
        device, err = netlink.LinkByName(d.Device)
    } else {
        if len(d.Devices) == 0 {
            return fmt.Errorf("a device is needed to use vlan plugin")
        }
        for _, devName := range d.Devices {
            device, err = netlink.LinkByName(devName)
            if err == nil {
                break
            }
        }
    }
    if err != nil {
        return fmt.Errorf("Error getting device %s: %v", d.Device, err)
    }
    // By default the MTU is taken from the parent interface
    if d.MTU == 0 {
        d.MTU = device.Attrs().MTU
    }
    d.DeviceIndex = device.Attrs().Index
    d.vlanParentIndex = device.Attrs().Index
    // If the specified device is a VLAN sub-interface, use the index of its parent interface
    if device.Type() == "vlan" {
        //A vlan device
        d.vlanParentIndex = device.Attrs().ParentIndex
        //glog.Infof("root device %s is a vlan device, parent index %d", d.Device, d.vlanParentIndex)
    }
    if d.IPVlanMode() {
        switch d.GetIPVlanMode() {
        case netlink.IPVLAN_MODE_L3S, netlink.IPVLAN_MODE_L3:
            // Allow binding to non-local addresses (the purpose of this is unclear)
            return utils.EnableNonlocalBind()
        default:
            return nil
        }
    } else if d.MacVlanMode() {
        return nil
    } else if d.PureMode() {
        if err := d.initPureModeArgs(); err != nil {
            return err
        }
        return utils.EnableNonlocalBind()
    }
    if d.DisableDefaultBridge != nil && *d.DisableDefaultBridge {
        return nil
    }
    // The default bridge needs to be created
    // Get the IP addresses of the parent interface
    v4Addr, err := netlink.AddrList(device, netlink.FAMILY_V4)
    if err != nil {
        return fmt.Errorf("Errror getting ipv4 address %v", err)
    }
    filteredAddr := network.FilterLoopbackAddr(v4Addr)
    if len(filteredAddr) == 0 {
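        // The parent interface has no IP address, so the address should already have been moved to the bridge. Make sure the bridge device exists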
        bri, err := netlink.LinkByName(d.DefaultBridgeName)
        if err != nil {
            return fmt.Errorf("Error getting bri device %s: %v", d.DefaultBridgeName, err)
        }
        if bri.Attrs().Index != device.Attrs().MasterIndex {
            return fmt.Errorf("No available address found on device %s", d.Device)
        }
    } else {
        // Otherwise the bridge presumably does not exist yet; initialize it
        if err := d.initVlanBridgeDevice(device, filteredAddr); err != nil {
            return err
        }
    }
    return nil
}
 
// When MacVlan/IPVlan is not used, a bridge is required
func (d *VlanDriver) initVlanBridgeDevice(device netlink.Link, filteredAddr []netlink.Addr) error {
    // Create the bridge
    bri, err := getOrCreateBridge(d.DefaultBridgeName, device.Attrs().HardwareAddr)
    if err != nil {
        return err
    }
    // Bring the bridge up
    if err := netlink.LinkSetUp(bri); err != nil {
        return fmt.Errorf("failed to set up bridge device %s: %v", d.DefaultBridgeName, err)
    }
    // List the routes of the parent interface
    rs, err := netlink.RouteList(device, nl.FAMILY_V4)
    if err != nil {
        return fmt.Errorf("failed to list route of device %s", device.Attrs().Name)
    }
    defer func() {
        if err != nil {
            // On error, the routes must be added back
            for i := range rs {
                _ = netlink.RouteAdd(&rs[i])
            }
        }
    }()
    // Move the parent interface's IP addresses and routes onto the bridge
    err = d.moveAddrAndRoute(device, bri, filteredAddr, rs)
    if err != nil {
        return err
    }
    return nil
}

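As you can see, in MacVlan or IPVlan mode the plugin does not manage any bridge on the host (not needing a bridge is one of the benefits of MacVlan/IPVlan). Otherwise, it creates a bridge and moves the parent interface's IP addresses and routes over to it.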
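
The move-with-rollback step in initVlanBridgeDevice depends on a deferred closure that inspects the function's error before returning. The sketch below models that pattern with in-memory data instead of netlink; the `link` type, `moveToBridge` and the injected `addRoute` callback are all hypothetical, and the real code restores only the routes rather than the whole interface state.

```go
package main

import (
	"errors"
	"fmt"
)

// link is an in-memory stand-in for a network interface (hypothetical type).
type link struct {
	name   string
	addrs  []string
	routes []string
}

// moveToBridge moves dev's addresses and routes onto bri. addRoute is injected
// so that a failure can be simulated. The named result err lets the deferred
// closure see whether the function is returning an error.
func moveToBridge(dev, bri *link, addRoute func(l *link, route string) error) (err error) {
	addrs := append([]string(nil), dev.addrs...)
	routes := append([]string(nil), dev.routes...)
	defer func() {
		if err != nil {
			// Roll back. bri is assumed to be freshly created, hence empty.
			dev.addrs, dev.routes = addrs, routes
			bri.addrs, bri.routes = nil, nil
		}
	}()

	// In this model, removing the addresses also drops the dependent routes.
	dev.addrs, dev.routes = nil, nil
	bri.addrs = append(bri.addrs, addrs...)
	for _, r := range routes {
		if err = addRoute(bri, r); err != nil {
			return err
		}
	}
	return nil
}

// appendRoute is the normal, always-succeeding route installer.
func appendRoute(l *link, route string) error {
	l.routes = append(l.routes, route)
	return nil
}

// failingRoute simulates an error while installing a route.
func failingRoute(_ *link, route string) error {
	return errors.New("cannot add route " + route)
}

func newEth0() *link {
	return &link{name: "eth0", addrs: []string{"10.0.0.5/24"}, routes: []string{"default via 10.0.0.1"}}
}

func main() {
	dev, bri := newEth0(), &link{name: "docker"}
	fmt.Println(moveToBridge(dev, bri, appendRoute), dev.addrs, bri.addrs)

	dev, bri = newEth0(), &link{name: "docker"}
	fmt.Println(moveToBridge(dev, bri, failingRoute), dev.addrs, bri.addrs)
}
```

The important detail is that the closure reads the same err variable that the failing call assigns; with a shadowed `err :=` inside the function body the rollback would never fire.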

Finally, the following function sets up the network:

Go
func setupNetwork(result020s []*t020.Result, vlanIds []uint16, args *skel.CmdArgs) error {
    if d.MacVlanMode() {
        // MACVlan mode
        if err := setupMacvlan(result020s[0], vlanIds[0], args); err != nil {
            return err
        }
    } else if d.IPVlanMode() {
        // IPVlan mode
        if err := setupIPVlan(result020s[0], vlanIds[0], args); err != nil {
            return err
        }
    } else {
        // Bridge mode
        ifName := args.IfName
        if err := setupVlanDevice(result020s, vlanIds, args); err != nil {
            return err
        }
        args.IfName = ifName
    }
    // Send a gratuitous ARP so that the switch learns the floating IP has moved to this node
    if d.PureMode() {
        _ = utils.SendGratuitousARP(d.Device, result020s[0].IP4.IP.IP.String(), "", d.GratuitousArpRequest)
    }
    return nil
}
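
For readers unfamiliar with gratuitous ARP, the sketch below builds the raw frame by hand: an ARP packet whose sender and target protocol addresses are both the announced IP, broadcast at the Ethernet layer. It is a standalone illustration, not the body of utils.SendGratuitousARP; `buildGratuitousARP` and `demoFrame` are hypothetical, and treating the `asRequest` flag as the meaning of gratuitous_arp_request is an assumption. Actually sending the frame needs a raw socket and is left out.

```go
package main

import (
	"fmt"
	"net"
)

// buildGratuitousARP returns a 42-byte Ethernet frame carrying a gratuitous
// ARP for ip, announced by mac. asRequest selects opcode 1 (request) instead
// of opcode 2 (reply).
func buildGratuitousARP(mac net.HardwareAddr, ip net.IP, asRequest bool) ([]byte, error) {
	ip4 := ip.To4()
	if ip4 == nil || len(mac) != 6 {
		return nil, fmt.Errorf("need an IPv4 address and a 6-byte MAC")
	}
	op := uint16(2)
	if asRequest {
		op = 1
	}
	broadcast := []byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff}
	zeroMAC := make([]byte, 6)

	frame := make([]byte, 0, 42)
	frame = append(frame, broadcast...)           // Ethernet destination: broadcast
	frame = append(frame, mac...)                 // Ethernet source
	frame = append(frame, 0x08, 0x06)             // EtherType: ARP
	frame = append(frame, 0x00, 0x01)             // hardware type: Ethernet
	frame = append(frame, 0x08, 0x00)             // protocol type: IPv4
	frame = append(frame, 6, 4)                   // hardware / protocol address lengths
	frame = append(frame, byte(op>>8), byte(op))  // opcode
	frame = append(frame, mac...)                 // sender hardware address
	frame = append(frame, ip4...)                 // sender protocol address
	frame = append(frame, zeroMAC...)             // target hardware address (ignored by receivers)
	frame = append(frame, ip4...)                 // target protocol address equals the sender's
	return frame, nil
}

// demoFrame builds a request-style announcement for a made-up address.
func demoFrame() ([]byte, error) {
	mac, err := net.ParseMAC("02:42:0a:00:00:05")
	if err != nil {
		return nil, err
	}
	return buildGratuitousARP(mac, net.ParseIP("10.0.0.5"), true)
}

func main() {
	frame, err := demoFrame()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%d bytes: % x\n", len(frame), frame)
}
```

Every host or switch that sees the frame can refresh its mapping of the IP to the sender's MAC, which is how a floating IP that has moved to another node becomes reachable again without waiting for cache expiry.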

With MACVlan, the setup proceeds as follows:

Go
func setupMacvlan(result *t020.Result, vlanId uint16, args *skel.CmdArgs) error {
    if err := d.MaybeCreateVlanDevice(vlanId); err != nil {
        return err
    }
    // Connect the host and the container via MACVlan
    if err := utils.MacVlanConnectsHostWithContainer(result, args, d.DeviceIndex, d.MTU); err != nil {
        return err
    }
    // Announce the address from inside the namespace
    _ = utils.SendGratuitousARP(args.IfName, result.IP4.IP.IP.String(), args.Netns, d.GratuitousArpRequest)
    return nil
}
 
func MacVlanConnectsHostWithContainer(result *t020.Result, args *skel.CmdArgs, parent int, mtu int) error {
    var err error
    // Create the MACVlan device
    macVlan := &netlink.Macvlan{
        Mode: netlink.MACVLAN_MODE_BRIDGE,
        LinkAttrs: netlink.LinkAttrs{
            Name:        HostMacVlanName(args.ContainerID),
            MTU:         mtu,
            ParentIndex: parent, // The parent may be a physical interface or a VLAN sub-interface
        }}
    // Add the interface
    if err := netlink.LinkAdd(macVlan); err != nil {
        return err
    }
    // The interface must be deleted on error
    defer func() {
        if err != nil {
            netlink.LinkDel(macVlan)
        }
    }()
    // Configure the sandbox; see below for details
    if err = configSboxDevice(result, args, macVlan); err != nil {
        return err
    }
    return nil
}

With IPVlan, the setup proceeds as follows:

Go
func setupIPVlan(result *t020.Result, vlanId uint16, args *skel.CmdArgs) error {
    // Create the VLAN device if necessary
    if err := d.MaybeCreateVlanDevice(vlanId); err != nil {
        return err
    }
    // Connect the host and container networks. The IPVLAN parent device may be a
    // VLAN sub-interface (which lives in the host namespace)
    if err := utils.IPVlanConnectsHostWithContainer(result, args, d.DeviceIndex, d.GetIPVlanMode(), d.MTU); err != nil {
        return err
    }
 
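    // In l3 mode, all traffic between the attached containers is proxied by the IPVlan parent interface on the host.
    // In our environment (VMs deployed on OpenStack), Pods could not ping the layer-2 gateway until we ran
    //     arping -c 2 -U -I eth0 <pod-ip>
    // which is equivalent to the code below.
    // A gratuitous ARP cannot keep the ARP cache valid forever; the underlying switch (virtualized by OpenStack)
    // must be configured appropriately to associate the Pod IP with the VM's MAC address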
    if d.IpVlanMode == "l3" || d.IpVlanMode == "l3s" {
        _ = utils.SendGratuitousARP(d.Device, result.IP4.IP.IP.String(), "", d.GratuitousArpRequest)
        return nil
    }
    // Announce the address from inside the network namespace
    _ = utils.SendGratuitousARP(args.IfName, result.IP4.IP.IP.String(), args.Netns, d.GratuitousArpRequest)
    return nil
}
 
 
// Details of creating the VLAN device
func (d *VlanDriver) MaybeCreateVlanDevice(vlanId uint16) error {
    // If VLAN support is not enabled, do nothing
    if vlanId == 0 {
        return nil
    }
    _, err := d.getOrCreateVlanDevice(vlanId)
    return err
}
func (d *VlanDriver) getOrCreateVlanDevice(vlanId uint16) (netlink.Link, error) {
    // Look up an existing VLAN device by parent interface and VLAN ID
    link, err := d.getVlanIfExist(vlanId)
    if err != nil || link != nil {
        if link != nil {
            // Record the VLAN device index
            d.DeviceIndex = link.Attrs().Index
        }
        return link, err
    }
    vlanIfName := fmt.Sprintf("%s%d", d.VlanNamePrefix, vlanId)
    // Get the VLAN device; if it cannot be found, create it through the callback below
    vlan, err := getOrCreateDevice(vlanIfName, func(name string) error {
        // Create the device: Name is e.g. vlan100, ParentIndex is the index of the parent interface
        vlanIf := &netlink.Vlan{LinkAttrs: netlink.LinkAttrs{Name: vlanIfName, ParentIndex: d.vlanParentIndex},
            // VLAN ID
            VlanId: (int)(vlanId)}
        // Add the device
        if err := netlink.LinkAdd(vlanIf); err != nil {
            return fmt.Errorf("Failed to add vlan device %s: %v", vlanIfName, err)
        }
        return nil
    })
    if err != nil {
        return nil, err
    }
    // Bring it up
    if err := netlink.LinkSetUp(vlan); err != nil {
        return nil, fmt.Errorf("Failed to set up vlan device %s: %v", vlanIfName, err)
    }
    // Update the index of the VLAN sub-interface
    d.DeviceIndex = vlan.Attrs().Index
    return vlan, nil
}
func (d *VlanDriver) getVlanIfExist(vlanId uint16) (netlink.Link, error) {
    links, err := netlink.LinkList()
    if err != nil {
        return nil, err
    }
    for _, link := range links {
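        // Walk the interface list looking for one of type VLAN whose VLAN ID and parent interface both match.
        // This implies that each VLAN needs only one sub-interface, shared by every container attached to that VLAN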
        if link.Type() == "vlan" {
            if vlan, ok := link.(*netlink.Vlan); !ok {
                return nil, fmt.Errorf("vlan device type case error: %T", link)
            } else {
                if vlan.VlanId == int(vlanId) && vlan.ParentIndex == d.vlanParentIndex {
                    return link, nil
                }
            }
        }
    }
    return nil, nil
}
 
 
// Details of connecting the host and the container via IPVLAN
// Configure the network sandbox
func configSboxDevice(result *t020.Result, args *skel.CmdArgs, sbox netlink.Link) error {
    // The interface should be brought down before its MAC address is configured
    if err := netlink.LinkSetDown(sbox); err != nil {
        return fmt.Errorf("could not set link down for container interface %q: %v", sbox.Attrs().Name, err)
    }
    if sbox.Type() != "ipvlan" {
        // IPVlan needs no MAC address because it must match the parent interface's
        if err := netlink.LinkSetHardwareAddr(sbox, GenerateMACFromIP(result.IP4.IP.IP)); err != nil {
            return fmt.Errorf("could not set mac address for container interface %q: %v", sbox.Attrs().Name, err)
        }
    }
    // Get the container's network namespace
    netns, err := ns.GetNS(args.Netns)
    if err != nil {
        return fmt.Errorf("failed to open netns %q: %v", args.Netns, err)
    }
    // Always close the namespace
    defer netns.Close() // nolint: errcheck
    // Move the device into the network namespace
    if err = netlink.LinkSetNsFd(sbox, int(netns.Fd())); err != nil {
        return fmt.Errorf("failed to move sbox device %q to netns: %v", sbox.Attrs().Name, err)
    }
    // Configure it inside the network namespace
    return netns.Do(func(_ ns.NetNS) error {
        // Rename: the IPVLAN interface becomes the container's eth0 directly, just like one end of a veth pair
        if err := netlink.LinkSetName(sbox, args.IfName); err != nil {
            return fmt.Errorf("failed to rename sbox device %q to %q: %v", sbox.Attrs().Name, args.IfName, err)
        }
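        // If the container or the host has multiple network devices, rp_filter (reverse path filtering) must be disabled,
        // because ARP requests (broadcast packets) from the host may carry an unpredictable source IP and would otherwise be dropped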
        if err := DisableRpFilter(args.IfName); err != nil {
            return fmt.Errorf("failed disable rp_filter to dev %s: %v", args.IfName, err)
        }
        // Disable rp_filter for all interfaces
        if err := DisableRpFilter("all"); err != nil {
            return fmt.Errorf("failed disable rp_filter to all: %v", err)
        }
        // Configure the container interface: add the IP address and routes to it
        return cniutil.ConfigureIface(args.IfName, result)
    })
}
func ConfigureIface(ifName string, res *t020.Result) error {
    // The interface should already exist
    link, err := netlink.LinkByName(ifName)
    if err != nil {
        return fmt.Errorf("failed to lookup %q: %v", ifName, err)
    }
    // Bring it up
    if err := netlink.LinkSetUp(link); err != nil {
        return fmt.Errorf("failed to set %q UP: %v", ifName, err)
    }
 
    // TODO(eyakubovich): IPv6
    addr := &netlink.Addr{IPNet: &res.IP4.IP, Label: ""}
    // Add the IP address
    if err = netlink.AddrAdd(link, addr); err != nil {
        return fmt.Errorf("failed to add IP addr to %q: %v", ifName, err)
    }
 
    // Apply each IPv4 route
    for _, r := range res.IP4.Routes {
        gw := r.GW
        if gw == nil {
            gw = res.IP4.Gateway
        }
        if err = ip.AddRoute(&r.Dst, gw, link); err != nil {
            // we skip over duplicate routes as we assume the first one wins
            if !os.IsExist(err) {
                return fmt.Errorf("failed to add route '%v via %v dev %v': %v", r.Dst, gw, ifName, err)
            }
        }
    }
    return nil
}
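
configSboxDevice calls GenerateMACFromIP for non-IPVlan devices, but its body is not shown in this post. As a rough idea of what such a helper can look like, the sketch below uses the scheme known from Docker's bridge driver: a fixed 02:42 prefix followed by the four IPv4 bytes. This is an assumption for illustration; Galaxy's actual implementation may differ, and `macFromIPv4` and `macString` are hypothetical names.

```go
package main

import (
	"fmt"
	"net"
)

// macFromIPv4 derives a locally administered, unicast MAC address from an
// IPv4 address: the 02:42 prefix followed by the four address bytes.
func macFromIPv4(ip net.IP) (net.HardwareAddr, error) {
	ip4 := ip.To4()
	if ip4 == nil {
		return nil, fmt.Errorf("%v is not an IPv4 address", ip)
	}
	return net.HardwareAddr{0x02, 0x42, ip4[0], ip4[1], ip4[2], ip4[3]}, nil
}

// macString is a convenience wrapper that returns "" for invalid input.
func macString(ipStr string) string {
	mac, err := macFromIPv4(net.ParseIP(ipStr))
	if err != nil {
		return ""
	}
	return mac.String()
}

func main() {
	fmt.Println(macString("172.17.0.2")) // 02:42:ac:11:00:02
}
```

Deriving the MAC from the IP keeps the address stable across container restarts, which avoids stale ARP entries on neighbours when a Pod comes back with the same IP.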

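The DEL command is fairly simple:

  1. Delete all veth pairs in the container's network namespace
  2. Release the IP by calling the IPAM plugin; if no plugin is configured, nothing is done. Galaxy IPAM is never called here, because it always reclaims IP addresses itself

galaxy-veth-host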

This plugin addresses the problem that, in IPVlan L2 mode, Pods cannot reach host IPs or Service IPs. To use it, Galaxy must be configured as follows:

JSON
{
  "NetworkConf":[
    {"name":"tke-route-eni","type":"tke-route-eni","eni":"eth1","routeTable":1},
    {"name":"galaxy-flannel","type":"galaxy-flannel", "delegate":{"type":"galaxy-veth"},"subnetFile":"/run/flannel/subnet.env"},
    {"name":"galaxy-k8s-vlan","type":"galaxy-k8s-vlan", "device":"eth0", "switch":"ipvlan", "ipvlan_mode":"l2", "mtu": 1500},
    {"name":"galaxy-k8s-sriov","type": "galaxy-k8s-sriov", "device": "eth0", "vf_num": 10},
    // Newly added
    {
        "name":"galaxy-veth-host","type": "galaxy-veth-host",
        // serviceCidr is the Kubernetes service CIDR; hostInterface names the host NIC
        "serviceCidr": "11.1.252.0/22", "hostInterface": "eth0",
        // containerInterface is the name of the NIC this plugin creates in the container; ipMasq controls whether SNAT is applied
        "containerInterface": "veth0", "ipMasq": true
    }
  ],
  "DefaultNetworks": ["galaxy-flannel"]
}

The Pod annotation must request two networks:

YAML
annotations:
  k8s.v1.cni.cncf.io/networks: "galaxy-k8s-vlan,galaxy-veth-host"
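
As a rough illustration of how such an annotation maps to an ordered list of networks, the sketch below splits the value on commas and falls back to a default list when the annotation is missing. `networksForPod` is a hypothetical helper, and the fallback to DefaultNetworks is an assumption based on the configuration above rather than a copy of Galaxy's logic.

```go
package main

import (
	"fmt"
	"strings"
)

const networksAnnotation = "k8s.v1.cni.cncf.io/networks"

// networksForPod returns the networks requested by the Pod annotation, in
// order, or the defaults when the annotation is absent or empty.
func networksForPod(annotations map[string]string, defaults []string) []string {
	var nets []string
	for _, name := range strings.Split(annotations[networksAnnotation], ",") {
		if name = strings.TrimSpace(name); name != "" {
			nets = append(nets, name)
		}
	}
	if len(nets) == 0 {
		return defaults
	}
	return nets
}

func main() {
	defaults := []string{"galaxy-flannel"}
	pod := map[string]string{networksAnnotation: "galaxy-k8s-vlan,galaxy-veth-host"}
	fmt.Println(networksForPod(pod, defaults)) // [galaxy-k8s-vlan galaxy-veth-host]
	fmt.Println(networksForPod(nil, defaults)) // [galaxy-flannel]
}
```

Order matters here: galaxy-veth-host refuses to run without a previous result, so it has to come after galaxy-k8s-vlan in the list.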

Code walkthrough:

Go
package main
 
import (
    "encoding/json"
    "fmt"
    "math"
    "math/rand"
    "net"
    "os"
    "runtime"
    "sort"
    "strconv"
    "time"
 
    "github.com/containernetworking/cni/pkg/skel"
    "github.com/containernetworking/cni/pkg/types"
    "github.com/containernetworking/cni/pkg/types/current"
    "github.com/containernetworking/cni/pkg/version"
    "github.com/containernetworking/plugins/pkg/ip"
    "github.com/containernetworking/plugins/pkg/ns"
    "github.com/containernetworking/plugins/pkg/utils"
    "github.com/containernetworking/plugins/pkg/utils/sysctl"
    "github.com/coreos/go-iptables/iptables"
    "github.com/j-keck/arping"
    "github.com/vishvananda/netlink"
)
 
// constants for full jitter backoff in milliseconds, and for nodeport marks
const (
    maxSleep             = 10000 // 10.00s
    baseSleep            = 20    //  0.02
    RPFilterTemplate     = "net.ipv4.conf.%s.rp_filter"
    podRulePriority      = 1024
    nodePortRulePriority = 512
)
 
func init() {
    // this ensures that main runs only on main thread (thread group leader).
    // since namespace ops (unshare, setns) are done for a single thread, we
    // must ensure that the goroutine does not jump from OS thread to thread
    runtime.LockOSThread()
}
 
// PluginConf is whatever you expect your configuration json to be. This is whatever
// is passed in on stdin. Your plugin may wish to expose its functionality via
// runtime args, see CONVENTIONS.md in the CNI spec.
type PluginConf struct {
    types.NetConf
 
    // This is the previous result, when called in the context of a chained
    // plugin. Because this plugin supports multiple versions, we'll have to
    // parse this in two passes. If your plugin is not chained, this can be
    // removed (though you may wish to error if a non-chainable plugin is
    // chained.
    // If you need to modify the result before returning it, you will need
    // to actually convert it to a concrete versioned struct.
    RawPrevResult *map[string]interface{} `json:"prevResult"`
    PrevResult    *current.Result         `json:"-"`
 
    IPMasq             bool   `json:"ipMasq"`
    HostInterface      string `json:"hostInterface"`
    ServiceCidr        string `json:"serviceCidr"`
    ContainerInterface string `json:"containerInterface"`
    MTU                int    `json:"mtu"`
    TableStart         int    `json:"routeTableStart"`
    NodePortMark       int    `json:"nodePortMark"`
    NodePorts          string `json:"nodePorts"`
}
 
// parseConfig parses the supplied configuration (and prevResult) from stdin.
func parseConfig(stdin []byte) (*PluginConf, error) {
    conf := PluginConf{}
 
    if err := json.Unmarshal(stdin, &conf); err != nil {
        return nil, fmt.Errorf("failed to parse network configuration: %v", err)
    }
 
    // Parse previous result.
    if conf.RawPrevResult != nil {
        resultBytes, err := json.Marshal(conf.RawPrevResult)
        if err != nil {
            return nil, fmt.Errorf("could not serialize prevResult: %v", err)
        }
        res, err := version.NewResult(conf.CNIVersion, resultBytes)
        if err != nil {
            return nil, fmt.Errorf("could not parse prevResult: %v", err)
        }
        conf.RawPrevResult = nil
        conf.PrevResult, err = current.NewResultFromResult(res)
        if err != nil {
            return nil, fmt.Errorf("could not convert result to current version: %v", err)
        }
    }
    // End previous result parsing
 
    if conf.HostInterface == "" {
        return nil, fmt.Errorf("hostInterface must be specified")
    }
 
    if conf.ContainerInterface == "" {
        return nil, fmt.Errorf("containerInterface must be specified")
    }
 
    if conf.NodePorts == "" {
        conf.NodePorts = "30000:32767"
    }
 
    if conf.NodePortMark == 0 {
        conf.NodePortMark = 0x2000
    }
 
    // start using tables by default at 256
    if conf.TableStart == 0 {
        conf.TableStart = 256
    }
 
    return &conf, nil
}
 
func cmdAdd(args *skel.CmdArgs) error {
    conf, err := parseConfig(args.StdinData)
    if err != nil {
        return err
    }
 
    // Must be invoked as part of a plugin chain, with a preceding plugin
    if conf.PrevResult == nil {
        return fmt.Errorf("must be called as chained plugin")
    }
 
    // Collect the container IPs from the previous plugin's result
    containerIPs := make([]net.IP, 0, len(conf.PrevResult.IPs))
    if conf.CNIVersion != "0.3.0" && conf.CNIVersion != "0.3.1" {
        for _, ip := range conf.PrevResult.IPs {
            containerIPs = append(containerIPs, ip.Address.IP)
        }
    } else {
        for _, ip := range conf.PrevResult.IPs {
            if ip.Interface == nil {
                continue
            }
            intIdx := *ip.Interface
            if intIdx >= 0 && intIdx < len(conf.PrevResult.Interfaces) && conf.PrevResult.Interfaces[intIdx].Name != args.IfName {
                continue
            }
            containerIPs = append(containerIPs, ip.Address.IP)
        }
    }
    if len(containerIPs) == 0 {
        return fmt.Errorf("got no container IPs")
    }
    // Get the host network interface
    iface, err := netlink.LinkByName(conf.HostInterface)
    if err != nil {
        return fmt.Errorf("failed to lookup %q: %v", conf.HostInterface, err)
    }
    // Get the address list of the host network interface
    hostAddrs, err := netlink.AddrList(iface, netlink.FAMILY_ALL)
    if err != nil || len(hostAddrs) == 0 {
        return fmt.Errorf("failed to get host IP addresses for %q: %v", iface, err)
    }
 
    netns, err := ns.GetNS(args.Netns)
    if err != nil {
        return fmt.Errorf("failed to open netns %q: %v", args.Netns, err)
    }
    defer netns.Close()
 
    containerIPV4 := false
    containerIPV6 := false
    for _, ipc := range containerIPs {
        if ipc.To4() != nil {
            containerIPV4 = true
        } else {
            containerIPV6 = true
        }
    }
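    // Create the veth pair and configure the container side:
    // 1. If MASQ is enabled, SNAT traffic leaving through args.IfName
    // 2. Add routes for all host IPs via the veth
    // 3. Add a route for the Kubernetes service CIDR via the veth, with the first host IP as the gateway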
    hostInterface, _, err := setupContainerVeth(netns, conf.ServiceCidr, conf.ContainerInterface, conf.MTU,
        hostAddrs, conf.IPMasq, containerIPV4, containerIPV6, args.IfName, conf.PrevResult)
    if err != nil {
        return err
    }
    // Configure the host end of the veth pair:
    if err = setupHostVeth(hostInterface.Name, hostAddrs, conf.IPMasq, conf.TableStart, conf.PrevResult); err != nil {
        return err
    }
 
    if conf.IPMasq {
        // Enable IP forwarding on the host side
        err := enableForwarding(containerIPV4, containerIPV6)
        if err != nil {
            return err
        }
 
        chain := utils.FormatChainName(conf.Name, args.ContainerID)
        comment := utils.FormatComment(conf.Name, args.ContainerID)
        for _, ipc := range containerIPs {
            addrBits := 128
            if ipc.To4() != nil {
                addrBits = 32
            }
            // SNAT traffic coming from the container IP
            if err = ip.SetupIPMasq(&net.IPNet{IP: ipc, Mask: net.CIDRMask(addrBits, addrBits)}, chain, comment); err != nil {
                return err
            }
        }
    }
 
    // Set up the NodePort-related iptables rules
    if err = setupNodePortRule(conf.HostInterface, conf.NodePorts, conf.NodePortMark); err != nil {
        return err
    }
 
    // Pass through the result for the next plugin
    return types.PrintResult(conf.PrevResult, conf.CNIVersion)
}
 
func setupContainerVeth(netns ns.NetNS, serviceCidr string, ifName string, mtu int,
    hostAddrs []netlink.Addr, masq, containerIPV4, containerIPV6 bool, k8sIfName string,
    pr *current.Result) (*current.Interface, *current.Interface, error) {
    hostInterface := &current.Interface{}
    containerInterface := &current.Interface{}
    // Runs inside the container's network namespace (netns)
    err := netns.Do(func(hostNS ns.NetNS) error {
        // Create the veth pair; one end is placed into hostNS automatically (netns.Do supplies this variable, which is the initial network namespace)
        hostVeth, contVeth0, err := ip.SetupVeth(ifName, mtu, hostNS)
        if err != nil {
            return err
        }
        hostInterface.Name = hostVeth.Name
        hostInterface.Mac = hostVeth.HardwareAddr.String()
        containerInterface.Name = contVeth0.Name
        // ip.SetupVeth does not retrieve the MAC address of the veth peer (the second interface), so look it up here
        containerNetlinkIface, _ := netlink.LinkByName(contVeth0.Name)
        containerInterface.Mac = containerNetlinkIface.Attrs().HardwareAddr.String()
        containerInterface.Sandbox = netns.Path()
 
        // Add the two interfaces created by this CNI invocation to the Result
        pr.Interfaces = append(pr.Interfaces, hostInterface, containerInterface)
 
        // Only the Index field is used later; wouldn't the Link object above do?
        contVeth, err := net.InterfaceByName(ifName)
        if err != nil {
            return fmt.Errorf("failed to look up %q: %v", ifName, err)
        }
 
        if masq {
            // Enable IP forwarding for IPv4/IPv6
            err := enableForwarding(containerIPV4, containerIPV6)
            if err != nil {
                return err
            }
 
            // If the egress interface is k8sIfName (the container's primary interface, usually eth0,
            // supplied when kubelet calls galaxy-sdn), apply source NAT
            err = setupSNAT(k8sIfName, "kube-proxy SNAT")
            if err != nil {
                return fmt.Errorf("failed to enable SNAT on %q: %v", k8sIfName, err)
            }
        }
 
        // Add a route entry for each address of the host network interface
        for _, ipc := range hostAddrs {
            addrBits := 128
            if ipc.IP.To4() != nil {
                addrBits = 32
            }
            // All of these addresses use the new veth as the egress interface
            err := netlink.RouteAdd(&netlink.Route{
                LinkIndex: contVeth.Index, // The newly added veth
                Scope:     netlink.SCOPE_LINK,
                Dst: &net.IPNet{
                    IP:   ipc.IP,
                    Mask: net.CIDRMask(addrBits, addrBits),
                },
            })
 
            if err != nil {
                return fmt.Errorf("failed to add host route dst %v: %v", ipc.IP, err)
            }
        }
 
        _, serviceNet, err := net.ParseCIDR(serviceCidr)
        if err != nil {
            return fmt.Errorf("failed to parse service cidr :%v", err)
        }
 
        // Route the Kubernetes service CIDR out through the new veth, with the first host IP as the gateway
        err = netlink.RouteAdd(&netlink.Route{
            LinkIndex: contVeth.Index,
            Scope:     netlink.SCOPE_UNIVERSE,
            Dst:       serviceNet,
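            // Packets are handed to the first host IP/interface. The host end of the veth is not attached to any interface;
            // these interfaces can serve as the next hop because they share a network namespace with the host end of the veth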
            Gw: hostAddrs[0].IP,
        })
        if err != nil {
            return fmt.Errorf("failed to add service cidr route %v: %v", hostAddrs[0].IP, err)
        }
 
        // Send a gratuitous ARP for every IPv4 address
        for _, ipc := range pr.IPs {
            if ipc.Version == "4" {
                _ = arping.GratuitousArpOverIface(ipc.Address.IP, *contVeth)
            }
        }
 
        return nil
    })
    if err != nil {
        return nil, nil, err
    }
    return hostInterface, containerInterface, nil
}
 
func enableForwarding(ipv4 bool, ipv6 bool) error {
    if ipv4 {
        err := ip.EnableIP4Forward()
        if err != nil {
            return fmt.Errorf("Could not enable IPv4 forwarding: %v", err)
        }
    }
    if ipv6 {
        err := ip.EnableIP6Forward()
        if err != nil {
            return fmt.Errorf("Could not enable IPv6 forwarding: %v", err)
        }
    }
    return nil
}
 
func setupSNAT(ifName string, comment string) error {
    ipt, err := iptables.NewWithProtocol(iptables.ProtocolIPv4)
    if err != nil {
        return fmt.Errorf("failed to locate iptables: %v", err)
    }
    rulespec := []string{"-o", ifName, "-j", "MASQUERADE"}
    //if ipt.HasRandomFully() {
    //    rulespec = append(rulespec, "--random-fully")
    //}
    rulespec = append(rulespec, "-m", "comment", "--comment", comment)
    return ipt.AppendUnique("nat", "POSTROUTING", rulespec...)
}
 
func setupHostVeth(vethName string, hostAddrs []netlink.Addr, masq bool, tableStart int, result *current.Result) error {
    // no IPs to route
    if len(result.IPs) == 0 {
        return nil
    }
 
    // lookup by name as interface ids might have changed
    veth, err := net.InterfaceByName(vethName)
    if err != nil {
        return fmt.Errorf("failed to lookup %q: %v", vethName, err)
    }
 
    // Route every container IP out through the veth
    for _, ipc := range result.IPs {
        addrBits := 128
        if ipc.Address.IP.To4() != nil {
            addrBits = 32
        }
 
        err := netlink.RouteAdd(&netlink.Route{
            LinkIndex: veth.Index,
            Scope:     netlink.SCOPE_LINK,
            Dst: &net.IPNet{
                IP:   ipc.Address.IP,
                Mask: net.CIDRMask(addrBits, addrBits),
            },
        })
 
        if err != nil {
            return fmt.Errorf("failed to add host route dst %v: %v", ipc.Address.IP, err)
        }
    }
 
    // Configure policy routing for traffic that comes from the Pod and is destined for the VPC
    err = addPolicyRules(veth, result.IPs[0], result.Routes, tableStart)
    if err != nil {
        return fmt.Errorf("failed to add policy rules: %v", err)
    }
 
    // Send gratuitous ARP for every host-side IPv4 address
    for _, ipc := range hostAddrs {
        if ipc.IP.To4() != nil {
            _ = arping.GratuitousArpOverIface(ipc.IP, *veth)
        }
    }
 
    return nil
}
 
func addPolicyRules(veth *net.Interface, ipc *current.IPConfig, routes []*types.Route, tableStart int) error {
    table := -1
 
    // Sort the routes, which come from the result of the previous plugin invocation
    sort.Slice(routes, func(i, j int) bool {
        return routes[i].Dst.String() < routes[j].Dst.String()
    })
 
    // Try up to 10 times to write the routes into an unused route table (table slot)
    for i := 0; i < 10 && table == -1; i++ {
        var err error
        // Look for an unused route table
        table, err = findFreeTable(tableStart + rand.Intn(1000))
        if err != nil {
            return err
        }
 
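        // Add all routes to that table
        for _, route := range routes {
            err := netlink.RouteAdd(&netlink.Route{
                LinkIndex: veth.Index,     // egress through the host-side veth
                Dst:       &route.Dst,     // routes reported by the previous CNI call, possibly from the cloud provider (VPC)
                Gw:        ipc.Address.IP, // use the first IP in the Result as the gateway
                Table:     table,          // write into this table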
            })
            if err != nil {
                table = -1
                break
            }
        }
 
        if table == -1 {
            // failed to add routes so sleep and try again on a different table
            wait := time.Duration(rand.Intn(int(math.Min(maxSleep,
                baseSleep*math.Pow(2, float64(i)))))) * time.Millisecond
            fmt.Fprintf(os.Stderr, "route table collision, retrying in %v\n", wait)
            time.Sleep(wait)
        }
    }
 
    // ensure we have a route table selected
    if table == -1 {
        return fmt.Errorf("failed to add routes to a free table")
    }
 
    // Create the policy rule
    rule := netlink.NewRule()
    // Match traffic entering through the host end of the veth, i.e. traffic from the Pod
    rule.IifName = veth.Name
    rule.Table = table
    rule.Priority = podRulePriority
 
    err := netlink.RuleAdd(rule)
    if err != nil {
        return fmt.Errorf("failed to add policy rule %v: %v", rule, err)
    }
 
    return nil
}
 
func findFreeTable(start int) (int, error) {
    allocatedTableIDs := make(map[int]bool)
    // Walk all IPv4/IPv6 policy rules
    for _, family := range []int{netlink.FAMILY_V4, netlink.FAMILY_V6} {
        rules, err := netlink.RuleList(family)
        if err != nil {
            return -1, err
        }
        for _, rule := range rules {
            // Record every table slot that is already in use
            allocatedTableIDs[rule.Table] = true
        }
    }
    // Find the first free slot
    for i := start; i < math.MaxUint32; i++ {
        if !allocatedTableIDs[i] {
            return i, nil
        }
    }
    return -1, fmt.Errorf("failed to find free route table")
}
 
func setupNodePortRule(ifName string, nodePorts string, nodePortMark int) error {
    ipt, err := iptables.NewWithProtocol(iptables.ProtocolIPv4)
    if err != nil {
        return fmt.Errorf("failed to locate iptables: %v", err)
    }
 
    // Make sure NodePort traffic is marked correctly
    if err := ipt.AppendUnique("mangle", "PREROUTING", "-i", ifName, "-p", "tcp",
        "--dport", nodePorts, "-j", "CONNMARK", "--set-mark", strconv.Itoa(nodePortMark),
        "-m", "comment", "--comment", "NodePort Mark"); err != nil {
        return err
    }
    if err := ipt.AppendUnique("mangle", "PREROUTING", "-i", ifName, "-p", "udp",
        "--dport", nodePorts, "-j", "CONNMARK", "--set-mark", strconv.Itoa(nodePortMark),
        "-m", "comment", "--comment", "NodePort Mark"); err != nil {
        return err
    }
    if err := ipt.AppendUnique("mangle", "PREROUTING", "-i", "veth+", "-j", "CONNMARK",
        "--restore-mark", "-m", "comment", "--comment", "NodePort Mark"); err != nil {
        return err
    }
 
    // Switch the host interface to loose-mode RP filter
    _, err = sysctl.Sysctl(fmt.Sprintf(RPFilterTemplate, ifName), "2")
    if err != nil {
        return fmt.Errorf("failed to set RP filter to loose for interface %q: %v", ifName, err)
    }
 
    // Add a policy rule for traffic marked as NodePort
    rule := netlink.NewRule()
    rule.Mark = nodePortMark
    rule.Table = 254 // main table
    rule.Priority = nodePortRulePriority
 
    exists := false
    rules, err := netlink.RuleList(netlink.FAMILY_V4)
    if err != nil {
        return fmt.Errorf("Unable to retrieve IP rules %v", err)
    }
 
    for _, r := range rules {
        if r.Table == rule.Table && r.Mark == rule.Mark && r.Priority == rule.Priority {
            exists = true
            break
        }
    }
    if !exists {
        err := netlink.RuleAdd(rule)
        if err != nil {
            return fmt.Errorf("failed to add policy rule %v: %v", rule, err)
        }
    }
 
    return nil
}
 
// cmdDel is called for DELETE requests
func cmdDel(args *skel.CmdArgs) error {
    conf, err := parseConfig(args.StdinData)
    if err != nil {
        return err
    }
 
    if args.Netns == "" {
        return nil
    }
 
    // The network namespace exists, so clean up
    var ipnets []netlink.Addr
    vethPeerIndex := -1
    _ = ns.WithNetNSPath(args.Netns, func(_ ns.NetNS) error {
        var err error
 
        if conf.IPMasq {
            iface, err := netlink.LinkByName(args.IfName)
            if err != nil {
                if err.Error() == "Link not found" {
                    return ip.ErrLinkNotFound
                }
                return fmt.Errorf("failed to lookup %q: %v", args.IfName, err)
            }
            // Get the list of IP addresses on args.IfName (usually eth0)
            ipnets, err = netlink.AddrList(iface, netlink.FAMILY_ALL)
            if err != nil || len(ipnets) == 0 {
                return fmt.Errorf("failed to get IP addresses for %q: %v", args.IfName, err)
            }
        }
 
        vethIface, err := netlink.LinkByName(conf.ContainerInterface)
        if err != nil && err != ip.ErrLinkNotFound {
            return err
        }
        vethPeerIndex, _ = netlink.VethPeerIndex(&netlink.Veth{LinkAttrs: *vethIface.Attrs()})
        return nil
    })
 
    if conf.IPMasq {
        chain := utils.FormatChainName(conf.Name, args.ContainerID)
        comment := utils.FormatComment(conf.Name, args.ContainerID)
        for _, ipn := range ipnets {
            addrBits := 128
            if ipn.IP.To4() != nil {
                addrBits = 32
            }
 
            _ = ip.TeardownIPMasq(&net.IPNet{IP: ipn.IP, Mask: net.CIDRMask(addrBits, addrBits)}, chain, comment)
        }
 
        if vethPeerIndex != -1 {
            link, err := netlink.LinkByIndex(vethPeerIndex)
            if err != nil {
                return nil
            }
 
            rule := netlink.NewRule()
            rule.IifName = link.Attrs().Name
            // ignore errors as we might be called multiple times
            _ = netlink.RuleDel(rule)
            _ = netlink.LinkDel(link)
        }
    }
 
    return nil
}
 
func main() {
    rand.Seed(time.Now().UnixNano())
    skel.PluginMain(cmdAdd, cmdDel, version.All)
}
Galaxy Daemon
Initialization

The daemon is deployed by the DaemonSet named galaxy and runs on every K8S node. Its entry point:

cmd/galaxy/galaxy.go
Go
package main
 
import (
    "math/rand"
    "time"
 
    "github.com/spf13/pflag"
    "k8s.io/component-base/cli/flag"
    "k8s.io/component-base/logs"
    glog "k8s.io/klog"
    "tkestack.io/galaxy/pkg/galaxy"
    "tkestack.io/galaxy/pkg/signal"
    "tkestack.io/galaxy/pkg/utils/ldflags"
)
 
func main() {
    // Seed the random number generator
    rand.Seed(time.Now().UTC().UnixNano())
    // Create the Galaxy struct
    galaxy := galaxy.NewGalaxy()
    galaxy.AddFlags(pflag.CommandLine)
    flag.InitFlags()
    logs.InitLogs()
    defer logs.FlushLogs()
 
    ldflags.PrintAndExitIfRequested()
    // Start Galaxy
    if err := galaxy.Start(); err != nil {
        glog.Fatalf("Error start galaxy: %v", err)
    }
    // Wait for a signal, then stop Galaxy
    signal.BlockSignalHandler(func() {
        if err := galaxy.Stop(); err != nil {
            glog.Errorf("Error stop galaxy: %v", err)
        }
    })
}

The Galaxy struct is defined as follows:

Go
type Galaxy struct {
    // Corresponds to Galaxy's main configuration file
    JsonConf
    // Corresponds to the command-line options
    *options.ServerRunOptions
    quitChan  chan struct{}
    dockerCli *docker.DockerInterface
    netConf   map[string]map[string]interface{}
    // Handles port mappings
    pmhandler *portmapping.PortMappingHandler
    client    kubernetes.Interface
    pm        *policy.PolicyManager
}
 
type JsonConf struct {
    NetworkConf     []map[string]interface{}
    DefaultNetworks []string
    ENIIPNetwork string
}

Initialization:

Go
func NewGalaxy() *Galaxy {
    g := &Galaxy{
        ServerRunOptions: options.NewServerRunOptions(),
        quitChan:         make(chan struct{}),
        netConf:          map[string]map[string]interface{}{},
    }
    return g
}
 
func (g *Galaxy) Init() error {
    if g.JsonConfigPath == "" {
        return fmt.Errorf("json config is required")
    }
    data, err := ioutil.ReadFile(g.JsonConfigPath)
    if err != nil {
        return fmt.Errorf("read json config: %v", err)
    }
    // Parse the main configuration file
    if err := json.Unmarshal(data, &g.JsonConf); err != nil {
        return fmt.Errorf("bad config %s: %v", string(data), err)
    }
    glog.Infof("Json Config: %s", string(data))
    // Basic validity check of the configuration
    if err := g.checkNetworkConf(); err != nil {
        return err
    }
    // Access to the local Docker daemon is required
    dockerClient, err := docker.NewDockerInterface()
    if err != nil {
        return err
    }
    g.dockerCli = dockerClient
    g.pmhandler = portmapping.New("")
    return nil
}
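
To make the JSON step concrete, here is a minimal sketch of decoding a config into JsonConf and indexing the networks by name, similar in spirit to the netConf map on the Galaxy struct. The sample config is illustrative only, parseConf is a hypothetical helper, and the two checks are my guess at what a basic validity check might cover, not what checkNetworkConf actually does.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// JsonConf mirrors the struct shown earlier.
type JsonConf struct {
	NetworkConf     []map[string]interface{}
	DefaultNetworks []string
	ENIIPNetwork    string
}

// parseConf decodes the config and indexes each network conf by its "name"
// field. The validation rules here are assumptions made for illustration.
func parseConf(data []byte) (*JsonConf, map[string]map[string]interface{}, error) {
	var conf JsonConf
	if err := json.Unmarshal(data, &conf); err != nil {
		return nil, nil, fmt.Errorf("bad config: %v", err)
	}
	byName := map[string]map[string]interface{}{}
	for _, nc := range conf.NetworkConf {
		name, ok := nc["name"].(string)
		if !ok || name == "" {
			return nil, nil, fmt.Errorf("network conf without a name: %v", nc)
		}
		byName[name] = nc
	}
	for _, def := range conf.DefaultNetworks {
		if _, ok := byName[def]; !ok {
			return nil, nil, fmt.Errorf("default network %q is not configured", def)
		}
	}
	return &conf, byName, nil
}

// sample is an illustrative config, not copied from a real deployment.
const sample = `{
  "NetworkConf": [
    {"name": "galaxy-flannel", "type": "galaxy-flannel", "subnetFile": "/run/flannel/subnet.env"}
  ],
  "DefaultNetworks": ["galaxy-flannel"]
}`

func main() {
	conf, byName, err := parseConf([]byte(sample))
	if err != nil {
		panic(err)
	}
	fmt.Println(conf.DefaultNetworks, byName["galaxy-flannel"]["type"])
}
```

Keeping each network conf as a generic map lets the daemon pass it on to the delegated plugin without knowing that plugin's schema.
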

Startup:

Go
func (g *Galaxy) Start() error {
    // Initialize the Galaxy configuration
    if err := g.Init(); err != nil {
        return err
    }
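    // Create the K8S client
    g.initk8sClient()
    // Start the Flannel garbage collector
    gc.NewFlannelGC(g.dockerCli, g.quitChan, g.cleanIPtables).Run()
    // Enable or disable bridge-nf-call-iptables, which controls whether packets forwarded by a bridge pass through iptables
    kernel.BridgeNFCallIptables(g.quitChan, g.BridgeNFCallIptables)
    // Enable or disable kernel IP forwarding
    kernel.IPForward(g.quitChan, g.IPForward)
    // On startup, walk the Pods already on this node and set up port mappings for those that need them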
    if err := g.setupIPtables(); err != nil {
        return err
    }
    if g.NetworkPolicy {
        g.pm = policy.New(g.client, g.quitChan)
        go wait.Until(g.pm.Run, 3*time.Minute, g.quitChan)
    }
    if g.RouteENI {
        // Using Tencent Cloud ENIs requires reverse path filtering to be disabled
        kernel.DisableRPFilter(g.quitChan)
        eni.SetupENIs(g.quitChan)
    }
    // Listen on the UDS, start the HTTP server and register routes
    return g.StartServer()
}
CNI Request Handling

g.StartServer(), called at the end of the code above, registers routes through the following method:

Go
func (g *Galaxy) installHandlers() {
    ws := new(restful.WebService)
    ws.Route(ws.GET("/cni").To(g.cni))
    ws.Route(ws.POST("/cni").To(g.cni))
    restful.Add(ws)
}
 
// Handle CNI requests forwarded by the CNI plugin
func (g *Galaxy) cni(r *restful.Request, w *restful.Response) {
    data, err := ioutil.ReadAll(r.Request.Body)
    if err != nil {
        glog.Warningf("bad request %v", err)
        http.Error(w, fmt.Sprintf("err read body %v", err), http.StatusBadRequest)
        return
    }
    defer r.Request.Body.Close() // nolint: errcheck
    // Decode the request into a CNIRequest, then convert it to a PodRequest
    req, err := galaxyapi.CniRequestToPodRequest(data)
    if err != nil {
        glog.Warningf("bad request %v", err)
        http.Error(w, fmt.Sprintf("%v", err), http.StatusBadRequest)
        return
    }
    req.Path = strings.TrimRight(fmt.Sprintf("%s:%s", req.Path, strings.Join(g.CNIPaths, ":")), ":")
    // Handle the CNI request
    result, err := g.requestFunc(req)
    if err != nil {
        http.Error(w, fmt.Sprintf("%v", err), http.StatusInternalServerError)
    } else {
        // Empty response JSON means success with no body
        w.Header().Set("Content-Type", "application/json")
        if _, err := w.Write(result); err != nil {
            glog.Warningf("Error writing %s HTTP response: %v", req.Command, err)
        }
    }
}

When the CNI plugin galaxy-sdn calls the daemon's /cni endpoint, Galaxy converts the request from:

Go
// Request sent to the Galaxy by the Galaxy SDN CNI plugin
type CNIRequest struct {
    // CNI environment variables, like CNI_COMMAND and CNI_NETNS
    Env map[string]string `json:"env,omitempty"`
    // CNI configuration passed via stdin to the CNI plugin
    Config []byte `json:"config,omitempty"`
}

into a PodRequest:

Go
type PodRequest struct {
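    // The CNI command to execute
    Command string
    // Taken from the CNI_ARGS environment variable
    PodNamespace string
    // Taken from the CNI_ARGS environment variable
    PodName string
    // kubernetes pod ports
    Ports []k8s.Port
    // Channel that receives the result of the operation
    Result chan *PodResult
    // CNI_IFNAME, CNI_COMMAND, CNI_ARGS and so on, passed in through environment variables
    *skel.CmdArgs
    // Galaxy delegates work to other CNI plugins; these are the arguments passed to them
    //                  plugin type  arg name  arg value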
    ExtendedCNIArgs map[string]map[string]json.RawMessage
}
 
// Result of handling a request
type PodResult struct {
    Response []byte
    Err error
}
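
PodNamespace and PodName are extracted from CNI_ARGS, which is a semicolon-separated list of KEY=VALUE pairs. Here is a minimal sketch of that parsing step. parseCNIArgs is a hypothetical helper rather than Galaxy's CniRequestToPodRequest, and the sample string is made up; K8S_POD_NAMESPACE and K8S_POD_NAME are the keys the Kubernetes runtime conventionally passes.

```go
package main

import (
	"fmt"
	"strings"
)

// parseCNIArgs splits a CNI_ARGS string of the form "K1=V1;K2=V2" into a map.
func parseCNIArgs(args string) (map[string]string, error) {
	kvs := map[string]string{}
	for _, pair := range strings.Split(args, ";") {
		if pair == "" {
			continue // tolerate empty segments such as a trailing ";"
		}
		kv := strings.SplitN(pair, "=", 2)
		if len(kv) != 2 {
			return nil, fmt.Errorf("invalid CNI_ARGS pair %q", pair)
		}
		kvs[kv[0]] = kv[1]
	}
	return kvs, nil
}

func main() {
	// Made-up example value of the CNI_ARGS environment variable.
	args := "IgnoreUnknown=1;K8S_POD_NAMESPACE=default;K8S_POD_NAME=web-0"
	kvs, err := parseCNIArgs(args)
	if err != nil {
		panic(err)
	}
	fmt.Printf("pod %s/%s\n", kvs["K8S_POD_NAMESPACE"], kvs["K8S_POD_NAME"])
}
```
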

It then calls requestFunc to handle the converted CNI request:

Go
func (g *Galaxy) requestFunc(req *galaxyapi.PodRequest) (data []byte, err error) {
    start := time.Now()
    glog.Infof("%v, %s+", req, start.Format(time.StampMicro))
    // Handle the ADD command
    if req.Command == cniutil.COMMAND_ADD {
        defer func() {
            glog.Infof("%v, data %s, err %v, %s-", req, string(data), err, start.Format(time.StampMicro))
        }()
        var pod *corev1.Pod
        // Look up the Pod
        pod, err = g.getPod(req.PodName, req.PodNamespace)
        if err != nil {
            return
        }
        result, err1 := g.cmdAdd(req, pod)
        if err1 != nil {
            err = err1
            return
        } else {
            // Convert the result format
            result020, err2 := convertResult(result)
            if err2 != nil {
                err = err2
            } else {
                data, err = json.Marshal(result)
                if err != nil {
                    return
                }
                // On success, set up port mappings. Recall that at startup, port mappings are also set up for all Pods already on this node
                err = g.setupPortMapping(req, req.ContainerID, result020, pod)
                if err != nil {
                    g.cleanupPortMapping(req)
                    return
                }
                pod.Status.PodIP = result020.IP4.IP.IP.String()
                // Apply network policies
                if g.pm != nil {
                    if err := g.pm.SyncPodChains(pod); err != nil {
                        glog.Warning(err)
                    }
                    g.pm.SyncPodIPInIPSet(pod, true)
                }
            }
        }
    } else if req.Command == cniutil.COMMAND_DEL {
        defer glog.Infof("%v err %v, %s-", req, err, start.Format(time.StampMicro))
        err = cniutil.CmdDel(req.CmdArgs, -1)
        if err == nil {
            err = g.cleanupPortMapping(req)
        }
    } else {
        err = fmt.Errorf("unknown command %s", req.Command)
    }
    return
}

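For ADD, requestFunc looks up the Pod and passes it to cmdAdd as part of the arguments. Note that because the CNI version in use is old, only the ADD and DEL commands are supported.

The ADD command is handled as follows: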

Go
func (g *Galaxy) cmdAdd(req *galaxyapi.PodRequest, pod *corev1.Pod) (types.Result, error) {
    // Build the Pod's network requirements from its annotations and specific spec fields
    networkInfos, err := g.resolveNetworks(req, pod)
    if err != nil {
        return nil, err
    }
    return cniutil.CmdAdd(req.CmdArgs, networkInfos)
}
 
func CmdAdd(cmdArgs *skel.CmdArgs, networkInfos []*NetworkInfo) (types.Result, error) {
    // Each NetworkInfo represents one network the Pod needs to join
    if len(networkInfos) == 0 {
        return nil, fmt.Errorf("No network info returned")
    }
    // Save the network requirements as JSON under /var/lib/cni/galaxy/$ContainerID
    if err := saveNetworkInfo(cmdArgs.ContainerID, networkInfos); err != nil {
        return nil, fmt.Errorf("Error save network info %v for %s: %v", networkInfos, cmdArgs.ContainerID, err)
    }
    var (
        err    error
        // Result of the previous network
        result types.Result
    )
    // Satisfy each network requirement in turn
    for idx, networkInfo := range networkInfos {
        // Append the CNI args from the k8s.v1.cni.galaxy.io/args annotation to the argument string
        cmdArgs.Args = strings.TrimRight(fmt.Sprintf("%s;%s", cmdArgs.Args, BuildCNIArgs(networkInfo.Args)), ";")
        if result != nil {
            networkInfo.Conf["prevResult"] = result
        }
        // Delegate to the ADD command of the concrete CNI plugin
        result, err = DelegateAdd(networkInfo.Conf, cmdArgs, networkInfo.IfName)
        if err != nil {
            // On failure, delete every CNI network created so far
            glog.Errorf("fail to add network %s: %v, begin to rollback and delete it", networkInfo.Args, err)
            // Starting from idx, call DEL for each network requirement in reverse order
            delErr := CmdDel(cmdArgs, idx)
            glog.Warningf("fail to delete cni in rollback %v", delErr)
            return nil, fmt.Errorf("fail to establish network %s:%v", networkInfo.Args, err)
        }
    }
    if err != nil {
        return nil, err
    }
    return result, nil
}

Each network requirement is delegated to the corresponding CNI plugin:

Go
// Each plugin's NetConf was read from confdir=/etc/cni/net.d/ earlier, in resolveNetworks
func DelegateAdd(netconf map[string]interface{}, args *skel.CmdArgs, ifName string) (types.Result, error) {
    netconfBytes, err := json.Marshal(netconf)
    if err != nil {
        return nil, fmt.Errorf("error serializing delegate netconf: %v", err)
    }
    typ, err := getNetworkType(netconf)
    if err != nil {
        return nil, err
    }
    pluginPath, err := invoke.FindInPath(typ, strings.Split(args.Path, ":"))
    if err != nil {
        return nil, err
    }
    // Invoke the plugin by executing its binary
    glog.Infof("delegate add %s args %s conf %s", args.ContainerID, args.Args, string(netconfBytes))
    return invoke.ExecPluginWithResult(pluginPath, netconfBytes, &invoke.Args{
        Command:       "ADD",
        ContainerID:   args.ContainerID,
        NetNS:         args.Netns,
        PluginArgsStr: args.Args,
        IfName:        ifName,
        Path:          args.Path,
    })
}
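Summary

To summarize:

  1. A workload's network requirements (which CNI networks it should join) can be configured through annotations and the Spec
  2. Galaxy builds the network list by reading the Pod's annotations and Spec
  3. Galaxy iterates over the CNI network list and, in order, invokes the ADD command of each corresponding CNI plugin by executing the plugin binary
  4. If an error occurs along the way, it invokes the DEL command of the plugins already added, in reverse order

Steps 3 and 4 are essentially what NetworkList provides in newer versions of CNI.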
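
Steps 3 and 4 can be sketched without any CNI dependency. In the sketch below, plugin is a hypothetical in-memory stand-in for an external plugin binary, and addAll is an illustrative name. As in the CmdAdd listing, the result of one plugin is handed to the next, and a failure triggers DEL from the failing index back to the first plugin.

```go
package main

import (
	"errors"
	"fmt"
)

// plugin is a hypothetical stand-in for an external CNI plugin binary.
type plugin struct {
	name string
	fail bool
}

// add simulates the ADD command; prev plays the role of prevResult.
func (p plugin) add(prev string) (string, error) {
	if p.fail {
		return "", errors.New(p.name + ": add failed")
	}
	return prev + "/" + p.name, nil
}

// del simulates the DEL command and records that it ran.
func (p plugin) del(log *[]string) {
	*log = append(*log, "del "+p.name)
}

// addAll calls add on each plugin in order, threading the previous result
// through. On failure it calls del in reverse order, starting with the
// plugin that failed.
func addAll(plugins []plugin, log *[]string) (string, error) {
	result := ""
	for idx, p := range plugins {
		r, err := p.add(result)
		if err != nil {
			for i := idx; i >= 0; i-- {
				plugins[i].del(log)
			}
			return "", fmt.Errorf("fail to establish network %s: %v", p.name, err)
		}
		result = r
	}
	return result, nil
}

func main() {
	var log []string
	_, err := addAll([]plugin{{name: "a"}, {name: "b", fail: true}, {name: "c"}}, &log)
	fmt.Println(err)
	fmt.Println(log)
}
```

Calling DEL on the plugin that failed as well covers the case where its ADD got partway before returning an error.
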

Galaxy IPAM

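Like the Galaxy daemon, Galaxy IPAM is an HTTP server. The daemon is called only by the local CNI plugin galaxy-sdn, so it listens on a UDS; Galaxy IPAM is called by the scheduler, so it listens on TCP.

Initialization

The entry point: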

cmd/galaxy-ipam/galaxy-ipam.go
Go
func main() {
    // initialize rand seed
    rand.Seed(time.Now().UTC().UnixNano())
 
    s := server.NewServer()
    // add command line args
    s.AddFlags(pflag.CommandLine)
 
    flag.InitFlags()
    logs.InitLogs()
    defer logs.FlushLogs()
 
    // if checking version, print it and exit
    ldflags.PrintAndExitIfRequested()
 
    if err := s.Start(); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err) // nolint: errcheck
        os.Exit(1)
    }
}

Starting the Galaxy IPAM server:

Go
func (s *Server) Start() error {
    if err := s.init(); err != nil {
        return fmt.Errorf("init server: %v", err)
    }
    // Start the InformerFactory to watch K8S resource changes
    s.StartInformers(s.stopChan)
    // Leader election for multi-instance deployments
    if s.LeaderElection.LeaderElect && s.leaderElectionConfig != nil {
        leaderelection.RunOrDie(context.Background(), *s.leaderElectionConfig)
        return nil
    }
    return s.Run()
}
 
// Galaxy IPAM initialization
func (s *Server) init() error {
    // Read the configuration file
    if options.JsonConfigPath == "" {
        return fmt.Errorf("json config is required")
    }
    data, err := ioutil.ReadFile(options.JsonConfigPath)
    if err != nil {
        return fmt.Errorf("read json config: %v", err)
    }
    if err := json.Unmarshal(data, &s.JsonConf); err != nil {
        return fmt.Errorf("bad config %s: %v", string(data), err)
    }
    // Initialize the K8S clients and the IPAMContext
    s.initk8sClient()
    // The floating IP plugin implements the PodWatcher interface, so it can receive Pod lifecycle events
    s.plugin, err = schedulerplugin.NewFloatingIPPlugin(s.SchedulePluginConf, s.IPAMContext)
    if err != nil {
        return err
    }
    // Invoke the floating IP plugin whenever a Pod event arrives
    s.PodInformer.Informer().AddEventHandler(eventhandler.NewPodEventHandler(s.plugin))
    return nil
}
 
func (s *Server) Run() error {
    // Initialize the floating IP plugin
    if err := s.plugin.Init(); err != nil {
        return err
    }
    // Run the floating IP plugin
    s.plugin.Run(s.stopChan)
    // Start the API server and register its routes
    go s.startAPIServer()
    // Register the routes called by the scheduler: /v1/filter, /v1/priority, /v1/bind
    s.startServer()
    return nil
}
FloatingIPPlugin

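As the initialization flow above shows, running the Galaxy IPAM server initializes and runs s.plugin, whose type is FloatingIPPlugin.

It is a Pod event listener that implements this interface: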

Go
type PodWatcher interface {
    AddPod(pod *corev1.Pod) error
    UpdatePod(oldPod, newPod *corev1.Pod) error
    DeletePod(pod *corev1.Pod) error
}

These methods are called when Pod events occur.

Its initialization logic:

Go
// ctx carries the various clients, Listers and Informers
func NewFloatingIPPlugin(conf Conf, ctx *context.IPAMContext) (*FloatingIPPlugin, error) {
    conf.validate()
    glog.Infof("floating ip config: %v", conf)
    plugin := &FloatingIPPlugin{
        nodeSubnet:  make(map[string]*net.IPNet),
        IPAMContext: ctx,
        conf:        &conf,
        unreleased:  make(chan *releaseEvent, 50000),
        // A hashed lock pool: each key can be locked on request
        dpLockPool:  keymutex.NewHashed(500000),
        podLockPool: keymutex.NewHashed(500000),
        crdKey:      NewCrdKey(ctx.ExtensionLister),
        crdCache:    crd.NewCrdCache(ctx.DynamicClient, ctx.ExtensionLister, 0),
    }
    // Initialize the IPAM
    plugin.ipam = floatingip.NewCrdIPAM(ctx.GalaxyClient, plugin.FIPInformer)
    if conf.CloudProviderGRPCAddr != "" {
        plugin.cloudProvider = cloudprovider.NewGRPCCloudProvider(conf.CloudProviderGRPCAddr)
    }
    return plugin, nil
}
 
func (p *FloatingIPPlugin) Init() error {
    if len(p.conf.FloatingIPs) > 0 {
        // Initialize the IP pool
        if err := p.ipam.ConfigurePool(p.conf.FloatingIPs); err != nil {
            return err
        }
    } else {
        // No floating IPs in the config file, so fetch them from the ConfigMap
        glog.Infof("empty floatingips from config, fetching from configmap")
        if err := wait.PollInfinite(time.Second, func() (done bool, err error) {
            updated, err := p.updateConfigMap()
            if err != nil {
                glog.Warning(err)
            }
            return updated, nil
        }); err != nil {
            return fmt.Errorf("failed to get floatingip config from configmap: %v", err)
        }
    }
    glog.Infof("plugin init done")
    return nil
}

Its Run method does three main things:

Go
func (p *FloatingIPPlugin) Run(stop chan struct{}) {
    if len(p.conf.FloatingIPs) == 0 {
        go wait.Until(func() {
            // 1. Periodically re-read the ConfigMap floatingip-config for the latest floating IP configuration and call ipam.ConfigurePool to configure the IP pool
            if _, err := p.updateConfigMap(); err != nil {
                glog.Warning(err)
            }
        }, time.Minute, stop)
    }
    go wait.Until(func() {
        // 2. Periodically resync Pod state and release IPs where necessary
        if err := p.resyncPod(); err != nil {
            glog.Warningf("resync pod: %v", err)
        }
        p.syncPodIPsIntoDB()
    }, time.Duration(p.conf.ResyncInterval)*time.Minute, stop)
    for i := 0; i < 5; i++ {
        // 3. Read IP release events from the channel and unbind the IP from the Pod
        go p.loop(stop)
    }
}
 
func (p *FloatingIPPlugin) updateConfigMap() (bool, error) {
    // Fetch the ConfigMap floatingip-config
    cm, err := p.Client.CoreV1().ConfigMaps(p.conf.ConfigMapNamespace).Get(p.conf.ConfigMapName, v1.GetOptions{})
    if err != nil {
        return false, fmt.Errorf("failed to get floatingip configmap %s_%s: %v", p.conf.ConfigMapName,
            p.conf.ConfigMapNamespace, err)
    }
    val, ok := cm.Data[p.conf.FloatingIPKey]
    if !ok {
        return false, fmt.Errorf("configmap %s_%s doesn't have a key floatingips", p.conf.ConfigMapName,
            p.conf.ConfigMapNamespace)
    }
    var updated bool
    // Call the IPAM to update the IP pool
    if updated, err = p.ensureIPAMConf(&p.lastIPConf, val); err != nil {
        return false, err
    }
    defer func() {
        if !updated {
            return
        }
        // When the floating IP configuration changes, clear the cached subnet information for nodes
        p.nodeSubnetLock.Lock()
        defer p.nodeSubnetLock.Unlock()
        p.nodeSubnet = map[string]*net.IPNet{}
    }()
    return true, nil
}
 
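// Release the IPs of the following Pods:
// 1. Deleted Pods whose owner TAPP no longer exists
// 2. Deleted Pods whose owner StatefulSet/Deployment still exists but is not configured to keep IPs unchanged
// 3. Deleted Pods whose owner Deployment no longer needs that many IPs
// 4. Deleted Pods whose index exceeds the replica count of the owner StatefulSet
// 5. Pods that have not been deleted but have been evicted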
func (p *FloatingIPPlugin) resyncPod() error {
    glog.V(4).Infof("resync pods+")
    defer glog.V(4).Infof("resync pods-")
    resyncMeta := &resyncMeta{}
    if err := p.fetchChecklist(resyncMeta); err != nil {
        return err
    }
    p.resyncAllocatedIPs(resyncMeta)
    return nil
}
 
// Fetch and handle pending release events
func (p *FloatingIPPlugin) loop(stop chan struct{}) {
    for {
        select {
        case <-stop:
            return
        case event := <-p.unreleased:
            go func(event *releaseEvent) {
                // Unbind the Pod from its IP address
                if err := p.unbind(event.pod); err != nil {
                    event.retryTimes++
                    if event.retryTimes > 3 {
                        // leave it to resync to protect chan from explosion
                        glog.Errorf("abort unbind for pod %s, retried %d times: %v", util.PodName(event.pod),
                            event.retryTimes, err)
                    } else {
                        glog.Warningf("unbind pod %s failed for %d times: %v", util.PodName(event.pod),
                            event.retryTimes, err)
                        // backoff time if required
                        time.Sleep(100 * time.Millisecond * time.Duration(event.retryTimes))
                        p.unreleased <- event
                    }
                }
            }(event)
        }
    }
}

When a Pod event is observed, the plugin may need to sync the Pod IP with the IPAM, or enqueue a pending release event:

Go
// AddPod does nothing
func (p *FloatingIPPlugin) AddPod(pod *corev1.Pod) error {
    return nil
}
 
// On Pod update, sync the Pod IP with the IPAM
func (p *FloatingIPPlugin) UpdatePod(oldPod, newPod *corev1.Pod) error {
    if !p.hasResourceName(&newPod.Spec) {
        return nil
    }
    // The previous pod.Status.Phase was not terminal (Failed/Succeeded) but the new one is,
    // which means the Pod has finished and its IP must be released. These are typically Job Pods
    if !finished(oldPod) && finished(newPod) {
        // Deployments will leave evicted pods
        // If it's a evicted one, release its ip
        glog.Infof("release ip from %s_%s, phase %s", newPod.Name, newPod.Namespace, string(newPod.Status.Phase))
        p.unreleased <- &releaseEvent{pod: newPod}
        return nil
    }
    // Sync the Pod IP
    if err := p.syncPodIP(newPod); err != nil {
        glog.Warningf("failed to sync pod ip: %v", err)
    }
    return nil
}
 
func (p *FloatingIPPlugin) DeletePod(pod *corev1.Pod) error {
    if !p.hasResourceName(&pod.Spec) {
        return nil
    }
    glog.Infof("handle pod delete event: %s_%s", pod.Name, pod.Namespace)
    // After a Pod is deleted, enqueue a pending IP release event
    p.unreleased <- &releaseEvent{pod: pod}
    return nil
}

The logic for syncing a Pod IP with the IPAM:

Go
func (p *FloatingIPPlugin) syncPodIP(pod *corev1.Pod) error {
    // Only sync Pods that have reached the Running phase
    if pod.Status.Phase != corev1.PodRunning {
        return nil
    }
    // Syncing means: if the Pod already carries the ipinfos annotation and the IP is not yet allocated in the pool, allocate that IP to the Pod in the pool
    if pod.Annotations == nil {
        return nil
    }
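    // A neat idiom: lockPod is called immediately and returns an unlock function, which the defer runs when syncPodIP returns.
    // This keeps other goroutines from operating on the same Pod at the same time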
    defer p.lockPod(pod.Name, pod.Namespace)()
    keyObj, err := util.FormatKey(pod)
    if err != nil {
        glog.V(5).Infof("sync pod %s/%s ip formatKey with error %v", pod.Namespace, pod.Name, err)
        return nil
    }
    cniArgs, err := constant.UnmarshalCniArgs(pod.Annotations[constant.ExtendedCNIArgsAnnotation])
    if err != nil {
        return err
    }
    ipInfos := cniArgs.Common.IPInfos
    for i := range ipInfos {
        if ipInfos[i].IP == nil || ipInfos[i].IP.IP == nil {
            continue
        }
        // For every IP address in the annotation, allocate it through the IPAM's AllocateSpecificIP (via syncIP)
        if err := p.syncIP(keyObj.KeyInDB, ipInfos[i].IP.IP, pod); err != nil {
            glog.Warningf("sync pod %s ip %s: %v", keyObj.KeyInDB, ipInfos[i].IP.IP.String(), err)
        }
    }
    return nil
}

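To summarize, FloatingIPPlugin mainly watches the ConfigMap and Pods. When the ConfigMap changes it updates the IPAM configuration; when a Pod changes it updates the IP allocations in the IPAM pool.

FloatingIPPlugin also provides the core scheduling algorithms, namely the Filter, Prioritize and Bind methods, which we discuss later.

IPAM

The FloatingIPPlugin.ipam field is the core of IP address allocation management. Its interface: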

Go
type IPAM interface {
    // Initialize the IP pool
    ConfigurePool([]*FloatingIPPool) error
    // Release the IPs whose keys match those in the map; returns the released and the unreleased IPs
    ReleaseIPs(map[string]string) (map[string]string, map[string]string, error)
    // Allocate the specified IP to a Pod
    AllocateSpecificIP(string, net.IP, Attr) error
    // Allocate an IP inside the subnet
    AllocateInSubnet(string, *net.IPNet, Attr) (net.IP, error)
    // Allocate one IP from each of the given IP ranges
    AllocateInSubnetsAndIPRange(string, *net.IPNet, [][]nets.IPRange, Attr) ([]net.IP, error)
    // Allocate an IP in the specified subnet under the specified key
    AllocateInSubnetWithKey(oldK, newK, subnet string, attr Attr) error
    // Reserve an IP
    ReserveIP(oldK, newK string, attr Attr) (bool, error)
    // Release an IP
    Release(string, net.IP) error
    // Return the first match for the key
    First(string) (*FloatingIPInfo, error)
    // Look up the floating IP object for an IP address
    ByIP(net.IP) (FloatingIP, error)
    // Query by key prefix
    ByPrefix(string) ([]*FloatingIPInfo, error)
    // Query by keyword
    ByKeyword(string) ([]FloatingIP, error)
    // Return the subnet of a node
    NodeSubnet(net.IP) *net.IPNet
}
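Scheduler Extender

Galaxy IPAM integrates with the K8S scheduler through a Scheduler Extender webhook: it exposes several HTTP endpoints for the scheduler to call.

These endpoints are implemented as methods of the Galaxy IPAM Server and registered in startServer():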

Go
// Filter the nodes that match
func (s *Server) filter(request *restful.Request, response *restful.Response) {
    // Standard K8S API: ExtenderArgs carries the Pod being scheduled and the list of candidate nodes (already filtered by the scheduler)
    args := new(schedulerapi.ExtenderArgs)
    if err := request.ReadEntity(&args); err != nil {
        glog.Error(err)
        _ = response.WriteError(http.StatusInternalServerError, err)
        return
    }
    glog.V(5).Infof("POST filter %v", *args)
    start := time.Now()
    glog.V(3).Infof("filtering %s_%s, start at %d+", args.Pod.Name, args.Pod.Namespace, start.UnixNano())
    // Delegate to FloatingIPPlugin
    filteredNodes, failedNodesMap, err := s.plugin.Filter(&args.Pod, args.Nodes.Items)
    glog.V(3).Infof("filtering %s_%s, start at %d-", args.Pod.Name, args.Pod.Namespace, start.UnixNano())
    args.Nodes.Items = filteredNodes
    errStr := ""
    if err != nil {
        errStr = err.Error()
    }
    _ = response.WriteEntity(schedulerapi.ExtenderFilterResult{
        // Nodes the Pod can be scheduled to
        Nodes:       args.Nodes,
        // Nodes the Pod cannot be scheduled to, with the reason
        FailedNodes: failedNodesMap,
        // Error message
        Error:       errStr,
    })
}
 
// Prioritize (score) the nodes
func (s *Server) priority(request *restful.Request, response *restful.Response) {
    args := new(schedulerapi.ExtenderArgs)
    if err := request.ReadEntity(&args); err != nil {
        glog.Error(err)
        _ = response.WriteError(http.StatusInternalServerError, err)
        return
    }
    glog.V(5).Infof("POST priority %v", *args)
    // Delegate to FloatingIPPlugin
    hostPriorityList, err := s.plugin.Prioritize(&args.Pod, args.Nodes.Items)
    if err != nil {
        glog.Warningf("prioritize err: %v", err)
    }
    _ = response.WriteEntity(*hostPriorityList)
}
 
 
// Bind the Pod to a node
func (s *Server) bind(request *restful.Request, response *restful.Response) {
    args := new(schedulerapi.ExtenderBindingArgs)
    if err := request.ReadEntity(&args); err != nil {
        glog.Error(err)
        _ = response.WriteError(http.StatusInternalServerError, err)
        return
    }
    glog.V(5).Infof("POST bind %v", *args)
    start := time.Now()
    glog.V(3).Infof("binding %s_%s to %s, start at %d+", args.PodName, args.PodNamespace, args.Node, start.UnixNano())
    // Delegate to FloatingIPPlugin
    err := s.plugin.Bind(args)
    glog.V(3).Infof("binding %s_%s to %s, start at %d-", args.PodName, args.PodNamespace, args.Node, start.UnixNano())
    var result schedulerapi.ExtenderBindingResult
    if err != nil {
        glog.Warningf("bind err: %v", err)
        result.Error = err.Error()
    }
    _ = response.WriteEntity(result)
}
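The request/response round trip behind such a webhook can be sketched with the standard library alone. This sketch uses net/http instead of go-restful, and the filterArgs/filterResult types, the /filter path and the node names are simplified assumptions rather than the real schedulerapi types or Galaxy's routes.

```go
package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
    "net/http/httptest"
)

// filterArgs and filterResult are simplified, hypothetical stand-ins for
// schedulerapi.ExtenderArgs and schedulerapi.ExtenderFilterResult.
type filterArgs struct {
    PodName   string   `json:"podName"`
    NodeNames []string `json:"nodeNames"`
}

type filterResult struct {
    NodeNames   []string          `json:"nodeNames"`
    FailedNodes map[string]string `json:"failedNodes"`
    Error       string            `json:"error"`
}

// newExtenderMux wires a filter endpoint to a pluggable decision function,
// mirroring how the Server handler delegates to the plugin.
func newExtenderMux(allow func(node string) bool) *http.ServeMux {
    mux := http.NewServeMux()
    mux.HandleFunc("/filter", func(w http.ResponseWriter, r *http.Request) {
        var args filterArgs
        if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
        res := filterResult{NodeNames: []string{}, FailedNodes: map[string]string{}}
        for _, n := range args.NodeNames {
            if allow(n) {
                res.NodeNames = append(res.NodeNames, n)
            } else {
                res.FailedNodes[n] = "no floating IP left"
            }
        }
        w.Header().Set("Content-Type", "application/json")
        _ = json.NewEncoder(w).Encode(res)
    })
    return mux
}

// callFilter plays the scheduler's role: POST the candidate nodes and
// decode the verdict.
func callFilter(h http.Handler, args filterArgs) (filterResult, error) {
    body, err := json.Marshal(args)
    if err != nil {
        return filterResult{}, err
    }
    rec := httptest.NewRecorder()
    h.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, "/filter", bytes.NewReader(body)))
    var res filterResult
    err = json.NewDecoder(rec.Body).Decode(&res)
    return res, err
}

func main() {
    mux := newExtenderMux(func(node string) bool { return node != "node-b" })
    res, err := callFilter(mux, filterArgs{PodName: "web-0", NodeNames: []string{"node-a", "node-b"}})
    fmt.Println(res.NodeNames, res.FailedNodes, err)
}
```

The handler stays thin on purpose: it decodes the request, delegates the decision, and encodes the result.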

The Server handlers (filter, priority, bind) only deal with HTTP; the real work is done by the FloatingIPPlugin methods mentioned earlier:

Go
// Nodes without an available IP are marked as failedNodes
// If the Pod does not need a floating IP, failedNodes is empty
func (p *FloatingIPPlugin) Filter(pod *corev1.Pod, nodes []corev1.Node) (
    []corev1.Node, schedulerapi.FailedNodesMap, error) {
    start := time.Now()
    failedNodesMap := schedulerapi.FailedNodesMap{}
    // No request for the custom resource tke.cloud.tencent.com/eni-ip means the Pod does not need a floating IP
    if !p.hasResourceName(&pod.Spec) {
        return nodes, failedNodesMap, nil
    }
    filteredNodes := []corev1.Node{}
    // Start filtering; operations on this Pod are locked for the duration
    defer p.lockPod(pod.Name, pod.Namespace)()
    // Read the k8s.v1.cni.galaxy.io/args annotation to learn which subnets the Pod may join
    subnetSet, err := p.getSubnet(pod)
    if err != nil {
        return filteredNodes, failedNodesMap, err
    }
    // Iterate over all nodes
    for i := range nodes {
        nodeName := nodes[i].Name
        // Get the subnet the node belongs to
        subnet, err := p.getNodeSubnet(&nodes[i])
        if err != nil {
            failedNodesMap[nodes[i].Name] = err.Error()
            continue
        }
        // Keep the node if its subnet is one the Pod can be allocated from; otherwise drop it
        if subnetSet.Has(subnet.String()) {
            filteredNodes = append(filteredNodes, nodes[i])
        } else {
            failedNodesMap[nodeName] = "FloatingIPPlugin:NoFIPLeft"
        }
    }
    if glog.V(5) {
        nodeNames := make([]string, len(filteredNodes))
        for i := range filteredNodes {
            nodeNames[i] = filteredNodes[i].Name
        }
        glog.V(5).Infof("filtered nodes %v failed nodes %v for %s_%s", nodeNames, failedNodesMap,
            pod.Namespace, pod.Name)
    }
    metrics.ScheduleLatency.WithLabelValues("filter").Observe(time.Since(start).Seconds())
    return filteredNodes, failedNodesMap, nil
}
 
// Scoring is currently a no-op
func (p *FloatingIPPlugin) Prioritize(pod *corev1.Pod, nodes []corev1.Node) (*schedulerapi.HostPriorityList, error) {
    list := &schedulerapi.HostPriorityList{}
    if !p.hasResourceName(&pod.Spec) {
        return list, nil
    }
    //TODO
    return list, nil
}
 
// Bind a new floating IP to the Pod, or reuse an existing one
func (p *FloatingIPPlugin) Bind(args *schedulerapi.ExtenderBindingArgs) error {
    start := time.Now()
    pod, err := p.PodLister.Pods(args.PodNamespace).Get(args.PodName)
    if err != nil {
        return fmt.Errorf("failed to find pod %s: %w", util.Join(args.PodName, args.PodNamespace), err)
    }
    if !p.hasResourceName(&pod.Spec) {
        return fmt.Errorf("pod which doesn't want floatingip have been sent to plugin")
    }
    defer p.lockPod(pod.Name, pod.Namespace)()
    // Build the key for the Pod from the pool to join, the Pod namespace, the Pod name, and the name of the owning StatefulSet/Deployment
    keyObj, err := util.FormatKey(pod)
    if err != nil {
        return err
    }
    // Allocate IPs for the Pod
    cniArgs, err := p.allocateIP(keyObj.KeyInDB, args.Node, pod)
    if err != nil {
        return err
    }
    data, err := json.Marshal(cniArgs)
    if err != nil {
        return fmt.Errorf("marshal cni args %v: %v", *cniArgs, err)
    }
    // Add the k8s.v1.cni.galaxy.io/args annotation
    bindAnnotation := map[string]string{constant.ExtendedCNIArgsAnnotation: string(data)}
    var err1 error
    if err := wait.PollImmediate(time.Millisecond*500, 3*time.Second, func() (bool, error) {
        // Perform the binding by creating the Pod's binding subresource
        if err := p.Client.CoreV1().Pods(args.PodNamespace).Bind(&corev1.Binding{
            ObjectMeta: v1.ObjectMeta{Namespace: args.PodNamespace, Name: args.PodName, UID: args.PodUID,
                Annotations: bindAnnotation},
            Target: corev1.ObjectReference{
                Kind: "Node",
                Name: args.Node,
            },
        }); err != nil {
            err1 = err
            if apierrors.IsNotFound(err) {
                // The Pod no longer exists; stop polling
                return false, err
            }
            return false, nil
        }
        glog.Infof("bind pod %s to %s with %s", keyObj.KeyInDB, args.Node, string(data))
        return true, nil
    }); err != nil {
        if apierrors.IsNotFound(err1) {
            // The Pod vanished during binding, so someone deleted it; the IP already allocated must be reclaimed
            glog.Infof("binding returns not found for pod %s, putting it into unreleased chan", keyObj.KeyInDB)
            // attach ip annotation
            p.unreleased <- &releaseEvent{pod: pod}
        }
        // If fails to update, depending on resync to update
        return fmt.Errorf("update pod %s: %w", keyObj.KeyInDB, err1)
    }
    metrics.ScheduleLatency.WithLabelValues("bind").Observe(time.Since(start).Seconds())
    return nil
}
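As a rough illustration of the Filter step above, the subnet check boils down to a set lookup per node. The sketch below uses plain strings instead of corev1.Node and net.IPNet; filterBySubnet, the "node subnet unknown" reason and the sample data are hypothetical.

```go
package main

import "fmt"

// filterBySubnet keeps the nodes whose subnet is one the Pod may join.
// nodeSubnet maps node name -> subnet CIDR; a node missing from the map
// is treated as "subnet unknown". All names here are illustrative.
func filterBySubnet(podSubnets map[string]bool, nodes []string, nodeSubnet map[string]string) ([]string, map[string]string) {
    kept := []string{}
    failed := map[string]string{}
    for _, n := range nodes {
        subnet, ok := nodeSubnet[n]
        switch {
        case !ok:
            failed[n] = "node subnet unknown"
        case podSubnets[subnet]:
            kept = append(kept, n)
        default:
            failed[n] = "FloatingIPPlugin:NoFIPLeft"
        }
    }
    return kept, failed
}

func main() {
    podSubnets := map[string]bool{"10.0.1.0/24": true}
    nodeSubnet := map[string]string{"node-a": "10.0.1.0/24", "node-b": "10.0.2.0/24"}
    kept, failed := filterBySubnet(podSubnets, []string{"node-a", "node-b", "node-c"}, nodeSubnet)
    fmt.Println(kept)
    fmt.Println(failed)
}
```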

Now let's look at how IPs are allocated during the Bind phase:

Go
func (p *FloatingIPPlugin) allocateIP(key string, nodeName string, pod *corev1.Pod) (*constant.CniArgs, error) {
    cniArgs, err := getPodCniArgs(pod)
    if err != nil {
        return nil, err
    }
    ipranges := cniArgs.RequestIPRange

getPodCniArgs parses the k8s.v1.cni.galaxy.io/args annotation into this struct:

Go
type CniArgs struct {
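    // Lets the user specify which address ranges IPs should be allocated from
    RequestIPRange [][]nets.IPRange `json:"request_ip_range,omitempty"`
    // These arguments are eventually passed to the underlying CNI plugin in key1=val1;key2=val2 form
    Common CommonCniArgs `json:"common"`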
}
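To show what parsing such an annotation might look like, here is a small sketch with simplified types. Only request_ip_range and common come from the struct above; the ipinfos, ip, vlan and gateway field names, the string representation of ranges, and the sample values are assumptions made for illustration.

```go
package main

import (
    "encoding/json"
    "fmt"
)

// Simplified mirrors of the structure above: ranges and IPs are kept as
// plain strings here, whereas the real code uses nets.IPRange and
// richer IP types. Field names inside "common" are assumptions.
type ipInfo struct {
    IP      string `json:"ip"`
    Vlan    uint16 `json:"vlan"`
    Gateway string `json:"gateway"`
}

type commonCniArgs struct {
    IPInfos []ipInfo `json:"ipinfos,omitempty"`
}

type cniArgs struct {
    RequestIPRange [][]string    `json:"request_ip_range,omitempty"`
    Common         commonCniArgs `json:"common"`
}

// parseCniArgs decodes the annotation value; an empty annotation yields
// an empty, non-nil result.
func parseCniArgs(annotation string) (*cniArgs, error) {
    args := &cniArgs{}
    if annotation == "" {
        return args, nil
    }
    if err := json.Unmarshal([]byte(annotation), args); err != nil {
        return nil, fmt.Errorf("unmarshal cni args: %w", err)
    }
    return args, nil
}

func main() {
    // A made-up annotation value requesting one IP from one range.
    raw := `{"request_ip_range":[["10.0.0.2~10.0.0.30"]]}`
    args, err := parseCniArgs(raw)
    fmt.Println(args.RequestIPRange, len(args.Common.IPInfos), err)
}
```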

allocateIP then uses the key to look up the IP already allocated to that key in each iprange:

Go
    ipInfos, err := p.ipam.ByKeyAndIPRanges(key, ipranges)
    if err != nil {
        return nil, fmt.Errorf("failed to query floating ip by key %s: %v", key, err)
    }
    if len(ipranges) == 0 && len(ipInfos) > 0 {
        // reuse only one if requesting only one ip
        ipInfos = ipInfos[:1]
    }
    // ipranges that do not have an allocated IP yet
    var unallocatedIPRange [][]nets.IPRange
    reservedIPs := sets.NewString()
    for i := range ipInfos {
        if ipInfos[i] == nil {
            // Not allocated yet
            unallocatedIPRange = append(unallocatedIPRange, ipranges[i])
        } else {
            // Already allocated to the current key; keep it
            reservedIPs.Insert(ipInfos[i].IP.String())
        }
    }
            reservedIPs.Insert(ipInfos[i].IP.String())
        }
    }
    policy := parseReleasePolicy(&pod.ObjectMeta)
    attr := floatingip.Attr{Policy: policy, NodeName: nodeName, Uid: string(pod.UID)}
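The partitioning done by the loop above can be sketched with plain strings. This is a simplified illustration, not the plugin's code: splitByAllocation and the sample ranges are hypothetical, and an empty string stands in for a nil entry in ipInfos.

```go
package main

import "fmt"

// splitByAllocation mirrors the loop above with plain strings:
// allocated[i] is the IP already held for ranges[i], or "" when nothing
// has been allocated for that range yet.
func splitByAllocation(ranges []string, allocated []string) (unallocated []string, reserved map[string]bool) {
    reserved = map[string]bool{}
    for i, ip := range allocated {
        if ip == "" {
            // Guard the index in case the caller passed fewer ranges than slots.
            if i < len(ranges) {
                unallocated = append(unallocated, ranges[i])
            }
            continue
        }
        reserved[ip] = true
    }
    return unallocated, reserved
}

func main() {
    ranges := []string{"10.0.0.2~10.0.0.30", "10.0.0.200~10.0.0.238"}
    allocated := []string{"10.0.0.5", ""} // only the first range has an IP
    unallocated, reserved := splitByAllocation(ranges, allocated)
    fmt.Println(unallocated)
    fmt.Println(reserved)
}
```

Only the ranges in the first return value need a fresh allocation; the IPs in the second are reused as they are.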

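Next, check the ipInfos returned by IPAM. If a recorded PodUid differs from the current pod.GetUID(), an earlier incarnation of the Pod may not have been deleted yet, so allocation has to wait for that deletion (Bind returns an error):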

Go
    for _, ipInfo := range ipInfos {
        // check for uid mismatch: if we delete a statefulset/tapp and create one with the same name immediately,
        // galaxy-ipam may receive the bind event for the new pod earlier than the delete event for the old pod
        if ipInfo != nil && ipInfo.PodUid != "" && ipInfo.PodUid != string(pod.GetUID()) {
            return nil, fmt.Errorf("waiting for delete event of %s before reuse this ip", key)
        }
    }
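The ownership rule in this check is small enough to isolate. The sketch below is an illustration only; checkOwner and errWaitForDelete are hypothetical names, and UIDs are plain strings.

```go
package main

import (
    "errors"
    "fmt"
)

var errWaitForDelete = errors.New("waiting for delete event before reusing ip")

// checkOwner returns an error when the IP record still carries the UID
// of a different (older) Pod. An empty recorded UID means no Pod
// currently claims the IP, so it can be reused right away.
func checkOwner(recordedUID, podUID string) error {
    if recordedUID != "" && recordedUID != podUID {
        return errWaitForDelete
    }
    return nil
}

func main() {
    fmt.Println(checkOwner("", "uid-new"))        // unclaimed record
    fmt.Println(checkOwner("uid-new", "uid-new")) // same Pod retrying
    fmt.Println(checkOwner("uid-old", "uid-new")) // old Pod not deleted yet
}
```

Returning an error here makes the scheduler retry the bind later, by which time the delete event for the old Pod has usually been processed.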

If any IPs still need to be allocated, ask IPAM for them:

Go
    if len(unallocatedIPRange) > 0 || len(ipInfos) == 0 {
        // Subnet the node belongs to
        subnet, err := p.queryNodeSubnet(nodeName)
        if err != nil {
            return nil, err
        }
        // Perform the allocation
        if _, err := p.ipam.AllocateInSubnetsAndIPRange(key, subnet, unallocatedIPRange, attr); err != nil {
            return nil, err
        }
        // Refresh the IP info
        ipInfos, err = p.ipam.ByKeyAndIPRanges(key, ipranges)
        if err != nil {
            return nil, fmt.Errorf("failed to query floating ip by key %s: %v", key, err)
        }
    }

Handle reused IPs and update their attributes in IPAM:

Go
    for _, ipInfo := range ipInfos {
        //...
        if reservedIPs.Has(ipInfo.IP.String()) {
            glog.Infof("%s reused %s, updating attr to %v", key, ipInfo.IPInfo.IP.String(), attr)
            if err := p.ipam.UpdateAttr(key, ipInfo.IPInfo.IP.IP, attr); err != nil {
                return nil, fmt.Errorf("failed to update floating ip release policy: %v", err)
            }
        }
    }

Write the result back into the Pod annotation:

Go
    // Newly allocated IP addresses
    var allocatedIPs []string
    // All IP addresses
    var ret []constant.IPInfo
    for _, ipInfo := range ipInfos {
        if !reservedIPs.Has(ipInfo.IP.String()) {
            allocatedIPs = append(allocatedIPs, ipInfo.IP.String())
        }
        ret = append(ret, ipInfo.IPInfo)
    }
    glog.Infof("%s reused ips %v, allocated ips %v, attr %v", key, reservedIPs.List(), allocatedIPs, attr)
    // Write back
    cniArgs.Common.IPInfos = ret
    return &cniArgs, nil
Summary

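When a Pod is created with a request for the custom resource tke.cloud.tencent.com/eni-ip in its spec, Galaxy IPAM, acting as the scheduler extender, concludes that a floating IP must be allocated for it.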
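The resource check that gates all of this can be sketched as follows. This is a simplified illustration of what hasResourceName decides, not its actual code: the container type is a pared-down stand-in for corev1.Container, and wantsFloatingIP is a hypothetical name.

```go
package main

import "fmt"

const eniIPResource = "tke.cloud.tencent.com/eni-ip"

// container is a pared-down stand-in for corev1.Container: only the
// resource requests matter here.
type container struct {
    Name     string
    Requests map[string]string
}

// wantsFloatingIP reports whether any container requests the eni-ip
// extended resource.
func wantsFloatingIP(containers []container) bool {
    for _, c := range containers {
        if _, ok := c.Requests[eniIPResource]; ok {
            return true
        }
    }
    return false
}

func main() {
    plain := []container{{Name: "app", Requests: map[string]string{"cpu": "100m"}}}
    floating := []container{{Name: "app", Requests: map[string]string{eniIPResource: "1"}}}
    fmt.Println(wantsFloatingIP(plain), wantsFloatingIP(floating))
}
```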

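Because floatingip-config can define several groups of floating IP subnets, a Pod should use annotations to state what it needs:

  1. k8s.v1.cni.galaxy.io/args declares which subnet it wants to join and which IP ranges it requests
  2. tke.cloud.tencent.com/eni-ip-pool declares which IP pool to use

After creation, the Pod first enters the scheduling phase. Galaxy IPAM, acting as a K8S scheduler extender, allocates IPs as needed and writes the result into the Pod's annotations.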

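When the Kubelet starts the Pod on a node, it invokes the CNI plugin galaxy-sdn, which in turn calls the Galaxy daemon. The daemon invokes an underlying CNI plugin such as galaxy-k8s-vlan on the command line, converting the key/value pairs in the Pod annotation k8s.v1.cni.galaxy.io/args into CNI arguments of the form key1=val1;key2=val2.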
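The key1=val1;key2=val2 encoding is easy to sketch. The helper below is hypothetical (toCNIArgs is not a Galaxy function), and the sample keys and values are made up; keys are sorted only so that the output is deterministic.

```go
package main

import (
    "fmt"
    "sort"
    "strings"
)

// toCNIArgs renders key/value pairs in the "key1=val1;key2=val2" form.
// Keys are sorted only to make the output deterministic.
func toCNIArgs(kv map[string]string) string {
    keys := make([]string, 0, len(kv))
    for k := range kv {
        keys = append(keys, k)
    }
    sort.Strings(keys)
    parts := make([]string, 0, len(keys))
    for _, k := range keys {
        parts = append(parts, k+"="+kv[k])
    }
    return strings.Join(parts, ";")
}

func main() {
    // Hypothetical keys and values, for illustration only.
    fmt.Println(toCNIArgs(map[string]string{"vlan": "2", "gateway": "10.0.0.1"}))
    // prints "gateway=10.0.0.1;vlan=2"
}
```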

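The galaxy-k8s-vlan plugin calls ipam.Allocate to read the IP address information from the CNI arguments and converts it into a Result in the 0.2.0 format, then calls setupNetwork to configure the IP address on the container's network interface.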
