gRPC 客户端调用服务接口
不依赖protobuf生成的代码文件进行调用,可以通过反射接口进行调用。但需要Server端提供io.grpc.reflection.v1alpha.ServerReflectionGrpc
服务,用于获取服务的描述文件。
大致流程:
- 根据方法名称,调用服务端反射服务的方法,获取方法所在proto文件
- 根据proto描述文件,获取文件描述、服务描述,用于重新构建被调用方法的方法描述
MethodDescriptor
- 根据方法描述,将请求内容序列化为对应的类型
- 使用重新构建的
MethodDescriptor
和其他参数对Server端相应的方法发起调用 - 解析响应并返回
实现
1. proto文件定义 hello.proto
syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.haust.hello";
option java_outer_classname = "HelloProto";
package com.haust.hello;
service HelloService {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
message HelloRequest {
string message = 1;
}
message HelloReply {
string message = 1;
}
2. 构建反射服务
package com.haust.grpc.myrpc;
import com.google.protobuf.*;
import com.google.protobuf.util.JsonFormat;
import io.grpc.CallOptions;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.MethodDescriptor;
import io.grpc.protobuf.ProtoUtils;
import io.grpc.reflection.v1alpha.ServerReflectionGrpc;
import io.grpc.reflection.v1alpha.ServerReflectionRequest;
import io.grpc.reflection.v1alpha.ServerReflectionResponse;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class MyGrpc {
public static void main(String[] args) throws Exception {
// 请求方法
String methodSymbol = "com.haust.hello.HelloService.SayHello";
// 请求内容
String requestContext = "{\"message\":\"this is request\"}";
// 构建channel
final ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 9921).usePlaintext().build();
// 使用channel 构建Stub 创建一个新的异步的stub
ServerReflectionGrpc.ServerReflectionStub reflectionStub = ServerReflectionGrpc.newStub(channel);
// 响应观察器
// 需要Server端 加入ProtoReflectionService.newInstance()
StreamObserver<ServerReflectionResponse> streamObserver = new StreamObserver<ServerReflectionResponse>() {
@Override
public void onNext(ServerReflectionResponse serverReflectionResponse) {
// 处理响应
try {
if (serverReflectionResponse.getMessageResponseCase() == ServerReflectionResponse.MessageResponseCase.FILE_DESCRIPTOR_RESPONSE) {
List<ByteString> fileDescriptorProtoList = serverReflectionResponse.getFileDescriptorResponse().getFileDescriptorProtoList();
// requestContent 请求内容
handleResponse(fileDescriptorProtoList, channel, methodSymbol, requestContext);
} else {
System.out.println("响应处理失败" + serverReflectionResponse.getMessageResponseCase());
}
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onCompleted() {
System.out.println("completed!");
}
};
// 请求观察器
StreamObserver<ServerReflectionRequest> requestStreamObserver = reflectionStub.serverReflectionInfo(streamObserver);
// 根据方法名称获取文件描述请求
ServerReflectionRequest getFileContainingSymbolRequest = ServerReflectionRequest.newBuilder().setFileContainingSymbol(methodSymbol).build();
requestStreamObserver.onNext(getFileContainingSymbolRequest);
channel.awaitTermination(10, TimeUnit.SECONDS);
}
// 在处理请求时,先解析了包名、服务名和方法名,然后根据包名和服务名,从返回的文件描述中获取到了响应方法所在文件的描述;然后从文件描述中获取服务描述,最终获取到方法描述,根据方法描述执行调用
private static void handleResponse(List<ByteString> fileDescriptorProtoList, ManagedChannel channel, String methodFullName, String requestContent) {
try {
// 解析方法和服务名称
String fullName = extraPrefix(methodFullName);
String methodName = extraSuffix(methodFullName);
String packageName = extraPrefix(fullName);
String serviceName = extraSuffix(fullName);
// 根据响应解析 FileDescriptor
Descriptors.FileDescriptor fileDescriptor = getFileDescriptor(fileDescriptorProtoList, packageName, serviceName);
// 查找服务描述
Descriptors.ServiceDescriptor serviceDescriptor = fileDescriptor.getFile().findServiceByName(serviceName);
// 查找方法描述
Descriptors.MethodDescriptor methodDescriptor = serviceDescriptor.findMethodByName(methodName);
// 发起请求
executeCall(channel, fileDescriptor, methodDescriptor, requestContent);
} catch (Exception e) {
e.printStackTrace();
}
}
// 根据响应找到方法对应的文件的FileDescriptorProto ,然后构建出对应的 FileDescriptor
private static Descriptors.FileDescriptor getFileDescriptor(List<ByteString> fileDescriptorProtoList,
String packageName,
String serviceName) throws Exception {
Map<String, DescriptorProtos.FileDescriptorProto> fileDescriptorProtoMap =
fileDescriptorProtoList.stream()
.map(bs -> {
try {
return DescriptorProtos.FileDescriptorProto.parseFrom(bs);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
return null;
})
.filter(Objects::nonNull)
.collect(Collectors.toMap(DescriptorProtos.FileDescriptorProto::getName, f -> f));
if (fileDescriptorProtoMap.isEmpty()) {
System.out.println("服务不存在");
throw new IllegalArgumentException("方法的文件描述不存在");
}
// 查找服务对应的 Proto 描述
DescriptorProtos.FileDescriptorProto fileDescriptorProto = findServiceFileDescriptorProto(packageName, serviceName, fileDescriptorProtoMap);
// 获取这个 Proto 的依赖
Descriptors.FileDescriptor[] dependencies = getDependencies(fileDescriptorProto, fileDescriptorProtoMap);
// 生成 Proto 的 FileDescriptor
return Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, dependencies);
}
// 根据包名查找响应的文件描述
private static DescriptorProtos.FileDescriptorProto findServiceFileDescriptorProto(String packageName, String serviceName, Map<String, DescriptorProtos.FileDescriptorProto> fileDescriptorProtoMap) {
for (DescriptorProtos.FileDescriptorProto proto : fileDescriptorProtoMap.values()) {
if (proto.getPackage().equals(packageName)) {
boolean exist = proto.getServiceList().stream().anyMatch(s -> serviceName.equals(s.getName()));
if (exist) {
return proto;
}
}
}
throw new IllegalArgumentException("服务不存在");
}
// 获取依赖类型
private static Descriptors.FileDescriptor[] getDependencies(DescriptorProtos.FileDescriptorProto proto, Map<String, DescriptorProtos.FileDescriptorProto> fileDescriptorProtoMap) {
return proto.getDependencyList().stream().map(fileDescriptorProtoMap::get)
.map(f -> toFileDescriptor(f, getDependencies(f, fileDescriptorProtoMap))).toArray(Descriptors.FileDescriptor[]::new);
}
// 将FileDescriptorProto转为Descriptor
private static Descriptors.FileDescriptor toFileDescriptor(DescriptorProtos.FileDescriptorProto fileDescriptorProto, Descriptors.FileDescriptor[] dependencies) {
try {
return Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, dependencies);
} catch (Descriptors.DescriptorValidationException e) {
e.printStackTrace();
}
return null;
}
// 获取前缀
private static String extraPrefix(String context) {
int index = context.lastIndexOf(".");
return context.substring(0, index);
}
// 获取后缀
private static String extraSuffix(String context) {
int index = context.lastIndexOf(".");
return context.substring(index + 1);
}
// 执行方法调用
private static void executeCall(ManagedChannel channel, Descriptors.FileDescriptor fileDescriptor, Descriptors.MethodDescriptor originMethodDescriptor, String requestContext) throws Exception {
// 重新生成MethodDescriptor
MethodDescriptor<DynamicMessage, DynamicMessage> methodDescriptor = generateMethodDescriptor(originMethodDescriptor);
CallOptions callOptions = CallOptions.DEFAULT;
TypeRegistry typeRegistry = TypeRegistry.newBuilder().add(fileDescriptor.getMessageTypes()).build();
// 将请求内容诸位响应的类型
JsonFormat.Parser parser = JsonFormat.parser().usingTypeRegistry(typeRegistry);
DynamicMessage.Builder messageBuilder = DynamicMessage.newBuilder(originMethodDescriptor.getInputType());
parser.merge(requestContext, messageBuilder);
DynamicMessage requestMessage = messageBuilder.build();
// 调用, 调用方可以通过originMethodDescriptor.isClientStreaming()和originMethodDescriptor.isServerStreaming() 推断
DynamicMessage response = ClientCalls.blockingUnaryCall(channel, methodDescriptor, callOptions, requestMessage);
// 将响应解析为JSON字符串
JsonFormat.Printer printer = JsonFormat.printer().usingTypeRegistry(typeRegistry).includingDefaultValueFields();
String responseContent = printer.print(response);
System.out.println("响应 responseContent: " + responseContent);
}
//格式,而需要的是package.service/method格式,同时请求和响应类型也需要重新设置为 DynamicMessage,所以需要重新生成 MethodDescriptor
private static MethodDescriptor<DynamicMessage, DynamicMessage> generateMethodDescriptor(Descriptors.MethodDescriptor originMethodDescriptor) {
// 生成方法全名
String fullMethodName = MethodDescriptor.generateFullMethodName(originMethodDescriptor.getService().getFullName(), originMethodDescriptor.getName());
// 请求和响应类型
MethodDescriptor.Marshaller<DynamicMessage> inputTypeMarshaller = ProtoUtils.marshaller(DynamicMessage.newBuilder(originMethodDescriptor.getInputType())
.buildPartial());
MethodDescriptor.Marshaller<DynamicMessage> outputTypeMarshaller = ProtoUtils.marshaller(DynamicMessage.newBuilder(originMethodDescriptor.getOutputType())
.buildPartial());
// 生成方法描述, originMethodDescriptor 的 fullMethodName 不正确
return MethodDescriptor.<DynamicMessage, DynamicMessage>newBuilder()
.setFullMethodName(fullMethodName)
.setRequestMarshaller(inputTypeMarshaller)
.setResponseMarshaller(outputTypeMarshaller)
// 使用 UNKNOWN,自动修改
.setType(MethodDescriptor.MethodType.UNKNOWN)
.build();
}
}
Server端 服务实现
package com.haust.grpc.server;
import io.grpc.Server;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.protobuf.services.ProtoReflectionService;
import java.util.concurrent.TimeUnit;
public class ReflectionServer {
public static void main(String[] args) {
Server server = NettyServerBuilder.forPort(9921).addService(new HelloGrpcImpl())
.addService(ProtoReflectionService.newInstance()).build();
try {
server.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
server.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
server.awaitTermination();
} catch (Exception e) {
e.printStackTrace();
}
}
}