gRPC 的 java 使用
gRPC是一个现代的开源高性能远程过程调用(RPC)框架,可以在任何环境中运行。它可以通过对负载平衡、跟踪、健康检查和身份验证的可插拔支持,有效地连接数据中心内和数据中心间的服务。它还适用于分布式计算,将设备、移动应用程序和浏览器连接到后端服务。
在gRPC中,客户端应用程序可以直接调用另一台计算机上服务器应用程序上的方法,就好像它是本地对象一样,这样可以更容易地创建分布式应用程序和服务。与许多RPC系统一样,gRPC基于定义服务的思想,指定可以通过其参数和返回类型远程调用的方法。在服务器端,服务器实现了这个接口,并运行gRPC服务器来处理客户端调用。在客户端,客户端有一个存根(在某些语言中称为客户端),它提供与服务器相同的方法。
gRPC客户端和服务器可以在各种环境中运行并相互通信,从谷歌内部的服务器到您自己的桌面,并且可以用gRPC支持的任何语言编写。因此,例如,您可以使用Go、Python或Ruby中的客户端轻松地用Java创建gRPC服务器。此外,最新的谷歌API将有gRPC版本的接口,让你可以轻松地将谷歌功能构建到你的应用程序中。
默认情况下,gRPC使用Protocol Buffers,用于序列化结构化数据(尽管它可以与JSON等其他数据格式一起使用)
下面介绍 gRPC 在java中的使用
gRPC 官网:/
目录
1、创建 gRPC项目
2、grpc基本使用
2.1、基本使用
2.2、服务端添加拦截器
3、负载均衡
4、健康检查
5、失败重试
1、创建 gRPC项目
新建maven项目,项目名是 grpc-learn
pom.xml 文件内容
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns=".0.0"xmlns:xsi=""xsi:schemaLocation=".0.0 .0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.wsjzzcbq</groupId><artifactId>grpc-learn</artifactId><version>1.0-SNAPSHOT</version><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><mavenpiler.source>1.8</mavenpiler.source><mavenpiler.target>1.8</mavenpiler.target></properties><repositories><repository><id>naxus-aliyun</id><name>naxus-aliyun</name><url>;/url><releases><enabled>true</enabled></releases><snapshots><enabled>false</enabled></snapshots></repository></repositories><dependencies><dependency><groupId>io.grpc</groupId><artifactId>grpc-netty-shaded</artifactId><version>1.57.2</version><scope>runtime</scope></dependency><dependency><groupId>io.grpc</groupId><artifactId>grpc-protobuf</artifactId><version>1.57.2</version></dependency><dependency><groupId>io.grpc</groupId><artifactId>grpc-stub</artifactId><version>1.57.2</version></dependency><dependency> <!-- necessary for Java 9+ --><groupId>org.apache.tomcat</groupId><artifactId>annotations-api</artifactId><version>6.0.53</version><scope>provided</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.28</version><scope>provided</scope></dependency></dependencies><build><extensions><extension><groupId>kr.motd.maven</groupId><artifactId>os-maven-plugin</artifactId><version>1.7.1</version></extension></extensions><plugins><plugin><groupId>org.xolstice.maven.plugins</groupId><artifactId>protobuf-maven-plugin</artifactId><version>0.6.1</version><configuration><protocArtifact>com.google.protobuf:protoc:3.22.3:exe:${os.detected.classifier}</protocArtifact><pluginId>grpc-java</pluginId><pluginArtifact>io.grpc:protoc-gen-grpc-java:1.57.2:exe:${os.detected.classifier}</pluginArtifact></configuration><executions><execution><goals><goal>compile</goal><goal>compile-custom</goal></goals></execution></executions></plugin></plugins></build></project>
在项目main目录下新建proto目录,在proto目录下新建 helloword.proto文件
项目目录结构
helloword.proto文件内容
// Copyright 2019 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// .0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";package helloworld;// The greeting service definition.
service Greeter {// Sends a greetingrpc SayHello (HelloRequest) returns (HelloReply) {}rpc SayHelloAgain (HelloRequest) returns (HelloReply) {}
}// The request message containing the user's name.
message HelloRequest {string name = 1;
}// The response message containing the greetings
message HelloReply {string message = 1;
}
helloword.proto文件内容不懂的可以单独学习一下,这里不做过多介绍
项目截图
proto文件创建完成后,使用 protobuf maven插件生成 protobuf 代码和 grpc 代码
在IDEA编辑器中打开maven project 面板,找到 protobuf:compile 和 protobuf:compile-custom
运行 protobuf:compile 生成 protobuf 代码
运行 protobuf:compile-custom 生成 grpc 代码
生成的代码不用动,在项目中能引到
项目新建完成
2、grpc基本使用
2.1、基本使用
在上面新建的项目上新建包名 com.wsjzzcbq.grpc.basic
新建服务端 server 代码
package com.wsjzzcbq.grpc.basic;import io.grpc.Grpc;
import io.grpc.InsecureServerCredentials;
import io.grpc.Server;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import io.grpc.stub.StreamObserver;
import java.io.IOException;/*** GrpcServer** @author wsjz* @date 2023/08/22*/
public class GrpcServer {public static void main(String[] args) throws IOException, InterruptedException {int port = 50051;Server server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create()).addService(new GreeterImpl()).build().start();Runtime.getRuntime().addShutdownHook(new Thread(()->{// Use stderr here since the logger may have been reset by its JVM shutdown hook.System.err.println("*** shutting down gRPC server since JVM is shutting down");stopServer(server);System.err.println("*** server shut down");}));server.awaitTermination();}private static void stopServer(Server server) {if (server != null) {server.shutdown();}}static class GreeterImpl extends GreeterGrpc.GreeterImplBase {@Overridepublic void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {HelloReply reply = HelloReply.newBuilder().setMessage("你好!山外青山楼外楼, " + req.getName()).build();responseObserver.onNext(reply);responseObserver.onCompleted();}@Overridepublic void sayHelloAgain(HelloRequest req, StreamObserver<HelloReply> responseObserver) {HelloReply reply = HelloReply.newBuilder().setMessage("你好,行到水穷处,坐看云起时 " + req.getName()).build();responseObserver.onNext(reply);responseObserver.onCompleted();}}
}
再建客户端 client 代码
package com.wsjzzcbq.grpc.basic;import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.ManagedChannel;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;import java.util.concurrent.TimeUnit;/*** GrpcClient** @author wsjz* @date 2023/08/22*/
public class GrpcClient {public static void main(String[] args) throws InterruptedException {String host = "localhost";int port = 50051;ManagedChannel managedChannel = Grpc.newChannelBuilderForAddress(host, port, InsecureChannelCredentials.create()).build();GreeterGrpc.GreeterBlockingStub blockingStub = GreeterGrpc.newBlockingStub(managedChannel);HelloRequest helloRequest = HelloRequest.newBuilder().setName("沧海月明珠有泪").build();HelloReply reply = blockingStub.sayHello(helloRequest);System.out.println(reply.getMessage());HelloReply response = blockingStub.sayHelloAgain(helloRequest);System.out.println(response.getMessage());managedChannel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);}
}
运行测试
先启动服务端 GrpcServer,在启动客户端 GrpcClient
2.2、服务端添加拦截器
服务端可以添加拦截器做一些处理,在上面基本使用的代码上添加拦截器
拦截器 GrpcInterceptor 代码
package com.wsjzzcbq.grpc.basic;import io.grpc.*;
import java.util.UUID;/*** GrpcInterceptor** @author wsjz* @date 2023/08/22*/
public class GrpcInterceptor implements ServerInterceptor {@Overridepublic <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {System.out.println("拦截");//将uudi字符串放入context中,后面处理可从context中获取Context ctx = Context.current().withValue(Constant.USER_TOKEN, UUID.randomUUID().toString());return Contexts.interceptCall(ctx, serverCall, metadata, serverCallHandler);}
}
添加常量代码 Constant
package com.wsjzzcbq.grpc.basic;import io.grpc.Context;/*** Constant** @author wsjz* @date 2023/08/22*/
public class Constant {public static final Context.Key<String> USER_TOKEN = Context.key("userToken");
}
修改服务端 server 代码
package com.wsjzzcbq.grpc.basic;import io.grpc.Grpc;
import io.grpc.InsecureServerCredentials;
import io.grpc.Server;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import io.grpc.stub.StreamObserver;
import java.io.IOException;/*** GrpcServer** @author wsjz* @date 2023/08/22*/
public class GrpcServer {public static void main(String[] args) throws IOException, InterruptedException {int port = 50051;Server server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create()).addService(new GreeterImpl()).intercept(new GrpcInterceptor()).build().start();Runtime.getRuntime().addShutdownHook(new Thread(()->{// Use stderr here since the logger may have been reset by its JVM shutdown hook.System.err.println("*** shutting down gRPC server since JVM is shutting down");stopServer(server);System.err.println("*** server shut down");}));server.awaitTermination();}private static void stopServer(Server server) {if (server != null) {server.shutdown();}}static class GreeterImpl extends GreeterGrpc.GreeterImplBase {@Overridepublic void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {//context中获取String userToken = Constant.USER_TOKEN.get();System.out.println(userToken);HelloReply reply = HelloReply.newBuilder().setMessage("你好!山外青山楼外楼, " + userToken + " " + req.getName()).build();responseObserver.onNext(reply);responseObserver.onCompleted();}@Overridepublic void sayHelloAgain(HelloRequest req, StreamObserver<HelloReply> responseObserver) {HelloReply reply = HelloReply.newBuilder().setMessage("你好,行到水穷处,坐看云起时 " + req.getName()).build();responseObserver.onNext(reply);responseObserver.onCompleted();}}
}
客户端代码不变
运行测试
3、负载均衡
gRPC的一个关键特性是负载平衡,它允许来自客户端的请求分布在多个服务器上。这有助于防止任何一台服务器过载,并允许系统通过添加更多的服务器来扩大规模。
gRPC负载均衡策略由名称解析程序提供服务器IP地址列表。该策略负责维护与服务器的连接(子通道),并在发送RPC时选择要使用的连接。
默认情况下,将使用pick_first策略。此策略实际上不进行负载均衡,只是尝试从名称解析程序获得的每个地址,并使用它可以连接到的第一个地址。通过更新gRPC服务配置,您还可以使用round_robin策略,该策略将连接到它获得的每个地址,并通过每个RPC的连接后端进行轮询。
新建com.wsjzzcbq.grpc.loadbalance 包,用来存放负载均衡的代码
新建 ExampleNameResolver 类
package com.wsjzzcbq.grpc.loadbalance;import com.googlemon.collect.ImmutableMap;
import io.grpc.EquivalentAddressGroup;
import io.grpc.NameResolver;
import io.grpc.Status;
import java.InetSocketAddress;
import java.SocketAddress;
import java.URI;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;/*** ExampleNameResolver** @author wsjz* @date 2023/08/22*/
public class ExampleNameResolver extends NameResolver{private Listener2 listener;private final URI uri;private final Map<String,List<InetSocketAddress>> addrStore;public ExampleNameResolver(URI targetUri) {this.uri = targetUri;// This is a fake name resolver, so we just hard code the address here.addrStore = ImmutableMap.<String, List<InetSocketAddress>>builder().put(LoadBalanceClient.exampleServiceName,Stream.iterate(LoadBalanceServer.startPort, p->p+1).limit(LoadBalanceServer.serverCount).map(port->new InetSocketAddress("localhost",port)).collect(Collectors.toList())).build();}@Overridepublic String getServiceAuthority() {// Be consistent with behavior in grpc-go, authority is saved in Host field of URI.if (uri.getHost() != null) {return uri.getHost();}return "no host";}@Overridepublic void shutdown() {}@Overridepublic void start(Listener2 listener) {this.listener = listener;this.resolve();}@Overridepublic void refresh() {this.resolve();}private void resolve() {List<InetSocketAddress> addresses = addrStore.get(uri.getPath().substring(1));try {List<EquivalentAddressGroup> equivalentAddressGroup = addresses.stream()// convert to socket address.map(this::toSocketAddress)// every socket address is a single EquivalentAddressGroup, so they can be accessed randomly.map(Arrays::asList).map(this::addrToEquivalentAddressGroup).collect(Collectors.toList());ResolutionResult resolutionResult = ResolutionResult.newBuilder().setAddresses(equivalentAddressGroup).build();this.listener.onResult(resolutionResult);} catch (Exception e){// when error occurs, notify listenerthis.listener.onError(Status.UNAVAILABLE.withDescription("Unable to resolve host ").withCause(e));}}private SocketAddress toSocketAddress(InetSocketAddress address) {return new InetSocketAddress(address.getHostName(), address.getPort());}private EquivalentAddressGroup addrToEquivalentAddressGroup(List<SocketAddress> addrList) {return new EquivalentAddressGroup(addrList);}
}
新建 ExampleNameResolverProvider 类
package com.wsjzzcbq.grpc.loadbalance;import io.grpc.NameResolver;
import io.grpc.NameResolverProvider;
import java.URI;/*** ExampleNameResolverProvider** @author wsjz* @date 2023/08/22*/
public class ExampleNameResolverProvider extends NameResolverProvider {@Overridepublic NameResolver newNameResolver(URI targetUri, NameResolver.Args args) {return new ExampleNameResolver(targetUri);}@Overrideprotected boolean isAvailable() {return true;}@Overrideprotected int priority() {return 5;}@Override// gRPC choose the first NameResolverProvider that supports the target URI scheme.public String getDefaultScheme() {return LoadBalanceClient.exampleScheme;}
}
新建服务端 LoadBalanceServer 类
package com.wsjzzcbq.grpc.loadbalance;import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.TimeUnit;/*** LoadBalanceServer** @author wsjz* @date 2023/08/22*/
public class LoadBalanceServer {public static final int serverCount = 3;public static final int startPort = 50051;public static void main(String[] args) throws IOException, InterruptedException {//创建3个serverServer[] servers = new Server[3];for (int i=0; i<serverCount; i++) {int port = startPort + i;servers[i] = ServerBuilder.forPort(port).addService(new GreeterImpl(port)).build().start();}Runtime.getRuntime().addShutdownHook(new Thread(() -> {System.err.println("*** shutting down gRPC server since JVM is shutting down");try {stop(servers);} catch (InterruptedException e) {e.printStackTrace(System.err);}System.err.println("*** server shut down");}));blockUntilShutdown(servers);}private static void blockUntilShutdown(Server[] servers) throws InterruptedException {for (int i = 0; i < serverCount; i++) {if (servers[i] != null) {servers[i].awaitTermination();}}}private static void stop(Server[] servers) throws InterruptedException {for (int i = 0; i < serverCount; i++) {if (servers[i] != null) {servers[i].shutdown().awaitTermination(30, TimeUnit.SECONDS);}}}static class GreeterImpl extends GreeterGrpc.GreeterImplBase {int port;public GreeterImpl(int port) {this.port = port;}@Overridepublic void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName() + " from server<" + this.port + ">").build();responseObserver.onNext(reply);responseObserver.onCompleted();}@Overridepublic void sayHelloAgain(HelloRequest request, StreamObserver<HelloReply> responseObserver) {HelloReply reply = HelloReply.newBuilder().setMessage("可上九天揽月,可下五洋捉鳖" + request.getName() + "server port " + this.port).build();responseObserver.onNext(reply);responseObserver.onCompleted();}}
}
新建客户端 LoadBalanceClient 类
package com.wsjzzcbq.grpc.loadbalance;import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.NameResolverRegistry;
import io.grpc.StatusRuntimeException;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import java.util.concurrent.TimeUnit;/*** LoadBalanceClient** @author wsjz* @date 2023/08/22*/
public class LoadBalanceClient {public static final String exampleScheme = "example";public static final String exampleServiceName = "lb.example.grpc.io";public static void main(String[] args) throws InterruptedException {NameResolverRegistry.getDefaultRegistry().register(new ExampleNameResolverProvider());String target = String.format("%s:///%s", exampleScheme, exampleServiceName);first_pickPolicy(target);System.out.println("更改发送策略");roundRobinPolicy(target);}/*** 使用first_pick策略发送消息* @param target*/private static void first_pickPolicy(String target) throws InterruptedException {ManagedChannel channel = ManagedChannelBuilder.forTarget(target).usePlaintext().build();try {GreeterGrpc.GreeterBlockingStub blockingStub = GreeterGrpc.newBlockingStub(channel);//连发5条消息for (int i = 0; i < 5; i++) {greet(blockingStub, "request" + i);}} finally {channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);}}/*** 使用round_robin策略发送消息* @param target* @throws InterruptedException*/private static void roundRobinPolicy(String target) throws InterruptedException {//使用round_robin策略发送消息ManagedChannel channel = ManagedChannelBuilder.forTarget(target).defaultLoadBalancingPolicy("round_robin").usePlaintext().build();try {GreeterGrpc.GreeterBlockingStub blockingStub2 = GreeterGrpc.newBlockingStub(channel);for (int i = 0; i < 6; i++) {greet2(blockingStub2, "request" + i);}} finally {channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);}}public static void greet(GreeterGrpc.GreeterBlockingStub blockingStub, String name) {HelloRequest request = HelloRequest.newBuilder().setName(name).build();HelloReply response;try {response = blockingStub.sayHello(request);System.out.println("Greeting返回: " + response.getMessage());} catch (StatusRuntimeException e) {System.out.println(e.getStatus());return;}}public static void greet2(GreeterGrpc.GreeterBlockingStub blockingStub, String name) {HelloRequest request = HelloRequest.newBuilder().setName(name).build();HelloReply response;try {response = blockingStub.sayHelloAgain(request);System.out.println("Greeting返回: " + response.getMessage());} catch (StatusRuntimeException e) {System.out.println(e.getStatus());return;}}
}
运行测试
客户端代码先使用 first_pick 策略发送5次请求,之后再用 round_robin 策略发送6次请求
看打印 ip 情况
4、健康检查
grpc自带健康检查功能,需要添加依赖 grpc-services
<dependency><groupId>io.grpc</groupId><artifactId>grpc-services</artifactId><version>1.57.2</version>
</dependency>
依赖添加完成后,更新maven
新建包 com.wsjzzcbq.grpc.healthservice
新建服务端 HealthServiceServer
package com.wsjzzcbq.grpc.healthservice;import io.grpc.Grpc;
import io.grpc.InsecureServerCredentials;
import io.grpc.Server;
import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
import io.grpc.protobuf.services.HealthStatusManager;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;/*** HealthServiceServer** @author wsjz* @date 2023/08/22*/
public class HealthServiceServer {public static void main(String[] args) throws IOException, InterruptedException {int port = 50051;//健康状态管理HealthStatusManager health = new HealthStatusManager();Server server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create()).addService(health.getHealthService()).build().start();changeHealthStatus(health);Runtime.getRuntime().addShutdownHook(new Thread(()->{System.err.println("*** shutting down gRPC server since JVM is shutting down");try {stopServer(server);} catch (InterruptedException e) {e.printStackTrace(System.err);}System.err.println("*** server shut down");}));blockUntilShutdown(server);}private static void stopServer(Server server) throws InterruptedException {if (server != null) {server.shutdown().awaitTermination(30, TimeUnit.SECONDS);}}private static void blockUntilShutdown(Server server) throws InterruptedException {if (server != null) {server.awaitTermination();}}private static void changeHealthStatus(HealthStatusManager health) {System.out.println(LocalDateTime.now() + "服务可用");new Thread(()->{try {TimeUnit.SECONDS.sleep(8);} catch (InterruptedException e) {e.printStackTrace();}//3秒钟后健康状态改为不能提供服务health.setStatus("", ServingStatus.NOT_SERVING);System.out.println(LocalDateTime.now() + "修改为服务不可用");try {TimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}//再过3秒修改健康状态为可服务health.setStatus("", ServingStatus.SERVING);System.out.println(LocalDateTime.now() + "修改为服务可用");}).start();}}
代码说明, changeHealthStatus 方法,会在服务端启动8秒钟后将健康状态改为不能服务状态,之后再过10秒再将服务状态改为可以服务状态,也就是说服务端启动8秒后,会有10秒健康状态是不能服务
新建客户端 HealthServiceClient
package com.wsjzzcbq.grpc.healthservice;import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.health.v1.HealthCheckRequest;
import io.grpc.health.v1.HealthCheckResponse;
import io.grpc.health.v1.HealthGrpc;
import io.grpc.stub.StreamObserver;
import java.time.LocalDateTime;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;/*** HealthServiceClient** @author wsjz* @date 2023/08/22*/
public class HealthServiceClient {public static void main(String[] args) throws InterruptedException {String target = "localhost:50051";ManagedChannelBuilder<?> builder = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create());ManagedChannel channel = builder.build();//阻塞调用HealthGrpc.HealthBlockingStub healthBlockingStub = HealthGrpc.newBlockingStub(channel);blockingStub(healthBlockingStub);//非阻塞调用
// HealthGrpc.HealthStub healthStub = HealthGrpc.newStub(channel);
// CountDownLatch countDownLatch = new CountDownLatch(1);
// healthStub(healthStub, countDownLatch);
// countDownLatch.await();}/*** 同步阻塞调用* @param healthBlockingStub* @throws InterruptedException*/private static void blockingStub(HealthGrpc.HealthBlockingStub healthBlockingStub) throws InterruptedException {HealthCheckRequest healthCheckRequest = HealthCheckRequest.getDefaultInstance();HealthCheckResponse response = healthBlockingStub.check(healthCheckRequest);System.out.println(LocalDateTime.now() + ":" + response.getStatus());TimeUnit.SECONDS.sleep(8);HealthCheckResponse response2 = healthBlockingStub.check(healthCheckRequest);System.out.println(LocalDateTime.now() + ":" + response2.getStatus());TimeUnit.SECONDS.sleep(18);HealthCheckResponse response3 = healthBlockingStub.check(healthCheckRequest);System.out.println(LocalDateTime.now() + ":" + response3.getStatus());}/*** 异步调用* @param healthStub* @param countDownLatch*/private static void healthStub(HealthGrpc.HealthStub healthStub, CountDownLatch countDownLatch) {HealthCheckRequest healthCheckRequest = HealthCheckRequest.getDefaultInstance();healthStub.check(healthCheckRequest, new StreamObserver<HealthCheckResponse>() {@Overridepublic void onNext(HealthCheckResponse value) {System.out.println("onNext");System.out.println(value);}@Overridepublic void onError(Throwable t) {System.out.println("onError");}@Overridepublic void onCompleted() {System.out.println("onCompleted");countDownLatch.countDown();}});}
}
代码说明,客户端调用分阻塞调用和非阻塞调用,笔者先测试阻塞调用
看过服务端代码说明后,这里不再解释。客户端启动后,会向服务端发送一次健康检查,如果此时在服务端启动8秒之内,会收到服务端返回可以服务;8秒之后再发送第二次健康检查,此时服务端已经过了8秒,处于服务不可用的10秒钟,因此第二次健康检查会返回服务不可用;再过18秒,发送第三次健康检查,这时服务端已经过完18秒,服务可用,因此会返回健康状态服务可用
下面运行测试
测试非阻塞调用
将非阻塞调用的代码去掉注释,将阻塞调用的代码注释掉
HealthStatusManager 说明
其实这里的健康检查本质上是一个grpc服务,和前面写的 GreeterImpl 是一样的,只是这个是官网实现的,如果不想用,也可以自己实现一个,感兴趣的读者可以看看官方代码
下面是 HealthStatusManager 的代码截图
HealthStatusManager 的 getHealthService方法返回一个 HealthServiceImpl 对象
HealthServiceImpl 的代码截图
可以看到 HealthServiceImpl 继承自 HealthGrpc.HealthImplBase
5、失败重试
grpc提供了失败重试功能
新建包 com.wsjzzcbq.grpc.retrying
服务端 RetryingHelloWorldServer 代码
package com.wsjzzcbq.grpc.retrying;import io.grpc.Grpc;
import io.grpc.InsecureServerCredentials;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;/*** RetryingHelloWorldServer** @author wsjz* @date 2023/08/23*/
public class RetryingHelloWorldServer {public static void main(String[] args) throws IOException, InterruptedException {int port = 50051;Server server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create()).addService(new GreeterImpl()).build().start();Runtime.getRuntime().addShutdownHook(new Thread(()->{try {stopServer(server);} catch (InterruptedException e) {e.printStackTrace();}}));blockUntilShutdown(server);}private static void stopServer(Server server) throws InterruptedException {if (server != null) {server.shutdown().awaitTermination(30, TimeUnit.SECONDS);}}private static void blockUntilShutdown(Server server) throws InterruptedException {if (server != null) {server.awaitTermination();}}static class GreeterImpl extends GreeterGrpc.GreeterImplBase {AtomicInteger retryCounter = new AtomicInteger(0);@Overridepublic void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {int count = retryCounter.incrementAndGet();System.out.println("调用次数:" + count);responseObserver.onError(Status.UNAVAILABLE.withDescription("Greeter temporarily unavailable..." + request.getName()).asRuntimeException());}}}
客户端
这里失败重试需要一些配置项,看下面 json文件配置
{"methodConfig": [{"name": [{"service": "helloworld.Greeter","method": "SayHello"}],"retryPolicy": {"maxAttempts": 5,"initialBackoff": "0.5s","maxBackoff": "30s","backoffMultiplier": 2,"retryableStatusCodes": ["UNAVAILABLE"]}}]
}
maxAttempts 最大重试次数
retryableStatusCodes 进行重试的状态码
上面的 json 配置需要客户端创建 ManagedChannel 时配置,可以通过代码读取 json文件转成map传进去,或者改成代码配置,笔者改成了在代码中配置
添加 json 依赖
<dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.39</version>
</dependency>
客户端代码
package com.wsjzzcbq.grpc.retrying;import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import io.grpc.*;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;/*** RetryingHelloWorldClient** @author wsjz* @date 2023/08/23*/
public class RetryingHelloWorldClient {public static void main(String[] args) {String host = "localhost";int port = 50051;ManagedChannelBuilder<?> channelBuilder = Grpc.newChannelBuilderForAddress(host, port, InsecureChannelCredentials.create())//添加重试配置.defaultServiceConfig(methodConfig()).enableRetry();ManagedChannel channel = channelBuilder.build();GreeterGrpc.GreeterBlockingStub blockingStub = GreeterGrpc.newBlockingStub(channel);HelloRequest request = HelloRequest.newBuilder().setName("平生无长物,独往有深情").build();HelloReply response = null;try {response = blockingStub.sayHello(request);System.out.println(response.getMessage());} catch (StatusRuntimeException e) {System.out.println("异常");e.printStackTrace();}}private static JSONObject methodConfig() {JSONArray name = new JSONArray();JSONObject nameItem = new JSONObject();nameItem.put("service", "helloworld.Greeter");nameItem.put("method", "SayHello");name.add(nameItem);JSONObject retryPolicy = new JSONObject();retryPolicy.put("maxAttempts", "5");retryPolicy.put("initialBackoff", "0.5s");retryPolicy.put("maxBackoff", "30s");retryPolicy.put("backoffMultiplier", "2");retryPolicy.put("retryableStatusCodes", JSONArray.of("UNAVAILABLE"));JSONObject methodConfigItem = new JSONObject();methodConfigItem.put("name", name);methodConfigItem.put("retryPolicy", retryPolicy);JSONObject methodConfig = new JSONObject();methodConfig.put("methodConfig", JSONArray.of(methodConfigItem));return methodConfig;}
}
在 methodConfig 方法中配置重试策略
运行测试
服务端打印调用次数,且始终返回错误,看客户端是否进行重试
因为服务端返回错误,客户端连续调用5次,5次都没成功,走到了catch里
下面笔者修改服务端代码,让前4次都失败,第5次成功
修改后的服务端代码
package com.wsjzzcbq.grpc.retrying;import io.grpc.Grpc;
import io.grpc.InsecureServerCredentials;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;/*** RetryingHelloWorldServer** @author wsjz* @date 2023/08/23*/
public class RetryingHelloWorldServer {public static void main(String[] args) throws IOException, InterruptedException {int port = 50051;Server server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create()).addService(new GreeterImpl()).build().start();Runtime.getRuntime().addShutdownHook(new Thread(()->{try {stopServer(server);} catch (InterruptedException e) {e.printStackTrace();}}));blockUntilShutdown(server);}private static void stopServer(Server server) throws InterruptedException {if (server != null) {server.shutdown().awaitTermination(30, TimeUnit.SECONDS);}}private static void blockUntilShutdown(Server server) throws InterruptedException {if (server != null) {server.awaitTermination();}}static class GreeterImpl extends GreeterGrpc.GreeterImplBase {AtomicInteger retryCounter = new AtomicInteger(0);@Overridepublic void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {int count = retryCounter.incrementAndGet();System.out.println("调用次数:" + count);if (count <=4) {responseObserver.onError(Status.UNAVAILABLE.withDescription("Greeter temporarily unavailable..." + request.getName()).asRuntimeException());} else {HelloReply reply = HelloReply.newBuilder().setMessage("瘦影自怜秋水照,卿须怜我我怜卿").build();responseObserver.onNext(reply);responseObserver.onCompleted();}}}}
运行测试
客户端没有报错,连续调用5次
至此完
gRPC 的 java 使用
gRPC是一个现代的开源高性能远程过程调用(RPC)框架,可以在任何环境中运行。它可以通过对负载平衡、跟踪、健康检查和身份验证的可插拔支持,有效地连接数据中心内和数据中心间的服务。它还适用于分布式计算,将设备、移动应用程序和浏览器连接到后端服务。
在gRPC中,客户端应用程序可以直接调用另一台计算机上服务器应用程序上的方法,就好像它是本地对象一样,这样可以更容易地创建分布式应用程序和服务。与许多RPC系统一样,gRPC基于定义服务的思想,指定可以通过其参数和返回类型远程调用的方法。在服务器端,服务器实现了这个接口,并运行gRPC服务器来处理客户端调用。在客户端,客户端有一个存根(在某些语言中称为客户端),它提供与服务器相同的方法。
gRPC客户端和服务器可以在各种环境中运行并相互通信,从谷歌内部的服务器到您自己的桌面,并且可以用gRPC支持的任何语言编写。因此,例如,您可以使用Go、Python或Ruby中的客户端轻松地用Java创建gRPC服务器。此外,最新的谷歌API将有gRPC版本的接口,让你可以轻松地将谷歌功能构建到你的应用程序中。
默认情况下,gRPC使用Protocol Buffers,用于序列化结构化数据(尽管它可以与JSON等其他数据格式一起使用)
下面介绍 gRPC 在java中的使用
gRPC 官网:/
目录
1、创建 gRPC项目
2、grpc基本使用
2.1、基本使用
2.2、服务端添加拦截器
3、负载均衡
4、健康检查
5、失败重试
1、创建 gRPC项目
新建maven项目,项目名是 grpc-learn
pom.xml 文件内容
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns=".0.0"xmlns:xsi=""xsi:schemaLocation=".0.0 .0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.wsjzzcbq</groupId><artifactId>grpc-learn</artifactId><version>1.0-SNAPSHOT</version><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><mavenpiler.source>1.8</mavenpiler.source><mavenpiler.target>1.8</mavenpiler.target></properties><repositories><repository><id>naxus-aliyun</id><name>naxus-aliyun</name><url>;/url><releases><enabled>true</enabled></releases><snapshots><enabled>false</enabled></snapshots></repository></repositories><dependencies><dependency><groupId>io.grpc</groupId><artifactId>grpc-netty-shaded</artifactId><version>1.57.2</version><scope>runtime</scope></dependency><dependency><groupId>io.grpc</groupId><artifactId>grpc-protobuf</artifactId><version>1.57.2</version></dependency><dependency><groupId>io.grpc</groupId><artifactId>grpc-stub</artifactId><version>1.57.2</version></dependency><dependency> <!-- necessary for Java 9+ --><groupId>org.apache.tomcat</groupId><artifactId>annotations-api</artifactId><version>6.0.53</version><scope>provided</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.28</version><scope>provided</scope></dependency></dependencies><build><extensions><extension><groupId>kr.motd.maven</groupId><artifactId>os-maven-plugin</artifactId><version>1.7.1</version></extension></extensions><plugins><plugin><groupId>org.xolstice.maven.plugins</groupId><artifactId>protobuf-maven-plugin</artifactId><version>0.6.1</version><configuration><protocArtifact>com.google.protobuf:protoc:3.22.3:exe:${os.detected.classifier}</protocArtifact><pluginId>grpc-java</pluginId><pluginArtifact>io.grpc:protoc-gen-grpc-java:1.57.2:exe:${os.detected.classifier}</pluginArtifact></configuration><executions><execution><goals><goal>compile</goal><goal>compile-custom</goal></goals></execution></executions></plugin></plugins></build></project>
在项目main目录下新建proto目录,在proto目录下新建 helloword.proto文件
项目目录结构
helloword.proto文件内容
// Copyright 2019 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// .0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";package helloworld;// The greeting service definition.
service Greeter {// Sends a greetingrpc SayHello (HelloRequest) returns (HelloReply) {}rpc SayHelloAgain (HelloRequest) returns (HelloReply) {}
}// The request message containing the user's name.
message HelloRequest {string name = 1;
}// The response message containing the greetings
message HelloReply {string message = 1;
}
helloword.proto文件内容不懂的可以单独学习一下,这里不做过多介绍
项目截图
proto文件创建完成后,使用 protobuf maven插件生成 protobuf 代码和 grpc 代码
在IDEA编辑器中打开maven project 面板,找到 protobuf:compile 和 protobuf:compile-custom
运行 protobuf:compile 生成 protobuf 代码
运行 protobuf:compile-custom 生成 grpc 代码
生成的代码不用动,在项目中能引到
项目新建完成
2、grpc基本使用
2.1、基本使用
在上面新建的项目上新建包名 com.wsjzzcbq.grpc.basic
新建服务端 server 代码
package com.wsjzzcbq.grpc.basic;import io.grpc.Grpc;
import io.grpc.InsecureServerCredentials;
import io.grpc.Server;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import io.grpc.stub.StreamObserver;
import java.io.IOException;/*** GrpcServer** @author wsjz* @date 2023/08/22*/
public class GrpcServer {public static void main(String[] args) throws IOException, InterruptedException {int port = 50051;Server server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create()).addService(new GreeterImpl()).build().start();Runtime.getRuntime().addShutdownHook(new Thread(()->{// Use stderr here since the logger may have been reset by its JVM shutdown hook.System.err.println("*** shutting down gRPC server since JVM is shutting down");stopServer(server);System.err.println("*** server shut down");}));server.awaitTermination();}private static void stopServer(Server server) {if (server != null) {server.shutdown();}}static class GreeterImpl extends GreeterGrpc.GreeterImplBase {@Overridepublic void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {HelloReply reply = HelloReply.newBuilder().setMessage("你好!山外青山楼外楼, " + req.getName()).build();responseObserver.onNext(reply);responseObserver.onCompleted();}@Overridepublic void sayHelloAgain(HelloRequest req, StreamObserver<HelloReply> responseObserver) {HelloReply reply = HelloReply.newBuilder().setMessage("你好,行到水穷处,坐看云起时 " + req.getName()).build();responseObserver.onNext(reply);responseObserver.onCompleted();}}
}
再建客户端 client 代码
package com.wsjzzcbq.grpc.basic;import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.ManagedChannel;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;import java.util.concurrent.TimeUnit;/*** GrpcClient** @author wsjz* @date 2023/08/22*/
public class GrpcClient {public static void main(String[] args) throws InterruptedException {String host = "localhost";int port = 50051;ManagedChannel managedChannel = Grpc.newChannelBuilderForAddress(host, port, InsecureChannelCredentials.create()).build();GreeterGrpc.GreeterBlockingStub blockingStub = GreeterGrpc.newBlockingStub(managedChannel);HelloRequest helloRequest = HelloRequest.newBuilder().setName("沧海月明珠有泪").build();HelloReply reply = blockingStub.sayHello(helloRequest);System.out.println(reply.getMessage());HelloReply response = blockingStub.sayHelloAgain(helloRequest);System.out.println(response.getMessage());managedChannel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);}
}
运行测试
先启动服务端 GrpcServer,在启动客户端 GrpcClient
2.2、服务端添加拦截器
服务端可以添加拦截器做一些处理,在上面基本使用的代码上添加拦截器
拦截器 GrpcInterceptor 代码
package com.wsjzzcbq.grpc.basic;import io.grpc.*;
import java.util.UUID;/*** GrpcInterceptor** @author wsjz* @date 2023/08/22*/
public class GrpcInterceptor implements ServerInterceptor {@Overridepublic <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {System.out.println("拦截");//将uudi字符串放入context中,后面处理可从context中获取Context ctx = Context.current().withValue(Constant.USER_TOKEN, UUID.randomUUID().toString());return Contexts.interceptCall(ctx, serverCall, metadata, serverCallHandler);}
}
添加常量代码 Constant
package com.wsjzzcbq.grpc.basic;import io.grpc.Context;/*** Constant** @author wsjz* @date 2023/08/22*/
public class Constant {public static final Context.Key<String> USER_TOKEN = Context.key("userToken");
}
修改服务端 server 代码
package com.wsjzzcbq.grpc.basic;import io.grpc.Grpc;
import io.grpc.InsecureServerCredentials;
import io.grpc.Server;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import io.grpc.stub.StreamObserver;
import java.io.IOException;/*** GrpcServer** @author wsjz* @date 2023/08/22*/
public class GrpcServer {public static void main(String[] args) throws IOException, InterruptedException {int port = 50051;Server server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create()).addService(new GreeterImpl()).intercept(new GrpcInterceptor()).build().start();Runtime.getRuntime().addShutdownHook(new Thread(()->{// Use stderr here since the logger may have been reset by its JVM shutdown hook.System.err.println("*** shutting down gRPC server since JVM is shutting down");stopServer(server);System.err.println("*** server shut down");}));server.awaitTermination();}private static void stopServer(Server server) {if (server != null) {server.shutdown();}}static class GreeterImpl extends GreeterGrpc.GreeterImplBase {@Overridepublic void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {//context中获取String userToken = Constant.USER_TOKEN.get();System.out.println(userToken);HelloReply reply = HelloReply.newBuilder().setMessage("你好!山外青山楼外楼, " + userToken + " " + req.getName()).build();responseObserver.onNext(reply);responseObserver.onCompleted();}@Overridepublic void sayHelloAgain(HelloRequest req, StreamObserver<HelloReply> responseObserver) {HelloReply reply = HelloReply.newBuilder().setMessage("你好,行到水穷处,坐看云起时 " + req.getName()).build();responseObserver.onNext(reply);responseObserver.onCompleted();}}
}
客户端代码不变
运行测试
3、负载均衡
gRPC的一个关键特性是负载平衡,它允许来自客户端的请求分布在多个服务器上。这有助于防止任何一台服务器过载,并允许系统通过添加更多的服务器来扩大规模。
gRPC负载均衡策略由名称解析程序提供服务器IP地址列表。该策略负责维护与服务器的连接(子通道),并在发送RPC时选择要使用的连接。
默认情况下,将使用pick_first策略。此策略实际上不进行负载均衡,只是尝试从名称解析程序获得的每个地址,并使用它可以连接到的第一个地址。通过更新gRPC服务配置,您还可以使用round_robin策略,该策略将连接到它获得的每个地址,并通过每个RPC的连接后端进行轮询。
新建com.wsjzzcbq.grpc.loadbalance 包,用来存放负载均衡的代码
新建 ExampleNameResolver 类
package com.wsjzzcbq.grpc.loadbalance;import com.googlemon.collect.ImmutableMap;
import io.grpc.EquivalentAddressGroup;
import io.grpc.NameResolver;
import io.grpc.Status;
import java.InetSocketAddress;
import java.SocketAddress;
import java.URI;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;/*** ExampleNameResolver** @author wsjz* @date 2023/08/22*/
public class ExampleNameResolver extends NameResolver{private Listener2 listener;private final URI uri;private final Map<String,List<InetSocketAddress>> addrStore;public ExampleNameResolver(URI targetUri) {this.uri = targetUri;// This is a fake name resolver, so we just hard code the address here.addrStore = ImmutableMap.<String, List<InetSocketAddress>>builder().put(LoadBalanceClient.exampleServiceName,Stream.iterate(LoadBalanceServer.startPort, p->p+1).limit(LoadBalanceServer.serverCount).map(port->new InetSocketAddress("localhost",port)).collect(Collectors.toList())).build();}@Overridepublic String getServiceAuthority() {// Be consistent with behavior in grpc-go, authority is saved in Host field of URI.if (uri.getHost() != null) {return uri.getHost();}return "no host";}@Overridepublic void shutdown() {}@Overridepublic void start(Listener2 listener) {this.listener = listener;this.resolve();}@Overridepublic void refresh() {this.resolve();}private void resolve() {List<InetSocketAddress> addresses = addrStore.get(uri.getPath().substring(1));try {List<EquivalentAddressGroup> equivalentAddressGroup = addresses.stream()// convert to socket address.map(this::toSocketAddress)// every socket address is a single EquivalentAddressGroup, so they can be accessed randomly.map(Arrays::asList).map(this::addrToEquivalentAddressGroup).collect(Collectors.toList());ResolutionResult resolutionResult = ResolutionResult.newBuilder().setAddresses(equivalentAddressGroup).build();this.listener.onResult(resolutionResult);} catch (Exception e){// when error occurs, notify listenerthis.listener.onError(Status.UNAVAILABLE.withDescription("Unable to resolve host ").withCause(e));}}private SocketAddress toSocketAddress(InetSocketAddress address) {return new InetSocketAddress(address.getHostName(), address.getPort());}private EquivalentAddressGroup addrToEquivalentAddressGroup(List<SocketAddress> addrList) {return new EquivalentAddressGroup(addrList);}
}
新建 ExampleNameResolverProvider 类
package com.wsjzzcbq.grpc.loadbalance;import io.grpc.NameResolver;
import io.grpc.NameResolverProvider;
import java.URI;/*** ExampleNameResolverProvider** @author wsjz* @date 2023/08/22*/
public class ExampleNameResolverProvider extends NameResolverProvider {@Overridepublic NameResolver newNameResolver(URI targetUri, NameResolver.Args args) {return new ExampleNameResolver(targetUri);}@Overrideprotected boolean isAvailable() {return true;}@Overrideprotected int priority() {return 5;}@Override// gRPC choose the first NameResolverProvider that supports the target URI scheme.public String getDefaultScheme() {return LoadBalanceClient.exampleScheme;}
}
新建服务端 LoadBalanceServer 类
package com.wsjzzcbq.grpc.loadbalance;import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.TimeUnit;/*** LoadBalanceServer** @author wsjz* @date 2023/08/22*/
public class LoadBalanceServer {public static final int serverCount = 3;public static final int startPort = 50051;public static void main(String[] args) throws IOException, InterruptedException {//创建3个serverServer[] servers = new Server[3];for (int i=0; i<serverCount; i++) {int port = startPort + i;servers[i] = ServerBuilder.forPort(port).addService(new GreeterImpl(port)).build().start();}Runtime.getRuntime().addShutdownHook(new Thread(() -> {System.err.println("*** shutting down gRPC server since JVM is shutting down");try {stop(servers);} catch (InterruptedException e) {e.printStackTrace(System.err);}System.err.println("*** server shut down");}));blockUntilShutdown(servers);}private static void blockUntilShutdown(Server[] servers) throws InterruptedException {for (int i = 0; i < serverCount; i++) {if (servers[i] != null) {servers[i].awaitTermination();}}}private static void stop(Server[] servers) throws InterruptedException {for (int i = 0; i < serverCount; i++) {if (servers[i] != null) {servers[i].shutdown().awaitTermination(30, TimeUnit.SECONDS);}}}static class GreeterImpl extends GreeterGrpc.GreeterImplBase {int port;public GreeterImpl(int port) {this.port = port;}@Overridepublic void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName() + " from server<" + this.port + ">").build();responseObserver.onNext(reply);responseObserver.onCompleted();}@Overridepublic void sayHelloAgain(HelloRequest request, StreamObserver<HelloReply> responseObserver) {HelloReply reply = HelloReply.newBuilder().setMessage("可上九天揽月,可下五洋捉鳖" + request.getName() + "server port " + this.port).build();responseObserver.onNext(reply);responseObserver.onCompleted();}}
}
新建客户端 LoadBalanceClient 类
package com.wsjzzcbq.grpc.loadbalance;import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.NameResolverRegistry;
import io.grpc.StatusRuntimeException;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import java.util.concurrent.TimeUnit;/*** LoadBalanceClient** @author wsjz* @date 2023/08/22*/
public class LoadBalanceClient {public static final String exampleScheme = "example";public static final String exampleServiceName = "lb.example.grpc.io";public static void main(String[] args) throws InterruptedException {NameResolverRegistry.getDefaultRegistry().register(new ExampleNameResolverProvider());String target = String.format("%s:///%s", exampleScheme, exampleServiceName);first_pickPolicy(target);System.out.println("更改发送策略");roundRobinPolicy(target);}/*** 使用first_pick策略发送消息* @param target*/private static void first_pickPolicy(String target) throws InterruptedException {ManagedChannel channel = ManagedChannelBuilder.forTarget(target).usePlaintext().build();try {GreeterGrpc.GreeterBlockingStub blockingStub = GreeterGrpc.newBlockingStub(channel);//连发5条消息for (int i = 0; i < 5; i++) {greet(blockingStub, "request" + i);}} finally {channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);}}/*** 使用round_robin策略发送消息* @param target* @throws InterruptedException*/private static void roundRobinPolicy(String target) throws InterruptedException {//使用round_robin策略发送消息ManagedChannel channel = ManagedChannelBuilder.forTarget(target).defaultLoadBalancingPolicy("round_robin").usePlaintext().build();try {GreeterGrpc.GreeterBlockingStub blockingStub2 = GreeterGrpc.newBlockingStub(channel);for (int i = 0; i < 6; i++) {greet2(blockingStub2, "request" + i);}} finally {channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);}}public static void greet(GreeterGrpc.GreeterBlockingStub blockingStub, String name) {HelloRequest request = HelloRequest.newBuilder().setName(name).build();HelloReply response;try {response = blockingStub.sayHello(request);System.out.println("Greeting返回: " + response.getMessage());} catch (StatusRuntimeException e) {System.out.println(e.getStatus());return;}}public static void greet2(GreeterGrpc.GreeterBlockingStub blockingStub, String name) {HelloRequest request = HelloRequest.newBuilder().setName(name).build();HelloReply response;try {response = blockingStub.sayHelloAgain(request);System.out.println("Greeting返回: " + response.getMessage());} catch (StatusRuntimeException e) {System.out.println(e.getStatus());return;}}
}
运行测试
客户端代码先使用 first_pick 策略发送5次请求,之后再用 round_robin 策略发送6次请求
看打印 ip 情况
4、健康检查
grpc自带健康检查功能,需要添加依赖 grpc-services
<dependency><groupId>io.grpc</groupId><artifactId>grpc-services</artifactId><version>1.57.2</version>
</dependency>
依赖添加完成后,更新maven
新建包 com.wsjzzcbq.grpc.healthservice
新建服务端 HealthServiceServer
package com.wsjzzcbq.grpc.healthservice;import io.grpc.Grpc;
import io.grpc.InsecureServerCredentials;
import io.grpc.Server;
import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
import io.grpc.protobuf.services.HealthStatusManager;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;/*** HealthServiceServer** @author wsjz* @date 2023/08/22*/
public class HealthServiceServer {public static void main(String[] args) throws IOException, InterruptedException {int port = 50051;//健康状态管理HealthStatusManager health = new HealthStatusManager();Server server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create()).addService(health.getHealthService()).build().start();changeHealthStatus(health);Runtime.getRuntime().addShutdownHook(new Thread(()->{System.err.println("*** shutting down gRPC server since JVM is shutting down");try {stopServer(server);} catch (InterruptedException e) {e.printStackTrace(System.err);}System.err.println("*** server shut down");}));blockUntilShutdown(server);}private static void stopServer(Server server) throws InterruptedException {if (server != null) {server.shutdown().awaitTermination(30, TimeUnit.SECONDS);}}private static void blockUntilShutdown(Server server) throws InterruptedException {if (server != null) {server.awaitTermination();}}private static void changeHealthStatus(HealthStatusManager health) {System.out.println(LocalDateTime.now() + "服务可用");new Thread(()->{try {TimeUnit.SECONDS.sleep(8);} catch (InterruptedException e) {e.printStackTrace();}//3秒钟后健康状态改为不能提供服务health.setStatus("", ServingStatus.NOT_SERVING);System.out.println(LocalDateTime.now() + "修改为服务不可用");try {TimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}//再过3秒修改健康状态为可服务health.setStatus("", ServingStatus.SERVING);System.out.println(LocalDateTime.now() + "修改为服务可用");}).start();}}
代码说明, changeHealthStatus 方法,会在服务端启动8秒钟后将健康状态改为不能服务状态,之后再过10秒再将服务状态改为可以服务状态,也就是说服务端启动8秒后,会有10秒健康状态是不能服务
新建客户端 HealthServiceClient
package com.wsjzzcbq.grpc.healthservice;import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.health.v1.HealthCheckRequest;
import io.grpc.health.v1.HealthCheckResponse;
import io.grpc.health.v1.HealthGrpc;
import io.grpc.stub.StreamObserver;
import java.time.LocalDateTime;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;/*** HealthServiceClient** @author wsjz* @date 2023/08/22*/
public class HealthServiceClient {public static void main(String[] args) throws InterruptedException {String target = "localhost:50051";ManagedChannelBuilder<?> builder = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create());ManagedChannel channel = builder.build();//阻塞调用HealthGrpc.HealthBlockingStub healthBlockingStub = HealthGrpc.newBlockingStub(channel);blockingStub(healthBlockingStub);//非阻塞调用
// HealthGrpc.HealthStub healthStub = HealthGrpc.newStub(channel);
// CountDownLatch countDownLatch = new CountDownLatch(1);
// healthStub(healthStub, countDownLatch);
// countDownLatch.await();}/*** 同步阻塞调用* @param healthBlockingStub* @throws InterruptedException*/private static void blockingStub(HealthGrpc.HealthBlockingStub healthBlockingStub) throws InterruptedException {HealthCheckRequest healthCheckRequest = HealthCheckRequest.getDefaultInstance();HealthCheckResponse response = healthBlockingStub.check(healthCheckRequest);System.out.println(LocalDateTime.now() + ":" + response.getStatus());TimeUnit.SECONDS.sleep(8);HealthCheckResponse response2 = healthBlockingStub.check(healthCheckRequest);System.out.println(LocalDateTime.now() + ":" + response2.getStatus());TimeUnit.SECONDS.sleep(18);HealthCheckResponse response3 = healthBlockingStub.check(healthCheckRequest);System.out.println(LocalDateTime.now() + ":" + response3.getStatus());}/*** 异步调用* @param healthStub* @param countDownLatch*/private static void healthStub(HealthGrpc.HealthStub healthStub, CountDownLatch countDownLatch) {HealthCheckRequest healthCheckRequest = HealthCheckRequest.getDefaultInstance();healthStub.check(healthCheckRequest, new StreamObserver<HealthCheckResponse>() {@Overridepublic void onNext(HealthCheckResponse value) {System.out.println("onNext");System.out.println(value);}@Overridepublic void onError(Throwable t) {System.out.println("onError");}@Overridepublic void onCompleted() {System.out.println("onCompleted");countDownLatch.countDown();}});}
}
代码说明,客户端调用分阻塞调用和非阻塞调用,笔者先测试阻塞调用
看过服务端代码说明后,这里不再解释。客户端启动后,会向服务端发送一次健康检查,如果此时在服务端启动8秒之内,会收到服务端返回可以服务;8秒之后再发送第二次健康检查,此时服务端已经过了8秒,处于服务不可用的10秒钟,因此第二次健康检查会返回服务不可用;再过18秒,发送第三次健康检查,这时服务端已经过完18秒,服务可用,因此会返回健康状态服务可用
下面运行测试
测试非阻塞调用
将非阻塞调用的代码去掉注释,将阻塞调用的代码注释掉
HealthStatusManager 说明
其实这里的健康检查本质上是一个grpc服务,和前面写的 GreeterImpl 是一样的,只是这个是官网实现的,如果不想用,也可以自己实现一个,感兴趣的读者可以看看官方代码
下面是 HealthStatusManager 的代码截图
HealthStatusManager 的 getHealthService方法返回一个 HealthServiceImpl 对象
HealthServiceImpl 的代码截图
可以看到 HealthServiceImpl 继承自 HealthGrpc.HealthImplBase
5、失败重试
grpc提供了失败重试功能
新建包 com.wsjzzcbq.grpc.retrying
服务端 RetryingHelloWorldServer 代码
package com.wsjzzcbq.grpc.retrying;import io.grpc.Grpc;
import io.grpc.InsecureServerCredentials;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;/*** RetryingHelloWorldServer** @author wsjz* @date 2023/08/23*/
public class RetryingHelloWorldServer {public static void main(String[] args) throws IOException, InterruptedException {int port = 50051;Server server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create()).addService(new GreeterImpl()).build().start();Runtime.getRuntime().addShutdownHook(new Thread(()->{try {stopServer(server);} catch (InterruptedException e) {e.printStackTrace();}}));blockUntilShutdown(server);}private static void stopServer(Server server) throws InterruptedException {if (server != null) {server.shutdown().awaitTermination(30, TimeUnit.SECONDS);}}private static void blockUntilShutdown(Server server) throws InterruptedException {if (server != null) {server.awaitTermination();}}static class GreeterImpl extends GreeterGrpc.GreeterImplBase {AtomicInteger retryCounter = new AtomicInteger(0);@Overridepublic void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {int count = retryCounter.incrementAndGet();System.out.println("调用次数:" + count);responseObserver.onError(Status.UNAVAILABLE.withDescription("Greeter temporarily unavailable..." + request.getName()).asRuntimeException());}}}
客户端
这里失败重试需要一些配置项,看下面 json文件配置
{"methodConfig": [{"name": [{"service": "helloworld.Greeter","method": "SayHello"}],"retryPolicy": {"maxAttempts": 5,"initialBackoff": "0.5s","maxBackoff": "30s","backoffMultiplier": 2,"retryableStatusCodes": ["UNAVAILABLE"]}}]
}
maxAttempts 最大重试次数
retryableStatusCodes 进行重试的状态码
上面的 json 配置需要客户端创建 ManagedChannel 时配置,可以通过代码读取 json文件转成map传进去,或者改成代码配置,笔者改成了在代码中配置
添加 json 依赖
<dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.39</version>
</dependency>
客户端代码
package com.wsjzzcbq.grpc.retrying;import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import io.grpc.*;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;/*** RetryingHelloWorldClient** @author wsjz* @date 2023/08/23*/
public class RetryingHelloWorldClient {public static void main(String[] args) {String host = "localhost";int port = 50051;ManagedChannelBuilder<?> channelBuilder = Grpc.newChannelBuilderForAddress(host, port, InsecureChannelCredentials.create())//添加重试配置.defaultServiceConfig(methodConfig()).enableRetry();ManagedChannel channel = channelBuilder.build();GreeterGrpc.GreeterBlockingStub blockingStub = GreeterGrpc.newBlockingStub(channel);HelloRequest request = HelloRequest.newBuilder().setName("平生无长物,独往有深情").build();HelloReply response = null;try {response = blockingStub.sayHello(request);System.out.println(response.getMessage());} catch (StatusRuntimeException e) {System.out.println("异常");e.printStackTrace();}}private static JSONObject methodConfig() {JSONArray name = new JSONArray();JSONObject nameItem = new JSONObject();nameItem.put("service", "helloworld.Greeter");nameItem.put("method", "SayHello");name.add(nameItem);JSONObject retryPolicy = new JSONObject();retryPolicy.put("maxAttempts", "5");retryPolicy.put("initialBackoff", "0.5s");retryPolicy.put("maxBackoff", "30s");retryPolicy.put("backoffMultiplier", "2");retryPolicy.put("retryableStatusCodes", JSONArray.of("UNAVAILABLE"));JSONObject methodConfigItem = new JSONObject();methodConfigItem.put("name", name);methodConfigItem.put("retryPolicy", retryPolicy);JSONObject methodConfig = new JSONObject();methodConfig.put("methodConfig", JSONArray.of(methodConfigItem));return methodConfig;}
}
在 methodConfig 方法中配置重试策略
运行测试
服务端打印调用次数,且始终返回错误,看客户端是否进行重试
因为服务端返回错误,客户端连续调用5次,5次都没成功,走到了catch里
下面笔者修改服务端代码,让前4次都失败,第5次成功
修改后的服务端代码
package com.wsjzzcbq.grpc.retrying;import io.grpc.Grpc;
import io.grpc.InsecureServerCredentials;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;/*** RetryingHelloWorldServer** @author wsjz* @date 2023/08/23*/
public class RetryingHelloWorldServer {public static void main(String[] args) throws IOException, InterruptedException {int port = 50051;Server server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create()).addService(new GreeterImpl()).build().start();Runtime.getRuntime().addShutdownHook(new Thread(()->{try {stopServer(server);} catch (InterruptedException e) {e.printStackTrace();}}));blockUntilShutdown(server);}private static void stopServer(Server server) throws InterruptedException {if (server != null) {server.shutdown().awaitTermination(30, TimeUnit.SECONDS);}}private static void blockUntilShutdown(Server server) throws InterruptedException {if (server != null) {server.awaitTermination();}}static class GreeterImpl extends GreeterGrpc.GreeterImplBase {AtomicInteger retryCounter = new AtomicInteger(0);@Overridepublic void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {int count = retryCounter.incrementAndGet();System.out.println("调用次数:" + count);if (count <=4) {responseObserver.onError(Status.UNAVAILABLE.withDescription("Greeter temporarily unavailable..." + request.getName()).asRuntimeException());} else {HelloReply reply = HelloReply.newBuilder().setMessage("瘦影自怜秋水照,卿须怜我我怜卿").build();responseObserver.onNext(reply);responseObserver.onCompleted();}}}}
运行测试
客户端没有报错,连续调用5次
至此完
发布评论