基于本地gRPC的Go插件系统
Go语言缺乏好用的动态加载代码的机制,Go程序通常是单个自包含的二进制文件,因此难以实现类似于Java那样的插件系统。
这种插件直接编译到二进制文件中。典型的例子是database/sql包中的数据库驱动。这种插件都是通过空白导入:
1 2 |
package main import _ "name" |
的方式激活,插件包的init方法负责插件的注册和初始化。这类插件的缺点包括:
- 违反开闭原则,引入插件必须修改依赖它的程序
对于编译型(直接编译为本地代码,不是JVM那样的虚拟机的代码)语言,都可以调用共享库。Go语言也不例外,其内置的plugin包,就是基于共享库实现插件化。plugin包有一些致命的缺点:
- 限制条件非常苛刻,对于主程序、插件共同使用的包,有任何修改,你都必须重新编译主程序和插件。否则会得到报错:plugin was built with a different version of package
- 主程序和插件的编译器、编译标记也必须一致
- 对操作系统的支持不完善,不支持Windows
hashicorp / go-plugin是一个通过RPC实现的Go插件系统,在Packer、Terraform, Nomad、Vault等由HashiCorp主导的项目中均由应用。go-plugin允许应用程序通过本地网络(本机)的gRPC调用插件,规避了Go无法动态加载代码的缺点。
go-plugin插件由宿主进程调用,宿主进程会以插件二进制文件为映像创建子进程,并且通过单个网络连接与之通信。网络协议可以是:
- net/rpc:这种情况下go-plugin使用yamux库对连接进行多路复用
- gRPC:这种情况下基于HTTP2协议进行多路复用
go-plugin的特性包括:
- 插件是Go接口的实现:这让插件的编写、使用非常自然。对于插件的作者来说,他只需要实现一个Go接口即可;对于插件的用户来说,他只需要调用一个Go接口即可。go-plugin会处理好本地调用转换为gRPC调用的所有细节
- 跨语言支持:插件可以基于任何主流语言编写,同样可以被任何主流语言消费
- 支持复杂的参数、返回值:go-plugin可以处理接口、io.Reader/Writer等复杂类型
- 双向通信:为了支持复杂参数,宿主进程能够将接口实现发送给插件,插件也能够回调到宿主进程
- 内置日志系统:任何使用log标准库的的插件,都会将日志信息传回宿主机进程。宿主进程会在这些日志前面加上插件二进制文件的路径,并且打印日志
- 协议版本化:支持一个简单的协议版本化,增加版本号后可以基于老版本协议的插件无效化。当接口签名变化时应当增加版本
- 标准输出/错误同步:插件以子进程的方式运行,这些子进程可以自由的使用标准输出/错误,并且打印的内容会被自动同步到宿主进程,宿主进程可以为同步的日志指定一个io.Writer
- TTY Preservation:插件子进程可以链接到宿主进程的stdin文件描述符,以便要求TTY的软件能正常工作
- 宿主进程升级:宿主进程升级的时候,插件子进程可以继续允许,并在升级后自动关联到新的宿主进程
- 加密通信:gRPC信道可以加密
- 完整性校验:支持对插件的二进制文件进行Checksum
- 插件崩溃了,不会导致宿主进程崩溃
- 容易安装:只需要将插件放到某个宿主进程能够访问的目录即可
插件接口是宿主进程、插件进程进行通信的桥梁:
- 宿主进程会将插件接口的实现放到自己的插件集中
- 插件进程会将插件接口的实现放到自己的插件集中,并为每个插件指定业务接口的实现。一个插件可以实现多个业务接口
- 宿主进程可以:
- 主动创建插件进程,从而得到插件进程的监听套接字
- 关联到既有插件进程,需要手工提供插件进程的监听套接字
- 宿主进程会调用Client方法,获得插件客户端
- 插件进程会调用Server方法,创建插件服务器端,并且将请求委托给业务接口的实现
- 宿主进程通过plugin.Client,可以获得业务接口的Stub,调用业务接口的方法会自动转换为RPC远程调用
- 插件进程上的UDS套接字监听到调用后,会解析RPC请求,并(Lazy的)分发(Dispense)给对应的业务接口Impl
架构示意如下:
接口 Plugin用于获得一个插件的服务器、客户端:
1 2 3 4 5 6 7 |
type Plugin interface { // 返回一个RPC服务器兼容的结构,提供客户端可以通过net/rpc调用的方法 Server(*MuxBroker) (interface{}, error) // 返回一个RPC客户端 Client(*MuxBroker, *rpc.Client) (interface{}, error) } |
这个接口类似,只是通信方式基于gRPC:
1 2 3 4 5 6 7 |
type GRPCPlugin interface { // 由于gRPC插件以单例方式服务,因此该方法仅调用一次 GRPCServer(*GRPCBroker, *grpc.Server) error // 插件进程退出时,context会被go-plugin关闭 GRPCClient(context.Context, *GRPCBroker, *grpc.ClientConn) (interface{}, error) } |
这个结构负责管理一个插件应用(进程)的完整生命周期,包括创建插件进程、连接到插件进程、Dispense业务接口的实现、最后杀死进程。
对于每个插件(二进制文件),宿主进程需要创建一个plugin.Client实例。
该结构的字段如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
type Client struct { // 插件客户端配置 config *ClientConfig // 插件进程是否已经退出 exited bool l sync.Mutex // 插件进程的RPC监听地址 address net.Addr // 插件进程对象 process *os.Process // 协议客户端,宿主进程需要调用其Dispense方法来获得业务接口的Stub client ClientProtocol // 通信协议 protocol Protocol logger hclog.Logger doneCtx context.Context ctxCancel context.CancelFunc negotiatedVersion int // 用于管理 插件管理协程的生命周期 clientWaitGroup sync.WaitGroup stderrWaitGroup sync.WaitGroup // 测试用,标记进程是否被强杀 processKilled bool } |
包含初始化一个插件客户端所需的配置信息,一旦初始化客户端,即不可修改:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
type ClientConfig struct { // 握手信息,用于宿主、插件的匹配。如果不匹配,插件会拒绝连接 HandshakeConfig // 可以调用的插件列表 Plugins PluginSet // 版本化的插件列表,用于支持在客户端、服务器之间协商兼容版本 VersionedPlugins map[int]PluginSet // 启动插件进程使用的命令行,不能和Reattach联用 Cmd *exec.Cmd // 连接到既有插件进程的必要信息,不能和Cmd联用 Reattach *ReattachConfig // 用于在启动插件时校验二进制文件的完整性 SecureConfig *SecureConfig // 基于TLS进行RPC通信时需要的信息 TLSConfig *tls.Config // 客户端是否应该被plugin包自动管理 // 如果为true,则调用CleanupClients自动清理 // 否则用户需要负责杀掉插件客户端,默认false Managed bool // 和子进程通信使用的端口范围 MinPort, MaxPort uint // 启动插件的超时 StartTimeout time.Duration Stderr io.Writer SyncStdout io.Writer SyncStderr io.Writer // 支持的协议,如果不指定仅仅支持netrpc AllowedProtocols []Protocol // 不指定则为hclog的默认logger Logger hclog.Logger AutoMTLS bool } |
插件主函数的结尾,必须调用此函数来启动监听,需要传入一个ServeConfig。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
type ServeConfig struct { // 和客户端匹配的握手配置 HandshakeConfig // 调用此函数得到tls.Config TLSProvider func() (*tls.Config, error) // 插件集 Plugins PluginSet // 版本化的插件集 VersionedPlugins map[int]PluginSet // 如果通过gRPC提供服务,则此字段不能为空 // 调用此函数创建一个gRPC服务器对象 GRPCServer func([]grpc.ServerOption) *grpc.Server Logger hclog.Logger } |
ClientConfig、ServerConfig都会嵌入此结构,此结构用于宿主、插件建立连接前的握手。
宿主进程创建插件进程时,会将此信息通过环境变量传递。这样插件进程能够正常启动。如果没有这些环境变量,则插件进程会退出,并提示你插件必须由宿主进程启动。
1 2 3 4 5 6 7 8 |
type HandshakeConfig struct { // 协议版本号 ProtocolVersion uint // 用于简单的校验插件进程是否人为手工启动的 MagicCookieKey string MagicCookieValue string } |
基于net/rpc的通信方式目前仅仅用于向后兼容,任何新开发的代码都应该考虑使用gRPC方式。
很简单,返回一个问候信息:
1 2 3 |
type Greeter interface { Greet() string } |
需要提供一个基于net/rpc的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
import "net/rpc" // Here is an implementation that talks over RPC type GreeterRPC struct{ client *rpc.Client } func (g *GreeterRPC) Greet() string { var resp string err := g.client.Call("Plugin.Greet", new(interface{}), &resp) if err != nil { panic(err) } return resp } |
此结构实现plugin.Plugin接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
type GreeterPlugin struct { // 内嵌业务接口,插件进程会设置此自动,宿主进程则置空 Impl Greeter } // 此方法由插件进程Lazy的调用 func (p *GreeterPlugin) Server(*plugin.MuxBroker) (interface{}, error) { return &GreeterRPCServer{Impl: p.Impl}, nil } // 此方法由宿主进程调用 func (GreeterPlugin) Client(b *plugin.MuxBroker, c *rpc.Client) (interface{}, error) { return &GreeterRPC{client: c}, nil } // 这是net/rpc所需要的RPC服务器对象 type GreeterRPCServer struct { // This is the real implementation Impl Greeter } func (s *GreeterRPCServer) Greet(args interface{}, resp *string) error { *resp = s.Impl.Greet() return nil } |
即插件客户端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
package main import ( "fmt" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-plugin" "github.com/hashicorp/go-plugin/examples/basic/commons" "log" "net" "os" ) func main() { logger := hclog.New(&hclog.LoggerOptions{ Name: "plugin", Output: os.Stdout, Level: hclog.Debug, }) // 宿主机调用plugin.NewClient,创建或连接到插件进程 client := plugin.NewClient(&plugin.ClientConfig{ HandshakeConfig: handshakeConfig, Plugins: map[string]plugin.Plugin{ // 插件名字到插件的映射关系 "greeter": &example.GreeterPlugin{}, }, // 创建新进程 Cmd: exec.Command("./plugin/greeter"), // 连接到现有进程 Reattach: &plugin.ReattachConfig{ Pid: 2802223, Addr: &net.UnixAddr{ Net: "unix", Name: "/tmp/plugin137476534", }, }, Logger: logger, }) // 此调用会终止插件子进程的执行,并且进行必要的清理工作(例如收集日志) defer client.Kill() // 获取RPC客户端对象 rpcClient, err := client.Client() if err != nil { log.Fatal(err) } // 产生具有指定名字的插件的实例 raw, err := rpcClient.Dispense("greeter") if err != nil { log.Fatal(err) } // 插件实例可以强制转型为业务接口 greeter := raw.(example.Greeter) // 像调用本地函数一样调用远程插件,客户端本身是线程安全的,你可以并发的调用业务接口 fmt.Println(greeter.Greet()) } // 用于插件和宿主之间的简单握手,用于提升用户体验(而非安全特性) // 如果握手失败,则提示一个友好的信息 // 可以防止:1、执行错误的插件;2、用户手工启动插件 var handshakeConfig = plugin.HandshakeConfig{ ProtocolVersion: 1, MagicCookieKey: "BASIC_PLUGIN", MagicCookieValue: "hello", } |
不管是插件客户端还是服务器,都需要(在ClientConfig/ServeConfig中)指定一个一个PluginSet,列出支持的插件。对于服务器,必须指定每个插件的Impl。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
package main import ( "os" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-plugin" "github.com/hashicorp/go-plugin/examples/basic/commons" ) // 业务接口的真实实现 type GreeterHello struct { logger hclog.Logger } func (g *GreeterHello) Greet() string { g.logger.Debug("message from GreeterHello.Greet") return "Hello!" } // 握手配置 var handshakeConfig = plugin.HandshakeConfig{ ProtocolVersion: 1, MagicCookieKey: "BASIC_PLUGIN", MagicCookieValue: "hello", } func main() { logger := hclog.New(&hclog.LoggerOptions{ Level: hclog.Trace, Output: os.Stderr, JSONFormat: true, }) greeter := &GreeterHello{ logger: logger, } // 插件集类似于宿主进程,只是插件需要指定Impl字段 var pluginMap = map[string]plugin.Plugin{ "greeter": &example.GreeterPlugin{Impl: greeter}, } logger.Debug("message from plugin", "foo", "bar") // 启动RPC监听 plugin.Serve(&plugin.ServeConfig{ HandshakeConfig: handshakeConfig, Plugins: pluginMap, }) } |
服务器调用plugin.Serve方法后,主线程即阻塞。直到客户端调用 Dispense方法请求插件实例时,服务器端才会实例化插件(业务接口的实现):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
func (d *dispenseServer) Dispense( name string, response *uint32) error { // 从PluginSet中查找 p, ok := d.plugins[name] if !ok { return fmt.Errorf("unknown plugin type: %s", name) } // 调用(下面的那个函数)插件接口的方法 impl, err := p.Server(d.broker) if err != nil { return errors.New(err.Error()) } // MuxBroker基于唯一性的ID进行TCP连接的多路复用 id := d.broker.NextId() *response = id // 在另外一个协程中处理该请求 go func() { conn, err := d.broker.Accept(id) if err != nil { log.Printf("[ERR] go-plugin: plugin dispense error: %s: %s", name, err) return } serve(conn, "Plugin", impl) }() return nil } func (p *GreeterPlugin) Server(*plugin.MuxBroker) (interface{}, error) { return &GreeterRPCServer{Impl: p.Impl}, nil } |
应当尽量使用grpc而非net/rpc,原因如下:
- gRPC支持多种语言来实现插件,而net/rpc是Go专有的
- 在gRPC模式下,go-plugin插件请求通过HTTP2发送,而非私有的yamux
- net/rpc使用go类型,尽管比较方便,但却存在潜在的兼容性问题
此外需要注意,对于gRPC模式来说,插件进程中只会有单个插件“实例”。对于net/rpc你可能创建多个“实例”。
使用gRPC模式时,你需要:
- 通过Proto文件定义语言中立的接口
- 从Proto生成gRPC客户端、服务器样板代码
- 实现 plugin.GRPCPlugin的两个方法,分别调用上面生成的样板代码,获取gRPC客户端、注册gRPC服务实现类
- 在gRPC服务实现类中,将gRPC接口适配为业务接口
- 业务接口的实现由插件进程提供
样板类模板比较多,有些繁琐。
一个简单的键值存储服务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
syntax = "proto3"; package proto; message GetRequest { string key = 1; } message GetResponse { bytes value = 1; } message PutRequest { string key = 1; bytes value = 2; } message Empty {} service KV { rpc Get(GetRequest) returns (GetResponse); rpc Put(PutRequest) returns (Empty); } |
执行命令: protoc -I proto/ proto/kv.proto --go_out=plugins=grpc:proto/ 生成Go代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
// Package shared contains shared data between the host and plugins. package shared import ( "context" "net/rpc" "google.golang.org/grpc" "github.com/hashicorp/go-plugin" "github.com/hashicorp/go-plugin/examples/grpc/proto" ) // 业务接口 type KV interface { Put(key string, value []byte) error Get(key string) ([]byte, error) } |
这一节的代码被宿主进程,插件进程共享。
gRPC模式下,你需要实现接口plugin.GRPCPlugin,并嵌入plugin.Plugin接口:
1 2 3 4 5 6 |
type KVGRPCPlugin struct { // 需要嵌入插件接口 plugin.Plugin // 具体实现,仅当业务接口实现基于Go时该字段有用 Impl KV } |
plugin.GRPCPlugin接口的规格如下,你需要实现两个方法:
1 2 3 4 5 6 7 8 9 10 11 |
type GRPCPlugin interface { // 此方法被插件进程调用,插件进程提供一个grpc.Server // 你需要向其注册gRPC服务的实现(服务器端存根) // 由于gRPC下服务器端是单例模式,因此该方法仅调用一次 GRPCServer(*GRPCBroker, *grpc.Server) error // 此方法被宿主进程调用 // 你需要返回一个业务接口的实现(客户端存根),此实现直接将请求转给gRPC客户端即可 // 传入的context对象会在插件进程销毁时取消 GRPCClient(context.Context, *GRPCBroker, *grpc.ClientConn) (interface{}, error) } |
在GRPCServer方法的实现中,你需要向参数提供的grpc.Server注册通过Proto文件生成的gRPC服务的Go接口:
1 2 3 4 |
type KVServer interface { Get(context.Context, *GetRequest) (*GetResponse, error) Put(context.Context, *PutRequest) (*Empty, error) } |
的实现(服务器存根):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
// 在此方法中提供实现: func (p *KVGRPCPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error { proto.RegisterKVServer(s, &KVServerStub{Impl: p.Impl}) return nil } // 实现自动生成的KVServer接口,具体逻辑委托给业务接口KV的实现 type KVServerStub struct { // This is the real implementation Impl KV } // 样板代码 func (m *KVServerStub) Put( ctx context.Context, req *proto.PutRequest) (*proto.Empty, error) { return &proto.Empty{}, m.Impl.Put(req.Key, req.Value) } func (m *KVServerStub) Get( ctx context.Context, req *proto.GetRequest) (*proto.GetResponse, error) { v, err := m.Impl.Get(req.Key) return &proto.GetResponse{Value: v}, err } |
在GRPCClient方法的实现中,你需要返回一个业务接口的实现(客户端存根),此实现只是将请求转发给gRPC服务处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
func (p *KVGRPCPlugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) { // 创建gRPC客户端的方法是自动生成的 return &KVClientStub{client: proto.NewKVClient(c)}, nil } // 业务接口KV的实现,通过gRPC客户端转发请求给插件进程 type KVClientStub struct{ client proto.KVClient } // 业务接口 func (m *KVClientStub) Put(key string, value []byte) error { // 转发 _, err := m.client.Put(context.Background(), &proto.PutRequest{ Key: key, Value: value, }) return err } func (m *KVClientStub) Get(key string) ([]byte, error) { resp, err := m.client.Get(context.Background(), &proto.GetRequest{ Key: key, }) if err != nil { return nil, err } return resp.Value, nil } |
使用gRPC方式时,只需要设置AllowedProtocols,其余的和netrpc模式没有区别。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
package main import ( "fmt" "io/ioutil" "log" "os" "os/exec" "github.com/hashicorp/go-plugin" "github.com/hashicorp/go-plugin/examples/grpc/shared" ) func main() { log.SetOutput(ioutil.Discard) // 创建插件进程 client := plugin.NewClient(&plugin.ClientConfig{ HandshakeConfig: plugin.HandshakeConfig{ ProtocolVersion: 1, MagicCookieKey: "BASIC_PLUGIN", MagicCookieValue: "hello", }, Plugins: map[string]plugin.Plugin{ "kv_grpc": &KVGRPCPlugin{}, }, Cmd: exec.Command("sh", "-c", os.Getenv("KV_PLUGIN")), // 允许的协议类型,默认情况下允许netrpc AllowedProtocols: []plugin.Protocol{ plugin.ProtocolNetRPC, plugin.ProtocolGRPC}, }) defer client.Kill() // 获取RPC客户端 rpcClient, err := client.Client() if err != nil { fmt.Println("Error:", err.Error()) os.Exit(1) } // 得到插件实例 raw, err := rpcClient.Dispense("kv_grpc") if err != nil { fmt.Println("Error:", err.Error()) os.Exit(1) } // 插件实例转换为业务接口 // ... } |
和netrpc方式也没什么差别,只需要指定GRPCServer,提供创建gRPC服务器的函数即可:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
package main import ( "fmt" "io/ioutil" "github.com/hashicorp/go-plugin" "github.com/hashicorp/go-plugin/examples/grpc/shared" ) // 这里是KV的真实实现 type KV struct{} func (KV) Put(key string, value []byte) error { value = []byte(fmt.Sprintf("%s\n\nWritten from plugin-go-grpc", string(value))) return ioutil.WriteFile("kv_"+key, value, 0644) } func (KV) Get(key string) ([]byte, error) { return ioutil.ReadFile("kv_" + key) } func main() { plugin.Serve(&plugin.ServeConfig{ HandshakeConfig: plugin.HandshakeConfig{ ProtocolVersion: 1, MagicCookieKey: "BASIC_PLUGIN", MagicCookieValue: "hello", }, Plugins: map[string]plugin.Plugin{ "kv": &shared.KVGRPCPlugin{Impl: &KV{}}, }, // 该字段不为nil,则基于gRPC协议进行服务 GRPCServer: plugin.DefaultGRPCServer, }) } |
"宿主进程会将插件接口的实现放到自己的插件集中"
这句话表述的不正确,宿主进程只是将插件名称和插件类型的映射管理放到了PluginSet插件集中
插件进程如何主动调用宿主进程呀?官方的双向通信我看了一下,是将结构的地址传递过去了,实际上还是从宿主进程主动调用插件进程