高并发分布式锁Redis解决方案

Posted by Kaka Blog on April 24, 2019

问题引入

假设有这样一个场景:在redis有一个键stock,存的是库存数量,每次调用接口delstock后库存数量减1。代码如下:

@RestController
public class StockController {
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @GetMapping("/delstock")
    public String delStock() {
        synchronized (this) {
            Integer stock = Integer.valueOf(stringRedisTemplate.opsForValue().get("stock"));
            if (stock > 0) {
                stock = stock - 1;
                System.out.println("剩余库存:" + stock);
                stringRedisTemplate.opsForValue().set("stock", stock.toString());
            } else {
                System.out.println("库存不足");
            }
        }
        return "success";
    }
}

使用JMeter进行并发测试,可以看到重复扣库存的情况。 img

什么是分布式锁

分布式锁:当多个进程不在同一个系统中,用分布式锁控制多个进程对资源的访问。

使用Synchronized解决并发问题

由上面测试可以看出,在高并发情形下,出现重复库存的问题。这时,可以使用Synchronized解决,代码如下:

public String delStock() {]
    Integer stock = Integer.valueOf(stringRedisTemplate.opsForValue().get("stock"));
    if (stock > 0) {
        stock = stock - 1;
        System.out.println("剩余库存:" + stock);
        stringRedisTemplate.opsForValue().set("stock", stock.toString());
    } else {
        System.out.println("库存不足");
    }
    return "success";
}

再次进行测试,这时候就正常。在分布式环境下多个操作需要以原子的方式执行。

使用Synchronized的问题

Synchronized能解决单服务的问题,一般会启动多个服务实现负载均衡,这时候就会出现问题。比如使用Nginx做负载均衡,配置如下:

···
upstream redislock {
    server localhost:8080 weight=1;
    server localhost:8090 weight=1;
}

server {
    listen       8000;

    location / {
        root   html;
        index  /opt/nginx/html/index.html;
        proxy_pass http://redislock;
    }
}
···

启动多一个8090的应用,访问:http://localhost:8000/delstock,成功返回说明配置没问题。

再使用JMeter进行测试,出现下面问题,出现重复库存,意味着JDK自带的锁失效:

8080端口: img 8090端口: img

使用Redis实现分布式锁

SETNX key value:将key的值设为value,当且仅当key不存在。若给定的key已经存在,则SETNX不做任何动作。

代码实现:

@RestController
public class StockController {
    public static final String LOCK = "lock";
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @GetMapping("/delstock")
    public String delStock() {
        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(LOCK, "lock");
        if (result) {
            Integer stock = Integer.valueOf(stringRedisTemplate.opsForValue().get("stock"));
            if (stock > 0) {
                stock = stock - 1;
                System.out.println("剩余库存:" + stock);
                stringRedisTemplate.opsForValue().set("stock", stock.toString());
            } else {
                System.out.println("库存不足");
            }
            stringRedisTemplate.delete(LOCK);
        }
        return "success";
    }
}

问题:

上面的代码还有一个问题,如果在删除lock之前程序挂掉,那就出现问题了,锁一直都在。所以可以设置一个超时时间。

Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(LOCK, "lock", 10, TimeUnit.SECOND);

这样还有一个问题要考虑,因为程序执行时间可能较长,设置超时时间可能出现锁永久失效的问题。那怎么解决呢?也就是自己解自己的锁。可以这样实现:

String clientId = UUID.randomUUID().toString();
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(LOCK, clientId, 10, TimeUnit.SECOND);
···
if (clientId.equals(stringRedisTemplate.opsForValue().get(LOCK))) {
    stringRedisTemplate.delete(LOCK);
}

这样自己的锁就不会给别人释放掉,但还是有一个bug,如果锁过期时间小于程序执行时间,有可能同一时刻可能拿到多把锁。怎么解决呢?可以在程序执行过程中给锁续期。成熟的解决方案有Redisson,每隔10s检查是否还持有锁,如果持有则延长锁的时间。

使用Redisson实现分布式锁

添加依赖:

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.6.5</version>
</dependency>

添加配置:

@Bean
public Redisson redisson() {
    Config config = new Config();
    config.useSingleServer().setAddress("redis://192.168.241.132:6379").setDatabase(0);
    return (Redisson) Redisson.create(config);
}

使用Redisson:

@GetMapping("/delstock2")
public String delStock2() {
    RLock lock = redisson.getLock(LOCK);
    lock.lock();
    Integer stock = Integer.valueOf(stringRedisTemplate.opsForValue().get("stock"));
    if (stock > 0) {
        stock = stock - 1;
        System.out.println("剩余库存:" + stock);
        stringRedisTemplate.opsForValue().set("stock", stock.toString());
    } else {
        System.out.println("库存不足");
    }
    lock.unlock();
    return "success";
}