CRI学习笔记
在最开始,没有出现容器运行时规范时,K8S要创建容器,直接调用Docker守护进程,后者直接(不需要containerd)调用libcontainer,然后就把容器运行起来了。
为了防止Docker公司一家独大,控制容器运行时标准,出现了OCI标准,即Open Container Initiative。Docker将libcontainer封装为runc,作为OCI的参考实现。
Docker推出Swarm之后,将容器相关操作,都移动到containerd项目中,Docker自身则负责容器编排。containerd最初只是对接runc这样的OCI实现的守护进程。
OCI标准,主要关注两方面的内容:
- 容器镜像的格式,可以参考从镜像中抽取文件关于镜像格式的一章
- 容器需要支持哪些指令,例如创建、删除、启动、停止容器
OCI是一套想对宽泛的标准,实现起来比较自由。runc这个参考实现,基于namespaces + cgroups进行隔离,但是,基于虚拟化技术进行隔离,也是可以的。
兼容OCI标准的项目包括:runC、Kata(及其前身runV、Clear Containers)、gVisor
基于虚拟化技术的OCI实现,用VM来运行容器,前身是runV + Clear Container。
Google的项目,它不去创建VM,而是启动名为Sentry的用户态进程,负责处理容器的系统调用。拦截容器系统调用到Sentry的过程,由KVM或ptrace实现。
gVisor所包含的OCI运行时叫做runsc。
这个项目仍然在发展过程中,但是对于Serverless场景有它的优势:
- 安全性要求:Serverless场景需要强隔离,因为天然是多租户的(按用量计费,不可能为每个租户预先分配独立的大量资源)
- 非常轻量:Serverless的运行时沙箱需要频繁的创建、销毁,而且沙箱粒度可能非常细(例如FaaS)。这要求沙箱启动要非常块、占用资源要非常少。gVisor对比轻量级VM在这一点上更有优势
CRI即Container Runtime Interface,它包含一套规范、一套ProtoBuf API、一些库(用于容器运行时和Kubelet的集成)。CRI目前仍然处于Alpha状态。
CRI出现的动机是,不同容器运行时需要支持K8S。最开始,K8S只支持Docker,后来对rkt的支持被加入。对不同容器运行时的支持,是通过实现Kubelet的某个高级别接口,以in-tree方式写在K8S代码库中的。这样下去,势必会导致K8S(sig-node)的代码库难以维护。因此,从1.5版本开始K8S推出CRI,容器运行时需要接入K8S,自己提供out-of-tree的CRI实现即可。
CRI推出之时,K8S的地位没有现在这么高,Docker还希望通过Swarm和K8S竞争。因此,K8S开发了dockershim,作为Docker的CRI垫片(适配器),dockershim运行在kubelet内部,实现CRI接口,实际创建容器的操作,则是调用Docker实现。尽管CRI-O、containerd-plugin都可以实现Docker到K8S的对接,但是dockershim是最成熟的,经过生产环境检验的
CRI标准包含的接口有三部分:
- 操控容器的接口,例如创建、删除、启动、停止容器
- 操控镜像的接口,例如拉取、删除镜像
- 针对容器沙箱环境(Pod Sandbox)的操控接口
兼容CRI标准的项目包括:Docker(通过dockershim)、containerd(通过CRI-containerd)、CRI-O
Kubernetes + Docker是最流行的组合,在这种组合中,容器创建流程如下:
当Kubelet需要创建容器时,需要:
- 基于CRI接口调用进程内的dockershim
- dockershim将CRI请求转换为Docker守护程序能理解的请求
- Docker守护进程调用另外一个守护进程containerd,后者负责容器相关操作
- containerd仍然不会直接创建容器,而是会创建containerd-shim,让containerd-shim操控容器。containerd-shim负责持有容器的stdin等文件描述符,保持它们的打开状态。这样,即使containerd宕机或重启,容器仍然可以继续运行。注意containerd和containerd-shim不是父子进程关系
- containerd-shim创建容器时,调用的是runC,容器本质上就是有独立命名空间+cgroups配置的进程
- runC创建容器后,自身会退出,containerd-shim变为容器的父进程
要在K8S中使用Kata这样的基于虚拟化的OCI运行时,可以使用CRI-O,容器创建流程如下:
当Kubelet需要创建Pod时:
- Kubelet基于CRI接口,调用CRI-O,后者调用Kata-runtime
- Kata-runtime通过QEMU创建精简版的VM,Pod的所有容器在此VM中创建
- 在创建业务容器之前,Kubelet会调用CRI的RunPodSandbox接口,创建一个infra容器,它会执行/pause,永久挂起。这个容器的价值是,维持(至少网络+IPC)命名空间的存在。infra容器创建的命名空间称为容器沙箱环境
- 业务容器会加入到上述沙箱中
这是一个项目,它实现了CRI规范,并把CRI调用转换为对应的OCI调用。有了CRI-O,你就可以在K8S中使用任何OCI兼容的容器运行时。
CRI是主要目标是提供一个命令式的、容器级别的接口,允许Kubelet直接控制器容器的生命周期。
Pod由一组容器组成,它们在一个隔离的、具有资源约束的环境中。在K8S中Pod是最小调度单元。当Pod被调度到节点上后,Kubelet会为它创建运行环境,并且在此运行环境中增删改Pod,以满足Pod Spec。这里的运行环境,称为PodSandBox。
到底PodSandBox的实体是什么,取决于容器运行时:
- 对于依赖于Hypervisor的运行时,则PodSandBox天然是一个VM
- 对于其它运行时,可能是一系列Linux命名空间 + Cgroups规则
不论如何,PodSandBox必须支持:隔离性、资源需求和限制。
PodSandbox的生命周期是和容器解耦的。沙盒在任何容器创建之前即创建,在任何容器终止之后才退出:
1 2 3 |
create sandbox Foo --> create container C --> start container C stop container C --> remove container C --> delete sandbox Foo |
容器运行时不应该擅自影响Pod/容器的生命周期,例如启动一个容器。所有指令应该来自Kubelet,垃圾回收、重启容器等都是Kubelet的职责。
Kubelet还负责在删除沙盒之前,优雅的停止所有容器。
任何沙盒/容器的生命周期操作,例如create/start/stop/delete,要么返回错误,要么阻塞直到成功。一个成功的操作,应该包含沙盒/容器的状态变迁,例如Create调用成功后,容器的状态应该变为created。
K8S仅仅支持修改Pod Spec中很少的一部分,例如镜像地址。这些变更要求Kubelet重新创建容器。
在容器启动、关闭期间,K8S支持执行钩子。Kubelet会通过CRI的Exec函数来调用容器运行时,执行钩子。
为了让生命周期钩子正常工作,Exec需要访问容器的文件系统(Mount命名空间)。这意味着钩子就是执行容器里面的某个命令。
1 2 3 4 5 |
pre-start post-start pre-stop post-stop | | | | exec exec exec exec | | | | create --------> start ----------------> stop --------> remove |
CRI的接口以gRPC方式描述,定义在staging/src/k8s.io/cri-api/pkg/apis/services.go。
该接口定义了远程容器运行时需要支持的公共操作,包括沙盒、容器、运行时自身的操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
service RuntimeService { // 返回运行时名称、版本,以及API版本 rpc Version(VersionRequest) returns (VersionResponse) {} // 创建并启动Pod沙盒,如果调用成功,则运行时必须保证沙盒已经就绪 // the sandbox is in the ready state on success. rpc RunPodSandbox(RunPodSandboxRequest) returns (RunPodSandboxResponse) {} // 停止沙盒中所有运行中的进程,回收分配给沙盒的网络资源(例如IP地址) // 沙盒中正在运行的容器必须强制停止 // 必须实现幂等操作:如果相关资源已经回收,也不能返回错误 // 在调用RemovePodSandbox之前,Kubelet至少会调用该接口一次. // Kubelet会尝试尽早回收资源,只要沙盒不再需要,它就会调用StopPodSandbox rpc StopPodSandbox(StopPodSandboxRequest) returns (StopPodSandboxResponse) {} // 移除沙盒,如果其中还有容器运行,必须强制停止并移除 // 必须实现幂等操作:即使沙盒已经被删除,也不能返回错误 rpc RemovePodSandbox(RemovePodSandboxRequest) returns (RemovePodSandboxResponse) {} // 返回沙盒的状态,如果沙盒不存在,则返回错误 rpc PodSandboxStatus(PodSandboxStatusRequest) returns (PodSandboxStatusResponse) {} // 列出运行时现有的沙盒 rpc ListPodSandbox(ListPodSandboxRequest) returns (ListPodSandboxResponse) {} // 在指定的沙盒中创建容器 rpc CreateContainer(CreateContainerRequest) returns (CreateContainerResponse) {} // 启动沙盒中的容器 rpc StartContainer(StartContainerRequest) returns (StartContainerResponse) {} // 以一个优雅等待期停止容器 // 必须实现幂等操作:即使容器已经停止了,也不能返回错误 rpc StopContainer(StopContainerRequest) returns (StopContainerResponse) {} // 移除一个容器,如果容器正在运行,必须停止并移除 // 必须实现幂等操作:即使容器已经移除了,也不能返回错误 rpc RemoveContainer(RemoveContainerRequest) returns (RemoveContainerResponse) {} // 基于过滤器查询容器列表 rpc ListContainers(ListContainersRequest) returns (ListContainersResponse) {} // 返回容器的状态,如果容器不存在,返回错误 rpc ContainerStatus(ContainerStatusRequest) returns (ContainerStatusResponse) {} // 更新容器的ContainerConfig rpc UpdateContainerResources(UpdateContainerResourcesRequest) returns (UpdateContainerResourcesResponse) {} // 请求容器运行时,重新打开容器的stdout/stderr日志文件 // 该调用经常发生在容器日志轮换之后 // 如果容器不在运行,则运行时可以有两个选择: // 1. 创建新日志文件并返回nil // 2. 返回一个错误,但是绝不能创建一个新的容器日志文件 rpc ReopenContainerLog(ReopenContainerLogRequest) returns (ReopenContainerLogResponse) {} // 同步的在一个容器里面执行命令 rpc ExecSync(ExecSyncRequest) returns (ExecSyncResponse) {} // 准备一个流式端点,用于在容器中执行命令 rpc Exec(ExecRequest) returns (ExecResponse) {} // 准备一个流式端点,Attach到运行中的容器 rpc Attach(AttachRequest) returns (AttachResponse) {} //准备一个流式端点,从沙盒中转发端口 rpc PortForward(PortForwardRequest) returns (PortForwardResponse) {} // 返回容器的统计信息 // 如果容器不存在,返回错误 rpc ContainerStats(ContainerStatsRequest) returns (ContainerStatsResponse) {} // 列出所有容器的统计信息 rpc ListContainerStats(ListContainerStatsRequest) returns (ListContainerStatsResponse) {} // 更新运行时配置 rpc UpdateRuntimeConfig(UpdateRuntimeConfigRequest) returns (UpdateRuntimeConfigResponse) {} // 返回运行时的状态 rpc Status(StatusRequest) returns (StatusResponse) {} } |
定义用于管理镜像的API:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
service ImageService { // 列出现有的镜像 rpc ListImages(ListImagesRequest) returns (ListImagesResponse) {} // 返回镜像的状态,如果镜像不存在,返回响应,设置 // ImageStatusResponse.Image为nil rpc ImageStatus(ImageStatusRequest) returns (ImageStatusResponse) {} // 使用指定的身份验证配置,来拉取镜像 rpc PullImage(PullImageRequest) returns (PullImageResponse) {} // 移除一个镜像 // 必须实现幂等操作:即使镜像已经移除了,也不能返回错误 rpc RemoveImage(RemoveImageRequest) returns (RemoveImageResponse) {} // 返回用于存放镜像的文件系统的信息 rpc ImageFsInfo(ImageFsInfoRequest) returns (ImageFsInfoResponse) {} } |
创建沙盒时,提供的请求对象如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 |
message RunPodSandboxRequest { // 沙盒配置 PodSandboxConfig config = 1; // 命名的运行时配置(RuntimeClass),此配置用于此沙盒 // 如果指定的handler不存在,则运行时应该拒绝此请求 // 指定为空字符串,则使用默认handler // 参考: https://git.k8s.io/enhancements/keps/sig-node/runtime-class.md string runtime_handler = 2; } message PodSandboxConfig { // 沙盒的元数据,用于唯一性的识别沙盒 // 运行时依赖于此元数据来确保正确的操作,运行时也应该利用这些元数据提升UX,例如创建 // 可理解的容器名 PodSandboxMetadata metadata = 1; // 沙盒的主机名,如果使用宿主机网络命名空间,可以为空 string hostname = 2; // 宿主机上,存放容器日志的路径 // 默认情况下,容器的STDOUT/STDERR就是容器日志文件,被写到此目录 // // 此目录下还可能包含各容器的结构化日志,例如基于\n分隔的JSON日志、systemd-journald日志 // gRPC跟踪文件,等等。示例: // PodSandboxConfig.LogDirectory = `/var/log/pods/<podUID>/` 沙盒日志根目录 // ContainerConfig.LogPath = `containerName/Instance#.log` 容器日志子目录 // string log_directory = 3; // 沙盒的DNS配置 DNSConfig dns_config = 4; // 沙盒的端口映射 repeated PortMapping port_mappings = 5; // 用于定位和选择资源的标签 map<string, string> labels = 6; // Kubelet可能设置此字段,来存储、取回任何元数据。包括Pod上设置的任何Annotation // // 运行时不应该修改任何Annotation,这里传递的Annotation,必须在PodSandboxStatus中原样返回 // // 为了维持Kubelet和运行时之间的接口时良好定义的,Annoatation不应该影响Runtime的行为 // // 运行时作者也可以用Annotation来试验新的、对K8S API opaque的特性 map<string, string> annotations = 7; // 针对Linux宿主机的特殊配置 LinuxPodSandboxConfig linux = 8; } // 包含生成沙盒名字所需要的所有必要信息 // 鼓励运行时使用这里的信息来创建用户可见的标识(例如容器名),以提升用户体验 message PodSandboxMetadata { // Pod的名字 string name = 1; // Pod的UID string uid = 2; // Pod的命名空间 string namespace = 3; // 尝试创建沙盒的次数,默认0 uint32 attempt = 4; } // 沙盒的DNS配置 message DNSConfig { // 集群DNS服务器列表 repeated string servers = 1; // 集群的搜索域 repeated string searches = 2; // DNS选项 repeated string options = 3; } // 沙盒到宿主机的端口映射 message PortMapping { // 协议 Protocol protocol = 1; // 容器端口 int32 container_port = 2; // 宿主机端口 int32 host_port = 3; // 宿主机IP string host_ip = 4; } // Linux特有的沙盒配置 message LinuxPodSandboxConfig { // 沙盒的父cgroup // 使用 cgroupfs 风格, 运行时可能将其转换为systemd语义 string cgroup_parent = 1; // 沙盒安全属性 LinuxSandboxSecurityContext security_context = 2; // 沙盒的sysctls配置 map<string, string> sysctls = 3; } // 应用到沙盒的安全设置,注意: // 1) 不会应用到Pod中的容iq // 2) 可能对没有运行进程的沙盒不适用 message LinuxSandboxSecurityContext { // 沙盒命名空间配置 // 仅当沙盒基于命名空间做隔离时有意义 NamespaceOption namespace_options = 1; // SELinux上下文 // SELinux,Security-Enhanced Linux,为内核提供访问控制策略 // 可以最大限度地减小系统中服务进程可访问的资源 // 传统的Linux权限机制管理的主体是用户,用户是否对某个资源具有读、写、执行的权利 // SELinux则控制每一类进程是否拥有对某一类资源的访问权限。它限制的主体是进程,这样 // 如果程序有缺陷,它产生的破坏性也可以被限制,而不在于它是否root身份运行 SELinuxOption selinux_options = 2; // 运行沙盒的用户的UID Int64Value run_as_user = 3; // 运行沙盒的GID Int64Value run_as_group = 8; // 沙盒的rootfs是否只读 bool readonly_rootfs = 4; // 父GID之外的辅助组 repeated int64 supplemental_groups = 5; // 沙盒是否启动一个特权容器 bool privileged = 6; // 沙盒Seccomp profile,可选值 // * runtime/default:运行时默认值 // * unconfined:自由的,不支持seccomp沙盒化 // * localhost/<full-path-to-profile>: 安装在节点上的profile // <full-path-to-profile> 完整路径 // Default: "", 等价于unconfined // seccomp即short for secure computing mode,是一种限制系统调用的机制 // 严格模式下,仅允许exit(),sigreturn(),read()和write()这几个系统调用 // 过滤模式下,可以选择允许的系统调用 string seccomp_profile_path = 7; } message NamespaceOption { // 容器或沙盒的网络命名空间类型,目前仅仅支持 POD, NODE NamespaceMode network = 1; // PID命名空间类型:POD, CONTAINER, NODE, TARGET NamespaceMode pid = 2; // IPC命名空间类型:POD, NODE NamespaceMode ipc = 3; // 上述命名空间,类型为TARGET的,用该字段指定目标 // 该字段必须是当前沙盒中创建的,命名空间类型是CONTAINER的容器的ID // 不支持为不同命名空间设置不同的TARGET string target_id = 4; } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
message CreateContainerRequest { // 在其中创建容器的沙盒的ID string pod_sandbox_id = 1; // 容器配置 ContainerConfig config = 2; // 沙盒配置,就是创建沙盒时提供的那个对象,在这里仅仅为了引用方便 // 沙盒配置在Pod生命周期内不改变 PodSandboxConfig sandbox_config = 3; } message ContainerConfig { // 容器元数据,唯一性的标识容器。运行时应当基于此元数据,确保正确的行为 // 运行时可以基于此元数据提升UX,例如设置一个可读的容器名 ContainerMetadata metadata = 1 ; // 使用的镜像 ImageSpec image = 2; // 执行的命令,例如Docker的入口点 repeated string command = 3; // 命令的参数 repeated string args = 4; // 工作目录 string working_dir = 5; // 环境变量 repeated KeyValue envs = 6; // 挂载情况 repeated Mount mounts = 7; // 设备列表 repeated Device devices = 8; // 容器标签 map<string, string> labels = 9; // 容器注解 map<string, string> annotations = 10; // 相对于PodSandboxConfig.LogDirectory的日志路径(存放STDOUT、STDERR) string log_path = 11; // 交互式容器字段,交互式容器具有特殊应用场景(例如debugging) bool stdin = 12; bool stdin_once = 13; bool tty = 14; // Linux容器特殊配置 LinuxContainerConfig linux = 15; // Windows容器特殊配置 WindowsContainerConfig windows = 16; } // 说明宿主机的卷如何挂载到容器中 message Mount { // 容器内挂载路径 string container_path = 1; // 宿主机挂载路径。如果路径不存在,运行时应该报错 // 如果路径是符号连接,运行时应该跟踪直到找到真实路径 string host_path = 2; // 只读挂载 bool readonly = 3; // 如果true,则挂载需要SELinux relabeling. bool selinux_relabel = 4; // (子树)挂载传播模式 MountPropagation propagation = 5; } // 说明宿主机设备如何挂载到容器中 message Device { // 设备在容器中的路径 string container_path = 1; // 设备在宿主机中的路径 string host_path = 2; // 设备的Cgroups 特权 // * r - 允许容器读 // * w - 允许容器写 // * m - 允许容器创建不存在的设备文件 string permissions = 3; } message LinuxContainerConfig { // 请求资源信息 LinuxContainerResources resources = 1; // 安全上下文信息 LinuxContainerSecurityContext security_context = 2; } message LinuxContainerResources { // CPU CFS (Completely Fair Scheduler) period int64 cpu_period = 1; // CPU CFS (Completely Fair Scheduler) quota int64 cpu_quota = 2; // CPU shares (relative weight vs. other containers) int64 cpu_shares = 3; // Memory limit in bytes int64 memory_limit_in_bytes = 4; // OOMScoreAdj adjusts the oom-killer score int64 oom_score_adj = 5; // CpusetCpus constrains the allowed set of logical CPUs string cpuset_cpus = 6; // CpusetMems constrains the allowed set of memory nodes string cpuset_mems = 7; // List of HugepageLimits to limit the HugeTLB usage of container per page size repeated HugepageLimit hugepage_limits = 8; } message LinuxContainerSecurityContext { // 需要添加或删除的特权 Capability capabilities = 1; // 在特权模式下运行容器,特权模式和下列字段不兼容: // 1. capabilities // 2. selinux_options // 4. seccomp // 5. apparmor // 运行在特权模式下,则上述字段无效。并且隐含意味着: // 1. 具有所有特权 // 2. 敏感的路径,例如sysfs中内核模块的路径,没有被遮罩 // 3. 所有sysfs、procfs挂载为可读写 // 4. 没有启用Apparmor confinement // 5. 没有启用Seccomp restrictions // 6. device cgroup does 不限制访问任何设备 // 7. 宿主机上所有 /dev 下的设备对容器可见 // 8. 没有启用SELinux restrictions bool privileged = 2; // 基于命名空间进行隔离时,容器的命名空间配置 NamespaceOption namespace_options = 3; // SELinux上下文 SELinuxOption selinux_options = 4; // 运行容器的UID Int64Value run_as_user = 5; // 运行容器的GID Int64Value run_as_group = 12; // 运行容器的用户的名字,名字必须定义在容器镜像的/etc/passwd中 string run_as_username = 6; // 是否设置容器rootfs为只读 bool readonly_rootfs = 7; // 补充的GID repeated int64 supplemental_groups = 8; // AppArmor profile string apparmor_profile = 9; // Seccomp profile string seccomp_profile_path = 10; // 是否在容器上设置no_new_privs bool no_new_privs = 11; // 容器运行时应当遮罩掉的宿主机路径 repeated string masked_paths = 13; // 容器运行时应当设置为只读的宿主机路径 repeated string readonly_paths = 14; } |
位于pkg/kubelet/dockershim是一个CRI实现,它作为Docker适配到CRI的垫片(适配器)。
在Kubelet启动时,它会运行KubeletServer:
1 2 3 4 5 6 7 8 9 10 11 |
func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate, stopCh <-chan struct{}) error { klog.Infof("Version: %+v", version.Get()) if err := initForOS(s.KubeletFlags.WindowsService); err != nil { return fmt.Errorf("failed OS init: %v", err) } // 运行服务 if err := run(s, kubeDeps, featureGate, stopCh); err != nil { return fmt.Errorf("failed to run Kubelet: %v", err) } return nil } |
作为KubeletServer启动过程的一部分,会进行容器运行时的预(在运行Kubelet之前)初始化:
1 2 3 4 5 6 7 8 9 10 11 |
func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate, stopCh <-chan struct{}) (err error) { // ... // 初始化容器运行时服务 err = kubelet.PreInitRuntimeService(&s.KubeletConfiguration, kubeDeps, &s.ContainerRuntimeOptions, s.ContainerRuntime, s.RuntimeCgroups, s.RemoteRuntimeEndpoint, s.RemoteImageEndpoint, s.NonMasqueradeCIDR) } |
初始化逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
func PreInitRuntimeService(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, crOptions *config.ContainerRuntimeOptions, containerRuntime string, runtimeCgroups string, remoteRuntimeEndpoint string, remoteImageEndpoint string, nonMasqueradeCIDR string) error { // ... switch containerRuntime { // 如果节点使用Docker运行时,则创建dockersim case kubetypes.DockerContainerRuntime: // docker runDockershim( kubeCfg, kubeDeps, crOptions, runtimeCgroups, remoteRuntimeEndpoint, remoteImageEndpoint, nonMasqueradeCIDR, ) case kubetypes.RemoteContainerRuntime: // remote // 否则,不做任何操作,CRI运行时是一个进程外的gRPC服务 break default: return fmt.Errorf("unsupported CRI runtime: %q", containerRuntime) } // ... } |
runDockershim负责创建dockershim,运行在kubelet进程内的CRI服务器端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
func runDockershim(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, crOptions *config.ContainerRuntimeOptions, runtimeCgroups string, remoteRuntimeEndpoint string, remoteImageEndpoint string, nonMasqueradeCIDR string) error { // NetworkPluginSettings是Kubelet运行时参数的一部分,为CNI提供静态配置 pluginSettings := dockershim.NetworkPluginSettings{ // Kubelet如何配置网络,以处理hairpin包(在网桥上,需要从接收封包的端口,将封包再发回去) // hairpin-veth:在容器运行时创建的容器的veth上设置hairpin标记 // promiscuous-bridge:将容器网桥设置为混杂的,这会强制让其接受hairpin包 // none:不设置,如果kubelet以hairpin模式启动,kube-proxy运行在iptables模式,hairpin包被丢弃 HairpinMode: kubeletconfiginternal.HairpinMode(kubeCfg.HairpinMode), // 不应该包含在任何MASQUERADE规则中各的CIDR NonMasqueradeCIDR: nonMasqueradeCIDR, // 插件名 PluginName: crOptions.NetworkPluginName, // 插件位置 PluginConfDir: crOptions.CNIConfDir, PluginBinDirString: crOptions.CNIBinDir, PluginCacheDir: crOptions.CNICacheDir, MTU: int(crOptions.NetworkPluginMTU), } // 创建Docker的CRI shim,启动为gRPC服务器 streamingConfig := getStreamingConfig(kubeCfg, kubeDeps, crOptions) // Docker客户端配置 dockerClientConfig := &dockershim.ClientConfig{ DockerEndpoint: kubeDeps.DockerOptions.DockerEndpoint, RuntimeRequestTimeout: kubeDeps.DockerOptions.RuntimeRequestTimeout, ImagePullProgressDeadline: kubeDeps.DockerOptions.ImagePullProgressDeadline, } // 创建dockerService,shim的核心逻辑所在,实现了CRI的gRPC接口 ds, err := dockershim.NewDockerService(dockerClientConfig, crOptions.PodSandboxImage, streamingConfig, &pluginSettings, runtimeCgroups, kubeCfg.CgroupDriver, crOptions.DockershimRootDirectory, !crOptions.RedirectContainerStreaming) if err != nil { return err } if crOptions.RedirectContainerStreaming { kubeDeps.criHandler = ds } // Kubelet和dockershim之间通信的UDS,gRPC服务在其上监听 klog.V(2).Infof("Starting the GRPC server for the docker CRI shim.") dockerServer := dockerremote.NewDockerServer(remoteRuntimeEndpoint, ds) if err := dockerServer.Start(); err != nil { return err } return nil } |
其中dockershim.NewDockerService调用创建的是dockerService,dockerService是dockershim的核心,它实现了CRI的所有接口。
通过下面的工厂函数创建dockerService:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
func NewDockerService(config *ClientConfig, podSandboxImage string, streamingConfig *streaming.Config, pluginSettings *NetworkPluginSettings, cgroupsName string, kubeCgroupDriver string, dockershimRootDir string, startLocalStreamingServer bool) (DockerService, error) { // Docker客户端接口 libdocker.Interface client := NewDockerClientFromConfig(config) c := libdocker.NewInstrumentedInterface(client) // 支持Docker的Checkpoint/Restore操作 checkpointManager, err := checkpointmanager.NewCheckpointManager(filepath.Join(dockershimRootDir, sandboxCheckpointDir)) if err != nil { return nil, err } ds := &dockerService{ client: c, // 用于转发对底层OS的操作 os: kubecontainer.RealOS{}, // 容器沙盒镜像(pause镜像) podSandboxImage: podSandboxImage, // 提供流和执行命令 streamingRuntime: &streamingRuntime{ client: client, execHandler: &NativeExecHandler{}, }, // 容器管理器,能够启动容器 containerManager: cm.NewContainerManager(cgroupsName, client), // 检查点管理器 checkpointManager: checkpointManager, // 是否需要在localhost上开启stream server startLocalStreamingServer: startLocalStreamingServer, networkReady: make(map[string]bool), // 用于容器清理 containerCleanupInfos: make(map[string]*containerCleanupInfo), } // 检查Docker版本兼容性 if err = ds.checkVersionCompatibility(); err != nil { return nil, err } // 创建流服务器 if streamingConfig != nil { var err error ds.streamingServer, err = streaming.NewServer(*streamingConfig, ds.streamingRuntime) if err != nil { return nil, err } } // 确定使用的hairpin模式 if err := effectiveHairpinMode(pluginSettings); err != nil { // This is a non-recoverable error. Returning it up the callstack will just // lead to retries of the same failure, so just fail hard. return nil, err } klog.Infof("Hairpin mode set to %q", pluginSettings.HairpinMode) // 目前仅仅支持CNI插件 pluginSettings.PluginBinDirs = cni.SplitDirs(pluginSettings.PluginBinDirString) // 基于CNI配置文件和二进制文件来检测、获取网络插件 cniPlugins := cni.ProbeNetworkPlugins(pluginSettings.PluginConfDir, pluginSettings.PluginCacheDir, pluginSettings.PluginBinDirs) cniPlugins = append(cniPlugins, kubenet.NewPlugin(pluginSettings.PluginBinDirs, pluginSettings.PluginCacheDir)) // 用于获取端口映射信息,容器所述网络命名空间信息 netHost := &dockerNetworkHost{ &namespaceGetter{ds}, &portMappingGetter{ds}, } // 根据名字来找到并初始化CNI插件 plug, err := network.InitNetworkPlugin(cniPlugins, pluginSettings.PluginName, netHost, pluginSettings.HairpinMode, pluginSettings.NonMasqueradeCIDR, pluginSettings.MTU) if err != nil { return nil, fmt.Errorf("didn't find compatible CNI plugin with given settings %+v: %v", pluginSettings, err) } // 网络插件管理器 ds.network = network.NewPluginManager(plug) klog.Infof("Docker cri networking managed by %v", plug.Name()) // 判断cgroups驱动 cgroupDriver := defaultCgroupDriver dockerInfo, err := ds.client.Info() klog.Infof("Docker Info: %+v", dockerInfo) if err != nil { klog.Errorf("Failed to execute Info() call to the Docker client: %v", err) klog.Warningf("Falling back to use the default driver: %q", cgroupDriver) } else if len(dockerInfo.CgroupDriver) == 0 { klog.Warningf("No cgroup driver is set in Docker") klog.Warningf("Falling back to use the default driver: %q", cgroupDriver) } else { cgroupDriver = dockerInfo.CgroupDriver } if len(kubeCgroupDriver) != 0 && kubeCgroupDriver != cgroupDriver { return nil, fmt.Errorf("misconfiguration: kubelet cgroup driver: %q is different from docker cgroup driver: %q", kubeCgroupDriver, cgroupDriver) } klog.Infof("Setting cgroupDriver to %s", cgroupDriver) ds.cgroupDriver = cgroupDriver ds.versionCache = cache.NewObjectCache( func() (interface{}, error) { return ds.getDockerVersion() }, versionCacheTTL, ) // Register prometheus metrics. metrics.Register() return ds, nil } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
type dockerService struct { // Docker客户端 client libdocker.Interface // 系统级服务 os kubecontainer.OSInterface // 使用沙盒镜像 podSandboxImage string // 流服务相关,用于在节点上执行命令 streamingRuntime *streamingRuntime streamingServer streaming.Server // 负责CNI相关工作,为每个Pod设置一个锁,保证针对单个Pod的CNI操作串行化执行 network *network.PluginManager // 保存每个沙盒网络是否ready networkReady map[string]bool networkReadyLock sync.Mutex // 容器管理器,在Linux下负责管理cgroups,并确保容器在cgroups的管理之下 containerManager cm.ContainerManager // 时用的cgroup驱动 cgroupDriver string checkpointManager checkpointmanager.CheckpointManager // 缓存运行时的版本信息,为了保证跨Docker版本的兼容性,dockershim可能需要为 // 某些操作执行版本检查。该缓存避免每次都去调用docker守护程序 versionCache *cache.ObjectCache // 是否需要在本机启动stream服务 startLocalStreamingServer bool // 维持 container id到containerCleanupInfo的映射 // containerCleanupInfo中包含容器移除后需要清理的对象的信息 containerCleanupInfos map[string]*containerCleanupInfo } |
结构dockerService实现了DockerService接口,此接口包含了CRI gRPC服务的所有方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
type DockerService interface { CRIService // 处理streaming调用 http.Handler // 支持遗留特性 legacy.DockerLegacyService } type CRIService interface { // 下面两个是CRI的gRPC接口 runtimeapi.RuntimeServiceServer runtimeapi.ImageServiceServer // 启动服务 Start() error } |
在前面的代码中我们已经看到,Kubelet会创建dockerSerivce并调用它的Start方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
// dockerService的启动过程 func (ds *dockerService) Start() error { // 清理上次运行期间残留的东西 ds.initCleanup() // 启用本地streaming服务 if ds.startLocalStreamingServer { go func() { if err := ds.streamingServer.Start(true); err != nil { klog.Fatalf("Streaming server stopped unexpectedly: %v", err) } }() } // 启动容器管理器 return ds.containerManager.Start() } // 容器管理器的启动过程 func (m *containerManager) Start() error { if len(m.cgroupsName) != 0 { manager, err := createCgroupManager(m.cgroupsName) if err != nil { return err } m.cgroupsManager = manager } // 下面的循环负责: // 1、确保进程运行在cgroups中 // 2、确保进程的OOM score被应用 go wait.Until(m.doWork, 5*time.Minute, wait.NeverStop) return nil } |
从本节开始,我们将分析dockershim对CRI接口的实现。第一个分析的是创建沙盒的接口RunPodSandbox。
总体来说,沙盒创建包含一下几个步骤:
- 拉取沙盒镜像
- 创建沙盒容器
- 为沙盒创建检查点
- 启动沙盒容器
- 调用CNI插件,为沙盒准备容器网络
完整的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
// 对于Docker来说,沙盒的实现是一个容器,它持有(维持)Pod的网络命名空间 // Docker尚未使用LogDirectory func (ds *dockerService) RunPodSandbox(ctx context.Context, r *runtimeapi.RunPodSandboxRequest) (*runtimeapi.RunPodSandboxResponse, error) { // 请求的沙盒配置 config := r.GetConfig() // 第一步:拉取沙盒的镜像 image := defaultSandboxImage podSandboxImage := ds.podSandboxImage if len(podSandboxImage) != 0 { image = podSandboxImage } // 如果使用私有仓库的沙盒镜像,用户必须在节点上合理配置凭证信息 // 如果镜像不存在,则拉取 if err := ensureSandboxImageExists(ds.client, image); err != nil { return nil, err } // 第二步:创建沙盒容器 // 使用何种运行时,注意Docker也可以支持多种容器运行时 if r.GetRuntimeHandler() != "" && r.GetRuntimeHandler() != runtimeName { return nil, fmt.Errorf("RuntimeHandler %q not supported", r.GetRuntimeHandler()) } // 将 runtimeapi.PodSandboxConfig(CRI的配置信息) 转换为 dockertypes.ContainerCreateConfig(Docker的配置信息) createConfig, err := ds.makeSandboxDockerConfig(config, image) if err != nil { return nil, fmt.Errorf("failed to make sandbox docker config for pod %q: %v", config.Metadata.Name, err) } // 调用Docker Client API,创建容器 createResp, err := ds.client.CreateContainer(*createConfig) if err != nil { // 针对1.11-的workaround createResp, err = recoverFromCreationConflictIfNeeded(ds.client, *createConfig, err) } if err != nil || createResp == nil { return nil, fmt.Errorf("failed to create a sandbox for pod %q: %v", config.Metadata.Name, err) } resp := &runtimeapi.RunPodSandboxResponse{PodSandboxId: createResp.ID} // 设置内存中的沙盒网络就绪状态 ds.setNetworkReady(createResp.ID, false) defer func(e *error) { // 外层函数的调用结果,决定是否设置网络为ready if *e == nil { ds.setNetworkReady(createResp.ID, true) } }(&err) // 使用(当前函数调用的)最后一刻的err // 第三步:为沙盒创建Checkpoint if err = ds.checkpointManager.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil { return nil, err } // 第四步:启动沙盒容器 // 假设在沙盒启动失败的情况下,Kubelet的垃圾回收器会在之后自动删除沙盒 err = ds.client.StartContainer(createResp.ID) if err != nil { return nil, fmt.Errorf("failed to start sandbox container for pod %q: %v", config.Metadata.Name, err) } // 覆盖Docker生成的resolv.conf文件 // NOTE: 不管在什么场景下,集群DNS设置都不会再次传递给Docker API // 这里修改的resolv.conf会被沙盒中所有容器共享 if dnsConfig := config.GetDnsConfig(); dnsConfig != nil { containerInfo, err := ds.client.InspectContainer(createResp.ID) if err != nil { return nil, fmt.Errorf("failed to inspect sandbox container for pod %q: %v", config.Metadata.Name, err) } if err := rewriteResolvFile(containerInfo.ResolvConfPath, dnsConfig.Servers, dnsConfig.Searches, dnsConfig.Options); err != nil { return nil, fmt.Errorf("rewrite resolv.conf failed for pod %q: %v", config.Metadata.Name, err) } } // 如果使用宿主机网络,则不调用CNI插件 if config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetNetwork() == runtimeapi.NamespaceMode_NODE { return resp, nil } // 第五步:为沙盒准备好容器网络 // 任何Pod的网络,都是利用启动阶段发现的CNI插件创建的 // 这个CNI插件负责:分配IP地址给Pod、在沙盒中创建路由、创建网络接口... // 虽然从理论上来说,CNI仅仅应该仓库沙盒网络,但是它也可能在宿主机上 // 插入iptables规则、打开端口。以满足Pod Spec中CNI标准展示不能识别的部分 cID := kubecontainer.BuildContainerID(runtimeName, createResp.ID) networkOptions := make(map[string]string) if dnsConfig := config.GetDnsConfig(); dnsConfig != nil { // Build DNS options. dnsOption, err := json.Marshal(dnsConfig) if err != nil { return nil, fmt.Errorf("failed to marshal dns config for pod %q: %v", config.Metadata.Name, err) } networkOptions["dns"] = string(dnsOption) } // 调用CNI插件管理器,为Pod准备网络 err = ds.network.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID, config.Annotations, networkOptions) if err != nil { errList := []error{fmt.Errorf("failed to set up sandbox container %q network for pod %q: %v", createResp.ID, config.Metadata.Name, err)} // 出错的情况下,确保沙盒网络资源会被清理干净 err = ds.network.TearDownPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID) if err != nil { errList = append(errList, fmt.Errorf("failed to clean up sandbox container %q network for pod %q: %v", createResp.ID, config.Metadata.Name, err)) } // 停止沙盒容器 err = ds.client.StopContainer(createResp.ID, defaultSandboxGracePeriod) if err != nil { errList = append(errList, fmt.Errorf("failed to stop sandbox container %q for pod %q: %v", createResp.ID, config.Metadata.Name, err)) } return resp, utilerrors.NewAggregate(errList) } return resp, nil } |
在沙盒创建过程中,作为最后一步,会调用PluginManager.SetUpPod()来准备容器网络:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
// Pod命名空间 Pod名 沙盒的容器ID func (pm *PluginManager) SetUpPod(podNamespace, podName string, id kubecontainer.ContainerID, annotations, options map[string]string) error { defer recordOperation("set_up_pod", time.Now()) fullPodName := kubecontainer.BuildPodFullName(podName, podNamespace) // 串行化针对某个Pod的CNI操作 pm.podLock(fullPodName).Lock() defer pm.podUnlock(fullPodName) // 调用网络插件 if err := pm.plugin.SetUpPod(podNamespace, podName, id, annotations, options); err != nil { return fmt.Errorf("networkPlugin %s failed to set up pod %q network: %v", pm.plugin.Name(), fullPodName, err) } return nil } // 注意两级PodLock的处理 func (pm *PluginManager) podLock(fullPodName string) *sync.Mutex { // 首先获取全局锁 pm.podsLock.Lock() defer pm.podsLock.Unlock() // 确保单个Pod的锁对象存在,不存在则创建之 lock, ok := pm.pods[fullPodName] if !ok { lock = &podLock{} pm.pods[fullPodName] = lock } // 加Pod锁时,增加此计数。podUnlock时减小此计数 lock.refcount++ return &lock.mu } |
可以看到,初始化网络的过程,是委托给NetworkPlugin处理的,NetworkPlugin接口也是dockershim私有的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
type NetworkPlugin interface { // 初始化该插件,只会调用一次 Init(host Host, hairpinMode kubeletconfig.HairpinMode, nonMasqueradeCIDR string, mtu int) error // 在各类事件发生时调用,例如NET_PLUGIN_EVENT_POD_CIDR_CHANGE Event(name string, details map[string]interface{}) // 返回插件名称 Name() string // 返回NET_PLUGIN_CAPABILITY_*的集合 Capabilities() utilsets.Int // 在Pod的infra容器创建后,其它容器创建之前调用,为Pod 准备网络 SetUpPod(namespace string, name string, podSandboxID kubecontainer.ContainerID, annotations, options map[string]string) error // 在删除infra容器之前调用 TearDownPod(namespace string, name string, podSandboxID kubecontainer.ContainerID) error // 用于获取容器IP地址 GetPodNetworkStatus(namespace string, name string, podSandboxID kubecontainer.ContainerID) (*PodNetworkStatus, error) // 如果该插件处于错误状态,返回错误 Status() error } |
对于CNI来说,NetworkPlugin的实现为cniNetworkPlugin,它在dockerService创建过程中初始化:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
cniPlugins := cni.ProbeNetworkPlugins(pluginSettings.PluginConfDir, pluginSettings.PluginCacheDir, pluginSettings.PluginBinDirs) func ProbeNetworkPlugins(confDir, cacheDir string, binDirs []string) []network.NetworkPlugin { // ... plugin := &cniNetworkPlugin{ defaultNetwork: nil, // 主网络接口 loNetwork: getLoNetwork(binDirs), // lo网络接口 execer: utilexec.New(), confDir: confDir, binDirs: binDirs, cacheDir: cacheDir, } plugin.syncNetworkConfig() return []network.NetworkPlugin{plugin} } |
cniNetworkPlugin.SetUpPod()的逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
func (plugin *cniNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID, annotations, options map[string]string) error { if err := plugin.checkInitialized(); err != nil { return err } // 根据容器ID获取网络命名空间信息 netnsPath, err := plugin.host.GetNetNS(id.ID) if err != nil { return fmt.Errorf("CNI failed to retrieve network namespace path: %v", err) } cniTimeoutCtx, cancelFunc := context.WithTimeout(context.Background(), network.CNITimeoutSec*time.Second) defer cancelFunc() // Windows没有lo网络的概念 if plugin.loNetwork != nil { // 添加lo网络接口到命名空间 if _, err = plugin.addToNetwork(cniTimeoutCtx, plugin.loNetwork, name, namespace, id, netnsPath, annotations, options); err != nil { return err } } // 添加主网络接口到命名空间 _, err = plugin.addToNetwork(cniTimeoutCtx, plugin.getDefaultNetwork(), name, namespace, id, netnsPath, annotations, options) return err } func (plugin *cniNetworkPlugin) addToNetwork(ctx context.Context, network *cniNetwork, podName string, podNamespace string, podSandboxID kubecontainer.ContainerID, podNetnsPath string, annotations, options map[string]string) (cnitypes.Result, error) { // 创建CNI配置(libcni.RuntimeConf) rt, err := plugin.buildCNIRuntimeConf(podName, podNamespace, podSandboxID, podNetnsPath, annotations, options) pdesc := podDesc(podNamespace, podName, podSandboxID) netConf, cniNet := network.NetworkConfig, network.CNIConfig // 调用标准的CNI接口 res, err := cniNet.AddNetworkList(ctx, netConf, rt) return res, nil } |
该方法负责创建一个容器,并加入到指定的沙盒中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
func (ds *dockerService) CreateContainer(_ context.Context, r *runtimeapi.CreateContainerRequest) (*runtimeapi.CreateContainerResponse, error) { podSandboxID := r.PodSandboxId config := r.GetConfig() sandboxConfig := r.GetSandboxConfig() labels := makeLabels(config.GetLabels(), config.GetAnnotations()) // 添加标签 io.kubernetes.docker.type labels[containerTypeLabelKey] = containerTypeLabelContainer // 添加标签 io.kubernetes.container.logpath labels[containerLogPathLabelKey] = filepath.Join(sandboxConfig.LogDirectory, config.LogPath) // 添加标签 io.kubernetes.sandbox.id labels[sandboxIDLabelKey] = podSandboxID apiVersion, err := ds.getDockerAPIVersion() image := "" if iSpec := config.GetImage(); iSpec != nil { image = iSpec.Image } // K8S前缀_容器名_沙盒名_沙盒命名空间_沙盒UID_第几次尝试创建此容器 containerName := makeContainerName(sandboxConfig, config) // Docker的容器配置对象 createConfig := dockertypes.ContainerCreateConfig{ Name: containerName, Config: &dockercontainer.Config{ Entrypoint: dockerstrslice.StrSlice(config.Command), Cmd: dockerstrslice.StrSlice(config.Args), Env: generateEnvList(config.GetEnvs()), Image: image, WorkingDir: config.WorkingDir, Labels: labels, // 交互式容器 OpenStdin: config.Stdin, StdinOnce: config.StdinOnce, Tty: config.Tty, // 在得到官方支持之前,禁用Docker的健康检查 // (https://github.com/kubernetes/kubernetes/issues/25829). Healthcheck: &dockercontainer.HealthConfig{ Test: []string{"NONE"}, }, }, // 解挂载列表转换为Docker能够理解的字符串: // <HostPath>:<ContainerPath>[:options] // 其中options支持逗号分隔的: // ro 提示只读 // Z 提示卷要求SELinux relabeling // 例如rslave之类的传播模式设置 HostConfig: &dockercontainer.HostConfig{ Binds: generateMountBindings(config.GetMounts()), RestartPolicy: dockercontainer.RestartPolicy{ Name: "no", }, }, } hc := createConfig.HostConfig // 完成以下操作: // CPU、内存配额 // OOM分数设置 // 应用security context // 应用通过沙盒推导出的cgroupsParent err = ds.updateCreateConfig(&createConfig, config, sandboxConfig, podSandboxID, securityOptSeparator, apiVersion) if err != nil { return nil, fmt.Errorf("failed to update container create config: %v", err) } // 设置宿主机和容器之间的device mapping devices := make([]dockercontainer.DeviceMapping, len(config.Devices)) for i, device := range config.Devices { devices[i] = dockercontainer.DeviceMapping{ PathOnHost: device.HostPath, PathInContainer: device.ContainerPath, CgroupPermissions: device.Permissions, } } hc.Resources.Devices = devices // 安全选项 securityOpts, err := ds.getSecurityOpts(config.GetLinux().GetSecurityContext().GetSeccompProfilePath(), securityOptSeparator) if err != nil { return nil, fmt.Errorf("failed to generate security options for container %q: %v", config.Metadata.Name, err) } hc.SecurityOpt = append(hc.SecurityOpt, securityOpts...) cleanupInfo, err := ds.applyPlatformSpecificDockerConfig(r, &createConfig) if err != nil { return nil, err } // 调用Docker客户端,创建容器 createResp, createErr := ds.client.CreateContainer(createConfig) if createErr != nil { createResp, createErr = recoverFromCreationConflictIfNeeded(ds.client, createConfig, createErr) } // ... return nil, createErr } |
containerd的CRI支持,以前是通过一个进程外的cri-containerd完成,现在则改为containerd内部的插件。该插件支持containerd 1.1+版本。containerd的CRI架构如下:
cri-containerd插件,是通过init函数注册到containerd的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
func init() { // 缺省CRI配置 config := criconfig.DefaultConfig() // 注册当前插件 plugin.Register(&plugin.Registration{ Type: plugin.GRPCPlugin, // 插件类型,这里提示containerd和该插件也通过gRPC交互 ID: "cri", Config: &config, // 插件配置,interface{} Requires: []plugin.Type{ // 插件之间有依赖 plugin.ServicePlugin, }, InitFn: initCRIService, // 初始化函数 }) } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
func initCRIService(ic *plugin.InitContext) (interface{}, error) { ic.Meta.Platforms = []imagespec.Platform{platforms.DefaultSpec()} ic.Meta.Exports = map[string]string{"CRIVersion": constants.CRIVersion} ctx := ic.Context // 对应CRI插件的toml配置文件 pluginConfig := ic.Config.(*criconfig.PluginConfig) // 校验配置文件 if err := criconfig.ValidatePluginConfig(ctx, pluginConfig); err != nil { return nil, errors.Wrap(err, "invalid plugin config") } // 关于CRI服务器的所有配置信息 c := criconfig.Config{ PluginConfig: *pluginConfig, ContainerdRootDir: filepath.Dir(ic.Root), ContainerdEndpoint: ic.Address, RootDir: ic.Root, StateDir: ic.State, } servicesOpts, err := getServicesOpts(ic) // Containerd的客户端(gRPC) client, err := containerd.New( "", containerd.WithDefaultNamespace(constants.K8sContainerdNamespace), containerd.WithDefaultPlatform(criplatforms.Default()), containerd.WithServices(servicesOpts...), ) // 创建criService的实例 s, err := server.NewCRIService(c, client) // 启动gRPC服务 go func() { if err := s.Run(); err != nil { log.G(ctx).WithError(err).Fatal("Failed to run CRI service") } }() return s, nil } |
和dockershim的CRIService类似:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
type grpcServices interface { runtime.RuntimeServiceServer runtime.ImageServiceServer } type CRIService interface { Run() error // 优雅关闭CRI服务 io.Closer // 将CRI服务注册到gRPC服务器 plugin.Service // CRI接口 grpcServices } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
type criService struct { // 所有配置信息 config criconfig.Config // 镜像文件系统路径 imageFSPath string // 系统相关操作接口 os osinterface.OS // 存储和沙盒相关的资源 sandboxStore *sandboxstore.Store // 存储沙盒名字,并确保每个名字唯一 sandboxNameIndex *registrar.Registrar // 存储和容器相关的资源 containerStore *containerstore.Store // 存储容器名字,并确保每个名字唯一 containerNameIndex *registrar.Registrar // 存储所有关联镜像的资源 imageStore *imagestore.Store // 存储快照(用于构建容器根文件系统)的信息 snapshotStore *snapshotstore.Store // 网络插件,负责容器网络的创建和消耗iu netPlugin cni.CNI // containerd客户端 client *containerd.Client // 处理容器流请求 streamServer streaming.Server // 监控容器事件 eventMonitor *eventMonitor // 提示该服务是否初始化完毕 initialized atomic.Bool // 用于重新加载CNI网络配置 cniNetConfMonitor *cniNetConfSyncer } |
整体流程:
- 如有必要,下载沙盒镜像
- 为沙盒创建网络命名空间
- 生成沙盒容器的Spec
- 配置SELinux标签
- 确定容器运行时
- 通过containerd客户端,创建容器
- 为沙盒创建存放/dev/shm、/etc/hosts等文件的目录
- 在容器中运行任务,也就是执行二进制文件
- 监控任务的退出事件
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 |
func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandboxRequest) (_ *runtime.RunPodSandboxResponse, retErr error) { // CNI沙盒配置 config := r.GetConfig() log.G(ctx).Debugf("Sandbox config %+v", config) // 生成随机ID id := util.GenerateID() metadata := config.GetMetadata() if metadata == nil { return nil, errors.New("sandbox config must include metadata") } // 沙盒名称,格式类似dockershim name := makeSandboxName(metadata) log.G(ctx).Debugf("Generated id %q for sandbox %q", id, name) // 预定沙盒名称,防止并发的RunPodSandbox请求,启动相同的沙盒 if err := c.sandboxNameIndex.Reserve(name, id); err != nil { return nil, errors.Wrapf(err, "failed to reserve sandbox name %q", name) } defer func() { // 释放沙盒名称 if retErr != nil { c.sandboxNameIndex.ReleaseByName(name) } }() //沙盒对象 sandbox := sandboxstore.NewSandbox( sandboxstore.Metadata{ ID: id, Name: name, Config: config, RuntimeHandler: r.GetRuntimeHandler(), }, sandboxstore.Status{ State: sandboxstore.StateUnknown, }, ) // 确保沙盒镜像的snapshot存在,如果不存在,则调用相应的CRI接口拉取镜像 image, err := c.ensureImageExists(ctx, c.config.SandboxImage, config) if err != nil { return nil, errors.Wrapf(err, "failed to get sandbox image %q", c.config.SandboxImage) } // 转换为containerd镜像 containerdImage, err := c.toContainerdImage(ctx, *image) if err != nil { return nil, errors.Wrapf(err, "failed to get image from containerd %q", image.ID) } // runtime handler就是runtime class // containerd配置文件中需要包含相应的运行时 ociRuntime, err := c.getSandboxRuntime(config, r.GetRuntimeHandler()) if err != nil { return nil, errors.Wrap(err, "failed to get sandbox runtime") } log.G(ctx).Debugf("Use OCI %+v for sandbox %q", ociRuntime, id) podNetwork := true // 在Windows下Pod网络是必须的 if goruntime.GOOS != "windows" && config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetNetwork() == runtime.NamespaceMode_NODE { // 在Linux下,使用宿主机网络时 podNetwork = false } if podNetwork { // 为沙盒创建网络命名空间 sandbox.NetNS, err = netns.NewNetNS() sandbox.NetNSPath = sandbox.NetNS.GetPath() defer func() { // 最终清理工作 if retErr != nil { // 出现错误的情况下,清理容器网络、删除网络命名空间 if err := c.teardownPodNetwork(ctx, sandbox); err != nil { log.G(ctx).WithError(err).Errorf("Failed to destroy network for sandbox %q", id) } if err := sandbox.NetNS.Remove(); err != nil { log.G(ctx).WithError(err).Errorf("Failed to remove network namespace %s for sandbox %q", sandbox.NetNSPath, id) } sandbox.NetNSPath = "" } }() // 调用CNI插件,创建容器网络 // 某些基于VM的解决方案,例如clear containers (containerd/cri-containerd#524) // 依赖于这样的假设:CRI shim不去查询网络命名空间以获得网络状态信息,例如IP地址 // // 在未来,运行时实现应当避免依赖于CRI shim的实现细节。在这里,缓存IP地址可以提升性能 // 因为避免了每次处理SandboxStatus请求时都去查询Pod的网络命名空间,来获得veth的IP地址 if err := c.setupPodNetwork(ctx, &sandbox); err != nil { return nil, errors.Wrapf(err, "failed to setup network for sandbox %q", id) } } // 创建沙盒容器 // 生成规格,sandboxContainerSpec函数不应该有副作用,例如访问/创建文件 spec, err := c.sandboxContainerSpec(id, config, &image.ImageSpec.Config, sandbox.NetNSPath, ociRuntime.PodAnnotations) if err != nil { return nil, errors.Wrap(err, "failed to generate sandbox container spec") } log.G(ctx).Debugf("Sandbox container %q spec: %#+v", id, spew.NewFormatter(spec)) // 为沙盒进程设置SELinux标签 sandbox.ProcessLabel = spec.Process.SelinuxLabel defer func() { if retErr != nil { selinux.ReleaseLabel(sandbox.ProcessLabel) } }() if config.GetLinux().GetSecurityContext().GetPrivileged() { // 特权容器不设置SELinux标签 spec.Process.SelinuxLabel = "" } // 生成OCI Spec相关选项 specOpts, err := c.sandboxContainerSpecOpts(config, &image.ImageSpec.Config) if err != nil { return nil, errors.Wrap(err, "failed to generate sanbdox container spec options") } sandboxLabels := buildLabels(config.Labels, containerKindSandbox) // 容器运行时选项,使用何种运行时、加速什么cgroup... runtimeOpts, err := generateRuntimeOptions(ociRuntime, c.config) if err != nil { return nil, errors.Wrap(err, "failed to generate runtime options") } opts := []containerd.NewContainerOpts{ // 容器使用的snapshotter containerd.WithSnapshotter(c.config.ContainerdConfig.Snapshotter), // 指定容器rootfs customopts.WithNewSnapshot(id, containerdImage), // 将指定的spec设置到容器 containerd.WithSpec(spec, specOpts...), // 设置容器标签 containerd.WithContainerLabels(sandboxLabels), containerd.WithContainerExtension(sandboxMetadataExtension, &sandbox.Metadata), // 设置容器使用的运行时 containerd.WithRuntime(ociRuntime.Type, runtimeOpts)} // 调用containerd创建容器 container, err := c.client.NewContainer(ctx, id, opts...) if err != nil { return nil, errors.Wrap(err, "failed to create containerd container") } defer func() { if retErr != nil { // 出错,则删除容器。在超时上下文中进行 deferCtx, deferCancel := ctrdutil.DeferContext() defer deferCancel() if err := container.Delete(deferCtx, containerd.WithSnapshotCleanup); err != nil { log.G(ctx).WithError(err).Errorf("Failed to delete containerd container %q", id) } } }() // 创建沙盒根目录,此目录用于管理hosts文件之类的文件 sandboxRootDir := c.getSandboxRootDir(id) if err := c.os.MkdirAll(sandboxRootDir, 0755); err != nil { return nil, errors.Wrapf(err, "failed to create sandbox root directory %q", sandboxRootDir) } defer func() { if retErr != nil { // 清理 if err := c.os.RemoveAll(sandboxRootDir); err != nil { log.G(ctx).WithError(err).Errorf("Failed to remove sandbox root directory %q", sandboxRootDir) } } }() // 创建volatile,目录,沙盒在目录中存放不稳定文件,例如命名管道 volatileSandboxRootDir := c.getVolatileSandboxRootDir(id) if err := c.os.MkdirAll(volatileSandboxRootDir, 0755); err != nil { return nil, errors.Wrapf(err, "failed to create volatile sandbox root directory %q", volatileSandboxRootDir) } defer func() { if retErr != nil { // 清理 if err := c.os.RemoveAll(volatileSandboxRootDir); err != nil { log.G(ctx).WithError(err).Errorf("Failed to remove volatile sandbox root directory %q", volatileSandboxRootDir) } } }() // 为沙盒准备好/dev/shm, /etc/hosts, /etc/resolv.conf, /etc/hostname 等文件 if err = c.setupSandboxFiles(id, config); err != nil { return nil, errors.Wrapf(err, "failed to setup sandbox files") } defer func() { if retErr != nil { // 清理 if err = c.cleanupSandboxFiles(id, config); err != nil { log.G(ctx).WithError(err).Errorf("Failed to cleanup sandbox files in %q", sandboxRootDir) } } }() taskOpts := c.taskOpts(ociRuntime.Type) // 在容器中创建Task,Task是containerd中的可执行对象,对应进程 // 该任务不需要stdio task, err := container.NewTask(ctx, containerdio.NullIO, taskOpts...) if err != nil { return nil, errors.Wrap(err, "failed to create containerd task") } defer func() { if retErr != nil { deferCtx, deferCancel := ctrdutil.DeferContext() defer deferCancel() // 清理,杀死Task对应的进程 if _, err := task.Delete(deferCtx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) { log.G(ctx).WithError(err).Errorf("Failed to delete sandbox container %q", id) } } }() // 当任务退出时,通过exitCh获得退出码 exitCh, err := task.Wait(ctrdutil.NamespacedContext()) if err != nil { return nil, errors.Wrap(err, "failed to wait for sandbox container task") } // 启动容器任务,即执行用户定义的二进制文件 if err := task.Start(ctx); err != nil { return nil, errors.Wrapf(err, "failed to start sandbox container task %q", id) } // 更新沙盒状态 if err := sandbox.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) { // Set the pod sandbox as ready after successfully start sandbox container. status.Pid = task.Pid() status.State = sandboxstore.StateReady return status, nil }); err != nil { return nil, errors.Wrap(err, "failed to update sandbox status") } sandbox.Container = container // 存储沙盒状态 if err := c.sandboxStore.Add(sandbox); err != nil { return nil, errors.Wrapf(err, "failed to add sandbox %+v into store", sandbox) } // 监控沙盒状态,确保monitor接收到TaskExit事件之时,沙盒在store中 c.eventMonitor.startExitMonitor(context.Background(), id, task.Pid(), exitCh) return &runtime.RunPodSandboxResponse{PodSandboxId: id}, nil } |
Leave a Reply