简介
Disruptor是一个无锁有界内存队列开源框架,最大的特点就是性能非常高高高。很多知名项目都有用到它比如我们熟悉的Log4j 2 .
本文主要介绍它如何做到高性能,以及具体的框架设计。
为什么性能这么强大?
主要是因为有这三个法宝:RingBuffer,无锁设计和缓存行填充。
RingBuffer
Disruptor底层采用RingBuffer的结构,RingBuffer大家都知道,就是一个循环使用下标的数组嘛。
计算访问下标的时候,通过取余计算 (cursor % size )来得到数组下标。(一个trick是,当size是2的幂的时候,可以用 cursor & (size - 1) 来快速计算下标。所以Disruptor指定size必须是2的幂。)
用RingBuffer的好处:
- 不需要清数据,用新数据去覆盖旧数据,减少GC
- 底层是数组,充分利用缓存
无锁设计
怎么做到的?
在Disruptor中,生产者和消费者有各自的游标,用来指导需要写入或读取的位置。
消费者对节点的唯一操作是读而不是写,因此不需要加锁。
只有一个生产者的时候,只需要保证生产者的游标不会超过最慢的消费者一圈(即,不会把消费者还没读完的数据覆盖掉)即可,因此不需要锁。
当有多个生产者时,Disruptor采用CAS来保证游标的线程安全。在整个复杂的框架中,只有这一个地方出现多线程竞争修改同一个变量值。具体的交互在后面讲。
(另一方面,用volatile来标记游标,采用内存屏障来代替锁?)
缓存行填充
先来了解一个概念,“伪共享”。
我们知道计算机有多级的缓存体系,越靠近CPU的缓存,速度越快,容量越小。而缓存由很多cache line组成,每个cache line通常是64bytes,所以一个cache line通常可以缓存8个long变量。从内存中拉取数据的时候,会把相邻的数据都一起加载到缓存中。在某些情况下这个缓存行优势会失效,导致并发速度反而下降了,这种情况称为伪共享。以下是一个典型的例子:
假设有两个线程Thread1和Thread2,分别在Core1和Core2上运行。有两个变量head和tail由它们共享,Thread1只读写head, Thread2只读写tail。理想情况下它们不应该有干扰,但是我们可以看到,当Thread1写入head以后,其他Core对应的cache line被都置为失效,也就意味着Core2想要读写tail,需要从内存中重新读取,而这显然是一种浪费。
我们可以通过缓存行填充来解决这类问题:
class LhsPadding
{
protected long p1, p2, p3, p4, p5, p6, p7;
}
class Value extends LhsPadding
{
protected volatile long value;
}
class RhsPadding extends Value
{
protected long p9, p10, p11, p12, p13, p14, p15;
}
在Disruptor中,游标做了类似的处理。
在由我们自己定义的Event类中,也值得这样做。如果有不同的生产者往不同的字段写入(?),我们要确保各个字段之间不会出现伪共享。
具体设计
直觉上理解一下Disruptor的设计:Disruptor通过对游标的管理,保证任何时候只有一个生产者去写一个槽,就省了很多并发问题;只要好好看好游标就行了。
消费
先从比较好理解的消费者讲起。
在Disruptor中,消费者被称为EventProcessor,通过SequenceBarrier和RingBuffer交互。
如上图Stage2所示,事件处理器的最大序号是16.它向SequenceBarrier调用waitFor(17)以获得17格中的数据。如果没有数据写入RingBuffer,Stage2事件处理器将挂起等待下一个序号。但是,如上图所示的情况,RingBuffer已经被填充到18格,所以waitFor函数将返回18并通知事件处理器,它可以读取包括直到18格在内的数据,如下图所示。
你应该已经感觉得到,这样做是怎样有助于平缓延迟的峰值了——以前需要逐个节点地询问“我可以拿下一个数据吗?现在可以了么?现在呢?”,消费者现在只需要简单的说“当你拿到的数字比我这个要大的时候请告诉我”,函数返回值会告诉它有多少个新的节点可以读取数据了。因为这些新的节点的确已经写入了数据(Ring Buffer本身的序号已经更新),而且消费者对这些节点的唯一操作是读而不是写,因此访问不用加锁。这太好了,不仅代码实现起来可以更加安全和简单,而且不用加锁使得速度更快。
另一个好处是——你可以用多个消费者去读同一个RingBuffer ,不需要加锁,也不需要用另外的队列来协调不同的线程(消费者)。这样你可以在Disruptor的协调下实现真正的并发数据处理。
生产
向RingBuffer写入数据需要经过两阶段提交。
只有一个发布者时
首先,发布者必须确定RingBuffer中下一个可以插入的格, RingBuffer持有最近写入格的序号(下图中的18格),从而确定下一个插入格的序号。RingBuffer通过检查所有EventProcessor正在从RingBuffer中读取的当前序号来判断下一个插入格是否空闲,只需要保证下一个插入格已经被所有EventProcessor读取过即可。
发现了下一个插入格:
当发布者得到下一个序号后,它可以获得该格中的对象,并可以对该对象进行任意操作。你可以把格想象成一个简单的可以写入任意值的容器。
同时,在发布者处理19格数据的时候,RingBuffer的序号依然是18,所以其他事件处理器将不会读到19格中的数据。
对象的改动保存进了RingBuffer:
最终,发布者将数据写入19格后,通知RingBuffer发布19格的数据。这时,RingBuffer更新序号并且所有从RingBuffer读数据的事件处理器都可以看到19格中的数据。
总结一下,发布者需要先申请一个可写入的位置,然后再写入然后提交,这是一个明显的两阶段提交设计。
有多个发布者时
单生产者的情况比较简单,当有多个生产者时,申请写入位置的时候就会产生竞争。上文说过,Disruptor采用CAS来保证游标的线程安全。直接上源码吧。这段源码应该是最能体现Disruptor核心设计思想的部分了。
com.lmax.disruptor.MultiProducerSequencer
/**
* @see Sequencer#next(int)
*/
@Override
public long next(int n) //生产者申请分配n个位置
{
if (n < 1 || n > bufferSize)
{
throw new IllegalArgumentException("n must be > 0 and < bufferSize");
}
long current;
long next;
do
{
current = cursor.get(); // cursor是已经分配的位置的头
next = current + n; // 待分配位置的头
long wrapPoint = next - bufferSize; // 待分配位置回退一圈的位置。用于和消费者的位置比较,避免把还没有读的位置也分配了
long cachedGatingSequence = gatingSequenceCache.get(); // 缓存的最慢消费者的位置
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) // 如果缓存的最慢消费者的位置不符合条件,则重新进入检查
{
long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
if (wrapPoint > gatingSequence) // 最慢消费者的位置太小,会被生产者覆盖,因此不能完成分配,所以让出CPU。
{
LockSupport.parkNanos(1);
continue;
}
gatingSequenceCache.set(gatingSequence);
}
else if (cursor.compareAndSet(current, next)) // 设置新的游标位置! 由于CAS的特性,多个生产者同时试图修改current游标的时候,只有一个会成功
// 其他的会重新进入循环,获取新的游标位置继续尝试申请。
{
break;
}
}
while (true);
return next;
}
进阶
Disruptor Wizard
提供了一系列API来设置event handlers,并设置它们之间的依赖关系。
WaitStrategy
当消费者等待在SequenceBarrier上时,有许多可选的等待策略,不同的等待策略在延迟和CPU资源的占用上有所不同,可以视应用场景选择。
源码梳理
3.x的代码,感兴趣的同学可以参考: https://zhanghaoxin.blog.csdn.net/article/category/6121943 ,对源码结构有较详细的讲解。或者参考 Disruptor 3.x源码梳理(简版)
参考资料
http://lmax-exchange.github.io/disruptor/files/Disruptor-1.0.pdf 论文
https://tech.meituan.com/2016/11/18/disruptor.html by美团技术团队
http://ifeve.com/disruptor/ 官网发布的系列文章的译文(比较老了,很多是根据1.x和2.x的讲解的,但是用来参考核心思想是没问题的)