Galaxy Study Notes

1 Aug 2020

By Alex / in PaaS / tags: CNI, K8S
Introduction

Galaxy is a network component of TKEStack that provides Overlay/Underlay container networking for TKE clusters. One of Galaxy's features is floating IPs (elastic IPs): even if a Pod drifts to another node because its original node goes down, its IP address stays the same.

Network Types

Galaxy provides four types of networks and lets you configure the network mode per workload.

Overlay Network

The default mode, based on Flannel's VXLAN or Host Gateway (routing) backends. Containers on the same node communicate without a bridge: packets are forwarded directly via host routes. Containers on different nodes communicate via VXLAN encapsulation or direct routing, with inter-node routes recorded in etcd. Advantages: simple and reliable, decent performance, and network policy support.

A TKEStack cluster installed by tke-installer automatically configures Galaxy in Overlay mode, with this CNI configuration:

JSON
{
  "type": "galaxy-sdn",
  "capabilities": {"portMappings": true},
  "cniVersion": "0.2.0"
} 

In this mode:

  1. Flannel allocates a subnet for each Kubelet node and stores it in etcd and in the local file /run/flannel/subnet.env
  2. Kubelet invokes Galaxy's CNI plugin galaxy-sdn, which calls the local Galaxy daemon over a Unix domain socket (UDS)
  3. The Galaxy daemon in turn invokes the Flannel CNI plugin, which parses the subnet information in /run/flannel/subnet.env
  4. The Flannel CNI plugin then:
    1. calls the Bridge CNI or Veth CNI plugin to configure the Pod's network
    2. calls host-lo
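
Step 1 above leaves a small environment file on each node. A typical /run/flannel/subnet.env looks roughly like this (the values are illustrative, not taken from the source):

```shell
# Illustrative /run/flannel/subnet.env written by Flannel on one node.
# The file is plain KEY=VALUE shell syntax, so the Flannel CNI can parse it easily.
FLANNEL_NETWORK=10.244.0.0/16   # cluster-wide Pod network
FLANNEL_SUBNET=10.244.1.1/24    # this node's slice of the Pod network
FLANNEL_MTU=1450                # MTU reduced to leave room for VXLAN headers
FLANNEL_IPMASQ=true             # masquerade traffic leaving the cluster
```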

Architecture diagram:

(figure: galaxy-overlay)

Underlay Network

In this mode, container IPs come from the host network, and containers and hosts route to each other directly, which gives better performance. Layer-2 connectivity between containers and hosts is supported via Linux Bridge, MacVlan, IPVlan, or SR-IOV; which one to use can be chosen according to the business scenario and hardware environment.

To use the Galaxy Underlay network, the Galaxy-ipam component must be enabled. It allocates IPs for Pods and also provides the floating IP capability.

NAT

Uses the Kubernetes hostPort setting to map container ports to host ports. If no hostPort is specified, Galaxy picks a random mapping.

Host

Uses the Kubernetes HostNetwork mode, i.e. the Pod uses the physical network directly.

Architecture

Galaxy consists of three kinds of components.

Galaxy

Runs as a DaemonSet on every node and configures container networks by invoking various CNI plugins.

CNI Plugins

Galaxy delegates the actual network setup to other CNI plugins; in other words, it acts as a decorator. It supports any standard CNI plugin and also ships a number of built-in ones.

Galaxy IPAM

A Kubernetes scheduler extender. kube-scheduler calls Galaxy-ipam over HTTP to implement floating IP allocation and management.

Since a Pod only faces an IP address change when it is rescheduled, implementing this component as a scheduler extender is a natural fit.

Supported CNI Plugins

Galaxy supports any standard CNI plugin, so you can use it as a CNI framework, much like multus-cni.

Galaxy also includes a series of built-in CNI plugins:

Plugin | Description
SDN CNI | The CNI entry point when the Overlay network is used. It calls the Galaxy daemon over a UDS, forwarding all CNI parameters received from Kubelet
Veth CNI | Used in the Overlay network; creates a veth pair to connect the container and host namespaces. It obtains the Pod's IP from an ipam plugin
underlay-veth CNI | Used in the Underlay network; creates veth pairs to connect the container and host namespaces. It does not create a bridge, but relies on host routing rules to forward packets
Vlan CNI | Used in the Underlay network; creates a veth pair connecting the container to a bridge/macvlan/ipvlan device on the host. It supports putting Pods on VLANs, and can obtain the IP address either from CNI args, e.g. ipinfos=[{"ip":"192.168.0.68/26","vlan":2,"gateway":"192.168.0.65"}], or from the result of an ipam CNI plugin
SRIOV CNI | Used in the Underlay network; uses the SR-IOV capability of an Ethernet Server Adapter to create VF devices and move them into the container's network namespace
TKE route ENI CNI | Used on Tencent Cloud
Basics
Building Galaxy
Binaries
Shell
go get -d tkestack.io/galaxy
cd $GOPATH/src/tkestack.io/galaxy
 
# build all binaries
make
# build specific binaries
make BINS="galaxy galaxy-ipam"
make BINS="galaxy-ipam"
Images
Shell
# build all images
make image

# build a specific image
make image BINS="galaxy-ipam"

# build for a specific architecture
make image.multiarch PLATFORMS="linux_arm64"
Using Galaxy

The manifest can be downloaded from GitHub; its default content is as follows:

YAML
---
apiVersion: rbac.authorization.k8s.io/v1
# kubernetes versions before 1.8.0 should use rbac.authorization.k8s.io/v1beta1
kind: ClusterRole
metadata:
  name: galaxy
rules:
- apiGroups: [""]
  resources:
  - pods
  - namespaces
  - nodes
  - pods/binding
  verbs: ["list", "watch", "get", "patch", "create", "update"]
- apiGroups: ["apps", "extensions"]
  resources:
  - statefulsets
  - deployments
  verbs: ["list", "watch"]
- apiGroups: [""]
  resources:
  - configmaps
  - endpoints
  - events
  verbs: ["get", "list", "watch", "update", "create", "patch"]
- apiGroups: ["galaxy.k8s.io"]
  resources:
  - pools
  - floatingips
  verbs: ["get", "list", "watch", "update", "create", "patch", "delete"]
- apiGroups: ["apiextensions.k8s.io"]
  resources:
  - customresourcedefinitions
  verbs:
  - "*"
- apiGroups: ["networking.k8s.io"]
  resources:
  - networkpolicies
  verbs: ["get", "list", "watch"]
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: galaxy
  namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
# kubernetes versions before 1.8.0 should use rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
  name: galaxy
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: galaxy
subjects:
  - kind: ServiceAccount
    name: galaxy
    namespace: kube-system
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  labels:
    app: galaxy
  name: galaxy
  namespace: kube-system
spec:
  selector:
    matchLabels:
      app: galaxy
  template:
    metadata:
      labels:
        app: galaxy
    spec:
      priorityClassName: system-node-critical
      serviceAccountName: galaxy
      hostNetwork: true
      hostPID: true
      containers:
      - image: tkestack/galaxy:v1.0.7
        command: ["/bin/sh"]
# Entrypoint script:
#   copy the Galaxy configuration from the ConfigMap to the host:
#   cp -p /etc/galaxy/cni/00-galaxy.conf /etc/cni/net.d/;
#   copy the CNI plugin binaries to the host:
#   cp -p /opt/cni/galaxy/bin/galaxy-sdn /opt/cni/galaxy/bin/loopback /opt/cni/bin/;
#   start the Galaxy daemon (--route-eni is required when running on Tencent Cloud):
#   /usr/bin/galaxy --logtostderr=true --v=3 --route-eni
      # qcloud galaxy should run with --route-eni
        args: ["-c", "cp -p /etc/galaxy/cni/00-galaxy.conf /etc/cni/net.d/; cp -p /opt/cni/galaxy/bin/galaxy-sdn /opt/cni/galaxy/bin/loopback /opt/cni/bin/; /usr/bin/galaxy --logtostderr=true --v=3 --route-eni"]
      # private-cloud should run without --route-eni
      # args: ["-c", "cp -p /etc/galaxy/cni/00-galaxy.conf /etc/cni/net.d/; cp -p /opt/cni/galaxy/bin/galaxy-sdn /opt/cni/galaxy/bin/loopback /opt/cni/bin/; /usr/bin/galaxy --logtostderr=true --v=3"]
        imagePullPolicy: Always
        env:
          - name: MY_NODE_NAME
            valueFrom:
              fieldRef:
                fieldPath: spec.nodeName
          - name: DOCKER_HOST
            value: unix:///host/run/docker.sock
        name: galaxy
        resources:
          requests:
            cpu: 100m
            memory: 200Mi
        securityContext:
          privileged: true
        volumeMounts:
        - name: galaxy-run
          mountPath: /var/run/galaxy/
        - name: flannel-run
          mountPath: /run/flannel
        - name: galaxy-etc
          mountPath: /etc/galaxy
        - name: cni-config
          mountPath: /etc/cni/net.d/
        - name: cni-bin
          mountPath: /opt/cni/bin
        - name: cni-etc
          mountPath: /etc/galaxy/cni
        - name: cni-state
          mountPath: /var/lib/cni
        - name: docker-sock
          mountPath: /host/run/
        - name: tz-config
          mountPath: /etc/localtime
      terminationGracePeriodSeconds: 30
      tolerations:
      - operator: Exists
      volumes:
      - name: galaxy-run
        hostPath:
          path: /var/run/galaxy
      - name: flannel-run
        hostPath:
          path: /run/flannel
      - configMap:
          defaultMode: 420
          name: galaxy-etc
        name: galaxy-etc
      - name: cni-config
        hostPath:
          path: /etc/cni/net.d/
      - name: cni-bin
        hostPath:
          path: /opt/cni/bin
      - name: cni-state
        hostPath:
          path: /var/lib/cni
      - configMap:
          defaultMode: 420
          name: cni-etc
        name: cni-etc
      - name: docker-sock
        # in case of docker restart, /run/docker.sock may change, we have to mount the /run directory
        hostPath:
          path: /run/
      - name: tz-config
        hostPath:
          path: /etc/localtime
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: galaxy-etc
  namespace: kube-system
data:
  # Galaxy configuration file
  # update network card name in "galaxy-k8s-vlan" and "galaxy-k8s-sriov" if necessary
  # update vf_num in "galaxy-k8s-sriov" according to demand
  # update ENIIPNetwork to tke-route-eni if running on qcloud
  galaxy.json: |
    {
      "NetworkConf":[
        {"name":"tke-route-eni","type":"tke-route-eni","eni":"eth1","routeTable":1},
        {"name":"galaxy-flannel","type":"galaxy-flannel", "delegate":{"type":"galaxy-veth"},"subnetFile":"/run/flannel/subnet.env"},
        {"name":"galaxy-k8s-vlan","type":"galaxy-k8s-vlan", "device":"eth1", "default_bridge_name": "br0"},
        {"name":"galaxy-k8s-sriov","type": "galaxy-k8s-sriov", "device": "eth1", "vf_num": 10}
      ],
      "DefaultNetworks": ["galaxy-flannel"],
      "ENIIPNetwork": "galaxy-k8s-vlan"
    }
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: cni-etc
  namespace: kube-system
data:
  # CNI network configuration
  00-galaxy.conf: |
    {
      "name": "galaxy-sdn",
      "type": "galaxy-sdn",
      "capabilities": {"portMappings": true},
      "cniVersion": "0.2.0"
    }

Adjust it according to your actual environment and the network mode in use.

Configuring Kubelet

Make sure Kubelet runs with the flags --network-plugin=cni --cni-bin-dir=/opt/cni/bin/, then restart it.

Overlay Network
Installing Flannel

This network mode depends on Flannel; see my Flannel study notes.

Configuring Galaxy
Global Configuration

Modify galaxy-etc in the Galaxy manifest above to configure the default networks (DefaultNetworks):

JSON
{
  // configuration of all supported network modes
  "NetworkConf":[
    {
      "name":"tke-route-eni", // if name is empty, Galaxy assumes it equals type
      "type":"tke-route-eni",
      "eni":"eth1",
      "routeTable":1
    },
    {
      "name":"galaxy-flannel", // this is just the flannel CNI plugin under another name
      "type":"galaxy-flannel",
      "delegate":{
        "type":"galaxy-veth"
      },
      "subnetFile":"/run/flannel/subnet.env"
    },
    {
      "name":"galaxy-k8s-vlan",
      "type":"galaxy-k8s-vlan",
      "device":"eth1",
      "default_bridge_name":"br0"
    },
    {
      "name":"galaxy-k8s-sriov",
      "type":"galaxy-k8s-sriov",
      "device":"eth1",
      "vf_num":10
    },
    {
      "name":"galaxy-underlay-veth",
      "type":"galaxy-underlay-veth",
      "device":"eth1"
    }
  ],
  // Pods use this network by default; note that multiple networks may be listed
  "DefaultNetworks":[
    "galaxy-flannel" // use the Flannel-based Overlay network
  ],
  // If a Pod needs a Tencent Cloud elastic network interface (ENI) and has no
  // k8s.v1.cni.cncf.io/networks annotation, this network is used by default, which
  // avoids having to annotate every Pod that needs an Underlay network
  "ENIIPNetwork":"galaxy-k8s-vlan"
}
Pod Configuration

You can add an annotation to a Pod:

YAML
k8s.v1.cni.cncf.io/networks: galaxy-flannel,galaxy-k8s-sriov

to tell Galaxy which networks it should configure for the Pod.

Coexisting with Other CNI Plugins

Galaxy can coexist with other CNI plugins. One thing to note: in the --network-conf-dir directory, the configuration files of other plugins must not have file names that sort lexicographically before 00-galaxy.conf; otherwise Kubelet will invoke those CNI plugins ahead of Galaxy's, which is probably not what you expect.
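
Kubelet uses the lexicographically first configuration file in that directory, which is why the Galaxy config carries the 00- prefix. A quick way to check which file would win (all file names except 00-galaxy.conf are hypothetical):

```shell
# Simulate Kubelet's choice: sort the config file names and take the first one.
first=$(printf '%s\n' '10-other.conflist' '00-galaxy.conf' '99-loopback.conf' | sort | head -n 1)
echo "$first"
```

Against a real node, the equivalent check is `ls /etc/cni/net.d | sort | head -n 1`.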

Galaxy Command-Line Flags
Shell
--alsologtostderr                   # log to files and to stderr at the same time
--bridge-nf-call-iptables           # whether to set bridge-nf-call-iptables, i.e. enable/disable iptables filtering of bridged packets; default true
--cni-paths stringSlice             # additional CNI paths besides those received from kubelet; default /opt/cni/galaxy/bin
--flannel-allocated-ip-dir string   # where the Flannel CNI plugin stores allocated IP addresses; default /var/lib/cni/networks
--flannel-gc-interval duration      # interval between Flannel network garbage collections; default 10s
--gc-dirs string                    # directories to clean up; file names in them contain container IDs
                                    # default /var/lib/cni/flannel,/var/lib/cni/galaxy,/var/lib/cni/galaxy/port
--hostname-override string          # override the kubelet hostname; if set, Galaxy uses it to fetch the node object from the API server
--ip-forward                        # whether to enable IP forwarding
--json-config-path string           # path of the Galaxy configuration file; default /etc/galaxy/galaxy.json
--kubeconfig string                 # kubeconfig file
--log-backtrace-at traceLocation    # emit a stack trace when logging hits file:N; default :0
--log-dir string                    # log output directory
--log-flush-frequency duration      # log flush interval; default 5s
--logtostderr                       # log to stderr instead of files
--master string                     # address and port of the API server
--network-conf-dir string           # directory of additional CNI network configuration files; default /etc/cni/net.d/
--network-policy                    # enable network policy support
--route-eni                         # whether to enable Tencent Cloud route-eni
--stderrthreshold severity          # logs at or above this severity go to stderr; default 2
-v, --v Level                       # log verbosity level
Underlay Network
How It Works

The Underlay network must be used together with Galaxy-ipam, which allocates and releases Pod IP addresses:

  1. Plan the Underlay IP ranges used by the container network (taking care to avoid conflicts with other hosts on the physical network) and put them into the floatingip-config ConfigMap
  2. When scheduling a Pod, kube-scheduler calls Galaxy-ipam from its filter/priority/bind methods
  3. Galaxy-ipam checks whether the Pod has a reserved IP. If so, it marks only the nodes within the subnets where that IP is available as valid; otherwise all nodes are marked valid. While binding the Pod, Galaxy-ipam allocates an IP and writes it into the Pod's annotations
  4. Galaxy reads the IP from the Pod's annotations and passes it as arguments to the CNI plugins, which configure the Pod's IP

Galaxy's floating IP capability is provided by Galaxy-ipam, a Kubernetes scheduler extender: kube-scheduler calls Galaxy-ipam to influence its filtering/binding decisions. Floating IPs must be used together with the Underlay network.

The overall workflow is shown below:

(figure: galaxy-ipam)

Installing Galaxy-ipam

The manifest can be downloaded from GitHub; its default content is as follows:

YAML
apiVersion: v1
kind: Service
metadata:
  name: galaxy-ipam
  namespace: kube-system
  labels:
    app: galaxy-ipam
spec:
  type: NodePort
  ports:
  - name: scheduler-port
    port: 9040
    targetPort: 9040
    nodePort: 32760
    protocol: TCP
  - name: api-port
    port: 9041
    targetPort: 9041
    nodePort: 32761
    protocol: TCP
  selector:
    app: galaxy-ipam
---
apiVersion: rbac.authorization.k8s.io/v1
# kubernetes versions before 1.8.0 should use rbac.authorization.k8s.io/v1beta1
kind: ClusterRole
metadata:
  name: galaxy-ipam
rules:
- apiGroups: [""]
  resources:
  - pods
  - namespaces
  - nodes
  - pods/binding
  verbs: ["list", "watch", "get", "patch", "create"]
- apiGroups: ["apps", "extensions"]
  resources:
  - statefulsets
  - deployments
  verbs: ["list", "watch"]
- apiGroups: [""]
  resources:
  - configmaps
  - endpoints
  - events
  verbs: ["get", "list", "watch", "update", "create", "patch"]
- apiGroups: ["galaxy.k8s.io"]
  resources:
  - pools
  - floatingips
  verbs: ["get", "list", "watch", "update", "create", "patch", "delete"]
- apiGroups: ["apiextensions.k8s.io"]
  resources:
  - customresourcedefinitions
  verbs:
  - "*"
- apiGroups: ["apps.tkestack.io"]
  resources:
  - tapps
  verbs: ["list", "watch"]
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: galaxy-ipam
  namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
# kubernetes versions before 1.8.0 should use rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
  name: galaxy-ipam
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: galaxy-ipam
subjects:
  - kind: ServiceAccount
    name: galaxy-ipam
    namespace: kube-system
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: galaxy-ipam
  name: galaxy-ipam
  namespace: kube-system
spec:
  replicas: 1
  selector:
    matchLabels:
      app: galaxy-ipam
  template:
    metadata:
      labels:
        app: galaxy-ipam
    spec:
      priorityClassName: system-cluster-critical
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
          - labelSelector:
              matchExpressions:
              - key: app
                operator: In
                values:
                - galaxy-ipam
            topologyKey: "kubernetes.io/hostname"
      serviceAccountName: galaxy-ipam
      hostNetwork: true
      dnsPolicy: ClusterFirstWithHostNet
      containers:
      - image: tkestack/galaxy-ipam:v1.0.7
        args:
          - --logtostderr=true
          - --profiling
          - --v=3
          - --config=/etc/galaxy/galaxy-ipam.json
          # this port is exposed for kube-scheduler to call
          - --port=9040
          - --api-port=9041
          - --leader-elect
        command:
          - /usr/bin/galaxy-ipam
        ports:
          - containerPort: 9040
          - containerPort: 9041
        imagePullPolicy: Always
        name: galaxy-ipam
        resources:
          requests:
            cpu: 100m
            memory: 200Mi
        volumeMounts:
        - name: kube-config
          mountPath: /etc/kubernetes/
        - name: galaxy-ipam-log
          mountPath: /data/galaxy-ipam/logs
        - name: galaxy-ipam-etc
          mountPath: /etc/galaxy
        - name: tz-config
          mountPath: /etc/localtime
      terminationGracePeriodSeconds: 30
      tolerations:
        - effect: NoSchedule
          key: node-role.kubernetes.io/master
          operator: Exists
      volumes:
      - name: kube-config
        hostPath:
          path: /etc/kubernetes/
      - name: galaxy-ipam-log
        emptyDir: {}
      - configMap:
          defaultMode: 420
          name: galaxy-ipam-etc
        name: galaxy-ipam-etc
      - name: tz-config
        hostPath:
          path: /etc/localtime
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: galaxy-ipam-etc
  namespace: kube-system
data:
  # configuration file
  galaxy-ipam.json: |
    {
      "schedule_plugin": {
        # remove this line if you are not using Tencent Cloud elastic network interfaces (ENI)
        "cloudProviderGrpcAddr": "127.0.0.2:80"
      }
    }
Configuring kube-scheduler

You need to configure a scheduler policy for the Kubernetes scheduler; nowadays the policy can be stored in a ConfigMap:

/etc/kubernetes/scheduler-policy-config.json
JSON
cat <<EOF | kubectl create -f -
apiVersion: v1
kind: ConfigMap
metadata:
  name: scheduler-policy
  namespace: kube-system
data:
  policy.cfg: |
    {
      "kind": "Policy",
      "apiVersion": "v1",
      "extenders": [
        {
          // the extender is called over HTTP
          "urlPrefix": "http://127.0.0.1:9040/v1",
          "httpTimeout": 10000000000,
          "filterVerb": "filter",
          "BindVerb": "bind",
          "weight": 1,
          "enableHttps": false,
          "managedResources": [
            {
              "name": "tke.cloud.tencent.com/eni-ip",
              "ignoredByScheduler": false
            }
          ]
        }
      ]
    }
EOF

Then add the command-line flag --policy-configmap=scheduler-policy to the scheduler.

Configuring Floating IPs
floatingip-config

When running on bare metal, a configuration like the following can be used:

YAML
kind: ConfigMap
apiVersion: v1
metadata:
  name: floatingip-config
  namespace: kube-system
data:
  floatingips: |
    [
        {
            # CIDR of the nodes; nodes in this range can run Pods with floating IPs
            "nodeSubnets": ["10.0.0.0/16"],
            # the IP address range that can be allocated to Pods
            "ips": ["10.0.70.2~10.0.70.241"],
            # subnet of the Pod IPs
            "subnet":"10.0.70.0/24",
            # gateway of the Pod IPs
            "gateway":"10.0.70.1",
            # VLAN of the Pod IPs; set this if Pod IPs and node IPs are in different VLANs,
            # and make sure the switch port the node connects to is a trunk port
            "vlan": 1024
        }
    ]

One node subnet can map to multiple Pod subnets, for example:

JSON
// A Pod on a node in 10.49.28.0/26 can get an IP in 10.0.80.2~10.0.80.4 or 10.0.81.2~10.0.81.4
// A Pod on a node in 10.49.29.0/24 can only get an IP in 10.0.80.0/24
[{
    "nodeSubnets": ["10.49.28.0/26", "10.49.29.0/24"],
    "ips": ["10.0.80.2~10.0.80.4"],
    "subnet": "10.0.80.0/24",
    "gateway": "10.0.80.1"
}, {
    "nodeSubnets": ["10.49.28.0/26"],
    "ips": ["10.0.81.2~10.0.81.4"],
    "subnet": "10.0.81.0/24",
    "gateway": "10.0.81.1",
    "vlan": 3
}]

Multiple node subnets can share the same Pod subnet, as long as their allocatable IP ranges do not overlap:

JSON
[{
    "routableSubnet": "10.180.1.2/32",
    "ips": ["10.180.154.2~10.180.154.3"],
    "subnet": "10.180.154.0/24",
    "gateway": "10.180.154.1",
    "vlan": 3
}, {
    "routableSubnet": "10.180.1.3/32",
    "ips": ["10.180.154.7~10.180.154.8"],
    "subnet": "10.180.154.0/24",
    "gateway": "10.180.154.1",
    "vlan": 3
}]
Reserving IPs

To reserve an IP so that it is never allocated, create a FloatingIP resource like the following:

YAML
apiVersion: galaxy.k8s.io/v1alpha1
kind: FloatingIP
metadata:
  # the name is the IP address to reserve
  name: 10.0.0.1
  labels:
    # since 1.0.8 the following label is no longer needed
    ipType: internalIP
    # this label is required
    reserved: this-is-not-for-pods
spec:
  key: pool__reserved-for-node_
  policy: 2
Deployment Configuration

Currently floating IPs are only supported for workloads created by Deployments and StatefulSets.

YAML
apiVersion: apps/v1
kind: Deployment
...
spec:
  ...
  template:
    metadata:
      annotations:
        # the following annotation is required if the default network is not galaxy-k8s-vlan
        k8s.v1.cni.cncf.io/networks: galaxy-k8s-vlan
        # IP release policy:
        #    empty/unspecified: release the IP as soon as the Pod stops
        #    immutable: release the IP only when the Deployment/StatefulSet is deleted or scaled down
        #    never: never release the IP, even if the Deployment/StatefulSet is deleted; a later
        #           Deployment/StatefulSet with the same name reuses the allocated IPs
        k8s.v1.cni.galaxy.io/release-policy: immutable
      creationTimestamp: null
      labels:
        k8s-app: nnn
        qcloud-app: nnn
    spec:
      containers:
      - image: nginx
        imagePullPolicy: Always
        name: nnn
        resources:
          limits:
            cpu: 500m
            memory: 1Gi
            # extended resource limit; configuring it on one container is enough
            tke.cloud.tencent.com/eni-ip: "1"
          requests:
            cpu: 250m
            memory: 256Mi
            tke.cloud.tencent.com/eni-ip: "1"

In the resulting Pod, the annotation k8s.v1.cni.galaxy.io/args records the Pod's IP address and related information. The CNI plugins rely on this information to set up the IP address and routes in the container's network namespace:

YAML
apiVersion: v1
kind: Pod
metadata:
  name: nnn-7df5984746-58hjm
  annotations:
    k8s.v1.cni.cncf.io/networks: galaxy-k8s-vlan
    k8s.v1.cni.galaxy.io/args: '{"common":{"ipinfos":[{"ip":"192.168.64.202/24",
        "vlan":0,"gateway":"192.168.64.1","routable_subnet":"172.21.64.0/20"}]}}'
    k8s.v1.cni.galaxy.io/release-policy: immutable
...
spec:
...
status:
...
  hostIP: 172.21.64.15
  phase: Running
  podIP: 192.168.64.202
  podIPs:
  - ip: 192.168.64.202
Galaxy-ipam CRDs
Pool

By setting the annotation tke.cloud.tencent.com/eni-ip-pool on Deployments, multiple Deployments can share one IP pool.

When an IP pool is used, the default IP release policy is never, unless another policy is specified via the k8s.v1.cni.galaxy.io/release-policy annotation.

By default the pool grows as the replica count of the Deployment/StatefulSet grows; you can also create a fixed-size pool:

YAML
apiVersion: galaxy.k8s.io/v1alpha1
kind: Pool
metadata:
  name: example-pool
size: 4
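
A Deployment can then be pointed at the pool via the annotation mentioned above. A minimal sketch, assuming the annotation's value is the Pool name (example-pool as above):

```yaml
apiVersion: apps/v1
kind: Deployment
...
spec:
  template:
    metadata:
      annotations:
        # assumption: the value names the Pool CR to draw IPs from
        tke.cloud.tencent.com/eni-ip-pool: example-pool
```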

When creating an IP pool through the HTTP API, you can set preAllocateIP=true to pre-allocate IPs for the pool; this is not possible when creating the CR with kubectl.

FloatingIP

This CR records a floating IP and the workload it is bound to:

YAML
apiVersion: galaxy.k8s.io/v1alpha1
kind: FloatingIP
metadata:
  creationTimestamp: "2020-03-04T08:28:15Z"
  generation: 1
  labels:
    ipType: internalIP
  # the IP address
  name: 192.168.64.202
  resourceVersion: "2744910"
  selfLink: /apis/galaxy.k8s.io/v1alpha1/floatingips/192.168.64.202
  uid: b5d55f27-4548-44c7-b8ad-570814b55026
spec:
  attribute: '{"NodeName":"172.21.64.15"}'
  # the workload this IP is bound to
  key: dp_default_nnn_nnn-7df5984746-58hjm
  policy: 1
  subnet: 172.21.64.0/20
  updateTime: "2020-03-04T08:28:15Z" 
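
The spec.key above appears to encode the workload identity as kind_namespace_appname_podname ("dp" for Deployment); this is an observation from the example, not documented behavior. Splitting it back apart:

```shell
# Pull the workload identity back out of a FloatingIP spec.key (format assumed).
key='dp_default_nnn_nnn-7df5984746-58hjm'
IFS='_' read -r kind ns app pod <<EOF
$key
EOF
echo "$kind $ns $app $pod"
```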
Galaxy-ipam HTTP API

Galaxy-ipam provides a Swagger 2.0 based API. Start galaxy-ipam with the command-line option --swagger and it will serve the API document at:

http://${galaxy-ipam-ip}:9041/apidocs.json/v1

Querying Allocated IPs
JSON
// query the IPs allocated to StatefulSet sts in the default namespace
// curl 'http://192.168.30.7:9041/v1/ip?appName=sts&appType=statefulsets&namespace=default'
{
"last": true,
"totalElements": 2,
"totalPages": 1,
"first": true,
"numberOfElements": 2,
"size": 10,
"number": 0,
"content": [
  {
   "ip": "10.0.0.112",
   "namespace": "default",
   "appName": "sts",
   "podName": "sts-0",
   "policy": 2,
   "appType": "statefulset",
   "updateTime": "2020-05-29T11:11:44.633383558Z",
   "status": "Deleted",
   "releasable": true
  },
  {
   "ip": "10.0.0.174",
   "namespace": "default",
   "appName": "sts",
   "podName": "sts-1",
   "policy": 2,
   "appType": "statefulset",
   "updateTime": "2020-05-29T11:11:45.132450117Z",
   "status": "Deleted",
   "releasable": true
  }
]
}
Releasing IP Addresses
JSON
// curl -X POST -H "Content-type: application/json" -d '
//   { "ips": [
//     { "ip":"10.0.0.112", "appName":"sts", "appType":"statefulset",  "podName":"sts-0","namespace":"default"},
//     {"ip":"10.0.0.174", "appName":"sts", "appType":"statefulset", "podName":"sts-1", "namespace":"default"}
//   ]}
// '
// http://192.168.30.7:9041/v1/ip
{
"code": 200,
"message": ""
}
FAQ
About Rolling Updates

The default update strategy of a Deployment is StrategyType=RollingUpdate with 25% max unavailable and 25% max surge. This means that during a rolling update there may be up to 25% more Pods than the replica count, which can lead to a deadlock:

  1. Suppose the Deployment has 3 replicas. A single unavailable replica already exceeds the 25% limit, so the Deployment controller must create a new Pod first and delete an old Pod only after the new one is ready
  2. If the floating IP release policy is immutable/never, the new Pod needs to reuse an IP from an old Pod, but the old Pod has not been deleted yet

As a result, the new Pod gets stuck in the scheduling phase.
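
One way out is to let the controller delete an old Pod before creating its replacement, so the IP is already free when the new Pod is scheduled. A sketch of such an update strategy:

```yaml
spec:
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 0         # never exceed the replica count during an update
      maxUnavailable: 1   # delete an old Pod first, freeing its floating IP
```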

Port Mapping

A Kubernetes NodePort Service can expose a group of Pods outside the cluster through host ports. But if you want to reach a specific Pod of a StatefulSet on its own port, a NodePort Service cannot do that.

With Kubernetes hostPort, two Pods scheduled onto the same node will conflict over the port:

YAML
    spec:
      containers:
      - image: ...
        ports:
          - containerPort: 9040
            hostPort: 9040

Galaxy provides a feature that performs random port mapping:

YAML
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: hello
  name: hello
spec:
  replicas: 1
  selector:
    matchLabels:
      app: hello
  template:
    metadata:
      labels:
        app: hello
      annotations:
        # just set this annotation on the Pod
        tkestack.io/portmapping: ""
    spec:
      containers:
      - image: ...
        ports:
          - containerPort: 9040

Galaxy then uses iptables to map random host ports to each of the Pod's container ports, and writes the chosen ports back into the tkestack.io/portmapping annotation.

Reading the Source
CNI
galaxy-sdn

Under TKEStack's default configuration (a Flannel-based VXLAN Overlay network), each node has a single CNI configuration:

/etc/cni/net.d/00-galaxy.conf
JSON
{
  "type": "galaxy-sdn",
  "capabilities": {"portMappings": true},
  "cniVersion": "0.2.0"
}

Let's see what this CNI plugin does and how it interacts with Flannel. All of Galaxy's bundled CNI plugins live in the github.com/tkestack/galaxy/cni directory; the entry point of the galaxy-sdn plugin is as follows:

cni/sdn/sdn.go
Go
package main
 
import (
    "bytes"
    "encoding/json"
    "fmt"
    "io/ioutil"
    "net"
    "net/http"
    "os"
    "strings"
 
    "github.com/containernetworking/cni/pkg/skel"
    // use the 0.2.0 CNI result types
    t020 "github.com/containernetworking/cni/pkg/types/020"
    "github.com/containernetworking/cni/pkg/version"
    galaxyapi "tkestack.io/galaxy/pkg/api/galaxy"
    "tkestack.io/galaxy/pkg/api/galaxy/private"
)
 
type cniPlugin struct {
    socketPath string
}
 
func NewCNIPlugin(socketPath string) *cniPlugin {
    return &cniPlugin{socketPath: socketPath}
}
 
// Create and fill a CNIRequest with this plugin's environment and stdin which
// contain the CNI variables and configuration
func newCNIRequest(args *skel.CmdArgs) *galaxyapi.CNIRequest {
    envMap := make(map[string]string)
    for _, item := range os.Environ() {
        idx := strings.Index(item, "=")
        if idx > 0 {
            envMap[strings.TrimSpace(item[:idx])] = item[idx+1:]
        }
    }
 
    return &galaxyapi.CNIRequest{
        Env:    envMap,
        Config: args.StdinData,
    }
}
 
// calls the Galaxy daemon
func (p *cniPlugin) doCNI(url string, req *galaxyapi.CNIRequest) ([]byte, error) {
    data, err := json.Marshal(req)
    if err != nil {
        return nil, fmt.Errorf("failed to marshal CNI request %v: %v", req, err)
    }
    // connect over a Unix domain socket
    client := &http.Client{
        Transport: &http.Transport{
            Dial: func(proto, addr string) (net.Conn, error) {
                return net.Dial("unix", p.socketPath)
            },
        },
    }
 
    resp, err := client.Post(url, "application/json", bytes.NewReader(data))
    if err != nil {
        return nil, fmt.Errorf("failed to send CNI request: %v", err)
    }
    defer resp.Body.Close() // nolint: errcheck
 
    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return nil, fmt.Errorf("failed to read CNI result: %v", err)
    }
 
    if resp.StatusCode != 200 {
        return nil, fmt.Errorf("galaxy returns: %s", string(body))
    }
 
    return body, nil
}
 
// simply forwards the CNI request to the Galaxy daemon
func (p *cniPlugin) CmdAdd(args *skel.CmdArgs) (*t020.Result, error) {
    // since the request goes over a UDS, the hostname in the URL is an arbitrary "dummy"; this simply calls the /cni endpoint
    body, err := p.doCNI("http://dummy/cni", newCNIRequest(args))
    if err != nil {
        return nil, err
    }
 
    result := &t020.Result{}
    if err := json.Unmarshal(body, result); err != nil {
        return nil, fmt.Errorf("failed to unmarshal response '%s': %v", string(body), err)
    }
 
    return result, nil
}
 
// Send the ADD command environment and config to the CNI server, printing
// the IPAM result to stdout when called as a CNI plugin
func (p *cniPlugin) skelCmdAdd(args *skel.CmdArgs) error {
    result, err := p.CmdAdd(args)
    if err != nil {
        return err
    }
    return result.Print()
}
 
// Send the DEL command environment and config to the CNI server
func (p *cniPlugin) CmdDel(args *skel.CmdArgs) error {
    _, err := p.doCNI("http://dummy/cni", newCNIRequest(args))
    return err
}
 
// built on the CNI skel framework
func main() {
    // pass in the path of the Unix domain socket
    p := NewCNIPlugin(private.GalaxySocketPath)
    skel.PluginMain(p.skelCmdAdd, p.CmdDel, version.Legacy)
}

As you can see, galaxy-sdn is just a thin shell: all of the actual work is delegated to the Galaxy daemon.

galaxy-k8s-vlan

The overall flow:

Go
func init() {
    // this ensures that main runs only on main thread (thread group leader).
    // since namespace ops (unshare, setns) are done for a single thread, we
    // must ensure that the goroutine does not jump from OS thread to thread
    runtime.LockOSThread()
    _, pANet, _ = net.ParseCIDR("10.0.0.0/8")
    _, pBNet, _ = net.ParseCIDR("172.16.0.0/12")
    _, pCNet, _ = net.ParseCIDR("192.168.0.0/16")
}
 
func main() {
    d = &vlan.VlanDriver{}
    skel.PluginMain(cmdAdd, cmdDel, version.Legacy)
}
 
func cmdAdd(args *skel.CmdArgs) error {
    // load the CNI configuration
    conf, err := d.LoadConf(args.StdinData)
    if err != nil {
        return err
    }
    // first try to get IPInfo from args; on failure, call a third-party IPAM plugin to allocate an IP
    vlanIds, results, err := ipam.Allocate(conf.IPAM.Type, args)
    if err != nil {
        return err
    }
    // decide whether to create the default bridge
    if d.DisableDefaultBridge == nil {
        defaultTrue := true
        d.DisableDefaultBridge = &defaultTrue
        for i := range vlanIds {
            if vlanIds[i] == 0 {
                *d.DisableDefaultBridge = false
            }
        }
    }
    // initialize the VLAN driver: configure the host bridge if necessary, or do MACVlan/IPVlan-specific setup
    if err := d.Init(); err != nil {
        return fmt.Errorf("failed to setup bridge %v", err)
    }
    result020s, err := resultConvert(results)
    if err != nil {
        return err
    }
    // 配置宿主机网络
    if err := setupNetwork(result020s, vlanIds, args); err != nil {
        return err
    }
    result020s[0].DNS = conf.DNS
    return result020s[0].Print()
}
 
func cmdDel(args *skel.CmdArgs) error {
    // 删除所有VETH对
    if err := utils.DeleteAllVeth(args.Netns); err != nil {
        return err
    }
    conf, err := d.LoadConf(args.StdinData)
    if err != nil {
        return err
    }
    // 释放IP地址
    return ipam.Release(conf.IPAM.Type, args)
}

VlanDriver是此CNI的核心。 

Go
type VlanDriver struct {
    //FIXME add a file lock cause we are running multiple processes?
    *NetConf
    // 不管是物理接口,还是VLAN子接口,都位于宿主机命名空间
    // 所有VLAN接口的物理接口(父接口)的设备索引,例如eth1
    vlanParentIndex int
    // 如果启用VLAN,则是当前所属VLAN的子接口的索引。否则是物理接口的索引
    DeviceIndex int
}

ADD/DEL命令都需要调用下面的方法加载CNI配置:

Go
const (
    VlanPrefix        = "vlan"
    BridgePrefix      = "docker"
    DefaultBridge     = "docker"
    // IPVLAN有两种模式L2/L3
    DefaultIPVlanMode = "l3"
)
 
func (d *VlanDriver) LoadConf(bytes []byte) (*NetConf, error) {
    conf := &NetConf{}
    if err := json.Unmarshal(bytes, conf); err != nil {
        return nil, fmt.Errorf("failed to load netconf: %v", err)
    }
    if conf.DefaultBridgeName == "" {
        conf.DefaultBridgeName = DefaultBridge
    }
    if conf.BridgeNamePrefix == "" {
        conf.BridgeNamePrefix = BridgePrefix
    }
    if conf.VlanNamePrefix == "" {
        conf.VlanNamePrefix = VlanPrefix
    }
    if conf.IpVlanMode == "" {
        conf.IpVlanMode = DefaultIPVlanMode
    }
 
    d.NetConf = conf
    return conf, nil
}
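LoadConf的默认值逻辑可以用一个独立的小程序验证。下面的结构体是只保留相关字段的简化版,并非完整的NetConf:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// netConf 只保留与默认值相关的几个字段(简化版,仅作演示)
type netConf struct {
	DefaultBridgeName string `json:"default_bridge_name"`
	VlanNamePrefix    string `json:"vlan_name_prefix"`
	IpVlanMode        string `json:"ipvlan_mode"`
}

// loadConf 模拟LoadConf的"空值填默认"逻辑
func loadConf(bytes []byte) (*netConf, error) {
	conf := &netConf{}
	if err := json.Unmarshal(bytes, conf); err != nil {
		return nil, err
	}
	if conf.DefaultBridgeName == "" {
		conf.DefaultBridgeName = "docker"
	}
	if conf.VlanNamePrefix == "" {
		conf.VlanNamePrefix = "vlan"
	}
	if conf.IpVlanMode == "" {
		conf.IpVlanMode = "l3"
	}
	return conf, nil
}

func main() {
	// 只指定ipvlan_mode,其余字段取默认值
	conf, err := loadConf([]byte(`{"ipvlan_mode":"l2"}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(conf.DefaultBridgeName, conf.VlanNamePrefix, conf.IpVlanMode)
}
```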

CNI配置定义如下:

Go
type NetConf struct {
    // CNI标准网络配置
    types.NetConf
    // 持有IDC IP地址的设备名,例如eth1或eth1.12(VLAN子接口)
    Device string `json:"device"`
    // 上述Device为空的时候,候选设备列表。取第一个存在的设备
    Devices []string `json:"devices"`
    // 交换机实现方式 macvlan, bridge(默认), pure(避免创建不必要的网桥)
    Switch string `json:"switch"`
    // IPVLAN模式,可选l2, l3(默认), l3s
    IpVlanMode string `json:"ipvlan_mode"`
 
    // 不去创建默认网桥
    DisableDefaultBridge *bool `json:"disable_default_bridge"`
    // 默认网桥的名字
    DefaultBridgeName string `json:"default_bridge_name"`
    // 网桥名字前缀
    BridgeNamePrefix string `json:"bridge_name_prefix"`
    // VLAN接口名字前缀
    VlanNamePrefix string `json:"vlan_name_prefix"`
    // 是否启用免费ARP
    GratuitousArpRequest bool `json:"gratuitous_arp_request"`
    // 设置MTU
    MTU int `json:"mtu"`
}

ADD命令,加载完CNI配置后,首先是调用IPAM进行IP地址分配:

Go
func Allocate(ipamType string, args *skel.CmdArgs) ([]uint16, []types.Result, error) {
    var (
        vlanId uint16
        err    error
    )
    // 解析key1=val1;key2=val2格式的CNI参数。这些参数来自Pod的k8s.v1.cni.galaxy.io/args注解,
    // 如果使用浮动IP,则Galaxy IPAM会设置该注解,将IP地址填写在ipinfos字段
    kvMap, err := cniutil.ParseCNIArgs(args.Args)
    if err != nil {
        return nil, nil, err
    }
    var results []types.Result
    var vlanIDs []uint16
    // 读取ipinfos字段
    if ipInfoStr := kvMap[constant.IPInfosKey]; ipInfoStr != "" {
        // 解析IP信息
        var ipInfos []constant.IPInfo
        if err := json.Unmarshal([]byte(ipInfoStr), &ipInfos); err != nil {
            return nil, nil, fmt.Errorf("failed to unmarshal ipInfo from args %q: %v", args.Args, err)
        }
        if len(ipInfos) == 0 {
            return nil, nil, fmt.Errorf("empty ipInfos")
        }
        for j := range ipInfos {
            results = append(results, cniutil.IPInfoToResult(&ipInfos[j]))
            vlanIDs = append(vlanIDs, ipInfos[j].Vlan)
        }
        // 直接“分配”Pod注解里写的IP地址
        return vlanIDs, results, nil
    }
    // 如果Pod注解里面没有IP地址信息,则需要调用某个IPAM插件,因此断言ipamType不为空:
    if ipamType == "" {
        return nil, nil, fmt.Errorf("neither ipInfo from cni args nor ipam type from netconf")
    }
    // 调用IPAM插件
    generalResult, err := ipam.ExecAdd(ipamType, args.StdinData)
    if err != nil {
        return nil, nil, err
    }
    result, err := t020.GetResult(generalResult)
    if err != nil {
        return nil, nil, err
    }
    if result.IP4 == nil {
        return nil, nil, fmt.Errorf("IPAM plugin returned missing IPv4 config")
    }
    return append(vlanIDs, vlanId), append(results, generalResult), err
}

如果使用Galaxy IPAM提供的浮动IP功能,则上述代码会自动获取已经分配的IP信息并返回。

分配/获取IP地址之后,就执行VlanDriver的初始化:

Go
func (d *VlanDriver) Init() error {
    var (
        device netlink.Link
        err    error
    )
    // 这个驱动需要指定宿主机上的父接口
    if d.Device != "" {
        device, err = netlink.LinkByName(d.Device)
    } else {
        if len(d.Devices) == 0 {
            return fmt.Errorf("a device is needed to use vlan plugin")
        }
        for _, devName := range d.Devices {
            device, err = netlink.LinkByName(devName)
            if err == nil {
                break
            }
        }
    }
    if err != nil {
        return fmt.Errorf("Error getting device %s: %v", d.Device, err)
    }
    // 默认MTU从父接口上查找
    if d.MTU == 0 {
        d.MTU = device.Attrs().MTU
    }
    d.DeviceIndex = device.Attrs().Index
    d.vlanParentIndex = device.Attrs().Index
    // 如果指定的设备是VLAN子接口,则使用其父接口的索引
    if device.Type() == "vlan" {
        //A vlan device
        d.vlanParentIndex = device.Attrs().ParentIndex
        //glog.Infof("root device %s is a vlan device, parent index %d", d.Device, d.vlanParentIndex)
    }
    if d.IPVlanMode() {
        switch d.GetIPVlanMode() {
        case netlink.IPVLAN_MODE_L3S, netlink.IPVLAN_MODE_L3:
            // 允许绑定到非本地地址(设置net.ipv4.ip_nonlocal_bind)
            // L3/L3S模式下,容器IP并不配置在宿主机接口上,但宿主机需要代理这些地址
            return utils.EnableNonlocalBind()
        default:
            return nil
        }
    } else if d.MacVlanMode() {
        return nil
    } else if d.PureMode() {
        if err := d.initPureModeArgs(); err != nil {
            return err
        }
        return utils.EnableNonlocalBind()
    }
    if d.DisableDefaultBridge != nil && *d.DisableDefaultBridge {
        return nil
    }
    // 需要创建默认网桥
    // 获得父接口的IP地址
    v4Addr, err := netlink.AddrList(device, netlink.FAMILY_V4)
    if err != nil {
        return fmt.Errorf("Error getting ipv4 address %v", err)
    }
    filteredAddr := network.FilterLoopbackAddr(v4Addr)
    if len(filteredAddr) == 0 {
        // 父接口没有IP地址,那么地址应当转给网桥了。检查确保网桥设备存在
        bri, err := netlink.LinkByName(d.DefaultBridgeName)
        if err != nil {
            return fmt.Errorf("Error getting bri device %s: %v", d.DefaultBridgeName, err)
        }
        if bri.Attrs().Index != device.Attrs().MasterIndex {
            return fmt.Errorf("No available address found on device %s", d.Device)
        }
    } else {
        // 否则,意味着网桥应该还不存在,初始化之
        if err := d.initVlanBridgeDevice(device, filteredAddr); err != nil {
            return err
        }
    }
    return nil
}
 
// 不使用MacVlan/IPVlan,则需要依赖网桥
func (d *VlanDriver) initVlanBridgeDevice(device netlink.Link, filteredAddr []netlink.Addr) error {
    // 创建网桥
    bri, err := getOrCreateBridge(d.DefaultBridgeName, device.Attrs().HardwareAddr)
    if err != nil {
        return err
    }
    // 启动网桥
    if err := netlink.LinkSetUp(bri); err != nil {
        return fmt.Errorf("failed to set up bridge device %s: %v", d.DefaultBridgeName, err)
    }
    // 获取父接口路由列表
    rs, err := netlink.RouteList(device, nl.FAMILY_V4)
    if err != nil {
        return fmt.Errorf("failed to list route of device %s", device.Attrs().Name)
    }
    defer func() {
        if err != nil {
            // 如果出现错误,则路由需要加回去
            for i := range rs {
                _ = netlink.RouteAdd(&rs[i])
            }
        }
    }()
    // 将父接口的IP地址、路由,转移到网桥上
    err = d.moveAddrAndRoute(device, bri, filteredAddr, rs)
    if err != nil {
        return err
    }
    return nil
}

可以看到,如果使用MacVlan或者IPVlan模式,则不会去管理宿主机上的网桥(这也是MacVlan/IPVlan的价值之一,不需要网桥)。否则,会创建网桥并转移走父接口的IP地址、路由。

最后,下面的函数负责配置好网络:  

Go
func setupNetwork(result020s []*t020.Result, vlanIds []uint16, args *skel.CmdArgs) error {
    if d.MacVlanMode() {
        // 处理MACVlan模式
        if err := setupMacvlan(result020s[0], vlanIds[0], args); err != nil {
            return err
        }
    } else if d.IPVlanMode() {
        // 处理IPVlan模式
        if err := setupIPVlan(result020s[0], vlanIds[0], args); err != nil {
            return err
        }
    } else {
        // 处理网桥模式
        ifName := args.IfName
        if err := setupVlanDevice(result020s, vlanIds, args); err != nil {
            return err
        }
        args.IfName = ifName
    }
    // 发送一个免费ARP,让交换机知道浮动IP漂移到当前节点上了
    if d.PureMode() {
        _ = utils.SendGratuitousARP(d.Device, result020s[0].IP4.IP.IP.String(), "", d.GratuitousArpRequest)
    }
    return nil
}

如果使用MACVlan,则配置过程如下:

Go
func setupMacvlan(result *t020.Result, vlanId uint16, args *skel.CmdArgs) error {
    if err := d.MaybeCreateVlanDevice(vlanId); err != nil {
        return err
    }
    // 通过MACVlan连接宿主机和容器
    if err := utils.MacVlanConnectsHostWithContainer(result, args, d.DeviceIndex, d.MTU); err != nil {
        return err
    }
    // 在命名空间内进行宣告
    _ = utils.SendGratuitousARP(args.IfName, result.IP4.IP.IP.String(), args.Netns, d.GratuitousArpRequest)
    return nil
}
 
func MacVlanConnectsHostWithContainer(result *t020.Result, args *skel.CmdArgs, parent int, mtu int) error {
    var err error
    // 创建MACVlan设备
    macVlan := &netlink.Macvlan{
        Mode: netlink.MACVLAN_MODE_BRIDGE,
        LinkAttrs: netlink.LinkAttrs{
            Name:        HostMacVlanName(args.ContainerID),
            MTU:         mtu,
            ParentIndex: parent, // 父设备可能是物理接口或者VLAN子接口
        }}
    // 添加接口
    if err := netlink.LinkAdd(macVlan); err != nil {
        return err
    }
    // 出错时必须接口被删除
    defer func() {
        if err != nil {
            netlink.LinkDel(macVlan)
        }
    }()
    // 配置沙盒,细节参考下文
    if err = configSboxDevice(result, args, macVlan); err != nil {
        return err
    }
    return nil
}

如果使用IPVlan,则配置过程如下: 

Go
func setupIPVlan(result *t020.Result, vlanId uint16, args *skel.CmdArgs) error {
    // 如有必要,创建VLAN设备
    if err := d.MaybeCreateVlanDevice(vlanId); err != nil {
        return err
    }
    // 连接宿主机和容器网络                                         IPVLAN的父设备可能是
    //                                                            VLAN子接口(位于宿主机命名空间)
    if err := utils.IPVlanConnectsHostWithContainer(result, args, d.DeviceIndex, d.GetIPVlanMode(), d.MTU); err != nil {
        return err
    }
 
    // l3模式下,所有连接在一起的容器之间的相互通信,都要通过宿主机上的IPVlan父接口进行代理
    // 在我们的环境中(部署在OpenStack的虚拟机上),出现Pod无法ping二层网关的情况,需要
    //     arping -c 2 -U -I eth0 <pod-ip>
    // 才可以解决,等价于下面的代码
    // Gratuitous ARP不能保证ARP缓存永久有效,底层交换机(OpenStack虚拟的)需要进行适当的配置,
    // 将Pod IP和VM的MAC地址关联
    if d.IpVlanMode == "l3" || d.IpVlanMode == "l3s" {
        _ = utils.SendGratuitousARP(d.Device, result.IP4.IP.IP.String(), "", d.GratuitousArpRequest)
        return nil
    }
    // 在网络命名空间中进行宣告
    _ = utils.SendGratuitousARP(args.IfName, result.IP4.IP.IP.String(), args.Netns, d.GratuitousArpRequest)
    return nil
}
 
 
// 创建VLAN设备的细节
func (d *VlanDriver) MaybeCreateVlanDevice(vlanId uint16) error {
    // 如果不启用VLAN支持,则不做任何事情
    if vlanId == 0 {
        return nil
    }
    _, err := d.getOrCreateVlanDevice(vlanId)
    return err
}
func (d *VlanDriver) getOrCreateVlanDevice(vlanId uint16) (netlink.Link, error) {
    // 根据父接口和VLAN ID查找已存在的VLAN设备
    link, err := d.getVlanIfExist(vlanId)
    if err != nil || link != nil {
        if link != nil {
            // 设置VLAN设备索引
            d.DeviceIndex = link.Attrs().Index
        }
        return link, err
    }
    vlanIfName := fmt.Sprintf("%s%d", d.VlanNamePrefix, vlanId)
    // 获取VLAN设备,如果获取不到则通过后面的回调函数创建
    vlan, err := getOrCreateDevice(vlanIfName, func(name string) error {
        // 创建设备                                                 名字 vlan100 父接口索引
        vlanIf := &netlink.Vlan{LinkAttrs: netlink.LinkAttrs{Name: vlanIfName, ParentIndex: d.vlanParentIndex},
            // VLAN号
            VlanId: (int)(vlanId)}
        // 添加设备
        if err := netlink.LinkAdd(vlanIf); err != nil {
            return fmt.Errorf("Failed to add vlan device %s: %v", vlanIfName, err)
        }
        return nil
    })
    if err != nil {
        return nil, err
    }
    // 设置为UP状态
    if err := netlink.LinkSetUp(vlan); err != nil {
        return nil, fmt.Errorf("Failed to set up vlan device %s: %v", vlanIfName, err)
    }
    // 更新VLAN子接口索引号
    d.DeviceIndex = vlan.Attrs().Index
    return vlan, nil
}
func (d *VlanDriver) getVlanIfExist(vlanId uint16) (netlink.Link, error) {
    links, err := netlink.LinkList()
    if err != nil {
        return nil, err
    }
    for _, link := range links {
        // 遍历网络接口列表,查找VLAN类型的、VLAN ID匹配的、父接口匹配的
        // 隐含意味着对于每个VLAN,只需要一个子接口,所有连接到此VLAN的容器使用它
        if link.Type() == "vlan" {
            if vlan, ok := link.(*netlink.Vlan); !ok {
                return nil, fmt.Errorf("vlan device type case error: %T", link)
            } else {
                if vlan.VlanId == int(vlanId) && vlan.ParentIndex == d.vlanParentIndex {
                    return link, nil
                }
            }
        }
    }
    return nil, nil
}
 
 
// 通过IPVLAN将宿主机和容器连接起来的细节
// 配置网络沙盒
func configSboxDevice(result *t020.Result, args *skel.CmdArgs, sbox netlink.Link) error {
    // 配置MAC地址之前,应该将接口DOWN掉
    if err := netlink.LinkSetDown(sbox); err != nil {
        return fmt.Errorf("could not set link down for container interface %q: %v", sbox.Attrs().Name, err)
    }
    if sbox.Type() != "ipvlan" {
        // IPVlan不需要设置MAC地址,因为必须和父接口一致
        if err := netlink.LinkSetHardwareAddr(sbox, GenerateMACFromIP(result.IP4.IP.IP)); err != nil {
            return fmt.Errorf("could not set mac address for container interface %q: %v", sbox.Attrs().Name, err)
        }
    }
    // 获得容器网络命名空间
    netns, err := ns.GetNS(args.Netns)
    if err != nil {
        return fmt.Errorf("failed to open netns %q: %v", args.Netns, err)
    }
    // 总是要关闭命名空间
    defer netns.Close() // nolint: errcheck
    // 移动到网络命名空间中
    if err = netlink.LinkSetNsFd(sbox, int(netns.Fd())); err != nil {
        return fmt.Errorf("failed to move sbox device %q to netns: %v", sbox.Attrs().Name, err)
    }
    // 在网络命名空间中配置
    return netns.Do(func(_ ns.NetNS) error {
        // 改名,IPVLAN接口直接变为容器的eth0,就像VETH对的一端那样
        if err := netlink.LinkSetName(sbox, args.IfName); err != nil {
            return fmt.Errorf("failed to rename sbox device %q to %q: %v", sbox.Attrs().Name, args.IfName, err)
        }
        // 如果容器、宿主机上有多个网络设备,则需要禁用rp_filter(反向路径过滤)
        // 因为从宿主机来的ARP请求(广播报文),可能使用不确定的源IP,如果启用反向路径过滤,封包可能被丢弃
        if err := DisableRpFilter(args.IfName); err != nil {
            return fmt.Errorf("failed disable rp_filter to dev %s: %v", args.IfName, err)
        }
        // 禁用所有接口的rp_filter
        if err := DisableRpFilter("all"); err != nil {
            return fmt.Errorf("failed disable rp_filter to all: %v", err)
        }
        // 配置容器网络接口:将IP地址、路由添加到容器网络接口
        return cniutil.ConfigureIface(args.IfName, result)
    })
}
func ConfigureIface(ifName string, res *t020.Result) error {
    // 网络接口应该已经存在
    link, err := netlink.LinkByName(ifName)
    if err != nil {
        return fmt.Errorf("failed to lookup %q: %v", ifName, err)
    }
    // 设置为UP状态
    if err := netlink.LinkSetUp(link); err != nil {
        return fmt.Errorf("failed to set %q UP: %v", ifName, err)
    }
 
    // TODO(eyakubovich): IPv6
    addr := &netlink.Addr{IPNet: &res.IP4.IP, Label: ""}
    // 添加IP地址
    if err = netlink.AddrAdd(link, addr); err != nil {
        return fmt.Errorf("failed to add IP addr to %q: %v", ifName, err)
    }
 
    // 遍历并应用IPV4路由
    for _, r := range res.IP4.Routes {
        gw := r.GW
        if gw == nil {
            gw = res.IP4.Gateway
        }
        if err = ip.AddRoute(&r.Dst, gw, link); err != nil {
            // we skip over duplicate routes as we assume the first one wins
            if !os.IsExist(err) {
                return fmt.Errorf("failed to add route '%v via %v dev %v': %v", r.Dst, gw, ifName, err)
            }
        }
    }
    return nil
}

DEL命令比较简单:

  1. 删除容器网络命名空间中所有VETH对
  2. 释放IP,就是调用IPAM插件,如果没有插件则什么都不做。Galaxy IPAM不会被调用,它总是自己负责IP地址的回收
galaxy-veth-host

该插件用于解决使用IPVlan L2模式下Pod访问不了宿主机IP、Service IP的问题。使用该插件,需要配置Galaxy:

JSON
{
  "NetworkConf":[
    {"name":"tke-route-eni","type":"tke-route-eni","eni":"eth1","routeTable":1},
    {"name":"galaxy-flannel","type":"galaxy-flannel", "delegate":{"type":"galaxy-veth"},"subnetFile":"/run/flannel/subnet.env"},
    {"name":"galaxy-k8s-vlan","type":"galaxy-k8s-vlan", "device":"eth0", "switch":"ipvlan", "ipvlan_mode":"l2", "mtu": 1500},
    {"name":"galaxy-k8s-sriov","type": "galaxy-k8s-sriov", "device": "eth0", "vf_num": 10},
    // 新增
    {
        "name":"galaxy-veth-host","type": "galaxy-veth-host",
        // 这里配置K8S的服务CIDR         指明宿主机网卡
        "serviceCidr": "11.1.252.0/22", "hostInterface": "eth0",
        // 容器中此插件创建的网卡名称     是否进行SNAT
        "containerInterface": "veth0", "ipMasq": true
    }
  ],
  "DefaultNetworks": ["galaxy-flannel"]
}

Pod注解需要使用两个网络:

YAML
annotations:
  k8s.v1.cni.cncf.io/networks: "galaxy-k8s-vlan,galaxy-veth-host"
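该插件的关键动作之一是把serviceCidr网段的路由指向宿主机。服务网段的包含关系判断可以用标准库演示,CIDR取自上面的配置示例:

```go
package main

import (
	"fmt"
	"net"
)

// inServiceCidr 判断某个IP是否落在K8S服务网段内
func inServiceCidr(cidr, ip string) bool {
	_, serviceNet, err := net.ParseCIDR(cidr)
	if err != nil {
		return false
	}
	return serviceNet.Contains(net.ParseIP(ip))
}

func main() {
	// 11.1.252.0/22 覆盖 11.1.252.0 - 11.1.255.255
	fmt.Println(inServiceCidr("11.1.252.0/22", "11.1.253.10"))
	fmt.Println(inServiceCidr("11.1.252.0/22", "11.2.0.1"))
}
```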

代码解读: 

Go
package main
 
import (
    "encoding/json"
    "fmt"
    "math"
    "math/rand"
    "net"
    "os"
    "runtime"
    "sort"
    "strconv"
    "time"
 
    "github.com/containernetworking/cni/pkg/skel"
    "github.com/containernetworking/cni/pkg/types"
    "github.com/containernetworking/cni/pkg/types/current"
    "github.com/containernetworking/cni/pkg/version"
    "github.com/containernetworking/plugins/pkg/ip"
    "github.com/containernetworking/plugins/pkg/ns"
    "github.com/containernetworking/plugins/pkg/utils"
    "github.com/containernetworking/plugins/pkg/utils/sysctl"
    "github.com/coreos/go-iptables/iptables"
    "github.com/j-keck/arping"
    "github.com/vishvananda/netlink"
)
 
// constants for full jitter backoff in milliseconds, and for nodeport marks
const (
    maxSleep             = 10000 // 10.00s
    baseSleep            = 20    //  0.02
    RPFilterTemplate     = "net.ipv4.conf.%s.rp_filter"
    podRulePriority      = 1024
    nodePortRulePriority = 512
)
 
func init() {
    // this ensures that main runs only on main thread (thread group leader).
    // since namespace ops (unshare, setns) are done for a single thread, we
    // must ensure that the goroutine does not jump from OS thread to thread
    runtime.LockOSThread()
}
 
// PluginConf is whatever you expect your configuration json to be. This is whatever
// is passed in on stdin. Your plugin may wish to expose its functionality via
// runtime args, see CONVENTIONS.md in the CNI spec.
type PluginConf struct {
    types.NetConf
 
    // This is the previous result, when called in the context of a chained
    // plugin. Because this plugin supports multiple versions, we'll have to
    // parse this in two passes. If your plugin is not chained, this can be
    // removed (though you may wish to error if a non-chainable plugin is
    // chained.
    // If you need to modify the result before returning it, you will need
    // to actually convert it to a concrete versioned struct.
    RawPrevResult *map[string]interface{} `json:"prevResult"`
    PrevResult    *current.Result         `json:"-"`
 
    IPMasq             bool   `json:"ipMasq"`
    HostInterface      string `json:"hostInterface"`
    ServiceCidr        string `json:"serviceCidr"`
    ContainerInterface string `json:"containerInterface"`
    MTU                int    `json:"mtu"`
    TableStart         int    `json:"routeTableStart"`
    NodePortMark       int    `json:"nodePortMark"`
    NodePorts          string `json:"nodePorts"`
}
 
// parseConfig parses the supplied configuration (and prevResult) from stdin.
func parseConfig(stdin []byte) (*PluginConf, error) {
    conf := PluginConf{}
 
    if err := json.Unmarshal(stdin, &conf); err != nil {
        return nil, fmt.Errorf("failed to parse network configuration: %v", err)
    }
 
    // Parse previous result.
    if conf.RawPrevResult != nil {
        resultBytes, err := json.Marshal(conf.RawPrevResult)
        if err != nil {
            return nil, fmt.Errorf("could not serialize prevResult: %v", err)
        }
        res, err := version.NewResult(conf.CNIVersion, resultBytes)
        if err != nil {
            return nil, fmt.Errorf("could not parse prevResult: %v", err)
        }
        conf.RawPrevResult = nil
        conf.PrevResult, err = current.NewResultFromResult(res)
        if err != nil {
            return nil, fmt.Errorf("could not convert result to current version: %v", err)
        }
    }
    // End previous result parsing
 
    if conf.HostInterface == "" {
        return nil, fmt.Errorf("hostInterface must be specified")
    }
 
    if conf.ContainerInterface == "" {
        return nil, fmt.Errorf("containerInterface must be specified")
    }
 
    if conf.NodePorts == "" {
        conf.NodePorts = "30000:32767"
    }
 
    if conf.NodePortMark == 0 {
        conf.NodePortMark = 0x2000
    }
 
    // start using tables by default at 256
    if conf.TableStart == 0 {
        conf.TableStart = 256
    }
 
    return &conf, nil
}
 
func cmdAdd(args *skel.CmdArgs) error {
    conf, err := parseConfig(args.StdinData)
    if err != nil {
        return err
    }
 
    // 必须作为插件链调用,并且有前置插件
    if conf.PrevResult == nil {
        return fmt.Errorf("must be called as chained plugin")
    }
 
    // 从前序插件的结果中得到容器IP列表
    containerIPs := make([]net.IP, 0, len(conf.PrevResult.IPs))
    if conf.CNIVersion != "0.3.0" && conf.CNIVersion != "0.3.1" {
        for _, ip := range conf.PrevResult.IPs {
            containerIPs = append(containerIPs, ip.Address.IP)
        }
    } else {
        for _, ip := range conf.PrevResult.IPs {
            if ip.Interface == nil {
                continue
            }
            intIdx := *ip.Interface
            if intIdx >= 0 && intIdx < len(conf.PrevResult.Interfaces) && conf.PrevResult.Interfaces[intIdx].Name != args.IfName {
                continue
            }
            containerIPs = append(containerIPs, ip.Address.IP)
        }
    }
    if len(containerIPs) == 0 {
        return fmt.Errorf("got no container IPs")
    }
    // 得到宿主机网络接口
    iface, err := netlink.LinkByName(conf.HostInterface)
    if err != nil {
        return fmt.Errorf("failed to lookup %q: %v", conf.HostInterface, err)
    }
    // 得到宿主机网络接口的地址列表
    hostAddrs, err := netlink.AddrList(iface, netlink.FAMILY_ALL)
    if err != nil || len(hostAddrs) == 0 {
        return fmt.Errorf("failed to get host IP addresses for %q: %v", iface, err)
    }
 
    netns, err := ns.GetNS(args.Netns)
    if err != nil {
        return fmt.Errorf("failed to open netns %q: %v", args.Netns, err)
    }
    defer netns.Close()
 
    containerIPV4 := false
    containerIPV6 := false
    for _, ipc := range containerIPs {
        if ipc.To4() != nil {
            containerIPV4 = true
        } else {
            containerIPV6 = true
        }
    }
    // 创建VETH对,并且在容器端进行配置:
    // 1. 如果启用了MASQ,则对出口是args.IfName的流量进行SNAT
    // 2. 为宿主机所有IP添加路由,走veth
    // 3. 为K8S服务网段添加路由,走veth,网关设置为第一个宿主机IP
    hostInterface, _, err := setupContainerVeth(netns, conf.ServiceCidr, conf.ContainerInterface, conf.MTU,
        hostAddrs, conf.IPMasq, containerIPV4, containerIPV6, args.IfName, conf.PrevResult)
    if err != nil {
        return err
    }
    // 配置容器的宿主端:
    if err = setupHostVeth(hostInterface.Name, hostAddrs, conf.IPMasq, conf.TableStart, conf.PrevResult); err != nil {
        return err
    }
 
    if conf.IPMasq {
        // 在宿主端启用IP转发
        err := enableForwarding(containerIPV4, containerIPV6)
        if err != nil {
            return err
        }
 
        chain := utils.FormatChainName(conf.Name, args.ContainerID)
        comment := utils.FormatComment(conf.Name, args.ContainerID)
        for _, ipc := range containerIPs {
            addrBits := 128
            if ipc.To4() != nil {
                addrBits = 32
            }
            // 对来自容器IP的流量进行SNAT
            if err = ip.SetupIPMasq(&net.IPNet{IP: ipc, Mask: net.CIDRMask(addrBits, addrBits)}, chain, comment); err != nil {
                return err
            }
        }
    }
 
    // 配置NodePort相关的iptables规则
    if err = setupNodePortRule(conf.HostInterface, conf.NodePorts, conf.NodePortMark); err != nil {
        return err
    }
 
    // Pass through the result for the next plugin
    return types.PrintResult(conf.PrevResult, conf.CNIVersion)
}
 
func setupContainerVeth(netns ns.NetNS, serviceCidr string, ifName string, mtu int,
    hostAddrs []netlink.Addr, masq, containerIPV4, containerIPV6 bool, k8sIfName string,
    pr *current.Result) (*current.Interface, *current.Interface, error) {
    hostInterface := &current.Interface{}
    containerInterface := &current.Interface{}
    // 在容器网络命名空间(netns)中执行
    err := netns.Do(func(hostNS ns.NetNS) error {
        // 创建VETH对,一端自动放入hostNS(这个变量netns.Do函数自动提供,为初始网络命名空间)
        hostVeth, contVeth0, err := ip.SetupVeth(ifName, mtu, hostNS)
        if err != nil {
            return err
        }
        hostInterface.Name = hostVeth.Name
        hostInterface.Mac = hostVeth.HardwareAddr.String()
        containerInterface.Name = contVeth0.Name
        // ip.SetupVeth函数不会获取VETH对的peer(第二个接口)的MAC地址,因此这里需要执行查询
        containerNetlinkIface, _ := netlink.LinkByName(contVeth0.Name)
        containerInterface.Mac = containerNetlinkIface.Attrs().HardwareAddr.String()
        containerInterface.Sandbox = netns.Path()
 
        // 本次CNI调用产生的两个网络接口,纳入到Result
        pr.Interfaces = append(pr.Interfaces, hostInterface, containerInterface)
 
        // 后面只是用到index属性,用上面Link对象不行么?
        contVeth, err := net.InterfaceByName(ifName)
        if err != nil {
            return fmt.Errorf("failed to look up %q: %v", ifName, err)
        }
 
        if masq {
            // 支持IPv4/IPv6的IP转发
            err := enableForwarding(containerIPV4, containerIPV6)
            if err != nil {
                return err
            }
 
            // 如果出口网卡是k8sIfName(容器主接口,通常是eth0,kubelet调用galaxy-sdn时提供)
            // ,则进行源地址转换
            err = setupSNAT(k8sIfName, "kube-proxy SNAT")
            if err != nil {
                return fmt.Errorf("failed to enable SNAT on %q: %v", k8sIfName, err)
            }
        }
 
        // 为宿主机的每个网络接口的地址添加路由条目
        for _, ipc := range hostAddrs {
            addrBits := 128
            if ipc.IP.To4() != nil {
                addrBits = 32
            }
            // 这些地址的路由出口都设置为容器内新添加的VETH接口
            err := netlink.RouteAdd(&netlink.Route{
                LinkIndex: contVeth.Index, // 新添加的VETH
                Scope:     netlink.SCOPE_LINK,
                Dst: &net.IPNet{
                    IP:   ipc.IP,
                    Mask: net.CIDRMask(addrBits, addrBits),
                },
            })
 
            if err != nil {
                return fmt.Errorf("failed to add host route dst %v: %v", ipc.IP, err)
            }
        }
 
        _, serviceNet, err := net.ParseCIDR(serviceCidr)
        if err != nil {
            return fmt.Errorf("failed to parse service cidr :%v", err)
        }
 
        // 将K8S服务网段的路由设置为:出口网卡新添加的VETH,网关为宿主机第一个IP
        err = netlink.RouteAdd(&netlink.Route{
            LinkIndex: contVeth.Index,
            Scope:     netlink.SCOPE_UNIVERSE,
            Dst:       serviceNet,
            // 封包发给宿主机第一个IP/网络接口处理。VETH的宿主端没有连接到什么网络接口
            // 这些网络接口可以作为下一跳,因为和VETH宿主端属于同一网络命名空间
            Gw: hostAddrs[0].IP,
        })
        if err != nil {
            return fmt.Errorf("failed to add service cidr route %v: %v", hostAddrs[0].IP, err)
        }
 
        // 为所有IPv4地址发送免费ARP
        for _, ipc := range pr.IPs {
            if ipc.Version == "4" {
                _ = arping.GratuitousArpOverIface(ipc.Address.IP, *contVeth)
            }
        }
 
        return nil
    })
    if err != nil {
        return nil, nil, err
    }
    return hostInterface, containerInterface, nil
}
 
func enableForwarding(ipv4 bool, ipv6 bool) error {
    if ipv4 {
        err := ip.EnableIP4Forward()
        if err != nil {
            return fmt.Errorf("Could not enable IPv4 forwarding: %v", err)
        }
    }
    if ipv6 {
        err := ip.EnableIP6Forward()
        if err != nil {
            return fmt.Errorf("Could not enable IPv6 forwarding: %v", err)
        }
    }
    return nil
}
 
func setupSNAT(ifName string, comment string) error {
    ipt, err := iptables.NewWithProtocol(iptables.ProtocolIPv4)
    if err != nil {
        return fmt.Errorf("failed to locate iptables: %v", err)
    }
    rulespec := []string{"-o", ifName, "-j", "MASQUERADE"}
    //if ipt.HasRandomFully() {
    //    rulespec = append(rulespec, "--random-fully")
    //}
    rulespec = append(rulespec, "-m", "comment", "--comment", comment)
    return ipt.AppendUnique("nat", "POSTROUTING", rulespec...)
}
 
func setupHostVeth(vethName string, hostAddrs []netlink.Addr, masq bool, tableStart int, result *current.Result) error {
    // no IPs to route
    if len(result.IPs) == 0 {
        return nil
    }
 
    // lookup by name as interface ids might have changed
    veth, err := net.InterfaceByName(vethName)
    if err != nil {
        return fmt.Errorf("failed to lookup %q: %v", vethName, err)
    }
 
    // Route every container IP via the veth
    for _, ipc := range result.IPs {
        addrBits := 128
        if ipc.Address.IP.To4() != nil {
            addrBits = 32
        }
 
        err := netlink.RouteAdd(&netlink.Route{
            LinkIndex: veth.Index,
            Scope:     netlink.SCOPE_LINK,
            Dst: &net.IPNet{
                IP:   ipc.Address.IP,
                Mask: net.CIDRMask(addrBits, addrBits),
            },
        })
 
        if err != nil {
            return fmt.Errorf("failed to add host route dst %v: %v", ipc.Address.IP, err)
        }
    }
 
    // Add policy routes for traffic that comes from the Pod and is destined for the VPC
    err = addPolicyRules(veth, result.IPs[0], result.Routes, tableStart)
    if err != nil {
        return fmt.Errorf("failed to add policy rules: %v", err)
    }
 
    // Send gratuitous ARP for all host-side IPs
    for _, ipc := range hostAddrs {
        if ipc.IP.To4() != nil {
            _ = arping.GratuitousArpOverIface(ipc.IP, *veth)
        }
    }
 
    return nil
}
 
func addPolicyRules(veth *net.Interface, ipc *current.IPConfig, routes []*types.Route, tableStart int) error {
    table := -1
 
    // Sort the routes (results of earlier plugin invocations)
    sort.Slice(routes, func(i, j int) bool {
        return routes[i].Dst.String() < routes[j].Dst.String()
    })
 
    // Try up to 10 times to write the routes into a free table slot
    for i := 0; i < 10 && table == -1; i++ {
        var err error
        // Find a free routing table
        table, err = findFreeTable(tableStart + rand.Intn(1000))
        if err != nil {
            return err
        }
 
        // Add all routes to that table
        for _, route := range routes {
            err := netlink.RouteAdd(&netlink.Route{
                LinkIndex: veth.Index,     // egress is the host-side veth
                Dst:       &route.Dst,     // routes discovered by earlier CNI calls, possibly from the cloud provider (VPC)
                Gw:        ipc.Address.IP, // use the first IP of the Result as the gateway
                Table:     table,          // write into this table
            })
            if err != nil {
                table = -1
                break
            }
        }
 
        if table == -1 {
            // failed to add routes so sleep and try again on a different table
            wait := time.Duration(rand.Intn(int(math.Min(maxSleep,
                baseSleep*math.Pow(2, float64(i)))))) * time.Millisecond
            fmt.Fprintf(os.Stderr, "route table collision, retrying in %v\n", wait)
            time.Sleep(wait)
        }
    }
 
    // ensure we have a route table selected
    if table == -1 {
        return fmt.Errorf("failed to add routes to a free table")
    }
 
    // Create the policy rule
    rule := netlink.NewRule()
    // Match traffic entering from the host end of the veth, i.e. coming from the Pod
    rule.IifName = veth.Name
    rule.Table = table
    rule.Priority = podRulePriority
 
    err := netlink.RuleAdd(rule)
    if err != nil {
        return fmt.Errorf("failed to add policy rule %v: %v", rule, err)
    }
 
    return nil
}
 
func findFreeTable(start int) (int, error) {
    allocatedTableIDs := make(map[int]bool)
    // Walk the policy rules of both the IPv4 and IPv6 families
    for _, family := range []int{netlink.FAMILY_V4, netlink.FAMILY_V6} {
        rules, err := netlink.RuleList(family)
        if err != nil {
            return -1, err
        }
        for _, rule := range rules {
            // Collect every table slot already in use
            allocatedTableIDs[rule.Table] = true
        }
    }
    // Return the first free slot
    for i := start; i < math.MaxUint32; i++ {
        if !allocatedTableIDs[i] {
            return i, nil
        }
    }
    return -1, fmt.Errorf("failed to find free route table")
}
 
func setupNodePortRule(ifName string, nodePorts string, nodePortMark int) error {
    ipt, err := iptables.NewWithProtocol(iptables.ProtocolIPv4)
    if err != nil {
        return fmt.Errorf("failed to locate iptables: %v", err)
    }
 
    // Make sure NodePort traffic gets marked correctly
    if err := ipt.AppendUnique("mangle", "PREROUTING", "-i", ifName, "-p", "tcp",
        "--dport", nodePorts, "-j", "CONNMARK", "--set-mark", strconv.Itoa(nodePortMark),
        "-m", "comment", "--comment", "NodePort Mark"); err != nil {
        return err
    }
    if err := ipt.AppendUnique("mangle", "PREROUTING", "-i", ifName, "-p", "udp",
        "--dport", nodePorts, "-j", "CONNMARK", "--set-mark", strconv.Itoa(nodePortMark),
        "-m", "comment", "--comment", "NodePort Mark"); err != nil {
        return err
    }
    if err := ipt.AppendUnique("mangle", "PREROUTING", "-i", "veth+", "-j", "CONNMARK",
        "--restore-mark", "-m", "comment", "--comment", "NodePort Mark"); err != nil {
        return err
    }
 
    // Switch the host network interface to loose (non-strict) reverse-path filtering
    _, err = sysctl.Sysctl(fmt.Sprintf(RPFilterTemplate, ifName), "2")
    if err != nil {
        return fmt.Errorf("failed to set RP filter to loose for interface %q: %v", ifName, err)
    }
 
    // Add a policy rule for traffic marked as NodePort
    rule := netlink.NewRule()
    rule.Mark = nodePortMark
    rule.Table = 254 // main table
    rule.Priority = nodePortRulePriority
 
    exists := false
    rules, err := netlink.RuleList(netlink.FAMILY_V4)
    if err != nil {
        return fmt.Errorf("unable to retrieve IP rules: %v", err)
    }
 
    for _, r := range rules {
        if r.Table == rule.Table && r.Mark == rule.Mark && r.Priority == rule.Priority {
            exists = true
            break
        }
    }
    if !exists {
        err := netlink.RuleAdd(rule)
        if err != nil {
            return fmt.Errorf("failed to add policy rule %v: %v", rule, err)
        }
    }
 
    return nil
}
 
// cmdDel is called for DELETE requests
func cmdDel(args *skel.CmdArgs) error {
    conf, err := parseConfig(args.StdinData)
    if err != nil {
        return err
    }
 
    if args.Netns == "" {
        return nil
    }
 
    // The network namespace still exists; clean it up
    var ipnets []netlink.Addr
    vethPeerIndex := -1
    _ = ns.WithNetNSPath(args.Netns, func(_ ns.NetNS) error {
        var err error
 
        if conf.IPMasq {
            iface, err := netlink.LinkByName(args.IfName)
            if err != nil {
                if err.Error() == "Link not found" {
                    return ip.ErrLinkNotFound
                }
                return fmt.Errorf("failed to lookup %q: %v", args.IfName, err)
            }
            // Fetch the list of IP addresses from args.IfName (usually eth0)
            ipnets, err = netlink.AddrList(iface, netlink.FAMILY_ALL)
            if err != nil || len(ipnets) == 0 {
                return fmt.Errorf("failed to get IP addresses for %q: %v", args.IfName, err)
            }
        }
 
        vethIface, err := netlink.LinkByName(conf.ContainerInterface)
        if err != nil && err != ip.ErrLinkNotFound {
            return err
        }
        // guard against a nil link: the branch above falls through on ErrLinkNotFound
        if vethIface != nil {
            vethPeerIndex, _ = netlink.VethPeerIndex(&netlink.Veth{LinkAttrs: *vethIface.Attrs()})
        }
        return nil
    })
 
    if conf.IPMasq {
        chain := utils.FormatChainName(conf.Name, args.ContainerID)
        comment := utils.FormatComment(conf.Name, args.ContainerID)
        for _, ipn := range ipnets {
            addrBits := 128
            if ipn.IP.To4() != nil {
                addrBits = 32
            }
 
            _ = ip.TeardownIPMasq(&net.IPNet{IP: ipn.IP, Mask: net.CIDRMask(addrBits, addrBits)}, chain, comment)
        }
 
        if vethPeerIndex != -1 {
            link, err := netlink.LinkByIndex(vethPeerIndex)
            if err != nil {
                return nil
            }
 
            rule := netlink.NewRule()
            rule.IifName = link.Attrs().Name
            // ignore errors as we might be called multiple times
            _ = netlink.RuleDel(rule)
            _ = netlink.LinkDel(link)
        }
    }
 
    return nil
}
 
func main() {
    rand.Seed(time.Now().UnixNano())
    skel.PluginMain(cmdAdd, cmdDel, version.All)
}
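The table allocation above combines two small mechanisms: findFreeTable scans the existing policy rules for the first unused table ID at or above a random start, and on collision addPolicyRules sleeps for a random interval bounded by baseSleep·2^i, capped at maxSleep. Both can be exercised in isolation; the sketch below uses assumed constants and helper names of my own (firstFreeSlot, backoffCapMs), not Galaxy's:

```go
package main

import (
	"fmt"
	"math"
)

const (
	baseSleep = 100.0  // ms; illustrative values, not Galaxy's actual constants
	maxSleep  = 1000.0 // ms
)

// firstFreeSlot mimics findFreeTable's scan: given the set of table IDs
// already referenced by policy rules, return the first unused ID >= start.
func firstFreeSlot(allocated map[int]bool, start int) int {
	for i := start; ; i++ {
		if !allocated[i] {
			return i
		}
	}
}

// backoffCapMs is the upper bound (in ms) for the random sleep before
// retry i, mirroring the baseSleep*2^i computation capped at maxSleep.
func backoffCapMs(i int) int {
	return int(math.Min(maxSleep, baseSleep*math.Pow(2, float64(i))))
}

func main() {
	used := map[int]bool{253: true, 254: true, 255: true, 1000: true}
	fmt.Println(firstFreeSlot(used, 254)) // 256
	for i := 0; i < 5; i++ {
		fmt.Printf("retry %d: cap %dms\n", i, backoffCapMs(i)) // 100, 200, 400, 800, 1000
	}
}
```

The random start plus random jitter keeps concurrent CNI invocations on the same node from repeatedly fighting over the same table slot.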
The Galaxy Daemon
Initialization

This daemon is provided by the galaxy DaemonSet and runs on every K8S node. Its entry point is:

cmd/galaxy/galaxy.go
Go
package main
 
import (
    "math/rand"
    "time"
 
    "github.com/spf13/pflag"
    "k8s.io/component-base/cli/flag"
    "k8s.io/component-base/logs"
    glog "k8s.io/klog"
    "tkestack.io/galaxy/pkg/galaxy"
    "tkestack.io/galaxy/pkg/signal"
    "tkestack.io/galaxy/pkg/utils/ldflags"
)
 
func main() {
    // Seed the random number generator
    rand.Seed(time.Now().UTC().UnixNano())
    // Create the Galaxy struct
    galaxy := galaxy.NewGalaxy()
    galaxy.AddFlags(pflag.CommandLine)
    flag.InitFlags()
    logs.InitLogs()
    defer logs.FlushLogs()
 
    ldflags.PrintAndExitIfRequested()
    // Start Galaxy
    if err := galaxy.Start(); err != nil {
        glog.Fatalf("Error start galaxy: %v", err)
    }
    // Wait for a signal, then stop Galaxy
    signal.BlockSignalHandler(func() {
        if err := galaxy.Stop(); err != nil {
            glog.Errorf("Error stop galaxy: %v", err)
        }
    })
}

The Galaxy struct is defined as follows:

Go
type Galaxy struct {
    // corresponds to Galaxy's main configuration file
    JsonConf
    // corresponds to the command-line options
    *options.ServerRunOptions
    quitChan  chan struct{}
    dockerCli *docker.DockerInterface
    netConf   map[string]map[string]interface{}
    // handles port mappings
    pmhandler *portmapping.PortMappingHandler
    client    kubernetes.Interface
    pm        *policy.PolicyManager
}
 
type JsonConf struct {
    NetworkConf     []map[string]interface{}
    DefaultNetworks []string
    ENIIPNetwork    string
}
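For reference, a JsonConf-compatible configuration might look like the following. The network entries are illustrative, loosely modeled on the TKEStack defaults rather than copied from any real cluster:

```json
{
  "NetworkConf": [
    {
      "name": "galaxy-flannel",
      "type": "galaxy-flannel",
      "delegate": {"type": "galaxy-veth"},
      "subnetFile": "/run/flannel/subnet.env"
    },
    {
      "name": "galaxy-k8s-vlan",
      "type": "galaxy-k8s-vlan",
      "device": "eth1"
    }
  ],
  "DefaultNetworks": ["galaxy-flannel"]
}
```

Since JsonConf has no JSON struct tags, json.Unmarshal matches the keys case-insensitively against the field names.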

The initialization process:

Go
func NewGalaxy() *Galaxy {
    g := &Galaxy{
        ServerRunOptions: options.NewServerRunOptions(),
        quitChan:         make(chan struct{}),
        netConf:          map[string]map[string]interface{}{},
    }
    return g
}
 
func (g *Galaxy) Init() error {
    if g.JsonConfigPath == "" {
        return fmt.Errorf("json config is required")
    }
    data, err := ioutil.ReadFile(g.JsonConfigPath)
    if err != nil {
        return fmt.Errorf("read json config: %v", err)
    }
    // Parse the main configuration file
    if err := json.Unmarshal(data, &g.JsonConf); err != nil {
        return fmt.Errorf("bad config %s: %v", string(data), err)
    }
    glog.Infof("Json Config: %s", string(data))
    // Basic validity checks on the configuration
    if err := g.checkNetworkConf(); err != nil {
        return err
    }
    // Needs access to the local Docker daemon
    dockerClient, err := docker.NewDockerInterface()
    if err != nil {
        return err
    }
    g.dockerCli = dockerClient
    g.pmhandler = portmapping.New("")
    return nil
}

The startup process:

Go
func (g *Galaxy) Start() error {
    // Initialize the Galaxy configuration
    if err := g.Init(); err != nil {
        return err
    }
    // Create the K8S client
    g.initk8sClient()
    // Start the Flannel garbage collector
    gc.NewFlannelGC(g.dockerCli, g.quitChan, g.cleanIPtables).Run()
    // Enable or disable bridge-nf-call-iptables, which controls whether packets forwarded by a bridge are filtered by iptables
    kernel.BridgeNFCallIptables(g.quitChan, g.BridgeNFCallIptables)
    // Configure IP forwarding
    kernel.IPForward(g.quitChan, g.IPForward)
    // On startup, walk the Pods already on this node and set up port mappings for those that need them
    if err := g.setupIPtables(); err != nil {
        return err
    }
    if g.NetworkPolicy {
        g.pm = policy.New(g.client, g.quitChan)
        go wait.Until(g.pm.Run, 3*time.Minute, g.quitChan)
    }
    if g.RouteENI {
        // Tencent Cloud ENI requires reverse-path filtering to be disabled
        kernel.DisableRPFilter(g.quitChan)
        eni.SetupENIs(g.quitChan)
    }
    // Listen on a UDS, start the HTTP server, and register routes
    return g.StartServer()
}
CNI Request Handling

The g.StartServer() call at the end of the code above registers routes via the following method:

Go
func (g *Galaxy) installHandlers() {
    ws := new(restful.WebService)
    ws.Route(ws.GET("/cni").To(g.cni))
    ws.Route(ws.POST("/cni").To(g.cni))
    restful.Add(ws)
}
 
// Handle CNI requests forwarded by the CNI plugin
func (g *Galaxy) cni(r *restful.Request, w *restful.Response) {
    data, err := ioutil.ReadAll(r.Request.Body)
    if err != nil {
        glog.Warningf("bad request %v", err)
        http.Error(w, fmt.Sprintf("err read body %v", err), http.StatusBadRequest)
        return
    }
    defer r.Request.Body.Close() // nolint: errcheck
    // Convert the request into a CNIRequest, then into a PodRequest
    req, err := galaxyapi.CniRequestToPodRequest(data)
    if err != nil {
        glog.Warningf("bad request %v", err)
        http.Error(w, fmt.Sprintf("%v", err), http.StatusBadRequest)
        return
    }
    req.Path = strings.TrimRight(fmt.Sprintf("%s:%s", req.Path, strings.Join(g.CNIPaths, ":")), ":")
    // Handle the CNI request
    result, err := g.requestFunc(req)
    if err != nil {
        http.Error(w, fmt.Sprintf("%v", err), http.StatusInternalServerError)
    } else {
        // Empty response JSON means success with no body
        w.Header().Set("Content-Type", "application/json")
        if _, err := w.Write(result); err != nil {
            glog.Warningf("Error writing %s HTTP response: %v", req.Command, err)
        }
    }
}
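The galaxy-sdn plugin reaches this handler over a Unix domain socket rather than TCP. A minimal, self-contained sketch of such a client/server pair is shown below; the socket path, payload, and handler body are illustrative stand-ins, not Galaxy's actual ones:

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net"
	"net/http"
	"os"
	"path/filepath"
	"strings"
)

// callCNI POSTs a JSON payload to /cni over the given Unix domain socket,
// the same kind of transport galaxy-sdn uses to reach the daemon.
func callCNI(sock, payload string) (string, error) {
	client := &http.Client{Transport: &http.Transport{
		// ignore the URL's host:port and always dial the UDS
		DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
			return (&net.Dialer{}).DialContext(ctx, "unix", sock)
		},
	}}
	resp, err := client.Post("http://galaxy/cni", "application/json", strings.NewReader(payload))
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	out, err := io.ReadAll(resp.Body)
	return string(out), err
}

// demo starts a toy stand-in for the daemon's /cni handler on a UDS
// and sends one request to it.
func demo() (string, error) {
	sock := filepath.Join(os.TempDir(), "galaxy-demo.sock")
	os.Remove(sock)
	ln, err := net.Listen("unix", sock)
	if err != nil {
		return "", err
	}
	mux := http.NewServeMux()
	mux.HandleFunc("/cni", func(w http.ResponseWriter, r *http.Request) {
		body, _ := io.ReadAll(r.Body)
		fmt.Fprintf(w, "received %d bytes", len(body))
	})
	go http.Serve(ln, mux) // nolint: errcheck
	return callCNI(sock, `{"env":{"CNI_COMMAND":"ADD"}}`)
}

func main() {
	out, err := demo()
	if err != nil {
		panic(err)
	}
	fmt.Println(out) // received 29 bytes
}
```

A UDS keeps the endpoint node-local: only processes on the same host (the CNI plugin invoked by Kubelet) can reach the daemon, with filesystem permissions as the access control.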

When the galaxy-sdn CNI plugin calls the Galaxy daemon's /cni endpoint, Galaxy converts the request from:

Go
// Request sent to the Galaxy by the Galaxy SDN CNI plugin
type CNIRequest struct {
    // CNI environment variables, like CNI_COMMAND and CNI_NETNS
    Env map[string]string `json:"env,omitempty"`
    // CNI configuration passed via stdin to the CNI plugin
    Config []byte `json:"config,omitempty"`
}

into a PodRequest:

Go
type PodRequest struct {
    // The CNI command to execute
    Command string
    // from the CNI_ARGS environment variable
    PodNamespace string
    // from the CNI_ARGS environment variable
    PodName string
    // kubernetes pod ports
    Ports []k8s.Port
    // channel that carries the result of the operation
    Result chan *PodResult
    // CNI_IFNAME, CNI_COMMAND, CNI_ARGS, ... passed in via environment variables
    *skel.CmdArgs
    // Galaxy delegates work to other CNI plugins; these are the arguments passed to those plugins
    //                 plugin type   arg name  arg value
    ExtendedCNIArgs map[string]map[string]json.RawMessage
}
 
// Result of handling the request
type PodResult struct {
    Response []byte
    Err      error
}
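During this conversion, CniRequestToPodRequest recovers PodNamespace and PodName from the CNI_ARGS environment variable, which is a semicolon-separated list of KEY=VALUE pairs. A minimal sketch of that parsing follows; the helper name parseCNIArgs is mine, not Galaxy's:

```go
package main

import (
	"fmt"
	"strings"
)

// parseCNIArgs splits a CNI_ARGS string such as
// "IgnoreUnknown=1;K8S_POD_NAMESPACE=default;K8S_POD_NAME=web-0"
// into a map, mirroring how the daemon recovers the pod's identity.
func parseCNIArgs(args string) map[string]string {
	m := map[string]string{}
	for _, kv := range strings.Split(args, ";") {
		parts := strings.SplitN(kv, "=", 2)
		if len(parts) == 2 {
			m[parts[0]] = parts[1]
		}
	}
	return m
}

func main() {
	m := parseCNIArgs("IgnoreUnknown=1;K8S_POD_NAMESPACE=default;K8S_POD_NAME=web-0")
	fmt.Println(m["K8S_POD_NAMESPACE"], m["K8S_POD_NAME"]) // default web-0
}
```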

Then the requestFunc method is called to handle the converted CNI request:

Go
func (g *Galaxy) requestFunc(req *galaxyapi.PodRequest) (data []byte, err error) {
    start := time.Now()
    glog.Infof("%v, %s+", req, start.Format(time.StampMicro))
    // Handle the ADD command
    if req.Command == cniutil.COMMAND_ADD {
        defer func() {
            glog.Infof("%v, data %s, err %v, %s-", req, string(data), err, start.Format(time.StampMicro))
        }()
        var pod *corev1.Pod
        // Look up the Pod
        pod, err = g.getPod(req.PodName, req.PodNamespace)
        if err != nil {
            return
        }
        result, err1 := g.cmdAdd(req, pod)
        if err1 != nil {
            err = err1
            return
        } else {
            // Convert the result format
            result020, err2 := convertResult(result)
            if err2 != nil {
                err = err2
            } else {
                data, err = json.Marshal(result)
                if err != nil {
                    return
                }
                // On success, set up the port mappings (recall that on startup this is done for every Pod already on the node)
                err = g.setupPortMapping(req, req.ContainerID, result020, pod)
                if err != nil {
                    g.cleanupPortMapping(req)
                    return
                }
                pod.Status.PodIP = result020.IP4.IP.IP.String()
                // Handle network policy
                if g.pm != nil {
                    if err := g.pm.SyncPodChains(pod); err != nil {
                        glog.Warning(err)
                    }
                    g.pm.SyncPodIPInIPSet(pod, true)
                }
            }
        }
    } else if req.Command == cniutil.COMMAND_DEL {
        defer glog.Infof("%v err %v, %s-", req, err, start.Format(time.StampMicro))
        err = cniutil.CmdDel(req.CmdArgs, -1)
        if err == nil {
            err = g.cleanupPortMapping(req)
        }
    } else {
        err = fmt.Errorf("unknown command %s", req.Command)
    }
    return
}

The requestFunc method simply looks up the Pod, passes it along as part of the arguments, and calls the cmdAdd method. Note that because the supported CNI version is so old (0.2.0), only the ADD and DEL commands are supported.

The ADD command is handled as follows:

Go