gRPC学习笔记
gRPC是Google开源的高性能RPC框架,它在HTTP/2的基础之上实现。gRPC能高效的连接数据中心内部、跨数据中心的服务,并且可以提供名称解析、负载均衡、请求跟踪、健康检查以及身份验证等基础服务。它的主要适用场景包括:
- 在微服务架构中,有效的将多种语言开发的服务连接到一起
- gRPC也可以用来将设备、移动应用程序、浏览器连接到后端服务
- 生成高效的客户端库
gRPC的核心特性包括:
- 支持多种主流编程语言
- 基于HTTP/2的双向流传输
- 可拔插的基础服务支持
相比起传统的HTTP/REST/JSON方式的RPC,基于HTTP/2实现RPC具有优势:二进制协议、单个连接上的请求多路分发、头压缩。
gRPC要求Go的版本在1.6+。执行下面的命令安装所需的包:
1 2 3 4 5 |
export https_proxy="http://10.0.0.1:8087/" # gRPC go get -u google.golang.org/grpc # ProtoBuf go get -u github.com/golang/protobuf/protoc-gen-go |
下面是一个用户信息查询服务的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
syntax = "proto3"; package gmem; service UserService { // gRPC服务的参数和返回值必须是message类型,不能是任何原始类型 rpc GetUser (GetUserRequest) returns (User) { } } message GetUserRequest { uint32 userId = 1; } message User { uint32 userId = 1; string userName = 2; string dob = 3; } |
执行如下命令生成gRPC的Go语言代码:
1 2 3 |
cd /home/alex/Go/workspace/default/src/grpc # Proto源文件中注意Go包声明 option go_package = "./pkg/grpc/supportplan"; protoc --proto_path=protos --go_out=. --go-grpc_out=. protos/*.proto |
命令输出为一个Go源码文件,内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
package gmem import ( proto "github.com/golang/protobuf/proto" context "golang.org/x/net/context" grpc "google.golang.org/grpc" ) /* 消息结构体代码 */ // RPC请求消息结构体 type GetUserRequest struct { UserId uint32 `protobuf:"varint,1,opt,name=userId" json:"userId,omitempty"` } // RPC响应消息结构体 type User struct { UserId uint32 `protobuf:"varint,1,opt,name=userId" json:"userId,omitempty"` UserName string `protobuf:"bytes,2,opt,name=userName" json:"userName,omitempty"` Dob string `protobuf:"bytes,3,opt,name=dob" json:"dob,omitempty"` } func init() { // 将Go语言结构指针映射到对应的ProtoBuf全限定名 proto.RegisterType((*GetUserRequest)(nil), "gmem.GetUserRequest") proto.RegisterType((*User)(nil), "gmem.User") } /* gRPC代码 */ // gRPC服务的客户端API type UserServiceClient interface { GetUser(ctx context.Context, in *GetUserRequest, opts ...grpc.CallOption) (*User, error) } type userServiceClient struct { cc *grpc.ClientConn } // 调用下面的方法来创建gRPC客户端 func NewUserServiceClient(cc *grpc.ClientConn) UserServiceClient { return &userServiceClient{cc} } func (c *userServiceClient) GetUser(ctx context.Context, in *GetUserRequest, opts ...grpc.CallOption) (*User, error) { out := new(User) err := grpc.Invoke(ctx, "/gmem.UserService/GetUser", in, out, c.cc, opts...) if err != nil { return nil, err } return out, nil } // gRPC服务的服务器端API type UserServiceServer interface { GetUser(context.Context, *GetUserRequest) (*User, error) } // 调用下面的方法来注册gRPC服务器端实现 func RegisterUserServiceServer(s *grpc.Server, srv UserServiceServer) { s.RegisterService(&_UserService_serviceDesc, srv) } func _UserService_GetUser_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(GetUserRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { return srv.(UserServiceServer).GetUser(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: "/gmem.UserService/GetUser", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(UserServiceServer).GetUser(ctx, req.(*GetUserRequest)) } return interceptor(ctx, in, info, handler) } var _UserService_serviceDesc = grpc.ServiceDesc{ ServiceName: "gmem.UserService", HandlerType: (*UserServiceServer)(nil), Methods: []grpc.MethodDesc{ { MethodName: "GetUser", Handler: _UserService_GetUser_Handler, }, }, Streams: []grpc.StreamDesc{}, Metadata: "UserService.proto", } func init() { proto.RegisterFile("UserService.proto", fileDescriptor0) } |
可以看到,gRPC的客户端存根、服务器接口是自动生成的,其中封装了和远程调用相关的逻辑。我们需要调用存根、实现接口,来编写完整的gRPC客户端或者服务器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
package main import ( "context" "grpc/gmem" "net" "google.golang.org/grpc" "google.golang.org/grpc/reflection" "log" ) // 实现服务器端接口 UserServiceServer type UserServiceServerImpl struct { } func (srv *UserServiceServerImpl) GetUser(ctx context.Context, req *gmem.GetUserRequest) (*gmem.User, error) { return &gmem.User{ UserId: req.GetUserId(), UserName: "Alex Wong", Dob: "1986-09-12", }, nil } func main() { lis, _ := net.Listen("tcp", ":5500") grpc := grpc.NewServer() gmem.RegisterUserServiceServer(grpc, &UserServiceServerImpl{}) reflection.Register(grpc) err := grpc.Serve(lis) if err != nil { log.Fatalf("无法创建RPC服务器:%v", err) } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
package main import ( "google.golang.org/grpc" "log" "grpc/gmem" "context" "encoding/json" ) func main() { // 连接gRPC服务器 conn, _ := grpc.Dial("localhost:5500", grpc.WithInsecure()) // 不再使用时需要关闭连接 defer conn.Close() // 创建客户端对象 var client gmem.UserServiceClient = gmem.NewUserServiceClient(conn) userRequest := gmem.GetUserRequest{UserId: 10000} user, err := client.GetUser(context.Background(), &userRequest) if err != nil { log.Fatalf("远程调用失败: %v", err) } userJson, _ := json.Marshal(*user) log.Println(string(userJson)) // {"userId":10000,"userName":"Alex Wong","dob":"1986-09-12"} } |
可以使用Maven插件protobuf-maven-plugin来生成gRPC的客户端和服务器API:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
<project> <!-- GRPC相关依赖 --> <dependencies> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-netty</artifactId> <version>1.9.0</version> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-protobuf</artifactId> <version>1.9.0</version> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-stub</artifactId> <version>1.9.0</version> </dependency> </dependencies> <build> <extensions> <extension> <groupId>kr.motd.maven</groupId> <artifactId>os-maven-plugin</artifactId> <version>1.5.0.Final</version> </extension> </extensions> <plugins> <!-- 集成基于ProtoBuf的代码生成 --> <plugin> <groupId>org.xolstice.maven.plugins</groupId> <artifactId>protobuf-maven-plugin</artifactId> <version>0.5.0</version> <configuration> <protocArtifact>com.google.protobuf:protoc:3.5.1-1:exe:${os.detected.classifier}</protocArtifact> <pluginId>grpc-java</pluginId> <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.9.0:exe:${os.detected.classifier}</pluginArtifact> </configuration> <executions> <execution> <goals> <goal>compile</goal> <goal>compile-custom</goal> </goals> </execution> </executions> </plugin> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>3.5.1</version> <configuration> <encoding>${project.build.sourceEncoding}</encoding> <source>1.8</source> <target>1.8</target> <debug>true</debug> </configuration> </plugin> </plugins> </build> </project> |
需要使用到的插件目标:
目标 | 说明 |
protobuf:compile | 在target/generated-sources/protobuf/java下生成ProtoBuf消息的代码 |
protobuf:compile-custom | 在target/generated-sources/protobuf/grpc-java下生成gRPC存根代码 |
注意:
- 上述目标生成代码会自动添加到工程的源码路径,不需要拷贝到src/main/java
- proto文件需要放置在src/main/proto下
服务器示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
package cc.gmem.study.grpc; import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; import java.io.IOException; public class UserServiceServer { public static void main( String[] args ) throws IOException, InterruptedException { // 启动GRPC服务器并注册RPC接口的实现类 io.grpc.Server server = ServerBuilder .forPort( 5500 ) .addService( new UserServiceImpl() ) .build() .start(); server.awaitTermination(); // 优雅的关闭GRPC服务器 Runtime.getRuntime().addShutdownHook( new Thread( () -> server.shutdown() ) ); } private static class UserServiceImpl extends UserServiceGrpc.UserServiceImplBase { @Override public void getUser( GetUserRequest request, StreamObserver<User> responseObserver ) { User user = User.newBuilder() .setUserId( request.getUserId() ) .setUserName( "Alex Wong" ) .setDob( "1986-09-12" ) .build(); responseObserver.onNext( user ); responseObserver.onCompleted(); } } } |
客户端示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
package cc.gmem.study.grpc; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class UserServiceClient { public static void main( String[] args ) throws InterruptedException, TimeoutException, ExecutionException { ManagedChannel channel = ManagedChannelBuilder.forAddress( "localhost", 5500 ).usePlaintext( true ).build(); // 阻塞的客户端存根 UserServiceGrpc.UserServiceBlockingStub stub = UserServiceGrpc.newBlockingStub( channel ); // 非阻塞的客户端存根 UserServiceGrpc.UserServiceFutureStub asyncStub = UserServiceGrpc.newFutureStub( channel ); // 构建请求消息 GetUserRequest request = GetUserRequest.newBuilder().setUserId( 10000 ).build(); // 发起同步请求 User user = stub.getUser( request ); // 发起异步请求 asyncStub.getUser( request ).get( 1, TimeUnit.SECONDS ); // 关闭通道 channel.shutdown().awaitTermination( 5, TimeUnit.SECONDS ); } } |
和典型的RPC一样,gRPC允许你定义服务、指定可以被远程调用的接口,为这些接口指定参数和返回值类型。默认的,gRPC使用ProtoBuf作为接口定义语言(IDL,Interface Definition Language )。gRPC支持四类服务定义。
1 2 3 4 5 6 7 8 9 10 11 12 |
conn, _ := grpc.Dial(ChartAddr, []grpc.DialOption{ // 不加下面这项可能出现段错误 grpc.WithInsecure(), // 阻塞直到连接成功,默认情况下Dial方法立即返回,在后台连接 grpc.WithBlock(), // 每30s发送keepalive心跳包,防止连接被上游服务器关闭 grpc.WithKeepaliveParams(keepalive.ClientParameters{ Time: time.Duration(30) * time.Second, }), // 增加最大消息限制,默认4MB grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(1024 * 1024 * 20)), }...) |
客户端发起单个请求,然后获得单个响应,就像是典型的函数调用:
1 2 |
rpc SayHello(HelloRequest) returns (HelloResponse){ } |
- 客户端调用Stub代码
- 服务器感知到RPC调用的发生,获得客户端的元数据、方法名、Deadline
- 服务器可以将自己的元数据发送给客户端,或者等待请求消息的到达。元数据必须先于响应发送
- 服务器接收到客户端的请求消息后,进行处理
- 响应消息连同状态(状态码、可选的状态信息)一起发送给客户端,可以附加一个trailing元数据一起发送
- 如果响应码为ok,客户端获取响应并处理
客户端发起单个请求,然后获得一个流,服务端通过此流连续的发送多个响应消息:
1 2 |
rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse){ } |
和一元RPC类似,但是服务器会发生响应消息的流。当所有响应消息发送完毕后,状态、可选的trailing元数据被发送
服务器:
1 2 3 4 5 6 7 |
func (ms *MediaService) NewPublished(from *google_protobuf.Timestamp, stream digital.MediaService_NewPublishedServer) error { medias, error := mediaRepo.queryPublishedFrom(time.Unix(from.Seconds, int64(from.Nanos))) for _, media := range *medias { stream.Send(&media) } return error } |
客户端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
func TestMediaNewPubRPC(t *testing.T) { conn, _ := grpc.Dial("localhost:7700", grpc.WithInsecure()) defer conn.Close() client := digital.NewMediaServiceClient(conn) from, _ := now.Parse("2017-01-01") stream, err := client.NewPublished(context.Background(), &protobuf.Timestamp{Seconds: int64(from.Unix())}) fatalIf(err) for { media, err := stream.Recv() if err == io.EOF { break } log.Println(media.GetTitle()) } } |
客户端发起一系列的请求消息,并等待服务器的单个响应消息:
1 2 |
rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse) { } |
和一元RPC类似,但是:
- 客户端发送请求消息的流
- 服务器可以在尚未接收全部请求消息的情况下给出响应
客户端、服务器都使用流来收发消息,请求流、响应流独立运行。消息处理方式很自由:
- 服务器可以等待,接收到客户端的所有消息后,进行处理并产生响应
- 服务器可以读取到一个消息,然后写入一个响应
- 其它任意的组合方式
每个流的消息的顺序是被保证的。
1 2 |
rpc BidiHello(stream HelloRequest) returns (stream HelloResponse){ } |
同步化的RPC调用会一直阻塞,直到响应到达客户端,这种通信模型是典型RPC所期望的。
但是,网络传输天生是异步的,在很多情况下,启动一个RPC调用但不阻塞当前线程,可以增强系统容量。gRPC为大部分语言提供了两套(同步/异步)编程接口。
gRPC允许客户端指定,等待RPC调用完成的最长时间,如果超时未完成会出现 DEADLINE_EXCEEDED错误。在服务器端,可以查看每个RPC调用是否已经超时,还有多久超时。
在Go语言中,超时通过Context设置:
1 2 3 4 |
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) defer cancel() client.GetUser(context.Background(), &userRequest) |
在Java语言中,调用Stub的方法设置超时:
1 2 3 |
blockingStub = UserServiceGrpc .newBlockingStub(channel) .withDeadlineAfter(3, TimeUnit.SECONDS); |
在gRPC中,客户端、服务器分别独立的在本地判定调用是否成功 —— 两者的结论可能不一致。服务器端成功但是客户端没有收到响应是可能的。
在任何时候,客户端、服务器都可以取消RPC调用。
元数据是和某次特定的RPC调用相关的,键值对的列表。其中键为字符串,值通常也是字符串。
元数据包括的信息例如:身份验证的详细信息
gRPC支持使用HTTP/2的PING帧实现保活。
创建客户端时,你可以:
1 2 3 4 5 6 7 |
var kacp = keepalive.ClientParameters{ Time: 10 * time.Second, // 如果没有活动,每10秒发送PING帧 Timeout: time.Second, // 等待接收到PING的ACK的超时,超时后认为连接坏掉了 PermitWithoutStream: true, // 即使没有活动的stream,也发送PING帧 } //Dial 中传入 keepalive 配置 conn, err := grpc.Dial(*addr, grpc.WithInsecure(), grpc.WithKeepaliveParams(kacp)) |
创建服务器时,你可以:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
var kaep = keepalive.EnforcementPolicy{ // 客户端在发送Keepalive PING之前,需要等待的最小时间,默认5分钟 MinTime: 5 * time.Minute, // 即使没有活动的流,也允许客户端发送PING。如果设置为false,客户端在没有流的情况下发送PING, // 服务器会答复GOAWAY,并终止HTTP/2连接 PermitWithoutStream: true, } var kasp = keepalive.ServerParameters{ // 连接最大空闲时间,超过此时间服务器主动发送GOAWAY // 空闲的起点:从连接建立开始、或者没有正常处理中的RPC调用的那一刻开始算 MaxConnectionIdle: 15 * time.Second, // 一个连接可以最大存活的时间,超过此时间(服务器增加10%抖动),则发送GOWAY MaxConnectionAge: 30 * time.Second, // 超过MaxConnectionAge之后,再等待MaxConnectionAgeGrace才强制关闭连接 MaxConnectionAgeGrace: 5 * time.Second, // 如果经过下面这么长时间,服务器没有看到任何活动,发送PING来探测客户端是否还活着 Time: 5 * time.Second, // 等待上述PING的超时,超时后关闭连接 Timeout: 1 * time.Second, } func main(){ ... s := grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp)) ... } |
大型的gRPC部署场景中,每个RPC服务都会有多个实例。负载均衡负责把客户端负载优化的(根据服务器能力)
代理负载均衡(服务器端负载均衡) 这种方式下,客户端向负载均衡代理发送请求,后者将请求转发给后端服务。代理负责跟踪每个后端服务的负载情况,并实现适当的负载均衡算法 这种实现方式常用于面向用户的服务,例如向移动客户端提供的API 优势:
劣势:
代理负载均衡可以基于L3/L4(传输)层实现,也可以基于L7(应用)层实现 传输层LB需要进行很少的处理,因而引入较小的额外延迟、CPU消耗也低 应用层LB需要解析客户端的HTTP/2协议请求,LB可以根据请求的细节来分配适当的后端(例如Session Affinity)。LB需要向后端发起新的HTTP/2请求,然后把从客户端接收到的HTTP/2流转发给后端 |
客户端负载均衡 这种方式下,客户端知晓多个后端服务实例。后端实例向客户端报告其负载情况,客户端的LB算法依赖于这些负载信息。在简单的场景下,不考虑服务器端负载,客户端仅仅使用Round-robin算法(可以带权重) 优势:
劣势:
客户端负载均衡有两种实现方式:胖客户端、后备负载均衡 胖客户端内置了负载均衡算法的实现 后备负载均衡器,也叫外部负载均衡器(External load balancer)。客户端查询后备负载均衡器获得至少一个后端服务地址,后者维护后端服务状态并实现LB算法。gRPC定义了一个模型,用于客户端和后备负载均衡器的通信 |
这是Go语言的gRPC库。本文Go语言的例子都是基于此库。
ClientConn可以被跨Goroutine安全的访问。但是在我们的一个项目中,长期存在的ClientConn导致很高的CPU占用,原因未知。
使用Stream时,避免从不通Goroutine多次调用同一个Stream的SendMsg、RecvMsg方法。具体来说:
- 对于同一个Stream,从一个Goroutine调用SendMsg,另一个调用RecvMsg是安全的
- 对于同一个Stream,分别从两个Goroutine调用SendMsg,或者RecvMsg是不安全的
在服务器端,每个注册到gRPC服务器的RPC Handler,都在独立的Goroutine中运行。
RPC服务的参数、返回值都必须是消息类型,不能是原始类型。
示例:
1 2 3 |
service MediaService { rpc NewPublished (google.protobuf.Timestamp) returns (stream Media); } |
使用这种方式时,客户端会得到一个迭代器。在服务器尚未把响应完全发送之前,客户端就可以开始处理。
示例:
1 2 3 4 5 6 |
message MediaResp { repeated Media medias = 1; } service MediaService { rpc NewPublished (google.protobuf.Timestamp) returns (stream Media); } |
Leave a Reply