扩展Istio
如果Istio不能满足你的需求,你可以考虑扩展它。
Pilot的功能相对比较固定,主要负责和Envoy代理基于xDS协议的数据交换,通常不需要进行扩展和定制。
Mixer本身即是高度模块化、并且鼓励扩展的。我们可以定义自己的模板,从网格流量中抽取新的属性,也可以开发自己的适配器,来支持和各种后端基础设施的对接。本文的主要篇幅将用来探讨Mixer的扩展。
Istio开发所需的资源参考官方Wiki。
在进行Istio开发之前,先准备好:
- Go 1.11版本
- Docker,Istio包含一个镜像构建系统,能够创建、发布Docker镜像
- 如果在K8S环境下运行Istio,你需要K8S 1.7.3以上版本
1 2 3 |
# 必须签出到$GOPATH/src/istio.io下 pushd /home/alex/Go/workspaces/default/src/istio.io git clone https://github.com/istio/istio.git |
1 2 3 4 5 6 7 8 9 |
export GOPATH=~/go export PATH=$PATH:$GOPATH/bin export ISTIO=$GOPATH/src/github.com/istio/istio # Docker镜像仓库和Tag export HUB="docker.gmem.cc/istio" export TAG=1.0.5 export KUBECONFIG=${HOME}/.kube/config |
在本机环境下构建:
1 2 3 4 5 6 7 8 |
# 基于本机的体系结构构建Istio的Pilot、Mixer、Citadel等组件 make # 构建包含调试信息的组件,可以基于Delve等Debugger进行单步调试 make DEBUG=1 # 提升非第一次构建的速度,-i让Go缓存中间结果 GOBUILDFLAGS=-i make |
构建并打包到容器:
1 2 3 4 5 |
make docker make DEBUG=1 docker # 推送到镜像仓库 make push |
你可以参考下面的命令在本地启动Pilot:
1 |
... discovery --log_output_level=default:debug --domain=k8s.gmem.cc --kubeconfig=/home/alex/.kube/config --meshConfig=pilot/mesh |
在K8S环境下,调试Istio容器的步骤如下:
- 定位到Istio容器在什么节点下运行
- 确保必要的工具都在节点上安装好,包括Go、Delve
- 将可执行文件基于的源码拷贝到节点上
- 在节点上找到Istio容器对应的进程
- 执行 sudo dlv attach pilot-pid开始调试
你也可以使用Squash配合Delve进行调试,可能需要修改Istio的基础镜像(例如alpine)。使用Squash的优势是不需要在所有节点上都安装Delve+Go
给Deployment增加注解: sidecar.istio.io/discoveryAddress: 10.0.0.1:15010 即可强制指定Pilot的地址。
1 2 3 4 5 6 7 8 9 10 11 12 |
# 运行所有测试 make test # 运行Pilot的单元测试 make pilot-test # 使用Go竞态条件检测工具运行测试 make racetest # 获取测试覆盖率信息 make coverage |
只有通过单元测试、集成测试之后,才能提交PR,否则不会被合并:
- 单元测必须是密封的。仅仅访问test binary中的资源
- 所有包、重要文件必须进行单元测试
- 单元测试使用标准的Go测试包
- 测试多种场景/输入时,最好使用表驱动测试
- 必须通过并发测试
1 |
make format |
1 2 3 4 |
make lint # 仅仅针对本地变更进行检查 bin/linters.sh -s HEAD^ |
Istio使用CircleCI作为持续集成系统,所有PR必须通过全部CircleCI测试才能被合并。当Fork了Istio之后,CircleCI测试环境也被复制到本地,可以完整重现Istio的测试基础设施。
你可以注册CircleCI账号,并在Fork中测试代码的变更,防止PR不被通过。
- Fork主仓库
- 克隆Fork到本地
- 启用提交前钩子: ./bin/pre-commit
- 创建一个分支,修改一些代码
- 保持Fork和主仓库同步:
12git fetch upstreamgit rebase upstream/master -
提交变更到Fork
-
推送变更到Fork
-
创建一个PR
- PR会分配给1-N个reviewer,他们会检查代码、文档、注释,包括代码样式
开发新的试验特性,或者进行可能对master稳定性造成巨大影响的变更时,应当新开启特性分支,并遵守:
- 以 collab-<feature-name>的方式命名分支
- 周期性的从master合并代码,长期不合并,导致最终将特性分支合并到master时非常困难
让用测试例本身快速、可靠的基于云环境运行是困难的,Istio测试框架尝试解决该问题。
Istio测试框架的目标:
- 编写测试:
- 平台不可知:API将底层平台的细节屏蔽掉,让开发人员专注于测试Istio本身的裸机
- 可重用测试:可以基于任何支持Istio的底层平台运行测试
- 运行测试:
- 基于Go语言标准测试机制
- 简单:不需要或需要很少的标记即可运行测试
- 快速
- 可靠:在本机运行测试天然比在集群中可靠,但是针对各平台的组件都具有可靠性机制,例如重试
使用此测试框架,你需要编写一个TestMain函数:
1 2 3 4 5 |
func TestMain(m *testing.M) { framework. NewSuite("my_test", m). Run() } |
在此函数中你需要调用NewSuite,从而:
- 启动一个平台特定的环境,默认使用本地环境,如果需要在K8S上运行测试,设置标记 --istio.test.env=kube
- 运行当前包的所有测试用例
- 停止环境
然后在当前包中编写一个个的测试用例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
func TestHTTP(t *testing.T) { // 获取测试环境上下文 ctx := framework.GetContext(t) defer ctx.Done() // 获取需要测试的组件(例如Pilot、Mixer、Apps) apps := components.GetApps(t, ctx) a := apps.GetAppOrFail("a", t) b := apps.GetAppOrFail("b", t) // 和组件进行交互,每个组件都定义了自己的API be := b.EndpointsForProtocol(model.ProtocolHTTP)[0] result := a.CallOrFail(be, components.AppCallOptions{}, t)[0] if !result.IsOK() { t.Fatalf("HTTP Request unsuccessful: %s", result.Body) } } |
如果你需要执行测试套件级别的检查,可以:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
func TestMain(m *testing.M) { framework.NewTest("my_test", m). // 要求Kubernetes环境 RequireEnvironment(environment.Kube). // 部署供整个测试套件使用的Istio SetupOnEnv(environment.Kube, istio.Setup(&ist, setupIstioConfig)). // 调用你的setp函数 Setup(setup). Run() } func setupIstioConfig(cfg *istio.Config) { cfg.Values["your-feature-enabled"] = "true" } func setup(ctx resource.Context) error { // 准备测试环境 } |
Native |
在本机(进程内或进程外)运行测试,默认值 好处是简单、快、可靠 |
Kubernetes | 需要使用标记 --istio.test.env=kube,默认情况下会使用 ~/.kube/config来部署Istio |
标记 | 默认值 | 说明 |
istio.test.env | native |
运行测试的环境 |
istio.test.work_dir | '' | 创建 logs/temp文件的本地目录,如果不指定使用系统临时目录 |
istio.test.hub | '' | 使用的Docker仓库,默认从HUB环境变量读取 |
istio.test.tag | '' | 使用的镜像标签,默认从TAG环境变量读取 |
istio.test.pullpolicy | Always | 镜像拉取策略,可用环境变量PULL_POLICY指定 |
istio.test.nocleanup | false | 测试完毕后不要清理资源 |
istio.test.ci | false | 启用CI模式,以打印更多日志和状态信息 |
istio.test.kube.config | ~/.kube/config | 使用的Kubeconfig |
istio.test.kube.minikube | false | 基于Minikube环境运行 |
istio.test.kube.systemNamespace | istio-system | 废弃 |
istio.test.kube.istioNamespace | istio-system | Istio CA和证书分发组件所在命名空间 |
istio.test.kube.configNamespace | istio-system | 配置文件、服务发现、自动注入组件部署到的命名空间 |
istio.test.kube.telemetryNamespace | istio-system | mixer, kiali, tracing providers, graphana, prometheus 部署到的命名空间 |
istio.test.kube.policyNamespace | istio-system | policy checker部署到的命名空间 |
istio.test.kube.ingressNamespace | istio-system | ingressgateway部署到的命名空间 |
istio.test.kube.egressNamespace | istio-system | egressgateway部署到的命名空间 |
istio.test.kube.deploy | true | 如果为true则部署组件,否则假设组件已经部署了 |
istio.test.kube.helm.chartDir | $(ISTIO)/install/kubernetes/helm/istio | Istio的Helm Chart位置 |
istio.test.kube.helm.valuesFile | values-e2e.yaml | 相对于relative to istio.test.kube.helm.chartDir的Chart 覆盖变量文件 |
istio.test.kube.helm.values | '' | 提供Chart覆盖变量 |
Prow提供CI特性、一套工具集来提升开发人员额度生产力,它由K8S社区开发,部署在GCE中。你也可以在任何K8S集群中部署它。
Prow能运行:pre-submit、post-submit、周期性的Jobs,并提供生产力工具:
- Tide:自动合并PR
- hold:保持没有被合并的PR
- 分支保护:基于配置更新Github分支保护策略
- needs-rebase:提示PR需要rebase
配置文件主要有两个:
- config.yaml:定义Job、一般性设置
- plugins.yaml:插件配置
Mixer使用模板(Template)来结构化入站的属性。模板描述了需要发送给适配器的数据的形式,它还定义了适配器为了接受数据必须实现的gRPC接口。Mixer提供了一些开箱即用的默认模板,当实现自己的适配器时,应当尽可能使用这些默认模板。
模板以Proto文件的形式声明,此定义中包含Template消息,指定了Template变体(Check/Report/Quota)。从Template消息会生成:
- InstanceMsg消息,在请求期间,作为参数传递
- Handle服务,InstanceMsg消息传递给该服务
- Type消息,在配置期间传递,描述InstanceMsg的规格
如果可能,不要自己定义模板。Istio内置的模板通常可以满足需要。
前面提到过,Template是使用Proto文件定义的,它对应一个名为Template的消息。所有Go代码都基于此消息自动生成。
每个模板具有两个额外的属性:
- Name,模板的独特的名称。适配器会使用此名称来注册到Mixer,声明自己需要消费这种类型模板的Instance
- template_variety,表示模板的种类,种类决定了适配器必须实现的、消费模板Instance的方法的签名
- Check,这种模板需要的实例仅仅在Mixer客户端进行Check API调用时生成
- Report,这种模板需要的实例仅仅在Mixer客户端进行Report API调用时生成
- Quota,这种模板需要的实例仅仅在Mixer客户端进行Check API调用,以要求进行Quota分配时生成
- AttributeGenerator,这种模板需要的实例在Check/Report调用时都会生成并分发,这种模板的处理发生在补充属性生成阶段(supplementary attribute generation phase) —— 早于任何其它模板的处理。处理AttributeGenerator的适配器称为属性生成适配器,它们负责生成模板声明的输出数据,你可以基于这些数据来配置新的属性
下面是listentry模板的Proto文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
syntax = "proto3"; // 模板的包名词,它决定了模板的名字,对应CRD的名字 package listEntry; import "mixer/adapter/model/v1beta1/extensions.proto"; // 这是一个CHECK模板,可选的种类 CHECK, REPORT, QUOTA, or ATTRIBUTE_GENERATOR // 种类决定了在Mixer处理流水线的什么地方调用消费此模板的适配器 option (istio.mixer.adapter.model.v1beta1.template_variety) = TEMPLATE_VARIETY_CHECK; // 配置示例: // // apiVersion: "config.istio.io/v1alpha2" // kind: listentry // metadata: // name: appversion // namespace: istio-system // spec: // 实例的数据 // value: source.labels["version"] // 根据模板类型的不同,需要定义不同的消息。你总是需要定义一个名为Template的消息 message Template { // 实例的元数据,决定了在运行时,此模板的实例是什么形状,实例会发送给适配器进行处理 string value = 1; } |
需要注意:Template消息上面的注释,将用作模板的文档,该文档同时面向适配器开发人员、运维操作人员。
对于ATTRIBUTE_GENERATOR类型的模板,还需要定义一个额外的OutputTemplate消息:
- Template消息,定义传递给使用该模板实例的适配器的输入
- OutputTemplate消息,定义上述适配器需要返回的输出
注意:目前不支持内嵌Message,enum,oneof,repeated。
可以在Proto中使用的模板字段类型包括:
模板字段类型 | Go字段类型 |
string | string |
int64 | int64 |
double | float64 |
bool | bool |
istio.mixer.adapter.model.v1beta1.TimeStamp | time.Time |
istio.mixer.adapter.model.v1beta1.Duration | time.Duration |
istio.mixer.adapter.model.v1beta1.IPAddress | net.IP |
istio.mixer.adapter.model.v1beta1.DNSName | adapter.DNSName |
istio.mixer.adapter.model.v1beta1.Value | interface{} |
map<string, string> | map[string]string |
map<string, int64> | map[string]int64 |
map<string, double> | map[string]float64 |
map<string, bool> | map[string]bool |
map<string, istio.mixer.adapter.model.v1beta1.TimeStamp> | map[string]time.Time |
map<string, istio.mixer.adapter.model.v1beta1.Duration> | map[string]time.Duration |
map<string, istio.mixer.adapter.model.v1beta1.IPAddress> | map[string]net.IP |
map<string, istio.mixer.adapter.model.v1beta1.DNSName> | map[string]adapter.DNSName |
map<string, istio.mixer.adapter.model.v1beta1.Value> | map[string]interface{} |
基于上述Proto文件生成的Go代码包括:
- InstanceMsg结构:定义了在请求期间传递给适配器的数据的结构。Mixer会基于请求属性和你给出的配置,生成此结构的实例
- OutputMsg结构:仅AttributeGenerator模板生成此结构。定义在属性生成阶段,适配器应当返回的数据的结构
- Handler***Service服务:定义Mixer用来分发InstanceMsg消息给适配器时使用的gRPC接口
- Type结构:如果InstanceMsg中的某些字段的数据类型是动态的(istio.policy.v1beta1.Value),则你提供的配置决定这些字段的真实类型
注意:生成的服务接口,由消费模板实例的那些适配器负责实现。
这是内置的metric模板的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
syntax = "proto3"; package metric; import "mixer/adapter/model/v1beta1/type.proto"; import "mixer/adapter/model/v1beta1/extensions.proto"; option (istio.mixer.v1.config.template.template_variety) = TEMPLATE_VARIETY_REPORT; // 表示需要报告的单个数据 message Template { // 报告的值 istio.mixer.adapter.model.v1beta1.Value value = 1; // 唯一性标识此指标的维度列表 map<string, istio.mixer.adapter.model.v1beta1.Value> dimensions = 2; } |
自动生成如下供适配器使用的Proto定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
// 需要处理请求期间metric类型的实例的适配器,都需要实现该服务接口 service HandleMetricService { // 处理指标 rpc HandleMetric(HandleMetricRequest) returns (istio.mixer.adapter.model.v1beta1.ReportResult); } // 请求消息结构 message HandleMetricRequest { // metric的实例 repeated InstanceMsg instances = 1; // 适配器特定的Handler配置 // // 注意:可以实现InfrastructureBackend服务,从而可以在会话创建(InfrastructureBackend.CreateSession)期间 // 接收处理器配置。在这种情况下,adapter_config会包含type_url: google.protobuf.Any.type_url字段,并且包含 // 由InfrastructureBackend.CreateSession调用返回的session_id: string google.protobuf.Any adapter_config = 2; // 用于去除针对Mixer的重复调用 string dedup_id = 3; } // metric模板的载荷 message InstanceMsg { // 实例名 string name = 72295727; // 报告的值 istio.policy.v1beta1.Value value = 1; // 指标的维度 map<string, istio.policy.v1beta1.Value> dimensions = 2; } // 包含推断出的、metric模板实例的类型信息 // 在配置期间,通过InfrastructureBackend.CreateSession调用传递 message Type { // The value being reported. istio.policy.v1beta1.ValueType value = 1; // The unique identity of the particular metric to report. map<string, istio.policy.v1beta1.ValueType> dimensions = 2; } |
这是内置listentry模板的例子:
1 2 3 4 5 6 7 8 9 10 11 |
syntax = "proto3"; package listentry; import "mixer/adapter/model/v1beta1/extensions.proto"; option (istio.mixer.v1.config.template.template_variety) = TEMPLATE_VARIETY_CHECK; message Template { string value = 1; } |
自动生成如下供适配器使用的Proto定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
service HandleListEntryService { rpc HandleListEntry(HandleListEntryRequest) returns (istio.mixer.adapter.model.v1beta1.CheckResult); } message HandleListEntryRequest { InstanceMsg instance = 1; google.protobuf.Any adapter_config = 2; string dedup_id = 3; } message InstanceMsg { string name = 72295727; string value = 1; } message Type { } |
1 2 3 4 5 6 7 8 9 10 |
package quota; import "policy/v1beta1/type.proto"; import "mixer/adapter/model/v1beta1/extensions.proto"; option (istio.mixer.adapter.model.v1beta1.template_variety) = TEMPLATE_VARIETY_QUOTA; message Template { map<string, istio.policy.v1beta1.Value> dimensions = 1; } |
自动生成如下供适配器使用的Proto定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
service HandleQuotaService { rpc HandleQuota(HandleQuotaRequest) returns (istio.mixer.adapter.model.v1beta1.QuotaResult); } message HandleQuotaRequest { InstanceMsg instance = 1; google.protobuf.Any adapter_config = 2; string dedup_id = 3; istio.mixer.adapter.model.v1beta1.QuotaRequest quota_request = 4; } message InstanceMsg { string name = 72295727; map<string, istio.policy.v1beta1.Value> dimensions = 1; } message Type { map<string, istio.policy.v1beta1.ValueType> dimensions = 1; } |
注意:早先Istio支持扩展进程内的适配器,这种适配器(和内置适配器一样)是在Mixer进程内部运行的。目前进程内适配器已经被弃用,应该考虑开发进程外(Out Of Process)的gRPC适配器。
适配器将Mixer和各种基础设施后端,例如负责指标采集的Prometheus、负责日志收集的Fluentd,集成起来。Mixer是一个属性处理引擎,它负责基于用户提供的配置来将请求属性映射为适配器输入参数,然后通过适配器来调用后端系统。
gRPC适配器可以由两种实现模型——基于会话或者无会话的。
注意:目前尚未支持。
Mixer仅仅在使用会话标识符创建会话时,将适配器的配置信息传递给适配器一次。未来Mixer和适配器的通信,均是基于会话标识符,你需要通过此标识符来引用最初传入的配置。
基于这种实现模型的适配器,需要在实现Handle***服务的同时实现InfrastructureBackend服务,Mixer调用后者方法的时序如下:
- 调用Validate方法
- 调用CreateSession方法,返回的session_id将用作后续的
- 针对Handle***的实时调用
- 最终的CloseSession调用
适配器仅仅需要实现Handle***服务,Mixer仅仅在请求(Check/Report/Quota)时期和适配器交互,每次交互都传递完整的适配器配置信息。
每种适配器都需要提供一个资源配置,你需要在Istio的配置存储中添加该配置。资源配置是adapter类型的CR。
创建这种资源配置的方法有两种:
- 调用工具
mixer/tool/mixgen,创建一个adapter资源:
12345//go:generate go run $GOPATH/src/istio.io/istio/mixer/tools/mixgen/main.go adapter \# 适配器类型 是否基于会话 消费的模板类型-n mygrpcadapter -s=false -t metric \-c $GOPATH/src/istio.io/istio/mixer/adapter/mygrpcadapter/config/config.proto_descriptor \-o mygrpcadapter-nosession.yaml -
使用 mixer_codegen.sh -a 命令传入适配器的config.proto:
12//go:generate $GOPATH/src/istio.io/istio/bin/mixer_codegen.sh \-a mixer/adapter/mygrpcadapter/config/config.proto -x "-n mygrpcadapter -s=false -t metric "
不管使用哪种方式,命令都可以作为 go generate阶段的一部分自动执行,都会生成adapter类型的CR。对于上面的例子,会生成一个无会话的、支持metric模板的,名为mygrpcadapter的资源配置,适配器的配置的Proto(声明该适配器支持哪些配置项)也会被编码到其中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
apiVersion: "config.istio.io/v1alpha2" kind: adapter metadata: name: mygrpcadapter namespace: istio-system spec: description: # 是否基于会话 session_based: false # 支持的模板 templates: - metric # 适配器配置的Proto config: CsD3AgogZ29vZ2xlL3Byb3RvYnVmL2Rlc2NyaXB0b3..... |
Istio提供了一个简单的用于测试适配器的框架。该框架会:
- 创建一个进程内的Mixer gRPC服务器,该服务器使用基于本地文件系统的配置存储。
- 创建一个Mixer gRPC客户端
测试框架的实现位于pkg/adapter/test/integration.go。Istio提供了基于此测试框架来测试Prometheus REPORT适配器的例子。
Mixs支持为自定义的适配器生成专门的CRD,执行下面的命令:
1 |
$GOPATH/out/linux_amd64/release/mixs crd adapter |
此Mixer内嵌的适配器的CRD信息会打印到控制台。找到自定义适配器的CRD,用kubectl命令存储到K8S中即可。
有了专门的CRD后,你不需要使用通用的handler来创建处理器,直接创建CR即可。
本节给出实现、测试、插入一个简单的进程外gRPC适配器的完整例子。该适配器:
- 支持metric模板
- 对于每个请求,打印它从Mixer接收的数据到文件
在开始前,请参考“如何贡献”一节,签出Istio代码,准备好环境:
- 你需要安装3.5.1或者更高版本的protoc(Protocol编译器)
- 设置环境变量:
12export MIXER_REPO=$GOPATH/src/istio.io/istio/mixerexport ISTIO=$GOPATH/src/istio.io - 确保Mixer能构建成功:
1pushd $ISTIO/istio && make mixs
在Istio源码树中为新的适配器创建目录:
1 |
cd $MIXER_REPO/adapter && mkdir mygrpcadapter && cd mygrpcadapter |
然后在mygrpcadapter.go中编写如下骨架代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
package mygrpcadapter import ( "context" "fmt" "net" "google.golang.org/grpc" "istio.io/api/mixer/adapter/model/v1beta1" "istio.io/istio/mixer/template/metric" ) type ( // gRPC服务器的接口 Server interface { Addr() string Close() error Run(shutdown chan error) } // 适配器结构 MyGrpcAdapter struct { listener net.Listener server *grpc.Server } ) // 该适配器消费metric实例,因此必须实现下面的接口 var _ metric.HandleMetricServiceServer = &MyGrpcAdapter{} // 编写所有接口方法的骨架 /* 实现HandleMetricServiceServer */ func (s *MyGrpcAdapter) HandleMetric(ctx context.Context, r *metric.HandleMetricRequest) (*v1beta1.ReportResult, error) { return nil, nil } /* 实现Server */ func (s *MyGrpcAdapter) Addr() string { return s.listener.Addr().String() } func (s *MyGrpcAdapter) Run(shutdown chan error) { // 传递监听器,启动gRPC服务器 shutdown<- s.server.Serve(s.listener) } // 优雅关闭服务器,测试用 func (s *MyGrpcAdapter) Close() error { if s.server != nil { s.server.GracefulStop() } if s.listener != nil { _ = s.listener.Close() } return nil } // 创建gRPC服务器并监听 func NewMyGrpcAdapter(addr string) (Server, error) { if addr == "" { addr = "0" } listener, err := net.Listen("tcp", fmt.Sprintf(":%s", addr)) if err != nil { return nil, fmt.Errorf("unable to listen on socket: %v", err) } s := &MyGrpcAdapter{ listener: listener, } fmt.Printf("listening on \"%v\"\n", s.Addr()) s.server = grpc.NewServer() metric.RegisterHandleMetricServiceServer(s.server, s) return s, nil } |
执行下面的命令,确保能构建成功:
1 |
go build ./... |
我们开发的适配器需要将接收到的信息打印到文件中,因此需要一个参数,提供文件的路径。
创建mygrpcadapter/config子目录,然后创建Proto文件config.proto:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
syntax = "proto3"; // 包名 package adapter.mygrpcadapter.config; import "gogoproto/gogo.proto"; // 生成的Go代码使用的包名 option go_package="config"; // 适配器的配置,使用Params消息表示 message Params { // 文件路径 string file_path = 1; } |
我们需要从上述Proto生成对应的Go源码,以及适配器的adaptor CR。 在适配器源码上添加以下注释:
1 2 3 4 |
// nolint:lll //go:generate $GOPATH/src/istio.io/istio/bin/mixer_codegen.sh -a mixer/adapter/mygrpcadapter/config/config.proto -x "-s=false -n mygrpcadapter -t metric" package mygrpcadapter |
并执行下面的命令:
1 2 |
go generate ./... go build ./... |
如果一切正常,会生成以下文件:
- 类型为adapter的自定义资源,此资源提供自定义适配器的元数据,包括是否基于会话、描述、适配器参数信息:
1234567891011apiVersion: "config.istio.io/v1alpha2"kind: adaptermetadata:name: mygrpcadapternamespace: istio-systemspec:description:session_based: falsetemplates:- metricConfig: ... - Config.pb.go,适配器的配置的Go代码
- mysampleadapter.config.pb.html,适配器的文档
- Config.proto_descriptor,一个中介文件,适配器代码不会直接使用它
适配器完整的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
// nolint:lll // Generates the mygrpcadapter adapter's resource yaml. It contains the adapter's configuration, name, supported template // names (metric in this case), and whether it is session or no-session based. //go:generate $GOPATH/src/istio.io/istio/bin/mixer_codegen.sh -a mixer/adapter/mygrpcadapter/config/config.proto -x "-s=false -n mygrpcadapter -t metric" package mygrpcadapter import ( "context" "crypto/tls" "crypto/x509" "fmt" "io/ioutil" "net" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "bytes" "os" "istio.io/api/mixer/adapter/model/v1beta1" policy "istio.io/api/policy/v1beta1" "istio.io/istio/mixer/adapter/mygrpcadapter/config" "istio.io/istio/mixer/template/metric" "istio.io/istio/pkg/log" ) type ( Server interface { Addr() string Close() error Run(shutdown chan error) } MyGrpcAdapter struct { listener net.Listener server *grpc.Server } ) var _ metric.HandleMetricServiceServer = &MyGrpcAdapter{} func (s *MyGrpcAdapter) HandleMetric(ctx context.Context, r *metric.HandleMetricRequest) (*v1beta1.ReportResult, error) { log.Infof("received request %v\n", *r) var b bytes.Buffer // 配置参数 cfg := &config.Params{} if r.AdapterConfig != nil { // 将请求中附带的适配器配置进行反序列化处理 if err := cfg.Unmarshal(r.AdapterConfig.Value); err != nil { log.Errorf("error unmarshalling adapter config: %v", err) return nil, err } } b.WriteString(fmt.Sprintf("HandleMetric invoked with:\n Adapter config: %s\n Instances: %s\n", cfg.String(), instances(r.Instances))) if cfg.FilePath == "" { fmt.Println(b.String()) } else { // 输出到文件 _, err := os.OpenFile("out.txt", os.O_RDONLY|os.O_CREATE, 0666) f, err := os.OpenFile(cfg.FilePath, os.O_APPEND|os.O_WRONLY, 0600) defer f.Close() log.Infof("writing instances to file %s", f.Name()) } // 返回空的报告结果 return &v1beta1.ReportResult{}, nil } // 解码metric的维度,注意维度值类型可以是动态的 func decodeDimensions(in map[string]*policy.Value) map[string]interface{} { out := make(map[string]interface{}, len(in)) for k, v := range in { out[k] = decodeValue(v.GetValue()) } return out } // 解码metric的值,注意值的类型可以是动态的 func decodeValue(in interface{}) interface{} { switch t := in.(type) { case *policy.Value_StringValue: return t.StringValue case *policy.Value_Int64Value: return t.Int64Value case *policy.Value_DoubleValue: return t.DoubleValue default: return fmt.Sprintf("%v", in) } } func instances(in []*metric.InstanceMsg) string { var b bytes.Buffer // 对于每个InstanceMsg,解码其值、维度,并打印 for _, inst := range in { b.WriteString(fmt.Sprintf("'%s':\n"+ " {\n"+ " Value = %v\n"+ " Dimensions = %v\n"+ " }", inst.Name, decodeValue(inst.Value.GetValue()), decodeDimensions(inst.Dimensions))) } return b.String() } // ... |
编写一个main函数,以独立进程的形式启动该适配器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
package main import ( "fmt" "os" mygrpcadapter "istio.io/istio/mixer/adapter/mygrpcadapter" ) func main() { addr := "" if len(os.Args) > 1 { addr = os.Args[1] } s, err := mygrpcadapter.NewMyGrpcAdapter(addr) shutdown := make(chan error, 1) go func() { s.Run(shutdown) }() _ = <-shutdown } |
要使用上述适配器,你需要配置三类Istio资源:
- 处理器(Handler):为适配器提供配置参数
- 实例(Instance):指定如何从请求属性来生成实例,在这里就是metric
- 规则(Rule) :将处理器和实例组合起来
配置示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# 处理器配置 apiVersion: "config.istio.io/v1alpha2" # 那些基于内置适配器的处理器,类型可以是prometheus, fluentd ... # 基于自定义适配器的,可以统一配置为handler kind: handler metadata: name: h1 namespace: istio-system spec: # 需要指定适配器类型 adapter: mygrpcadapter connection: # address: "{ADDRESS}" address: "127.0.0.1:38355" # 适配器参数 params: file_path: "out.txt" --- # 模板metric的实例 apiVersion: "config.istio.io/v1alpha2" kind: instance metadata: name: i1metric namespace: istio-system spec: template: metric params: value: request.size | 0 dimensions: response_code: "200" --- # 规则 apiVersion: "config.istio.io/v1alpha2" kind: rule metadata: name: r1 namespace: istio-system spec: actions: - handler: h1.istio-system instances: - i1metric --- |
首先启动适配器,注意我们没有指定端口,随机分配的监听端口会打印到标准输出:
1 2 3 4 |
export ISTIO=$GOPATH/src/istio.io export MIXER_REPO=$GOPATH/src/istio.io/istio/mixer cd $MIXER_REPO/adapter/mygrpcadapter go run cmd/main.go 127.0.0.1:38355 |
我们使用文件系统作为Mixer的配置存储, 将所有配置文件拷贝到同一目录:
1 2 3 4 5 6 7 8 9 |
mkdir testdata # 处理器、实例、规则 cp sample_operator_cfg.yaml $MIXER_REPO/adapter/mygrpcadapter/testdata # 适配器CR cp config/mygrpcadapter.yaml $MIXER_REPO/adapter/mygrpcadapter/testdata # Istio供测试使用的属性清单 cp $MIXER_REPO/testdata/config/attributes.yaml $MIXER_REPO/adapter/mygrpcadapter/testdata # Metric模板 cp $MIXER_REPO/template/metric/template.yaml $MIXER_REPO/adapter/mygrpcadapter/testdata |
构建Mixer,并从上述配置存储启动它:
1 2 |
pushd $ISTIO/istio && make mixs $GOPATH/out/linux_amd64/release/mixs server --configStoreURL=fs://$(pwd)/mixer/adapter/mygrpcadapter/testdata |
启动Mixer后,可以用命令行工具mixc来向Mixer报告:
1 2 3 |
pushd $ISTIO/istio && make mixc # 报告字符串属性 报告整数属性 $GOPATH/out/linux_amd64/release/mixc report -s destination.service="svc.cluster.local" -i request.size=1235 |
打开输出文件,应该可以看到如下内容:
1 2 3 4 5 6 7 |
HandleMetric invoked with: Adapter config: &Params{FilePath:out.txt,} Instances: 'i1metric.instance.istio-system': { Value = 1235 Dimensions = map[response_code:200] } |
你可以利用pkg/adapter/test包编写集成测试,启动进程内的Mixer服务器,并通过Mixer客户端调用它:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
package mygrpcadapter import ( "fmt" "io/ioutil" "testing" adapter_integration "istio.io/istio/mixer/pkg/adapter/test" "os" "strings" ) func TestReport(t *testing.T) { // 读取适配器的CR adptCrBytes, err := ioutil.ReadFile("config/mygrpcadapter.yaml") if err != nil { t.Fatalf("could not read file: %v", err) } // 读取处理器、实例、规则配置 operatorCfgBytes, err := ioutil.ReadFile("sample_operator_cfg.yaml") if err != nil { t.Fatalf("could not read file: %v", err) } operatorCfg := string(operatorCfgBytes) shutdown := make(chan error, 1) // 输出文件 var outFile *os.File outFile, err = os.OpenFile("out.txt", os.O_RDONLY|os.O_CREATE, 0666) if err != nil { t.Fatal(err) } defer func() { // 测试完毕后移除输出文件 if removeErr := os.Remove(outFile.Name()); removeErr != nil { t.Logf("Could not remove temporary file %s: %v", outFile.Name(), removeErr) } }() // 适配器集成测试框架 adapter_integration.RunTest( t, nil, // Scenario定义一个完整的集成测试场景 adapter_integration.Scenario{ // 测试前的准备 Setup: func() (ctx interface{}, err error) { // 创建适配器 pServer, err := NewMyGrpcAdapter("") if err != nil { return nil, err } go func() { // 启动服务器 pServer.Run(shutdown) _ = <-shutdown }() return pServer, nil }, // 测试后清理 Teardown: func(ctx interface{}) { s := ctx.(Server) s.Close() }, // 需要对Mixer并行发起的调用列表 ParallelCalls: []adapter_integration.Call{ { CallKind: adapter_integration.REPORT, Attrs: map[string]interface{}{"request.size": int64(555)}, }, }, // 测试结果验证 GetState: func(ctx interface{}) (interface{}, error) { bytes, err := ioutil.ReadFile("out.txt") if err != nil { return nil, err } s := string(bytes) wantStr := `HandleMetric invoked with: Adapter config: &Params{FilePath:out.txt,} Instances: 'i1metric.instance.istio-system': { Value = 555 Dimensions = map[response_code:200] } ` // 断言失败 if normalize(s) != normalize(wantStr) { return nil, fmt.Errorf("got adapters state as : '%s'; want '%s'", s, wantStr) } return nil, nil }, // Mixer需要读取的CRDs数组 GetConfig: func(ctx interface{}) ([]string, error) { s := ctx.(Server) return []string{ string(adptCrBytes), strings.Replace(operatorCfg, "{ADDRESS}", s.Addr(), 1), }, nil }, // 期望的测试结果的JSON字符串形式 Want: ` { "AdapterState": null, "Returns": [ { "Check": { "Status": {}, "ValidDuration": 0, "ValidUseCount": 0 }, "Quota": null, "Error": null } ] } `, }, ) } |
执行测试: cd $MIXER_REPO/adapter/mygrpcadapter && go build ./... && go test *.go
Istio支持基于mTLS来保护任何工作负载之间的通信,mTLS同样可以用于进程外适配器和Mixer之间的流量。
任何处理器都可以指定基于mTLS进行双向认证:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
apiVersion: "config.istio.io/v1alpha2" kind: handler metadata: name: h1 namespace: istio-system spec: adapter: mygrpcadapter connection: address: "{ADDRESS}" #replaces at runtime by the test authentication: # 这些数字证书文件必须位于Mixer服务器对应目录 mutual: private_key: "/tmp/grpc-test-key-cert/mixer.key" client_certificate: "/tmp/grpc-test-key-cert/mixer.crt" ca_certificates: "/tmp/grpc-test-key-cert/ca.pem" |
改造我们的适配器,使其支持TLS:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
// 创建适配器的TLS选项 func getServerTLSOption(credential, privateKey, caCertificate string) (grpc.ServerOption, error) { // 从文件加载X509密钥对 certificate, err := tls.LoadX509KeyPair( credential, privateKey, ) // 证书池 certPool := x509.NewCertPool() bs, err := ioutil.ReadFile(caCertificate) // 将CA证书加入证书池 ok := certPool.AppendCertsFromPEM(bs) // TLS配置 tlsConfig := &tls.Config{ Certificates: []tls.Certificate{certificate}, ClientCAs: certPool, } // 要求客户端(Mixer)提供证书,并基于CA验证证书的合法性 tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert // 返回ServerOption return grpc.Creds(credentials.NewTLS(tlsConfig)), nil } func NewMyGrpcAdapter(addr string) (Server, error) { if addr == "" { addr = "0" } listener, err := net.Listen("tcp", fmt.Sprintf(":%s", addr)) if err != nil { return nil, fmt.Errorf("unable to listen on socket: %v", err) } s := &MyGrpcAdapter{ listener: listener, } fmt.Printf("listening on \"%v\"\n", s.Addr()) // 适配器使用的证书 credential := os.Getenv("GRPC_ADAPTER_CREDENTIAL") privateKey := os.Getenv("GRPC_ADAPTER_PRIVATE_KEY") certificate := os.Getenv("GRPC_ADAPTER_CERTIFICATE") if credential != "" { // 获取TLS选项 so, err := getServerTLSOption(credential, privateKey, certificate) // 使用该选项创建gRPC服务器 s.server = grpc.NewServer(so) } else { s.server = grpc.NewServer() } metric.RegisterHandleMetricServiceServer(s.server, s) return s, nil } |
从Istio 1.1开始支持进程外的属性生成适配器,这类适配器需要实现某种ATTRIBUTE_GENERATOR类型的模板(所生成的接口)。这类适配器的目的是在执行Check/Report调用之前,添加额外的属性。
本节,我们会实现一个名为mapper的简单属性生成适配器,它提供一个额外的属性值。
注意:如果Istio内置的模板能满足需要,不要定义自己的模板。在K8S环境下,属性生成器kubernetesenv开箱即用,可以抽取工作负载的各种元数据。
首先在Istio源码树中为我们的适配器创建一个目录:
1 2 3 4 5 6 |
mkdir -p $GOPATH/src/istio.io/ && \ cd $GOPATH/src/istio.io/ && \ git clone https://github.com/istio/istio cd istio mkdir -p mixer/adapter/mapper |
然后,定义如下的模板:
1 2 3 4 5 6 7 8 9 10 |
syntax = "proto3"; package mapper; import "mixer/adapter/model/v1beta1/extensions.proto"; option (istio.mixer.adapter.model.v1beta1.template_variety) = TEMPLATE_VARIETY_ATTRIBUTE_GENERATOR; message Template { string key = 1; } message OutputTemplate { string value = 1; } |
执行下面的命令,从该模板生成相关文件:
1 |
bin/mixer_codegen.sh -t mixer/adapter/mapper/template.proto |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
package mapper import context "golang.org/x/net/context" type MyAdapter struct{} // 模板暴露的方法,处理输入模板规定的消息,返回输出模板规定的消息 func (MyAdapter) HandleMapper(_ context.Context, req *HandleMapperRequest) (*OutputMsg, error) { lookup := map[string]string{ "hello": "world", } // Instance.Key,对应上面模板的Template消息的key字段,注意自动大写 return &OutputMsg{Value: lookup[req.Instance.Key]}, nil // 返回值对存储到OutputTmmplate.value } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
package main import ( "net" "google.golang.org/grpc" "istio.io/istio/mixer/adapter/mapper" ) func main() { listener, err := net.Listen("tcp", ":38355") server := grpc.NewServer() // 注册服务实现到gRPC服务器 mapper.RegisterHandleMapperServiceServer(server, mapper.MyAdapter{}) server.Serve(listener) } |
所有适配器都需要提供配置参数,这样Mixer才能调用适配器。对于这个例子,我们只需要一个空的配置参数(Schema)即可:
1 2 3 |
syntax = "proto3"; package config; message Params{} |
生成对应的Go代码:
1 |
bin/mixer_codegen.sh -a mixer/adapter/mapper/config/config.proto -x "-s=false -n myadapter -t mapper" |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# template资源 kubectl apply -f mixer/adapter/mapper/template.yaml # 适配器的adaptor资源 kubectl apply -f mixer/adapter/mapper/config/myadapter.yaml # 处理器 cat <<EOF | kubectl create -f - apiVersion: config.istio.io/v1alpha2 kind: handler metadata: name: h1 namespace: istio-system spec: adapter: myadapter connection: address: ":9070" params: {} EOF # 模板实例 cat <<EOF | kubectl create -f - apiVersion: config.istio.io/v1alpha2 kind: instance metadata: name: i1 namespace: istio-system spec: template: mapper params: key: destination.namespace attribute_bindings: source.namespace: output.value | "unknown" EOF # 规则 cat <<EOF | kubectl create -f - apiVersion: config.istio.io/v1alpha2 kind: rule metadata: name: r1 namespace: istio-system spec: actions: - handler: h1.istio-system instances: ["i1"] EOF |
发起一个报告:
1 |
go run mixer/cmd/mixc/main.go report -s destination.namespace="hello" |
查看mixc的调试日志,可以看到在预处理期间生成的source.namespace属性:
1 2 3 4 5 6 |
debug api Dispatching Preprocess debug api Dispatching to main adapters after running preprocessors debug api Attribute Bag: destination.namespace : hello # 新生成的属性 source.namespace : world |
Leave a Reply