任务队列实现心得

近日,研究了一下任务队列,于是想到了阻塞队列BlockingQueue,不得不提起到它的两个方法,put,take这两个方法都是阻塞式的,当队列满时,put方法阻塞,当队列空时,take方法阻塞.

我们使用BlockingQueue,只是完成了基本功能FIFO,就是任务先进先出。我们想要实现任务LIFO,这种方式就不行了。需要使用到BlockingDque,双端队列,在队列的两端都可以插入和获取元素,还有阻塞的这一特性。所以我们在使用中,使用的是BlockingDque,来实现我们的需求。

非阻塞队列-FIFO

public class NoneBlockingQTesterFIFO implements ITester {
  private static int MAX_SIZE = 10;
  private static LinkedList<Integer> queue = new LinkedList<>();
  private Consumer consumer;
  private Producer producer;
  private static Random random = new Random();

  public NoneBlockingQTesterFIFO() {
    consumer = new Consumer();
    producer = new Producer();
  }

  @Override
  public void test() {
    producer.start();
    consumer.start();
  }

  public static boolean isFull() {
    return queue.size() == MAX_SIZE;
  }

  public static boolean isEmpty() {
    return queue.size() == 0;
  }

  static class Producer extends Thread {

    int c = 0;

    @Override
    public void run() {
      System.out.println("Producer run consume ...");
      produce();
    }

    private void produce() {
      System.out.println("Producer run produce ...");
      while (c <= 20) {
        synchronized (queue) {
          while (queue.size() == MAX_SIZE) {
            try {
              System.out.println("队列满了...");
              queue.wait();
            } catch (InterruptedException e) {
              e.printStackTrace();
              // 通知其它线程取数据
              queue.notify();
            }
          }

          System.out.println("");
          // 每次插入一个元素
          int v = random.nextInt(100);
          queue.offer(v);
          c++;
          System.out.println("添加了数据: " + v);
          queue.notify();
          // System.out.println("向队列中添加了一个元素,剩余空间 = " + (MAX_SIZE - queue.size()));
        }
      }
    }
  }

  static class Consumer extends Thread {

    @Override
    public void run() {
      System.out.println("Consumer run...");
      consume();
    }

    private void consume() {
      System.out.println("Consumer run consume ...");
      while (true) {
        synchronized (queue) {
          while (queue.size() == 0) {
            try {
              System.out.println("队列空了...");
              queue.wait();
            } catch (InterruptedException e) {
              e.printStackTrace();
              queue.notify();
            }
          }
          System.out.println("");
          // 每次移走队首元素
          Integer v = queue.poll();
          System.out.println("取到了数据 " + v);
          queue.notify();
          // System.out.println("从队列中取走了一个元素,剩余空间 = " + (MAX_SIZE - queue.size()));
        }
      }
    }
  }

  /**
  添加了数据: 45
  添加了数据: 58
  添加了数据: 82
  添加了数据: 20
  添加了数据: 29
  添加了数据: 18
  添加了数据: 39
  添加了数据: 83
  添加了数据: 82
  添加了数据: 41
  队列满了...
  取到了数据 45
  取到了数据 58
  取到了数据 82
  取到了数据 20
  取到了数据 29
  取到了数据 18
  取到了数据 39
  取到了数据 83
  取到了数据 82
  取到了数据 41
  队列空了...
  添加了数据: 51
  添加了数据: 91
  添加了数据: 76
  添加了数据: 46
  添加了数据: 34
  添加了数据: 38
  添加了数据: 19
  添加了数据: 36
  添加了数据: 86
  添加了数据: 14
  队列满了...
  取到了数据 51
  取到了数据 91
  取到了数据 76
  取到了数据 46
  取到了数据 34
  取到了数据 38
  取到了数据 19
  取到了数据 36
  取到了数据 86
  取到了数据 14
   */
}

非阻塞队列的-LIFO

public class NoneBlockingQTesterLIFO implements ITester {
  private static int MAX_SIZE = 10;
  private static LinkedList<Integer> queue = new LinkedList<>();
  private Consumer consumer;
  private Producer producer;
  private static Random random = new Random();

  public NoneBlockingQTesterLIFO() {
    consumer = new Consumer();
    producer = new Producer();
  }

  @Override
  public void test() {
    producer.start();
    consumer.start();
  }

  public static boolean isFull() {
    return queue.size() == MAX_SIZE;
  }

  public static boolean isEmpty() {
    return queue.size() == 0;
  }

  static class Producer extends Thread {

    int c = 0;

    @Override
    public void run() {
      System.out.println("Producer run consume ...");
      produce();
    }

    private void produce() {
      System.out.println("Producer run produce ...");
      while (c <= 20) {
        synchronized (queue) {
          while (queue.size() == MAX_SIZE) {
            try {
              System.out.println("队列满了...");
              queue.wait();
            } catch (InterruptedException e) {
              e.printStackTrace();
              // 通知其它线程取数据
              queue.notify();
            }
          }

          System.out.println("");
          // 每次插入一个元素
          int v = random.nextInt(100);
          queue.offerFirst(v);
          c++;
          System.out.println("添加了数据: " + v);
          queue.notify();
          // System.out.println("向队列中添加了一个元素,剩余空间 = " + (MAX_SIZE - queue.size()));
        }
      }
    }
  }

  static class Consumer extends Thread {

    @Override
    public void run() {
      System.out.println("Consumer run...");
      consume();
    }

    private void consume() {
      System.out.println("Consumer run consume ...");
      while (true) {
        synchronized (queue) {
          while (queue.size() == 0) {
            try {
              System.out.println("队列空了...");
              queue.wait();
            } catch (InterruptedException e) {
              e.printStackTrace();
              queue.notify();
            }
          }
          System.out.println("");
          // 每次移走队首元素
          Integer v = queue.pollFirst();
          System.out.println("取到了数据 " + v);
          queue.notify();
          // System.out.println("从队列中取走了一个元素,剩余空间 = " + (MAX_SIZE - queue.size()));
        }
      }
    }
  }

  /**
   *添加了数据: 39
    添加了数据: 80
    添加了数据: 58
    添加了数据: 87
    添加了数据: 73
    添加了数据: 94
    添加了数据: 0
    添加了数据: 52
    添加了数据: 23
    添加了数据: 59
    队列满了...
    取到了数据 59
    取到了数据 23
    取到了数据 52
    取到了数据 0
    取到了数据 94
    取到了数据 73
    取到了数据 87
    取到了数据 58
    取到了数据 80
    取到了数据 39
    队列空了...
    添加了数据: 38
    添加了数据: 42
    添加了数据: 87
    添加了数据: 53
    添加了数据: 33
    添加了数据: 32
    添加了数据: 68
    添加了数据: 13
    添加了数据: 59
    添加了数据: 43
    队列满了...
    取到了数据 43
    取到了数据 59
    取到了数据 13
    取到了数据 68
    取到了数据 32
    取到了数据 33
    取到了数据 53
    取到了数据 87
    取到了数据 42
    取到了数据 38
    队列空了...
    添加了数据: 39
    取到了数据 39
    队列空了...
   */

}

阻塞队列之双端队列的使用

public class LinkedBlockingDqTester implements Itester {

  int MAX_ITEM_SIZE = 10;
  private LinkedBlockingDeque<Integer> queue = new LinkedBlockingDeque<>(MAX_ITEM_SIZE);
  private int mFlag;

  public LinkedBlockingDqTester(int flag) {
    this.mFlag = flag;
  }

  @Override
  public void test() {
    switch (mFlag) {
      case 0:
        fifo();
        break;
      case 1:
        lifo();
        break;
    }
  }

  // put -> [9,8,7,6,5,4,3,2,1,0]
  // get -> [9]
  // [8,7,6,5,4,3,2,1,0]
  // put -> [10,8,7,6,5,4,3,2,1,0]
  // get -> [10]
  // [8,7,6,5,4,3,2,1,0]
  // put -> [11,8,7,6,5,4,3,2,1,0]
  // get -> [11]
  // [8,7,6,5,4,3,2,1,0]
  // put -> [12,8,7,6,5,4,3,2,1,0]
  // get -> [12]
  // [8,7,6,5,4,3,2,1,0]
  // put -> [13,8,7,6,5,4,3,2,1,0]
  // get -> [13]
  // [8,7,6,5,4,3,2,1,0]
  // put -> [14,8,7,6,5,4,3,2,1,0]
  // get -> [14]
  // [8,7,6,5,4,3,2,1,0]
  // get -> [8]
  // get -> [7]
  // get -> [6]
  // get -> [5]
  // get -> [4]
  // get -> [3]
  // get -> [2]
  // get -> [1]
  // get -> [0]
  /**
   * put -> putFirst
   * take -> takeFirst
   */
  private void lifo() {
    new Thread() {
      public void run() {
        for (int i = 0; i < 15; i++) {
          try {
            // 解决避免线程执行过快,带来的问题
            System.out.println("");
            queue.putFirst(i);
            System.out.println("存数据 " + i);
          } catch (InterruptedException e) {
            e.printStackTrace();
            System.out.println(e);
          }
        }
      };
    }.start();

    new Thread() {
      public void run() {
        try {
          Thread.sleep(3000);
        } catch (InterruptedException e1) {
          e1.printStackTrace();
        }
        while (true) {
          try {
            // 解决避免线程执行过快,带来的问题
            System.out.println("");
            Integer v = queue.takeFirst();
            System.out.println("取数据 = " + v);
          } catch (InterruptedException e) {
            e.printStackTrace();
            System.out.println(e);
          }
        }
      };
    }.start();
  }

  // put(putLast) -> [0,1,2,3,4,5,6,7,8,9]
  // get(getFirst) -> [0,1,2,3]
  // [4,5,6,7,8,9]
  // put(putLast) -> [4,5,6,7,8,9,10]
  // [4,5,6,7,8,9,10]
  // get(getFirst) -> [4,5,6,7]
  // [8,9,10]
  // put(putLast) -> [8,9,10,11]
  // get(getFirst) -> [8,9]
  // [10,11]
  // put(putLast) -> [10,11,12,13]
  // [10,11,12,13]
  // get(getFirst) -> [10]
  // [11,12,13]
  // put(putLast) -> [11,12,13,14]
  // get(getFirst) -> [11]
  // get(getFirst) -> [12]
  // get(getFirst) -> [13]
  // get(getFirst) -> [14]
  /**
   * put -> putLast
   * take -> takeFirst
   */
  private void fifo() {
    new Thread() {
      public void run() {
        for (int i = 0; i < 15; i++) {
          try {
            queue.put(i);
            System.out.println("存数据 " + i);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      };
    }.start();

    new Thread() {
      public void run() {
        try {
          Thread.sleep(3000);
        } catch (InterruptedException e1) {
          e1.printStackTrace();
        }
        System.out.println(queue.toArray());
        while (true) {
          try {
            Integer v = queue.take();
            System.out.println("取数据 = " + v);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      };
    }.start();
  }

}

重点说明
本项目中的代码,有部分是参照网络上的资料进行的修改,如有雷同,敬请谅解。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,084评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,623评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,450评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,322评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,370评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,274评论 1 300
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,126评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,980评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,414评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,599评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,773评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,470评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,080评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,713评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,852评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,865评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,689评论 2 354