欢迎关注作者简书
csdn传送门
1、先熟悉下什么是阻塞队列!
2、代码压测
2.1、公共部分
package com.bfxy.disruptor.ability;
public interface Constants {
int EVENT_NUM_OHM = 100000000;
int EVENT_NUM_FM = 50000000;
int EVENT_NUM_OM = 10000000;
}
package com.bfxy.disruptor.ability;
import java.io.Serializable;
public class Data implements Serializable {
private static final long serialVersionUID = 2035546038986494352L;
private Long id ;
private String name;
public Data() {
}
public Data(Long id, String name) {
super();
this.id = id;
this.name = name;
}
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
2.2、ArrayBlockingQueue压测
package com.bfxy.disruptor.ablility;
import com.bfxy.disruptor.ability.Data;
import java.util.concurrent.ArrayBlockingQueue;
public class ArrayBlockingQueue4Test {
public static void main(String[] args) {
final ArrayBlockingQueue<Data> queue = new ArrayBlockingQueue<>(100000000);
final long startTime = System.currentTimeMillis();
// 向容器中添加元素
new Thread(new Runnable() {
@Override
public void run() {
long i = 0;
while(i < Constants.EVENT_NUM_FM) {
try {
queue.put(new Data(i, "c"+i));
} catch (InterruptedException e) {
e.printStackTrace();
}
i++;
}
}
}).start();
// 从容器中取出元素
new Thread(new Runnable() {
@Override
public void run() {
long k = 0;
while (k < Constants.EVENT_NUM_FM) {
try {
queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
k++;
}
long endTime = System.currentTimeMillis();
System.out.println("ArrayBlockingQueue costTime = " + (endTime - startTime) + "ms");
}
}).start();
}
}
2.3、DisruptorSingle压测
package com.bfxy.disruptor.ability;
import com.lmax.disruptor.EventHandler;
public class DataConsumer implements EventHandler<Data> {
private long startTime;
private int i;
public DataConsumer() {
this.startTime = System.currentTimeMillis();
}
@Override
public void onEvent(Data data, long seq, boolean bool)
throws Exception {
i++;
if (i == Constants.EVENT_NUM_OHM) {
long endTime = System.currentTimeMillis();
System.out.println("Disruptor costTime = " + (endTime - startTime) + "ms");
}
}
}
package com.bfxy.disruptor.ability;
import java.util.concurrent.Executors;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
public class DisruptorSingle4Test {
public static void main(String[] args) {
int ringBufferSize = 65536;
final Disruptor<Data> disruptor = new Disruptor<Data>(
new EventFactory<Data>() {
@Override
public Data newInstance() {
return new Data();
}
},
ringBufferSize,
Executors.newSingleThreadExecutor(),
ProducerType.SINGLE,
//new BlockingWaitStrategy()
new YieldingWaitStrategy()
);
DataConsumer consumer = new DataConsumer();
//消费数据
disruptor.handleEventsWith(consumer);
disruptor.start();
new Thread(new Runnable() {
@Override
public void run() {
RingBuffer<Data> ringBuffer = disruptor.getRingBuffer();
for (long i = 0; i < Constants.EVENT_NUM_OHM; i++) {
long seq = ringBuffer.next();
Data data = ringBuffer.get(seq);
data.setId(i);
data.setName("c" + i);
ringBuffer.publish(seq);
}
}
}).start();
}
}
性能比较
NUM | Disruptor | BlockingQueue | 性能占比 |
---|---|---|---|
1千万 | 984ms | 2218ms | 2.254 |
5千万 | 4498ms | 10589ms | 2.354 |
1亿 | 8229ms | 21146ms | 2.569 |
欢迎加入Java猿社区!
免费领取我历年收集的所有学习资料哦!