示例代码基础框架
Event
import java.util.concurrent.atomic.AtomicInteger;
/**
* Disruptor中的 Event
*/
public class Trade {
private String id;
private String name;
private double price;
private AtomicInteger count = new AtomicInteger(0);
public Trade() {}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public double getPrice() {
return price;
}
public void setPrice(double price) {
this.price = price;
}
public AtomicInteger getCount() {
return count;
}
public void setCount(AtomicInteger count) {
this.count = count;
}
}
生产者
- 使用disruptor.publishEvent(eventTranslator)提交Event到容器中,区别于ringBuffer.publish(sequence);
- disruptor.publishEvent(eventTranslator)的参数是com.lmax.disruptor.EventTranslator的实现类,其提供了一个Event的空对象,具体实现将填充空Event对象,完成Event的生产;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.dsl.Disruptor;
public class TradePushlisher implements Runnable {
private Disruptor<Trade> disruptor;
private CountDownLatch latch;
private static int PUBLISH_COUNT = 1;
public TradePushlisher(CountDownLatch latch, Disruptor<Trade> disruptor) {
this.disruptor = disruptor;
this.latch = latch;
}
public void run() {
TradeEventTranslator eventTranslator = new TradeEventTranslator();
for(int i =0; i < PUBLISH_COUNT; i ++){
//新的提交任务的方式
disruptor.publishEvent(eventTranslator);
}
latch.countDown();
}
}
class TradeEventTranslator implements EventTranslator<Trade> {
private Random random = new Random();
public void translateTo(Trade event, long sequence) {
this.generateTrade(event);
}
private void generateTrade(Trade event) {
event.setPrice(random.nextDouble() * 9999);
}
}
消费者
注:消费者既可以通过实现EventHandler成为消费者,也可以通过实现WorkHandler成为消费者;
- 消费者1:重置Event的name;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;
public class Handler1 implements EventHandler<Trade>, WorkHandler<Trade>{
//EventHandler
public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
this.onEvent(event);
}
//WorkHandler
public void onEvent(Trade event) throws Exception {
System.err.println("handler 1 : SET NAME");
Thread.sleep(1000);
event.setName("H1");
}
}
- 消费者2:设置ID
import java.util.UUID;
import com.lmax.disruptor.EventHandler;
public class Handler2 implements EventHandler<Trade> {
public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
System.err.println("handler 2 : SET ID");
Thread.sleep(2000);
event.setId(UUID.randomUUID().toString());
}
}
- 消费者3:输出Event信息
import com.lmax.disruptor.EventHandler;
public class Handler3 implements EventHandler<Trade> {
public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
System.err.println("handler 3 : NAME: "
+ event.getName()
+ ", ID: "
+ event.getId()
+ ", PRICE: "
+ event.getPrice()
+ " INSTANCE : " + event.toString());
}
}
- 消费者4:设置Event价格;
import com.lmax.disruptor.EventHandler;
public class Handler4 implements EventHandler<Trade> {
public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
System.err.println("handler 4 : SET PRICE");
Thread.sleep(1000);
event.setPrice(17.0);
}
}
- 消费者5:原价格基础上加3;
import com.lmax.disruptor.EventHandler;
public class Handler5 implements EventHandler<Trade> {
public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
System.err.println("handler 5 : GET PRICE: " + event.getPrice());
Thread.sleep(1000);
event.setPrice(event.getPrice() + 3.0);
}
}
串行消费示例
- 通过disruptor对handleEventsWith(final EventHandler<? super T>... handlers)的链式调用,制定Consumer的消费顺序;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;
public class Main {
@SuppressWarnings("unchecked")
public static void main(String[] args) throws Exception {
//构建一个线程池用于提交任务
ExecutorService es1 = Executors.newFixedThreadPool(1);
ExecutorService es2 = Executors.newFixedThreadPool(5);
//1 构建Disruptor
Disruptor<Trade> disruptor = new Disruptor<Trade>(
new EventFactory<Trade>() {
public Trade newInstance() {
return new Trade();
}
},
1024*1024,
es2,
ProducerType.SINGLE,
new BusySpinWaitStrategy());
//2 把消费者设置到Disruptor中 handleEventsWith
//2.1 串行操作:
disruptor
.handleEventsWith(new Handler1())
.handleEventsWith(new Handler2())
.handleEventsWith(new Handler3());
//3 启动disruptor
RingBuffer<Trade> ringBuffer = disruptor.start();
CountDownLatch latch = new CountDownLatch(1);
long begin = System.currentTimeMillis();
es1.submit(new TradePushlisher(latch, disruptor));
latch.await(); //进行向下
disruptor.shutdown();
es1.shutdown();
es2.shutdown();
System.err.println("总耗时: " + (System.currentTimeMillis() - begin));
}
}
输出:
- 3个Consumer顺序消费;
handler 1 : SET NAME
handler 2 : SET ID
handler 3 : NAME: H1, ID: f42732e1-7c8e-4373-a1c2-0850dbadd4c7, PRICE: 6128.80919458283 INSTANCE : com.bfxy.disruptor.heigh.chain.Trade@edba98c
总耗时: 4629
并行消费示例
- 有2种实现并行消费的代码编写方式
- 在handleEventsWith方法中添加多个handler实现即可;
- 依次调用handleEventsWith方法;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;
public class Main {
@SuppressWarnings("unchecked")
public static void main(String[] args) throws Exception {
//构建一个线程池用于提交任务
ExecutorService es1 = Executors.newFixedThreadPool(1);
ExecutorService es2 = Executors.newFixedThreadPool(5);
//1 构建Disruptor
Disruptor<Trade> disruptor = new Disruptor<Trade>(
new EventFactory<Trade>() {
public Trade newInstance() {
return new Trade();
}
},
1024*1024,
es2,
ProducerType.SINGLE,
new BusySpinWaitStrategy());
//2 把消费者设置到Disruptor中 handleEventsWith
//2.2 并行操作: 可以有两种方式去进行
//1 handleEventsWith方法 添加多个handler实现即可
//2 handleEventsWith方法 分别进行调用
disruptor.handleEventsWith(new Handler1(), new Handler2(), new Handler3());
// disruptor.handleEventsWith(new Handler1());
// disruptor.handleEventsWith(new Handler2());
// disruptor.handleEventsWith(new Handler3());
//3 启动disruptor
RingBuffer<Trade> ringBuffer = disruptor.start();
CountDownLatch latch = new CountDownLatch(1);
long begin = System.currentTimeMillis();
es1.submit(new TradePushlisher(latch, disruptor));
latch.await(); //进行向下
disruptor.shutdown();
es1.shutdown();
es2.shutdown();
System.err.println("总耗时: " + (System.currentTimeMillis() - begin));
}
}
输出:
- 3个消费者并行消费;
handler 1 : SET NAME
handler 2 : SET ID
handler 3 : NAME: null, ID: null, PRICE: 5878.873178063168 INSTANCE : com.bfxy.disruptor.heigh.chain.Trade@edba98c
总耗时: 3715