概述
在分布式系统中,使用redis+mq的方式进行商品库存的扣减,是实现高性能与高可用的方案。
本系统设计的思路:
1.下单接口同步调用商品服务商品扣库存接口,商品服务进行库存扣减与预扣日志记录,根据扣除结果返回订单服务。
2.下单成功后订单系统调用支付服务,支付成功后回调订单接口,订单接口发送mq订单成功消息。
3.商品服务收到订单消息修改预扣日志状态。
时序图:
商品库存扣减时序图.png
流程图:
商品库存扣除流程图.png
代码
1.redis扣除库存
参考:https://segmentfault.com/a/1190000021003438?utm_source=tag-newest
伪代码:
@Service
public class StockManager{
@Resource
private RedissonClient redisson;
@Resource
private Jedis jedis;
@Autowire
private ApplicationContext ctx;
// -3:库存未初始化,-2:库存不足 ,大于等于0:扣除后剩余库存
private static Integer NO_STOCK = -3;
private static Integer NO_ENOUGH = -2;
//redis处理的lua脚本
public static final String STOCK_LUA;
static{
StringBuilder sb = new StringBuilder();
sb.append("if (redis.call('exists', KEYS[1]) == 1) then");
sb.append(" local stock = tonumber(redis.call('get', KEYS[1]));");
sb.append(" local num = tonumber(ARGV[1]);");
sb.append(" if (stock == -1) then");
sb.append(" return -1;");
sb.append(" end;");
sb.append(" if (stock >= num) then");
sb.append(" return redis.call('incrby', KEYS[1], 0 - num);");
sb.append(" end;");
sb.append(" return -2;");
sb.append("end;");
sb.append("return -3;");
STOCK_LUA = sb.toString();
}
//对外公开的扣库存方法
@Transactional(rollbackFor = Exception.class)
public void reduceStock(Long skuId,Integer originStock,Integer reduceStock,String orderNo)
{
//获取商品信息
ItemSkuEo itemSkuEo = itemSkuDao.queryBySkuId(skuId);
Integer originStock = itemSkuEo.getStock();
if(originStock<=0){
throw new CustomerException("库存不足");
}
//1.记录预扣库存日志
itemStockLogDao.recordStockLog(skuId,originStock,reduceStock,orderNo);
//2.对redis进行操作
Integer result = this.stock(skuId,reduceStock,originStock);
if(result.equals(NO_STOCK)){
this.initStock(skuId,originStock);
result = this.stock(skuId,reduceStock,originStock);
if(result.equals(NO_STOCK)){
throw new CustomerException("库存不足");
}
}else if(result.equals(NO_ENOUGH)){
throw new CustomerException("库存不足");
}
//3.发送event事件
Map<String,Object> MsgEntiy = new HashMap<>();
MsgEntiy.put("skuId",skuId);
MsgEntiy.put("reduceStock",reduceStock);
MsgEntiy.put("orderNo",orderNo);
CommonListenner.MqEvent mq = new CommonListenner.MqEvent("ITEM","REDUCE_STOCK",MsgEntiy);
ctx.publishEvent(mq);
}
//redis操作库存
private Integer stock(Long skuId,Integer reduceStock,Integer originStock){
List<String> keys = Arrays,asList(skuId.toString());
List<String> args = Arrays,asList(reduceStock.toString());
Object o = jedis.eval(sb.toString,keys,args);
Integer result = (Integer)o;
return result;
}
//redis初始化库存
private void initStock(Long skuId,Integer originStock){
try{
this.getLock(skuId.toString());
cacheService.setnx(skuId.toString(),originStock.toString());
}catch(Exception e){
if(e instanceof CustomerException){
logger.error("获取分布式锁出错");
}
throw e;
}finally{
this.unlock(skuId.toString());
}
}
//获取分布式锁
private String getLock(String lockKey){
//获取分布式锁
Rlock rlock = redisson.getFairLock(lockKey);
try{
rlock.tryLock(5,5,TimeUtil.SECOND);
}catch(Exception e){
logger.error("获取分布式锁失败:{}",e);
throw new CustomerException();
}
}
//释放锁
private String unLock(String lockKey){
//获取分布式锁
Rlock rlock = redisson.getFairLock(lockKey);
try{
rlock.unlock();
}catch(Exception e){
logger.error("解锁失败:{}",e);
}
}
}
2.监听Event事件并发送MQ消息
@Component
public class CommonListenner{
private static String MSG_QUEUE = "ITEM_STOCK_QUEUE";
@Autowire
private RabbitTemplate rabbitTemplate;
//在事务提交后监听消息
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT,condition = "#event.getMsgDomain() == '"+ "ITEM"+"'" )
public void sendMq(MqEvent event){
Map sku2Stock = event.getMsgEntiy();
try{
//发送mq消息,1.扣除数据库库存 2.修改日志状态
rabbitTemplate.convertAndSend(MSG_QUEUE, sku2Stock);
}catch(Exception e){
//mq发送失败,记录失败消息主体
bussinessMessageDao.insertFailMsg(sku2Stock);
logger.error("mq消息发送失败");
}
}
public class MqEvent{
private String msgDomain;
private String msgTag;
private Map msgEntiy;
public MqEvent(String msgDomain,String msgTag,Map msgEntiy){
this.msgDomain = msgDomain;
this.msgTag = msgTag;
this.msgEntiy = msgEntiy;
}
public Integer getMsgTag() {
return msgTag;
}
public void setMsgTag(Integer msgTag) {
this.msgTag = msgTag;
}
public Object getMsgObject() {
return msgObject;
}
public void setMsgObject(Object msgObject) {
this.msgObject = msgObject;
}
public String getMsgDomain() {
return msgDomain;
}
public void setMsgDomain(String msgDomain) {
this.msgDomain = msgDomain;
}
}
}
3.接收mq消息扣除数据库库存
4.定时任务扫描失败消息
技术点分析:
-
redis&lua脚本实现redis原子性操作
redis单线程,保证了单次操作的原子性,但是无法保证两步或多步操作的原子性。
在先取值后更新的两步操作场景下,redis无法保证更新时的值还是原来获取的值。
redis原生支持lua脚本,通过lua脚本实现多步操作的原子性。
但是,这种方式也不是完美的,因为redis只能保证操作同时执行,不能执行事务回滚等操作。
-
TransactionalEventListener实现事务监控
由于发送mq的操作是在库存扣除完成后进行的,发送成功与否,都不应该影响扣库存的结果。
所以,扣除库存的方法发送Event本地事件,只有当方法内的事务commit后,包含@TransactionalEventListener注解方法才会收到Event本地事件进行下一步操作。
-
mq消息解耦
作为订单下单操作比较重要的子项,扣库存是一项并发量大的操作,要求的实时性也比较高,如果扣库存直接操作数据库修改记录,所以做法是先从redis上扣除,再同步操作到数据库,这两步操作不应该是同步操作,因为同步操作适用redis的意义就没有了,通过中间件mq,把需要同步到数据库的