利用zookeeper实现分布式队列

一、zookeeper介绍

        zookeeper是源代码开放的分布式协调服务,由雅虎创建,是Google的开源实现。zookeeper是一个高性能的分布式数据一致性解决方案,它将那些复杂的、容易出错的分布式一致性服务封装起来,构成一个高效可靠的原语集,并提供一系列简单易用的接口给用户使用。

        其本身提供了一致性保证,特点如下:

        (1)顺序一致性:客户端的更新顺序与它们被发送的顺序一致。

        (2)原子性:更新操作要么成功,要么失败,没有第三种结果。

        (3)单系统镜像:无论客户端连接到哪一个服务器,他都将看到相同的zookeeper视图。

        (4)可靠性:一旦一个更新操作被应用,那么在客户端再次更新之前,其值不会再改变。

二、应用场景

        zookeeper可以应用于很多的分布式服务场景,包括:集群管理,master选举,发布/订阅,分布式锁,分布式队列,分布式命名服务,服务注册于发现,负载均衡等等。下面一个例子介绍zookeeper如何实现分布式队列。

三、zookeeper分布式队列实现

        zookeeper分布式队列的实现完成以下几个要素:

        (1)、数据入队,在一个节点下创建有序子节点,节点中设置需要入队的数据,完成数据的入队操作。

        (2)、数据出队,取出该节点下的所有子节点,如果数量不为0,取出一个子节点,并将子节点删除。

        (3)、提供判断是否有数据等的api。

        下面为具体代码实现

1、DistributedSimpleQueue类


public class DistributedSimpleQueue{

        protected final ZkClient zkClient;

        protected final String root;

        protected static final String Node_NAME = "n_";

        public DistributedSimpleQueue(ZkClient zkClient, String root) {

                this.zkClient = zkClient;

                this.root = root;

         }

        public int size() {

            return zkClient.getChildren(root).size();

        }

        public boolean isEmpty() {

            return zkClient.getChildren(root).size() == 0;

        } 

        public boolean offer(T element) throws Exception{ 

                String nodeFullPath = root .concat( "/" ).concat( Node_NAME );

                 try { 

                         zkClient.createPersistentSequential(nodeFullPath , element); 

                 }catch (ZkNoNodeException e) { 

                         zkClient.createPersistent(root); 

                         offer(element); 

                 } catch (Exception e) { 

                     throw ExceptionUtil.convertToRuntimeException(e); 

                 } 

                 return true; 

             }

            @SuppressWarnings("unchecked")

            public T poll() throws Exception {

                   try {

                           List  list = zkClient.getChildren(root);

                           if (list.size() == 0) {

                                return null;

                            }

                            Collections.sort(list, new Comparator() {

                                    public int compare(String lhs, String rhs) {

                                            return getNodeNumber(lhs, Node_NAME).compareTo(getNodeNumber(rhs, Node_NAME));

                                    }

                            });

                            for ( String nodeName : list ){

                                    String nodeFullPath = root.concat("/").concat(nodeName);

                                    try {

                                            T node = (T) zkClient.readData(nodeFullPath);

                                            zkClient.delete(nodeFullPath);

                                            return node;

                                       } catch (ZkNoNodeException e) {

                                                // ignore

                                        }

                            }

                            return null;

                    } catch (Exception e) {

                            throw ExceptionUtil.convertToRuntimeException(e);

                    }

            }

            private String getNodeNumber(String str, String nodeName) {

                    int index = str.lastIndexOf(nodeName);

                    if (index >= 0) {

                        index += Node_NAME.length();

                        return index <= str.length() ? str.substring(index) : "";

                    }

                    return str;

            }

}

2、阻塞队列实现类

public class DistributedBlockingQueue  extends DistributedSimpleQueue{ 

         public DistributedBlockingQueue(ZkClient zkClient, String root) { 

                 super(zkClient, root);

        } 

        @Override

        public T poll() throws Exception {

                while (true){

                        final CountDownLatch latch = new CountDownLatch(1);

                        final IZkChildListener childListener = new IZkChildListener() {

                                public void handleChildChange(String parentPath, List currentChilds) throws Exception {

                                        latch.countDown();

                                }

                        };

                        zkClient.subscribeChildChanges(root, childListener);

                        try{

                                T node = super.poll();

                                if ( node != null ){

                                        return node;

                                }else{

                                        latch.await();

                                }

                        }finally{

                                zkClient.unsubscribeChildChanges(root, childListener);

                        }

                }

            }

}

        阻塞队列的实现利用了CountDownLatch 的特性。当子节点数量为0时,即队列中没有元素,这是线程在此等待,同时监听子节点的变化,如果有数据入队,则从等待返回,取出数据。

3、测试类

public class TestDistributedBlockingQueue {

            public static void main(String[] args) {

                    ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1);

                    int delayTime = 5;

                    ZkClient zkClient = new ZkClient("192.168.1.105:2181", 5000, 5000, new SerializableSerializer());

                    final DistributedBlockingQueuequeue = new DistributedBlockingQueue(zkClient,"/Queue");

                    final User user1 = new User();

                    user1.setId("1");

                    user1.setName("xiao wang");

                    final User user2 = new User();

                    user2.setId("2");

                    user2.setName("xiao wang");

                    try {

                            delayExector.schedule(new Runnable() {

                                    public void run() {

                                         try {

                                                queue.offer(user1);

                                                queue.offer(user2);

                                            } catch (Exception e) {

                                                    e.printStackTrace();

                                            }

                                        }

                               }, delayTime , TimeUnit.SECONDS);

                               System.out.println("ready poll!");

                                User u1 = (User) queue.poll();

                                User u2 = (User) queue.poll();

                                if (user1.getId().equals(u1.getId()) && user2.getId().equals(u2.getId())){

                                        System.out.println("Success!");

                                }

                    } catch (Exception e) {

                            e.printStackTrace();

                    } finally{

                                delayExector.shutdown();

                    try {

                                delayExector.awaitTermination(2, TimeUnit.SECONDS);

                    } catch (InterruptedException e) {

                    }

            }

        }

}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,193评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,306评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,130评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,110评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,118评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,085评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,007评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,844评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,283评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,508评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,667评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,395评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,985评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,630评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,797评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,653评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,553评论 2 352

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,651评论 18 139
  • 1. Java基础部分 基础部分的顺序:基本语法,类相关的语法,内部类的语法,继承相关的语法,异常的语法,线程的语...
    子非鱼_t_阅读 31,622评论 18 399
  • ... 一、相关概念 中间件:为分布式系统提供协调服务的组件,如专门用于计算服务的机器就是一个计算型中间件,还有专...
    帅可儿妞阅读 473评论 0 0
  • 一、基本数据类型 注释 单行注释:// 区域注释:/* */ 文档注释:/** */ 数值 对于byte类型而言...
    龙猫小爷阅读 4,259评论 0 16
  • 石min阅读 118评论 0 0