哈喽,大家好,我是指北君。
本篇文件我们来介绍如何Redis实现分布式锁的演进过程,以及为什么不能直接用Setnx实现分布式锁。
分布式锁是控制分布式系统不同进程共同访问共享资源的一种锁的实现。如果不同的系统或同一个系统的不同主机之间共享了某个临界资源,往往需要互斥来防止彼此干扰,以保证一致性。
业界流行的分布式锁实现,一般有这3种方式:
这里主要介绍如何通过 Redis 来实现分布式锁。在介绍 Redis 分布式锁之前,我们首先介绍一下实现Redis 分布式锁的关键命令。
setnx key value
Setnx(SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。
设置成功,返回 1 。设置失败,返回 0 。
PS:Redis 官方是不推荐基于 setnx 命令来实现分布式锁的,因为会存在很多问题,
①、单点问题。比如:
- 1、客户端A 从master拿到锁lock01
- 2、master正要把lock01同步(Redis的主从同步通常是异步的)给slave时,突然宕机了,导致lock01没同步给slave
- 3、主从切换,slave节点被晋级为master节点
- 4、客户端B到master拿lock01照样能拿到。这样必将导致同一把锁被多人使用。
②、锁的高级用法,比如读写锁、可重入锁等等,setnx 都比较难实现。
这里先介绍基于 sentnx 实现的分布式锁,后面会介绍官方推荐的基于 redisson 来实现分布式锁。
接到上文,查询三级分类数据,如果我们部署了多个商品服务,然后多个线程同时去获取三级分类数据,如果不加分布式锁,就会导致,每一个部署的商品服务第一次查询都会走 DB。
public Map< String, List< Catelog2Vo > > getCatelogJsonWithRedisLock() throws InterruptedException {
// 一、获取分布式锁
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "111");
if(lock){
// true 表示加锁成功,执行相关业务
Map< String, List< Catelog2Vo > > dataFromDb = getDataFromDb();
stringRedisTemplate.delete("lock");
return dataFromDb;
}else{
System.out.println("获取分布式锁失败...等待重试...");
//加锁失败...重试机制
//休眠一百毫秒
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
//自旋的方式
return getCatelogJsonWithRedisLock();
}
}
设置锁自动过期
public Map< String, List< Catelog2Vo > > getCatelogJsonWithRedisLock() throws InterruptedException {
// 一、获取分布式锁
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "111");
if(lock){
// true 表示加锁成功,执行相关业务
// 设置过期时间
stringRedisTemplate.expire("lock",30,TimeUnit.SECONDS);
Map< String, List< Catelog2Vo > > dataFromDb = getDataFromDb();
stringRedisTemplate.delete("lock");
return dataFromDb;
}else{
System.out.println("获取分布式锁失败...等待重试...");
//加锁失败...重试机制
//休眠一百毫秒
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
//自旋的方式
return getCatelogJsonWithRedisLock();
}
}
setnx 命令和过期时间保证原子性。
public Map< String, List< Catelog2Vo > > getCatelogJsonWithRedisLock() throws InterruptedException {
// 一、获取分布式锁
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "111",30,TimeUnit.SECONDS);
if(lock){
// true 表示加锁成功,执行相关业务
// 设置过期时间
//stringRedisTemplate.expire("lock",30,TimeUnit.SECONDS);
Map< String, List< Catelog2Vo > > dataFromDb = getDataFromDb();
stringRedisTemplate.delete("lock");
return dataFromDb;
}else{
System.out.println("获取分布式锁失败...等待重试...");
//加锁失败...重试机制
//休眠一百毫秒
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
//自旋的方式
return getCatelogJsonWithRedisLock();
}
}
保证删除的是自己的锁。
public Map< String, List< Catelog2Vo > > getCatelogJsonWithRedisLock() throws InterruptedException {
// 一、获取分布式锁
String uuid = UUID.randomUUID().toString();
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid,30,TimeUnit.SECONDS);
if(lock){
// true 表示加锁成功,执行相关业务
// 设置过期时间
//stringRedisTemplate.expire("lock",30,TimeUnit.SECONDS);
Map< String, List< Catelog2Vo > > dataFromDb = getDataFromDb();
String lockValue = stringRedisTemplate.opsForValue().get("lock");
if(uuid.equals(lockValue)){
stringRedisTemplate.delete("lock");
}
return dataFromDb;
}else{
System.out.println("获取分布式锁失败...等待重试...");
//加锁失败...重试机制
//休眠一百毫秒
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
//自旋的方式
return getCatelogJsonWithRedisLock();
}
}
通过Lua脚本保证删除锁和判断锁两个操作原子性
public Map< String, List< Catelog2Vo > > getCatelogJsonWithRedisLock(){
// 一、获取分布式锁
String uuid = UUID.randomUUID().toString();
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid,30,TimeUnit.SECONDS);
if (lock) {
System.out.println("获取分布式锁成功...");
Map< String, List< Catelog2Vo > > dataFromDb = null;
try {
//加锁成功...执行业务
dataFromDb = getDataFromDb();
} finally {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
//删除锁
stringRedisTemplate.execute(new DefaultRedisScript< Long >(script, Long.class), Arrays.asList("lock"), uuid);
}
//先去redis查询下保证当前的锁是自己的
//获取值对比,对比成功删除=原子性 lua脚本解锁
// String lockValue = stringRedisTemplate.opsForValue().get("lock");
// if (uuid.equals(lockValue)) {
// //删除我自己的锁
// stringRedisTemplate.delete("lock");
// }
return dataFromDb;
}else{
System.out.println("获取分布式锁失败...等待重试...");
//加锁失败...重试机制
//休眠一百毫秒
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
//自旋的方式
return getCatelogJsonWithRedisLock();
}
}
这也是分布式锁的最终模式,需要保证两个点:加锁【设置锁+过期时间】和删除锁【判断+删除】原子性。
全部0条评论
快来发表一下你的评论吧 !