gRpc 客户端调用服务接口

gRPC 客户端调用服务接口

不依赖protobuf生成的代码文件进行调用,可以通过反射接口进行调用。但需要Server端提供io.grpc.reflection.v1alpha.ServerReflectionGrpc 服务,用于获取服务的描述文件。

大致流程:

  1. 根据方法名称,调用服务端反射服务的方法,获取方法所在proto文件
  2. 根据proto描述文件,获取文件描述、服务描述,用于重新构建被调用方法的方法描述MethodDescriptor
  3. 根据方法描述,将请求内容序列化为对应的类型
  4. 使用重新构建的MethodDescriptor和其他参数对Server端相应的方法发起调用
  5. 解析响应并返回

实现

1. proto文件定义 hello.proto

syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.haust.hello";
option java_outer_classname = "HelloProto";
package com.haust.hello;


service HelloService {
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

message HelloRequest {
  string message = 1;
}

message HelloReply {
  string message =  1;
}

2. 构建反射服务

package com.haust.grpc.myrpc;

import com.google.protobuf.*;
import com.google.protobuf.util.JsonFormat;
import io.grpc.CallOptions;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.MethodDescriptor;
import io.grpc.protobuf.ProtoUtils;
import io.grpc.reflection.v1alpha.ServerReflectionGrpc;
import io.grpc.reflection.v1alpha.ServerReflectionRequest;
import io.grpc.reflection.v1alpha.ServerReflectionResponse;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.StreamObserver;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class MyGrpc {

    public static void main(String[] args) throws Exception {
        // 请求方法
        String methodSymbol = "com.haust.hello.HelloService.SayHello";
        // 请求内容
        String requestContext = "{\"message\":\"this is request\"}";
        // 构建channel
        final ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 9921).usePlaintext().build();
        // 使用channel 构建Stub 创建一个新的异步的stub
        ServerReflectionGrpc.ServerReflectionStub reflectionStub = ServerReflectionGrpc.newStub(channel);
        // 响应观察器
        // 需要Server端 加入ProtoReflectionService.newInstance()
        StreamObserver<ServerReflectionResponse> streamObserver = new StreamObserver<ServerReflectionResponse>() {
            @Override
            public void onNext(ServerReflectionResponse serverReflectionResponse) {
                // 处理响应
                try {
                    if (serverReflectionResponse.getMessageResponseCase() == ServerReflectionResponse.MessageResponseCase.FILE_DESCRIPTOR_RESPONSE) {
                        List<ByteString> fileDescriptorProtoList = serverReflectionResponse.getFileDescriptorResponse().getFileDescriptorProtoList();
                        // requestContent 请求内容
                        handleResponse(fileDescriptorProtoList, channel, methodSymbol, requestContext);
                    } else {
                        System.out.println("响应处理失败" + serverReflectionResponse.getMessageResponseCase());
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            @Override
            public void onError(Throwable throwable) {
                throwable.printStackTrace();
            }
            @Override
            public void onCompleted() {
                System.out.println("completed!");
            }
        };
        // 请求观察器
        StreamObserver<ServerReflectionRequest> requestStreamObserver = reflectionStub.serverReflectionInfo(streamObserver);

        // 根据方法名称获取文件描述请求
        ServerReflectionRequest getFileContainingSymbolRequest = ServerReflectionRequest.newBuilder().setFileContainingSymbol(methodSymbol).build();

        requestStreamObserver.onNext(getFileContainingSymbolRequest);

        channel.awaitTermination(10, TimeUnit.SECONDS);
    }

    // 在处理请求时,先解析了包名、服务名和方法名,然后根据包名和服务名,从返回的文件描述中获取到了响应方法所在文件的描述;然后从文件描述中获取服务描述,最终获取到方法描述,根据方法描述执行调用
    private static void handleResponse(List<ByteString> fileDescriptorProtoList, ManagedChannel channel, String methodFullName, String requestContent) {
        try {
            // 解析方法和服务名称
            String fullName = extraPrefix(methodFullName);
            String methodName = extraSuffix(methodFullName);
            String packageName = extraPrefix(fullName);
            String serviceName = extraSuffix(fullName);

            // 根据响应解析 FileDescriptor
            Descriptors.FileDescriptor fileDescriptor = getFileDescriptor(fileDescriptorProtoList, packageName, serviceName);

            // 查找服务描述
            Descriptors.ServiceDescriptor serviceDescriptor = fileDescriptor.getFile().findServiceByName(serviceName);
            // 查找方法描述
            Descriptors.MethodDescriptor methodDescriptor = serviceDescriptor.findMethodByName(methodName);

            // 发起请求
            executeCall(channel, fileDescriptor, methodDescriptor, requestContent);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // 根据响应找到方法对应的文件的FileDescriptorProto ,然后构建出对应的 FileDescriptor
    private static Descriptors.FileDescriptor getFileDescriptor(List<ByteString> fileDescriptorProtoList,
                                                                String packageName,
                                                                String serviceName) throws Exception {

        Map<String, DescriptorProtos.FileDescriptorProto> fileDescriptorProtoMap =
                fileDescriptorProtoList.stream()
                        .map(bs -> {
                            try {
                                return DescriptorProtos.FileDescriptorProto.parseFrom(bs);
                            } catch (InvalidProtocolBufferException e) {
                                e.printStackTrace();
                            }
                            return null;
                        })
                        .filter(Objects::nonNull)
                        .collect(Collectors.toMap(DescriptorProtos.FileDescriptorProto::getName, f -> f));


        if (fileDescriptorProtoMap.isEmpty()) {
            System.out.println("服务不存在");
            throw new IllegalArgumentException("方法的文件描述不存在");
        }

        // 查找服务对应的 Proto 描述
        DescriptorProtos.FileDescriptorProto fileDescriptorProto = findServiceFileDescriptorProto(packageName, serviceName, fileDescriptorProtoMap);

        // 获取这个 Proto 的依赖
        Descriptors.FileDescriptor[] dependencies = getDependencies(fileDescriptorProto, fileDescriptorProtoMap);

        // 生成 Proto 的 FileDescriptor
        return Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, dependencies);
    }

    // 根据包名查找响应的文件描述
    private static DescriptorProtos.FileDescriptorProto findServiceFileDescriptorProto(String packageName, String serviceName, Map<String, DescriptorProtos.FileDescriptorProto> fileDescriptorProtoMap) {
        for (DescriptorProtos.FileDescriptorProto proto : fileDescriptorProtoMap.values()) {
            if (proto.getPackage().equals(packageName)) {
                boolean exist = proto.getServiceList().stream().anyMatch(s -> serviceName.equals(s.getName()));
                if (exist) {
                    return proto;
                }
            }
        }
        throw new IllegalArgumentException("服务不存在");
    }

    // 获取依赖类型
    private static Descriptors.FileDescriptor[] getDependencies(DescriptorProtos.FileDescriptorProto proto, Map<String, DescriptorProtos.FileDescriptorProto> fileDescriptorProtoMap) {
        return proto.getDependencyList().stream().map(fileDescriptorProtoMap::get)
                .map(f -> toFileDescriptor(f, getDependencies(f, fileDescriptorProtoMap))).toArray(Descriptors.FileDescriptor[]::new);
    }

    // 将FileDescriptorProto转为Descriptor
    private static Descriptors.FileDescriptor toFileDescriptor(DescriptorProtos.FileDescriptorProto fileDescriptorProto, Descriptors.FileDescriptor[] dependencies) {
        try {
            return Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, dependencies);
        } catch (Descriptors.DescriptorValidationException e) {
            e.printStackTrace();
        }
        return null;
    }

    // 获取前缀
    private static String extraPrefix(String context) {
        int index = context.lastIndexOf(".");
        return context.substring(0, index);
    }

    // 获取后缀
    private static String extraSuffix(String context) {
        int index = context.lastIndexOf(".");
        return context.substring(index + 1);
    }

    // 执行方法调用
    private static void executeCall(ManagedChannel channel, Descriptors.FileDescriptor fileDescriptor, Descriptors.MethodDescriptor originMethodDescriptor, String requestContext) throws Exception {
        // 重新生成MethodDescriptor
        MethodDescriptor<DynamicMessage, DynamicMessage> methodDescriptor = generateMethodDescriptor(originMethodDescriptor);

        CallOptions callOptions = CallOptions.DEFAULT;

        TypeRegistry typeRegistry = TypeRegistry.newBuilder().add(fileDescriptor.getMessageTypes()).build();
        // 将请求内容诸位响应的类型
        JsonFormat.Parser parser = JsonFormat.parser().usingTypeRegistry(typeRegistry);
        DynamicMessage.Builder messageBuilder = DynamicMessage.newBuilder(originMethodDescriptor.getInputType());
        parser.merge(requestContext, messageBuilder);
        DynamicMessage requestMessage = messageBuilder.build();

        // 调用, 调用方可以通过originMethodDescriptor.isClientStreaming()和originMethodDescriptor.isServerStreaming() 推断
        DynamicMessage response = ClientCalls.blockingUnaryCall(channel, methodDescriptor, callOptions, requestMessage);

        // 将响应解析为JSON字符串
        JsonFormat.Printer printer = JsonFormat.printer().usingTypeRegistry(typeRegistry).includingDefaultValueFields();
        String responseContent = printer.print(response);

        System.out.println("响应 responseContent: " + responseContent);
    }

    //格式,而需要的是package.service/method格式,同时请求和响应类型也需要重新设置为 DynamicMessage,所以需要重新生成 MethodDescriptor
    private static MethodDescriptor<DynamicMessage, DynamicMessage> generateMethodDescriptor(Descriptors.MethodDescriptor originMethodDescriptor) {
        // 生成方法全名
        String fullMethodName = MethodDescriptor.generateFullMethodName(originMethodDescriptor.getService().getFullName(), originMethodDescriptor.getName());
        // 请求和响应类型
        MethodDescriptor.Marshaller<DynamicMessage> inputTypeMarshaller = ProtoUtils.marshaller(DynamicMessage.newBuilder(originMethodDescriptor.getInputType())
                .buildPartial());
        MethodDescriptor.Marshaller<DynamicMessage> outputTypeMarshaller = ProtoUtils.marshaller(DynamicMessage.newBuilder(originMethodDescriptor.getOutputType())
                .buildPartial());

        // 生成方法描述, originMethodDescriptor 的 fullMethodName 不正确
        return MethodDescriptor.<DynamicMessage, DynamicMessage>newBuilder()
                .setFullMethodName(fullMethodName)
                .setRequestMarshaller(inputTypeMarshaller)
                .setResponseMarshaller(outputTypeMarshaller)
                // 使用 UNKNOWN,自动修改
                .setType(MethodDescriptor.MethodType.UNKNOWN)
                .build();
    }
}

Server端 服务实现

package com.haust.grpc.server;

import io.grpc.Server;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.protobuf.services.ProtoReflectionService;

import java.util.concurrent.TimeUnit;

public class ReflectionServer {

    public static void main(String[] args) {
        Server server = NettyServerBuilder.forPort(9921).addService(new HelloGrpcImpl())
                .addService(ProtoReflectionService.newInstance()).build();
        try {
            server.start();
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                try {
                    server.awaitTermination(10, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }));
            server.awaitTermination();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

当然一般情况下,在公司的grpc服务中,是不可以开启反射的,此情况仅适用于开发环境

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,651评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,468评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,931评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,218评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,234评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,198评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,084评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,926评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,341评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,563评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,731评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,430评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,036评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,676评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,829评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,743评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,629评论 2 354

推荐阅读更多精彩内容