Disruptor高级(二)串行消费 & 并行消费

示例代码基础框架

Event
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Disruptor中的 Event
 */
public class Trade {

    private String id;
    private String name;
    private double price;
    private AtomicInteger count = new AtomicInteger(0);

    public Trade() {}
    
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public double getPrice() {
        return price;
    }
    public void setPrice(double price) {
        this.price = price;
    }
    public AtomicInteger getCount() {
        return count;
    }
    public void setCount(AtomicInteger count) {
        this.count = count;
    }
    
}
生产者
  • 使用disruptor.publishEvent(eventTranslator)提交Event到容器中,区别于ringBuffer.publish(sequence);
  • disruptor.publishEvent(eventTranslator)的参数是com.lmax.disruptor.EventTranslator的实现类,其提供了一个Event的空对象,具体实现将填充空Event对象,完成Event的生产;
import java.util.Random;
import java.util.concurrent.CountDownLatch;

import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.dsl.Disruptor;

public class TradePushlisher implements Runnable {

    private Disruptor<Trade> disruptor;
    private CountDownLatch latch;
    
    private static int PUBLISH_COUNT = 1;
    
    public TradePushlisher(CountDownLatch latch, Disruptor<Trade> disruptor) {
        this.disruptor = disruptor;
        this.latch = latch;
    }

    public void run() {
        TradeEventTranslator eventTranslator = new TradeEventTranslator();
        for(int i =0; i < PUBLISH_COUNT; i ++){
            //新的提交任务的方式
            disruptor.publishEvent(eventTranslator);            
        }
        latch.countDown();
    }
}


class TradeEventTranslator implements EventTranslator<Trade> {

    private Random random = new Random();

    public void translateTo(Trade event, long sequence) {
        this.generateTrade(event);
    }

    private void generateTrade(Trade event) {
        event.setPrice(random.nextDouble() * 9999);
    }
    
}
消费者

注:消费者既可以通过实现EventHandler成为消费者,也可以通过实现WorkHandler成为消费者;

  • 消费者1:重置Event的name;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;

public class Handler1 implements EventHandler<Trade>, WorkHandler<Trade>{

    //EventHandler
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
        this.onEvent(event);
    }

    //WorkHandler
    public void onEvent(Trade event) throws Exception {
        System.err.println("handler 1 : SET NAME");
        Thread.sleep(1000);
        event.setName("H1");
    }

}
  • 消费者2:设置ID
import java.util.UUID;

import com.lmax.disruptor.EventHandler;

public class Handler2 implements EventHandler<Trade> {

    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
        System.err.println("handler 2 : SET ID");
        Thread.sleep(2000);
        event.setId(UUID.randomUUID().toString());
    }

}
  • 消费者3:输出Event信息
import com.lmax.disruptor.EventHandler;

public class Handler3 implements EventHandler<Trade> {

    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
        System.err.println("handler 3 : NAME: " 
                                + event.getName() 
                                + ", ID: " 
                                + event.getId()
                                + ", PRICE: " 
                                + event.getPrice()
                                + " INSTANCE : " + event.toString());
    }

}
  • 消费者4:设置Event价格;
import com.lmax.disruptor.EventHandler;

public class Handler4 implements EventHandler<Trade> {

    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
        System.err.println("handler 4 : SET PRICE");
        Thread.sleep(1000);
        event.setPrice(17.0);
    }

}
  • 消费者5:原价格基础上加3;
import com.lmax.disruptor.EventHandler;

public class Handler5 implements EventHandler<Trade> {

    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
        System.err.println("handler 5 : GET PRICE: " +  event.getPrice());
        Thread.sleep(1000);
        event.setPrice(event.getPrice() + 3.0);
    }

}

串行消费示例

  • 通过disruptor对handleEventsWith(final EventHandler<? super T>... handlers)的链式调用,制定Consumer的消费顺序;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;

public class Main {
    
    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws Exception {
            
        //构建一个线程池用于提交任务
        ExecutorService es1 = Executors.newFixedThreadPool(1);
        ExecutorService es2 = Executors.newFixedThreadPool(5);
        //1 构建Disruptor
        Disruptor<Trade> disruptor = new Disruptor<Trade>(
                new EventFactory<Trade>() {
                    public Trade newInstance() {
                        return new Trade();
                    }
                },
                1024*1024,
                es2,
                ProducerType.SINGLE,
                new BusySpinWaitStrategy());        
        
        //2 把消费者设置到Disruptor中 handleEventsWith      
        //2.1 串行操作:
        disruptor
        .handleEventsWith(new Handler1())
        .handleEventsWith(new Handler2())
        .handleEventsWith(new Handler3());
        
        //3 启动disruptor
        RingBuffer<Trade> ringBuffer = disruptor.start();
        
        CountDownLatch latch = new CountDownLatch(1);
        
        long begin = System.currentTimeMillis();
        
        es1.submit(new TradePushlisher(latch, disruptor));
        
        latch.await();  //进行向下
        
        disruptor.shutdown();
        es1.shutdown();
        es2.shutdown();
        System.err.println("总耗时: " + (System.currentTimeMillis() - begin));
        
    }

}

输出:

  • 3个Consumer顺序消费;

handler 1 : SET NAME
handler 2 : SET ID
handler 3 : NAME: H1, ID: f42732e1-7c8e-4373-a1c2-0850dbadd4c7, PRICE: 6128.80919458283 INSTANCE : com.bfxy.disruptor.heigh.chain.Trade@edba98c
总耗时: 4629

并行消费示例

  • 有2种实现并行消费的代码编写方式
    • 在handleEventsWith方法中添加多个handler实现即可;
    • 依次调用handleEventsWith方法;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;

public class Main {
    
    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws Exception {
            
        //构建一个线程池用于提交任务
        ExecutorService es1 = Executors.newFixedThreadPool(1);
        ExecutorService es2 = Executors.newFixedThreadPool(5);
        //1 构建Disruptor
        Disruptor<Trade> disruptor = new Disruptor<Trade>(
                new EventFactory<Trade>() {
                    public Trade newInstance() {
                        return new Trade();
                    }
                },
                1024*1024,
                es2,
                ProducerType.SINGLE,
                new BusySpinWaitStrategy());        
        
        //2 把消费者设置到Disruptor中 handleEventsWith      
        //2.2 并行操作: 可以有两种方式去进行
        //1 handleEventsWith方法 添加多个handler实现即可
        //2 handleEventsWith方法 分别进行调用
        disruptor.handleEventsWith(new Handler1(), new Handler2(), new Handler3());
 //     disruptor.handleEventsWith(new Handler1());
//      disruptor.handleEventsWith(new Handler2());
//      disruptor.handleEventsWith(new Handler3());
        
        //3 启动disruptor
        RingBuffer<Trade> ringBuffer = disruptor.start();
        
        CountDownLatch latch = new CountDownLatch(1);
        
        long begin = System.currentTimeMillis();
        
        es1.submit(new TradePushlisher(latch, disruptor));
        
        latch.await();  //进行向下
        
        disruptor.shutdown();
        es1.shutdown();
        es2.shutdown();
        System.err.println("总耗时: " + (System.currentTimeMillis() - begin));
        
    }

}

输出:

  • 3个消费者并行消费;

handler 1 : SET NAME
handler 2 : SET ID
handler 3 : NAME: null, ID: null, PRICE: 5878.873178063168 INSTANCE : com.bfxy.disruptor.heigh.chain.Trade@edba98c
总耗时: 3715

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,277评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,689评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,624评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,356评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,402评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,292评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,135评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,992评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,429评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,636评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,785评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,492评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,092评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,723评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,858评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,891评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,713评论 2 354

推荐阅读更多精彩内容