基于netty的RPC实现

这里解决了三个问题

  1. 协议定义,解决 粘包/拆包 问题
  2. 单客户端并发发送/消息维护问题
  3. 服务端并发提供服务问题

三个问题的具体实现如下

1.协议定义:

完整数据块包含数据 开始标识头,数据长度,真实数据三部分,如下图.


在这里插入图片描述

客户端,具体发送代码实现如下:

 public class RpcEncoder extends MessageToByteEncoder {
    @Override
    protected void encode(ChannelHandlerContext ctx, Object requestBoday, ByteBuf out) throws Exception {
        //序列化传输对象. 也可只是传输字符串,服务端解析,但是局限不较大,无法应对多样的调用函数,对应参数,已经类型
        byte[] data = SerializationUtil.serialize(requestBoday);
        //先写入 开始标识
        out.writeBytes(Constants.SERVIE_HEARD.getBytes());
        //再写入数据长度
        out.writeInt(data.length);
        //再写入真实数据
        out.writeBytes(data);
    }
}

服务端,具体接收解析代码实现如下:

public class RpcDecoder extends ByteToMessageDecoder {
     .............
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        //hadReadHeard避免多次判断头信息
        if (!hadReadHeard) {
            while (true) {
                //这里保证至少读到一个头信息,也可以读到一个头和数据长度在做处理
                if (in.readableBytes() < 4) {
                    return;
                }
                in.markReaderIndex();
                in.readBytes(dataHeardBuffer);
                System.out.println(Constants.SERVIE_HEARD.getBytes().length);
                String s = new String(dataHeardBuffer);
                //读到头标识信息,准备读取数据长度和数据
                if (s.equals(Constants.SERVIE_HEARD)) {
                    hadReadHeard = true;
                    break;
                } else {
                    in.resetReaderIndex();
                    //为读取到 头标识,则过滤一个字节,继续判断是否收到头标识
                    in.readByte();
                }
            }
        }

        in.markReaderIndex();
        int dataLength = in.readInt();
        if (in.readableBytes() < dataLength) {
            in.resetReaderIndex();
            return;
        }
        hadReadHeard = false;
        byte[] data = new byte[dataLength];
        in.readBytes(data);
        out.add(SerializationUtil.deserialize(data, requestResponseRpc));
    }
}

2.单客户端并发发送/消息维护问题:

发送消息的维护:
1)消息通过唯一id来区分
2)所有"发送的消息" 都记录到hashmap中维护记录.
3)发送消息后,会阻塞等待结果返回
4)所有接收的消息,都借助唯一ID匹配到"发送的消息",并唤醒(notify)阻塞的发送线程处理返回数据

public class ProxyHelperTool {
    ...........
    public <T> T create(final Class<?> interfaceClass) {
        return (T) Proxy.newProxyInstance(
                interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass},
                new InvocationHandler() {
                    //@Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        if (method.getDeclaringClass().getAnnotation(ServiceName.class) == null) {
                            throw new RuntimeException("Annotation(ServiceName) is null.");
                        }
                        //构造请求消息,并获取请求服务,方法,参数,参数类型
                        RequestRpc requestRpc = new RequestRpc();
                        requestRpc.setMethodName(method.getName());
                        requestRpc.setServiceName(method.getDeclaringClass().getAnnotation(ServiceName.class).name());
                        requestRpc.setParameters(args);
                        requestRpc.setParameterTypes(method.getParameterTypes());
                        //设置唯一id,确保消息的唯一性
                        requestRpc.setRequestId(StringUtil.getUiid());
                        //将发送的消息 送入列表维护起来.
                        ClientHandler.waitingRPC.put(requestRpc.getRequestId(),requestRpc);
                        ProxyHelperTool.client.send(requestRpc);
                        //进入阻塞等待,直到服务返回消息 唤醒.To do:这里缺过时处理
                        synchronized(requestRpc){
                            requestRpc.wait();
                        }
                        return requestRpc.getResult();
                    }
                }
        );
    }
}

3.服务端并发服务:

public class ServerHandler extends ChannelInboundHandlerAdapter {
    .............
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //將服务方静线程里执行,避免阻塞
        ServerService.submit(new Runnable() {
            @Override
            public void run() {
                RequestRpc requestRpc = (RequestRpc)msg;
                ResponseRpc responseRpc = handle(requestRpc);
                responseRpc.setRequestId(requestRpc.getRequestId());
                ctx.writeAndFlush(responseRpc).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        System.out.println("Server operationComplete");
                    }
                });
            }
        });

        /*.addListener(ChannelFutureListener.CLOSE)*/
    }
    //真实处理服务的地方,依据对方传递的 调用服务和参数通过反射调用获取结果返回
    private ResponseRpc handle(RequestRpc requestRpc){
        ResponseRpc responseRpc = new ResponseRpc();
        Object object = ServerService.getService(requestRpc.getServiceName());
        if(object == null){
            responseRpc.setException(new RuntimeException("Not service:"+requestRpc.
                    getServiceName()));
            return responseRpc;
        }

        try {
            Class<?> serviceClass = object.getClass();
            Method method = serviceClass.getMethod(requestRpc.getMethodName(),
                    requestRpc.getParameterTypes());
            method.setAccessible(true);
            Object[] parameters = requestRpc.getParameters();
            responseRpc.setResult(method.invoke(object, parameters));
        } catch (Exception e){
            responseRpc.setResult(e);
        }
        return responseRpc;
    }
  ........
}

测试方式,以及结果

客戶端 测试模拟 调用远程服务

这里, 客户端建立单链接,并发发送消息的方式 向服务端发起服务调用

public class TestClient {
    public static ProxyHelperTool proxyHelperTool = new ProxyHelperTool();
    public static void main(String[] args) throws Exception {
        int threadNumber = 15;
        CountDownLatch countDownLatch = new CountDownLatch(threadNumber);
        //开始15个线程发送 服务调用消息
        for(int i=0;i<threadNumber;i++){
            new Thread(){
                @Override
                public void run() {
                    //客户端,通过传递当前线程的名称(Thread.currentThread().getName)给服务端;
                    //服务端,组合收到的字符 再次发回来。
                    //通过对比 "线程名",可见各个线程收到的是否是自己发送的。
                    MsgService msgService = proxyHelperTool.create(MsgService.class);
                    String reslut = msgService.send(Thread.currentThread().getName());
                    System.out.println("Client("+Thread.currentThread().getName()+") get mag:" + "\n" + "..." + reslut);
                    countDownLatch.countDown();
                }
            }.start();
        }
        countDownLatch.await();
        ClientHelper.getClientHelper().close();
    }

}

客戶端 测试模拟 收到的结果

可见对应的调用线程,都收到了自己发出去的消息. 对应的thread-name 匹配


在这里插入图片描述

参考

https://my.oschina.net/huangyong/blog/361751?fromerr=NpC3phqY
https://github.com/apache/hadoop

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容