问题引入
假设有这样一个场景:在redis有一个键stock
,存的是库存数量,每次调用接口delstock
后库存数量减1。代码如下:
@RestController
public class StockController {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@GetMapping("/delstock")
public String delStock() {
synchronized (this) {
Integer stock = Integer.valueOf(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
stock = stock - 1;
System.out.println("剩余库存:" + stock);
stringRedisTemplate.opsForValue().set("stock", stock.toString());
} else {
System.out.println("库存不足");
}
}
return "success";
}
}
使用JMeter进行并发测试,可以看到重复扣库存的情况。
什么是分布式锁
分布式锁:当多个进程不在同一个系统中,用分布式锁控制多个进程对资源的访问。
使用Synchronized解决并发问题
由上面测试可以看出,在高并发情形下,出现重复库存的问题。这时,可以使用Synchronized
解决,代码如下:
public String delStock() {]
Integer stock = Integer.valueOf(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
stock = stock - 1;
System.out.println("剩余库存:" + stock);
stringRedisTemplate.opsForValue().set("stock", stock.toString());
} else {
System.out.println("库存不足");
}
return "success";
}
再次进行测试,这时候就正常。在分布式环境下多个操作需要以原子的方式执行。
使用Synchronized的问题
Synchronized
能解决单服务的问题,一般会启动多个服务实现负载均衡,这时候就会出现问题。比如使用Nginx做负载均衡,配置如下:
···
upstream redislock {
server localhost:8080 weight=1;
server localhost:8090 weight=1;
}
server {
listen 8000;
location / {
root html;
index /opt/nginx/html/index.html;
proxy_pass http://redislock;
}
}
···
启动多一个8090的应用,访问:http://localhost:8000/delstock
,成功返回说明配置没问题。
再使用JMeter进行测试,出现下面问题,出现重复库存,意味着JDK自带的锁失效:
8080端口: 8090端口:
使用Redis实现分布式锁
SETNX key value
:将key的值设为value,当且仅当key不存在。若给定的key已经存在,则SETNX
不做任何动作。
代码实现:
@RestController
public class StockController {
public static final String LOCK = "lock";
@Autowired
private StringRedisTemplate stringRedisTemplate;
@GetMapping("/delstock")
public String delStock() {
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(LOCK, "lock");
if (result) {
Integer stock = Integer.valueOf(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
stock = stock - 1;
System.out.println("剩余库存:" + stock);
stringRedisTemplate.opsForValue().set("stock", stock.toString());
} else {
System.out.println("库存不足");
}
stringRedisTemplate.delete(LOCK);
}
return "success";
}
}
问题:
上面的代码还有一个问题,如果在删除lock之前程序挂掉,那就出现问题了,锁一直都在。所以可以设置一个超时时间。
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(LOCK, "lock", 10, TimeUnit.SECOND);
这样还有一个问题要考虑,因为程序执行时间可能较长,设置超时时间可能出现锁永久失效的问题。那怎么解决呢?也就是自己解自己的锁。可以这样实现:
String clientId = UUID.randomUUID().toString();
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(LOCK, clientId, 10, TimeUnit.SECOND);
···
if (clientId.equals(stringRedisTemplate.opsForValue().get(LOCK))) {
stringRedisTemplate.delete(LOCK);
}
这样自己的锁就不会给别人释放掉,但还是有一个bug,如果锁过期时间小于程序执行时间,有可能同一时刻可能拿到多把锁。怎么解决呢?可以在程序执行过程中给锁续期。成熟的解决方案有Redisson
,每隔10s检查是否还持有锁,如果持有则延长锁的时间。
使用Redisson实现分布式锁
添加依赖:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.6.5</version>
</dependency>
添加配置:
@Bean
public Redisson redisson() {
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.241.132:6379").setDatabase(0);
return (Redisson) Redisson.create(config);
}
使用Redisson:
@GetMapping("/delstock2")
public String delStock2() {
RLock lock = redisson.getLock(LOCK);
lock.lock();
Integer stock = Integer.valueOf(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
stock = stock - 1;
System.out.println("剩余库存:" + stock);
stringRedisTemplate.opsForValue().set("stock", stock.toString());
} else {
System.out.println("库存不足");
}
lock.unlock();
return "success";
}