基于Zookeeper构建分布式RPC框架

新年伊始,为了让自己快速的进入工作状态。决定自己动手构建一套分布式RPC框架。

由于是独立的RPC框架,所以采用Zookeeper做注册中心,使用Netty做服务处理。由于Netty是NIO框架,在处理网络请求等待结果返回的时候着实需要一番大改动。

  • 注册中心

    基于Zookeeper做注册中心的实现其实是比较简单的。

    具体的实现逻辑:在服务启动时扫描是否包含RpcService注解的类。然后将该类注解的serverName属性拿到注册到Zookeeper节点。再拿到该服务所在机器(或者容器)的IP,将该IP以临时节点的角色注册到Zookeeper上。同时客户端启动时监听Zookeeper节点改变事件,并及时刷新可用的服务列表。

    代码实现

    IRegisterCenterProvider //服务端逻辑

package one.bugu.zookeeper.rpc.framework.service.zookeeper;

import java.util.List;

/**
 * Created with IntelliJ IDEA.
 * User: LangK
 * Created Date 2019/2/14
 * Time: 14:30
 * Description:
 */
public interface IRegisterCenterProvider {

    /**
     * 服务端获取服务提供者信息
     *
     * 返回对象:key:服务提供者接口,value:服务提供者IP列表
     * @param serverName
     * @param ips
     */
    void registerProvider(String serverName, List<String> ips);

    /**
     * 更新服务端提供者的信息
     */
    void updateProvider();
}


RegisterCenterProviderImpl //服务端逻辑实现

package one.bugu.zookeeper.rpc.framework.service.zookeeper;

import one.bugu.zookeeper.rpc.framework.annotations.RpcService;
import one.bugu.zookeeper.rpc.framework.service.RpcServiceConfiguration;
import one.bugu.zookeeper.rpc.framework.service.socket.ServiceSocket;
import one.bugu.zookeeper.rpc.framework.util.SpringContextUtil;
import one.bugu.zookeeper.rpc.framework.zookeeper.ZooKeeperHelper;
import one.bugu.zookeeper.rpc.framework.zookeeper.ZookeeperConfiguration;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.*;

/**
 * Created with IntelliJ IDEA.
 * User: LangK
 * Created Date 2019/2/14
 * Time: 14:31
 * Description:
 */
public class RegisterCenterProviderImpl implements IRegisterCenterProvider {

    /**
     * zookeeper配置
     */
    private ZookeeperConfiguration zookeeperConfiguration;
    /**
     * rpc配置
     */
    private RpcServiceConfiguration rpcServiceConfiguration;

    /**
     * zookeeper连接器
     */
    private ZooKeeperHelper zooKeeperHelper;

    /**
     * 保存service对应的Bean
     * 接收到客户端请求时,可快速的找到处理的bean
     */
    public static Map<String, Object> serverBean = new HashMap<>();

    private static Logger logger = LoggerFactory.getLogger(RegisterCenterProviderImpl.class);

    /**
     * netty服务
     */
    private Thread socketThread;

    public RegisterCenterProviderImpl(ZookeeperConfiguration zookeeperConfiguration, RpcServiceConfiguration rpcServiceConfiguration) {
        this.zookeeperConfiguration = zookeeperConfiguration;
        this.rpcServiceConfiguration = rpcServiceConfiguration;
    }

    @Override
    public void registerProvider(String serverName, List<String> ips) {
        if (zooKeeperHelper == null) {
            zooKeeperHelper = new ZooKeeperHelper(zookeeperConfiguration.getUrl(), new ServerNodeChangeWatcher());
        }
        String path = zookeeperConfiguration.getPath() + "/" + serverName;
        if (!zooKeeperHelper.existsNode(path)) {
            String resultPath = zooKeeperHelper.createNode(path, null);
            if (StringUtils.isEmpty(resultPath)) {
                logger.info("RPC服务注册失败,服务名:{}", serverName);
                return;
            }
        }
        for (String ip : ips) {
            String node = path + "/" + ip;
            if (!zooKeeperHelper.existsNode(node)) {
                String resultPath = zooKeeperHelper.createTempNode(node, null);
                if (StringUtils.isEmpty(resultPath)) {
                    logger.info("RPC服务注册失败,服务名:{},IP地址:{}", serverName, ip);
                }
            }
        }
    }


    public void updateProvider() {
        Map<String, Object> beansWithAnnotation = SpringContextUtil.getApplicationContext().getBeansWithAnnotation(RpcService.class);
        if (beansWithAnnotation == null || beansWithAnnotation.isEmpty()) {
            return;
        }
        //Socket地址
        serverBean.clear();
        List<String> ips = getIp();
        for (String key : beansWithAnnotation.keySet()) {
            String serverName = beansWithAnnotation.get(key).getClass().getAnnotation(RpcService.class).name();
            serverBean.put(serverName, beansWithAnnotation.get(key));
            registerProvider(serverName, ips);
        }
        if (socketThread == null) {
            socketThread = new Thread(new ServiceSocket(rpcServiceConfiguration));
        }
        if (!socketThread.isAlive()) {
            socketThread.start();
        }
    }


    /**
     * 获取IP
     *
     * @return IP地址:端口号
     */
    public List<String> getIp() {
        List<String> host_ip_list = new ArrayList<String>();
        try {
            for (NetworkInterface networkInterface : Collections
                    .list(NetworkInterface.getNetworkInterfaces())) {
                for (InetAddress addr : Collections.list(networkInterface.getInetAddresses())) {
                    if (!addr.isLoopbackAddress() && !addr.isLinkLocalAddress() && addr.isSiteLocalAddress()) {
                        host_ip_list.add(addr.getHostAddress() + ":" + rpcServiceConfiguration.getPort());
                    }
                }
            }
        } catch (SocketException e) {
            logger.error("获取IP地址异常", e);
        }
        return host_ip_list;
    }

    class ServerNodeChangeWatcher implements Watcher {
        public void process(WatchedEvent event) {
            if (event.getState() == Event.KeeperState.SyncConnected) {
                logger.info("Watch received SyncConnected event");
                updateProvider();
            }
            if (event.getState() == Event.KeeperState.Disconnected) {
                logger.info("Watch received Disconnected event");
                zooKeeperHelper = null;
            }
        }
    }
}

IRegisterCenterInvoker //客户端逻辑

package one.bugu.zookeeper.rpc.framework.client.zookeeper;

import java.util.List;
import java.util.Map;

/**
 * Created with IntelliJ IDEA.
 * User: LangK
 * Created Date 2019/2/14
 * Time: 14:27
 * Description:
 */
public interface IRegisterCenterInvoker {

    /**
     * 消费端初始化服务提供者信息本地缓存
     */
    public void initProviderMap();

    public void updateProviderMap();

    /**
     * 消费端获取服务提供者信息
     */
    public Map<String, List<String>> getProviderMap();

}

RegisterCenterInvokerImpl //客户端逻辑实现

package one.bugu.zookeeper.rpc.framework.client.zookeeper;

import one.bugu.zookeeper.rpc.framework.zookeeper.ZooKeeperHelper;
import one.bugu.zookeeper.rpc.framework.zookeeper.ZookeeperConfiguration;
import com.google.gson.Gson;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Created with IntelliJ IDEA.
 * User: LangK
 * Created Date 2019/2/14
 * Time: 15:34
 * Description:
 */
public class RegisterCenterInvokerImpl implements IRegisterCenterInvoker {

    private Gson gson = new Gson();

    /**
     * zookeeper配置
     */
    private ZookeeperConfiguration zookeeperConfiguration;

    /**
     * 保存所有的service以及对应的Netty提供者的IP:PORT
     */
    private Map<String, List<String>> providerMap;

    private static Logger logger = LoggerFactory.getLogger(RegisterCenterInvokerImpl.class);

    /**
     * Zookeeper连接器
     */
    private ZooKeeperHelper zooKeeperHelper;

    public RegisterCenterInvokerImpl(ZookeeperConfiguration zookeeperConfiguration) {
        this.zookeeperConfiguration = zookeeperConfiguration;
    }


    @Override
    public void initProviderMap() {
        zooKeeperHelper = new ZooKeeperHelper(zookeeperConfiguration.getUrl(), new ClientNodeChangeWatcher());
        providerMap = new ConcurrentHashMap<>();
        updateProviderMap();

    }

    @Override
    public void updateProviderMap() {
        synchronized (this){
            providerMap.clear();
            List<String> serverList = zooKeeperHelper.getChildren(zookeeperConfiguration.getPath());
            if (serverList == null || serverList.isEmpty()) {
                return;
            }
            for (String server : serverList) {
                String serverPath = zookeeperConfiguration.getPath() + "/" + server;
                List<String> providerList = zooKeeperHelper.getChildren(serverPath);
                if (providerList!=null&&!providerList.isEmpty()){
                    providerMap.put(server,providerList);
                }
            }
            logger.info("update socket server finish. server list is:{}", gson.toJson(providerMap));
        }
    }

    @Override
    public Map<String, List<String>> getProviderMap() {
        return providerMap;
    }


    class ClientNodeChangeWatcher implements Watcher {

        public void process(WatchedEvent event) {
            if (event.getType() == Event.EventType.NodeChildrenChanged) {
                logger.info("Watch received NodeChildrenChanged event");
                updateProviderMap();
            }
        }
    }
}
  • Netty处理

Netty服务则是在检测到项目中包含@RpcServer时启动Netty服务。在检测到正在执行含有@RpcClient注解的类方法时拦截该方法,如果还未和服务端建立连接,则建立长连接,进行RPC通信(通信完成连接不断开,考虑到是服务端RPC通信,此处长连接更适合),调用服务端代码,如返回有正确结果则替换服务端返回的结果,异常时继续调用客户端方法的返回结果。

代码实现

ClientAspect //客户端拦截器

package one.bugu.zookeeper.rpc.framework.aspect;

import one.bugu.zookeeper.rpc.framework.annotations.RpcClient;
import one.bugu.zookeeper.rpc.framework.client.socket.ClientRequestPool;
import com.google.gson.Gson;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

/**
 * Created with IntelliJ IDEA.
 * User: LangK
 * Created Date 2019/2/12
 * Time: 18:28
 * Description:客户端拦截器
 */
@Aspect
@Component
public class ClientAspect {

    @Autowired
    private ClientRequestPool clientRequestPool;

    private Gson gson = new Gson();

    private Logger logger = LoggerFactory.getLogger(ClientAspect.class);

    /**
     * 切入所有注有RpcClient注解实体类的方法
     * @param pjp
     * @return
     * @throws Throwable
     */
    @Around("@within(one.bugu.zookeeper.rpc.framework.annotations.RpcClient)")
    public Object doSocket(ProceedingJoinPoint pjp) throws Throwable {
        RpcClient an = (RpcClient) pjp.getSignature().getDeclaringType().getAnnotation(RpcClient.class);
        String serverName = an.serverName();
        String ip = an.serverIp();
        String method = pjp.getSignature().getName();
        String resultObject;
        if (StringUtils.isEmpty(ip)) {
            resultObject = clientRequestPool.send(serverName, method, pjp.getArgs());
        } else {
            resultObject = clientRequestPool.send(serverName, ip, method, pjp.getArgs());
        }
        Object object = pjp.proceed();
        if (resultObject != null) {
            try {
                return gson.fromJson(resultObject, object.getClass());
            } catch (Exception e) {
                logger.info("RPC接收结果转换异常:{}", resultObject);
                return object;
            }
        }
        return object;
    }

}

大体上的思路就是这样,具体代码当然少不了封装。这里就不过多的贴代码了,有兴趣的伙伴可以去LangK开源 / RpcFramework下载项目。

通过该项目,熟悉了使用Zookeeper做注册中心的关键功能。也对Netty框架NIO有了更深刻的理解。算是为2019年有个好的开始。

当然如果你有好的设计思想,可以一起参与完善该项目。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,406评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,732评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,711评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,380评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,432评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,301评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,145评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,008评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,443评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,649评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,795评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,501评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,119评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,731评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,865评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,899评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,724评论 2 354

推荐阅读更多精彩内容