Mushroom Notes Mushroom Notes
🍄首页
  • JavaSE

    • 基础篇
    • 数据结构
    • IO流
    • Stream流
    • 函数式接口
    • JUC
    • 反射
    • 网络编程
    • 设计模式
  • JavaEE

    • Servlet
    • JDBC
    • 会话技术
    • 过滤器监听器
    • 三层架构
  • JDK

    • 总览
  • JVM

    • 总览
  • 常用mate
  • CSS
  • JavaScript
  • rds 数据库

    • MySQL
    • MySQL 进阶
    • MySQL 库表规范
  • nosql 数据库

    • Redis
    • Redis 进阶
    • Redis 底层
    • MongoDB
  • Spring生态

    • Spring
    • Spring MVC
    • Spring boot
    • Spring Validation
  • Spring Cloud生态

    • Spring Cloud
    • 服务治理
    • 远程调用
    • 网关路由
    • 服务保护
    • 分布式事务
    • 消息中间件
  • 数据库

    • Mybatis
    • Mybatis Plus
    • Elasticsearch
    • Redisson
  • 通信

    • Netty
📚技术
  • 方案专题
  • 算法专题
  • BUG专题
  • 安装专题
  • 网安专题
  • 面试专题
  • 常用网站
  • 后端常用
  • 前端常用
  • 分类
  • 标签
  • 归档

kinoko

一位兴趣使然的热心码农
🍄首页
  • JavaSE

    • 基础篇
    • 数据结构
    • IO流
    • Stream流
    • 函数式接口
    • JUC
    • 反射
    • 网络编程
    • 设计模式
  • JavaEE

    • Servlet
    • JDBC
    • 会话技术
    • 过滤器监听器
    • 三层架构
  • JDK

    • 总览
  • JVM

    • 总览
  • 常用mate
  • CSS
  • JavaScript
  • rds 数据库

    • MySQL
    • MySQL 进阶
    • MySQL 库表规范
  • nosql 数据库

    • Redis
    • Redis 进阶
    • Redis 底层
    • MongoDB
  • Spring生态

    • Spring
    • Spring MVC
    • Spring boot
    • Spring Validation
  • Spring Cloud生态

    • Spring Cloud
    • 服务治理
    • 远程调用
    • 网关路由
    • 服务保护
    • 分布式事务
    • 消息中间件
  • 数据库

    • Mybatis
    • Mybatis Plus
    • Elasticsearch
    • Redisson
  • 通信

    • Netty
📚技术
  • 方案专题
  • 算法专题
  • BUG专题
  • 安装专题
  • 网安专题
  • 面试专题
  • 常用网站
  • 后端常用
  • 前端常用
  • 分类
  • 标签
  • 归档
  • Spring

  • SpringCloud

  • 数据库

    • Mybatis
    • Mybatis Plus
    • Elasticsearch
    • Redisson
      • Redisson常见作用
      • 基本使用
        • 引入依赖
        • 配置连接
        • Demo
      • 分布式锁
        • 可重入锁(Reentrant Lock)
        • 公平锁(Fair Lock)
        • 联锁(MultiLock)
        • 红锁(RedLock)
        • 读写锁(ReadWriteLock)
        • 信号量(Semaphore)
        • 可过期性信号量(PermitExpirableSemaphore)
        • 闭锁(CountDownLatch)
        • 看门狗机制解析※
      • 限流
      • 蘑菇工具箱
  • 通信

  • 框架
  • 数据库
kinoko
2023-12-19
目录

Redisson

Redisson是一个基于Java的开源的、高级的Redis客户端,它实现了Redis的分布式和响应式特性,Redisson能够让Java开发者更方便地与Redis进行交互。
简单来说Redisson就是一个Redis的客户端,比RedisTemplate更高级,更简单。

# Redisson常见作用


  • 分布式对象:分布式对象简单来说就是存储在Redis中的Java对象。

Redisson允许你在分布式环境中创建Java对象,并将它们存储在Redis中。这样,不同的Java应用程序或服务就能够共享和访问这些对象,实现数据共享和数据同步。

  • 分布式集合:简单来说就是将集合放在Redis中,并且可以多个客户端对集合进行操作。

Redisson支持常见的分布式数据结构,如List、Set、Map、Queue等,使得多个Java应用程序可以并发地访问和修改这些集合。

  • 分布式锁:通过Redisson,你可以轻松地实现分布式锁,确保在分布式环境中的并发操作的正确性和一致性。
  • 缓存:通过Redisson能够轻松的基于redis实现项目中的缓存
  • 常见算法的分布式实现:Redisson提供了一些常用算法的分布式实现,如分布式信号量、分布式位图、分布式计数器等。

# 基本使用


# 引入依赖

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.17.4</version>
</dependency>
1
2
3
4
5

# 配置连接

/**
 * redisson配置类
 * @author kinoko
 */
@Configuration
public class RedissonConfig {

    @Value(value = "${spring.data.redis.host}")
    private String host;
    @Value(value = "${spring.data.redis.port}")
    private String port;
    @Value(value = "${spring.data.redis.database}")
    private Integer database;
    @Value(value = "${spring.data.redis.password}")
    private String password;

    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        config.useSingleServer().
                setAddress("redis://" + host + ":" + port)
                .setDatabase(database)
                .setPassword(password)
        ;
        config.setCodec(new JsonJacksonCodec());
        return Redisson.create(config);
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

# Demo

@SpringBootTest
public class RedissonTest {

    //注入RedissonClient
    @Autowired
    RedissonClient redissonClient;

    //保存对象
    @Test
    public void saveObj(){
        //创建一个User对象
        User user = new User(1, "test", "password");
        //获得一个RBucket实现类,参数是redis数据库中的key值
        RBucket<User> bucket = redissonClient.getBucket("user:" + user.getId());
        //执行set语句,将user对象存入redis中
        bucket.set(user);
    }

    //获取对象
	@Test
    public void getObj(){
        //获得一个RBucket实现类,参数是redis数据库中的key值
        RBucket<User> bucket = redissonClient.getBucket("user:1" );
        User user = bucket.get();
    }

    //修改
    @Test
    public void update(){
        User user = new User(1, "newName", "newPassword");
        RBucket<User> bucket = redissonClient.getBucket("user:" + user.getId());
        //bucket.set(user); 不管key存在不存在都添加/修改值
        bucket.setIfExists(user);
    }

    //删除
    @Test
    public void del(){
        RBucket<User> bucket = redissonClient.getBucket("user:1");
        bucket.delete();
    }
    
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

# 分布式锁


# 可重入锁(Reentrant Lock)

基于Redis的Redisson分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。

RLock lock = redisson.getLock("anyLock");
// 最常见的使用方法
lock.lock();
1
2
3

常规Redis的分布式锁实现存在一个问题,如果存储这个分布式锁的节点宕机,即便业务执行完毕这个锁也无法释放,造成死锁,为避免这个问题,Redisson内部提供了一个监控锁看门狗,当业务未执行完毕,自动续期锁时间,若业务执行完毕,出现异常状况业务未主动释放锁,则由看门狗进行释放锁的操作。

@ResponseBody
@GetMapping(value = "/hello")
public String hello() {

    //1、获取一把锁,只要锁的名字一样,就是同一把锁
    RLock myLock = redisson.getLock("my-lock");

    //2、加锁
    myLock.lock();      //阻塞式等待。默认加的锁都是30s

    //1)、锁的自动续期,如果业务超长,运行期间自动锁上新的30s。不用担心业务时间长,锁自动过期被删掉
    //2)、加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认会在30s内自动过期,不会产生死锁问题

    // myLock.lock(10,TimeUnit.SECONDS);   //10秒钟自动解锁,自动解锁时间一定要大于业务执行时间
    //问题:在锁时间到了以后,不会自动续期(不会启动看门狗机制)
    //1、如果我们传递了锁的超时时间,就发送给redis执行脚本,进行占锁,默认超时就是 我们指定的时间
    //2、如果我们未指定锁的超时时间,就使用 lockWatchdogTimeout = 30 * 1000 【看门狗默认时间】
    //只要占锁成功,就会启动一个定时任务【重新给锁设置过期时间,新的过期时间就是看门狗的默认时间】,每隔10秒都会自动的再次续期,续成30秒
    // internalLockLeaseTime 【看门狗时间】 / 3, 10s
    try {
        System.out.println("加锁成功,执行业务..." + Thread.currentThread().getId());
        try { TimeUnit.SECONDS.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); }
    } catch (Exception ex) {
        ex.printStackTrace();
    } finally {
        //3、解锁  假设解锁代码没有运行,Redisson会不会出现死锁
        System.out.println("释放锁..." + Thread.currentThread().getId());
        myLock.unlock();
    }

    return "hello";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

# 公平锁(Fair Lock)


基于Redis的Redisson分布式可重入公平锁也是实现了java.util.concurrent.locks.Lock接口的一种RLock对象。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。
它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个线程都处于等待状态,那么后面的线程会等待至少25秒。

RLock fairLock = redisson.getFairLock("anyLock");
// 最常见的使用方法
fairLock.lock();
1
2
3

# 联锁(MultiLock)


基于Redis的Redisson分布式联锁RedissonMultiLock对象可以将多个RLock对象关联为一个联锁,每个RLock对象实例可以来自于不同的Redisson实例。

RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");

RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 所有的锁都上锁成功才算成功。
lock.lock();
...
lock.unlock();
1
2
3
4
5
6
7
8
9
10

# 红锁(RedLock)


基于Redis的Redisson红锁RedissonRedLock对象实现了Redlock介绍的加锁算法。该对象也可以用来将多个RLock对象关联为一个红锁,每个RLock对象实例可以来自于不同的Redisson实例。

RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");

RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 红锁在大部分节点上加锁成功就算成功。
lock.lock();
...
lock.unlock();
1
2
3
4
5
6
7
8
9
10

# 读写锁(ReadWriteLock)


基于Redis的Redisson分布式可重入读写锁RReadWriteLock Java对象实现了java.util.concurrent.locks.ReadWriteLock接口。其中读锁和写锁都继承了RLock接口。
分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。

RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常见的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();
1
2
3
4
5
/**
* 保证一定能读到最新数据,修改期间,写锁是一个排它锁(互斥锁、独享锁),读锁是一个共享锁
* 写锁没释放读锁必须等待
* 读 + 读 :相当于无锁,并发读,只会在Redis中记录好,所有当前的读锁。他们都会同时加锁成功
* 写 + 读 :必须等待写锁释放
* 写 + 写 :阻塞方式
* 读 + 写 :有读锁。写也需要等待
* 只要有读或者写的存都必须等待
* @return
*/
@GetMapping(value = "/write")
@ResponseBody
public String writeValue() {
    String s = "";
    RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock");
    RLock rLock = readWriteLock.writeLock();
    try {
        //1、改数据加写锁,读数据加读锁
        rLock.lock();
        s = UUID.randomUUID().toString();
        ValueOperations<String, String> ops = stringRedisTemplate.opsForValue();
        ops.set("writeValue",s);
        TimeUnit.SECONDS.sleep(10);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        rLock.unlock();
    }


    return s;
}

@GetMapping(value = "/read")
@ResponseBody
public String readValue() {
    String s = "";
    RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock");
    //加读锁
    RLock rLock = readWriteLock.readLock();
    try {
        rLock.lock();
        ValueOperations<String, String> ops = stringRedisTemplate.opsForValue();
        s = ops.get("writeValue");
        try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        rLock.unlock();
    }

    return s;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

# 信号量(Semaphore)


基于Redis的Redisson的分布式信号量(Semaphore)Java对象RSemaphore采用了与java.util.concurrent.Semaphore相似的接口和用法。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。

RSemaphore semaphore = redisson.getSemaphore("semaphore");
semaphore.acquire();
//或
semaphore.acquireAsync();
semaphore.acquire(23);
semaphore.tryAcquire();
//或
semaphore.tryAcquireAsync();
semaphore.tryAcquire(23, TimeUnit.SECONDS);
//或
semaphore.tryAcquireAsync(23, TimeUnit.SECONDS);
semaphore.release(10);
semaphore.release();
//或
semaphore.releaseAsync();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 车库停车
* 3车位
* 信号量也可以做分布式限流
*/
@GetMapping(value = "/park")
@ResponseBody
public String park() throws InterruptedException {


    RSemaphore park = redisson.getSemaphore("park");
    park.acquire();     //获取一个信号、获取一个值,占一个车位
    boolean flag = park.tryAcquire();

    if (flag) {
        //执行业务
    } else {
        return "error";
    }

    return "ok=>" + flag;
}

@GetMapping(value = "/go")
@ResponseBody
public String go() {
    RSemaphore park = redisson.getSemaphore("park");
    park.release();     //释放一个车位
    return "ok";
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

# 可过期性信号量(PermitExpirableSemaphore)


基于Redis的Redisson可过期性信号量(PermitExpirableSemaphore)是在RSemaphore对象的基础上,为每个信号增加了一个过期时间。每个信号可以通过独立的ID来辨识,释放时只能通过提交这个ID才能释放。它提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。

RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore");
String permitId = semaphore.acquire();
// 获取一个信号,有效期只有2秒钟。
String permitId = semaphore.acquire(2, TimeUnit.SECONDS);
// ...
semaphore.release(permitId);
1
2
3
4
5
6

# 闭锁(CountDownLatch)


基于Redisson的Redisson分布式闭锁(CountDownLatch)Java对象RCountDownLatch采用了与java.util.concurrent.CountDownLatch相似的接口和用法。

RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();

// 在其他线程或其他JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();
1
2
3
4
5
6
7
/**
* 放假、锁门
* 1班没人了
* 5个班,全部走完,我们才可以锁大门
* 分布式闭锁
*/

@GetMapping(value = "/lockDoor")
@ResponseBody
public String lockDoor() throws InterruptedException {


    RCountDownLatch door = redisson.getCountDownLatch("door");
    door.trySetCount(5);
    door.await();       //等待闭锁完成
    return "放假了...";
}

@GetMapping(value = "/gogogo/{id}")
@ResponseBody
public String gogogo(@PathVariable("id") Long id) {
    RCountDownLatch door = redisson.getCountDownLatch("door");
    door.countDown();       //计数-1
    return id + "班的人都走了...";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# 看门狗机制解析※


看门狗机制特点:

  1. 仅leaseTime <= 0时(释放时间小于等于0)会触发看门狗机制
  2. Redisson 默认加锁时间是 30(n)秒,每隔 n/3秒 会进行一次续期,每次续期设置的加锁时间都是30s
  3. 默认加锁时间可以通过Config.lockWatchdogTimeout指定
  4. 当加锁线程调用了unLock(),该锁会被释放,看门狗停止续期
  5. 当尝试续期的过程中,当前线程被中断或执行结束,看门狗停止续期

源码解析
由RedissonLock.tryLock()方法跟进

private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        RFuture acquiredFuture;
        // ※尝试加锁※
        if (leaseTime > 0L) {
            acquiredFuture = this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        } else {
            acquiredFuture = this.tryLockInnerAsync(waitTime, this.internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        }
    	// 处理加锁结果
        CompletionStage<Boolean> f = acquiredFuture.thenApply((acquired) -> {
            // 获取加锁结果,可以理解成if(ttlRemaining == null),即是否有获取到过期时间
            if (acquired) {
                // 加锁成功
                if (leaseTime > 0L) {
                    // 约定key过期时间大于0,设置过期时间
                    this.internalLockLeaseTime = unit.toMillis(leaseTime);
                } else {
                    // ※未约定key的过期时间或过期时间小于等于0,走看门狗守护线程※
                    this.scheduleExpirationRenewal(threadId);
                }
            }

            return acquired;
        });
        return new CompletableFutureWrapper(f);
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

重点代码※※※※

// 这个方法主要执行了一个lua脚本,返回执行结果
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, command,
                    // KEYS[1]是获取getLock传入的Redis KEY,这里判断了一下这个Key是否存在
                     "if (redis.call('exists', KEYS[1]) == 0) then" +
                        // Key不存在,将这个Key放入hset(Redis的hash结构)中,并使字段ARGV[2](线程id)的值+1
                        " redis.call('hincrby', KEYS[1], ARGV[2], 1);" +
                        // 设置过期时间为ARGV[1]
                        " redis.call('pexpire', KEYS[1], ARGV[1]);" +
                        // 返回null
                        " return nil; end;" +
                    // Key存在且hash表中存在字段ARGV[2](线程id)
                    " if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then" +
                        // 使字段ARGV[2]的值+1
                        " redis.call('hincrby', KEYS[1], ARGV[2], 1);" +
                        // 设置过期时间ARGV[1]
                        " redis.call('pexpire', KEYS[1], ARGV[1]);" +
                        // 返回null
                        " return nil; end;" +
                    // 返回剩余时间
                    " return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(this.getRawName()), new Object[]{unit.toMillis(leaseTime), this.getLockName(threadId)});
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

函数功能:使用Redis进行加锁操作。函数通过调用Lua脚本来实现加锁逻辑

  1. 如果key不存在,设置当前线程id作为field且值+1,返回null,加锁成功
  2. 如果key存在且存在当前线程id的field,则值+1(重入次数+1),返回null,加锁成功
  3. 除这两种情况外属于加锁失败,返回过期剩余时间

通过lua脚本实现获取锁功能,好处在于lua脚本本身就有原子性的特点,redis会将这几段指令解析为一条指令去执行。

看门狗(守护线程)

protected void scheduleExpirationRenewal(long threadId) {
    	// 创建新的看门狗(续期),内部维护着一个Map(线程id, 重入次数),即看门狗看管的线程库
        ExpirationEntry entry = new ExpirationEntry();
    	// EXPIRATION_RENEWAL_MAP是一个续期位图,可以理解成(Key, 看门狗)
    	// 尝试获取当前Key的看门狗,如果存在则返回,不存在则将看门狗放入
        ExpirationEntry oldEntry = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
        if (oldEntry != null) {
            // 将线程id交给看门狗看管
            oldEntry.addThreadId(threadId);
        } else {
            // 将线程id交给看门狗看管
            entry.addThreadId(threadId);

            try {
                // ※刷新过期时间※
                this.renewExpiration();
            } finally {
                // 如果当前线程被销毁(即续续期期间守护线程被销毁)
                if (Thread.currentThread().isInterrupted()) {
                    // ※终止续期※
                    this.cancelExpirationRenewal(threadId);
                }

            }
        }

    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void renewExpiration() {
        // 获取当前锁对象的看门狗
        ExpirationEntry ee = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
        if (ee != null) {
            // 开启守护线程(TimerTask)
            Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
                public void run(Timeout timeout) throws Exception {
                    // 获取看门狗
                    ExpirationEntry ent = (ExpirationEntry)RedissonBaseLock.EXPIRATION_RENEWAL_MAP.get(RedissonBaseLock.this.getEntryName());
                    if (ent != null) {
                        // 获取第一个线程id(一般也只会有一个)
                        Long threadId = ent.getFirstThreadId();
                        if (threadId != null) {
                            // ※尝试续期※
                            CompletionStage<Boolean> future = RedissonBaseLock.this.renewExpirationAsync(threadId);
                            future.whenComplete((res, e) -> {
                                // 出现异常,解除Key与看门狗的绑定
                                if (e != null) {
                                    RedissonBaseLock.log.error("Can't update lock " + RedissonBaseLock.this.getRawName() + " expiration", e);
                                    RedissonBaseLock.EXPIRATION_RENEWAL_MAP.remove(RedissonBaseLock.this.getEntryName());
                                } else {
                                    // 判断续期结果,可以理解成 if(result == 1)
                                    if (res) {
                                        // 成功:递归续期(其实我没搞懂这里为什么要递归,岂不是会频繁开启定时任务)
                                        RedissonBaseLock.this.renewExpiration();
                                    } else {
                                        // ※失败:停止续期※
                                        RedissonBaseLock.this.cancelExpirationRenewal((Long)null);
                                    }

                                }
                            });
                        }
                    }
                }
            // 每 n/3s 执行一次,默认是30/3 = 10s执行一次
            }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
            // 将守护线程任务与看门狗绑定
            ee.setTimeout(task);
        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
        return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    // 判断Key的hash表中是否存在ARGV[2](当前线程id)字段
                     "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then" +
                        // 存在则重置过期时间
                        " redis.call('pexpire', KEYS[1], ARGV[1]);" +
                        // 返回1
                        " return 1; end;" +
                        // 否则返回0
                        " return 0;",
                Collections.singletonList(this.getRawName()), this.internalLockLeaseTime, this.getLockName(threadId));
    }
1
2
3
4
5
6
7
8
9
10
11
12
protected void cancelExpirationRenewal(Long threadId) {
    	// 获取当前key的看门狗
        ExpirationEntry task = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
        if (task != null) {
            if (threadId != null) {
                // 从看门狗中移除线程
                task.removeThreadId(threadId);
            }
        	// 如果ThreadId为null或者当前看门狗下已无看管线程
            if (threadId == null || task.hasNoThreads()) {
                // 获取守护线程
                Timeout timeout = task.getTimeout();
                if (timeout != null) {
                    // 停止任务
                    timeout.cancel();
                }
            	// 解除Key与看门狗的绑定
                EXPIRATION_RENEWAL_MAP.remove(this.getEntryName());
            }

        }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

# 限流


基于令牌桶+滑动时间窗口实现
参考:

  • 精尽 Redisson 源码分析 —— 限流器 RateLimiter_redisson限流器_路从脚起的博客-CSDN博客 (opens new window)
  • 【大厂面试题】如何用Redis实现令牌桶限流?_哔哩哔哩_bilibili (opens new window)

# 蘑菇工具箱

仓库:toolbox-redisson (opens new window)

#redis#缓存
上次更新: 2024/01/14 20:23:43
Elasticsearch
NIO 基础

← Elasticsearch NIO 基础→

最近更新
01
JVM 底层
09-13
02
JVM 理论
09-13
03
JVM 应用
09-13
更多文章>
Theme by Vdoing | Copyright © 2022-2024 kinoko | MIT License | 粤ICP备2024165634号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式