public class DualChannel extends BasicChannelSemantics {
private static final Logger logger = LoggerFactory.getLogger(DualChannel.class);
/****************************** fileChannel **********************************/
private volatile boolean open = false;
private static final String PUT = "PUT";
private static final String TAKE = "TAKE";
/**************************** else ************************************/
private AtomicBoolean putToMemChannel = new AtomicBoolean(true);
private AtomicBoolean takeFromMemChannel = new AtomicBoolean(true);
private FileChannel fileChannel;
private MemoryChannel memoryChannel;
private FileChannel.FileBackedTransaction fileBackedTransaction;
private MemoryChannel.MemoryTransaction memoryTransaction;
public DualChannel() {
this.fileChannel = new FileChannel();
this.memoryChannel = new MemoryChannel();
}
@Override
protected BasicTransactionSemantics createTransaction() {
return null;
}
@Override
protected BasicTransactionSemantics createTransaction(String action) {
logger.info("method createTransaction ( DualTransaction ) before......");
BasicTransactionSemantics basicTransactionSemantics = null;
if (open) {
if (PUT.equals(action)) {//获取
if (putToMemChannel.get()) {//
basicTransactionSemantics = memoryChannel.createTransaction();
memoryTransaction = (MemoryChannel.MemoryTransaction) basicTransactionSemantics;
} else {
basicTransactionSemantics = fileChannel.createTransaction();
fileBackedTransaction = (FileChannel.FileBackedTransaction) fileChannel.createTransaction();
}
} else if (TAKE.equals(action)) {
if (takeFromMemChannel.get()) {
basicTransactionSemantics = memoryChannel.createTransaction();
memoryTransaction = (MemoryChannel.MemoryTransaction) basicTransactionSemantics;
} else {
basicTransactionSemantics = fileChannel.createTransaction();
fileBackedTransaction = (FileChannel.FileBackedTransaction) fileChannel.createTransaction();
}
}
}
return basicTransactionSemantics;
}
@Override
public synchronized void setName(String name) {
logger.info("method setName before......" + name);
fileChannel.setName(name);
memoryChannel.setName(name);
super.setName(name);
}
@Override
public void configure(Context context) {
logger.info("method configgure before......" + context.toString());
this.memoryChannel.configure(context);
this.fileChannel.configure(context);
}
@Override
public synchronized void start() {
super.start();
open = true;
logger.info("method start before......");
this.fileChannel.start();
this.memoryChannel.start();
logger.info("method start after......");
}
@Override
public synchronized void stop() {
super.stop();
open = false;
logger.info("method stop before......");
this.fileChannel.stop();
this.memoryChannel.stop();
logger.info("method stop after......");
}
@Override
public void put(Event event) throws ChannelException {
logger.info("method put before......");
if (open) {
if (putToMemChannel.get()) {
//往memChannel中写数据
memoryTransaction.put(event);
if (memoryChannel.isFull()) {
putToMemChannel.set(false);
}
} else {
//往fileChannel中写数据
fileBackedTransaction.put(event);
}
}
logger.info("method put after......");
}
@Override
public Event take() throws ChannelException {//put 和 take 事务紊乱
logger.info("method take before......");
Event event = null;
if (open) {
if (takeFromMemChannel.get()) {
//从memChannel中取数据
event = memoryTransaction.take();
if (event == null) {
takeFromMemChannel.set(false);
}
} else {
//从fileChannel中取数据
event = fileBackedTransaction.take();
if (event == null) {
takeFromMemChannel.set(true);
putToMemChannel.set(true);
}
}
logger.info("method take after......");
}
return event;
}
}