LMAX Disruptor简介

LMAX是什么?

要说Disruptor需要先说下LMAX,LMAX是一个英国外汇黄金交易所,它是第一家也是唯一一家采用多边交易设施Multilateral Trading Facility(MTF),拥有交易所拍照和经纪商拍照的欧洲顶级金融公司。而LMAX所用的Disruptor技术,在一个线程每秒处理6百万订单。没错,这个Disruptor就是我们这里的
Disruptor。
而Disruptor只是LMAX平台一部分,LMAX是一个新型零售金融交易平台,它能够达到低延迟、高吞吐量(大量交易)。这个系统建立在JVM平台上,核心是一个逻辑处理器,每秒能够处理600百万订单。业务逻辑处理器完全运行在内存中(in-memory),使用事件源驱动方式(event sourcing)。而业务逻辑处理器核心是Disruptor,这是一个并发组件,能够在无锁情况下实现网络并发查询操作。他们研究表明,现在所谓的高性能研究方向似乎和现在CPU设计是相左的。

什么是Disruptor

Disruptor

Disruptor实现了队列的功能,而且是一个有界的队列。所以应用场景自然就是"生产者-消费者"模型了。可以看下JDK中的BlockingQuery是一个FIFO队列,生产者(Producer)发布(Publish)一项事件(Event,消息)时,消费者(Consumer)能够获得通知;当队列中没有事件时,消费者会被阻塞,直到生产者发布了新的事件。而Disruptor不仅仅只是这些:

  • 同一个事件可以有多个消费者,消费者之间可以并行处理,可以相互依赖处理
  • 预分配用于存储事件的内存
  • 针对极高的性能目标而实现极度优化和无锁设计

可能你对这种场景还不是很明白,简单说就是当需要两个独立的处理过程(两个线程)之间需要传递数据时,就可以使用Disruptor,当然可以使用队列。

Disruptor中的核心概念

  • Ring Buffer
    环形缓冲区,曾经是Disruptor中的核心对象,不过从3.0版本开始,只负责对通过Disruptor进行交换的数据(事件)进行存储和更新。在一些高级使用中,Ring Buffer可以由用户自定义的来代替。

  • Sequence
    通过递增的序号管理进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理的。一个Sequence用于跟踪标识某个特定的事件处理者(RingBuffer/Consumer)的进度。虽然可以使用AutomicLong标识进度,但定义Sequence另一个目的是防止CPU缓存伪共享(Disruptor高性能关键点之一)

  • Sequencer
    Sequence是Disruptor的真正核心。这个接口有两个实现类SingleProducerSequence和MultiProducerSequence,它们定义生产者和消费者之间快速、正确地传递数据的并发算法。

  • Sequence Barrier
    保持RingBuffer的main published Sequence和Consumer依赖的其它Sequence的引用。Sequence Barrier还定义决定Consumer是否还有可处理的事件逻辑。

  • Event
    在Disruptor中,在生产者和消费者之间进行交换的数据称为事件(Event)。它不是Disruptor定义的类型,而是使用者自己定义并指定的(可以看作一个Bean)

  • EventHandler
    Disruptor定义的事件处理接口,由用户实现,用于处理事件,是Consumer的真正实现

  • EventProcessor
    EventProcessor持有特定消费者(Consumer)的Sequence,并提供用来调用事件处理实现事件循环(Event loop)

  • Wait Strategy
    定义Consumer如何等待下一个事件策略。(Disruptor提供了多种策略)

  • Producer
    生产者,泛指Disruptor发布事件的代码,Disruptor没有定义特定的接口或类型。

Models

简单Demo

1.定义事件

事件(Event)是Disruptor进行数据交换的数据类型

<pre>
public class PeopleEvent {
private String name;
private Integer age;
private Integer sex;

public void setName(String name) {
    this.name = name;
}

public String getName() {
    return name;
}

public void setAge(Integer age) {
    this.age = age;
}

public void setSex(Integer sex) {
    this.sex = sex;
}

public Integer getAge() {
    return age;
}

public Integer getSex() {
    return sex;
}

}
</pre>

2.定义事件工厂

事件工厂(Event Factory)用来实例化之前的事件(Event),需要实现接口com.lmax.disruptor.EventFactory<>。Disruptor通过EventFactory在RingBuffer中创建Event的实例。一个Event实例实际被用作一个"数据槽",发布者发布前,先从RingBuffer获得一个Event的实例,然后往Event中填充数据,之后发布到RingBuffer中,之后由Consumer获得该Event实例并从中取出数据。

<pre>
public class PeopleEventFactory implements EventFactory<PeopleEvent> {
public PeopleEvent newInstance(){
return new PeopleEvent();
}
}
</pre>

3.定义事件处理的具体实现(业务逻辑核心)
需要实现com.lmax.disruptor.EventHandler<>接口,来定义事件处理的具体逻辑

<pre>
public class PeopleEventHandler implements EventHandler<PeopleEvent> {

public void onEvent(PeopleEvent event, long sequence, boolean endOfBatch) throws Exception {
    System.out.println("name:" + event.getName()+",sex:" + event.getSex() + ",age:" + event.getAge());
}

}
</pre>

4.组合事件处理流程

<pre>
public class DisruptorDemo {

public static void main(String[] args){

    //Executor将用来为消费者构建线程
    Executor executor = Executors.newCachedThreadPool();

    //事件工厂用来创建事件
    PeopleEventFactory peopleEventFactory = new PeopleEventFactory();

    //指定Ring Buffer大小,2的倍数
    int buffSize = 1024;

    /**
     * 构造Disruptor
     * 并发系统提高性能之一就是单一写者原则,如果代码中仅有一个事件生产者,可以设置单一生产者模式来提高系统的性能。
     * 通过ProduceType.SINGLE和ProduceType.MULTI进行控制。
     *
     * 等待策略
     * Disruptor默认的等待策略是BlockingWaitStrategy,使用一个锁和条件变量来控制执行和等待,这是最慢的策略,但也是CPU使用最低
     * 和最稳定的策略。
     * SleepingWaitStrategy:也是CPU使用率低的策略,它使用循环等待并且循环间调用LockSupport.parkNanos(1)来睡眠。它的优点在于
     * 生产线程只需记数,而不执行任何命令,并且没有条件变量的消耗。但是对象从生产者到消费者传递延迟变大了,适用于不需要低延迟的场景,
     * YieldingWaitStrategy:是可以被用作低延迟系统的两个策略之一,这种策略在低延迟同时会增加CPU运算量。YieldingWaitStrategy
     * 会循环等待sequence增加到合适值,循环调用Tread.yield()允许其它准备好的线程执行。如果高性能而且事件消费者线程比逻辑内核少的
     * 时候,推荐使用YieldingWaitStrategy策略。
     * BusySpinWaitStrategy是性能最高的策略,同时也是对部署环境要求最高的策略。这个策略最好用在时间处理线程比物理内核数目还要少的时候。
     */
    Disruptor<PeopleEvent> disruptor = new Disruptor<PeopleEvent>(peopleEventFactory,buffSize,executor,
            ProducerType.SINGLE,new YieldingWaitStrategy());

    //链接处理器
    disruptor.handleEventsWith(new PeopleEventHandler());

    //启动Disruptor,启动所有线程
    disruptor.start();

    //从Disruptor获取RingBuffer,用来发布
    RingBuffer<PeopleEvent> ringBuffer = disruptor.getRingBuffer();

    PeopleEventProducer producer = new PeopleEventProducer(ringBuffer);

    Map<String,Object> map = new HashMap<String, Object>();
    map.put("name","yjz");
    map.put("age",25);
    map.put("sex",1);

    producer.onData(map);

}

}

</pre>

5.事件发布

事件发布包括三个步骤:

  • 从RingBuffer获取一个可以写入事件的序号
  • 获取对应的事件对象,将数据写入事件对象
  • 将事件提交到RingBuffer

事件在提交之后才会通知EventProcessor进行处理。

<pre>
long sequence = ringBuffer.next();
try {
PeopleEvent event = ringBuffer.get(sequence);
event.setName(data.get("name").toString());
event.setAge((Integer) data.get("age"));
event.setSex((Integer) data.get("sex"));
}finally {
ringBuffer.publish(sequence);
}
</code></pre>

RingBuffer.publish必须在finally来确保调用,如果某个sequence未被提交,将会阻塞还需发布或其它的producer。
Disruptor提供了另一种方式简化上述操作,来确保publish总被调用:

<pre>
private static final EventTranslatorOneArg<PeopleEvent,Map<String,Object>> tranlator = new EventTranslatorOneArg<PeopleEvent, Map<String, Object>>() {
public void translateTo(PeopleEvent event, long sequence, Map<String, Object> data) {
event.setName(data.get("name").toString());
event.setAge((Integer) data.get("age"));
event.setSex((Integer) data.get("sex"));
}
};

/**
 * onData用来发布事件,每调用一次就发布一次事件,它的参数会通过事件传递给消费者
 */
public void onData(Map<String,Object> data){
    ringBuffer.publishEvent(tranlator,data);
}  

</code></pre>

事件发布完整代码

<pre>
/**

  • Created by yangjianzhang on 17/2/4.

  • PeopleEventProducer是一个生成事件的源,在这里面通过读取磁盘IO、数据库、network等。当事件源会在IO读取一部分数据时候触发事件(触发事件

  • 不是自动触发的,需要在读取到数据的时候自己触发事件并发布)
    */
    public class PeopleEventProducer {

    private final RingBuffer<PeopleEvent> ringBuffer;

    public PeopleEventProducer(RingBuffer ringBuffer){
    this.ringBuffer = ringBuffer;
    }

    private static final EventTranslatorOneArg<PeopleEvent,Map<String,Object>> tranlator = new EventTranslatorOneArg<PeopleEvent, Map<String, Object>>() {
    public void translateTo(PeopleEvent event, long sequence, Map<String, Object> data) {
    event.setName(data.get("name").toString());
    event.setAge((Integer) data.get("age"));
    event.setSex((Integer) data.get("sex"));
    }
    };

    /**

    • onData用来发布事件,每调用一次就发布一次事件,它的参数会通过事件传递给消费者
      */
      public void onData(Map<String,Object> data){
      ringBuffer.publishEvent(tranlator,data);
      }
      }
      </code></pre>

6.关闭Disruptor

<pre>
disruptor.shutdown();
executor.shutdown();
</pre>

这里只是Disruptor的一个简介,深入了解后再继续分享。

关注我

欢迎关注我的公众号,会定期推送优质技术文章,让我们一起进步、一起成长!
公众号搜索:data_tc
或直接扫码:🔽


欢迎关注我
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,029评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,395评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,570评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,535评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,650评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,850评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,006评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,747评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,207评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,536评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,683评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,342评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,964评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,772评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,004评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,401评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,566评论 2 349

推荐阅读更多精彩内容