在最开始,没有出现容器运行时规范时,K8S要创建容器,直接调用Docker守护进程,后者直接(不需要containerd)调用libcontainer,然后就把容器运行起来了。
为了防止Docker公司一家独大,控制容器运行时标准,出现了OCI标准,即Open Container Initiative。Docker将libcontainer封装为runc,作为OCI的参考实现。
Docker推出Swarm之后,将容器相关操作,都移动到containerd项目中,Docker自身则负责容器编排。containerd最初只是对接runc这样的OCI实现的守护进程。
OCI标准,主要关注两方面的内容:
OCI是一套想对宽泛的标准,实现起来比较自由。runc这个参考实现,基于namespaces + cgroups进行隔离,但是,基于虚拟化技术进行隔离,也是可以的。
兼容OCI标准的项目包括:runC、Kata(及其前身runV、Clear Containers)、gVisor
基于虚拟化技术的OCI实现,用VM来运行容器,前身是runV + Clear Container。
Google的项目,它不去创建VM,而是启动名为Sentry的用户态进程,负责处理容器的系统调用。拦截容器系统调用到Sentry的过程,由KVM或ptrace实现。
gVisor所包含的OCI运行时叫做runsc。
这个项目仍然在发展过程中,但是对于Serverless场景有它的优势:
CRI即Container Runtime Interface,它包含一套规范、一套ProtoBuf API、一些库(用于容器运行时和Kubelet的集成)。CRI目前仍然处于Alpha状态。
CRI出现的动机是,不同容器运行时需要支持K8S。最开始,K8S只支持Docker,后来对rkt的支持被加入。对不同容器运行时的支持,是通过实现Kubelet的某个高级别接口,以in-tree方式写在K8S代码库中的。这样下去,势必会导致K8S(sig-node)的代码库难以维护。因此,从1.5版本开始K8S推出CRI,容器运行时需要接入K8S,自己提供out-of-tree的CRI实现即可。
CRI推出之时,K8S的地位没有现在这么高,Docker还希望通过Swarm和K8S竞争。因此,K8S开发了dockershim,作为Docker的CRI垫片(适配器),dockershim运行在kubelet内部,实现CRI接口,实际创建容器的操作,则是调用Docker实现。尽管CRI-O、containerd-plugin都可以实现Docker到K8S的对接,但是dockershim是最成熟的,经过生产环境检验的
CRI标准包含的接口有三部分:
兼容CRI标准的项目包括:Docker(通过dockershim)、containerd(通过CRI-containerd)、CRI-O
Kubernetes + Docker是最流行的组合,在这种组合中,容器创建流程如下:
当Kubelet需要创建容器时,需要:
要在K8S中使用Kata这样的基于虚拟化的OCI运行时,可以使用CRI-O,容器创建流程如下:
当Kubelet需要创建Pod时:
这是一个项目,它实现了CRI规范,并把CRI调用转换为对应的OCI调用。有了CRI-O,你就可以在K8S中使用任何OCI兼容的容器运行时。
CRI是主要目标是提供一个命令式的、容器级别的接口,允许Kubelet直接控制器容器的生命周期。
Pod由一组容器组成,它们在一个隔离的、具有资源约束的环境中。在K8S中Pod是最小调度单元。当Pod被调度到节点上后,Kubelet会为它创建运行环境,并且在此运行环境中增删改Pod,以满足Pod Spec。这里的运行环境,称为PodSandBox。
到底PodSandBox的实体是什么,取决于容器运行时:
不论如何,PodSandBox必须支持:隔离性、资源需求和限制。
PodSandbox的生命周期是和容器解耦的。沙盒在任何容器创建之前即创建,在任何容器终止之后才退出:
create sandbox Foo --> create container C --> start container C stop container C --> remove container C --> delete sandbox Foo
容器运行时不应该擅自影响Pod/容器的生命周期,例如启动一个容器。所有指令应该来自Kubelet,垃圾回收、重启容器等都是Kubelet的职责。
Kubelet还负责在删除沙盒之前,优雅的停止所有容器。
任何沙盒/容器的生命周期操作,例如create/start/stop/delete,要么返回错误,要么阻塞直到成功。一个成功的操作,应该包含沙盒/容器的状态变迁,例如Create调用成功后,容器的状态应该变为created。
K8S仅仅支持修改Pod Spec中很少的一部分,例如镜像地址。这些变更要求Kubelet重新创建容器。
在容器启动、关闭期间,K8S支持执行钩子。Kubelet会通过CRI的Exec函数来调用容器运行时,执行钩子。
为了让生命周期钩子正常工作,Exec需要访问容器的文件系统(Mount命名空间)。这意味着钩子就是执行容器里面的某个命令。
pre-start post-start pre-stop post-stop | | | | exec exec exec exec | | | | create --------> start ----------------> stop --------> remove
CRI的接口以gRPC方式描述,定义在staging/src/k8s.io/cri-api/pkg/apis/services.go。
该接口定义了远程容器运行时需要支持的公共操作,包括沙盒、容器、运行时自身的操作:
service RuntimeService { // 返回运行时名称、版本,以及API版本 rpc Version(VersionRequest) returns (VersionResponse) {} // 创建并启动Pod沙盒,如果调用成功,则运行时必须保证沙盒已经就绪 // the sandbox is in the ready state on success. rpc RunPodSandbox(RunPodSandboxRequest) returns (RunPodSandboxResponse) {} // 停止沙盒中所有运行中的进程,回收分配给沙盒的网络资源(例如IP地址) // 沙盒中正在运行的容器必须强制停止 // 必须实现幂等操作:如果相关资源已经回收,也不能返回错误 // 在调用RemovePodSandbox之前,Kubelet至少会调用该接口一次. // Kubelet会尝试尽早回收资源,只要沙盒不再需要,它就会调用StopPodSandbox rpc StopPodSandbox(StopPodSandboxRequest) returns (StopPodSandboxResponse) {} // 移除沙盒,如果其中还有容器运行,必须强制停止并移除 // 必须实现幂等操作:即使沙盒已经被删除,也不能返回错误 rpc RemovePodSandbox(RemovePodSandboxRequest) returns (RemovePodSandboxResponse) {} // 返回沙盒的状态,如果沙盒不存在,则返回错误 rpc PodSandboxStatus(PodSandboxStatusRequest) returns (PodSandboxStatusResponse) {} // 列出运行时现有的沙盒 rpc ListPodSandbox(ListPodSandboxRequest) returns (ListPodSandboxResponse) {} // 在指定的沙盒中创建容器 rpc CreateContainer(CreateContainerRequest) returns (CreateContainerResponse) {} // 启动沙盒中的容器 rpc StartContainer(StartContainerRequest) returns (StartContainerResponse) {} // 以一个优雅等待期停止容器 // 必须实现幂等操作:即使容器已经停止了,也不能返回错误 rpc StopContainer(StopContainerRequest) returns (StopContainerResponse) {} // 移除一个容器,如果容器正在运行,必须停止并移除 // 必须实现幂等操作:即使容器已经移除了,也不能返回错误 rpc RemoveContainer(RemoveContainerRequest) returns (RemoveContainerResponse) {} // 基于过滤器查询容器列表 rpc ListContainers(ListContainersRequest) returns (ListContainersResponse) {} // 返回容器的状态,如果容器不存在,返回错误 rpc ContainerStatus(ContainerStatusRequest) returns (ContainerStatusResponse) {} // 更新容器的ContainerConfig rpc UpdateContainerResources(UpdateContainerResourcesRequest) returns (UpdateContainerResourcesResponse) {} // 请求容器运行时,重新打开容器的stdout/stderr日志文件 // 该调用经常发生在容器日志轮换之后 // 如果容器不在运行,则运行时可以有两个选择: // 1. 创建新日志文件并返回nil // 2. 返回一个错误,但是绝不能创建一个新的容器日志文件 rpc ReopenContainerLog(ReopenContainerLogRequest) returns (ReopenContainerLogResponse) {} // 同步的在一个容器里面执行命令 rpc ExecSync(ExecSyncRequest) returns (ExecSyncResponse) {} // 准备一个流式端点,用于在容器中执行命令 rpc Exec(ExecRequest) returns (ExecResponse) {} // 准备一个流式端点,Attach到运行中的容器 rpc Attach(AttachRequest) returns (AttachResponse) {} //准备一个流式端点,从沙盒中转发端口 rpc PortForward(PortForwardRequest) returns (PortForwardResponse) {} // 返回容器的统计信息 // 如果容器不存在,返回错误 rpc ContainerStats(ContainerStatsRequest) returns (ContainerStatsResponse) {} // 列出所有容器的统计信息 rpc ListContainerStats(ListContainerStatsRequest) returns (ListContainerStatsResponse) {} // 更新运行时配置 rpc UpdateRuntimeConfig(UpdateRuntimeConfigRequest) returns (UpdateRuntimeConfigResponse) {} // 返回运行时的状态 rpc Status(StatusRequest) returns (StatusResponse) {} }
定义用于管理镜像的API:
service ImageService { // 列出现有的镜像 rpc ListImages(ListImagesRequest) returns (ListImagesResponse) {} // 返回镜像的状态,如果镜像不存在,返回响应,设置 // ImageStatusResponse.Image为nil rpc ImageStatus(ImageStatusRequest) returns (ImageStatusResponse) {} // 使用指定的身份验证配置,来拉取镜像 rpc PullImage(PullImageRequest) returns (PullImageResponse) {} // 移除一个镜像 // 必须实现幂等操作:即使镜像已经移除了,也不能返回错误 rpc RemoveImage(RemoveImageRequest) returns (RemoveImageResponse) {} // 返回用于存放镜像的文件系统的信息 rpc ImageFsInfo(ImageFsInfoRequest) returns (ImageFsInfoResponse) {} }
创建沙盒时,提供的请求对象如下:
message RunPodSandboxRequest { // 沙盒配置 PodSandboxConfig config = 1; // 命名的运行时配置(RuntimeClass),此配置用于此沙盒 // 如果指定的handler不存在,则运行时应该拒绝此请求 // 指定为空字符串,则使用默认handler // 参考: https://git.k8s.io/enhancements/keps/sig-node/runtime-class.md string runtime_handler = 2; } message PodSandboxConfig { // 沙盒的元数据,用于唯一性的识别沙盒 // 运行时依赖于此元数据来确保正确的操作,运行时也应该利用这些元数据提升UX,例如创建 // 可理解的容器名 PodSandboxMetadata metadata = 1; // 沙盒的主机名,如果使用宿主机网络命名空间,可以为空 string hostname = 2; // 宿主机上,存放容器日志的路径 // 默认情况下,容器的STDOUT/STDERR就是容器日志文件,被写到此目录 // // 此目录下还可能包含各容器的结构化日志,例如基于\n分隔的JSON日志、systemd-journald日志 // gRPC跟踪文件,等等。示例: // PodSandboxConfig.LogDirectory = `/var/log/pods/<podUID>/` 沙盒日志根目录 // ContainerConfig.LogPath = `containerName/Instance#.log` 容器日志子目录 // string log_directory = 3; // 沙盒的DNS配置 DNSConfig dns_config = 4; // 沙盒的端口映射 repeated PortMapping port_mappings = 5; // 用于定位和选择资源的标签 map<string, string> labels = 6; // Kubelet可能设置此字段,来存储、取回任何元数据。包括Pod上设置的任何Annotation // // 运行时不应该修改任何Annotation,这里传递的Annotation,必须在PodSandboxStatus中原样返回 // // 为了维持Kubelet和运行时之间的接口时良好定义的,Annoatation不应该影响Runtime的行为 // // 运行时作者也可以用Annotation来试验新的、对K8S API opaque的特性 map<string, string> annotations = 7; // 针对Linux宿主机的特殊配置 LinuxPodSandboxConfig linux = 8; } // 包含生成沙盒名字所需要的所有必要信息 // 鼓励运行时使用这里的信息来创建用户可见的标识(例如容器名),以提升用户体验 message PodSandboxMetadata { // Pod的名字 string name = 1; // Pod的UID string uid = 2; // Pod的命名空间 string namespace = 3; // 尝试创建沙盒的次数,默认0 uint32 attempt = 4; } // 沙盒的DNS配置 message DNSConfig { // 集群DNS服务器列表 repeated string servers = 1; // 集群的搜索域 repeated string searches = 2; // DNS选项 repeated string options = 3; } // 沙盒到宿主机的端口映射 message PortMapping { // 协议 Protocol protocol = 1; // 容器端口 int32 container_port = 2; // 宿主机端口 int32 host_port = 3; // 宿主机IP string host_ip = 4; } // Linux特有的沙盒配置 message LinuxPodSandboxConfig { // 沙盒的父cgroup // 使用 cgroupfs 风格, 运行时可能将其转换为systemd语义 string cgroup_parent = 1; // 沙盒安全属性 LinuxSandboxSecurityContext security_context = 2; // 沙盒的sysctls配置 map<string, string> sysctls = 3; } // 应用到沙盒的安全设置,注意: // 1) 不会应用到Pod中的容iq // 2) 可能对没有运行进程的沙盒不适用 message LinuxSandboxSecurityContext { // 沙盒命名空间配置 // 仅当沙盒基于命名空间做隔离时有意义 NamespaceOption namespace_options = 1; // SELinux上下文 // SELinux,Security-Enhanced Linux,为内核提供访问控制策略 // 可以最大限度地减小系统中服务进程可访问的资源 // 传统的Linux权限机制管理的主体是用户,用户是否对某个资源具有读、写、执行的权利 // SELinux则控制每一类进程是否拥有对某一类资源的访问权限。它限制的主体是进程,这样 // 如果程序有缺陷,它产生的破坏性也可以被限制,而不在于它是否root身份运行 SELinuxOption selinux_options = 2; // 运行沙盒的用户的UID Int64Value run_as_user = 3; // 运行沙盒的GID Int64Value run_as_group = 8; // 沙盒的rootfs是否只读 bool readonly_rootfs = 4; // 父GID之外的辅助组 repeated int64 supplemental_groups = 5; // 沙盒是否启动一个特权容器 bool privileged = 6; // 沙盒Seccomp profile,可选值 // * runtime/default:运行时默认值 // * unconfined:自由的,不支持seccomp沙盒化 // * localhost/<full-path-to-profile>: 安装在节点上的profile // <full-path-to-profile> 完整路径 // Default: "", 等价于unconfined // seccomp即short for secure computing mode,是一种限制系统调用的机制 // 严格模式下,仅允许exit(),sigreturn(),read()和write()这几个系统调用 // 过滤模式下,可以选择允许的系统调用 string seccomp_profile_path = 7; } message NamespaceOption { // 容器或沙盒的网络命名空间类型,目前仅仅支持 POD, NODE NamespaceMode network = 1; // PID命名空间类型:POD, CONTAINER, NODE, TARGET NamespaceMode pid = 2; // IPC命名空间类型:POD, NODE NamespaceMode ipc = 3; // 上述命名空间,类型为TARGET的,用该字段指定目标 // 该字段必须是当前沙盒中创建的,命名空间类型是CONTAINER的容器的ID // 不支持为不同命名空间设置不同的TARGET string target_id = 4; }
message CreateContainerRequest { // 在其中创建容器的沙盒的ID string pod_sandbox_id = 1; // 容器配置 ContainerConfig config = 2; // 沙盒配置,就是创建沙盒时提供的那个对象,在这里仅仅为了引用方便 // 沙盒配置在Pod生命周期内不改变 PodSandboxConfig sandbox_config = 3; } message ContainerConfig { // 容器元数据,唯一性的标识容器。运行时应当基于此元数据,确保正确的行为 // 运行时可以基于此元数据提升UX,例如设置一个可读的容器名 ContainerMetadata metadata = 1 ; // 使用的镜像 ImageSpec image = 2; // 执行的命令,例如Docker的入口点 repeated string command = 3; // 命令的参数 repeated string args = 4; // 工作目录 string working_dir = 5; // 环境变量 repeated KeyValue envs = 6; // 挂载情况 repeated Mount mounts = 7; // 设备列表 repeated Device devices = 8; // 容器标签 map<string, string> labels = 9; // 容器注解 map<string, string> annotations = 10; // 相对于PodSandboxConfig.LogDirectory的日志路径(存放STDOUT、STDERR) string log_path = 11; // 交互式容器字段,交互式容器具有特殊应用场景(例如debugging) bool stdin = 12; bool stdin_once = 13; bool tty = 14; // Linux容器特殊配置 LinuxContainerConfig linux = 15; // Windows容器特殊配置 WindowsContainerConfig windows = 16; } // 说明宿主机的卷如何挂载到容器中 message Mount { // 容器内挂载路径 string container_path = 1; // 宿主机挂载路径。如果路径不存在,运行时应该报错 // 如果路径是符号连接,运行时应该跟踪直到找到真实路径 string host_path = 2; // 只读挂载 bool readonly = 3; // 如果true,则挂载需要SELinux relabeling. bool selinux_relabel = 4; // (子树)挂载传播模式 MountPropagation propagation = 5; } // 说明宿主机设备如何挂载到容器中 message Device { // 设备在容器中的路径 string container_path = 1; // 设备在宿主机中的路径 string host_path = 2; // 设备的Cgroups 特权 // * r - 允许容器读 // * w - 允许容器写 // * m - 允许容器创建不存在的设备文件 string permissions = 3; } message LinuxContainerConfig { // 请求资源信息 LinuxContainerResources resources = 1; // 安全上下文信息 LinuxContainerSecurityContext security_context = 2; } message LinuxContainerResources { // CPU CFS (Completely Fair Scheduler) period int64 cpu_period = 1; // CPU CFS (Completely Fair Scheduler) quota int64 cpu_quota = 2; // CPU shares (relative weight vs. other containers) int64 cpu_shares = 3; // Memory limit in bytes int64 memory_limit_in_bytes = 4; // OOMScoreAdj adjusts the oom-killer score int64 oom_score_adj = 5; // CpusetCpus constrains the allowed set of logical CPUs string cpuset_cpus = 6; // CpusetMems constrains the allowed set of memory nodes string cpuset_mems = 7; // List of HugepageLimits to limit the HugeTLB usage of container per page size repeated HugepageLimit hugepage_limits = 8; } message LinuxContainerSecurityContext { // 需要添加或删除的特权 Capability capabilities = 1; // 在特权模式下运行容器,特权模式和下列字段不兼容: // 1. capabilities // 2. selinux_options // 4. seccomp // 5. apparmor // 运行在特权模式下,则上述字段无效。并且隐含意味着: // 1. 具有所有特权 // 2. 敏感的路径,例如sysfs中内核模块的路径,没有被遮罩 // 3. 所有sysfs、procfs挂载为可读写 // 4. 没有启用Apparmor confinement // 5. 没有启用Seccomp restrictions // 6. device cgroup does 不限制访问任何设备 // 7. 宿主机上所有 /dev 下的设备对容器可见 // 8. 没有启用SELinux restrictions bool privileged = 2; // 基于命名空间进行隔离时,容器的命名空间配置 NamespaceOption namespace_options = 3; // SELinux上下文 SELinuxOption selinux_options = 4; // 运行容器的UID Int64Value run_as_user = 5; // 运行容器的GID Int64Value run_as_group = 12; // 运行容器的用户的名字,名字必须定义在容器镜像的/etc/passwd中 string run_as_username = 6; // 是否设置容器rootfs为只读 bool readonly_rootfs = 7; // 补充的GID repeated int64 supplemental_groups = 8; // AppArmor profile string apparmor_profile = 9; // Seccomp profile string seccomp_profile_path = 10; // 是否在容器上设置no_new_privs bool no_new_privs = 11; // 容器运行时应当遮罩掉的宿主机路径 repeated string masked_paths = 13; // 容器运行时应当设置为只读的宿主机路径 repeated string readonly_paths = 14; }
位于pkg/kubelet/dockershim是一个CRI实现,它作为Docker适配到CRI的垫片(适配器)。
在Kubelet启动时,它会运行KubeletServer:
func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate, stopCh <-chan struct{}) error { klog.Infof("Version: %+v", version.Get()) if err := initForOS(s.KubeletFlags.WindowsService); err != nil { return fmt.Errorf("failed OS init: %v", err) } // 运行服务 if err := run(s, kubeDeps, featureGate, stopCh); err != nil { return fmt.Errorf("failed to run Kubelet: %v", err) } return nil }
作为KubeletServer启动过程的一部分,会进行容器运行时的预(在运行Kubelet之前)初始化:
func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate, stopCh <-chan struct{}) (err error) { // ... // 初始化容器运行时服务 err = kubelet.PreInitRuntimeService(&s.KubeletConfiguration, kubeDeps, &s.ContainerRuntimeOptions, s.ContainerRuntime, s.RuntimeCgroups, s.RemoteRuntimeEndpoint, s.RemoteImageEndpoint, s.NonMasqueradeCIDR) }
初始化逻辑如下:
func PreInitRuntimeService(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, crOptions *config.ContainerRuntimeOptions, containerRuntime string, runtimeCgroups string, remoteRuntimeEndpoint string, remoteImageEndpoint string, nonMasqueradeCIDR string) error { // ... switch containerRuntime { // 如果节点使用Docker运行时,则创建dockersim case kubetypes.DockerContainerRuntime: // docker runDockershim( kubeCfg, kubeDeps, crOptions, runtimeCgroups, remoteRuntimeEndpoint, remoteImageEndpoint, nonMasqueradeCIDR, ) case kubetypes.RemoteContainerRuntime: // remote // 否则,不做任何操作,CRI运行时是一个进程外的gRPC服务 break default: return fmt.Errorf("unsupported CRI runtime: %q", containerRuntime) } // ... }
runDockershim负责创建dockershim,运行在kubelet进程内的CRI服务器端:
func runDockershim(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, crOptions *config.ContainerRuntimeOptions, runtimeCgroups string, remoteRuntimeEndpoint string, remoteImageEndpoint string, nonMasqueradeCIDR string) error { // NetworkPluginSettings是Kubelet运行时参数的一部分,为CNI提供静态配置 pluginSettings := dockershim.NetworkPluginSettings{ // Kubelet如何配置网络,以处理hairpin包(在网桥上,需要从接收封包的端口,将封包再发回去) // hairpin-veth:在容器运行时创建的容器的veth上设置hairpin标记 // promiscuous-bridge:将容器网桥设置为混杂的,这会强制让其接受hairpin包 // none:不设置,如果kubelet以hairpin模式启动,kube-proxy运行在iptables模式,hairpin包被丢弃 HairpinMode: kubeletconfiginternal.HairpinMode(kubeCfg.HairpinMode), // 不应该包含在任何MASQUERADE规则中各的CIDR NonMasqueradeCIDR: nonMasqueradeCIDR, // 插件名 PluginName: crOptions.NetworkPluginName, // 插件位置 PluginConfDir: crOptions.CNIConfDir, PluginBinDirString: crOptions.CNIBinDir, PluginCacheDir: crOptions.CNICacheDir, MTU: int(crOptions.NetworkPluginMTU), } // 创建Docker的CRI shim,启动为gRPC服务器 streamingConfig := getStreamingConfig(kubeCfg, kubeDeps, crOptions) // Docker客户端配置 dockerClientConfig := &dockershim.ClientConfig{ DockerEndpoint: kubeDeps.DockerOptions.DockerEndpoint, RuntimeRequestTimeout: kubeDeps.DockerOptions.RuntimeRequestTimeout, ImagePullProgressDeadline: kubeDeps.DockerOptions.ImagePullProgressDeadline, } // 创建dockerService,shim的核心逻辑所在,实现了CRI的gRPC接口 ds, err := dockershim.NewDockerService(dockerClientConfig, crOptions.PodSandboxImage, streamingConfig, &pluginSettings, runtimeCgroups, kubeCfg.CgroupDriver, crOptions.DockershimRootDirectory, !crOptions.RedirectContainerStreaming) if err != nil { return err } if crOptions.RedirectContainerStreaming { kubeDeps.criHandler = ds } // Kubelet和dockershim之间通信的UDS,gRPC服务在其上监听 klog.V(2).Infof("Starting the GRPC server for the docker CRI shim.") dockerServer := dockerremote.NewDockerServer(remoteRuntimeEndpoint, ds) if err := dockerServer.Start(); err != nil { return err } return nil }
其中dockershim.NewDockerService调用创建的是dockerService,dockerService是dockershim的核心,它实现了CRI的所有接口。
通过下面的工厂函数创建dockerService:
func NewDockerService(config *ClientConfig, podSandboxImage string, streamingConfig *streaming.Config, pluginSettings *NetworkPluginSettings, cgroupsName string, kubeCgroupDriver string, dockershimRootDir string, startLocalStreamingServer bool) (DockerService, error) { // Docker客户端接口 libdocker.Interface client := NewDockerClientFromConfig(config) c := libdocker.NewInstrumentedInterface(client) // 支持Docker的Checkpoint/Restore操作 checkpointManager, err := checkpointmanager.NewCheckpointManager(filepath.Join(dockershimRootDir, sandboxCheckpointDir)) if err != nil { return nil, err } ds := &dockerService{ client: c, // 用于转发对底层OS的操作 os: kubecontainer.RealOS{}, // 容器沙盒镜像(pause镜像) podSandboxImage: podSandboxImage, // 提供流和执行命令 streamingRuntime: &streamingRuntime{ client: client, execHandler: &NativeExecHandler{}, }, // 容器管理器,能够启动容器 containerManager: cm.NewContainerManager(cgroupsName, client), // 检查点管理器 checkpointManager: checkpointManager, // 是否需要在localhost上开启stream server startLocalStreamingServer: startLocalStreamingServer, networkReady: make(map[string]bool), // 用于容器清理 containerCleanupInfos: make(map[string]*containerCleanupInfo), } // 检查Docker版本兼容性 if err = ds.checkVersionCompatibility(); err != nil { return nil, err } // 创建流服务器 if streamingConfig != nil { var err error ds.streamingServer, err = streaming.NewServer(*streamingConfig, ds.streamingRuntime) if err != nil { return nil, err } } // 确定使用的hairpin模式 if err := effectiveHairpinMode(pluginSettings); err != nil { // This is a non-recoverable error. Returning it up the callstack will just // lead to retries of the same failure, so just fail hard. return nil, err } klog.Infof("Hairpin mode set to %q", pluginSettings.HairpinMode) // 目前仅仅支持CNI插件 pluginSettings.PluginBinDirs = cni.SplitDirs(pluginSettings.PluginBinDirString) // 基于CNI配置文件和二进制文件来检测、获取网络插件 cniPlugins := cni.ProbeNetworkPlugins(pluginSettings.PluginConfDir, pluginSettings.PluginCacheDir, pluginSettings.PluginBinDirs) cniPlugins = append(cniPlugins, kubenet.NewPlugin(pluginSettings.PluginBinDirs, pluginSettings.PluginCacheDir)) // 用于获取端口映射信息,容器所述网络命名空间信息 netHost := &dockerNetworkHost{ &namespaceGetter{ds}, &portMappingGetter{ds}, } // 根据名字来找到并初始化CNI插件 plug, err := network.InitNetworkPlugin(cniPlugins, pluginSettings.PluginName, netHost, pluginSettings.HairpinMode, pluginSettings.NonMasqueradeCIDR, pluginSettings.MTU) if err != nil { return nil, fmt.Errorf("didn't find compatible CNI plugin with given settings %+v: %v", pluginSettings, err) } // 网络插件管理器 ds.network = network.NewPluginManager(plug) klog.Infof("Docker cri networking managed by %v", plug.Name()) // 判断cgroups驱动 cgroupDriver := defaultCgroupDriver dockerInfo, err := ds.client.Info() klog.Infof("Docker Info: %+v", dockerInfo) if err != nil { klog.Errorf("Failed to execute Info() call to the Docker client: %v", err) klog.Warningf("Falling back to use the default driver: %q", cgroupDriver) } else if len(dockerInfo.CgroupDriver) == 0 { klog.Warningf("No cgroup driver is set in Docker") klog.Warningf("Falling back to use the default driver: %q", cgroupDriver) } else { cgroupDriver = dockerInfo.CgroupDriver } if len(kubeCgroupDriver) != 0 && kubeCgroupDriver != cgroupDriver { return nil, fmt.Errorf("misconfiguration: kubelet cgroup driver: %q is different from docker cgroup driver: %q", kubeCgroupDriver, cgroupDriver) } klog.Infof("Setting cgroupDriver to %s", cgroupDriver) ds.cgroupDriver = cgroupDriver ds.versionCache = cache.NewObjectCache( func() (interface{}, error) { return ds.getDockerVersion() }, versionCacheTTL, ) // Register prometheus metrics. metrics.Register() return ds, nil }
type dockerService struct { // Docker客户端 client libdocker.Interface // 系统级服务 os kubecontainer.OSInterface // 使用沙盒镜像 podSandboxImage string // 流服务相关,用于在节点上执行命令 streamingRuntime *streamingRuntime streamingServer streaming.Server // 负责CNI相关工作,为每个Pod设置一个锁,保证针对单个Pod的CNI操作串行化执行 network *network.PluginManager // 保存每个沙盒网络是否ready networkReady map[string]bool networkReadyLock sync.Mutex // 容器管理器,在Linux下负责管理cgroups,并确保容器在cgroups的管理之下 containerManager cm.ContainerManager // 时用的cgroup驱动 cgroupDriver string checkpointManager checkpointmanager.CheckpointManager // 缓存运行时的版本信息,为了保证跨Docker版本的兼容性,dockershim可能需要为 // 某些操作执行版本检查。该缓存避免每次都去调用docker守护程序 versionCache *cache.ObjectCache // 是否需要在本机启动stream服务 startLocalStreamingServer bool // 维持 container id到containerCleanupInfo的映射 // containerCleanupInfo中包含容器移除后需要清理的对象的信息 containerCleanupInfos map[string]*containerCleanupInfo }
结构dockerService实现了DockerService接口,此接口包含了CRI gRPC服务的所有方法:
type DockerService interface { CRIService // 处理streaming调用 http.Handler // 支持遗留特性 legacy.DockerLegacyService } type CRIService interface { // 下面两个是CRI的gRPC接口 runtimeapi.RuntimeServiceServer runtimeapi.ImageServiceServer // 启动服务 Start() error }
在前面的代码中我们已经看到,Kubelet会创建dockerSerivce并调用它的Start方法:
// dockerService的启动过程 func (ds *dockerService) Start() error { // 清理上次运行期间残留的东西 ds.initCleanup() // 启用本地streaming服务 if ds.startLocalStreamingServer { go func() { if err := ds.streamingServer.Start(true); err != nil { klog.Fatalf("Streaming server stopped unexpectedly: %v", err) } }() } // 启动容器管理器 return ds.containerManager.Start() } // 容器管理器的启动过程 func (m *containerManager) Start() error { if len(m.cgroupsName) != 0 { manager, err := createCgroupManager(m.cgroupsName) if err != nil { return err } m.cgroupsManager = manager } // 下面的循环负责: // 1、确保进程运行在cgroups中 // 2、确保进程的OOM score被应用 go wait.Until(m.doWork, 5*time.Minute, wait.NeverStop) return nil }
从本节开始,我们将分析dockershim对CRI接口的实现。第一个分析的是创建沙盒的接口RunPodSandbox。
总体来说,沙盒创建包含一下几个步骤:
完整的代码:
// 对于Docker来说,沙盒的实现是一个容器,它持有(维持)Pod的网络命名空间 // Docker尚未使用LogDirectory func (ds *dockerService) RunPodSandbox(ctx context.Context, r *runtimeapi.RunPodSandboxRequest) (*runtimeapi.RunPodSandboxResponse, error) { // 请求的沙盒配置 config := r.GetConfig() // 第一步:拉取沙盒的镜像 image := defaultSandboxImage podSandboxImage := ds.podSandboxImage if len(podSandboxImage) != 0 { image = podSandboxImage } // 如果使用私有仓库的沙盒镜像,用户必须在节点上合理配置凭证信息 // 如果镜像不存在,则拉取 if err := ensureSandboxImageExists(ds.client, image); err != nil { return nil, err } // 第二步:创建沙盒容器 // 使用何种运行时,注意Docker也可以支持多种容器运行时 if r.GetRuntimeHandler() != "" && r.GetRuntimeHandler() != runtimeName { return nil, fmt.Errorf("RuntimeHandler %q not supported", r.GetRuntimeHandler()) } // 将 runtimeapi.PodSandboxConfig(CRI的配置信息) 转换为 dockertypes.ContainerCreateConfig(Docker的配置信息) createConfig, err := ds.makeSandboxDockerConfig(config, image) if err != nil { return nil, fmt.Errorf("failed to make sandbox docker config for pod %q: %v", config.Metadata.Name, err) } // 调用Docker Client API,创建容器 createResp, err := ds.client.CreateContainer(*createConfig) if err != nil { // 针对1.11-的workaround createResp, err = recoverFromCreationConflictIfNeeded(ds.client, *createConfig, err) } if err != nil || createResp == nil { return nil, fmt.Errorf("failed to create a sandbox for pod %q: %v", config.Metadata.Name, err) } resp := &runtimeapi.RunPodSandboxResponse{PodSandboxId: createResp.ID} // 设置内存中的沙盒网络就绪状态 ds.setNetworkReady(createResp.ID, false) defer func(e *error) { // 外层函数的调用结果,决定是否设置网络为ready if *e == nil { ds.setNetworkReady(createResp.ID, true) } }(&err) // 使用(当前函数调用的)最后一刻的err // 第三步:为沙盒创建Checkpoint if err = ds.checkpointManager.CreateCheckpoint(createResp.ID, constructPodSandboxCheckpoint(config)); err != nil { return nil, err } // 第四步:启动沙盒容器 // 假设在沙盒启动失败的情况下,Kubelet的垃圾回收器会在之后自动删除沙盒 err = ds.client.StartContainer(createResp.ID) if err != nil { return nil, fmt.Errorf("failed to start sandbox container for pod %q: %v", config.Metadata.Name, err) } // 覆盖Docker生成的resolv.conf文件 // NOTE: 不管在什么场景下,集群DNS设置都不会再次传递给Docker API // 这里修改的resolv.conf会被沙盒中所有容器共享 if dnsConfig := config.GetDnsConfig(); dnsConfig != nil { containerInfo, err := ds.client.InspectContainer(createResp.ID) if err != nil { return nil, fmt.Errorf("failed to inspect sandbox container for pod %q: %v", config.Metadata.Name, err) } if err := rewriteResolvFile(containerInfo.ResolvConfPath, dnsConfig.Servers, dnsConfig.Searches, dnsConfig.Options); err != nil { return nil, fmt.Errorf("rewrite resolv.conf failed for pod %q: %v", config.Metadata.Name, err) } } // 如果使用宿主机网络,则不调用CNI插件 if config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetNetwork() == runtimeapi.NamespaceMode_NODE { return resp, nil } // 第五步:为沙盒准备好容器网络 // 任何Pod的网络,都是利用启动阶段发现的CNI插件创建的 // 这个CNI插件负责:分配IP地址给Pod、在沙盒中创建路由、创建网络接口... // 虽然从理论上来说,CNI仅仅应该仓库沙盒网络,但是它也可能在宿主机上 // 插入iptables规则、打开端口。以满足Pod Spec中CNI标准展示不能识别的部分 cID := kubecontainer.BuildContainerID(runtimeName, createResp.ID) networkOptions := make(map[string]string) if dnsConfig := config.GetDnsConfig(); dnsConfig != nil { // Build DNS options. dnsOption, err := json.Marshal(dnsConfig) if err != nil { return nil, fmt.Errorf("failed to marshal dns config for pod %q: %v", config.Metadata.Name, err) } networkOptions["dns"] = string(dnsOption) } // 调用CNI插件管理器,为Pod准备网络 err = ds.network.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID, config.Annotations, networkOptions) if err != nil { errList := []error{fmt.Errorf("failed to set up sandbox container %q network for pod %q: %v", createResp.ID, config.Metadata.Name, err)} // 出错的情况下,确保沙盒网络资源会被清理干净 err = ds.network.TearDownPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID) if err != nil { errList = append(errList, fmt.Errorf("failed to clean up sandbox container %q network for pod %q: %v", createResp.ID, config.Metadata.Name, err)) } // 停止沙盒容器 err = ds.client.StopContainer(createResp.ID, defaultSandboxGracePeriod) if err != nil { errList = append(errList, fmt.Errorf("failed to stop sandbox container %q for pod %q: %v", createResp.ID, config.Metadata.Name, err)) } return resp, utilerrors.NewAggregate(errList) } return resp, nil }
在沙盒创建过程中,作为最后一步,会调用PluginManager.SetUpPod()来准备容器网络:
// Pod命名空间 Pod名 沙盒的容器ID func (pm *PluginManager) SetUpPod(podNamespace, podName string, id kubecontainer.ContainerID, annotations, options map[string]string) error { defer recordOperation("set_up_pod", time.Now()) fullPodName := kubecontainer.BuildPodFullName(podName, podNamespace) // 串行化针对某个Pod的CNI操作 pm.podLock(fullPodName).Lock() defer pm.podUnlock(fullPodName) // 调用网络插件 if err := pm.plugin.SetUpPod(podNamespace, podName, id, annotations, options); err != nil { return fmt.Errorf("networkPlugin %s failed to set up pod %q network: %v", pm.plugin.Name(), fullPodName, err) } return nil } // 注意两级PodLock的处理 func (pm *PluginManager) podLock(fullPodName string) *sync.Mutex { // 首先获取全局锁 pm.podsLock.Lock() defer pm.podsLock.Unlock() // 确保单个Pod的锁对象存在,不存在则创建之 lock, ok := pm.pods[fullPodName] if !ok { lock = &podLock{} pm.pods[fullPodName] = lock } // 加Pod锁时,增加此计数。podUnlock时减小此计数 lock.refcount++ return &lock.mu }
可以看到,初始化网络的过程,是委托给NetworkPlugin处理的,NetworkPlugin接口也是dockershim私有的:
type NetworkPlugin interface { // 初始化该插件,只会调用一次 Init(host Host, hairpinMode kubeletconfig.HairpinMode, nonMasqueradeCIDR string, mtu int) error // 在各类事件发生时调用,例如NET_PLUGIN_EVENT_POD_CIDR_CHANGE Event(name string, details map[string]interface{}) // 返回插件名称 Name() string // 返回NET_PLUGIN_CAPABILITY_*的集合 Capabilities() utilsets.Int // 在Pod的infra容器创建后,其它容器创建之前调用,为Pod 准备网络 SetUpPod(namespace string, name string, podSandboxID kubecontainer.ContainerID, annotations, options map[string]string) error // 在删除infra容器之前调用 TearDownPod(namespace string, name string, podSandboxID kubecontainer.ContainerID) error // 用于获取容器IP地址 GetPodNetworkStatus(namespace string, name string, podSandboxID kubecontainer.ContainerID) (*PodNetworkStatus, error) // 如果该插件处于错误状态,返回错误 Status() error }
对于CNI来说,NetworkPlugin的实现为cniNetworkPlugin,它在dockerService创建过程中初始化:
cniPlugins := cni.ProbeNetworkPlugins(pluginSettings.PluginConfDir, pluginSettings.PluginCacheDir, pluginSettings.PluginBinDirs) func ProbeNetworkPlugins(confDir, cacheDir string, binDirs []string) []network.NetworkPlugin { // ... plugin := &cniNetworkPlugin{ defaultNetwork: nil, // 主网络接口 loNetwork: getLoNetwork(binDirs), // lo网络接口 execer: utilexec.New(), confDir: confDir, binDirs: binDirs, cacheDir: cacheDir, } plugin.syncNetworkConfig() return []network.NetworkPlugin{plugin} }
cniNetworkPlugin.SetUpPod()的逻辑如下:
func (plugin *cniNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID, annotations, options map[string]string) error { if err := plugin.checkInitialized(); err != nil { return err } // 根据容器ID获取网络命名空间信息 netnsPath, err := plugin.host.GetNetNS(id.ID) if err != nil { return fmt.Errorf("CNI failed to retrieve network namespace path: %v", err) } cniTimeoutCtx, cancelFunc := context.WithTimeout(context.Background(), network.CNITimeoutSec*time.Second) defer cancelFunc() // Windows没有lo网络的概念 if plugin.loNetwork != nil { // 添加lo网络接口到命名空间 if _, err = plugin.addToNetwork(cniTimeoutCtx, plugin.loNetwork, name, namespace, id, netnsPath, annotations, options); err != nil { return err } } // 添加主网络接口到命名空间 _, err = plugin.addToNetwork(cniTimeoutCtx, plugin.getDefaultNetwork(), name, namespace, id, netnsPath, annotations, options) return err } func (plugin *cniNetworkPlugin) addToNetwork(ctx context.Context, network *cniNetwork, podName string, podNamespace string, podSandboxID kubecontainer.ContainerID, podNetnsPath string, annotations, options map[string]string) (cnitypes.Result, error) { // 创建CNI配置(libcni.RuntimeConf) rt, err := plugin.buildCNIRuntimeConf(podName, podNamespace, podSandboxID, podNetnsPath, annotations, options) pdesc := podDesc(podNamespace, podName, podSandboxID) netConf, cniNet := network.NetworkConfig, network.CNIConfig // 调用标准的CNI接口 res, err := cniNet.AddNetworkList(ctx, netConf, rt) return res, nil }
该方法负责创建一个容器,并加入到指定的沙盒中:
func (ds *dockerService) CreateContainer(_ context.Context, r *runtimeapi.CreateContainerRequest) (*runtimeapi.CreateContainerResponse, error) { podSandboxID := r.PodSandboxId config := r.GetConfig() sandboxConfig := r.GetSandboxConfig() labels := makeLabels(config.GetLabels(), config.GetAnnotations()) // 添加标签 io.kubernetes.docker.type labels[containerTypeLabelKey] = containerTypeLabelContainer // 添加标签 io.kubernetes.container.logpath labels[containerLogPathLabelKey] = filepath.Join(sandboxConfig.LogDirectory, config.LogPath) // 添加标签 io.kubernetes.sandbox.id labels[sandboxIDLabelKey] = podSandboxID apiVersion, err := ds.getDockerAPIVersion() image := "" if iSpec := config.GetImage(); iSpec != nil { image = iSpec.Image } // K8S前缀_容器名_沙盒名_沙盒命名空间_沙盒UID_第几次尝试创建此容器 containerName := makeContainerName(sandboxConfig, config) // Docker的容器配置对象 createConfig := dockertypes.ContainerCreateConfig{ Name: containerName, Config: &dockercontainer.Config{ Entrypoint: dockerstrslice.StrSlice(config.Command), Cmd: dockerstrslice.StrSlice(config.Args), Env: generateEnvList(config.GetEnvs()), Image: image, WorkingDir: config.WorkingDir, Labels: labels, // 交互式容器 OpenStdin: config.Stdin, StdinOnce: config.StdinOnce, Tty: config.Tty, // 在得到官方支持之前,禁用Docker的健康检查 // (https://github.com/kubernetes/kubernetes/issues/25829). Healthcheck: &dockercontainer.HealthConfig{ Test: []string{"NONE"}, }, }, // 解挂载列表转换为Docker能够理解的字符串: // <HostPath>:<ContainerPath>[:options] // 其中options支持逗号分隔的: // ro 提示只读 // Z 提示卷要求SELinux relabeling // 例如rslave之类的传播模式设置 HostConfig: &dockercontainer.HostConfig{ Binds: generateMountBindings(config.GetMounts()), RestartPolicy: dockercontainer.RestartPolicy{ Name: "no", }, }, } hc := createConfig.HostConfig // 完成以下操作: // CPU、内存配额 // OOM分数设置 // 应用security context // 应用通过沙盒推导出的cgroupsParent err = ds.updateCreateConfig(&createConfig, config, sandboxConfig, podSandboxID, securityOptSeparator, apiVersion) if err != nil { return nil, fmt.Errorf("failed to update container create config: %v", err) } // 设置宿主机和容器之间的device mapping devices := make([]dockercontainer.DeviceMapping, len(config.Devices)) for i, device := range config.Devices { devices[i] = dockercontainer.DeviceMapping{ PathOnHost: device.HostPath, PathInContainer: device.ContainerPath, CgroupPermissions: device.Permissions, } } hc.Resources.Devices = devices // 安全选项 securityOpts, err := ds.getSecurityOpts(config.GetLinux().GetSecurityContext().GetSeccompProfilePath(), securityOptSeparator) if err != nil { return nil, fmt.Errorf("failed to generate security options for container %q: %v", config.Metadata.Name, err) } hc.SecurityOpt = append(hc.SecurityOpt, securityOpts...) cleanupInfo, err := ds.applyPlatformSpecificDockerConfig(r, &createConfig) if err != nil { return nil, err } // 调用Docker客户端,创建容器 createResp, createErr := ds.client.CreateContainer(createConfig) if createErr != nil { createResp, createErr = recoverFromCreationConflictIfNeeded(ds.client, createConfig, createErr) } // ... return nil, createErr }
containerd的CRI支持,以前是通过一个进程外的cri-containerd完成,现在则改为containerd内部的插件。该插件支持containerd 1.1+版本。containerd的CRI架构如下:
cri-containerd插件,是通过init函数注册到containerd的:
func init() { // 缺省CRI配置 config := criconfig.DefaultConfig() // 注册当前插件 plugin.Register(&plugin.Registration{ Type: plugin.GRPCPlugin, // 插件类型,这里提示containerd和该插件也通过gRPC交互 ID: "cri", Config: &config, // 插件配置,interface{} Requires: []plugin.Type{ // 插件之间有依赖 plugin.ServicePlugin, }, InitFn: initCRIService, // 初始化函数 }) }
func initCRIService(ic *plugin.InitContext) (interface{}, error) { ic.Meta.Platforms = []imagespec.Platform{platforms.DefaultSpec()} ic.Meta.Exports = map[string]string{"CRIVersion": constants.CRIVersion} ctx := ic.Context // 对应CRI插件的toml配置文件 pluginConfig := ic.Config.(*criconfig.PluginConfig) // 校验配置文件 if err := criconfig.ValidatePluginConfig(ctx, pluginConfig); err != nil { return nil, errors.Wrap(err, "invalid plugin config") } // 关于CRI服务器的所有配置信息 c := criconfig.Config{ PluginConfig: *pluginConfig, ContainerdRootDir: filepath.Dir(ic.Root), ContainerdEndpoint: ic.Address, RootDir: ic.Root, StateDir: ic.State, } servicesOpts, err := getServicesOpts(ic) // Containerd的客户端(gRPC) client, err := containerd.New( "", containerd.WithDefaultNamespace(constants.K8sContainerdNamespace), containerd.WithDefaultPlatform(criplatforms.Default()), containerd.WithServices(servicesOpts...), ) // 创建criService的实例 s, err := server.NewCRIService(c, client) // 启动gRPC服务 go func() { if err := s.Run(); err != nil { log.G(ctx).WithError(err).Fatal("Failed to run CRI service") } }() return s, nil }
和dockershim的CRIService类似:
type grpcServices interface { runtime.RuntimeServiceServer runtime.ImageServiceServer } type CRIService interface { Run() error // 优雅关闭CRI服务 io.Closer // 将CRI服务注册到gRPC服务器 plugin.Service // CRI接口 grpcServices }
type criService struct { // 所有配置信息 config criconfig.Config // 镜像文件系统路径 imageFSPath string // 系统相关操作接口 os osinterface.OS // 存储和沙盒相关的资源 sandboxStore *sandboxstore.Store // 存储沙盒名字,并确保每个名字唯一 sandboxNameIndex *registrar.Registrar // 存储和容器相关的资源 containerStore *containerstore.Store // 存储容器名字,并确保每个名字唯一 containerNameIndex *registrar.Registrar // 存储所有关联镜像的资源 imageStore *imagestore.Store // 存储快照(用于构建容器根文件系统)的信息 snapshotStore *snapshotstore.Store // 网络插件,负责容器网络的创建和消耗iu netPlugin cni.CNI // containerd客户端 client *containerd.Client // 处理容器流请求 streamServer streaming.Server // 监控容器事件 eventMonitor *eventMonitor // 提示该服务是否初始化完毕 initialized atomic.Bool // 用于重新加载CNI网络配置 cniNetConfMonitor *cniNetConfSyncer }
整体流程:
代码如下:
func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandboxRequest) (_ *runtime.RunPodSandboxResponse, retErr error) { // CNI沙盒配置 config := r.GetConfig() log.G(ctx).Debugf("Sandbox config %+v", config) // 生成随机ID id := util.GenerateID() metadata := config.GetMetadata() if metadata == nil { return nil, errors.New("sandbox config must include metadata") } // 沙盒名称,格式类似dockershim name := makeSandboxName(metadata) log.G(ctx).Debugf("Generated id %q for sandbox %q", id, name) // 预定沙盒名称,防止并发的RunPodSandbox请求,启动相同的沙盒 if err := c.sandboxNameIndex.Reserve(name, id); err != nil { return nil, errors.Wrapf(err, "failed to reserve sandbox name %q", name) } defer func() { // 释放沙盒名称 if retErr != nil { c.sandboxNameIndex.ReleaseByName(name) } }() //沙盒对象 sandbox := sandboxstore.NewSandbox( sandboxstore.Metadata{ ID: id, Name: name, Config: config, RuntimeHandler: r.GetRuntimeHandler(), }, sandboxstore.Status{ State: sandboxstore.StateUnknown, }, ) // 确保沙盒镜像的snapshot存在,如果不存在,则调用相应的CRI接口拉取镜像 image, err := c.ensureImageExists(ctx, c.config.SandboxImage, config) if err != nil { return nil, errors.Wrapf(err, "failed to get sandbox image %q", c.config.SandboxImage) } // 转换为containerd镜像 containerdImage, err := c.toContainerdImage(ctx, *image) if err != nil { return nil, errors.Wrapf(err, "failed to get image from containerd %q", image.ID) } // runtime handler就是runtime class // containerd配置文件中需要包含相应的运行时 ociRuntime, err := c.getSandboxRuntime(config, r.GetRuntimeHandler()) if err != nil { return nil, errors.Wrap(err, "failed to get sandbox runtime") } log.G(ctx).Debugf("Use OCI %+v for sandbox %q", ociRuntime, id) podNetwork := true // 在Windows下Pod网络是必须的 if goruntime.GOOS != "windows" && config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetNetwork() == runtime.NamespaceMode_NODE { // 在Linux下,使用宿主机网络时 podNetwork = false } if podNetwork { // 为沙盒创建网络命名空间 sandbox.NetNS, err = netns.NewNetNS() sandbox.NetNSPath = sandbox.NetNS.GetPath() defer func() { // 最终清理工作 if retErr != nil { // 出现错误的情况下,清理容器网络、删除网络命名空间 if err := c.teardownPodNetwork(ctx, sandbox); err != nil { log.G(ctx).WithError(err).Errorf("Failed to destroy network for sandbox %q", id) } if err := sandbox.NetNS.Remove(); err != nil { log.G(ctx).WithError(err).Errorf("Failed to remove network namespace %s for sandbox %q", sandbox.NetNSPath, id) } sandbox.NetNSPath = "" } }() // 调用CNI插件,创建容器网络 // 某些基于VM的解决方案,例如clear containers (containerd/cri-containerd#524) // 依赖于这样的假设:CRI shim不去查询网络命名空间以获得网络状态信息,例如IP地址 // // 在未来,运行时实现应当避免依赖于CRI shim的实现细节。在这里,缓存IP地址可以提升性能 // 因为避免了每次处理SandboxStatus请求时都去查询Pod的网络命名空间,来获得veth的IP地址 if err := c.setupPodNetwork(ctx, &sandbox); err != nil { return nil, errors.Wrapf(err, "failed to setup network for sandbox %q", id) } } // 创建沙盒容器 // 生成规格,sandboxContainerSpec函数不应该有副作用,例如访问/创建文件 spec, err := c.sandboxContainerSpec(id, config, &image.ImageSpec.Config, sandbox.NetNSPath, ociRuntime.PodAnnotations) if err != nil { return nil, errors.Wrap(err, "failed to generate sandbox container spec") } log.G(ctx).Debugf("Sandbox container %q spec: %#+v", id, spew.NewFormatter(spec)) // 为沙盒进程设置SELinux标签 sandbox.ProcessLabel = spec.Process.SelinuxLabel defer func() { if retErr != nil { selinux.ReleaseLabel(sandbox.ProcessLabel) } }() if config.GetLinux().GetSecurityContext().GetPrivileged() { // 特权容器不设置SELinux标签 spec.Process.SelinuxLabel = "" } // 生成OCI Spec相关选项 specOpts, err := c.sandboxContainerSpecOpts(config, &image.ImageSpec.Config) if err != nil { return nil, errors.Wrap(err, "failed to generate sanbdox container spec options") } sandboxLabels := buildLabels(config.Labels, containerKindSandbox) // 容器运行时选项,使用何种运行时、加速什么cgroup... runtimeOpts, err := generateRuntimeOptions(ociRuntime, c.config) if err != nil { return nil, errors.Wrap(err, "failed to generate runtime options") } opts := []containerd.NewContainerOpts{ // 容器使用的snapshotter containerd.WithSnapshotter(c.config.ContainerdConfig.Snapshotter), // 指定容器rootfs customopts.WithNewSnapshot(id, containerdImage), // 将指定的spec设置到容器 containerd.WithSpec(spec, specOpts...), // 设置容器标签 containerd.WithContainerLabels(sandboxLabels), containerd.WithContainerExtension(sandboxMetadataExtension, &sandbox.Metadata), // 设置容器使用的运行时 containerd.WithRuntime(ociRuntime.Type, runtimeOpts)} // 调用containerd创建容器 container, err := c.client.NewContainer(ctx, id, opts...) if err != nil { return nil, errors.Wrap(err, "failed to create containerd container") } defer func() { if retErr != nil { // 出错,则删除容器。在超时上下文中进行 deferCtx, deferCancel := ctrdutil.DeferContext() defer deferCancel() if err := container.Delete(deferCtx, containerd.WithSnapshotCleanup); err != nil { log.G(ctx).WithError(err).Errorf("Failed to delete containerd container %q", id) } } }() // 创建沙盒根目录,此目录用于管理hosts文件之类的文件 sandboxRootDir := c.getSandboxRootDir(id) if err := c.os.MkdirAll(sandboxRootDir, 0755); err != nil { return nil, errors.Wrapf(err, "failed to create sandbox root directory %q", sandboxRootDir) } defer func() { if retErr != nil { // 清理 if err := c.os.RemoveAll(sandboxRootDir); err != nil { log.G(ctx).WithError(err).Errorf("Failed to remove sandbox root directory %q", sandboxRootDir) } } }() // 创建volatile,目录,沙盒在目录中存放不稳定文件,例如命名管道 volatileSandboxRootDir := c.getVolatileSandboxRootDir(id) if err := c.os.MkdirAll(volatileSandboxRootDir, 0755); err != nil { return nil, errors.Wrapf(err, "failed to create volatile sandbox root directory %q", volatileSandboxRootDir) } defer func() { if retErr != nil { // 清理 if err := c.os.RemoveAll(volatileSandboxRootDir); err != nil { log.G(ctx).WithError(err).Errorf("Failed to remove volatile sandbox root directory %q", volatileSandboxRootDir) } } }() // 为沙盒准备好/dev/shm, /etc/hosts, /etc/resolv.conf, /etc/hostname 等文件 if err = c.setupSandboxFiles(id, config); err != nil { return nil, errors.Wrapf(err, "failed to setup sandbox files") } defer func() { if retErr != nil { // 清理 if err = c.cleanupSandboxFiles(id, config); err != nil { log.G(ctx).WithError(err).Errorf("Failed to cleanup sandbox files in %q", sandboxRootDir) } } }() taskOpts := c.taskOpts(ociRuntime.Type) // 在容器中创建Task,Task是containerd中的可执行对象,对应进程 // 该任务不需要stdio task, err := container.NewTask(ctx, containerdio.NullIO, taskOpts...) if err != nil { return nil, errors.Wrap(err, "failed to create containerd task") } defer func() { if retErr != nil { deferCtx, deferCancel := ctrdutil.DeferContext() defer deferCancel() // 清理,杀死Task对应的进程 if _, err := task.Delete(deferCtx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) { log.G(ctx).WithError(err).Errorf("Failed to delete sandbox container %q", id) } } }() // 当任务退出时,通过exitCh获得退出码 exitCh, err := task.Wait(ctrdutil.NamespacedContext()) if err != nil { return nil, errors.Wrap(err, "failed to wait for sandbox container task") } // 启动容器任务,即执行用户定义的二进制文件 if err := task.Start(ctx); err != nil { return nil, errors.Wrapf(err, "failed to start sandbox container task %q", id) } // 更新沙盒状态 if err := sandbox.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) { // Set the pod sandbox as ready after successfully start sandbox container. status.Pid = task.Pid() status.State = sandboxstore.StateReady return status, nil }); err != nil { return nil, errors.Wrap(err, "failed to update sandbox status") } sandbox.Container = container // 存储沙盒状态 if err := c.sandboxStore.Add(sandbox); err != nil { return nil, errors.Wrapf(err, "failed to add sandbox %+v into store", sandbox) } // 监控沙盒状态,确保monitor接收到TaskExit事件之时,沙盒在store中 c.eventMonitor.startExitMonitor(context.Background(), id, task.Pid(), exitCh) return &runtime.RunPodSandboxResponse{PodSandboxId: id}, nil }
Leave a Reply