凤玉写了一个分布式事务方案,做一个记录。
wiki:
ppt:
/**
* 提交分布式事务
*/
public void commit(CommitReq req) throws BusinessException, ConcurrentWriteException {
List<AssetTrans> transPOs = assetTransDao.getByTransId(req.getMemberId(), req.getTransId());
if (transPOs.size() == 0) {
return;
}
// 删除事务记录
// 并发删除的时候,先执行删除操作的事务T1未结束的情况下,后执行删除操作的事务T2会阻塞,如果T1提交事务成功了,则T2 deleteByTransId会返回0
int result = assetTransDao.deleteByTransId(req.getMemberId(), req.getTransId());
if (result == 0) {
return;
}
...
//根据类型进行操作
}
/**
* 回滚分布式事务
*/
public void rollback(RollbackReq req) throws BusinessException, ConcurrentWriteException {
List<AssetTrans> transPOs = assetTransDao.getByTransId(req.getMemberId(), req.getTransId());
if (transPOs.size() == 0) {
return;
}
// 删除事务记录
int result = assetTransDao.deleteByTransId(req.getMemberId(), req.getTransId());
if (result == 0) {
return;
}
for (AssetTrans transPO : transPOs) {
// 添加积分操作回滚事务
if (transPO.getProcessType().equals(AssetProcessType.ADD_MEMBER_POINT.getValue())) {
// do nothing
}
// 消费积分操作回滚事务
if (transPO.getProcessType().equals(AssetProcessType.PAY_MEMBER_POINT.getValue())) {
PayMemberPointTransData transData = FastJsonUtils.parse(transPO.data, PayMemberPointTransData.class);
pointService.rollbackPayMemberPoint(transData);
}
// 取消添加积分操作回滚事务
if (transPO.getProcessType().equals(AssetProcessType.CANCEL_ADD_POINT.getValue())) {
CancelPointAddTransData transData = FastJsonUtils.parse(transPO.data, CancelPointAddTransData.class);
pointService.rollbackCancelAddPoint(transData);
}
//取消撤销消费积分操作
if (transPO.getProcessType().equals(AssetProcessType.CANCEL_PAY_POINT.getValue())) {
// do nothing
}
//添加储值回滚事务
if (transPO.getProcessType().equals(AssetProcessType.ADD_MEMBER_BALANCE.getValue())) {
//do nothing
}
//消费储值操作回滚事务
if (transPO.getProcessType().equals(AssetProcessType.PAY_MEMBER_BALANCE.getValue())) {
PayMemberBalanceTransData transData = FastJsonUtils.parse(transPO.getData(), PayMemberBalanceTransData.class);
balanceService.rollbackPayMemberBalance(transData);
}
//取消增加储值操作回滚事务
if (transPO.getProcessType().equals(AssetProcessType.CANCEL_ADD_BALANCE.getValue())) {
CancelAddBalanceTransData transData = FastJsonUtils.parse(transPO.getData(), CancelAddBalanceTransData.class);
balanceService.rollbackCancelAddBalance(transData);
}
//取消消费储值操作回滚事务
if (transPO.getProcessType().equals(AssetProcessType.CANCEL_PAY_BALANCE.getValue())) {
//do nothing
}
}
}
}