手写 RPC 框架

前言

在微服务当道的今天,分布式系统越来越重要,实现服务化首先就要考虑服务之间的通信问题。这里面涉及序列化、反序列化、寻址、连接等等问题。不过,有了 RPC 框架,我们就无需苦恼。

一、什么是 RPC?

RPC(Remote Procedure Call)— 远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的程序,而程序员无需额外地为这个交互作用编程。
值得注意是,两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样。


RPC框架有很多,比较知名的如阿里的 Dubbo、google 的 gRPC、Go 语言的 rpcx、Apache 的 thrift。当然了,还有Spring Cloud,不过对于 Spring Cloud 来说,RPC 只是它的一个功能模块。

如果要实现一个基本功能、简单的 RPC,要涉及哪些东西呢?

  • 动态代理
  • 反射
  • 序列化、反序列化
  • 网络通信
  • 编解码
  • 服务发现与注册
  • 心跳与链路检测
  • ......

下面,我们一起通过代码来分析,怎么把技术点串到一起,实现我们自己的 RPC。

二、环境准备

在开始之前,笔者先介绍一下所用到的软件环境。

SpringBoot、Netty、zookeeper、zkclient、fastjson

  • SpringBoot 项目的基础框架
  • Netty 通信服务器
  • zookeeper 服务发现与注册
  • zkclient zookeeper客户端
  • fastjson 序列化、反序列化

三、RPC 生产者

1、服务接口API

整个 RPC 系统,我们分为生成者和消费者。首先他们有一个共同的服务接口 API。在这里,我们搞一个操作用户信息的 service 接口。

public interface InfoUserService {
    List<InfoUser> insertInfoUser(InfoUser infoUser);
    InfoUser getInfoUserById(String id);
    void deleteInfoUserById(String id);
    String getNameById(String id);
    Map<String,InfoUser> getAllUser();
}

2、服务类实现

作为生产者,它当然要有实现类,我们创建InfoUserServiceImpl实现类,并用注解把它标注为 RPC 的服务,然后注册到 Srping 的 Bean 容器中。在这里,我们把infoUserMap当做数据库,存储用户信息。

package com.viewscenes.netsupervisor.service.impl;

@RpcService
public class InfoUserServiceImpl implements InfoUserService {

    Logger logger = LoggerFactory.getLogger(this.getClass());
    //当做数据库,存储用户信息
    Map<String,InfoUser> infoUserMap = new HashMap<>();

    public List<InfoUser> insertInfoUser(InfoUser infoUser) {
        logger.info("新增用户信息:{}", JSONObject.toJSONString(infoUser));
        infoUserMap.put(infoUser.getId(),infoUser);
        return getInfoUserList();
    }
    public InfoUser getInfoUserById(String id) {
        InfoUser infoUser = infoUserMap.get(id);
        logger.info("查询用户ID:{}",id);
        return infoUser;
    }

    public List<InfoUser> getInfoUserList() {
        List<InfoUser> userList = new ArrayList<>();
        Iterator<Map.Entry<String, InfoUser>> iterator = infoUserMap.entrySet().iterator();
        while (iterator.hasNext()){
            Map.Entry<String, InfoUser> next = iterator.next();
            userList.add(next.getValue());
        }
        logger.info("返回用户信息记录数:{}",userList.size());
        return userList;
    }
    public void deleteInfoUserById(String id) {
        logger.info("删除用户信息:{}",JSONObject.toJSONString(infoUserMap.remove(id)));
    }
    public String getNameById(String id){
        logger.info("根据ID查询用户名称:{}",id);
        return infoUserMap.get(id).getName();
    }
    public Map<String,InfoUser> getAllUser(){
        logger.info("查询所有用户信息{}",infoUserMap.keySet().size());
        return infoUserMap;
    }
}

注解@RpcService定义如下:

package com.viewscenes.netsupervisor.annotation;

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {}

3、请求信息和返回信息

所有的请求信息和返回信息,我们用两个 JavaBean 来表示。其中的重点是,返回信息要带有请求信息的ID。

package com.viewscenes.netsupervisor.entity;
public class Request {
    private String id;
    private String className;// 类名
    private String methodName;// 函数名称
    private Class<?>[] parameterTypes;// 参数类型
    private Object[] parameters;// 参数列表
    get/set ...
}
package com.viewscenes.netsupervisor.entity;
public class Response {
    private String requestId;
    private int code;
    private String error_msg;
    private Object data;
    get/set ...
}

4、Netty 服务端

Netty 作为高性能的 NIO 通信框架,在很多 RPC 框架中都有它的身影。我们也采用它当做通信服务器。说到这,我们先看个配置文件,重点有两个,zookeeper 的注册地址和 Netty 通信服务器的地址。

#TOMCAT端口
server.port=8001
#zookeeper注册地址
registry.address=192.168.174.10:2181
#RPC服务提供者地址
rpc.server.address=192.168.210.81:18868

为了方便管理,我们把它也注册成 Bean,同时实现 ApplicationContextAware 接口,把上面 @RpcService 注解的服务类捞出来,缓存起来,供消费者调用。同时,作为服务器,还要对客户端的链路进行心跳检测,超过60秒未读写数据,关闭此连接。

package com.viewscenes.netsupervisor.netty.server;

import com.viewscenes.netsupervisor.annotation.RpcService;
import com.viewscenes.netsupervisor.netty.codec.json.JSONDecoder;
import com.viewscenes.netsupervisor.netty.codec.json.JSONEncoder;
import com.viewscenes.netsupervisor.registry.ServiceRegistry;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;

@Component
public class NettyServer implements ApplicationContextAware,InitializingBean{

    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
    private static final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    private static final EventLoopGroup workerGroup = new NioEventLoopGroup(4);

    private Map<String, Object> serviceMap = new HashMap<>();

    @Value("${rpc.server.address}")
    private String serverAddress;

    @Autowired
    ServiceRegistry registry;

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

        Map<String, Object> beans = applicationContext.getBeansWithAnnotation(RpcService.class);
        for(Object serviceBean:beans.values()){

            Class<?> clazz = serviceBean.getClass();

            Class<?>[] interfaces = clazz.getInterfaces();

            for (Class<?> inter : interfaces){
                String interfaceName = inter.getName();
                logger.info("加载服务类: {}", interfaceName);
                serviceMap.put(interfaceName, serviceBean);
            }
        }
        logger.info("已加载全部服务接口:{}", serviceMap);
    }

    public void afterPropertiesSet() throws Exception {
        start();
    }

    public void start(){

        final NettyServerHandler handler = new NettyServerHandler(serviceMap);

        new Thread(() -> {
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup,workerGroup).
                        channel(NioServerSocketChannel.class).
                        option(ChannelOption.SO_BACKLOG,1024).
                        childOption(ChannelOption.SO_KEEPALIVE,true).
                        childOption(ChannelOption.TCP_NODELAY,true).
                        childHandler(new ChannelInitializer<SocketChannel>() {
                            //创建NIOSocketChannel成功后,在进行初始化时,将它的ChannelHandler设置到ChannelPipeline中,用于处理网络IO事件
                            protected void initChannel(SocketChannel channel) throws Exception {
                                ChannelPipeline pipeline = channel.pipeline();
                                pipeline.addLast(new IdleStateHandler(0, 0, 60));
                                pipeline.addLast(new JSONEncoder());
                                pipeline.addLast(new JSONDecoder());
                                pipeline.addLast(new HeartBeatHandler());
                                pipeline.addLast(handler);
                            }
                        });

                String[] array = serverAddress.split(":");
                String host = array[0];
                int port = Integer.parseInt(array[1]);
                ChannelFuture cf = bootstrap.bind(host,port).sync();
                logger.info("RPC 服务器启动.监听端口:"+port);
                registry.register(serverAddress);
                //等待服务端监听端口关闭
                cf.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }).start();
    }
}

setApplicationContext方法,将被 @RpcService 注解的服务类,存储在 serviceMap 中。start方法,启动 Netty 服务端。new IdleStateHandler(0, 0, 60)检测心跳机制,表示 60s 内如果没有接收到客户端的读写请求,将走ChannelInboundHandlerAdapter.userEventTriggered方法。于是我们自定义HeartBeatHandler心跳处理器,来重写userEventTriggered方法,将连接关闭。

package com.viewscenes.netsupervisor.netty.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 用于检测channel的心跳handler
 * 继承ChannelInboundHandlerAdapter,从而不需要实现channelRead0 方法
 * @author K. L. Mao
 * @create 2019/2/22
 */
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {

    private final Logger logger = LoggerFactory.getLogger(HeartBeatHandler.class);

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent){
            IdleStateEvent event = (IdleStateEvent)evt;
            if (event.state()== IdleState.ALL_IDLE){
                logger.info("客户端已超过60秒未读写数据,关闭连接.{}",ctx.channel().remoteAddress());
                ctx.channel().close();
            }
        }
    }
}

在处理器中的构造函数中,我们先把服务 Bean 的 serviceMap 传进来,所有的处理要基于这个 serviceMap 才能找到对应的实现类。在channelRead中,获取请求方法的信息,然后通过反射调用方法获取返回值。

package com.viewscenes.netsupervisor.netty.server;

import com.alibaba.fastjson.JSON;
import com.viewscenes.netsupervisor.entity.Request;
import com.viewscenes.netsupervisor.entity.Response;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Method;
import java.util.Map;

/**
 * @program: rpc-provider
 * @description: ${description}
 * @author: shiqizhen
 * @create: 2018-11-30 17:27
 **/
@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    private final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
    private final Map<String, Object> serviceMap;

    public NettyServerHandler(Map<String, Object> serviceMap) {
        this.serviceMap = serviceMap;
    }

    public void channelActive(ChannelHandlerContext ctx)   {
        logger.info("客户端连接成功!"+ctx.channel().remoteAddress());
    }

    public void channelInactive(ChannelHandlerContext ctx)   {
        logger.info("客户端断开连接!{}",ctx.channel().remoteAddress());
        ctx.channel().close();
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg)   {
        Request request = JSON.parseObject(msg.toString(),Request.class);

        if ("heartBeat".equals(request.getMethodName())) {
            logger.info("客户端心跳信息..."+ctx.channel().remoteAddress());
        }else{
            logger.info("RPC客户端请求接口:"+request.getClassName()+"   方法名:"+request.getMethodName());
            Response response = new Response();
            response.setRequestId(request.getId());
            try {
                Object result = this.handler(request);
                response.setData(result);
            } catch (Throwable e) {
                e.printStackTrace();
                response.setCode(1);
                response.setError_msg(e.toString());
                logger.error("RPC Server handle request error",e);
            }
            ctx.writeAndFlush(response);
        }
    }

    /**
     * 通过反射,执行本地方法
     * @param request
     * @return
     * @throws Throwable
     */
    private Object handler(Request request) throws Throwable{
        String className = request.getClassName();
        Object serviceBean = serviceMap.get(className);

        if (serviceBean!=null){
            Class<?> serviceClass = serviceBean.getClass();
            String methodName = request.getMethodName();
            Class<?>[] parameterTypes = request.getParameterTypes();
            Object[] parameters = request.getParameters();

            Method method = serviceClass.getMethod(methodName, parameterTypes);
            method.setAccessible(true);
            return method.invoke(serviceBean, getParameters(parameterTypes,parameters));
        }else{
            throw new Exception("未找到服务接口,请检查配置!:"+className+"#"+request.getMethodName());
        }
    }

    /**
     * 获取参数列表
     * @param parameterTypes
     * @param parameters
     * @return
     */
    private Object[] getParameters(Class<?>[] parameterTypes,Object[] parameters){
        if (parameters==null || parameters.length==0){
            return parameters;
        }else{
            Object[] new_parameters = new Object[parameters.length];
            for(int i=0;i<parameters.length;i++){
                new_parameters[i] = JSON.parseObject(parameters[i].toString(),parameterTypes[i]);
            }
            return new_parameters;
        }
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)   {
        logger.info(cause.getMessage());
        ctx.close();
    }
}

4、服务注册

我们启动了 Netty 通信服务器,并且把服务实现类加载到缓存,等待请求时调用。这一步,我们要进行服务注册。为了简单化处理,我们只注册通信服务器的监听地址即可。
在上面代码中,bind 之后我们执行了registry.register(serverAddress);它的作用就是,将 Netty 监听的 IP 端口注册到 zookeeper。

package com.viewscenes.netsupervisor.registry;
@Component
public class ServiceRegistry {
    Logger logger = LoggerFactory.getLogger(this.getClass());
    @Value("${registry.address}")
    private String registryAddress;
    private static final String ZK_REGISTRY_PATH = "/rpc";

    public void register(String data) {
        if (data != null) {
            ZkClient client = connectServer();
            if (client != null) {
                AddRootNode(client);
                createNode(client, data);
            }
        }
    }
    //连接zookeeper
    private ZkClient connectServer() {
        ZkClient client = new ZkClient(registryAddress,20000,20000);
        return client;
    }
    //创建根目录/rpc
    private void AddRootNode(ZkClient client){
        boolean exists = client.exists(ZK_REGISTRY_PATH);
        if (!exists){
            client.createPersistent(ZK_REGISTRY_PATH);
            logger.info("创建zookeeper主节点 {}",ZK_REGISTRY_PATH);
        }
    }
    //在/rpc根目录下,创建临时顺序子节点
    private void createNode(ZkClient client, String data) {
        String path = client.create(ZK_REGISTRY_PATH + "/provider", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        logger.info("创建zookeeper数据节点 ({} => {})", path, data);
    }
}

有一点需要注意,子节点必须是临时节点。这样,生产者端停掉之后,才能通知到消费者,把此服务从服务列表中剔除。到此为止,生产者端已经完成。我们看一下它的启动日志:

加载服务类: com.viewscenes.netsupervisor.service.InfoUserService
已加载全部服务接口:{com.viewscenes.netsupervisor.service.InfoUserService=com.viewscenes.netsupervisor.service.impl.InfoUserServiceImpl@46cc127b}
Initializing ExecutorService 'applicationTaskExecutor'
Tomcat started on port(s): 8001 (http) with context path ''
Started RpcProviderApplication in 2.003 seconds (JVM running for 3.1)
RPC 服务器启动.监听端口:18868
Starting ZkClient event thread.
Socket connection established to node1/192.168.174.10:2181, initiating session
Session establishment complete on server node1/192.168.174.10:2181, sessionid = 0x367835b48970010, negotiated timeout = 4000
zookeeper state changed (SyncConnected)
创建zookeeper主节点 /rpc
创建zookeeper数据节点 (/rpc/provider0000000000 => 192.168.210.81:18868)

四、RPC 消费者

首先,我们需要把生产者端的服务接口API,即InfoUserService。以相同的目录放到消费者端。路径不同,调用会找不到的哦(实际项目上,是通过依赖 jar 包实现的)。

1、代理

RPC的目标其中有一条,程序员无需额外地为这个交互作用编程。所以,我们在调用的时候,就像调用本地方法一样。就像下面这样:

@Controller
public class IndexController {  
    @Autowired
    private InfoUserService userService;
    
    @RequestMapping("getById")
    @ResponseBody
    public InfoUser getById(String id){
        logger.info("根据ID查询用户信息:{}",id);
        return userService.getInfoUserById(id);
    }
}

那么,问题来了。消费者端并没有此接口的实现,怎么调用到的呢?这里,首先就是代理。这里用的是 Spring 的工厂 Bean 机制创建的代理对象(JDK 动态代理),类似于 MyBatis 中的 Mapper 接口的调用。

首先,创建代理类(必须实现InvocationHandler):

package com.viewscenes.netsupervisor.configurer.rpc;

@Component
public class RpcFactory implements InvocationHandler {

    @Autowired
    private NettyClient client;

    Logger logger = LoggerFactory.getLogger(this.getClass());

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Request request = new Request();
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setParameters(args);
        request.setParameterTypes(method.getParameterTypes());
        request.setId(IdUtil.getId());

        Object result = client.send(request);
        Class<?> returnType = method.getReturnType();

        Response response = JSON.parseObject(result.toString(), Response.class);
        if (response.getCode()==1){
            throw new Exception(response.getError_msg());
        }
        if (returnType.isPrimitive() || String.class.isAssignableFrom(returnType)){
            return response.getData();
        }else if (Collection.class.isAssignableFrom(returnType)){
            return JSONArray.parseArray(response.getData().toString(),Object.class);
        }else if(Map.class.isAssignableFrom(returnType)){
            return JSON.parseObject(response.getData().toString(),Map.class);
        }else{
            Object data = response.getData();
            return JSONObject.parseObject(data.toString(), returnType);
        }
    }
}

这个代理类的invoke方法,会将客户端的Request发送给Netty 服务端,并接收服务端的返回值。

定义一个 RPC 工厂 Bean:

package com.viewscenes.netsupervisor.configurer.rpc;

import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.support.AbstractBeanFactory;
import org.springframework.context.support.AbstractApplicationContext;

import java.lang.reflect.Proxy;

/**
 * Created by MACHENIKE on 2018-12-03.
 */
public class RpcFactoryBean<T> implements FactoryBean<T> {

    private Class<T> rpcInterface;

    @Autowired
    private RpcFactory<T> factory;

    /**
     * {@link AbstractApplicationContext#registerBeanPostProcessors(org.springframework.beans.factory.config.ConfigurableListableBeanFactory)}
     * 会通过 rpcInterface 实例化 RpcFactoryBean
     * @param rpcInterface
     */
    public RpcFactoryBean(Class<T> rpcInterface) {
        this.rpcInterface = rpcInterface;
    }

    /**
     * 把 Bean 的定义 GenericBeanDefinition 放到了容器之后,就需要初始化这些 Bean,
     * 而 Bean 的初始化时机有2个:
     *      1、在程序第一个主动调用 getBean 的时候
     *      2、在完成容器初始化的时候会初始化 lazy-init 配置为 false 的Bean(默认为false)
     * 在这里,由于 RpcFactoryBean 未设置懒加载,故初始化的时机是第二种。上面两种初始化的过程都是一样的,
     * 都会调用 {@link AbstractBeanFactory#doGetBean(java.lang.String, java.lang.Class, java.lang.Object[], boolean) 方法,
     * 里面有个方法 getObjectForBeanInstance,会判断当前的 Bean 是否实现了 {@link FactoryBean}。
     * 如果该 Bean 未实现 FactoryBean 接口,则直接返回该Bean实例;
     * 如果该 Bean 实现了 FactoryBean 接口,则会返回的实例是 getObject() 返回值。
     * @return
     * @throws Exception
     */
    public T getObject() throws Exception {
        return getRpc();
    }

    public Class<?> getObjectType() {
        return this.rpcInterface;
    }

    public boolean isSingleton() {
        return true;
    }

    /**
     * JDK 动态代理,当调用 rpcInterface 接口的方法时,会走 factory 的 invoke 方法
     * @param <T>
     * @return
     */
    public <T> T getRpc() {
        return (T) Proxy.newProxyInstance(rpcInterface.getClassLoader(), new Class[] { rpcInterface },factory);
    }
}

该类实现了FactoryBean接口,表示这个类是工厂 Bean,它在 Spring 容器存放的实例不是类本身,而是getObject的返回值。这里声明了一个参数构造器,目的是在程序启动过程中,调用AbstractApplicationContext.refresh方法,refresh方法里面会走registerBeanPostProcessors方法,该方法会通过反射,把rpcInterface传过来实例化 RpcFactoryBean

接下来,我们就要定义一个路径扫描类,来扫描指定路径下的接口,生成 Bean 的定义 BeanDefinition,并放进容器。

package com.viewscenes.netsupervisor.configurer.rpc;

import org.springframework.beans.factory.annotation.AnnotatedBeanDefinition;
import org.springframework.beans.factory.config.BeanDefinitionHolder;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.GenericBeanDefinition;
import org.springframework.context.annotation.ClassPathBeanDefinitionScanner;
import org.springframework.core.type.classreading.MetadataReader;
import org.springframework.core.type.classreading.MetadataReaderFactory;
import org.springframework.core.type.filter.AnnotationTypeFilter;
import org.springframework.core.type.filter.TypeFilter;

import java.io.IOException;
import java.lang.annotation.Annotation;
import java.util.Arrays;
import java.util.Set;

/**
 * Created by MACHENIKE on 2018-12-03.
 */
public class ClassPathRpcScanner extends ClassPathBeanDefinitionScanner{

    public ClassPathRpcScanner(BeanDefinitionRegistry registry) {
        super(registry);
    }

    public Set<BeanDefinitionHolder> doScan(String... basePackages) {
        // 获取指定路径下的 beanDefinitions
        Set<BeanDefinitionHolder> beanDefinitions = super.doScan(basePackages);

        if (beanDefinitions.isEmpty()) {
            logger.warn("No RPC mapper was found in '"
                    + Arrays.toString(basePackages)
                    + "' package. Please check your configuration.");
        } else {
            // 对 beanDefinitions 进行注册
            processBeanDefinitions(beanDefinitions);
        }

        return beanDefinitions;
    }

    /**
     * 方法会根据配置的属性生成对应的过滤器,然后这些过滤器在扫描的时候会起作用。
     */
    public void registerFilters() {
        // default include filter that accepts all classes 接收所有接口
        addIncludeFilter((metadataReader, metadataReaderFactory) ->
                true);

        // exclude package-info.java
        addExcludeFilter((metadataReader, metadataReaderFactory) -> {
            String className = metadataReader.getClassMetadata()
                    .getClassName();
            return className.endsWith("package-info");
        });
    }

    private void processBeanDefinitions(
            Set<BeanDefinitionHolder> beanDefinitions) {

        GenericBeanDefinition definition;

        for (BeanDefinitionHolder holder : beanDefinitions) {

            definition = (GenericBeanDefinition) holder.getBeanDefinition();
            definition.getConstructorArgumentValues().addGenericArgumentValue(definition.getBeanClassName());
            // 设置接口的 beanClass 都为 RpcFactoryBean<?>,因为 RpcFactoryBean 实现了 FactoryBean 接口,这样初始化 Bean 时就会调用 getObject 方法
            definition.setBeanClass(RpcFactoryBean.class);
            // 设置BeanDefinition自动注入类型,这样就能被 Spring 管理了
            definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
            System.out.println(holder);
        }
    }

    @Override
    protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {
        return beanDefinition.getMetadata().isInterface() && beanDefinition.getMetadata().isIndependent();
    }
}

doScan方法,是扫描出指定路径下的BeanDefinitionHolder,然后执行processBeanDefinitions(beanDefinitions),对BeanDefinitionHolder进行注册(放入容器管理)。
registerFilters方法的目的是,根据配置的属性生成对应的过滤器,然后这些过滤器在扫描的时候会起作用。本案例中由于没有配置任何属性,故生成接收所有接口的过滤器。

接下来就是要将扫描器ClassPathRpcScanner放入配置类,让其生效了。

package com.viewscenes.netsupervisor.configurer.rpc;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

/**
 * Created by MACHENIKE on 2018-12-03.
 */
@Component
public class RpcScannerConfigurer implements BeanDefinitionRegistryPostProcessor {

    // 这个可以存在配置文件
    String basePackage = "com.viewscenes.netsupervisor.service";

    /**
     * 扫描指定路径下的接口,生成 Bean 的定义 GenericBeanDefinition,并放到了容器
     * @param beanDefinitionRegistry
     * @throws BeansException
     */
    @Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry beanDefinitionRegistry) throws BeansException {
        ClassPathRpcScanner scanner = new ClassPathRpcScanner(beanDefinitionRegistry);

        scanner.registerFilters();

        scanner.scan(StringUtils.tokenizeToStringArray(this.basePackage, ConfigurableApplicationContext.CONFIG_LOCATION_DELIMITERS));
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {

    }
}

该配置类实现BeanDefinitionRegistryPostProcessor,重写postProcessBeanDefinitionRegistry方法。然后引入ClassPathRpcScanner,调用其registerFiltersscan方法进行BeanDefinition的注册。

注册完成之后,只是把 Bean 的定义BeanDefinition放到了容器,还没有初始化这些 Bean,而 Bean 的初始化时机有2个:

  • 1、在程序第一个主动调用getBean的时候
  • 2、在完成容器初始化的时候会初始化 lazy-init 配置为 false 的Bean(默认为false)

由于RpcFactoryBean未设置懒加载,故容器初始化完成的时候就会初始化RpcFactoryBean。初始化的过程中,会调用AbstractBeanFactory.getBean方法,这就涉及到 Spring IOC 中,创建 Bean 的过程,直接上源码:

protected <T> T doGetBean(
            final String name, final Class<T> requiredType, final Object[] args, boolean typeCheckOnly)
            throws BeansException {
         // 转化beanName,因为 FactoryBean 是以“&”前缀的,需要去掉
        final String beanName = transformedBeanName(name);
        Object bean;

        // 创建单例的 Bean
        Object sharedInstance = getSingleton(beanName);
        if (sharedInstance != null && args == null) {
            if (logger.isDebugEnabled()) {
                if (isSingletonCurrentlyInCreation(beanName)) {
                    logger.debug("Returning eagerly cached instance of singleton bean '" + beanName +
                            "' that is not fully initialized yet - a consequence of a circular reference");
                }
                else {
                    logger.debug("Returning cached instance of singleton bean '" + beanName + "'");
                }
            }
            // 根据实例信息获取真正的实例,因为 FactotyBean 的实例不是 sharedInstance,而是其 getObect() 的返回值
            bean = getObjectForBeanInstance(sharedInstance, name, beanName, null);
        }
        ......

        // 校验类型与实例是否匹配
        if (requiredType != null && bean != null && !requiredType.isInstance(bean)) {
            try {
                return getTypeConverter().convertIfNecessary(bean, requiredType);
            }
            catch (TypeMismatchException ex) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Failed to convert bean '" + name + "' to required type '" +
                            ClassUtils.getQualifiedName(requiredType) + "'", ex);
                }
                throw new BeanNotOfRequiredTypeException(name, requiredType, bean.getClass());
            }
        }
        return (T) bean;
    }

doGetBean里面有个方法 getObjectForBeanInstance,会判断当前的 Bean 是否实现了FactoryBean
如果该 Bean 未实现FactoryBean接口,则直接返回该 Bean 实例;
如果该 Bean 实现了FactoryBean接口,则会返回的实例是getObject()返回值。

protected Object getObjectForBeanInstance(
            Object beanInstance, String name, String beanName, RootBeanDefinition mbd) {

        // 非 FactoryBean 不能以“&”为前缀
        if (BeanFactoryUtils.isFactoryDereference(name) && !(beanInstance instanceof FactoryBean)) {
            throw new BeanIsNotAFactoryException(transformedBeanName(name), beanInstance.getClass());
        }

        // 验证是否是 FactoryBean,如果不是,直接返回实例
        if (!(beanInstance instanceof FactoryBean) || BeanFactoryUtils.isFactoryDereference(name)) {
            return beanInstance;
        }

        Object object = null;
        if (mbd == null) {
            object = getCachedObjectForFactoryBean(beanName);
        }
        if (object == null) {
            // Return bean instance from factory.
            FactoryBean<?> factory = (FactoryBean<?>) beanInstance;
            // Caches object obtained from FactoryBean if it is a singleton.
            if (mbd == null && containsBeanDefinition(beanName)) {
                mbd = getMergedLocalBeanDefinition(beanName);
            }
            boolean synthetic = (mbd != null && mbd.isSynthetic());
            // 返回 FactoryBean 实例
            object = getObjectFromFactoryBean(factory, beanName, !synthetic);
        }
        return object;
    }

由于我们的RpcFactoryBean实现了FactoryBean,故其初始化的过程中,返回的实例是getObject()的返回值。我们可以看到,getObject()的实现使用了 JDK 动态代理(T) Proxy.newProxyInstance(rpcInterface.getClassLoader(), new Class[] { rpcInterface },factory),返回值为被代理对象rpcInterface的实例。于是,当rpcInterface接口调用其方法时,就会走RpcFactory.invoke方法。在这里,封装请求信息,然后调用 Netty 的客户端方法发送消息。然后根据方法返回值类型,转成相应的对象返回。

2、服务发现

在生产者端,我们把服务 IP、端口都注册到 zookeeper 中,所以这里,我们要去拿到服务地址,然后通过 Netty 连接。重要的是,还要对根目录进行监听子节点数据变化,这样随着生产者的上线和下线,消费者端可以及时感知。

package com.viewscenes.netsupervisor.connection;

@Component
public class ServiceDiscovery {

    @Value("${registry.address}")
    private String registryAddress;
    @Autowired
    ConnectManage connectManage;

    // 服务地址列表
    private volatile List<String> addressList = new ArrayList<>();
    private static final String ZK_REGISTRY_PATH = "/rpc";
    private ZkClient client;

    Logger logger = LoggerFactory.getLogger(this.getClass());

    @PostConstruct
    public void init(){
        client = connectServer();
        if (client != null) {
            watchNode(client);
        }
    }
    
    //连接zookeeper
    private ZkClient connectServer() {
        ZkClient client = new ZkClient(registryAddress,30000,30000);
        return client;
    }
    //监听子节点变化(子节点的增加和删除)
    private void watchNode(final ZkClient client) {
        List<String> nodeList = client.subscribeChildChanges(ZK_REGISTRY_PATH, (s, nodes) -> {
            logger.info("监听到子节点变化{}",JSONObject.toJSONString(nodes));
            addressList.clear();
            getNodeData(nodes);
            updateConnectedServer();
        });
        getNodeData(nodeList);
        logger.info("已发现服务列表...{}", JSONObject.toJSONString(addressList));
        updateConnectedServer();
    }
    //连接生产者端服务
    private void updateConnectedServer(){
        connectManage.updateConnectServer(addressList);
    }

    private void getNodeData(List<String> nodes){
        logger.info("/rpc子节点数据为:{}", JSONObject.toJSONString(nodes));
        for(String node:nodes){
            String address = client.readData(ZK_REGISTRY_PATH+"/"+node);
            addressList.add(address);
        }
    }
}

其中,connectManage.updateConnectServer(addressList);就是根据服务地址,去连接生产者端的 Netty 服务。然后创建一个 Channel 列表,在发送消息的时候,从中选取一个 Channel 和生产者端进行通信。

3、Netty 客户端

Netty 客户端有两个方法比较重要,一个是doConnect:根据IP、端口连接服务器,返回Channel,加入到连接管理器;一个是send:用 Channel 发送请求数据。同时,作为客户端,空闲的时候还要往服务端发送心跳信息。

package com.viewscenes.netsupervisor.netty.client;

import com.alibaba.fastjson.JSONArray;
import com.viewscenes.netsupervisor.connection.ConnectManage;
import com.viewscenes.netsupervisor.entity.Request;
import com.viewscenes.netsupervisor.entity.Response;
import com.viewscenes.netsupervisor.netty.codec.json.JSONDecoder;
import com.viewscenes.netsupervisor.netty.codec.json.JSONEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/**
 * Created by MACHENIKE on 2018-12-03.
 */
@Component
public class NettyClient {

    Logger logger = LoggerFactory.getLogger(this.getClass());

    private EventLoopGroup group = new NioEventLoopGroup(1);
    private Bootstrap bootstrap = new Bootstrap();

    @Autowired
    NettyClientHandler clientHandler;

    @Autowired
    ConnectManage connectManage;


    public NettyClient(){
        bootstrap.group(group).
                channel(NioSocketChannel.class).
                option(ChannelOption.TCP_NODELAY, true).
                option(ChannelOption.SO_KEEPALIVE,true).
                handler(new ChannelInitializer<SocketChannel>() {
                    //创建NIOSocketChannel成功后,在进行初始化时,将它的ChannelHandler设置到ChannelPipeline中,用于处理网络IO事件
                    protected void initChannel(SocketChannel channel) throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        pipeline.addLast(new IdleStateHandler(0, 0, 30));
                        pipeline.addLast(new JSONEncoder());
                        pipeline.addLast(new JSONDecoder());
                        pipeline.addLast(new HeartBeatHandler());
                        pipeline.addLast(clientHandler);
                    }
                });
    }

    @PreDestroy
    public void destroy(){
        logger.info("RPC客户端退出,释放资源!");
        group.shutdownGracefully();
    }

    public Object send(Request request) throws InterruptedException{

        Channel channel = connectManage.chooseChannel();
        if (channel!=null && channel.isActive()) {
            SynchronousQueue<Object> queue = clientHandler.sendRequest(request,channel);
            Object result = queue.take();
            return JSONArray.toJSONString(result);
        }else{
            Response res = new Response();
            res.setCode(1);
            res.setError_msg("未正确连接到服务器.请检查相关配置信息!");
            return JSONArray.toJSONString(res);
        }
    }
    public Channel doConnect(SocketAddress address) throws InterruptedException {
        ChannelFuture future = bootstrap.connect(address);
        Channel channel = future.sync().channel();
        return channel;
    }
}

这里,我们依然定义了一个心跳机制处理器HeartBeatHandler,目的是在new IdleStateHandler(0, 0, 30)约定的30s内客户端未与服务端发生通信,为了告诉服务端该客户端依然正常工作(因为服务端心跳检测是60s),则客户端需要发送心跳包给服务端。

package com.viewscenes.netsupervisor.netty.client;

import com.viewscenes.netsupervisor.entity.Request;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 用于检测channel的心跳handler
 * 继承ChannelInboundHandlerAdapter,从而不需要实现channelRead0 方法
 * @author K. L. Mao
 * @create 2019/2/22
 */
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {

    private final Logger logger = LoggerFactory.getLogger(HeartBeatHandler.class);

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        logger.info("已超过30秒未与RPC服务器进行读写操作!将发送心跳消息...");
        if (evt instanceof IdleStateEvent){
            IdleStateEvent event = (IdleStateEvent)evt;
            if (event.state()== IdleState.ALL_IDLE){
                Request request = new Request();
                request.setMethodName("heartBeat");
                ctx.channel().writeAndFlush(request);
            }
        }
    }
}

我们必须重点关注send方法,它是在代理对象invoke方法调用到的。首先从连接器中轮询选择一个 Channel,然后发送数据。但是,Netty 是异步操作,我们还要转为同步,就是说要等待生产者端返回数据才往下执行。笔者在这里用的是同步队列SynchronousQueue,它的take方法会阻塞在这里,直到里面有数据可读。然后在处理器中,拿到返回信息写到队列中,take方法返回。

package com.viewscenes.netsupervisor.netty.client;

import com.alibaba.fastjson.JSON;
import com.viewscenes.netsupervisor.connection.ConnectManage;
import com.viewscenes.netsupervisor.entity.Request;
import com.viewscenes.netsupervisor.entity.Response;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue;

/**
 * Created by MACHENIKE on 2018-12-03.
 */
@Component
@ChannelHandler.Sharable
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    @Autowired
    NettyClient client;

    @Autowired
    ConnectManage connectManage;

    Logger logger = LoggerFactory.getLogger(this.getClass());

    private ConcurrentHashMap<String,SynchronousQueue<Object>> queueMap = new ConcurrentHashMap<>();

    public void channelActive(ChannelHandlerContext ctx) {
        logger.info("已连接到RPC服务器.{}",ctx.channel().remoteAddress());
    }

    public void channelInactive(ChannelHandlerContext ctx) {
        InetSocketAddress address =(InetSocketAddress) ctx.channel().remoteAddress();
        logger.info("与RPC服务器断开连接."+address);
        ctx.channel().close();
        connectManage.removeChannel(ctx.channel());
    }

    /**
     * 接收服务端返回信息
     * @param ctx
     * @param msg
     * @throws Exception
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {
        Response response = JSON.parseObject(msg.toString(),Response.class);
        String requestId = response.getRequestId();
        SynchronousQueue<Object> queue = queueMap.get(requestId);
        queue.put(response);
        queueMap.remove(requestId);
    }

    public SynchronousQueue<Object> sendRequest(Request request,Channel channel) {
        SynchronousQueue<Object> queue = new SynchronousQueue<>();
        queueMap.put(request.getId(), queue);
        channel.writeAndFlush(request);
        return queue;
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
        logger.info("RPC通信服务器发生异常.{}",cause);
        ctx.channel().close();
    }
}

至此,消费者端也基本完成。同样的,我们先看一下启动日志:

Waiting for keeper state SyncConnected
Opening socket connection to server 192.168.174.10:2181. Will not attempt to authenticate using SASL (unknown error)
Socket connection established to 192.168.174.10:2181, initiating session
Session establishment complete on server 192.168.174.10:2181, sessionid = 0x100000273ba002c, negotiated timeout = 20000
zookeeper state changed (SyncConnected)
/rpc子节点数据为:["provider0000000015"]
已发现服务列表...["192.168.210.81:18868"]
加入Channel到连接管理器./192.168.100.74:18868
已连接到RPC服务器./192.168.210.81:18868
Initializing ExecutorService 'applicationTaskExecutor'
Tomcat started on port(s): 7002 (http) with context path ''
Started RpcConsumerApplication in 4.218 seconds (JVM running for 5.569)

五、总结

本文简单介绍了 RPC 的整个流程,如果你正在学习 RPC 的相关知识,可以根据文中的例子,自己实现一遍。相信写完之后,你会对 RPC 会有更深一些的认识。

生产者端流程:

  • 加载服务,并缓存
  • 启动通讯服务器(Netty)
  • 服务注册(把通讯地址放入 zookeeper,也可以把加载到的服务也放进去)
  • 反射,本地调用

消费者端流程:

  • 代理服务接口
  • 服务发现(连接 zookeeper,拿到服务地址列表)
  • 远程调用(轮询生产者服务列表,发送消息)

消费端调用服务流程:由于消费端使用了 JDK 动态代理(默认是使用 javassist 生成字节码包做的代理),代理了服务接口,于是当调用服务接口时,会走代理类的invoke方法,invoke方法会将接口信息通过 Netty 发送给服务端,服务端首先通过接口名找到其实现类(内部保存了映射关系),然后通过反射执行本地实现类的方法,最后将返回结果通过 Netty 发送给消费端。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,454评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,553评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,921评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,648评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,770评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,950评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,090评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,817评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,275评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,592评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,724评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,409评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,052评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,815评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,043评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,503评论 2 361
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,627评论 2 350

推荐阅读更多精彩内容

  • 在分析RMI原理一文中,我们知道RMI是通过底层封装TCP网络通信实现。基于此思路本文从以下切入点实现一个简单的R...
    匠丶阅读 2,923评论 1 4
  • 在分布式服务框架中,一个最基础的问题就是远程服务是怎么通讯的,在Java领域中有很多可实现远程通讯的技术,例如:R...
    java菜阅读 982评论 0 2
  • 八期行动星球:第12天。QQ 115 目标:1.步行5000步 完成程度:目标已完成。
    静心小站阅读 145评论 0 0
  • 学校是我们学习和生活的地方,学校又成为校园是美丽的。 走进学校大门,首先映入眼帘的是正面的一棵棵高大的松树,像忠实...
    许雅晶阅读 317评论 0 0
  • 1.付出不亚于任何人的努力 2.要谦虚,不要骄傲 3.要每天反省 4.活着,就要感谢 5.积善行,思利他 6.不要...
    六项精进阿晋阅读 194评论 0 0