Redisson
Redisson是一个基于Java的开源的、高级的Redis客户端,它实现了Redis的分布式和响应式特性,Redisson能够让Java开发者更方便地与Redis进行交互。
简单来说Redisson就是一个Redis的客户端,比RedisTemplate更高级,更简单。
# Redisson常见作用
- 分布式对象:分布式对象简单来说就是存储在Redis中的Java对象。
Redisson允许你在分布式环境中创建Java对象,并将它们存储在Redis中。这样,不同的Java应用程序或服务就能够共享和访问这些对象,实现数据共享和数据同步。
- 分布式集合:简单来说就是将集合放在Redis中,并且可以多个客户端对集合进行操作。
Redisson支持常见的分布式数据结构,如List、Set、Map、Queue等,使得多个Java应用程序可以并发地访问和修改这些集合。
- 分布式锁:通过Redisson,你可以轻松地实现分布式锁,确保在分布式环境中的并发操作的正确性和一致性。
- 缓存:通过Redisson能够轻松的基于redis实现项目中的缓存
- 常见算法的分布式实现:Redisson提供了一些常用算法的分布式实现,如分布式信号量、分布式位图、分布式计数器等。
# 基本使用
# 引入依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.17.4</version>
</dependency>
2
3
4
5
# 配置连接
/**
* redisson配置类
* @author kinoko
*/
@Configuration
public class RedissonConfig {
@Value(value = "${spring.data.redis.host}")
private String host;
@Value(value = "${spring.data.redis.port}")
private String port;
@Value(value = "${spring.data.redis.database}")
private Integer database;
@Value(value = "${spring.data.redis.password}")
private String password;
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer().
setAddress("redis://" + host + ":" + port)
.setDatabase(database)
.setPassword(password)
;
config.setCodec(new JsonJacksonCodec());
return Redisson.create(config);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# Demo
@SpringBootTest
public class RedissonTest {
//注入RedissonClient
@Autowired
RedissonClient redissonClient;
//保存对象
@Test
public void saveObj(){
//创建一个User对象
User user = new User(1, "test", "password");
//获得一个RBucket实现类,参数是redis数据库中的key值
RBucket<User> bucket = redissonClient.getBucket("user:" + user.getId());
//执行set语句,将user对象存入redis中
bucket.set(user);
}
//获取对象
@Test
public void getObj(){
//获得一个RBucket实现类,参数是redis数据库中的key值
RBucket<User> bucket = redissonClient.getBucket("user:1" );
User user = bucket.get();
}
//修改
@Test
public void update(){
User user = new User(1, "newName", "newPassword");
RBucket<User> bucket = redissonClient.getBucket("user:" + user.getId());
//bucket.set(user); 不管key存在不存在都添加/修改值
bucket.setIfExists(user);
}
//删除
@Test
public void del(){
RBucket<User> bucket = redissonClient.getBucket("user:1");
bucket.delete();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# 分布式锁
# 可重入锁(Reentrant Lock)
基于Redis的Redisson分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。
RLock lock = redisson.getLock("anyLock");
// 最常见的使用方法
lock.lock();
2
3
常规Redis的分布式锁实现存在一个问题,如果存储这个分布式锁的节点宕机,即便业务执行完毕这个锁也无法释放,造成死锁,为避免这个问题,Redisson内部提供了一个监控锁看门狗,当业务未执行完毕,自动续期锁时间,若业务执行完毕,出现异常状况业务未主动释放锁,则由看门狗进行释放锁的操作。
@ResponseBody
@GetMapping(value = "/hello")
public String hello() {
//1、获取一把锁,只要锁的名字一样,就是同一把锁
RLock myLock = redisson.getLock("my-lock");
//2、加锁
myLock.lock(); //阻塞式等待。默认加的锁都是30s
//1)、锁的自动续期,如果业务超长,运行期间自动锁上新的30s。不用担心业务时间长,锁自动过期被删掉
//2)、加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认会在30s内自动过期,不会产生死锁问题
// myLock.lock(10,TimeUnit.SECONDS); //10秒钟自动解锁,自动解锁时间一定要大于业务执行时间
//问题:在锁时间到了以后,不会自动续期(不会启动看门狗机制)
//1、如果我们传递了锁的超时时间,就发送给redis执行脚本,进行占锁,默认超时就是 我们指定的时间
//2、如果我们未指定锁的超时时间,就使用 lockWatchdogTimeout = 30 * 1000 【看门狗默认时间】
//只要占锁成功,就会启动一个定时任务【重新给锁设置过期时间,新的过期时间就是看门狗的默认时间】,每隔10秒都会自动的再次续期,续成30秒
// internalLockLeaseTime 【看门狗时间】 / 3, 10s
try {
System.out.println("加锁成功,执行业务..." + Thread.currentThread().getId());
try { TimeUnit.SECONDS.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); }
} catch (Exception ex) {
ex.printStackTrace();
} finally {
//3、解锁 假设解锁代码没有运行,Redisson会不会出现死锁
System.out.println("释放锁..." + Thread.currentThread().getId());
myLock.unlock();
}
return "hello";
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 公平锁(Fair Lock)
基于Redis的Redisson分布式可重入公平锁也是实现了java.util.concurrent.locks.Lock接口的一种RLock对象。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。
它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个线程都处于等待状态,那么后面的线程会等待至少25秒。
RLock fairLock = redisson.getFairLock("anyLock");
// 最常见的使用方法
fairLock.lock();
2
3
# 联锁(MultiLock)
基于Redis的Redisson分布式联锁RedissonMultiLock对象可以将多个RLock对象关联为一个联锁,每个RLock对象实例可以来自于不同的Redisson实例。
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 所有的锁都上锁成功才算成功。
lock.lock();
...
lock.unlock();
2
3
4
5
6
7
8
9
10
# 红锁(RedLock)
基于Redis的Redisson红锁RedissonRedLock对象实现了Redlock介绍的加锁算法。该对象也可以用来将多个RLock对象关联为一个红锁,每个RLock对象实例可以来自于不同的Redisson实例。
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 红锁在大部分节点上加锁成功就算成功。
lock.lock();
...
lock.unlock();
2
3
4
5
6
7
8
9
10
# 读写锁(ReadWriteLock)
基于Redis的Redisson分布式可重入读写锁RReadWriteLock Java对象实现了java.util.concurrent.locks.ReadWriteLock接口。其中读锁和写锁都继承了RLock接口。
分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。
RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常见的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();
2
3
4
5
/**
* 保证一定能读到最新数据,修改期间,写锁是一个排它锁(互斥锁、独享锁),读锁是一个共享锁
* 写锁没释放读锁必须等待
* 读 + 读 :相当于无锁,并发读,只会在Redis中记录好,所有当前的读锁。他们都会同时加锁成功
* 写 + 读 :必须等待写锁释放
* 写 + 写 :阻塞方式
* 读 + 写 :有读锁。写也需要等待
* 只要有读或者写的存都必须等待
* @return
*/
@GetMapping(value = "/write")
@ResponseBody
public String writeValue() {
String s = "";
RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock");
RLock rLock = readWriteLock.writeLock();
try {
//1、改数据加写锁,读数据加读锁
rLock.lock();
s = UUID.randomUUID().toString();
ValueOperations<String, String> ops = stringRedisTemplate.opsForValue();
ops.set("writeValue",s);
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
rLock.unlock();
}
return s;
}
@GetMapping(value = "/read")
@ResponseBody
public String readValue() {
String s = "";
RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock");
//加读锁
RLock rLock = readWriteLock.readLock();
try {
rLock.lock();
ValueOperations<String, String> ops = stringRedisTemplate.opsForValue();
s = ops.get("writeValue");
try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
} catch (Exception e) {
e.printStackTrace();
} finally {
rLock.unlock();
}
return s;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# 信号量(Semaphore)
基于Redis的Redisson的分布式信号量(Semaphore)Java对象RSemaphore采用了与java.util.concurrent.Semaphore相似的接口和用法。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。
RSemaphore semaphore = redisson.getSemaphore("semaphore");
semaphore.acquire();
//或
semaphore.acquireAsync();
semaphore.acquire(23);
semaphore.tryAcquire();
//或
semaphore.tryAcquireAsync();
semaphore.tryAcquire(23, TimeUnit.SECONDS);
//或
semaphore.tryAcquireAsync(23, TimeUnit.SECONDS);
semaphore.release(10);
semaphore.release();
//或
semaphore.releaseAsync();
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 车库停车
* 3车位
* 信号量也可以做分布式限流
*/
@GetMapping(value = "/park")
@ResponseBody
public String park() throws InterruptedException {
RSemaphore park = redisson.getSemaphore("park");
park.acquire(); //获取一个信号、获取一个值,占一个车位
boolean flag = park.tryAcquire();
if (flag) {
//执行业务
} else {
return "error";
}
return "ok=>" + flag;
}
@GetMapping(value = "/go")
@ResponseBody
public String go() {
RSemaphore park = redisson.getSemaphore("park");
park.release(); //释放一个车位
return "ok";
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 可过期性信号量(PermitExpirableSemaphore)
基于Redis的Redisson可过期性信号量(PermitExpirableSemaphore)是在RSemaphore对象的基础上,为每个信号增加了一个过期时间。每个信号可以通过独立的ID来辨识,释放时只能通过提交这个ID才能释放。它提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。
RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore");
String permitId = semaphore.acquire();
// 获取一个信号,有效期只有2秒钟。
String permitId = semaphore.acquire(2, TimeUnit.SECONDS);
// ...
semaphore.release(permitId);
2
3
4
5
6
# 闭锁(CountDownLatch)
基于Redisson的Redisson分布式闭锁(CountDownLatch)Java对象RCountDownLatch采用了与java.util.concurrent.CountDownLatch相似的接口和用法。
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();
// 在其他线程或其他JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();
2
3
4
5
6
7
/**
* 放假、锁门
* 1班没人了
* 5个班,全部走完,我们才可以锁大门
* 分布式闭锁
*/
@GetMapping(value = "/lockDoor")
@ResponseBody
public String lockDoor() throws InterruptedException {
RCountDownLatch door = redisson.getCountDownLatch("door");
door.trySetCount(5);
door.await(); //等待闭锁完成
return "放假了...";
}
@GetMapping(value = "/gogogo/{id}")
@ResponseBody
public String gogogo(@PathVariable("id") Long id) {
RCountDownLatch door = redisson.getCountDownLatch("door");
door.countDown(); //计数-1
return id + "班的人都走了...";
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 看门狗机制解析※
看门狗机制特点:
- 仅leaseTime <= 0时(释放时间小于等于0)会触发看门狗机制
- Redisson 默认加锁时间是 30(n)秒,每隔 n/3秒 会进行一次续期,每次续期设置的加锁时间都是30s
- 默认加锁时间可以通过
Config.lockWatchdogTimeout指定 - 当加锁线程调用了unLock(),该锁会被释放,看门狗停止续期
- 当尝试续期的过程中,当前线程被中断或执行结束,看门狗停止续期
源码解析
由RedissonLock.tryLock()方法跟进
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture acquiredFuture;
// ※尝试加锁※
if (leaseTime > 0L) {
acquiredFuture = this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
} else {
acquiredFuture = this.tryLockInnerAsync(waitTime, this.internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
// 处理加锁结果
CompletionStage<Boolean> f = acquiredFuture.thenApply((acquired) -> {
// 获取加锁结果,可以理解成if(ttlRemaining == null),即是否有获取到过期时间
if (acquired) {
// 加锁成功
if (leaseTime > 0L) {
// 约定key过期时间大于0,设置过期时间
this.internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
// ※未约定key的过期时间或过期时间小于等于0,走看门狗守护线程※
this.scheduleExpirationRenewal(threadId);
}
}
return acquired;
});
return new CompletableFutureWrapper(f);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
重点代码※※※※
// 这个方法主要执行了一个lua脚本,返回执行结果
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, command,
// KEYS[1]是获取getLock传入的Redis KEY,这里判断了一下这个Key是否存在
"if (redis.call('exists', KEYS[1]) == 0) then" +
// Key不存在,将这个Key放入hset(Redis的hash结构)中,并使字段ARGV[2](线程id)的值+1
" redis.call('hincrby', KEYS[1], ARGV[2], 1);" +
// 设置过期时间为ARGV[1]
" redis.call('pexpire', KEYS[1], ARGV[1]);" +
// 返回null
" return nil; end;" +
// Key存在且hash表中存在字段ARGV[2](线程id)
" if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then" +
// 使字段ARGV[2]的值+1
" redis.call('hincrby', KEYS[1], ARGV[2], 1);" +
// 设置过期时间ARGV[1]
" redis.call('pexpire', KEYS[1], ARGV[1]);" +
// 返回null
" return nil; end;" +
// 返回剩余时间
" return redis.call('pttl', KEYS[1]);",
Collections.singletonList(this.getRawName()), new Object[]{unit.toMillis(leaseTime), this.getLockName(threadId)});
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
函数功能:使用Redis进行加锁操作。函数通过调用Lua脚本来实现加锁逻辑
- 如果key不存在,设置当前线程id作为field且值+1,返回null,加锁成功
- 如果key存在且存在当前线程id的field,则值+1(重入次数+1),返回null,加锁成功
- 除这两种情况外属于加锁失败,返回过期剩余时间
通过lua脚本实现获取锁功能,好处在于lua脚本本身就有原子性的特点,redis会将这几段指令解析为一条指令去执行。
看门狗(守护线程)
protected void scheduleExpirationRenewal(long threadId) {
// 创建新的看门狗(续期),内部维护着一个Map(线程id, 重入次数),即看门狗看管的线程库
ExpirationEntry entry = new ExpirationEntry();
// EXPIRATION_RENEWAL_MAP是一个续期位图,可以理解成(Key, 看门狗)
// 尝试获取当前Key的看门狗,如果存在则返回,不存在则将看门狗放入
ExpirationEntry oldEntry = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
if (oldEntry != null) {
// 将线程id交给看门狗看管
oldEntry.addThreadId(threadId);
} else {
// 将线程id交给看门狗看管
entry.addThreadId(threadId);
try {
// ※刷新过期时间※
this.renewExpiration();
} finally {
// 如果当前线程被销毁(即续续期期间守护线程被销毁)
if (Thread.currentThread().isInterrupted()) {
// ※终止续期※
this.cancelExpirationRenewal(threadId);
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void renewExpiration() {
// 获取当前锁对象的看门狗
ExpirationEntry ee = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
if (ee != null) {
// 开启守护线程(TimerTask)
Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
// 获取看门狗
ExpirationEntry ent = (ExpirationEntry)RedissonBaseLock.EXPIRATION_RENEWAL_MAP.get(RedissonBaseLock.this.getEntryName());
if (ent != null) {
// 获取第一个线程id(一般也只会有一个)
Long threadId = ent.getFirstThreadId();
if (threadId != null) {
// ※尝试续期※
CompletionStage<Boolean> future = RedissonBaseLock.this.renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
// 出现异常,解除Key与看门狗的绑定
if (e != null) {
RedissonBaseLock.log.error("Can't update lock " + RedissonBaseLock.this.getRawName() + " expiration", e);
RedissonBaseLock.EXPIRATION_RENEWAL_MAP.remove(RedissonBaseLock.this.getEntryName());
} else {
// 判断续期结果,可以理解成 if(result == 1)
if (res) {
// 成功:递归续期(其实我没搞懂这里为什么要递归,岂不是会频繁开启定时任务)
RedissonBaseLock.this.renewExpiration();
} else {
// ※失败:停止续期※
RedissonBaseLock.this.cancelExpirationRenewal((Long)null);
}
}
});
}
}
}
// 每 n/3s 执行一次,默认是30/3 = 10s执行一次
}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
// 将守护线程任务与看门狗绑定
ee.setTimeout(task);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 判断Key的hash表中是否存在ARGV[2](当前线程id)字段
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then" +
// 存在则重置过期时间
" redis.call('pexpire', KEYS[1], ARGV[1]);" +
// 返回1
" return 1; end;" +
// 否则返回0
" return 0;",
Collections.singletonList(this.getRawName()), this.internalLockLeaseTime, this.getLockName(threadId));
}
2
3
4
5
6
7
8
9
10
11
12
protected void cancelExpirationRenewal(Long threadId) {
// 获取当前key的看门狗
ExpirationEntry task = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
if (task != null) {
if (threadId != null) {
// 从看门狗中移除线程
task.removeThreadId(threadId);
}
// 如果ThreadId为null或者当前看门狗下已无看管线程
if (threadId == null || task.hasNoThreads()) {
// 获取守护线程
Timeout timeout = task.getTimeout();
if (timeout != null) {
// 停止任务
timeout.cancel();
}
// 解除Key与看门狗的绑定
EXPIRATION_RENEWAL_MAP.remove(this.getEntryName());
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 限流
基于令牌桶+滑动时间窗口实现
参考:
- 精尽 Redisson 源码分析 —— 限流器 RateLimiter_redisson限流器_路从脚起的博客-CSDN博客 (opens new window)
- 【大厂面试题】如何用Redis实现令牌桶限流?_哔哩哔哩_bilibili (opens new window)