一、环境准备
linux系统,本人使用的是centos8,最小化安装
Redis下载
二、安装
首先通过 tar -zxvg xxxxx 将 Redis 解压,目录随意,本人是在 /usr/local/ 下创建了一个文件夹用来安装使用
解压过后执行 make 命令编译,执行的时候会检测你的系统是否缺少依赖之类的,如果没问题会进行编译
编译成功后,执行 make test 命令检测一下是否有什么错误
执行 make PREFIX /usr/local/redis install 安装, **PREFIX /usr/local/redis ** 为指定安装目录
安装好之后则会在 /usr/local/redis 下生成 bin 文件夹,内含文件如下
他们分别是:
1 2 3 4 5 6 redis-benchmark #redis 性能测试工具 redis-check-aof #检查aof日志的工具 redis-check-rdb #检查rdb日志的工具 redis-cli #redis 连接的客户端 redis-sentinel #哨兵模式 redis-server #redis 服务进程
三、配置
从源代码目录复制 redis.conf 文件到 bin 同级
注释掉 bind 配置,如果不注释的话,默认只有 127.0.0.1 也就是本机可以连接
保护模式设置,关闭保护模式,如果开启的话,redis只能本机访问
backlog 连接队列
配置 redis 后台启动
日志文件配置,默认为空,即在那个目录启动,日志文件就在那个目录下生成
数据库个数
四、启动与连接
显示启动 redis-server 服务
1 2 # 启动 bin 目录下的 redis-server,并指定使用我们刚才拷贝过来的 redis.conf 配置文件 ./bin/redis-server ./redis.conf
隐式启动 redis-server 服务
修改 redis.conf 配置文件
连接 redis-cli 客户端,并测试
1 2 3 4 5 6 7 8 # 启动客户端 ./bin.redis-cli # 新增key-velue, 存储字符串 双引号 可有可无 set word "hello world" # 根据key查询 get word
关闭
五、操作命令
在操作之前我们先要知道:
redis 中都有哪些数据类型:string、list、set、zset(order set)、hash
redis 中共有16个数据库(空间),使用select 0 切换,默认使用 0号库
1、基础操作
1)新增
字符串(string)
1 2 # set key valueset site www.baidu.com
追加
1 2 # 向指定的 key 追加值,如果key不存在则创建并返回value长度,如果存在则向value后追加,并返回追加后的value长度 append key value
2)删除
字符串(string)
1 2 3 4 5 # 单个 del key del site # 批量删除 del sit age
3)修改
字符串(string)
1 2 3 4 5 6 7 8 9 # 修改值 set key value set site www.baidu.com # 修改 key 名称,如果修改后的key name 存在,则会覆盖修改后的key name 的值 rename age agee # 修改 key 名称,检查修改后的key name是否存在,如果不存在返回1,表示成功,否则返回0,修改不成功 # nx: not exists 是否存在 renamenx age site
4)查询
字符串(string)
1 2 3 4 5 # get key get site # 查询并修改 getset site test
5)各种查询
* 的使用
1 2 3 4 5 # 查询所有key keys * # 模糊匹配 keys si*
[] 的使用
1 2 # 比较精确的匹配,匹配中括号中的字符 keys sit[ey]
? 的使用
随机返回一个key
查看 key 的类型:
查看某个 key 是否存在:
1 2 # 存在返回 1,不存在返回 0 exists site
获取 value 的长度
6)key 移动
1 2 # 将 key 移动至1号库 move site 1
7 )key 递增/减
1 2 3 4 5 6 7 8 9 10 11 12 13 14 # 设置整型 key,每次递增 1 incr key # 设置整型 key,每次递减 1 decr key # 设置整型 key,递增10,即步长 incrby key 10 # 浮点数类型 递增,没有相应的递减 incrbyfloat key 1.1 # 设置整型 key,递减10,即步长 decrby key 10
8)key 的生命周期
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 # 查询key的有效期 # 已过期/不过期的key 返回-1,Redis2.8后不存在的key 返回-2 # 返回 秒 ttl site # 返回 毫秒 pttl site # 设置key的生命周期 # 秒 expire site 10 # 毫秒 pexpire site 10000 # 设置key永久有效 persist key
9)切换数据库,默认有16个
10)清空数据库
2、各种数据类型的操作
2-1、string(字符串)类型
1)新增
1 2 3 4 5 6 7 8 9 10 11 12 # ex:设置生命周期秒数 # px:设置生命周期毫秒数 # 生命周期设置ex/px同时写的时候以后面的有效期为准 # nx:表示 key 不存在时,执行操作 # xx:表示 key 存在时,执行操作 set key value [ex 秒数][px 毫秒数] [nx]/[xx] # 20 秒过期,value为10 set age 20 10 # 一次性新增多个键值 mset key value key value
2)查询
1 2 3 4 get key # 获取多个key mget key1 key2 key3
3)删除
1 2 3 4 5 # 单个 del key del site # 批量删除 del sit age
4)偏移量
1 2 3 4 5 # 偏移 从下标位置向后替换 setrange key index value # start,stop可以为负数,则是从value末尾开始, getrange key start stop
2-2、list(列表)类型
1)新增
1 2 3 4 5 6 7 8 9 10 11 12 # 从左侧推入 lpush list a b c d # 从右侧推入 rpush list a b c d # 插入值,在列表中找到某个值,并在其前或后插入值,如果没有找到则会返回 -1 # 如果找到了多个值,则会在第一个找到的值前后插入值。 linsert key after|before search value eg: # 在key为num的列表中找到3,并在3的前面插入2 linsert num before 3 2
2)查询
1 2 3 4 5 6 7 8 9 # 范围取值 lrange list 0 1 lrange list 0 -1 # 根据下标获取元素 lindex list index # 获取列表长度 llen key
3) 删除(弹出)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 # 删除整个key del key # 弹出元素 lpop list rpop list # 删除元素 # count 删除几个,正数代表从左向右删除,负数代表从右向左删除 # value 根据值删除 lrem key count value eg: lrem list 1 value lrem list -1 value # 等待弹出 # 如果key中没有元素,则等待有值了之后弹出 # timeout 为0 则一直等待# 适用场景:长轮询,在线聊天 brpop/blpop key timeout
4)剪切
1 2 # 剪切后key的值就变为剪切的值 ltrim key start stop
5)弹出并插入
1 2 3 4 5 6 7 8 9 10 # 在第一个列表的右侧弹出一个元素,并插入到第二个列表的左侧,成功会返回弹出插入的值 # 原子性操作 # 适用场景: # 两个列表: task[a,b,c,d] bak[] # 执行任务时可以使用此命令:task[a,b,c] bak[d],执行命令后会返回弹出并插入的值,也就是d # 这个时候我们就知道该执行任务:d # 如果执行成功则在bak中,pop掉d # 如果执行失败,或者没有执行,那么bak中的d还在。下次再执行任务的时候去bak中拿就好了。 rpoplpush key1 key2
2-3、set(集合)类型,无序
1)新增
1 sadd key value1 value2 ...
2)查询
1 2 3 4 5 6 7 8 9 10 11 # 随机查询出一个元素,或指定count个元素 srandmember key count # 查询集合所有元素 smembers key # 判断value是否包含在集合中,是返回1,否则返回0 sismember key value # 查询集合共有多少个元素 scard key
3)删除(弹出)
1 2 3 4 5 6 # 删除元素(根据值删除) srem key value1 value2 # 弹出 # 随机删除一个元素并返回pop出的值,count可以指定随机删除多少个 spop key count
4)更多
1 2 3 4 5 6 7 8 9 10 11 12 13 14 # 删除指定集合重元素,并将删除的元素添加到另一个集合 smove sourcr target value # 交集 sinter key1 key2 key3 # 查询出交集并存储在一个key(targer)中 sinterstore target key1 key2 key3 # 并集 sunion key1 key2 key3 # 差集,key1 与 key2 key3 的差集,返回key1中存在,其他key中不存在的 sdiff key1 key2 key3
2-4、Hash(哈希)类型
1)新增
1 2 3 4 5 6 7 8 9 10 11 # 单个,如果field已经存在则会覆盖原有的value hset key field value eg: hset user id 1 hset user name zhangsan # 如果key中field不存在,添加成功,否则添加失败 hsetnx key field value # 批量为key设置hash 值 hmset key field1 value1 field2 value2
2)查询
1 2 3 4 5 6 7 8 9 10 11 # 根据key的属性查询value hget key field # 查询key中是否存在某个field hexists key field # 查询key中所有的field hkeys key # 查询key中所有value hvals key
3)删除
4)更多
1 2 3 4 5 6 # 为key中field对应的value增加量操作,针对数字的value hincrby key field increment eg: hset user id 1 hincrby user id 12 result:13
2-5、zset(order set) 有序集合
1)新增
1 2 # 根据order排序 zadd key order1 value1 order2 value2
2)查询
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 # 把集合排序后,返回范围内数据[start,stop] # 注意这里是先进行“排序”,在拿 start - stop(下标/排名)的值 # 默认升序 zrange key start stop # 降序 zrevrange key start stop # 这里也是和集合升序排序后取值,这里的min,max指的是实际排序的值 # limit 分页# offset 偏移量,从那个开始算 # N 查多少个 # withscores 实际分数 zrangebyscorce key min max withscores limit offset N eg: zadd test 1 a 3 c 2 b 4 d 6 f 9 i 8 h zrangebyscore test 4 8 # result:d f h zrangebyscore test 4 8 withscores # result:d 4 f 6 h 8 zrange test 4 8 # result:f h i zrangebyscore test 1 8 limit 1 2 # result:b c # 先对集合生序排序,再查询value 在key中拍第几个,从0开始。 zrank key value # 降序 zrevrank key value
3)删除
1 2 3 4 5 6 7 8 9 10 11 12 13 14 # 根据元素删除 zrem key value # 按排名删除(下标) zremrangebyrank test 0 1 eg: zadd test 1 a 3 c 2 b 4 d 6 f 9 i 8 h # 删除的就是: a b # 按分数(排序实际的值)删除 zremrangebyscore test 2 8 eg: zadd test 1 a 3 c 2 b 4 d 6 f 9 i 8 h # 删除的就是: b c d f h
4) 更多
1 2 3 4 5 # 统计集合内一共有多少个 zcard key # 统计集合内指定范围内的 zcount ket start stop
六、发布和订阅
1、什么是发布和订阅
Redis 发布订阅(pub/sub )是一种消息通信模式:发送者(pub )发送消息,订阅者(sub )订阅消息
Redis 客户端可以订阅任意数量的频道
2、发布和订阅命令行示例
1 2 3 4 5 6 7 8 9 10 11 12 # 分别打开三个四个窗体 # 前三个窗口用于订阅消息,最后一个用于发布消息 # 前三个窗口分别执行以下命令 subscribe a1 subscribe a2 subscribe a1 a2 # 最后一个窗口执行以下命令 publish a1 hello publish a2 world # 最后查看前三个窗体分别接收到了那个消息
订阅消息:
发布消息:
七、新数据类型
1、Bitmaps(位操作)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 # 设置 key 偏移量的位置 是1/0 setbit key offset value # 获取 key 偏移量的位置的 值 getbit key offset ------------------------------------------------------------------ # 获取 key 开始-结束 有多少个1 # 这里的 start,top指的是字节组 # 即 01000000 8个bit 为一个字节组 # bitcount key start top eg: setbit a 1000 0 setbit a 0 1 setbit a 2 1 # a中的二进制,也就是 位 是: 10100000 # 查询所有,即返回a中bit为1的个数 bitcount a setbit a 8 1 # a中的二进制,也就是 位 是:10100000 10000000 # 这里 start:0 指的就是:10100000 # 这里 end:1 指的就是: 10000000 # 这个返回 2 bitcount a 0 0 # 这个返回 3 bitcount a 0 1 ------------------------------------------------------------------ # 多个key 按位操作 # operation 可以是: AND OR NOT XOR bitop operation destkey key1 [key2...] eg: setbit lower 2 1 set char Q # char 和 lower 做or才操作,比较后 char的第二位变成了1 ,结果放在cahr上 # 第一个 char 表示放在那个key上 bitop and lower char char lower
2、HyperLogLog(基数统计)
什么是基数:假如现在有这么一组数据{1,3,5,7,5,7,8}, 那么他的基数就是{1,3,5,7,8}
1)新增
1 2 3 4 pfadd key elements eg: pfadd program java php java result: java php
2)查询
1 2 3 # 统计基数个数 pfcount program result: 2
3)更多
1 2 3 4 5 6 # 合并,将多个 key 的元素合并到一个目标 key 中 pfmerge target keys eg: pfadd k1 a b pfmerge k2 program k1 result: 4
3、Geospatial(地理信息的缩写)
注:该类型就是元素的 2 维坐标。在地图上就是经纬度。redis 基于该类型,提供了经纬度设置,查询,范围查询,距离查询,经纬度 Hash 等操作.
1)新增
1 2 3 4 5 6 7 # 经度 纬度 名称 geoadd key longitude latitude member eg: geoadd china:city 121.47 31.23 shanghai 106.50 29.53 chongqing # 两极无法直接添加,一般会下载城市数据,通过 java 程序一次性导入。 # 有效的经度从 -180 度到 180 度。有效的纬度从 -85.05112878 度到 85.05112878 度。
2)查询
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 # 获得指定地区的坐标值 geopos key member eg: geopos china:city shanghai result: 1) 121.47000163793563843 # 经度 2) 31.22999903975783553 # 纬度 # 获取两位位置的直线距离 # m 米 默认值 # km 千米 # mi 英里 # ft 英尺 geodist key member1 member2 [m|km|ft|mi] eg: geodist china:city shanghai chongqing # 查询给定经纬度为中心,半径内的元素 georadius key longitude latitude radius [m|km|ft|mi] eg: # 查询 key 中 经纬度为中心 1000 公里内的元素 georadius china:city 110 30 1000 km
八、Jedis 操作
1、Jedis 连接测试
创建一个 Maven 工程
pom 文件中增加依赖
1 2 3 4 5 <dependency > <groupId > redis.clients</groupId > <artifactId > jedis</artifactId > <version > 3.6.3</version > </dependency >
创建一个类并测试
启动 main 方法
注意: 这里可能会发生报错,可能是由于虚拟机防火墙为关闭导致的
关闭防火墙:
执行 systemctl status firewalld 查看防火墙状态
执行systemctl stop firewalld 关闭防火墙,在查看防火墙状态
禁用防火墙:systemctl disable firewalld
2、测试相关数据类型
注: 因为和命令行操作相同,所以就不所有命令都进行测试了
1)String 类型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Test public void test1 () { Jedis jedis = new Jedis ("192.168.111.144" , 6379 ); jedis.set("name" , "zhangsan" ); String name = jedis.get("name" ); System.out.println(name); jedis.mset("age" , "12" , "sex" , "man" ); List<String> keys = jedis.mget("name" , "age" , "sex" ); System.out.println(keys); Set<String> set = jedis.keys("*" ); for (String str : set) { System.out.println("key:" + str); String value = jedis.get(str); System.out.println("value:" + value); } System.out.println(jedis.exists("name" )); jedis.expire("test_key" , 30 ); System.out.println(jedis.ttl("test_key" )); jedis.close(); }
2)List 类型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Test public void test_list () { Jedis jedis = new Jedis ("192.168.111.144" , 6379 ); jedis.flushDB(); jedis.lpush("list" , "a" , "b" ); jedis.rpush("list" , "c" ); String val1 = jedis.lindex("list" , 1 ); System.out.println(val1); List<String> list = jedis.lrange("list" , 0 , -1 ); for (String str : list) { System.out.println(str); } jedis.close(); }
3)Set 类型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Test public void test_set () { Jedis jedis = new Jedis ("192.168.111.144" , 6379 ); jedis.flushDB(); jedis.sadd("set" , "set1" , "set2" , "set3" ); List<String> values = jedis.srandmember("set" , 2 ); for (String str : values) { System.out.println(str); } Set<String> all = jedis.smembers("set" ); for (String str : all) { System.out.println(str); } Long count = jedis.scard("set" ); System.out.println(count); jedis.close(); }
4)Hash 类型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 @Test public void test_hash () { Jedis jedis = new Jedis ("192.168.111.144" , 6379 ); jedis.flushDB(); Map<String, String> map = new HashMap <>(); map.put("name" , "zhangsan" ); map.put("age" , "18" ); map.put("sex" , "man" ); jedis.hset("user" , map); jedis.hsetnx("user" , "city" , "beijing" ); String name = jedis.hget("user" , "name" ); System.out.println(name); Boolean hexists = jedis.hexists("user" , "address" ); System.out.println(hexists); Set<String> user = jedis.hkeys("user" ); for (String str : user) { System.out.println(str); } List<String> value = jedis.hvals("user" ); for (String str : value) { System.out.println(str); } jedis.close(); }
5)ZSet 类型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Test public void test_zset () { Jedis jedis = new Jedis ("192.168.111.144" , 6379 ); jedis.flushDB(); Map<String, Double> map = new HashMap <>(); map.put("age" , (double )2 ); jedis.zadd("user" , (double ) 1 , "name" ); jedis.zadd("user" , map); Set<String> user = jedis.zrange("user" , 0 , -1 ); for (String str : user) { System.out.println(str); } Set<String> user1 = jedis.zrevrange("user" , 0 , -1 ); for (String str : user1) { System.out.println(str); } jedis.close(); }
3、模拟发送验证码
要求:
输入手机号,点击发送后随机生成 6 位数字码,两分钟有效。
输入验证码,点击验证,返回成功或失败。
每个手机号每天只能输入 3 次。
1)创建一个生成验证码的方法
1 2 3 4 5 6 7 8 public String getCode () { Random random = new Random (); StringBuilder code = new StringBuilder (); for (int i = 0 ; i < 6 ; i++) { code.append(random.nextInt(10 )); } return code.toString(); }
2)创建生成验证码的方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 @Override public Boolean createCodeByPhoneNum (String phoneNum) { boolean result = true ; Jedis jedis = new Jedis ("192.168.111.144" , 6379 ); String sendCountKey = "sendCode" + phoneNum + ":count" ; String codeKey = "code" + phoneNum + ":code" ; String code = getCode(); boolean saveCode = true ; String count = jedis.get(sendCountKey); if (count == null ) { jedis.setex(sendCountKey, 24 * 60 * 60 , "1" ); } else if (Integer.parseInt(count) < 3 ) { jedis.incr(sendCountKey); } else if (Integer.parseInt(count) > 2 ) { saveCode = false ; result = false ; } if (saveCode) { jedis.setex(codeKey, 60 * 2 , code); } jedis.close(); return result; }
3)创建校验验证码的方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Override public Boolean verifyCodeByPhoneNum (String phoneNum, String code) { Jedis jedis = new Jedis ("192.168.111.144" , 6379 ); String codeKey = "code" + phoneNum + ":code" ; String redisCode = jedis.get(codeKey); jedis.close(); if (redisCode == null ) { return false ; } else if (redisCode.equals(code)) { return true ; } else { return false ; } }
九、SpringBoot 整合 Redis
1、在 pom 文件中引入相关依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 <dependency > <groupId > redis.clients</groupId > <artifactId > jedis</artifactId > <version > 3.6.3</version > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-data-redis</artifactId > <version > 2.5.3</version > </dependency > <dependency > <groupId > org.apache.commons</groupId > <artifactId > commons-pool2</artifactId > <version > 2.8.0</version > </dependency >
2、application.yml 配置 redis 相关配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 spring: redis: host: 192.168 .111 .144 port: 6379 database: 0 jedis: pool: max-active: max-wait: -1 max-idle: 5 min-idle: 0
3、配置 redis 配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 @EnableCaching @Configuration public class RedisConfig { @Bean public RedisTemplate<String, Object> redisTemplate (RedisConnectionFactory redisConnectionFactory) throws UnknownHostException { RedisTemplate<String, Object> template = new RedisTemplate <>(); template.setConnectionFactory(redisConnectionFactory); setRedisTemplate(template); template.afterPropertiesSet(); return template; } private void setRedisTemplate (RedisTemplate<String, Object> template) { Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer <>(Object.class); ObjectMapper objectMapper = new ObjectMapper (); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); objectMapper.registerModule(new JavaTimeModule ()); objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY); jackson2JsonRedisSerializer.setObjectMapper(objectMapper); StringRedisSerializer stringRedisSerializer = new StringRedisSerializer (); template.setKeySerializer(stringRedisSerializer); template.setHashKeySerializer(stringRedisSerializer); template.setValueSerializer(jackson2JsonRedisSerializer); template.setHashValueSerializer(jackson2JsonRedisSerializer); template.afterPropertiesSet(); } }
4、redis 工具类
使用的时候注入即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 package com.sy.jedis_redisdemo.utils;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.stereotype.Component;import org.springframework.util.CollectionUtils;import java.util.Collection;import java.util.List;import java.util.Map;import java.util.Set;import java.util.concurrent.TimeUnit;@Component public class RedisUtil { @Autowired private RedisTemplate<String, Object> redisTemplate; public boolean watch (String key) { try { if (key != null && key != "" ) { redisTemplate.watch(key); } return true ; } catch (Exception e) { e.printStackTrace(); return false ; } } public boolean watch (Collection<String> keys) { try { if (keys != null && keys.size()>0 ) { redisTemplate.watch(keys); } return true ; } catch (Exception e) { e.printStackTrace(); return false ; } } public boolean unwatch () { try { redisTemplate.unwatch(); return true ; } catch (Exception e) { e.printStackTrace(); return false ; } } public boolean expire (String key, long time) { try { if (time > 0 ) { redisTemplate.expire(key, time, TimeUnit.SECONDS); } return true ; } catch (Exception e) { e.printStackTrace(); return false ; } } public long getExpire (String key) { return redisTemplate.getExpire(key, TimeUnit.SECONDS); } public boolean hasKey (String key) { try { return redisTemplate.hasKey(key); } catch (Exception e) { e.printStackTrace(); return false ; } } @SuppressWarnings("unchecked") public void del (String... key) { if (key != null && key.length > 0 ) { if (key.length == 1 ) { redisTemplate.delete(key[0 ]); } else { redisTemplate.delete((Collection<String>) CollectionUtils.arrayToList(key)); } } } public Object get (String key) { return key == null ? null : redisTemplate.opsForValue().get(key); } public boolean set (String key, Object value) { try { redisTemplate.opsForValue().set(key, value); return true ; } catch (Exception e) { e.printStackTrace(); return false ; } } public boolean set (String key, Object value, long time) { try { if (time > 0 ) { redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS); } else { set(key, value); } return true ; } catch (Exception e) { e.printStackTrace(); return false ; } } public long incr (String key, long delta) { if (delta < 0 ) { throw new RuntimeException ("递增因子必须大于0" ); } return redisTemplate.opsForValue().increment(key, delta); } public long decr (String key, long delta) { if (delta < 0 ) { throw new RuntimeException ("递减因子必须大于0" ); } return redisTemplate.opsForValue().increment(key, -delta); } public Object hget (String key, String item) { return redisTemplate.opsForHash().get(key, item); } public Map<Object, Object> hmget (String key) { return redisTemplate.opsForHash().entries(key); } public boolean hmset (String key, Map<String, Object> map) { try { redisTemplate.opsForHash().putAll(key, map); return true ; } catch (Exception e) { e.printStackTrace(); return false ; } } public boolean hmset (String key, Map<String, Object> map, long time) { try { redisTemplate.opsForHash().putAll(key, map); if (time > 0 ) { expire(key, time); } return true ; } catch (Exception e) { e.printStackTrace(); return false ; } } public boolean hset (String key, String item, Object value) { try { redisTemplate.opsForHash().put(key, item, value); return true ; } catch (Exception e) { e.printStackTrace(); return false ; } } public boolean hset (String key, String item, Object value, long time) { try { redisTemplate.opsForHash().put(key, item, value); if (time > 0 ) { expire(key, time); } return true ; } catch (Exception e) { e.printStackTrace(); return false ; } } public void hdel (String key, Object... item) { redisTemplate.opsForHash().delete(key, item); } public boolean hHasKey (String key, String item) { return redisTemplate.opsForHash().hasKey(key, item); } public double hincr (String key, String item, double by) { return redisTemplate.opsForHash().increment(key, item, by); } public double hdecr (String key, String item, double by) { return redisTemplate.opsForHash().increment(key, item, -by); } public Set<Object> sGet (String key) { try { return redisTemplate.opsForSet().members(key); } catch (Exception e) { e.printStackTrace(); return null ; } } public boolean sHasKey (String key, Object value) { try { return redisTemplate.opsForSet().isMember(key, value); } catch (Exception e) { e.printStackTrace(); return false ; } } public long sSet (String key, Object... values) { try { return redisTemplate.opsForSet().add(key, values); } catch (Exception e) { e.printStackTrace(); return 0 ; } } public long sSetAndTime (String key, long time, Object... values) { try { Long count = redisTemplate.opsForSet().add(key, values); if (time > 0 ) { expire(key, time); } return count; } catch (Exception e) { e.printStackTrace(); return 0 ; } } public long sGetSetSize (String key) { try { return redisTemplate.opsForSet().size(key); } catch (Exception e) { e.printStackTrace(); return 0 ; } } public long setRemove (String key, Object... values) { try { Long count = redisTemplate.opsForSet().remove(key, values); return count; } catch (Exception e) { e.printStackTrace(); return 0 ; } } public List<Object> lGet (String key, long start, long end) { try { return redisTemplate.opsForList().range(key, start, end); } catch (Exception e) { e.printStackTrace(); return null ; } } public long lGetListSize (String key) { try { return redisTemplate.opsForList().size(key); } catch (Exception e) { e.printStackTrace(); return 0 ; } } public Object lGetIndex (String key, long index) { try { return redisTemplate.opsForList().index(key, index); } catch (Exception e) { e.printStackTrace(); return null ; } } public boolean lSet (String key, Object value) { try { redisTemplate.opsForList().rightPush(key, value); return true ; } catch (Exception e) { e.printStackTrace(); return false ; } } public boolean lSet (String key, Object value, long time) { try { redisTemplate.opsForList().rightPush(key, value); if (time > 0 ) { expire(key, time); } return true ; } catch (Exception e) { e.printStackTrace(); return false ; } } public boolean lSet (String key, List<Object> value) { try { redisTemplate.opsForList().rightPushAll(key, value); return true ; } catch (Exception e) { e.printStackTrace(); return false ; } } public boolean lSet (String key, List<Object> value, long time) { try { redisTemplate.opsForList().rightPushAll(key, value); if (time > 0 ) { expire(key, time); } return true ; } catch (Exception e) { e.printStackTrace(); return false ; } } public boolean lUpdateIndex (String key, long index, Object value) { try { redisTemplate.opsForList().set(key, index, value); return true ; } catch (Exception e) { e.printStackTrace(); return false ; } } public long lRemove (String key, long count, Object value) { try { Long remove = redisTemplate.opsForList().remove(key, count, value); return remove; } catch (Exception e) { e.printStackTrace(); return 0 ; } } }
5、编写 controller 测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private final RedisUtil redisUtil; @ApiOperation(value = "验证redis整合") @GetMapping(value = "/verifyRedis") public ResultBody verifyRedis () { redisUtil.set("test" , "ceshi" ); String value = (String) redisUtil.get("test" ); System.out.println(value); return ResultBody.success(); }
十、Redis 事务和锁机制
Redis 事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序的执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。
Redis 事务的主要作用就是串联多个命令防止别的命令插队。
1、基本操作
1)、Multi、Exec、Discard
Multi: 开始事务,进行组队阶段
Exec: 执行阶段,按照顺序执行
Discard: 放弃执行
从输入 Multi 命令开始,输入的命令都会依次进入命令队列中,但不会执行,直到输入 Exec 后,Redis 都会将之前的命令队列中的命令依次执行。
组队的过程中可以通过 Discard 来放弃组队
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 # 开启事务 multi set key1 value1 set key2 value2 # 执行 exec keys * # 开启事务 multi set a1 v1 set a2 v2 # 放弃事务 discard keys *
2)事务的错误处理
事务可能会出现的错误有两种:
组队中某个命令出现了报告错误,执行时整个的所有队列都会被取消
示例:
执行阶段某个命令出现了错误,则只有报错的命令不会执行,而其他命令正常执行。
2、事务冲突的问题
场景: 有很多人有你的账户,同事去参加双十一抢购
3、例子
一个请求想给金额减8000
一个请求想给金额减5000
一个请求想给金额减1000
1)解决方案
1. 悲观锁
什么是悲观锁: 顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次在拿到数据的时候都会上锁,这样别人想去拿这个数据就会 block(阻塞) ,知道他用完数据,将锁释放后,被人才可以拿到数据。传统的关系型数据库中就用到了很多这种锁机制 ,比如行锁 ,表锁 等,读锁 ,写锁 等,都是在做操作之前先上锁锁 。
2.乐观锁
什么是乐观锁: 顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量。Redis 就是利用这种 check-and-set 机制实现事务的 。
3.watch key [key…] 命令
在执行 Multi 之前,先执行 watch key [keys…] 命令,可以监视一个(或多个)key ,如果在事务执行之前这个(这些)key 被其他命令所改动,那么事务将会被打断
测试例子:
4、Redis 事务三特性
单独的隔离操作:
事务中的所有命令都会序列化、按顺序的执行。事务在执行的过程中,不会被其他客户端发送的命令请求打断。
没有隔离级别的概念:
队列中的命令没有提交之前都不会实际被执行,因为事务提交前任何指令都不会被实际执行。
不保证原子性:
事务中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚。
5、秒杀案例
1)基本实现
页面:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 <!DOCTYPE html > <html lang ="en" xmlns:th ="http://www.thymeleaf.org" > <script src ="webjars/jquery/3.5.1/jquery.min.js" > </script > <head > <meta charset ="UTF-8" > <title > Title</title > </head > <body > <h1 > iPhone 13 pro!!! 1元秒杀!!!</h1 > <form id ="msform" action ="http://localhost:8079/redis/ms/doseckill" > <input type ="hidden" id ="prodid" name ="prodid" value ="0101" > 用户id:<input type ="text" id ="userId" name ="userid" > <input type ="button" id ="miaosha_btn" name ="seckill_btn" value ="秒杀点我" /> </form > </body > <script > $(function ( ) { var form = $("#msform" ); var parems = {}; $("#miaosha_btn" ).click (function ( ) { parems.userId = $("#userId" ).val (); parems.prodid = $("#prodid" ).val (); $.ajax ({ type : "post" , url : "http://localhost:8079/redis/ms/doseckill" , data : JSON .stringify (parems), dataType : "json" , contentType : "application/json;charset=utf-8" , success : function (data ) { if (data.code === "00000" ) { alert (data.message ); $("#miaosha_btn" ).attr ("disabled" , true ); } else if (data.code === "50000" ) { alert (data.message ); } } }) }) }) </script > </html >
实体类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 package com.sy.jedis_redisdemo.restful.beans;import lombok.Data;@Data public class User { public String userId; public String prodid; }
Controller:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 package com.sy.jedis_redisdemo.restful.controller;import com.sy.api.context.ResultBody;import com.sy.jedis_redisdemo.restful.beans.User;import com.sy.jedis_redisdemo.restful.service.MsService;import io.swagger.annotations.Api;import io.swagger.annotations.ApiOperation;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.*;@Api(value = "秒杀", tags = "秒杀") @RestController @RequestMapping("/ms") public class MsController { @Autowired private MsService msService; @ApiOperation(value = "秒杀") @PostMapping(value = "/doseckill") public ResultBody doseckill (@RequestBody User user) { return msService.doseckill(user); } }
Service:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 package com.sy.jedis_redisdemo.restful.service;import com.sy.api.context.ResultBody;import com.sy.jedis_redisdemo.restful.beans.User;public interface MsService { ResultBody doseckill (User user) ; }
ServiceImpl:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 package com.sy.jedis_redisdemo.restful.service.impl;import com.baomidou.mybatisplus.extension.api.R;import com.sy.api.context.ResultBody;import com.sy.jedis_redisdemo.restful.beans.User;import com.sy.jedis_redisdemo.restful.service.MsService;import com.sy.jedis_redisdemo.utils.RedisUtil;import org.apache.commons.lang3.StringUtils;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import redis.clients.jedis.Jedis;import sun.net.RegisteredDomain;@Service public class MsServiceImpl implements MsService { @Autowired private RedisUtil redisUtil; @Override public ResultBody doseckill (User user) { if (StringUtils.isEmpty(user.userId) || StringUtils.isEmpty(user.prodid)) { return ResultBody.error("用户或商品id不能为空" ); } String kcKey = "sk:" + user.prodid + ":kc" ; String userKey = "sk:" + user.prodid + ":user" ; Object kc = redisUtil.get(kcKey); if (kc == null ) { return ResultBody.error("秒杀未开始,请等待!" ); } if (redisUtil.sHasKey(userKey, user.userId)) { return ResultBody.error("已经秒杀成功,不能重复秒杀!" ); } if ((int )kc <= 0 ) { return ResultBody.error("秒杀已经结束!" ); } redisUtil.decr(kcKey, 1 ); redisUtil.sSet(userKey, user.userId); return ResultBody.success("秒杀成功!" ); } }
2)ab工具模拟并发
执行 yum install httpd-tools 安装
2-2)命令及常用参数
常用参数: ab –list 可查看所有参数
1 2 3 4 5 6 7 8 9 10 11 # 请求次数 -n requests Number of requests to perform # 并发次数 -c concurrency Number of multiple requests to make at a time # 提交参数,参数要写到文件中,针对post,使用时要设置 -T -p postfile File containing data to POST. Remember also to set -T -u putfile File containing data to PUT. Remember also to set -T # 设置 content-type,和你请求的 content-type 一样 -T content-type Content-type header to use for POST/PUT data, eg. 'application/x-www-form-urlencoded' Default is 'text/plain'
命令:
1 2 # 1000 次请求,其中并发请求有100个,参数文件是 postfiel ab -n 1000 -c 100 -p postfile -T 'application/json;charset=utf-8' http://192.168.111.2:8079/redis/ms/doseckill
2-3)测试
创建 postfile 参数文件内容(用于 ab 工具测试并发发送的参数):
修改 controller: 因为同一个 userId 不能重复秒杀,所以我们在这里随机一个。
执行命令进行测试:
1 ab -n 1000 -c 100 -p postfile -T 'application/json;charset=utf-8' http://192.168.111.2:8079/redis/ms/doseckill
查看执行结果:
执行命令的结果:
redis 中数据情况:
出现的问题:
在上面测试结果中可以看到库存数量变为了 -35 ,这里出现的就是 超卖 的问题
还可能会出现 连接超时的问题
3)超卖和连接超时问题解决
3-1)连接超时问题
这个问题在 SpringBoot 整合 Redis 中,通过 yml,redisConfig,RedisUtils 已经解决了
3-2)超卖问题
1.setnx 解决
2.Redisson 解决
添加依赖
1 2 3 4 5 6 <dependency > <groupId > org.redisson</groupId > <artifactId > redisson-spring-boot-starter</artifactId > <version > 3.13.6</version > </dependency >
修改 serviceImpl
这里不用考虑 锁误删的问题,因为Redisson内部已经拼接了,见下图:
这里的 id 是 ConnectionManager 的一个属性,继续追源码你会发现他也是一个 UUID 类型的。
十一、Redis 持久化
1、RDB
1)什么是 RDB
RDB 就是在指定的时间间隔 内将内存中的数据集快照 写入磁盘。
直白的说就是:在某一时刻记录下数据快照,在时间间隔内将数据存储到磁盘,恢复的时候也是将快照文件直接读到内存中
2)备份是如何执行的
Redis 会单独创建一个 (Fork) 一个子进程来进行持久化,会先将数据写入到一个临时文件中,等持久化过程都结束了,再用这个临时文件替换上次持久化好的文件 。整个过程中,主进程不进行任何 IO 操作,这就确保了极高的性能。如果需要大规模的数据恢复,而且对于数据恢复的完整性不是非常敏感,那 RDB 方式要比 AOF 方式更加高效。
RDB 的缺点就是:最后一次持久化后的数据可能丢失。
3)备份文件位置与备份规则
位置:默认在那个目录下启动 redis 服务,文件就生成在哪里。
规则:
4)命令 save VS bgsave
Redis 默认情况下是自动进行备份的。也可以使用下面两个命令进行手动备份。
save: save 时只管保存,其他不管,全部阻塞。不建议
bgsave: Redis 会在后台异步进行快照操作,快照同时还可以响应客户端请求。
2、AOF
1)什么是 AOF
以 日志 的形式类记录每个写操作(增量保存) ,将 Redis 执行过的所有写指令记录下来(读操作不记录 ),只许追加文件但不可以改写文件 ,Redis 启动之初会读取该文件重新构建数据。简单的来说就是根据日志文件的内容将所有指令从前到后执行一次来完成数据的恢复工作。
2)AOF 持久化流程
客户端的请求写命令会被 append 追加到 AOF 缓冲区内;
AOF 缓冲区根据 AOF 持久化策略 [always,everysec,no]**,将操作 **sync 同步到磁盘的 AOF 文件中;
AOF 文件大小超过重写策略或手动重写时,会对 AOF 文件 rewrite 重写,压缩 AOF 文件的容量;
Redis 服务重启是,会重新 load 加载 AOF 文件中的写操作达到数据恢复的目的;
3)AOF 默认不开启
可以再 redis.conf 中配置文件名称,默认为:appendonly.aof
AOF 文件的保存路径与 RDB 的路径一致
4)AOF 和 RDB 同时开启,Redis 听谁的?
AOF 和 RDB 同时开启,系统默认取 AOF 的数据(数据不会存在丢失情况)
5)AOF 文件损坏xiuf
如果遇到 AOF 文件损坏,可以通过:/bin/redis-check-for-aof–fix appendonly.aof 进行恢复
备份被写坏的 AOF 文件
恢复:重启 Redis ,然后会自动加载 AOF 文件
6)AOF 同步频率设置
appendfsync always
始终同步,每次 Redis 的写操作都会like记入日志;性能比较差,但是数据的完整性比较好。
appendfsync everysec
每秒同步,每秒记入日志一次,如果宕机,本秒的数据可能会丢失
appendfsync no
Redis 不主动进行同步,把同步时机交给操作系统。
7)Rewrite 压缩
当 AOF 文件 >= 设置定文件大小后,会对 AOF 文件进行压缩重写
例:
1 2 3 4 5 6 # 我们实际执行的指令 set k1 v1 set k2 v2 压缩后的 set k1 v1 k2 v2
其实 AOF 的压缩重写,就是把一些命令重写为一条命令,不去关注过程,只关注最后的结果。
十二、Redis 主从复制
1、什么是主从复制
主机数据更新后根据配置和策略,自动同步到备机的 master/slaver 机制 , Master 以写为主,Slaver 以读为主 。
2、能干嘛
3、主从配置
这里是在一台虚拟机中,通过不同端口来模拟多台服务器
新建一个目录,名字自己定义就好了。
将我们原本的 redis.conf 复制到新创建的目录中。
创建三个配置文件,分别以端口进行区别,内容配置也要进行相应的修改
1 2 3 4 5 6 # 引入基本的配置文件 include /usr/local/myredis/redis.conf # 重新配置的内容,可以覆盖基本配置中配置的 pidfile /var/run/redis_6379.pid port 6379 dbfilename dump6379.rdb
可以看到这里我们已经起来了这三个 Redis
连接三台 Redis 并查看信息,可以看到三台 Redis 都是 master(主)
配置主从关系:配从(库)不配主(库)
1 2 3 4 5 # 在从机中使用命令配置,ip:主机ip,port:主机端口 slaveof ip port # 取消从服务器配置,会重新变成主服务器 slaveof no one
查看信息已经可以看到,角色(role)已经为从机(slave)
测试
在主机中新增一个 key
在从机中查看
注意:写操作只能在主机中进行,从机中只能进行读操作
4、主从复制的原理
当从服务器(slave )连接上主服务器(master )之后,从服务器(slave )会向主服务器(master )发送进行数据同步的消息
master 接到 slave 发送过来的同步消息后,把 master 的数据会持久化到 RDB 文件,把 RDB 文件发送给 slave ,slave 拿到 RDB 文件进行读取
master 每次进行写操作后,会和 slave 进行同步数据。
注意:
上面第 2 步是 slave 主动同步数据(全量复制),第 3 步是 master 主动同步数据(增量复制)
全量复制: 从服务器(slave )主动发送消息同步数据
增量复制: 主服务器(master )将写操作命令传给从服务器(slave )进行数据同步
5、一主二仆
特点:
当某一台从服务器挂掉之后再重启,他重新变成一个主机,并不会恢复成原来的从服务器
当某一台冲服务器挂掉之后,主服务数据发生了变动,从服务器重启之后,重新设置成主服务的从服务器,那么他会将主服务器的数据复制过来
当主服务挂掉之后,从服务器还是从服务器,并且可以看到主服务器已经挂掉了(down),并不会上位变成主服务器。当主服务器重新启动之后,依旧是主服务器,两台从服务器还是他小弟,并且可以再信息中看到大哥的状态是启动的(up)
测试:
shutdown 掉 6381,查看主服务器(6379) 的信息,我们可以看到主服务器信息中只有一台从服务器了
我们在 主服务器(6379)中随意添加几条数据。
重新启动 6381,并查看信息,我们可以看到他重新变成了主服务器
我们将 6381 重新设置成从服务器,并分别查看他和主服务器的信息,可以看到他已经变成了从服务器,而且主服务器的信息中显示有两台从服务器。
查看从服务器中的数据,可以看到和主服务器是一样的
6、薪火相传
薪火相传 我们可以理解为 组织架构树 或者我们实际工作中项目经理、组长、组员 这种结构。
就是说:
6379 为主服务器
6380 是 6379 的从服务器
6381 是 6380 的服务器
这样,6379 数据会同步给 6380 ,6380 再同步给 6381。
缺点:
当 6380 挂掉了,那么 6379 的数据也无法同步给 6381了。
这里可以自己进行测试一下,我这里就不实际进行操作了。
7、反客为主
其实这个就是我们上面提到的 slaveof no one 命令,可已将从机变为主机。
缺点: 需要手动完成。
8、哨兵模式
1)什么是哨兵模式
反客为主的自动版 ,能够后台监控主机是否故障,如果故障了,根据投票数自动将从库转换为主库。
其实就像是 zookeeper 集群的选举。农奴翻身把歌唱,噢耶!
2)如何使用
设置为一主二仆/多仆模式
创建 sentinel.conf 文件,目录自定义,我们这里就放在和 redis6379.conf 同级的目录下了
配置哨兵,编写 sentinel.conf 配置文件内容
1 2 3 4 5 6 # sentinel 哨兵 # monitor 监控 # mymaster 为监控对象起的服务器名称,其实就是为 master 主机起个别名 # 192.168.111.144 6379 主机 ip 和端口 # 1 至少有多少个哨兵同意迁移的数量,通俗的说就是:需要有多少个从机同意才能切换为主机,这里个人建议设置为从机数量的一半以上。类似于 zookeeper 的选举 sentinel monitor mymaster 192.168.111.144 6379 1
启动哨兵,使用 redis-sentinel 启动
注意,哨兵服务的默认端口为:26379
测试,关掉 6379,查看哨兵窗口变化
这里通过控制台信息我们可以看到 6381 被选为了主服务器,6380 变成了 6381 的从机。
还有一个信息就是,6379 还在。那么当 6379 重新启动后会变成什么样呢?这里我们将 6379 重启,并查看信息。
通过上面的信息我们可以看到,6379 变成了 6381 的从机。
复制延时
哨兵模式的缺点,由于所有的写操作都是在 Master 上进行的,然后同步更新到 Slave 上,所以从 Master 同步到 Slave 会有一定的延迟,当系统很繁忙的时候,延迟问题会更加严重,Slave 机器数量的增加也会使这个问题更加严重。
3)故障恢复
4)springboot 主从复制
只需要在 yml 配置文件中进行相应配置即可:
服务启动出现了报错:
1 2 SENTINEL SENTINELS command returns less than 2 nodes! At least two sentinels should be defined in Redis configuration. Set checkSentinelsList = false to avoid this check.
经过一番搜索并没有找到什么有用的,所以我又增加了两个哨兵@~@
修改原有的 sentinel.conf 配置文件,并重命名为 sentinel6379.conf
1 2 3 sentinel monitor mymaster 192.168.111.144 6379 1 protected-mode no port 26379
新增两个配置文件,分别是:sentinel6380.conf ,sentinel6381.conf ,只需要修改一下 port ,分别是:26380,26381
启动服务/测试类测试:
执行后可以到 Redis 中看一下数据是不是已经存进去了。
参考:springboot+redis的sentinel实现哨兵模式(超详细)
十三、Redis 集群
1、问题
容量不够,redis 如何进行扩容?
并发写操作,redis 如何分摊?
另外,主从模式,薪火相传模式,主机宕机,导致 ip 地址发生变化,应用程序中配置需要修改对应的主机地址、端口等信息。
之前通过代理主机来解决,但是 redis3.0 中提供了解决方案。就是 无中心化集群 配置。
2、什么是集群?
Redis 集群实现了对 Redis 的水平扩容,即启动 N 个 Redis 节点,将这个数据库分布存储在这 N 个节点中,每个节点存储总数据的 1/N 。
Redis 集群通过分区(partition) 来提供一定程度的可用性(availability) :即使集群中有一部分节点失效或者无法进行通讯,集群也可以继续处理命令请求。
3、Redis 集群的搭建
这里我们还是在同一台虚拟机中以不同的端口的方式来模拟搭建,依旧是 主从复制 中用到的目录就可以。
创建六个实例分别是:6379、6380、6381、6389、6390、6391 (三主三从)。
删除掉之前测试产生的 dump、aof 文件。
我们在 redis_6379.conf 的基础上配置基本信息
在配置文件中增加集群相关配置
1 2 3 4 5 6 7 8 9 10 11 12 13 include /usr/local/myredis/redis.conf pidfile "/var/run/redis_6379.pid" port 6379 dbfilename "dump6379.rdb" logfile "/usr/local/myredis/redis_err_6379.log" daemonize yes dir "/usr/local/myredis" # 开启集群 cluster-enabled yes # 指定集群节点配置文件 cluster-config-file nodes_6379.conf # 设定节点失联时间(毫秒),超时后集群自动进入主从切换 cluster-node-timeout 15000
删除6380 6381 两个配置文件,并将我们刚才修改好的配置文件在复制出5份。
修改复制出来的那几份文件,修改相应的端口
1 2 3 # 可以使用替换进行操作 vim redis6380.conf :%s/6379/6380
启动 6 个 Redis 服务,并查看节点配置文件是否正确生成了
将 6 个节点合成一个集群
连接
1 2 3 # -c 采用集群策略连接,设置数据会自动切换到相应的写主机 # 连接任何一个都可以,例如6379/6380/6381,会自动切换到写主机 redis-cli -c -p 6379
查看集群节点信息:cluster nodes
我们可以在信息中看到哪一台是主机,哪一台是从机,和对应的主从。
4、Redis cluster 如何分配这六个节点?
一个集群至少要有三个主节点
选项 –cluster-replicas 1 表示我们希望为集群中的每个主节点创建一个从节点。
分配原则尽量保证每个主数据库运行在不同的 IP 地址,每个从库和主库不在一个 IP 地址上。
5、什么是 slots(插槽)?
通俗的来说就像是 洞 ,每个洞都可以放入东西。
一个 Redis 集群包含 16384 个插槽(hash slot),数据库中的每个键都属于这 16384 个插槽中的其中一个。
每个主节点都有他负责的一定范围的插槽,可以再节点信息中看到。
集群会使用 CRC16(key)%16384 来计算 key 属于哪个插槽,其中的 CRC16(key) 语句用于计算键 key 的 CRC16 校验和。
6、集群数据的新增与查询
新增(单个key/value、多个key/value)
查询集群中的值
1 2 3 4 5 6 7 8 9 10 # 获取 key 在集群中的插槽位置:cluster keyslot key cluster keyslot k1 result: 12706 # 获取插槽位置中有几个值,只能插槽所在的主机中用,否则返回 0 cluster countkeysinslot 12706 # 获取指定插槽中count个 key,cluster get keysinslot slot count # 获取 12706 这个插槽中的 key,获取 10 个 cluster getkeysinslot 12706 10
7、故障恢复
如果主节点下线,从节点能否自动提升为主节点?注意:15 秒超时
从上图可以看到,6379 下线之后,6380 在 15 秒之后提升为主节点了。
下线的主节点恢复之后,主从关系如何?
从上图可以看到,下线的主节点恢复后变成了从机。
如果某一段插槽的主从都挂掉了(集群中的某一组主从挂掉),那么这个集群是否还能继续使用?
根据 redis.conf 中 cluster-require-full-coverage 配置而变化的:
如果 cluster-require-full-coverage 为 yes ,那么,整个集群都挂掉。
如果 cluster-require-full-coverage 为 no ,那么就只有该主从不能用,也无法存储。
8、springboot 集群的配置
修改 yml 配置文件
测试类测试
这样简单的配置就好了,我这里没有报错一切顺利,因为我也没用过,所以这些应该是很基础的配置。
9、Redis 集群的好处与不足
好处:
不足
多键操作是不被支持的
多键的 Redis 事务是不被支持的。lua 脚本不被支持。
由于集群方案出现的较晚,很多公司已经采用了其他的集群方案,而代理或者客户端分片的方案想要 迁移 至 redis cluster ,需要整体迁移,而不是逐步过渡,复杂难度大
十四、Redis 应用问题解决
1、缓存穿透
什么是 缓存穿透
缓存穿透: 就是大量的请求 访问缓存,获取不存在的数据 ,这个时候就会去请求数据库(数据库中也不存在 ),导致数据库瘫痪或错误、宕掉等。
出现场景
一般出现在一下两种场景中:
redis 中查询不到数据
出现很多非正常 url 访问
解决方案
对空值缓存
如果查询返回的数据为空(不管数据是不是存在),我们仍然把这个空结果(null)进行缓存,设置空结果的过期时间会很短,最长不会超过五分钟。
设置可以访问的名单(白名单)
使用 bitmaps 类型定义一个可以访问的名单,名单 id 作为 bitmaps 的偏移量,每次访问和 bitmaps 里面的 id 进行比较,如果访问的 id 不在 bitmaps 中,进行拦截,不允许访问。
采用布隆过滤器
这个底层其实就是一个 bitmaps ,只不过做了相应的封装等。
其实就是检索一个元素是否在一个集合中。
优点:空间效率和查询时间都远超一般的算法
缺点:有一定的误识别率和删除困难
进行实时监控
当发现 Redis 的命中率开始急速降低,需要排查访问对象和访问的数据,和运维人员配合,可以设置黑名单限制服务。
2、缓存击穿
什么是 缓存击穿
缓存击穿: key 对应的数据在数据库中存在,但是在 redis 中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端 DB 加载数据并回设到缓存,这个时候并发的请求可能会瞬间把后端 DB 压垮
注意:这里指的是某个 key 过期了,大量访问使用这个 key 。并不是出现大量的 key 过期
解决方案
3、缓存雪崩
什么是 缓存雪崩
缓存雪崩: 在极少时间段内,查询大量 key 的集中过期情况。
解决方案
构建多级缓存架构
nginx 缓存 + Redis 缓存 + 其他缓存(ehcache 等)
使用锁或队列
用加锁或者队列的方式来保证不会有大量的线程对数据库一次性进行读写,从而避免失效时大量的并发请求落到底层存储系统上。不适合高并发情况。
设置过期标志更新缓存
记录缓存数据是否过期(设置提前量),如果过期会触发通知另外的线程在后台去更新实际 key 的缓存
将缓存失效时间分散开
比如我们可以再原有的失效时间基础上增加一个随机值,比如 1-5 分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。
4、分布式锁
1)什么是 分布式锁
个人理解 分布式锁 就是应用在集群、分布式服务中。因为原单体单机部署的系统演化成分布式集群系统,由于分布式系统多线程、多进程且分布在不同的机器上,这样原来的单机部署情况下的并发锁策略就失效了,我们就需要一种跨 JVM 的互斥机制(锁)来控制共享资源的访问
2)实现方式
基于数据库实现
基于缓存(Redis 等)
性能好
基于 zookeeper
可靠性高
3)使用 redis实现分布式锁
其实就是 setnx key value 来设置一个 key 作为锁,释放锁就是删除掉这个 key 。
问题一:假如现在出现了别的情况,例如:加锁之后程序报错或由于别的原因没有释放该怎么办呢?
解决方式就是设置一个过期时间
1 2 set key_lock 1 exprie key_lock 10
问题二:如果在设置过期时间的时候服务器突然死掉了或者由于一些其他原因设置失败了怎么办?
解决方式就是加锁的同时设置过期时间
1 2 # 这个命令在上面说过,他的意思就是:新增一个 key 并设置他的时间,只有在这个 key 不存在的时候才能设置成功 set kye_lock 1 EX 10 NX
问题三:误删问题
这个可以通过添加 UUID 等方式解决,在上面的 秒发案例的超卖问题中 中已经解决过了。
案例
案例一、
场景:
一亿个用户,用户有频繁登录的,也有不经常登录的
如何记录用户的登录信息
如何查询活跃用户,[如: 连续登录的]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 # 以位图的方式来解决 # 用户的登录状态以 0/1来表示 # 初始值 一亿个用户都没有登陆过,设置第3,5,7位用户在周一登录过 setbit mon 100000000 0 setbit mon 3 1 setbit mon 5 1 setbit mon 7 1 # 设置第3,4,8位用户在周二登录过 setbit tue 100000000 0 setbit tue 3 1 setbit tue 4 1 setbit tue 8 1 # 设置第3,6,9位用户在周三登录过 setbit wed 100000000 0 setbit wed 3 1 setbit wed 6 1 setbit wed 9 1 # 按位与运算 # res 为比较后返回的结果集 # 例子:0100 # 0101 # 0110 # and 返回结果集是0100,位图的同一个位置都一样,则返回的是1,否则返回0 # or 返回结果集是0111,位图的同一个位置有一个是1,则返回1 # bitop and res mon tue wed