基于本地gRPC的Go插件系统
Go语言缺乏好用的动态加载代码的机制,Go程序通常是单个自包含的二进制文件,因此难以实现类似于Java那样的插件系统。
这种插件直接编译到二进制文件中。典型的例子是database/sql包中的数据库驱动。这种插件都是通过空白导入:
| 
					 1 2  | 
						package main import _ "name"  | 
					
的方式激活,插件包的init方法负责插件的注册和初始化。这类插件的缺点包括:
- 违反开闭原则,引入插件必须修改依赖它的程序
 
对于编译型(直接编译为本地代码,不是JVM那样的虚拟机的代码)语言,都可以调用共享库。Go语言也不例外,其内置的plugin包,就是基于共享库实现插件化。plugin包有一些致命的缺点:
- 限制条件非常苛刻,对于主程序、插件共同使用的包,有任何修改,你都必须重新编译主程序和插件。否则会得到报错:plugin was built with a different version of package
 - 主程序和插件的编译器、编译标记也必须一致
 - 对操作系统的支持不完善,不支持Windows
 
hashicorp / go-plugin是一个通过RPC实现的Go插件系统,在Packer、Terraform, Nomad、Vault等由HashiCorp主导的项目中均由应用。go-plugin允许应用程序通过本地网络(本机)的gRPC调用插件,规避了Go无法动态加载代码的缺点。
go-plugin插件由宿主进程调用,宿主进程会以插件二进制文件为映像创建子进程,并且通过单个网络连接与之通信。网络协议可以是:
- net/rpc:这种情况下go-plugin使用yamux库对连接进行多路复用
 - gRPC:这种情况下基于HTTP2协议进行多路复用
 
go-plugin的特性包括:
- 插件是Go接口的实现:这让插件的编写、使用非常自然。对于插件的作者来说,他只需要实现一个Go接口即可;对于插件的用户来说,他只需要调用一个Go接口即可。go-plugin会处理好本地调用转换为gRPC调用的所有细节
 - 跨语言支持:插件可以基于任何主流语言编写,同样可以被任何主流语言消费
 - 支持复杂的参数、返回值:go-plugin可以处理接口、io.Reader/Writer等复杂类型
 - 双向通信:为了支持复杂参数,宿主进程能够将接口实现发送给插件,插件也能够回调到宿主进程
 - 内置日志系统:任何使用log标准库的的插件,都会将日志信息传回宿主机进程。宿主进程会在这些日志前面加上插件二进制文件的路径,并且打印日志
 - 协议版本化:支持一个简单的协议版本化,增加版本号后可以基于老版本协议的插件无效化。当接口签名变化时应当增加版本
 - 标准输出/错误同步:插件以子进程的方式运行,这些子进程可以自由的使用标准输出/错误,并且打印的内容会被自动同步到宿主进程,宿主进程可以为同步的日志指定一个io.Writer
 - TTY Preservation:插件子进程可以链接到宿主进程的stdin文件描述符,以便要求TTY的软件能正常工作
 - 宿主进程升级:宿主进程升级的时候,插件子进程可以继续允许,并在升级后自动关联到新的宿主进程
 - 加密通信:gRPC信道可以加密
 - 完整性校验:支持对插件的二进制文件进行Checksum
 - 插件崩溃了,不会导致宿主进程崩溃
 - 容易安装:只需要将插件放到某个宿主进程能够访问的目录即可
 
插件接口是宿主进程、插件进程进行通信的桥梁:
- 宿主进程会将插件接口的实现放到自己的插件集中
 - 插件进程会将插件接口的实现放到自己的插件集中,并为每个插件指定业务接口的实现。一个插件可以实现多个业务接口
 - 宿主进程可以:
- 主动创建插件进程,从而得到插件进程的监听套接字
 - 关联到既有插件进程,需要手工提供插件进程的监听套接字
 
 - 宿主进程会调用Client方法,获得插件客户端
 - 插件进程会调用Server方法,创建插件服务器端,并且将请求委托给业务接口的实现
 - 宿主进程通过plugin.Client,可以获得业务接口的Stub,调用业务接口的方法会自动转换为RPC远程调用
 - 插件进程上的UDS套接字监听到调用后,会解析RPC请求,并(Lazy的)分发(Dispense)给对应的业务接口Impl
 
架构示意如下:
接口 Plugin用于获得一个插件的服务器、客户端:
| 
					 1 2 3 4 5 6 7  | 
						type Plugin interface {     // 返回一个RPC服务器兼容的结构,提供客户端可以通过net/rpc调用的方法     Server(*MuxBroker) (interface{}, error)     // 返回一个RPC客户端     Client(*MuxBroker, *rpc.Client) (interface{}, error) }  | 
					
这个接口类似,只是通信方式基于gRPC:
| 
					 1 2 3 4 5 6 7  | 
						type GRPCPlugin interface {     // 由于gRPC插件以单例方式服务,因此该方法仅调用一次     GRPCServer(*GRPCBroker, *grpc.Server) error     // 插件进程退出时,context会被go-plugin关闭     GRPCClient(context.Context, *GRPCBroker, *grpc.ClientConn) (interface{}, error) }  | 
					
这个结构负责管理一个插件应用(进程)的完整生命周期,包括创建插件进程、连接到插件进程、Dispense业务接口的实现、最后杀死进程。
对于每个插件(二进制文件),宿主进程需要创建一个plugin.Client实例。
该结构的字段如下:
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27  | 
						type Client struct {     // 插件客户端配置     config            *ClientConfig     // 插件进程是否已经退出     exited            bool     l                 sync.Mutex     // 插件进程的RPC监听地址     address           net.Addr     // 插件进程对象     process           *os.Process     // 协议客户端,宿主进程需要调用其Dispense方法来获得业务接口的Stub     client            ClientProtocol     // 通信协议     protocol          Protocol     logger            hclog.Logger     doneCtx           context.Context     ctxCancel         context.CancelFunc     negotiatedVersion int     // 用于管理 插件管理协程的生命周期     clientWaitGroup sync.WaitGroup     stderrWaitGroup sync.WaitGroup     //  测试用,标记进程是否被强杀     processKilled bool }  | 
					
包含初始化一个插件客户端所需的配置信息,一旦初始化客户端,即不可修改:
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44  | 
						type ClientConfig struct {     // 握手信息,用于宿主、插件的匹配。如果不匹配,插件会拒绝连接     HandshakeConfig     // 可以调用的插件列表     Plugins PluginSet     // 版本化的插件列表,用于支持在客户端、服务器之间协商兼容版本     VersionedPlugins map[int]PluginSet     // 启动插件进程使用的命令行,不能和Reattach联用     Cmd      *exec.Cmd     // 连接到既有插件进程的必要信息,不能和Cmd联用     Reattach *ReattachConfig     // 用于在启动插件时校验二进制文件的完整性     SecureConfig *SecureConfig     // 基于TLS进行RPC通信时需要的信息     TLSConfig *tls.Config     // 客户端是否应该被plugin包自动管理     // 如果为true,则调用CleanupClients自动清理     // 否则用户需要负责杀掉插件客户端,默认false     Managed bool     // 和子进程通信使用的端口范围     MinPort, MaxPort uint     // 启动插件的超时     StartTimeout time.Duration     Stderr io.Writer     SyncStdout io.Writer     SyncStderr io.Writer     // 支持的协议,如果不指定仅仅支持netrpc     AllowedProtocols []Protocol     // 不指定则为hclog的默认logger     Logger hclog.Logger     AutoMTLS bool }  | 
					
插件主函数的结尾,必须调用此函数来启动监听,需要传入一个ServeConfig。
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19  | 
						type ServeConfig struct {     // 和客户端匹配的握手配置     HandshakeConfig     // 调用此函数得到tls.Config     TLSProvider func() (*tls.Config, error)     // 插件集     Plugins PluginSet     // 版本化的插件集     VersionedPlugins map[int]PluginSet     // 如果通过gRPC提供服务,则此字段不能为空     // 调用此函数创建一个gRPC服务器对象     GRPCServer func([]grpc.ServerOption) *grpc.Server     Logger hclog.Logger }   | 
					
ClientConfig、ServerConfig都会嵌入此结构,此结构用于宿主、插件建立连接前的握手。
宿主进程创建插件进程时,会将此信息通过环境变量传递。这样插件进程能够正常启动。如果没有这些环境变量,则插件进程会退出,并提示你插件必须由宿主进程启动。
| 
					 1 2 3 4 5 6 7 8  | 
						type HandshakeConfig struct {     // 协议版本号     ProtocolVersion uint     // 用于简单的校验插件进程是否人为手工启动的     MagicCookieKey   string     MagicCookieValue string }  | 
					
基于net/rpc的通信方式目前仅仅用于向后兼容,任何新开发的代码都应该考虑使用gRPC方式。
很简单,返回一个问候信息:
| 
					 1 2 3  | 
						type Greeter interface {     Greet() string }  | 
					
需要提供一个基于net/rpc的实现:
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14  | 
						import "net/rpc" // Here is an implementation that talks over RPC type GreeterRPC struct{ client *rpc.Client } func (g *GreeterRPC) Greet() string {     var resp string     err := g.client.Call("Plugin.Greet", new(interface{}), &resp)     if err != nil {         panic(err)     }     return resp }  | 
					
此结构实现plugin.Plugin接口:
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24  | 
						type GreeterPlugin struct {     // 内嵌业务接口,插件进程会设置此自动,宿主进程则置空     Impl Greeter } // 此方法由插件进程Lazy的调用 func (p *GreeterPlugin) Server(*plugin.MuxBroker) (interface{}, error) {     return &GreeterRPCServer{Impl: p.Impl}, nil } // 此方法由宿主进程调用 func (GreeterPlugin) Client(b *plugin.MuxBroker, c *rpc.Client) (interface{}, error) {     return &GreeterRPC{client: c}, nil } // 这是net/rpc所需要的RPC服务器对象 type GreeterRPCServer struct {     // This is the real implementation     Impl Greeter } func (s *GreeterRPCServer) Greet(args interface{}, resp *string) error {     *resp = s.Impl.Greet()     return nil }  | 
					
即插件客户端:
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67  | 
						package main import (     "fmt"     "github.com/hashicorp/go-hclog"     "github.com/hashicorp/go-plugin"     "github.com/hashicorp/go-plugin/examples/basic/commons"     "log"     "net"     "os" ) func main() {     logger := hclog.New(&hclog.LoggerOptions{         Name:   "plugin",         Output: os.Stdout,         Level:  hclog.Debug,     })     // 宿主机调用plugin.NewClient,创建或连接到插件进程     client := plugin.NewClient(&plugin.ClientConfig{         HandshakeConfig: handshakeConfig,         Plugins: map[string]plugin.Plugin{             // 插件名字到插件的映射关系             "greeter": &example.GreeterPlugin{},         },         // 创建新进程         Cmd:             exec.Command("./plugin/greeter"),         // 连接到现有进程         Reattach: &plugin.ReattachConfig{             Pid: 2802223,             Addr: &net.UnixAddr{                 Net:  "unix",                 Name: "/tmp/plugin137476534",             },         },         Logger: logger,     })     // 此调用会终止插件子进程的执行,并且进行必要的清理工作(例如收集日志)     defer client.Kill()     // 获取RPC客户端对象     rpcClient, err := client.Client()     if err != nil {         log.Fatal(err)     }     // 产生具有指定名字的插件的实例     raw, err := rpcClient.Dispense("greeter")     if err != nil {         log.Fatal(err)     }     // 插件实例可以强制转型为业务接口     greeter := raw.(example.Greeter)     // 像调用本地函数一样调用远程插件,客户端本身是线程安全的,你可以并发的调用业务接口     fmt.Println(greeter.Greet()) } // 用于插件和宿主之间的简单握手,用于提升用户体验(而非安全特性) // 如果握手失败,则提示一个友好的信息 // 可以防止:1、执行错误的插件;2、用户手工启动插件 var handshakeConfig = plugin.HandshakeConfig{     ProtocolVersion:  1,     MagicCookieKey:   "BASIC_PLUGIN",     MagicCookieValue: "hello", }  | 
					
不管是插件客户端还是服务器,都需要(在ClientConfig/ServeConfig中)指定一个一个PluginSet,列出支持的插件。对于服务器,必须指定每个插件的Impl。
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50  | 
						package main import (     "os"     "github.com/hashicorp/go-hclog"     "github.com/hashicorp/go-plugin"     "github.com/hashicorp/go-plugin/examples/basic/commons" ) // 业务接口的真实实现 type GreeterHello struct {     logger hclog.Logger } func (g *GreeterHello) Greet() string {     g.logger.Debug("message from GreeterHello.Greet")     return "Hello!" } // 握手配置 var handshakeConfig = plugin.HandshakeConfig{     ProtocolVersion:  1,     MagicCookieKey:   "BASIC_PLUGIN",     MagicCookieValue: "hello", } func main() {     logger := hclog.New(&hclog.LoggerOptions{         Level:      hclog.Trace,         Output:     os.Stderr,         JSONFormat: true,     })     greeter := &GreeterHello{         logger: logger,     }     // 插件集类似于宿主进程,只是插件需要指定Impl字段     var pluginMap = map[string]plugin.Plugin{         "greeter": &example.GreeterPlugin{Impl: greeter},     }     logger.Debug("message from plugin", "foo", "bar")     // 启动RPC监听     plugin.Serve(&plugin.ServeConfig{         HandshakeConfig: handshakeConfig,         Plugins:         pluginMap,     }) }  | 
					
服务器调用plugin.Serve方法后,主线程即阻塞。直到客户端调用 Dispense方法请求插件实例时,服务器端才会实例化插件(业务接口的实现):
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35  | 
						func (d *dispenseServer) Dispense(     name string, response *uint32) error {     // 从PluginSet中查找     p, ok := d.plugins[name]     if !ok {         return fmt.Errorf("unknown plugin type: %s", name)     }     // 调用(下面的那个函数)插件接口的方法     impl, err := p.Server(d.broker)     if err != nil {         return errors.New(err.Error())     }     // MuxBroker基于唯一性的ID进行TCP连接的多路复用     id := d.broker.NextId()     *response = id     // 在另外一个协程中处理该请求     go func() {         conn, err := d.broker.Accept(id)         if err != nil {             log.Printf("[ERR] go-plugin: plugin dispense error: %s: %s", name, err)             return         }         serve(conn, "Plugin", impl)     }()     return nil } func (p *GreeterPlugin) Server(*plugin.MuxBroker) (interface{}, error) {     return &GreeterRPCServer{Impl: p.Impl}, nil }  | 
					
应当尽量使用grpc而非net/rpc,原因如下:
- gRPC支持多种语言来实现插件,而net/rpc是Go专有的
 - 在gRPC模式下,go-plugin插件请求通过HTTP2发送,而非私有的yamux
 - net/rpc使用go类型,尽管比较方便,但却存在潜在的兼容性问题
 
此外需要注意,对于gRPC模式来说,插件进程中只会有单个插件“实例”。对于net/rpc你可能创建多个“实例”。
使用gRPC模式时,你需要:
- 通过Proto文件定义语言中立的接口
 - 从Proto生成gRPC客户端、服务器样板代码
 - 实现 plugin.GRPCPlugin的两个方法,分别调用上面生成的样板代码,获取gRPC客户端、注册gRPC服务实现类
 - 在gRPC服务实现类中,将gRPC接口适配为业务接口
 - 业务接口的实现由插件进程提供
 
样板类模板比较多,有些繁琐。
一个简单的键值存储服务:
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22  | 
						syntax = "proto3"; package proto; message GetRequest {     string key = 1; } message GetResponse {     bytes value = 1; } message PutRequest {     string key = 1;     bytes value = 2; } message Empty {} service KV {     rpc Get(GetRequest) returns (GetResponse);     rpc Put(PutRequest) returns (Empty); }  | 
					
执行命令: protoc -I proto/ proto/kv.proto --go_out=plugins=grpc:proto/ 生成Go代码。
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19  | 
						// Package shared contains shared data between the host and plugins. package shared import (     "context"     "net/rpc"     "google.golang.org/grpc"     "github.com/hashicorp/go-plugin"     "github.com/hashicorp/go-plugin/examples/grpc/proto" ) // 业务接口 type KV interface {     Put(key string, value []byte) error     Get(key string) ([]byte, error) }  | 
					
这一节的代码被宿主进程,插件进程共享。
gRPC模式下,你需要实现接口plugin.GRPCPlugin,并嵌入plugin.Plugin接口:
| 
					 1 2 3 4 5 6  | 
						type KVGRPCPlugin struct {     // 需要嵌入插件接口     plugin.Plugin     // 具体实现,仅当业务接口实现基于Go时该字段有用     Impl KV }  | 
					
plugin.GRPCPlugin接口的规格如下,你需要实现两个方法:
| 
					 1 2 3 4 5 6 7 8 9 10 11  | 
						type GRPCPlugin interface {     // 此方法被插件进程调用,插件进程提供一个grpc.Server     // 你需要向其注册gRPC服务的实现(服务器端存根)     // 由于gRPC下服务器端是单例模式,因此该方法仅调用一次     GRPCServer(*GRPCBroker, *grpc.Server) error     // 此方法被宿主进程调用     // 你需要返回一个业务接口的实现(客户端存根),此实现直接将请求转给gRPC客户端即可     // 传入的context对象会在插件进程销毁时取消     GRPCClient(context.Context, *GRPCBroker, *grpc.ClientConn) (interface{}, error) }  | 
					
在GRPCServer方法的实现中,你需要向参数提供的grpc.Server注册通过Proto文件生成的gRPC服务的Go接口:
| 
					 1 2 3 4  | 
						type KVServer interface {     Get(context.Context, *GetRequest) (*GetResponse, error)     Put(context.Context, *PutRequest) (*Empty, error) }  | 
					
的实现(服务器存根):
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20  | 
						// 在此方法中提供实现: func (p *KVGRPCPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error {     proto.RegisterKVServer(s, &KVServerStub{Impl: p.Impl})     return nil } // 实现自动生成的KVServer接口,具体逻辑委托给业务接口KV的实现 type KVServerStub struct {     // This is the real implementation     Impl KV } // 样板代码 func (m *KVServerStub) Put( ctx context.Context, req *proto.PutRequest) (*proto.Empty, error) {     return &proto.Empty{}, m.Impl.Put(req.Key, req.Value) } func (m *KVServerStub) Get( ctx context.Context, req *proto.GetRequest) (*proto.GetResponse, error) {     v, err := m.Impl.Get(req.Key)     return &proto.GetResponse{Value: v}, err }  | 
					
在GRPCClient方法的实现中,你需要返回一个业务接口的实现(客户端存根),此实现只是将请求转发给gRPC服务处理:
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28  | 
						func (p *KVGRPCPlugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) {     //                         创建gRPC客户端的方法是自动生成的     return &KVClientStub{client: proto.NewKVClient(c)}, nil } // 业务接口KV的实现,通过gRPC客户端转发请求给插件进程 type KVClientStub struct{ client proto.KVClient } //                   业务接口 func (m *KVClientStub) Put(key string, value []byte) error { // 转发     _, err := m.client.Put(context.Background(), &proto.PutRequest{         Key:   key,         Value: value,     })     return err } func (m *KVClientStub) Get(key string) ([]byte, error) {     resp, err := m.client.Get(context.Background(), &proto.GetRequest{         Key: key,     })     if err != nil {         return nil, err     }     return resp.Value, nil }  | 
					
使用gRPC方式时,只需要设置AllowedProtocols,其余的和netrpc模式没有区别。
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50  | 
						package main import (     "fmt"     "io/ioutil"     "log"     "os"     "os/exec"     "github.com/hashicorp/go-plugin"     "github.com/hashicorp/go-plugin/examples/grpc/shared" ) func main() {     log.SetOutput(ioutil.Discard)     // 创建插件进程     client := plugin.NewClient(&plugin.ClientConfig{         HandshakeConfig: plugin.HandshakeConfig{             ProtocolVersion:  1,             MagicCookieKey:   "BASIC_PLUGIN",             MagicCookieValue: "hello",         },         Plugins:         map[string]plugin.Plugin{             "kv_grpc": &KVGRPCPlugin{},         },         Cmd:             exec.Command("sh", "-c", os.Getenv("KV_PLUGIN")),            // 允许的协议类型,默认情况下允许netrpc         AllowedProtocols: []plugin.Protocol{             plugin.ProtocolNetRPC, plugin.ProtocolGRPC},     })     defer client.Kill()     // 获取RPC客户端     rpcClient, err := client.Client()     if err != nil {         fmt.Println("Error:", err.Error())         os.Exit(1)     }     // 得到插件实例     raw, err := rpcClient.Dispense("kv_grpc")     if err != nil {         fmt.Println("Error:", err.Error())         os.Exit(1)     }     // 插件实例转换为业务接口     // ... }  | 
					
和netrpc方式也没什么差别,只需要指定GRPCServer,提供创建gRPC服务器的函数即可:
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37  | 
						package main import (     "fmt"     "io/ioutil"     "github.com/hashicorp/go-plugin"     "github.com/hashicorp/go-plugin/examples/grpc/shared" ) // 这里是KV的真实实现 type KV struct{} func (KV) Put(key string, value []byte) error {     value = []byte(fmt.Sprintf("%s\n\nWritten from plugin-go-grpc", string(value)))     return ioutil.WriteFile("kv_"+key, value, 0644) } func (KV) Get(key string) ([]byte, error) {     return ioutil.ReadFile("kv_" + key) } func main() {     plugin.Serve(&plugin.ServeConfig{         HandshakeConfig: plugin.HandshakeConfig{             ProtocolVersion:  1,             MagicCookieKey:   "BASIC_PLUGIN",             MagicCookieValue: "hello",         },         Plugins: map[string]plugin.Plugin{             "kv": &shared.KVGRPCPlugin{Impl: &KV{}},         },         // 该字段不为nil,则基于gRPC协议进行服务         GRPCServer: plugin.DefaultGRPCServer,     }) }  | 
					
            
"宿主进程会将插件接口的实现放到自己的插件集中"
这句话表述的不正确,宿主进程只是将插件名称和插件类型的映射管理放到了PluginSet插件集中
插件进程如何主动调用宿主进程呀?官方的双向通信我看了一下,是将结构的地址传递过去了,实际上还是从宿主进程主动调用插件进程