Redis学习

一、环境准备

  • linux系统,本人使用的是centos8,最小化安装
  • Redis下载

二、安装

  1. 首先通过 tar -zxvg xxxxxRedis 解压,目录随意,本人是在 /usr/local/ 下创建了一个文件夹用来安装使用

  2. 解压过后执行 make 命令编译,执行的时候会检测你的系统是否缺少依赖之类的,如果没问题会进行编译

    图片加载失败

  3. 编译成功后,执行 make test 命令检测一下是否有什么错误

    • 小编这里就出现了错误

      图片加载失败

      这个错误的意思是:如果想要执行 make test 命令需要一个 tcl 8.5 的库,如果想要 test 就要安装这个库

      图片加载失败

  4. 执行 make PREFIX /usr/local/redis install 安装, **PREFIX /usr/local/redis ** 为指定安装目录

    图片加载失败

    安装好之后则会在 /usr/local/redis 下生成 bin 文件夹,内含文件如下

    图片加载失败

    他们分别是:

    1
    2
    3
    4
    5
    6
    redis-benchmark   #redis 性能测试工具
    redis-check-aof #检查aof日志的工具
    redis-check-rdb #检查rdb日志的工具
    redis-cli #redis 连接的客户端
    redis-sentinel #哨兵模式
    redis-server #redis 服务进程

三、配置

  1. 从源代码目录复制 redis.conf 文件到 bin 同级

    图片加载失败

  2. 注释掉 bind 配置,如果不注释的话,默认只有 127.0.0.1 也就是本机可以连接

    图片加载失败

  3. 保护模式设置,关闭保护模式,如果开启的话,redis只能本机访问

    图片加载失败

  4. backlog 连接队列

    图片加载失败

  5. 配置 redis 后台启动

    图片加载失败

  6. 日志文件配置,默认为空,即在那个目录启动,日志文件就在那个目录下生成

    图片加载失败

  7. 数据库个数

    图片加载失败

四、启动与连接

  1. 显示启动 redis-server 服务

    1
    2
    # 启动 bin 目录下的 redis-server,并指定使用我们刚才拷贝过来的 redis.conf 配置文件
    ./bin/redis-server ./redis.conf

    图片加载失败

  2. 隐式启动 redis-server 服务

    修改 redis.conf 配置文件

    图片加载失败

  3. 连接 redis-cli 客户端,并测试

    1
    2
    3
    4
    5
    6
    7
    8
    # 启动客户端
    ./bin.redis-cli

    # 新增key-velue, 存储字符串 双引号 可有可无
    set word "hello world"

    # 根据key查询
    get word

    图片加载失败

  4. 关闭

    • 显示启动的 redis-server 直接按 ctrl + c 即可

    • 隐式启动的 redis-server 需要查询到进程后,根据进程ID杀死进程即可

      图片加载失败

    • 关闭 redis-cli 输入 exit 即可

五、操作命令

在操作之前我们先要知道:

  1. redis 中都有哪些数据类型:string、list、set、zset(order set)、hash
  2. redis 中共有16个数据库(空间),使用select 0 切换,默认使用 0号库

1、基础操作

1)新增

  1. 字符串(string)

    1
    2
    # set key value
    set site www.baidu.com
  2. 追加

    1
    2
    # 向指定的 key 追加值,如果key不存在则创建并返回value长度,如果存在则向value后追加,并返回追加后的value长度
    append key value

2)删除

  1. 字符串(string)

    1
    2
    3
    4
    5
    # 单个 del key
    del site

    #批量删除
    del sit age

3)修改

  1. 字符串(string)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    # 修改值 set key value
    set site www.baidu.com

    #修改 key 名称,如果修改后的key name 存在,则会覆盖修改后的key name 的值
    rename age agee

    #修改 key 名称,检查修改后的key name是否存在,如果不存在返回1,表示成功,否则返回0,修改不成功
    # nx: not exists 是否存在
    renamenx age site

4)查询

  1. 字符串(string)

    1
    2
    3
    4
    5
    # get key
    get site

    # 查询并修改
    getset site test

5)各种查询

  1. * 的使用

    1
    2
    3
    4
    5
    # 查询所有key
    keys *

    # 模糊匹配
    keys si*
  2. [] 的使用

    1
    2
    # 比较精确的匹配,匹配中括号中的字符
    keys sit[ey]
  3. 的使用

    1
    2
    # ? 匹配单个字符
    keys si?e
  4. 随机返回一个key

    1
    randomkey
  5. 查看 key 的类型:

    1
    2
    # type key
    type site
  6. 查看某个 key 是否存在:

    1
    2
    # 存在返回 1,不存在返回 0
    exists site
  7. 获取 value 的长度

    1
    strlen key

6)key 移动

1
2
# 将 key 移动至1号库
move site 1

7 )key 递增/减

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 设置整型 key,每次递增 1
incr key

# 设置整型 key,每次递减 1
decr key

# 设置整型 key,递增10,即步长
incrby key 10

# 浮点数类型 递增,没有相应的递减
incrbyfloat key 1.1

#设置整型 key,递减10,即步长
decrby key 10

8)key 的生命周期

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 查询key的有效期
# 已过期/不过期的key 返回-1,Redis2.8后不存在的key 返回-2
# 返回 秒
ttl site
# 返回 毫秒
pttl site

# 设置key的生命周期
#
expire site 10
# 毫秒
pexpire site 10000

# 设置key永久有效
persist key

9)切换数据库,默认有16个

1
select 0

10)清空数据库

1
flushdb

2、各种数据类型的操作

2-1、string(字符串)类型

1)新增

1
2
3
4
5
6
7
8
9
10
11
12
# ex:设置生命周期秒数
# px:设置生命周期毫秒数
# 生命周期设置ex/px同时写的时候以后面的有效期为准
# nx:表示 key 不存在时,执行操作
# xx:表示 key 存在时,执行操作
set key value [ex 秒数][px 毫秒数] [nx]/[xx]

# 20 秒过期,value为10
set age 20 10

# 一次性新增多个键值
mset key value key value

2)查询

1
2
3
4
get key

# 获取多个key
mget key1 key2 key3

3)删除

1
2
3
4
5
# 单个 del key
del site

#批量删除
del sit age

4)偏移量

1
2
3
4
5
# 偏移 从下标位置向后替换
setrange key index value

# start,stop可以为负数,则是从value末尾开始,
getrange key start stop

图片加载失败

2-2、list(列表)类型

1)新增

1
2
3
4
5
6
7
8
9
10
11
12
# 从左侧推入
lpush list a b c d

# 从右侧推入
rpush list a b c d

# 插入值,在列表中找到某个值,并在其前或后插入值,如果没有找到则会返回 -1
# 如果找到了多个值,则会在第一个找到的值前后插入值。
linsert key after|before search value
eg:
# 在key为num的列表中找到3,并在3的前面插入2
linsert num before 3 2

2)查询

1
2
3
4
5
6
7
8
9
# 范围取值
lrange list 0 1
lrange list 0 -1

# 根据下标获取元素
lindex list index

# 获取列表长度
llen key

3) 删除(弹出)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 删除整个key
del key

# 弹出元素
lpop list
rpop list

# 删除元素
# count 删除几个,正数代表从左向右删除,负数代表从右向左删除
# value 根据值删除
lrem key count value
eg:
lrem list 1 value
lrem list -1 value


# 等待弹出
# 如果key中没有元素,则等待有值了之后弹出
# timeout 为0 则一直等待
# 适用场景:长轮询,在线聊天
brpop/blpop key timeout

4)剪切

1
2
# 剪切后key的值就变为剪切的值
ltrim key start stop

5)弹出并插入

1
2
3
4
5
6
7
8
9
10
# 在第一个列表的右侧弹出一个元素,并插入到第二个列表的左侧,成功会返回弹出插入的值
# 原子性操作

# 适用场景:
# 两个列表: task[a,b,c,d] bak[]
# 执行任务时可以使用此命令:task[a,b,c] bak[d],执行命令后会返回弹出并插入的值,也就是d
# 这个时候我们就知道该执行任务:d
# 如果执行成功则在bak中,pop掉d
# 如果执行失败,或者没有执行,那么bak中的d还在。下次再执行任务的时候去bak中拿就好了。
rpoplpush key1 key2

2-3、set(集合)类型,无序

1)新增

1
sadd key value1 value2 ...

2)查询

1
2
3
4
5
6
7
8
9
10
11
# 随机查询出一个元素,或指定count个元素
srandmember key count

# 查询集合所有元素
smembers key

# 判断value是否包含在集合中,是返回1,否则返回0
sismember key value

# 查询集合共有多少个元素
scard key

3)删除(弹出)

1
2
3
4
5
6
# 删除元素(根据值删除)
srem key value1 value2

# 弹出
# 随机删除一个元素并返回pop出的值,count可以指定随机删除多少个
spop key count

4)更多

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 删除指定集合重元素,并将删除的元素添加到另一个集合
smove sourcr target value

# 交集
sinter key1 key2 key3
# 查询出交集并存储在一个key(targer)中
sinterstore target key1 key2 key3

# 并集
sunion key1 key2 key3


# 差集,key1 与 key2 key3 的差集,返回key1中存在,其他key中不存在的
sdiff key1 key2 key3

2-4、Hash(哈希)类型

1)新增

1
2
3
4
5
6
7
8
9
10
11
# 单个,如果field已经存在则会覆盖原有的value
hset key field value
eg:
hset user id 1
hset user name zhangsan

# 如果key中field不存在,添加成功,否则添加失败
hsetnx key field value

# 批量为key设置hash
hmset key field1 value1 field2 value2

2)查询

1
2
3
4
5
6
7
8
9
10
11
# 根据key的属性查询value
hget key field

# 查询key中是否存在某个field
hexists key field

# 查询key中所有的field
hkeys key

# 查询key中所有value
hvals key

3)删除

1
del key

4)更多

1
2
3
4
5
6
# 为key中field对应的value增加量操作,针对数字的value
hincrby key field increment
eg:
hset user id 1
hincrby user id 12
result:13

2-5、zset(order set) 有序集合

1)新增

1
2
# 根据order排序
zadd key order1 value1 order2 value2

2)查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 把集合排序后,返回范围内数据[start,stop]
# 注意这里是先进行“排序”,在拿 start - stop(下标/排名)的值
# 默认升序
zrange key start stop
# 降序
zrevrange key start stop

# 这里也是和集合升序排序后取值,这里的min,max指的是实际排序的值
# limit 分页
# offset 偏移量,从那个开始算
# N 查多少个
# withscores 实际分数
zrangebyscorce key min max withscores limit offset N
eg:
zadd test 1 a 3 c 2 b 4 d 6 f 9 i 8 h
zrangebyscore test 4 8
# result:d f h
zrangebyscore test 4 8 withscores
# result:d 4 f 6 h 8
zrange test 4 8
# result:f h i
zrangebyscore test 1 8 limit 1 2
# result:b c


# 先对集合生序排序,再查询value 在key中拍第几个,从0开始。
zrank key value
# 降序
zrevrank key value

3)删除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 根据元素删除
zrem key value

# 按排名删除(下标)
zremrangebyrank test 0 1
eg:
zadd test 1 a 3 c 2 b 4 d 6 f 9 i 8 h
# 删除的就是: a b

# 按分数(排序实际的值)删除
zremrangebyscore test 2 8
eg:
zadd test 1 a 3 c 2 b 4 d 6 f 9 i 8 h
# 删除的就是: b c d f h

4) 更多

1
2
3
4
5
# 统计集合内一共有多少个
zcard key

# 统计集合内指定范围内的
zcount ket start stop

六、发布和订阅

1、什么是发布和订阅

  1. Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)订阅消息
  2. Redis 客户端可以订阅任意数量的频道

图片加载失败

2、发布和订阅命令行示例

1
2
3
4
5
6
7
8
9
10
11
12
# 分别打开三个四个窗体
# 前三个窗口用于订阅消息,最后一个用于发布消息
# 前三个窗口分别执行以下命令
subscribe a1
subscribe a2
subscribe a1 a2

# 最后一个窗口执行以下命令
publish a1 hello
publish a2 world

# 最后查看前三个窗体分别接收到了那个消息

订阅消息:

图片加载失败

图片加载失败

图片加载失败

发布消息:

图片加载失败

七、新数据类型

1、Bitmaps(位操作)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# 设置 key 偏移量的位置 是1/0
setbit key offset value

# 获取 key 偏移量的位置的 值
getbit key offset

------------------------------------------------------------------

# 获取 key 开始-结束 有多少个1
# 这里的 start,top指的是字节组
# 即 01000000 8个bit 为一个字节组
#
bitcount key start top
eg:
setbit a 1000 0
setbit a 0 1
setbit a 2 1
# a中的二进制,也就是 位 是: 10100000
# 查询所有,即返回a中bit为1的个数
bitcount a

setbit a 8 1
# a中的二进制,也就是 位 是:10100000 10000000

# 这里 start:0 指的就是:10100000
# 这里 end:1 指的就是: 10000000
# 这个返回 2
bitcount a 0 0
# 这个返回 3
bitcount a 0 1


------------------------------------------------------------------

# 多个key 按位操作
# operation 可以是: AND OR NOT XOR
bitop operation destkey key1 [key2...]
eg:
setbit lower 2 1
set char Q
# char 和 lower 做or才操作,比较后 char的第二位变成了1 ,结果放在cahr上
# 第一个 char 表示放在那个key上
bitop and lower char char lower

2、HyperLogLog(基数统计)

什么是基数:假如现在有这么一组数据{1,3,5,7,5,7,8}, 那么他的基数就是{1,3,5,7,8}

1)新增

1
2
3
4
pfadd key elements
eg:
pfadd program java php java
result: java php

2)查询

1
2
3
# 统计基数个数
pfcount program
result: 2

3)更多

1
2
3
4
5
6
# 合并,将多个 key 的元素合并到一个目标 key 中
pfmerge target keys
eg:
pfadd k1 a b
pfmerge k2 program k1
result: 4

3、Geospatial(地理信息的缩写)

注:该类型就是元素的 2 维坐标。在地图上就是经纬度。redis 基于该类型,提供了经纬度设置,查询,范围查询,距离查询,经纬度 Hash 等操作.

1)新增

1
2
3
4
5
6
7
#  经度 纬度 名称
geoadd key longitude latitude member
eg:
geoadd china:city 121.47 31.23 shanghai 106.50 29.53 chongqing

# 两极无法直接添加,一般会下载城市数据,通过 java 程序一次性导入。
# 有效的经度从 -180 度到 180 度。有效的纬度从 -85.05112878 度到 85.05112878 度。

2)查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 获得指定地区的坐标值
geopos key member
eg:
geopos china:city shanghai
result:
1) 121.47000163793563843 # 经度
2) 31.22999903975783553 # 纬度


# 获取两位位置的直线距离
# m 米 默认值
# km 千米
# mi 英里
# ft 英尺
geodist key member1 member2 [m|km|ft|mi]
eg:
geodist china:city shanghai chongqing

# 查询给定经纬度为中心,半径内的元素
georadius key longitude latitude radius [m|km|ft|mi]
eg:
# 查询 key 中 经纬度为中心 1000 公里内的元素
georadius china:city 110 30 1000 km

八、Jedis 操作

1、Jedis 连接测试

  1. 创建一个 Maven 工程

  2. pom 文件中增加依赖

    1
    2
    3
    4
    5
    <dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.6.3</version>
    </dependency>
  3. 创建一个类并测试

    图片加载失败

  4. 启动 main 方法

    图片加载失败

注意:这里可能会发生报错,可能是由于虚拟机防火墙为关闭导致的

图片加载失败

关闭防火墙:

  1. 执行 systemctl status firewalld 查看防火墙状态

    图片加载失败

  2. 执行systemctl stop firewalld 关闭防火墙,在查看防火墙状态

    图片加载失败

  3. 禁用防火墙:systemctl disable firewalld

2、测试相关数据类型

注:因为和命令行操作相同,所以就不所有命令都进行测试了

1)String 类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 字符串类型测试
*/
@Test
public void test1() {
Jedis jedis = new Jedis("192.168.111.144", 6379);
// 单个 key-value 设置
jedis.set("name", "zhangsan");
String name = jedis.get("name");
System.out.println(name);

// 多个 key-value 设置
jedis.mset("age", "12", "sex", "man");
List<String> keys = jedis.mget("name", "age", "sex");
System.out.println(keys);

// 获取所有 key
Set<String> set = jedis.keys("*");
for (String str : set) {
System.out.println("key:" + str);
String value = jedis.get(str);
System.out.println("value:" + value);
}

// 判断 key 是否存在
System.out.println(jedis.exists("name"));
// 设置 key 过期时间,这里 expire 方法过期了,可以使用 JedisCommands 实现
jedis.expire("test_key", 30);
// 查看 key 的过期时间
System.out.println(jedis.ttl("test_key"));

jedis.close();
}

2)List 类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* List 类型测试
*/
@Test
public void test_list() {
Jedis jedis = new Jedis("192.168.111.144", 6379);
jedis.flushDB();
jedis.lpush("list", "a", "b");
jedis.rpush("list", "c");

String val1 = jedis.lindex("list", 1);
System.out.println(val1);

List<String> list = jedis.lrange("list", 0, -1);
for (String str : list) {
System.out.println(str);
}

jedis.close();
}

3)Set 类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* set 类型测试
*/
@Test
public void test_set() {
Jedis jedis = new Jedis("192.168.111.144", 6379);
jedis.flushDB();
jedis.sadd("set", "set1", "set2", "set3");

// 随机查询出一个元素,或指定count个元素
List<String> values = jedis.srandmember("set", 2);
for (String str : values) {
System.out.println(str);
}

// 查询集合所有元素
Set<String> all = jedis.smembers("set");
for (String str : all) {
System.out.println(str);
}

// 查询集合共有多少个元素
Long count = jedis.scard("set");
System.out.println(count);

jedis.close();
}

4)Hash 类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* Hash 类型测试
*/
@Test
public void test_hash() {
Jedis jedis = new Jedis("192.168.111.144", 6379);
jedis.flushDB();
Map<String, String> map = new HashMap<>();
map.put("name", "zhangsan");
map.put("age", "18");
map.put("sex", "man");
jedis.hset("user", map);

// 如果 key 中 field 不存在,添加成功,否则添加失败
jedis.hsetnx("user", "city", "beijing");

// 根据 key 的属性查询 value
String name = jedis.hget("user", "name");
System.out.println(name);

// 查询 key 中是否存在某个 field
Boolean hexists = jedis.hexists("user", "address");
System.out.println(hexists);

// 查询 key 中所有的 field
Set<String> user = jedis.hkeys("user");
for (String str : user) {
System.out.println(str);
}

// 查询 key 中所有 value
List<String> value = jedis.hvals("user");
for (String str : value) {
System.out.println(str);
}

jedis.close();
}

5)ZSet 类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* zset 类型
*/
@Test
public void test_zset() {
Jedis jedis = new Jedis("192.168.111.144", 6379);
jedis.flushDB();

Map<String, Double> map = new HashMap<>();
map.put("age", (double)2);
jedis.zadd("user", (double) 1, "name");
jedis.zadd("user", map);

// 升序排序后查询
Set<String> user = jedis.zrange("user", 0, -1);
for (String str : user) {
System.out.println(str);
}

// 降序排序后查询
Set<String> user1 = jedis.zrevrange("user", 0, -1);
for (String str : user1) {
System.out.println(str);
}

jedis.close();
}

3、模拟发送验证码

要求:

  1. 输入手机号,点击发送后随机生成 6 位数字码,两分钟有效。
  2. 输入验证码,点击验证,返回成功或失败。
  3. 每个手机号每天只能输入 3 次。

1)创建一个生成验证码的方法

1
2
3
4
5
6
7
8
public String getCode() {
Random random = new Random();
StringBuilder code = new StringBuilder();
for (int i = 0; i < 6; i++) {
code.append(random.nextInt(10));
}
return code.toString();
}

2)创建生成验证码的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Override
public Boolean createCodeByPhoneNum(String phoneNum) {

boolean result = true;

Jedis jedis = new Jedis("192.168.111.144", 6379);

// 验证码发送次数
String sendCountKey = "sendCode" + phoneNum + ":count";
// 验证码 key
String codeKey = "code" + phoneNum + ":code";
// 验证码
String code = getCode();
// 是否可以存入验证码
boolean saveCode = true;

// 1、获取手机号对应发送的次数
String count = jedis.get(sendCountKey);
if (count == null) {
// 没有发送过,设置发送次数为1
jedis.setex(sendCountKey, 24 * 60 * 60, "1");
} else if (Integer.parseInt(count) < 3) {
// 发送次数 +1
jedis.incr(sendCountKey);
} else if (Integer.parseInt(count) > 2) {
// 已经发送三次了
saveCode = false;
result = false;
//jedis.close();
}

// 2、保存验证码
if (saveCode) {
jedis.setex(codeKey, 60 * 2, code);
}

jedis.close();
return result;
}

3)创建校验验证码的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Override
public Boolean verifyCodeByPhoneNum(String phoneNum, String code) {

Jedis jedis = new Jedis("192.168.111.144", 6379);

// 验证码 key
String codeKey = "code" + phoneNum + ":code";

// 获取验证码
String redisCode = jedis.get(codeKey);

jedis.close();
if (redisCode == null) {
return false;
} else if (redisCode.equals(code)) {
return true;
} else {
return false;
}
}

九、SpringBoot 整合 Redis

1、在 pom 文件中引入相关依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<!--redis-->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.6.3</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.5.3</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.8.0</version>
</dependency>

2、application.yml 配置 redis 相关配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
spring:
redis:
# Redis 服务器地址
host: 192.168.111.144
# 端口
port: 6379
# 数据库索引
database: 0
# 连接超时时间(毫秒)
# timeout: 1800000
jedis:
pool:
# 连接池最大连接数(负数表示没有限制)
max-active:
# 最大阻塞等待时间(负数表示没有限制)
max-wait: -1
# 连接池中最大空闲连接
max-idle: 5
# 连接池值最小空闲连接
min-idle: 0

3、配置 redis 配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@EnableCaching
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
// 我们为了自己开发方便,一般直接使用 <String, Object>
// 两个泛型都是 Object, Object 的类型,我们后使用需要强制转换 <String, Object>
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);
setRedisTemplate(template);
template.afterPropertiesSet();
return template;
}


private void setRedisTemplate(RedisTemplate<String, Object> template) {
// Json序列化配置
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);

ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
// 解决jackson2无法反序列化LocalDateTime的问题
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
objectMapper.registerModule(new JavaTimeModule());

// 该方法过时
// om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
// 上面 enableDefaultTyping 方法过时,使用 activateDefaultTyping
objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);

// String 的序列化
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
// key采用String的序列化方式
template.setKeySerializer(stringRedisSerializer);
// hash的key也采用String的序列化方式
template.setHashKeySerializer(stringRedisSerializer);
// value序列化方式采用jackson
template.setValueSerializer(jackson2JsonRedisSerializer);
// hash的value序列化方式采用jackson
template.setHashValueSerializer(jackson2JsonRedisSerializer);
// 设置值(value)的序列化采用FastJsonRedisSerializer。
// 设置键(key)的序列化采用StringRedisSerializer。
template.afterPropertiesSet();
}

}

4、redis 工具类

使用的时候注入即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
package com.sy.jedis_redisdemo.utils;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
* @ClassName RedisUtil
* @Description TODO
* @Author sy
* @Date 2021/8/15 22:20
* @Version 1.0
**/
@Component
public class RedisUtil {


@Autowired
private RedisTemplate<String, Object> redisTemplate;


// =============================common============================


/**
* sy
* 监视 key
* @param key 键
*/
public boolean watch(String key) {
try {
if (key != null && key != "") {
redisTemplate.watch(key);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

/**
* sy
* 监视 key
* @param keys 键
*/
public boolean watch(Collection<String> keys) {
try {
if (keys != null && keys.size()>0) {
redisTemplate.watch(keys);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

/**
* sy
* 解除监视 key
*/
public boolean unwatch() {
try {
redisTemplate.unwatch();
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 指定缓存失效时间
*
* @param key 键
* @param time 时间(秒)
*/
public boolean expire(String key, long time) {
try {
if (time > 0) {
redisTemplate.expire(key, time, TimeUnit.SECONDS);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 根据key 获取过期时间
*
* @param key 键 不能为null
* @return 时间(秒) 返回0代表为永久有效
*/
public long getExpire(String key) {
return redisTemplate.getExpire(key, TimeUnit.SECONDS);
}


/**
* 判断key是否存在
*
* @param key 键
* @return true 存在 false不存在
*/
public boolean hasKey(String key) {
try {
return redisTemplate.hasKey(key);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 删除缓存
*
* @param key 可以传一个值 或多个
*/
@SuppressWarnings("unchecked")
public void del(String... key) {
if (key != null && key.length > 0) {
if (key.length == 1) {
redisTemplate.delete(key[0]);
} else {
redisTemplate.delete((Collection<String>) CollectionUtils.arrayToList(key));
}
}
}


// ============================String=============================


/**
* 普通缓存获取
*
* @param key 键
* @return
*/
public Object get(String key) {
return key == null ? null : redisTemplate.opsForValue().get(key);
}


/**
* 普通缓存放入
*
* @param key 键
* @param value 值
* @return true成功 false失败
*/


public boolean set(String key, Object value) {
try {
redisTemplate.opsForValue().set(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 普通缓存放入并设置时间
*
* @param key 键
* @param value 值
* @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期
* @return true成功 false 失败
*/


public boolean set(String key, Object value, long time) {
try {
if (time > 0) {
redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
} else {
set(key, value);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 递增
*
* @param key 键
* @param delta 要增加几(大于0)
*/
public long incr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递增因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, delta);
}


/**
* 递减
*
* @param key 键
* @param delta 要减少几(小于0)
*/
public long decr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递减因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, -delta);
}


// ================================Map=================================


/**
* HashGet
*
* @param key 键 不能为null
* @param item 项 不能为null
*/
public Object hget(String key, String item) {
return redisTemplate.opsForHash().get(key, item);
}


/**
* 获取hashKey对应的所有键值
*
* @param key 键
* @return 对应的多个键值
*/
public Map<Object, Object> hmget(String key) {
return redisTemplate.opsForHash().entries(key);
}


/**
* HashSet
*
* @param key 键
* @param map 对应多个键值
*/
public boolean hmset(String key, Map<String, Object> map) {
try {
redisTemplate.opsForHash().putAll(key, map);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* HashSet 并设置时间
*
* @param key 键
* @param map 对应多个键值
* @param time 时间(秒)
* @return true成功 false失败
*/
public boolean hmset(String key, Map<String, Object> map, long time) {
try {
redisTemplate.opsForHash().putAll(key, map);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key 键
* @param item 项
* @param value 值
* @return true 成功 false失败
*/
public boolean hset(String key, String item, Object value) {
try {
redisTemplate.opsForHash().put(key, item, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key 键
* @param item 项
* @param value 值
* @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间
* @return true 成功 false失败
*/
public boolean hset(String key, String item, Object value, long time) {
try {
redisTemplate.opsForHash().put(key, item, value);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 删除hash表中的值
*
* @param key 键 不能为null
* @param item 项 可以使多个 不能为null
*/
public void hdel(String key, Object... item) {
redisTemplate.opsForHash().delete(key, item);
}


/**
* 判断hash表中是否有该项的值
*
* @param key 键 不能为null
* @param item 项 不能为null
* @return true 存在 false不存在
*/
public boolean hHasKey(String key, String item) {
return redisTemplate.opsForHash().hasKey(key, item);
}


/**
* hash递增 如果不存在,就会创建一个 并把新增后的值返回
*
* @param key 键
* @param item 项
* @param by 要增加几(大于0)
*/
public double hincr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, by);
}


/**
* hash递减
*
* @param key 键
* @param item 项
* @param by 要减少记(小于0)
*/
public double hdecr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, -by);
}


// ============================set=============================


/**
* 根据key获取Set中的所有值
*
* @param key 键
*/
public Set<Object> sGet(String key) {
try {
return redisTemplate.opsForSet().members(key);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}


/**
* 根据value从一个set中查询,是否存在
*
* @param key 键
* @param value 值
* @return true 存在 false不存在
*/
public boolean sHasKey(String key, Object value) {
try {
return redisTemplate.opsForSet().isMember(key, value);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 将数据放入set缓存
*
* @param key 键
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSet(String key, Object... values) {
try {
return redisTemplate.opsForSet().add(key, values);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}


/**
* 将set数据放入缓存
*
* @param key 键
* @param time 时间(秒)
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSetAndTime(String key, long time, Object... values) {
try {
Long count = redisTemplate.opsForSet().add(key, values);
if (time > 0) {
expire(key, time);
}
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}


/**
* 获取set缓存的长度
*
* @param key 键
*/
public long sGetSetSize(String key) {
try {
return redisTemplate.opsForSet().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}


/**
* 移除值为value的
*
* @param key 键
* @param values 值 可以是多个
* @return 移除的个数
*/


public long setRemove(String key, Object... values) {
try {
Long count = redisTemplate.opsForSet().remove(key, values);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}


// ===============================list=================================


/**
* 获取list缓存的内容
*
* @param key 键
* @param start 开始
* @param end 结束 0 到 -1代表所有值
*/
public List<Object> lGet(String key, long start, long end) {
try {
return redisTemplate.opsForList().range(key, start, end);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}


/**
* 获取list缓存的长度
*
* @param key 键
*/
public long lGetListSize(String key) {
try {
return redisTemplate.opsForList().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}


/**
* 通过索引 获取list中的值
*
* @param key 键
* @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
*/
public Object lGetIndex(String key, long index) {
try {
return redisTemplate.opsForList().index(key, index);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}


/**
* 将list放入缓存
*
* @param key 键
* @param value 值
*/
public boolean lSet(String key, Object value) {
try {
redisTemplate.opsForList().rightPush(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @param time 时间(秒)
*/
public boolean lSet(String key, Object value, long time) {
try {
redisTemplate.opsForList().rightPush(key, value);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @return
*/
public boolean lSet(String key, List<Object> value) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @param time 时间(秒)
* @return
*/
public boolean lSet(String key, List<Object> value, long time) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 根据索引修改list中的某条数据
*
* @param key 键
* @param index 索引
* @param value 值
* @return
*/
public boolean lUpdateIndex(String key, long index, Object value) {
try {
redisTemplate.opsForList().set(key, index, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}


/**
* 移除N个值为value
*
* @param key 键
* @param count 移除多少个
* @param value 值
* @return 移除的个数
*/
public long lRemove(String key, long count, Object value) {
try {
Long remove = redisTemplate.opsForList().remove(key, count, value);
return remove;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}


}

5、编写 controller 测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private final RedisUtil redisUtil;
/**
* sy
* 验证redis整合
* @return ResultBody
*/
@ApiOperation(value = "验证redis整合")
@GetMapping(value = "/verifyRedis")
public ResultBody verifyRedis() {

redisUtil.set("test", "ceshi");
String value = (String) redisUtil.get("test");

System.out.println(value);

return ResultBody.success();
}

十、Redis 事务和锁机制

Redis 事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序的执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。

Redis 事务的主要作用就是串联多个命令防止别的命令插队。

1、基本操作

1)、Multi、Exec、Discard

Multi:开始事务,进行组队阶段

Exec:执行阶段,按照顺序执行

Discard:放弃执行

从输入 Multi 命令开始,输入的命令都会依次进入命令队列中,但不会执行,直到输入 Exec 后,Redis 都会将之前的命令队列中的命令依次执行。

组队的过程中可以通过 Discard 来放弃组队

图片加载失败

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 开启事务
multi
set key1 value1
set key2 value2
# 执行
exec
keys *

# 开启事务
multi
set a1 v1
set a2 v2
# 放弃事务
discard
keys *

2)事务的错误处理

事务可能会出现的错误有两种:

  1. 组队中某个命令出现了报告错误,执行时整个的所有队列都会被取消

    图片加载失败

    示例:

    图片加载失败

  2. 执行阶段某个命令出现了错误,则只有报错的命令不会执行,而其他命令正常执行。

    图片加载失败

2、事务冲突的问题

场景:有很多人有你的账户,同事去参加双十一抢购

3、例子

  • 一个请求想给金额减8000
  • 一个请求想给金额减5000
  • 一个请求想给金额减1000

图片加载失败

1)解决方案

1. 悲观锁

什么是悲观锁:顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿到数据的时候都会上锁,这样别人想去拿这个数据就会 block(阻塞),知道他用完数据,将锁释放后,被人才可以拿到数据。传统的关系型数据库中就用到了很多这种锁机制,比如行锁表锁等,读锁写锁等,都是在做操作之前先上锁

图片加载失败

2.乐观锁

什么是乐观锁:顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量。Redis 就是利用这种 check-and-set 机制实现事务的

图片加载失败

3.watch key [key…] 命令

在执行 Multi 之前,先执行 watch key [keys…] 命令,可以监视一个(或多个)key,如果在事务执行之前这个(这些)key 被其他命令所改动,那么事务将会被打断

测试例子:

图片加载失败

图片加载失败

4、Redis 事务三特性

  1. 单独的隔离操作:

    事务中的所有命令都会序列化、按顺序的执行。事务在执行的过程中,不会被其他客户端发送的命令请求打断。

  2. 没有隔离级别的概念:

    队列中的命令没有提交之前都不会实际被执行,因为事务提交前任何指令都不会被实际执行。

  3. 不保证原子性:

    事务中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚。

5、秒杀案例

1)基本实现

页面:

图片加载失败

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<!--<script th:src="@{/webjars/jquery/3.5.1/jquery.min.js}"></script>-->
<script src="webjars/jquery/3.5.1/jquery.min.js"></script>

<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<h1>iPhone 13 pro!!! 1元秒杀!!!</h1>
<form id="msform" action="http://localhost:8079/redis/ms/doseckill">
<input type="hidden" id="prodid" name="prodid" value="0101">
用户id:<input type="text" id="userId" name="userid">
<input type="button" id="miaosha_btn" name="seckill_btn" value="秒杀点我"/>
</form>

</body>
<script>
$(function () {
var form = $("#msform");

var parems = {};

$("#miaosha_btn").click(function () {
parems.userId = $("#userId").val();
parems.prodid = $("#prodid").val();
$.ajax({
type: "post",
url: "http://localhost:8079/redis/ms/doseckill",
data: JSON.stringify(parems),
dataType: "json",
contentType: "application/json;charset=utf-8",
success: function (data) {
// if (data === "false") {
// alert("抢光了");
// $("#miaosha_btn").attr("disabled", true);
// }
if (data.code === "00000") {
alert(data.message);
$("#miaosha_btn").attr("disabled", true);
} else if (data.code === "50000") {
alert(data.message);
}
}
})
})

})
</script>
</html>

实体类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package com.sy.jedis_redisdemo.restful.beans;

import lombok.Data;

/**
* @ClassName User
* @Description TODO
* @Author sy
* @Date 2021/8/16 19:39
* @Version 1.0
**/
@Data
public class User {

public String userId;

public String prodid;
}

Controller:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.sy.jedis_redisdemo.restful.controller;

import com.sy.api.context.ResultBody;
import com.sy.jedis_redisdemo.restful.beans.User;
import com.sy.jedis_redisdemo.restful.service.MsService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

/**
* @ClassName MsController
* @Description TODO
* @Author sy
* @Date 2021/8/16 18:14
* @Version 1.0
**/
@Api(value = "秒杀", tags = "秒杀")
@RestController
@RequestMapping("/ms")
public class MsController {

@Autowired
private MsService msService;

@ApiOperation(value = "秒杀")
@PostMapping(value = "/doseckill")
public ResultBody doseckill(@RequestBody User user) {
return msService.doseckill(user);
}
}

Service:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package com.sy.jedis_redisdemo.restful.service;

import com.sy.api.context.ResultBody;
import com.sy.jedis_redisdemo.restful.beans.User;

/**
* @ClassName MsService
* @Description TODO
* @Author sy
* @Date 2021/8/16 19:43
* @Version 1.0
**/
public interface MsService {

ResultBody doseckill(User user);
}

ServiceImpl:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package com.sy.jedis_redisdemo.restful.service.impl;

import com.baomidou.mybatisplus.extension.api.R;
import com.sy.api.context.ResultBody;
import com.sy.jedis_redisdemo.restful.beans.User;
import com.sy.jedis_redisdemo.restful.service.MsService;
import com.sy.jedis_redisdemo.utils.RedisUtil;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import redis.clients.jedis.Jedis;
import sun.net.RegisteredDomain;

/**
* @ClassName MsServiceImpl
* @Description TODO
* @Author sy
* @Date 2021/8/16 19:43
* @Version 1.0
**/
@Service
public class MsServiceImpl implements MsService {

@Autowired
private RedisUtil redisUtil;

@Override
public ResultBody doseckill(User user) {
//1、userId 和 prodid 非空判断
if (StringUtils.isEmpty(user.userId) || StringUtils.isEmpty(user.prodid)) {
return ResultBody.error("用户或商品id不能为空");
}

//2、连接 redis
//Jedis jedis = new Jedis("192.168.111.144", 6379);
//RedisUtil redisUtil = new RedisUtil();

//3、拼接key
//3.1 库存key
String kcKey = "sk:" + user.prodid + ":kc";
//3.2 秒杀成功用户key
String userKey = "sk:" + user.prodid + ":user";

//4、获取库存,如果库存为null,秒杀还未开始
Object kc = redisUtil.get(kcKey);
if (kc == null) {
return ResultBody.error("秒杀未开始,请等待!");
}
//5、判断用户是否重复秒杀操作
if (redisUtil.sHasKey(userKey, user.userId)) {
return ResultBody.error("已经秒杀成功,不能重复秒杀!");
}

//6、判断库存数量,如果数量小于1,秒杀结束
if ((int)kc <= 0) {
return ResultBody.error("秒杀已经结束!");
}

//7、秒杀过程
//7.1 库存 -1
redisUtil.decr(kcKey, 1);
//7.1 把秒杀成功用户添加到清单中
redisUtil.sSet(userKey, user.userId);
return ResultBody.success("秒杀成功!");
}
}

2)ab工具模拟并发

2-1)安装 httpd-tools

执行 yum install httpd-tools 安装

2-2)命令及常用参数

常用参数:ab –list 可查看所有参数

1
2
3
4
5
6
7
8
9
10
11
# 请求次数
-n requests Number of requests to perform
# 并发次数
-c concurrency Number of multiple requests to make at a time
# 提交参数,参数要写到文件中,针对post,使用时要设置 -T
-p postfile File containing data to POST. Remember also to set -T
-u putfile File containing data to PUT. Remember also to set -T
# 设置 content-type,和你请求的 content-type 一样
-T content-type Content-type header to use for POST/PUT data, eg.
'application/x-www-form-urlencoded'
Default is 'text/plain'

命令:

1
2
# 1000 次请求,其中并发请求有100个,参数文件是 postfiel
ab -n 1000 -c 100 -p postfile -T 'application/json;charset=utf-8' http://192.168.111.2:8079/redis/ms/doseckill

2-3)测试

  1. 创建 postfile 参数文件内容(用于 ab 工具测试并发发送的参数):

    1
    {"prodid" : "0101"}
  2. 修改 controller:因为同一个 userId 不能重复秒杀,所以我们在这里随机一个。

    图片加载失败

  3. 执行命令进行测试:

    1
    ab -n 1000 -c 100 -p postfile -T 'application/json;charset=utf-8' http://192.168.111.2:8079/redis/ms/doseckill
  4. 查看执行结果:

    执行命令的结果:

    图片加载失败

    图片加载失败

    redis 中数据情况:

    图片加载失败

  5. 出现的问题:

    • 在上面测试结果中可以看到库存数量变为了 -35 ,这里出现的就是 超卖 的问题
    • 还可能会出现 连接超时的问题

3)超卖和连接超时问题解决

3-1)连接超时问题

这个问题在 SpringBoot 整合 Redis 中,通过 yml,redisConfig,RedisUtils 已经解决了

3-2)超卖问题

1.setnx 解决

图片加载失败

2.Redisson 解决
  1. 添加依赖

    1
    2
    3
    4
    5
    6
    <!--redisson-->
    <dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.13.6</version>
    </dependency>
  2. 修改 serviceImpl

    图片加载失败

    这里不用考虑 锁误删的问题,因为Redisson内部已经拼接了,见下图:

    这里的 idConnectionManager 的一个属性,继续追源码你会发现他也是一个 UUID 类型的。

    图片加载失败

十一、Redis 持久化

1、RDB

1)什么是 RDB

RDB 就是在指定的时间间隔内将内存中的数据集快照写入磁盘。

直白的说就是:在某一时刻记录下数据快照,在时间间隔内将数据存储到磁盘,恢复的时候也是将快照文件直接读到内存中

2)备份是如何执行的

Redis 会单独创建一个 (Fork) 一个子进程来进行持久化,会先将数据写入到一个临时文件中,等持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。整个过程中,主进程不进行任何 IO 操作,这就确保了极高的性能。如果需要大规模的数据恢复,而且对于数据恢复的完整性不是非常敏感,那 RDB 方式要比 AOF 方式更加高效。

RDB 的缺点就是:最后一次持久化后的数据可能丢失。

3)备份文件位置与备份规则

位置:默认在那个目录下启动 redis 服务,文件就生成在哪里。

图片加载失败

规则:

图片加载失败

4)命令 save VS bgsave

Redis 默认情况下是自动进行备份的。也可以使用下面两个命令进行手动备份。

save:save 时只管保存,其他不管,全部阻塞。不建议

bgsave: Redis 会在后台异步进行快照操作,快照同时还可以响应客户端请求。

2、AOF

1)什么是 AOF

以 日志 的形式类记录每个写操作(增量保存),将 Redis 执行过的所有写指令记录下来(读操作不记录),只许追加文件但不可以改写文件Redis 启动之初会读取该文件重新构建数据。简单的来说就是根据日志文件的内容将所有指令从前到后执行一次来完成数据的恢复工作。

2)AOF 持久化流程

  1. 客户端的请求写命令会被 append 追加到 AOF 缓冲区内;
  2. AOF 缓冲区根据 AOF 持久化策略 [always,everysec,no]**,将操作 **sync 同步到磁盘的 AOF 文件中;
  3. AOF 文件大小超过重写策略或手动重写时,会对 AOF 文件 rewrite 重写,压缩 AOF 文件的容量;
  4. Redis 服务重启是,会重新 load 加载 AOF 文件中的写操作达到数据恢复的目的;

3)AOF 默认不开启

可以再 redis.conf 中配置文件名称,默认为:appendonly.aof

AOF 文件的保存路径与 RDB 的路径一致

4)AOF 和 RDB 同时开启,Redis 听谁的?

AOFRDB 同时开启,系统默认取 AOF 的数据(数据不会存在丢失情况)

5)AOF 文件损坏xiuf

  • 如果遇到 AOF 文件损坏,可以通过:/bin/redis-check-for-aof–fix appendonly.aof 进行恢复
  • 备份被写坏的 AOF 文件
  • 恢复:重启 Redis,然后会自动加载 AOF 文件

6)AOF 同步频率设置

  1. appendfsync always

    始终同步,每次 Redis 的写操作都会like记入日志;性能比较差,但是数据的完整性比较好。

  2. appendfsync everysec

    每秒同步,每秒记入日志一次,如果宕机,本秒的数据可能会丢失

  3. appendfsync no

    Redis 不主动进行同步,把同步时机交给操作系统。

7)Rewrite 压缩

AOF 文件 >= 设置定文件大小后,会对 AOF 文件进行压缩重写

例:

1
2
3
4
5
6
# 我们实际执行的指令
set k1 v1
set k2 v2

压缩后的
set k1 v1 k2 v2

其实 AOF 的压缩重写,就是把一些命令重写为一条命令,不去关注过程,只关注最后的结果。

十二、Redis 主从复制

1、什么是主从复制

主机数据更新后根据配置和策略,自动同步到备机的 master/slaver 机制Master 以写为主,Slaver 以读为主

2、能干嘛

  • 读写分离,性能扩展。

  • 容灾快速恢复

    如果某一台从机出现问题,可以切换另一台从机。如果是主机出现问题,则可以通过集群来解决。

    图片加载失败

3、主从配置

这里是在一台虚拟机中,通过不同端口来模拟多台服务器

  1. 新建一个目录,名字自己定义就好了。

  2. 将我们原本的 redis.conf 复制到新创建的目录中。

    图片加载失败

  3. 创建三个配置文件,分别以端口进行区别,内容配置也要进行相应的修改

    图片加载失败

    1
    2
    3
    4
    5
    6
    # 引入基本的配置文件
    include /usr/local/myredis/redis.conf
    # 重新配置的内容,可以覆盖基本配置中配置的
    pidfile /var/run/redis_6379.pid
    port 6379
    dbfilename dump6379.rdb
  4. 可以看到这里我们已经起来了这三个 Redis

    图片加载失败

  5. 连接三台 Redis 并查看信息,可以看到三台 Redis 都是 master(主)

    图片加载失败

    图片加载失败

    图片加载失败

  6. 配置主从关系:配从(库)不配主(库)

    1
    2
    3
    4
    5
    # 在从机中使用命令配置,ip:主机ip,port:主机端口
    slaveof ip port

    # 取消从服务器配置,会重新变成主服务器
    slaveof no one
  7. 查看信息已经可以看到,角色(role)已经为从机(slave)

    图片加载失败

  8. 测试

    • 在主机中新增一个 key

      图片加载失败

    • 在从机中查看

      图片加载失败

    注意:写操作只能在主机中进行,从机中只能进行读操作

    图片加载失败

4、主从复制的原理

  1. 当从服务器(slave)连接上主服务器(master)之后,从服务器(slave)会向主服务器(master)发送进行数据同步的消息
  2. master 接到 slave 发送过来的同步消息后,把 master 的数据会持久化到 RDB 文件,把 RDB 文件发送给 slaveslave 拿到 RDB 文件进行读取
  3. master 每次进行写操作后,会和 slave 进行同步数据。

注意:

上面第 2 步是 slave 主动同步数据(全量复制),第 3 步是 master 主动同步数据(增量复制)

  • 全量复制:从服务器(slave)主动发送消息同步数据
  • 增量复制:主服务器(master)将写操作命令传给从服务器(slave)进行数据同步

5、一主二仆

特点:

  1. 当某一台从服务器挂掉之后再重启,他重新变成一个主机,并不会恢复成原来的从服务器
  2. 当某一台冲服务器挂掉之后,主服务数据发生了变动,从服务器重启之后,重新设置成主服务的从服务器,那么他会将主服务器的数据复制过来
  3. 当主服务挂掉之后,从服务器还是从服务器,并且可以看到主服务器已经挂掉了(down),并不会上位变成主服务器。当主服务器重新启动之后,依旧是主服务器,两台从服务器还是他小弟,并且可以再信息中看到大哥的状态是启动的(up)

测试:

  1. shutdown 掉 6381,查看主服务器(6379) 的信息,我们可以看到主服务器信息中只有一台从服务器了

    图片加载失败

  2. 我们在 主服务器(6379)中随意添加几条数据。

    图片加载失败

  3. 重新启动 6381,并查看信息,我们可以看到他重新变成了主服务器

    图片加载失败

  4. 我们将 6381 重新设置成从服务器,并分别查看他和主服务器的信息,可以看到他已经变成了从服务器,而且主服务器的信息中显示有两台从服务器。

    图片加载失败

  5. 查看从服务器中的数据,可以看到和主服务器是一样的

    图片加载失败

6、薪火相传

薪火相传 我们可以理解为 组织架构树 或者我们实际工作中项目经理、组长、组员这种结构。

就是说:

  • 6379 为主服务器
  • 6380 是 6379 的从服务器
  • 6381 是 6380 的服务器

这样,6379 数据会同步给 6380 ,6380 再同步给 6381。

缺点:

当 6380 挂掉了,那么 6379 的数据也无法同步给 6381了。

这里可以自己进行测试一下,我这里就不实际进行操作了。

7、反客为主

其实这个就是我们上面提到的 slaveof no one 命令,可已将从机变为主机。

缺点: 需要手动完成。

8、哨兵模式

1)什么是哨兵模式

反客为主的自动版,能够后台监控主机是否故障,如果故障了,根据投票数自动将从库转换为主库。

其实就像是 zookeeper 集群的选举。农奴翻身把歌唱,噢耶!

2)如何使用

  1. 设置为一主二仆/多仆模式

  2. 创建 sentinel.conf 文件,目录自定义,我们这里就放在和 redis6379.conf 同级的目录下了

  3. 配置哨兵,编写 sentinel.conf 配置文件内容

    1
    2
    3
    4
    5
    6
    # sentinel 哨兵
    # monitor 监控
    # mymaster 为监控对象起的服务器名称,其实就是为 master 主机起个别名
    # 192.168.111.144 6379 主机 ip 和端口
    # 1 至少有多少个哨兵同意迁移的数量,通俗的说就是:需要有多少个从机同意才能切换为主机,这里个人建议设置为从机数量的一半以上。类似于 zookeeper 的选举
    sentinel monitor mymaster 192.168.111.144 6379 1
  4. 启动哨兵,使用 redis-sentinel 启动

    图片加载失败

    图片加载失败

    注意,哨兵服务的默认端口为:26379

  5. 测试,关掉 6379,查看哨兵窗口变化

    图片加载失败

    这里通过控制台信息我们可以看到 6381 被选为了主服务器,6380 变成了 6381 的从机。

    还有一个信息就是,6379 还在。那么当 6379 重新启动后会变成什么样呢?这里我们将 6379 重启,并查看信息。

    图片加载失败

    图片加载失败

    通过上面的信息我们可以看到,6379 变成了 6381 的从机。

  6. 复制延时

    哨兵模式的缺点,由于所有的写操作都是在 Master 上进行的,然后同步更新到 Slave 上,所以从 Master 同步到 Slave 会有一定的延迟,当系统很繁忙的时候,延迟问题会更加严重,Slave 机器数量的增加也会使这个问题更加严重。

3)故障恢复

图片加载失败

  • 优先级在 redis.conf 中默认:replica-priority 100 ,值越小优先级越高

    图片加载失败

  • 偏移量是指获得原主机数据最全的

  • 每个 Redis 实例启动后都会随机生成一个 40 位的 runid

4)springboot 主从复制

只需要在 yml 配置文件中进行相应配置即可:

图片加载失败

服务启动出现了报错:

1
2
// 哨兵哨兵命令返回少于2个节点!在Redis配置中至少应该定义两个哨兵。设置checkSentinelsList = false以避免此检查。
SENTINEL SENTINELS command returns less than 2 nodes! At least two sentinels should be defined in Redis configuration. Set checkSentinelsList = false to avoid this check.

经过一番搜索并没有找到什么有用的,所以我又增加了两个哨兵@~@

  1. 修改原有的 sentinel.conf 配置文件,并重命名为 sentinel6379.conf

    1
    2
    3
    sentinel monitor mymaster 192.168.111.144 6379 1
    protected-mode no
    port 26379
  2. 新增两个配置文件,分别是:sentinel6380.confsentinel6381.conf,只需要修改一下 port ,分别是:26380,26381

启动服务/测试类测试:

图片加载失败

执行后可以到 Redis 中看一下数据是不是已经存进去了。

参考:springboot+redis的sentinel实现哨兵模式(超详细)

十三、Redis 集群

1、问题

  1. 容量不够,redis 如何进行扩容?
  2. 并发写操作,redis 如何分摊?

另外,主从模式,薪火相传模式,主机宕机,导致 ip 地址发生变化,应用程序中配置需要修改对应的主机地址、端口等信息。

之前通过代理主机来解决,但是 redis3.0 中提供了解决方案。就是 无中心化集群 配置。

2、什么是集群?

  • Redis 集群实现了对 Redis 的水平扩容,即启动 NRedis 节点,将这个数据库分布存储在这 N 个节点中,每个节点存储总数据的 1/N
  • Redis 集群通过分区(partition)来提供一定程度的可用性(availability):即使集群中有一部分节点失效或者无法进行通讯,集群也可以继续处理命令请求。

3、Redis 集群的搭建

这里我们还是在同一台虚拟机中以不同的端口的方式来模拟搭建,依旧是 主从复制 中用到的目录就可以。

创建六个实例分别是:6379、6380、6381、6389、6390、6391(三主三从)。

  1. 删除掉之前测试产生的 dump、aof 文件。

  2. 我们在 redis_6379.conf 的基础上配置基本信息

    图片加载失败

    图片加载失败

  3. 在配置文件中增加集群相关配置

    图片加载失败

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    include /usr/local/myredis/redis.conf
    pidfile "/var/run/redis_6379.pid"
    port 6379
    dbfilename "dump6379.rdb"
    logfile "/usr/local/myredis/redis_err_6379.log"
    daemonize yes
    dir "/usr/local/myredis"
    # 开启集群
    cluster-enabled yes
    # 指定集群节点配置文件
    cluster-config-file nodes_6379.conf
    # 设定节点失联时间(毫秒),超时后集群自动进入主从切换
    cluster-node-timeout 15000
  4. 删除6380 6381 两个配置文件,并将我们刚才修改好的配置文件在复制出5份。

    图片加载失败

  5. 修改复制出来的那几份文件,修改相应的端口

    1
    2
    3
    # 可以使用替换进行操作
    vim redis6380.conf
    :%s/6379/6380

    图片加载失败

  6. 启动 6 个 Redis 服务,并查看节点配置文件是否正确生成了

    图片加载失败

  7. 将 6 个节点合成一个集群

    • 进入到 Redis 安装目录的 src 中,或者是解压的目录中的 src 中。

    • 使用命令进行合体

      1
      2
      3
      4
      # 注意下面的命令只能在安装目录的 src 下才可以使用,使用 ip,不要使用 127.0.0.1
      # --cluster create 集群创建
      # --cluster-replicas 1 采用最简单的方式配置集群,一主一从,正好三组。前 3 为主机,后 3 为从机
      redis-cli --cluster create --cluster-replicas 1 192.168.111.144:6379 192.168.111.144:6380 192.168.111.144:6381 192.168.111.144:6389 192.168.111.144:6390 192.168.111.144:6391
    • 执行命令

      图片加载失败

      执行命令后会展示分配的一些信息,并会询问我们是否接受这种分配方式,我们输入 yes

      图片加载失败

  8. 连接

    1
    2
    3
    # -c 采用集群策略连接,设置数据会自动切换到相应的写主机
    # 连接任何一个都可以,例如6379/6380/6381,会自动切换到写主机
    redis-cli -c -p 6379
  9. 查看集群节点信息:cluster nodes

    我们可以在信息中看到哪一台是主机,哪一台是从机,和对应的主从。

    图片加载失败

4、Redis cluster 如何分配这六个节点?

  • 一个集群至少要有三个主节点
  • 选项 –cluster-replicas 1 表示我们希望为集群中的每个主节点创建一个从节点。
  • 分配原则尽量保证每个主数据库运行在不同的 IP 地址,每个从库和主库不在一个 IP 地址上。

5、什么是 slots(插槽)?

通俗的来说就像是 洞 ,每个洞都可以放入东西。

  • 一个 Redis 集群包含 16384 个插槽(hash slot),数据库中的每个键都属于这 16384 个插槽中的其中一个。

  • 每个主节点都有他负责的一定范围的插槽,可以再节点信息中看到。

    图片加载失败

  • 集群会使用 CRC16(key)%16384 来计算 key 属于哪个插槽,其中的 CRC16(key) 语句用于计算键 keyCRC16 校验和。

6、集群数据的新增与查询

  1. 新增(单个key/value、多个key/value)

    • 下图可见,根据key计算除了插槽,并存储到了相应的主机中,并切换到了相应的主机

      图片加载失败

    • 多个 key/value 新增

      1
      2
      3
      4
      5
      6
      7
      # 错误方式,多个 key 不能同时插入一个插槽
      mset name lucy age 20 addr china
      #err:CROSSSLOT Keys in request don't hash to the same slot

      #正确方式,使用 {组名} 的方式新增数据
      #这样就会根据{}中的组名去计算插槽
      mset name{user} lucy age{user} 20 addr{user} china
  2. 查询集群中的值

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    # 获取 key 在集群中的插槽位置:cluster keyslot key
    cluster keyslot k1
    result: 12706

    # 获取插槽位置中有几个值,只能插槽所在的主机中用,否则返回 0
    cluster countkeysinslot 12706

    # 获取指定插槽中count个 key,cluster get keysinslot slot count
    # 获取 12706 这个插槽中的 key,获取 10 个
    cluster getkeysinslot 12706 10

7、故障恢复

  1. 如果主节点下线,从节点能否自动提升为主节点?注意:15 秒超时

    图片加载失败

    从上图可以看到,6379 下线之后,6380 在 15 秒之后提升为主节点了。

  2. 下线的主节点恢复之后,主从关系如何?

    图片加载失败

    从上图可以看到,下线的主节点恢复后变成了从机。

  3. 如果某一段插槽的主从都挂掉了(集群中的某一组主从挂掉),那么这个集群是否还能继续使用?

    根据 redis.confcluster-require-full-coverage 配置而变化的:

    • 如果 cluster-require-full-coverageyes ,那么,整个集群都挂掉。
    • 如果 cluster-require-full-coverageno ,那么就只有该主从不能用,也无法存储。

8、springboot 集群的配置

  1. 修改 yml 配置文件

    图片加载失败

  2. 测试类测试

    图片加载失败

这样简单的配置就好了,我这里没有报错一切顺利,因为我也没用过,所以这些应该是很基础的配置。

9、Redis 集群的好处与不足

  1. 好处:
    • 实现扩容
    • 分摊压力
    • 无中心配置,相对简单
  2. 不足
    • 多键操作是不被支持的
    • 多键的 Redis 事务是不被支持的。lua 脚本不被支持。
    • 由于集群方案出现的较晚,很多公司已经采用了其他的集群方案,而代理或者客户端分片的方案想要 迁移redis cluster,需要整体迁移,而不是逐步过渡,复杂难度大

十四、Redis 应用问题解决

1、缓存穿透

  1. 什么是 缓存穿透

    缓存穿透: 就是大量的请求 访问缓存,获取不存在的数据,这个时候就会去请求数据库(数据库中也不存在),导致数据库瘫痪或错误、宕掉等。

    图片加载失败

  2. 出现场景

    一般出现在一下两种场景中:

    • redis 中查询不到数据
    • 出现很多非正常 url 访问
  3. 解决方案

    • 对空值缓存

      如果查询返回的数据为空(不管数据是不是存在),我们仍然把这个空结果(null)进行缓存,设置空结果的过期时间会很短,最长不会超过五分钟。

    • 设置可以访问的名单(白名单)

      使用 bitmaps 类型定义一个可以访问的名单,名单 id 作为 bitmaps 的偏移量,每次访问和 bitmaps 里面的 id 进行比较,如果访问的 id 不在 bitmaps 中,进行拦截,不允许访问。

    • 采用布隆过滤器

      这个底层其实就是一个 bitmaps ,只不过做了相应的封装等。

      其实就是检索一个元素是否在一个集合中。

      优点:空间效率和查询时间都远超一般的算法

      缺点:有一定的误识别率和删除困难

    • 进行实时监控

      当发现 Redis 的命中率开始急速降低,需要排查访问对象和访问的数据,和运维人员配合,可以设置黑名单限制服务。

2、缓存击穿

  1. 什么是 缓存击穿

    缓存击穿: key 对应的数据在数据库中存在,但是在 redis 中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端 DB 加载数据并回设到缓存,这个时候并发的请求可能会瞬间把后端 DB 压垮

    注意:这里指的是某个 key 过期了,大量访问使用这个 key。并不是出现大量的 key 过期

    图片加载失败

  2. 解决方案

    • 预先设置热门数据

      redis 高峰访问之前,把一些热门数据提前存入到 Redis 中,并加大这些热门数据 key 的时长

    • 实时调整

      现场监控那些数据热门,实时调整 key 的过期时长

    • 使用锁

      图片加载失败

3、缓存雪崩

  1. 什么是 缓存雪崩

    缓存雪崩: 在极少时间段内,查询大量 key 的集中过期情况。

  2. 解决方案

    • 构建多级缓存架构

      nginx 缓存 + Redis 缓存 + 其他缓存(ehcache 等)

    • 使用锁或队列

      用加锁或者队列的方式来保证不会有大量的线程对数据库一次性进行读写,从而避免失效时大量的并发请求落到底层存储系统上。不适合高并发情况。

    • 设置过期标志更新缓存

      记录缓存数据是否过期(设置提前量),如果过期会触发通知另外的线程在后台去更新实际 key 的缓存

    • 将缓存失效时间分散开

      比如我们可以再原有的失效时间基础上增加一个随机值,比如 1-5 分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。

4、分布式锁

1)什么是 分布式锁

个人理解 分布式锁 就是应用在集群、分布式服务中。因为原单体单机部署的系统演化成分布式集群系统,由于分布式系统多线程、多进程且分布在不同的机器上,这样原来的单机部署情况下的并发锁策略就失效了,我们就需要一种跨 JVM 的互斥机制(锁)来控制共享资源的访问

2)实现方式

  • 基于数据库实现

  • 基于缓存(Redis 等)

    性能好

  • 基于 zookeeper

    可靠性高

3)使用 redis实现分布式锁

其实就是 setnx key value 来设置一个 key 作为锁,释放锁就是删除掉这个 key

  • 问题一:假如现在出现了别的情况,例如:加锁之后程序报错或由于别的原因没有释放该怎么办呢?

    解决方式就是设置一个过期时间

    1
    2
    set key_lock 1
    exprie key_lock 10
  • 问题二:如果在设置过期时间的时候服务器突然死掉了或者由于一些其他原因设置失败了怎么办?

    解决方式就是加锁的同时设置过期时间

    1
    2
    # 这个命令在上面说过,他的意思就是:新增一个 key 并设置他的时间,只有在这个 key 不存在的时候才能设置成功
    set kye_lock 1 EX 10 NX
  • 问题三:误删问题

    这个可以通过添加 UUID 等方式解决,在上面的 秒发案例的超卖问题中 中已经解决过了。

案例

案例一、

场景:

  1. 一亿个用户,用户有频繁登录的,也有不经常登录的
  2. 如何记录用户的登录信息
  3. 如何查询活跃用户,[如: 连续登录的]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 以位图的方式来解决
# 用户的登录状态以 0/1来表示
# 初始值 一亿个用户都没有登陆过,设置第3,5,7位用户在周一登录过
setbit mon 100000000 0
setbit mon 3 1
setbit mon 5 1
setbit mon 7 1

# 设置第3,4,8位用户在周二登录过
setbit tue 100000000 0
setbit tue 3 1
setbit tue 4 1
setbit tue 8 1

# 设置第3,6,9位用户在周三登录过
setbit wed 100000000 0
setbit wed 3 1
setbit wed 6 1
setbit wed 9 1

# 按位与运算
# res 为比较后返回的结果集
# 例子:0100
# 0101
# 0110
# and 返回结果集是0100,位图的同一个位置都一样,则返回的是1,否则返回0
# or 返回结果集是0111,位图的同一个位置有一个是1,则返回1
#
#使用一下命令则可以得到res,三天都登录了的用户
bitop and res mon tue wed