准备工作
1.商品库存都以50为例存在redis中
2.商品购买接口
Controller
@RestController
@RequestMapping("/test")
public class TestController {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@GetMapping("/buy")
public String buy(){
//查询当前商品库存
Integer store = (Integer) redisTemplate.opsForValue().get("store");
if(store > 0){
//库存不为0则库存减一
store = store - 1;
redisTemplate.opsForValue().set("store",store);
System.out.println("购买成功,购买后库存为:"+ store);
}else{
System.out.println("购买失败");
}
return "ok";
}
}
3.使用jMeter进行压测
设置访问地址:http://localhost/lock-service/test/buy
设置并发数,这里是0s内发送200个请求。
测试
1.未加锁
结果:原库存为50,但成功卖出了200件商品,卖出200件商品后库存变为20,商品超卖了。
2.加synchronized锁
重点为锁对象的选取,这里使用商品id字符串,对应的常量池中的引用做为锁对象。
学习synchronized的时候一般都是用this做为锁对象的,这里如果使用this,购买不同商品时,也会争夺同一把锁,效率较低。
@GetMapping("/buy")
public String buy(){
//使用id的字符串做为锁对象,intern方法 (返回常量池中该字符串的引用)
String poductId = "100";
String lock = poductId.intern();
synchronized (lock.getClass()){
//查询当前商品库存
Integer store = (Integer) redisTemplate.opsForValue().get("store");
if(store > 0){
//库存不为0则库存减一
store = store - 1;
redisTemplate.opsForValue().set("store",store);
System.out.println("购买成功,购买后库存为:"+ store);
}else{
System.out.println("购买失败");
}
}
return "ok";
}
结果:原库存为50,成功卖出了50件商品,卖出50件商品后库存变为0,加锁成功
3.加synchronized锁,并启动多个商品服务
我这里启动两个商品购买服务,锁还是使用synchronized锁,使用SpringClouldGateway负载均衡调用商品服务。
商品服务实例1
商品服务实例2
结果:通过实例1和实例2的结果可以看出,至少36、35、22、18存在着重复卖出,商品超卖了。因为两个商品服务是启动在两个jvm中,synchronized无法实现跨虚拟机加锁,所以分布式系统中不能使用synchronized锁。
4.加Redis分布式锁,并启动多个商品服务
a.引入依赖,SpringBoot版本自选
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
<version>5.2.0.RELEASE</version>
</dependency>
b.yml配置
spring:
redis:
database: 0 # Redis数据库索引(默认为0)
host: 127.0.0.1
port: 6379
jedis:
pool:
max-active: 60 # 连接池最大连接数(使用负值表示没有限制)
max-idle: 60 # 连接池中的最大空闲连接
min-idle: 20 # 连接池中的最小空闲连接
c.注入Bean对象
@Bean
RedisLockRegistry redisLockRegistry(RedisConnectionFactory connectionFactory) {
return new RedisLockRegistry(connectionFactory, "redisLock", 5000L);
}
注 RedisLockRegistry 提供两个构造方法, 上述示例, 最后一个参数为 默认过期时间
To avoid “hung” locks (when a server fails), the locks in this registry are expired after a default 60 seconds, but you can configure this value on the registry. Locks are normally held for a much smaller time.
d.使用Redis锁
@Autowired
private RedisLockRegistry redisLockRegistry;
@GetMapping("/buy")
public String buy(){
String poductId = "100";
Lock lock = redisLockRegistry.obtain("buyRedis:" + poductId);
try{
lock.lock();
//查询当前商品库存
Integer store = (Integer) redisTemplate.opsForValue().get("store");
if(store > 0){
//库存不为0则库存减一
store = store - 1;
redisTemplate.opsForValue().set("store",store);
System.out.println("购买成功,购买后库存为:"+ store);
}else{
System.out.println("购买失败");
}
}finally {
lock.unlock();
}
return "ok";
}
e.启动两个商品服务进行测试
因为我设置50个商品时,总是在某一个实例里卖完了所有商品(执行太快了),所以我把商品数量设成了200.
实例1
实例2
结果:原库存为200,成功卖出了200件商品,卖出200件商品后库存变为0,加分布式锁成功。
自己实现一个redis锁
1.代码
@RestController
@RequestMapping("/test")
public class TestController {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@GetMapping("/buy")
public String buy(){
String lockKey = "productKey_100";
//这个锁是阻塞型的锁且会自动增加锁的生存时间
lock(lockKey);
try {
//设置购买操作的时间大于锁的默认生存时间9s,测试锁的续命效果
Thread.sleep(12000L);
buyProduct();
}catch (Exception e){
System.out.println("休眠错误");
}finally {
unlock(lockKey);
}
return "ok";
}
/**
* 购买商品
*/
private void buyProduct(){
//查询当前商品库存
Integer store = (Integer) redisTemplate.opsForValue().get("store");
if(store > 0){
//库存不为0则库存减一
store = store - 1;
redisTemplate.opsForValue().set("store",store);
System.out.println("购买成功,购买后库存为:"+ store);
}else{
System.out.println("购买失败");
}
}
/**
* 加锁
*/
private void lock(String lockKey){
String uuid = UUID.randomUUID().toString();
while(true){
if(redisTemplate.opsForValue().setIfAbsent(lockKey,uuid , 9, TimeUnit.SECONDS)){
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
Timer timer = new Timer();
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
//锁存在则将生存时间重置为9s
String o =(String) redisTemplate.opsForValue().get(lockKey);
if(uuid.equals(o)){
redisTemplate.expire(lockKey,9,TimeUnit.SECONDS);
}else{
timer.cancel();
}
}
};
//定时器启动3s后执行第一次,之后每隔3s执行一次
timer.schedule(timerTask,3000L,3000L);
}
});
thread.run();
break;
}
}
}
/**
* 解锁锁
*/
private void unlock(String lockKey){
redisTemplate.delete(lockKey);
}
}
2.效果
设置30个库存,并发发送30个购买请求,执行了12*30秒,得到结果如下
30个商品都成功卖出,卖出后库存为0,加锁成功,锁的阻塞效果和锁的续命效果也成功了。
容易踩的坑
1.事务中加锁,会导致锁失效
原因:由于事务是加锁前开启的,锁是事务未提交前释放的,此时其他线程拿到锁之后进行锁住的代码块,读取的库存数据不是最新的。
解决方法:我们可以在@Transactional注释的方法之前就加上锁,在还没有开事务之前就加锁,那么就可以保证线程的安全性,从而不会出现脏读和数据不一致性等情况。
2.锁对象的选取
锁对象的选取要和实际业务关联,例如商品购买,就要和商品关联。一律使用当前类(this)做锁对象效率不高。
同时对于同一商品的锁对象要是相同对象(同一引用的对象)。
3.单实例服务可以使用非分布式的锁,多实例服务只能使用分布式锁
分布式锁的实现方案有很多