Redis 进阶
# 分布式缓存
单点Redis的问题以及解决方案
# 数据持久化-RDB
RDB全称Redis Database Backup file(Redis数据备份文件),也被叫做Redis数据快照。简单来说就是把内存中的所有数据都记录到磁盘中。当Redis实例故障重启后,从磁盘读取快照文件,恢复数据。快照文件称为RDB文件,默认是保存在当前运行目录。
RDB持久化在四种情况下会执行:
- 执行save命令
- 执行bgsave命令
- Redis停机时
- 触发RDB条件时
相关命令
| 命令 | 描述 | 说明 |
|---|---|---|
| save | 主线程执行RDB | 这个过程中其它所有命令都会被阻塞。只有在数据迁移时可能用到。 |
| bgsave | 异步执行RDB | 开启独立进程完成RDB,主进程可以持续处理用户请求,不受影响。 |
触发RDB条件
Redis内部有触发RDB的机制,可以在redis.conf文件中找到,格式如下:
# 900秒内,如果至少有1个key被修改,则执行bgsave , 如果是save "" 则表示禁用RDB
save 900 1
save 300 10
save 60 10000
# 禁用RDB
save ""
2
3
4
5
6
RDB的其它配置也可以在redis.conf文件中设置:
# 是否压缩 ,建议不开启,压缩也会消耗cpu,磁盘的话不值钱
rdbcompression yes
# RDB文件名称
dbfilename dump.rdb
# 文件保存的路径目录
dir ./
2
3
4
5
6
7
8
RDB原理
Linux系统中,进程是无法直接操作物理内存的,需要借助于页表,这个页表描述了当前进程中的虚拟内存中与物理内存中的映射关系,进程就会基于页表的映射关系到物理内存中操作数据。
bgsave开始时会fork主进程得到子进程,子进程共享主进程的内存数据。完成fork后读取内存数据并写入 RDB 文件。而主进程fork子进程时是阻塞的,若是复制主进程的内存数据的话,则需要花费大量的时间,所以这里的fork指的是页表的复制,仅仅只是复制映射关系,那花费的时间就能大大减少了。
由于是RDB是异步操作, 所以在子进程写RDB的过程当中,主进程就有可能会接收用户的请求去修改内存中的数据,读和写之间就会有冲突,所以fork底层采用了copy-on-write技术:
- 当主进程执行读操作时,访问共享内存(read only);
- 当主进程执行写操作时,则会拷贝一份数据,执行写操作。

也就是说,每次只要有写请求,就会去拷贝一份共享内存中对应的数据,写完后,该数据在主进程页表中的映射关系就会指到该到副本上了,然后当RDB条件满足时,主进程再fork一份最新的页表给子进程进行RDB,那么我们考虑一个极端的情况,当子进程在RDB的时候,用户不断的请求修改数据,以至于把共享内存中的全部数据都修改了一遍,也就意味着所有的数据都copy了一遍,就导致内存翻倍了,假设内存已经占用了16G,翻倍就是32G,所以在使用Redis的时候需要预留一些内存,不能说这个服务器32G就把32G都交给Redis把全部内存都消耗完了,这样RDB时就可能会出现内存溢出。
RDB方式bgsave的基本流程
- fork主进程得到一个子进程,共享内存空间
- 子进程读取内存数据并写入新的RDB文件
- 用心RDB文件替换旧RDB文件
RDB的缺点
- RDB执行间隔时间长,两次RDB之间写入数据有丢失的风险
- fork子进程、压缩(可选)、写出RDB文件都比较耗时
# 数据持久化-AOF
AOF全称为Append Only File(追加文件)。Redis处理的每一个写命令都会记录在AOF文件,可以看做是命令日志文件,当服务停止后,想恢复之前的数据就读取这个AOF文件将指令再重新执行一次就行了。
$3是在记录命令的长度,如set长度就为3
AOF配置
AOF默认是关闭的,需要修改redis.conf配置文件来开启AOF:
# 是否开启AOF功能,默认是no
appendonly yes
# AOF文件的名称
appendfilename "appendonly.aof"
2
3
4
AOF的命令记录的频率也可以通过redis.conf文件来配:
# 表示每执行一次写命令,立即记录到AOF文件
appendfsync always
# 写命令执行完先放入AOF缓冲区,然后表示每隔1秒将缓冲区数据写到AOF文件,是默认方案
appendfsync everysec
# 写命令执行完先放入AOF缓冲区,由操作系统决定何时将缓冲区内容写回磁盘
appendfsync no
2
3
4
5
6
always方式的写入是需要Redis的主进程进行,所以影响性能 everysec方式主进程只负责写入缓冲区,不负责写入磁盘,内存操作所以性能影响较小,但是也伴随着风险,如果在这1秒间隔之间Redis宕机了,那缓冲区的数据也就丢失了,也就是说最多也就丢失这一秒的操作

由于AOF文件记录的是命令,RDB文件记录的只是值,所以AOF文件通常会比RDB文件要大的多,而且AOF会记录对同一个key的多次写操作,但只有最后一次写操作才有意义。通过执行bgrewriteaof命令,可以让AOF文件执行重写功能,用最少的命令达到相同效果,但即便如此AOF文件也会比RDB文件要大。
begrewriteaof是异步执行的
Redis也会在触发阈值时自动去重写AOF文件。阈值也可以在redis.conf中配置:
# AOF文件比上次文件 增长超过多少百分比则触发重写
auto-aof-rewrite-percentage 100
# AOF文件体积最小多大以上才触发重写
auto-aof-rewrite-min-size 64mb
2
3
4
# RDB与AOF对比
RDB和AOF各有自己的优缺点,如果对数据安全性要求较高,在实际开发中往往会结合两者来使用。
当同时启用了RDB和AOF,Redis启动时会以AOF优先来做数据恢复,原因是AOF的数据完整性更高,因此RDB更多是用于备份的作用,将RDB文件拷贝到另一台电脑,或者是另一个机房,达到一个异地的容灾作用。
在Redis的更新计划中是有在计划将两种持久化方案合二为一的
# Redis主从
单节点Redis的并发能力是有上限的,要进一步提高Redis的并发能力,就需要搭建主从集群,实现读写分离。
具体搭建流程:Redis集群.pdf (opens new window)
# 主从同步原理
全量同步
主从第一次建立连接时,会执行全量同步,将master节点的所有数据都拷贝给slave节点,流程:
完整流程描述:
- slave节点请求增量同步
- master节点判断replid,发现不一致,拒绝增量同步
- master将完整内存数据生成RDB,发送RDB到slave
- slave清空本地数据,加载master的RDB
- master将RDB期间的命令记录在repl_baklog,并持续将log中的命令发送给slave
- slave执行接收到的命令,保持与master之间的同步
这里有一个问题,master如何得知salve是第一次来连接呢??
有几个概念,可以作为判断依据:
- Replication Id:简称replid,是数据集的标记,id一致则说明是同一数据集。每一个master都有唯一的replid,slave则会继承master节点的replid
- offset:偏移量,随着记录在repl_baklog中的数据增多而逐渐增大。slave完成同步时也会记录当前同步的offset。如果slave的offset小于master的offset,说明slave数据落后于master,需要更新。
也就是说replid可以看出拷贝的是哪一个master的数据,offset代表拷贝了多少,拷贝的进度
因此slave做数据同步,必须向master声明自己的replication id 和offset,master才可以判断到底需要同步哪些数据。
因为slave原本也是一个master,有自己的replid和offset,当第一次变成slave,与master建立连接时,发送的replid和offset是自己的replid和offset。
master判断发现slave发送来的replid与自己的不一致,说明这是一个全新的slave,就知道要做全量同步了。
master会将自己的replid和offset都发送给这个slave,slave保存这些信息。以后slave的replid就与master一致了。
因此,master判断一个节点是否是第一次同步的依据,就是看replid是否一致。
# 增量同步
全量同步需要先做RDB,然后将RDB文件通过网络传输个slave,成本太高了。因此除了第一次做全量同步,其它大多数时候slave与master都是做增量同步。
什么是增量同步?就是只更新slave与master存在差异的部分数据。如图:
master怎么知道slave与自己的数据差异在哪里呢?这就要说到全量同步时的repl_baklog文件了。
# repl_backlog原理
这个文件是一个固定大小的数组,只不过数组是环形,也就是说角标到达数组末尾后,会再次从0开始读写,这样数组头部的数据就会被覆盖。
repl_baklog中会记录Redis处理过的命令日志及offset,包括master当前的offset,和slave已经拷贝到的offset:
slave与master的offset之间的差异,就是salve需要增量拷贝的数据了。
随着不断有数据写入,master的offset逐渐变大,slave也不断的拷贝,追赶master的offset:
直到数组被填满:
此时,如果有新的数据写入,就会覆盖数组中的旧数据。不过,旧的数据只要是绿色的,说明是已经被同步到slave的数据,即便被覆盖了也没什么影响。因为未同步的仅仅是红色部分。
但是,如果slave出现网络阻塞,或者slave宕机,导致master的offset远远超过了slave的offset: 
如果master继续写入新数据,其offset就会覆盖旧的数据,直到将slave现在的offset也覆盖:
棕色框中的红色部分,就是尚未同步,但是却已经被覆盖的数据。此时如果slave恢复,需要同步,却发现自己的offset都没有了,无法完成增量同步了。只能做全量同步。
**注意:**repl_baklog大小有上限,写满后会覆盖最早的数据。如果slave断开时间过久,导致尚未备份的数据被覆盖,则无法基于log做增量同步,只能再次全量同步。
# 主从同步的优化
主从同步可以保证主从数据的一致性,非常重要。尽可能的减少全量同步,并且优化全量同步的耗时。
可以从以下几个方面来优化Redis主从就集群:
- 在master中配置repl-diskless-sync yes启用无磁盘复制,避免全量同步时的磁盘IO。
全量同步要RDB写入磁盘再通过网络发送给slave,当磁盘比较慢,网络比较快的时候可以使用
- Redis单节点上的内存占用不要太大,减少RDB导致的过多磁盘IO
内容少,自然传的就快
- 适当提高repl_baklog的大小,发现slave宕机时尽快实现故障恢复,尽可能避免全量同步
- 限制一个master上的slave节点数量,如果实在是太多slave,则可以采用主-从-从链式结构,减少master压力
主从从架构图:
简述全量同步和增量同步区别?
- 全量同步:master将完整内存数据生成RDB,发送RDB到slave。后续命令则记录在repl_baklog,逐个发送给slave。
- 增量同步:slave提交自己的offset到master,master获取repl_baklog中从offset之后的命令给slave
什么时候执行全量同步?
- slave节点第一次连接master节点时
- slave节点断开时间太久,repl_baklog中的offset已经被覆盖时
什么时候执行增量同步?
- slave节点断开又恢复,并且在repl_baklog中能找到offset时
# Redis哨兵模式
Redis提供了哨兵(Sentinel)机制来实现主从集群的自动故障恢复。
哨兵的作用如下:
- 监控:Sentinel 会不断检查您的master和slave是否按预期工作
- 自动故障恢复:如果master故障,Sentinel会将一个slave提升为master。当故障实例恢复后也以新的master为主
- 通知:Sentinel充当Redis客户端的服务发现来源,当集群发生故障转移时,会将最新信息推送给Redis的客户端
由Sentinel来告知客户端主从地址
# 哨兵的集群监控原理
Sentinel基于心跳机制监测服务状态,每隔1秒向集群的每个实例发送ping命令:
- 主观下线:如果某sentinel节点发现某实例未在规定时间响应,则认为该实例主观下线。
- 客观下线:若超过指定数量(quorum)的sentinel都认为该实例主观下线,则该实例客观下线。quorum值最好超过Sentinel实例数量的一半。

# 集群故障恢复原理
一旦发现master故障,sentinel需要在salve中选择一个作为新的master,选择依据是这样的:
- 首先会判断slave节点与master节点断开时间长短,如果超过指定值(down-after-milliseconds * 10)则会排除该slave节点
- 然后判断slave节点的slave-priority值,越小优先级越高,如果是0则永不参与选举
不过这个值一般都一样,所以可以不用判断
- 如果slave-prority一样,则判断slave节点的offset值,越大说明数据越新,优先级越高
- 最后是判断slave节点的运行id大小,越小优先级越高。
当选出一个新的master后,该如何实现切换呢?
流程如下:
- sentinel给备选的slave1节点发送slaveof no one命令,让该节点成为master
- sentinel给所有其它slave发送slaveof 192.168.150.101 7002 命令,让这些slave成为新master的从节点,开始从新的master上同步数据。
- 最后,sentinel将故障节点标记为slave,当故障节点恢复后会自动成为新的master的slave节点
sentinel会去修改故障节点的配置文件,使其从属于新的master

Sentinel如何判断一个redis实例是否健康?
- 每隔1秒发送一次ping命令,如果超过一定时间没有相向则认为是主观下线
- 如果大多数sentinel都认为实例主观下线,则判定服务下线
故障转移步骤有哪些?
- 首先选定一个slave作为新的master,执行slaveof no one
- 然后让所有节点都执行slaveof 新master
- 修改故障节点配置,添加slaveof 新master
哨兵集群搭建:Redis集群.pdf (opens new window)
# RedisTemplate配置哨兵集群
在Sentinel集群监管下的Redis主从集群,其节点会因为自动故障转移而发生变化,Redis的客户端必须感知这种变化,及时更新连接信息。Spring的RedisTemplate底层利用lettuce实现了节点的感知和自动切换。
配置Redis地址
然后在配置文件application.yml中指定redis的sentinel相关信息:
spring:
redis:
sentinel:
master: mymaster
nodes:
- 192.168.150.101:27001
- 192.168.150.101:27002
- 192.168.150.101:27003
2
3
4
5
6
7
8
配置主从读写分离
在项目的启动类中,添加一个新的bean:
@Bean
public LettuceClientConfigurationBuilderCustomizer clientConfigurationBuilderCustomizer(){
return clientConfigurationBuilder -> clientConfigurationBuilder.readFrom(ReadFrom.REPLICA_PREFERRED);
}
2
3
4
这个bean中配置的就是读写策略,包括四种:
- MASTER:从主节点读取
- MASTER_PREFERRED:优先从master节点读取,master不可用才读取replica(slave)
- REPLICA:从slave(replica)节点读取
- REPLICA _PREFERRED:优先从slave(replica)节点读取,所有的slave都不可用才读取master
# Redis分片集群
# 分片集群搭建
主从和哨兵可以解决高可用、高并发读的问题。但是依然有两个问题没有解决:
- 海量数据存储问题
- 高并发写的问题
使用分片集群可以解决上述问题,如图:
分片集群特征:
- 集群中有多个master,每个master保存不同数据
- 每个master都可以有多个slave节点
- master之间通过ping监测彼此健康状态
- 客户端请求可以访问集群任意节点,最终都会被转发到正确节点
分片集群搭建:Redis集群.pdf (opens new window)
# 散列插槽
插槽原理
Redis会把每一个master节点映射到0~16383共16384个插槽(hash slot)上,查看集群信息时就能看到:
数据key不是与节点绑定,而是与插槽绑定。redis会根据key的有效部分计算插槽值,分两种情况:
- key中包含"{}",且“{}”中至少包含1个字符,“{}”中的部分是有效部分
- key中不包含“{}”,整个key都是有效部分
例如:key是num,那么就根据num计算,如果是{itcast}num,则根据itcast计算。计算方式是利用CRC16算法得到一个hash值,然后对16384取余,得到的结果就是slot值。
如图,在7001这个节点执行set a 1时,对a做hash运算,对16384取余,得到的结果是15495,因此要存储到103节点。到了7003后,执行get num时,对num做hash运算,对16384取余,得到的结果是2765,因此需要切换到7001节点
通过插槽来查找集群中的数据
总结:
为什么Redis要将数据跟插槽绑定?
- 防止节点宕机时数据跟随节点丢失
- 方便集群的伸缩
Redis如何判断某个key应该在哪个实例?
- 将16384个插槽分配到不同的实例
- 根据key的有效部分计算哈希值,对16384取余
- 余数作为插槽,寻找插槽所在实例即可
如何将同一类数据固定的保存在同一个Redis实例?
- 这一类数据使用相同的有效部分,例如key都以{typeId}为前缀
# 集群伸缩
redis-cli --cluster提供了很多操作集群的命令,可以通过下面方式查看:
比如,添加节点的命令:
实现步骤
创建新的Redis实例
- 创建一个文件夹:
mkdir 7004 - 拷贝配置文件:
cp redis.conf /7004 - 修改配置文件:
sed /s/6379/7004/g 7004/redis.conf - 启动:
redis-server 7004/redis.conf
添加新节点到Redis
- 执行命令:
redis-cli --cluster add-node 192.168.150.101:7004 192.168.150.101:7001 - 通过命令查看集群状态:
redis-cli -p 7001 cluster nodes
如图,7004加入了集群,并且默认是一个master节点:
但是,可以看到7004节点的插槽数量为0,因此没有任何数据可以存储到7004上
# 转移插槽
我们要将num存储到7004节点,因此需要先看看num的插槽是多少:
如上图所示,num的插槽为2765.
我们可以将0~3000的插槽从7001转移到7004,命令格式如下:
具体命令如下:
建立连接:
得到下面的反馈:
询问要移动多少个插槽,我们计划是3000个:
新的问题来了:
那个node来接收这些插槽??
显然是7004,那么7004节点的id是多少呢?
复制这个id,然后拷贝到刚才的控制台后:
这里询问,你的插槽是从哪里移动过来的?
- all:代表全部,也就是三个节点各转移一部分
- 具体的id:目标节点的id
- done:没有了
这里我们要从7001获取,因此填写7001的id:
填完后,点击done,这样插槽转移就准备好了:
确认要转移吗?输入yes:
然后,通过命令查看结果:
可以看到目的达成。
# 故障转移
集群初识状态是这样的:
其中7001、7002、7003都是master,我们计划让7002宕机。
# 自动故障转移
当集群中有一个master宕机会发生什么呢?
直接停止一个redis实例,例如7002:redis-cli -p 7002 shutdown
1)首先是该实例与其它实例失去连接
2)然后是疑似宕机:
3)最后是确定下线,自动提升一个slave为新的master:
4)当7002再次启动,就会变为一个slave节点了:
# 手动故障转移
利用cluster failover命令可以手动让集群中的某个master宕机,切换到执行cluster failover命令的这个slave节点,实现无感知的数据迁移。其流程如下:
这种failover命令可以指定三种模式:
- 缺省:默认的流程,如图1~6歩
- force:省略了对offset的一致性校验
- takeover:直接执行第5歩,忽略数据一致性、忽略master状态和其它master的意见
案例需求:在7002这个slave节点执行手动故障转移,重新夺回master地位
步骤如下:
1)利用redis-cli连接7002这个节点
2)执行cluster failover命令
如图:
效果:
# RedisTemplate访问分片集群
RedisTemplate底层同样基于lettuce实现了分片集群的支持,而使用的步骤与哨兵模式基本一致:
1)引入redis的starter依赖
2)配置分片集群地址
3)配置读写分离
与哨兵模式相比,其中只有分片集群的配置方式略有差异,如下:
spring:
redis:
cluster:
nodes:
- 192.168.150.101:7001
- 192.168.150.101:7002
- 192.168.150.101:7003
- 192.168.150.101:8001
- 192.168.150.101:8002
- 192.168.150.101:8003
2
3
4
5
6
7
8
9
10
# 多级缓存
传统的缓存策略一般是请求到达Tomcat后,先查询Redis,如果未命中则查询数据库,存在以下问题:
- 请求要经过Tomcat处理,Tomcat性能成为整个系统的瓶颈
- Redis缓存失效时,会对数据库产生冲击

多级缓存就是充分利用请求处理的每个环节,分别添加缓存,减轻Tomcat压力,提升服务性能:
**扩展:**在使用缓存时,数据库表应尽可能进行数据分离,比如描述一个商品信息应该将商品数据分开多个表存储,防止其中一个字段更新时导致缓存中一个存放大量数据的key失效。
缓存在日常开发中起到至关重要的作用,由于是存储在内存中,数据的读取速度是非常快的,能大量减少对数据库的访问,减少数据库的压力。我们把缓存分为两类:
分布式缓存,例如Redis:
- 优点:存储容量更大、可靠性更好,可以在集群间共享
- 缺点:访问缓存有网络开销
- 场景:缓存数据量较大、可靠性要求较高、可以在集群间共享
进程本地缓存,例如HashMap、GuavaCache:
- 优点:读取本地内容,没有网络开销,速度更快
- 缺点:存储容量有限、可靠性较低、无法共享
- 场景:性能要求较高,缓存数据量较小
# JVM本地进程缓存
Caffeine是一个基于Java8开发的,提供了近乎最佳命中率的高性能的本地缓存库。目前Spring内部的缓存使用的就是Caffeine。GitHub地址:https://github.com/ben-manes/caffeine (opens new window)
缓存使用的基本API:
@Test
void testBasicOps() {
// 构建cache对象
Cache<String, String> cache = Caffeine.newBuilder().build();
// 存数据
cache.put("gf", "迪丽热巴");
// 取数据
String gf = cache.getIfPresent("gf");
System.out.println("gf = " + gf);
// 取数据,包含两个参数:
// 参数一:缓存的key
// 参数二:Lambda表达式,表达式参数就是缓存的key,方法体是查询数据库的逻辑
// 优先根据key查询JVM缓存,如果未命中,则执行参数二的Lambda表达式
String defaultGF = cache.get("defaultGF", key -> {
// 根据key,也就是第一个参数去数据库查询数据
return "柳岩";
});
System.out.println("defaultGF = " + defaultGF);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Caffeine提供了三种缓存驱逐策略:
- 基于容量:设置缓存的数量上限
// 创建缓存对象
Cache<String, String> cache = Caffeine.newBuilder()
.maximumSize(1) // 设置缓存大小上限为 1
.build();
2
3
4
- 基于时间:设置缓存的有效时间
// 创建缓存对象
Cache<String, String> cache = Caffeine.newBuilder()
// 设置缓存有效期为 10 秒,从最后一次写入开始计时
.expireAfterWrite(Duration.ofSeconds(10))
.build();
2
3
4
5
- 基于引用:设置缓存为软引用或弱引用,利用GC来回收缓存数据。性能较差,不建议使用。
注意:在默认情况下,当一个缓存元素过期的时候,Caffeine不会自动立即将其清理和驱逐。而是在一次读或写操作后,或者在空闲时间完成对失效数据的驱逐。
场景示例
@Configuration
public class CaffeineConfig {
@Bean
public Cache<Long, Item> itemCache(){
return Caffeine.newBuilder()
.initialCapacity(100)
.maximumSize(10_000)
.build();
}
@Bean
public Cache<Long, ItemStock> stockCache(){
return Caffeine.newBuilder()
.initialCapacity(100)
.maximumSize(10_000)
.build();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@RestController
@RequestMapping("item")
public class ItemController {
@Autowired
private IItemService itemService;
@Autowired
private IItemStockService stockService;
@Autowired
private Cache<Long, Item> itemCache;
@Autowired
private Cache<Long, ItemStock> stockCache;
// ...其它略
@GetMapping("/{id}")
public Item findById(@PathVariable("id") Long id) {
return itemCache.get(id, key -> itemService.query()
.ne("status", 3).eq("id", key)
.one()
);
}
@GetMapping("/stock/{id}")
public ItemStock findStockById(@PathVariable("id") Long id) {
return stockCache.get(id, key -> stockService.getById(key));
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# Redis缓存预热
Redis缓存会面临冷启动问题:
冷启动:服务刚刚启动时,Redis中并没有缓存,如果所有商品数据都在第一次查询时添加缓存,可能会给数据库带来较大压力。
缓存预热:在实际开发中,我们可以利用大数据统计用户访问的热点数据,在项目启动时将这些热点数据提前查询并保存到Redis中。
我们数据量较少,并且没有数据统计相关功能,目前可以在启动时将所有数据都放入缓存中。
1)利用Docker安装Redis
docker run --name redis -p 6379:6379 -d redis redis-server --appendonly yes
2)在item-service服务中引入Redis依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2
3
4
3)配置Redis地址
spring:
redis:
host: 192.168.150.101
2
3
4)编写初始化类
缓存预热需要在项目启动时完成,并且必须是拿到RedisTemplate之后。这里我们利用InitializingBean接口来实现,因为InitializingBean可以在对象被Spring创建并且成员变量全部注入后执行。
@Component
public class RedisHandler implements InitializingBean {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private IItemService itemService;
@Autowired
private IItemStockService stockService;
private static final ObjectMapper MAPPER = new ObjectMapper();
@Override
public void afterPropertiesSet() throws Exception {
// 初始化缓存
// 1.查询商品信息
List<Item> itemList = itemService.list();
// 2.放入缓存
for (Item item : itemList) {
// 2.1.item序列化为JSON
String json = MAPPER.writeValueAsString(item);
// 2.2.存入redis
redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
}
// 3.查询商品库存信息
List<ItemStock> stockList = stockService.list();
// 4.放入缓存
for (ItemStock stock : stockList) {
// 2.1.item序列化为JSON
String json = MAPPER.writeValueAsString(stock);
// 2.2.存入redis
redisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# 缓存同步
大多数情况下,浏览器查询到的都是缓存数据,如果缓存数据与数据库数据存在较大差异,可能会产生比较严重的后果。所以必须保证数据库数据、缓存数据的一致性,这就是缓存与数据库的同步。
# 数据同步策略
缓存数据同步的常见方式有三种:
设置有效期:给缓存设置有效期,到期后自动删除。再次查询时更新
- 优势:简单、方便
- 缺点:时效性差,缓存过期之前可能不一致
- 场景:更新频率较低,时效性要求低的业务
如商品的描述信息等
同步双写:在修改数据库的同时,直接修改缓存
- 优势:时效性强,缓存与数据库强一致
- 缺点:有代码侵入,耦合度高;
- 场景:对一致性、时效性要求较高的缓存数据
如商品库存信息
**异步通知:**修改数据库时发送事件通知,相关服务监听到通知后修改缓存数据
- 优势:低耦合,可以同时通知多个缓存服务
- 缺点:时效性一般,可能存在中间不一致状态
- 场景:时效性要求一般,有多个服务需要同步
大多数缓存都可以使用异步通知来完成缓存同步
而异步实现又可以基于MQ或者Canal来实现:
1)基于MQ的异步通知:
解读:
- 商品服务完成对数据的修改后,只需要发送一条消息到MQ中。
- 缓存服务监听MQ消息,然后完成对缓存的更新
依然有少量的代码侵入。
2)基于Canal的通知
解读:
- 商品服务完成商品修改后,业务直接结束,没有任何代码侵入
- Canal监听MySQL变化,当发现变化后,立即通知缓存服务
- 缓存服务接收到canal通知,更新缓存
代码零侵入
# Cannl
Canal [kə'næl],译意为水道/管道/沟渠,canal是阿里巴巴旗下的一款开源项目,基于Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。GitHub的地址:https://github.com/alibaba/canal (opens new window)
Canal是基于mysql的主从同步来实现的,MySQL主从同步的原理如下:
- MySQL master 将数据变更写入二进制日志( binary log),其中记录的数据叫做binary log events
- MySQL slave 将 master 的 binary log events拷贝到它的中继日志(relay log)
- MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
Canal就是把自己伪装成MySQL的一个slave节点,从而监听master的binary log变化。再把得到的变化信息通知给Canal的客户端,进而完成对其它数据库的同步。
Canal安装教程:安装Canal.pdf (opens new window)
Canal提供了各种语言的客户端,当Canal监听到binlog变化时,会通知Canal的客户端。
我们可以利用Canal提供的Java客户端,监听Canal通知消息。当收到变化的消息时,完成对缓存的更新。
这里使用GitHub上的第三方开源的canal-starter客户端。
地址:https://github.com/NormanGyllenhaal/canal-client (opens new window)
与SpringBoot完美整合,自动装配,比官方客户端要简单好用很多。
# 使用步骤
引入依赖:
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>
2
3
4
5
编写配置:
canal:
destination: heima # canal的集群名字,要与安装canal时设置的名称一致
server: 192.168.150.101:11111 # canal服务地址
2
3
修改Item实体类
通过@Id、@Column、等注解完成Item与数据库表字段的映射:
package com.heima.item.pojo;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Transient;
import javax.persistence.Column;
import java.util.Date;
@Data
@TableName("tb_item")
public class Item {
@TableId(type = IdType.AUTO)
@Id // 标记主键字段
private Long id;//商品id
@Column(name = "name") // 名称不一致时需要添加
private String name;//商品名称
private String title;//商品标题
private Long price;//价格(分)
private String image;//商品图片
private String category;//分类名称
private String brand;//品牌名称
private String spec;//规格
private Integer status;//商品状态 1-正常,2-下架
private Date createTime;//创建时间
private Date updateTime;//更新时间
@TableField(exist = false)
@Transient // 标记不属于表中的字段
private Integer stock;
@TableField(exist = false)
@Transient // 标记不属于表中的字段
private Integer sold;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
编写监听器
通过实现EntryHandler<T>接口编写监听器,监听Canal消息。注意两点:
- 实现类通过
@CanalTable("tb_item")指定监听的表信息 - EntryHandler的泛型是与表对应的实体类
package com.heima.item.canal;
import com.github.benmanes.caffeine.cache.Cache;
import com.heima.item.config.RedisHandler;
import com.heima.item.pojo.Item;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;
@CanalTable("tb_item") // 指定要监听的表
@Component
public class ItemHandler implements EntryHandler<Item> { // 泛型指定表关联的实体类
@Autowired
private RedisHandler redisHandler;
@Autowired
private Cache<Long, Item> itemCache;
// 监听增删改的通知
@Override
public void insert(Item item) {
// 写数据到JVM进程缓存
itemCache.put(item.getId(), item);
// 写数据到redis
redisHandler.saveItem(item);
}
@Override
public void update(Item before, Item after) {
// 写数据到JVM进程缓存
itemCache.put(after.getId(), after);
// 写数据到redis
redisHandler.saveItem(after);
}
@Override
public void delete(Item item) {
// 删除数据到JVM进程缓存
itemCache.invalidate(item.getId());
// 删除数据到redis
redisHandler.deleteItemById(item.getId());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
在这里对Redis的操作都封装到了RedisHandler这个对象中,是我们之前做缓存预热时编写的一个类,内容如下:
package com.heima.item.config;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.heima.item.pojo.Item;
import com.heima.item.pojo.ItemStock;
import com.heima.item.service.IItemService;
import com.heima.item.service.IItemStockService;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class RedisHandler implements InitializingBean {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private IItemService itemService;
@Autowired
private IItemStockService stockService;
private static final ObjectMapper MAPPER = new ObjectMapper();
@Override
public void afterPropertiesSet() throws Exception {
// 初始化缓存
// 1.查询商品信息
List<Item> itemList = itemService.list();
// 2.放入缓存
for (Item item : itemList) {
// 2.1.item序列化为JSON
String json = MAPPER.writeValueAsString(item);
// 2.2.存入redis
redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
}
// 3.查询商品库存信息
List<ItemStock> stockList = stockService.list();
// 4.放入缓存
for (ItemStock stock : stockList) {
// 2.1.item序列化为JSON
String json = MAPPER.writeValueAsString(stock);
// 2.2.存入redis
redisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);
}
}
public void saveItem(Item item) {
try {
String json = MAPPER.writeValueAsString(item);
redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
public void deleteItemById(Long id) {
redisTemplate.delete("item:id:" + id);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# 多级缓存结构

搭建多级缓存架构,注意修改Nginx的负载均衡策略为基于请求的uri地址
# Redis内存淘汰策略
**内存淘汰:**就是当Redis内存使用达到设置的阈值时,Redis主动挑选部分key删除以释放更多内存的流程。
Redis会在执行任何客户端命令之前检测内存是否够用,调用processCommand()尝试做内存淘汰:
Redis支持8种不同策略来选择要删除的key:
- **no-enviction:**不淘汰任何key,但是内存满时不允许写入新数据,是Redis默认的内存淘汰策略。
- **volatile-ttl:**对设置了TTL的key,比较key的剩余TTL,TTL越小越先被淘汰。
- **allkeys-random:**对全体的key,随机进行淘汰。也就是直接从db->dict中随机挑选。
- **volatile-random:**对设置了TTL的key,随机进行淘汰。也就是从db->expires中随机挑选。
- **allkeys-lru:**对全体的key,基于LRU算法进行淘汰
- **volatile-lru:**对设置了TTL的key,基于LRU算法进行淘汰
- **allkeys-lfu:**对全体key,基于LFU算法进行淘汰
- **volatile-lfu:**对设置了TTL的key,基于LFI算法进行淘汰
LRU算法(Least Recently Used),最少最近使用。用当前时间减去最后一次访问时间,这个值越大则淘汰优先级越高,简单说就是越久没用了就越先淘汰。
LFU算法(Least Frequently Used),最少频率使用。会统计每个key的访问频率,值越小淘汰优先级越高。
# Key设计原则
Redis的Key虽然可以自定义,但最好遵循下面的几个最佳实践约定:
- 遵循基本格式:[业务名称]:[数据名]:[id]
- 长度不超过44字节
- 不包含特殊字符
例如:我们的登录业务,保存用户信息,其key可以设计成如下格式:
这样设计的好处:
- 可读性强
- 避免key冲突
- 方便管理
- 更节省内存:key是string类型,底层编码包含int、embstr和raw三种。embstr在小于44字节使用,采用连续内存空间,内存占用更小。当字节数大于44字节时,会转为raw模式存储,在raw模式下,内存空间不是连续的,而是采用一个指针指向了另外一段内存空间,在这段空间里存储SDS内容,这样空间不连续,访问的时候性能也就会受到影响,还可能会产生内存碎片。
# 数据类型选择
# 例1:Java对象的存储
推荐两种方式:
- string类型,直接存储json字符串
- 优点:实现方便,简单粗暴
- 缺点:数据耦合,需要更新某个字段时只能更新整个key,可能会涉及一段较大的连续内存重新分配的问题,并且string类型本身在内存占用上没有做优化,会携带一些源信息,例如申请的内存大小、字符长度、头类型,这些都会占用空间。
- hash类型,通过kv结构存储字段名和字段值
- 优点:底层使用ziplist,空间占用小,可以灵活访问对象的任意字段
- 缺点:代码实现相对复杂
※ 注意:
这是json对象中存在长字符的情况

这是多字段的情况

所以在存储空间上取最优的话还是需要斟酌一下的。
# 例2:假如有hash类型的key,其中有100万对field和value,field是自增id
这本身是一个BigKey问题,当hash结构entry数量大于500后就会由ziplist编码升级成ht(哈希表)编码,内存压缩的优势就没了,主要思路就是数据分片,那么怎么分片?
推荐方案
拆分为小的hash,将 id / 100 作为key, 将id % 100 作为field,这样每100个元素为一个Hash。
| key | field | value |
|---|---|---|
| key:0 | id:00 | value0 |
| ..... | ..... | |
| id:99 | value99 | |
| key:1 | id:00 | value100 |
| ..... | ..... | |
| id:99 | value199 | |
| .... | ||
| key:9999 | id:00 | value999900 |
| ..... | ..... | |
| id:99 | value999999 |