一、Redis 入门
Redis为什么单线程还这么快?
误区1:高性能的服务器一定是多线程的?
误区2:多线程(CPU上下文会切换!)一定比单线程效率高!
核心:Redis是将所有的数据放在内存中的,所以说使用单线程去操作效率就是最高的,多线程(CPU上下文会切换:耗时的操作!),对于内存系统来说,如果没有上下文切换效率就是最高的,多次读写都是在一个CPU上的,在内存存储数据情况下,单线程就是最佳的方案。
(1)完全基于内存,数据存在内存中,绝大部分请求是纯粹的内存操作,非常快速,跟传统的磁盘文件数据存储相比,避免了通过磁盘IO读取到内存这部分的开销。
(2)数据结构简单,对数据操作也简单。Redis中的数据结构是专门进行设计的,每种数据结构都有一种或多种数据结构来支持。Redis正是依赖这些灵活的数据结构,来提升读取和写入的性能。
(3)采用单线程,省去了很多上下文切换的时间以及CPU消耗,不存在竞争条件,不用去考虑各种锁的问题,不存在加锁释放锁操作,也不会出现死锁而导致的性能消耗。
(4)使用基于IO多路复用机制的线程模型,可以处理并发的链接。
多个 Socket 可能会产生不同的操作,每个操作对应不同的文件事件,但是IO多路复用程序会监听多个Socket,将Socket产生的事件放入队列中排队,事件分派器每次从队列中取出一个事件,把该事件交给对应的事件处理器进行处理。
Redis客户端对服务端的每次调用都经历了发送命令,执行命令,返回结果三个过程。其中执行命令阶段,由于Redis是单线程来处理命令的,所有每一条到达服务端的命令不会立刻执行,所有的命令都会进入一个队列中,然后逐个被执行。并且多个客户端发送的命令的执行顺序是不确定的。但是可以确定的是不会有两条命令被同时执行,不会产生并发问题,这就是Redis的单线程基本模型。
-
多路I/O复用模型是利用 select、poll、epoll 可以同时监察多个流的 I/O 事件的能力,在空闲的时候,会把当前线程阻塞掉,当有一个或多个流有 I/O 事件时,就从阻塞态中唤醒,然后程序就会轮询一遍所有的流(epoll 是只轮询那些真正发出了事件的流),并且依次顺序的处理就绪的流,这种做法就避免了大量的无用操作。
-
这里“多路”指的是多个网络连接,“复用”指的是复用同一个线程。采用多路 I/O 复用技术可以让单个线程高效的处理多个客户端的网络IO连接请求(尽量减少网络 IO 的时间消耗)
为什么Redis是单线程?
这里我们强调的单线程,指的是网络请求模块使用一个线程来处理,即一个线程处理所有网络请求,其他模块仍用了多个线程。
那为什么使用单线程呢?官方答案是:因为CPU不是Redis的瓶颈,Redis的瓶颈最有可能是机器内存或者网络带宽。既然单线程容易实现,而且CPU不会成为瓶颈,那就顺理成章地采用单线程的方案了。
但是,我们使用单线程的方式是无法发挥多核CPU 性能,不过我们可以通过在单机开多个Redis 实例来解决这个问题
二、五种数据类型 + 三种特殊数据类型
Redis是一个开源(BSD许可),内存存储的数据结构服务器,可用作数据库,高速缓存和消息队列代理。它支持字符串、哈希表、列表、集合、有序集合,位图,hyperloglogs等数据类型。内置复制、Lua脚本、LRU收回、事务以及不同级别磁盘持久化功能,同时通过Redis Sentinel提供高可用,通过Redis Cluster提供自动分区。
Redis-key
在redis中无论什么数据类型,在数据库中都是以key-value形式保存,通过进行对Redis-key的操作,来完成对数据库中数据的操作。
下面学习的命令:
keys *
: 查看当前数据库所有key
set key value
: 设置插入的键值对
exists key
:判断键是否存在
del key
:删除键值对
move key db
:将键值对移动到指定数据库
expire key second
:设置键值对的过期时间
type key
:查看value的数据类型
ttl key
: key的过期时间
-
当前key没有设置过期时间,所以会返回-1.
-
当前key有设置过期时间,而且key已经过期,所以会返回-2.
-
当前key有设置过期时间,且key还没有过期,故会返回key的正常剩余时间.
RENAME key newkey
修改 key 的名称
RENAMENX key newkey
仅当 newkey 不存在时,将 key 改名为 newkey
更多命令:redis命令手册
String(字符串)
127.0.0.1:6379> set key1 vl #设置值
OK
127.0.0.1:6379> get key1 #获取值
"vl"
127.0.0.1:6379> keys * #获得所有的key
1) "key1"
2) "name"
3) "age"
127.0.0.1:6379> exists key1 #判断某一个key是否存在
(integer) 1
127.0.0.1:6379> APPEND key1 "hello" #追加字符串,如果当前key不存在就相当于setkey
(integer) 7
127.0.0.1:6379> get key1
"vlhello"
127.0.0.1:6379> strlen key1 #获取字符串的长度
(integer) 7
127.0.0.1:6379> append key1 ",zhangsan"
(integer) 16
127.0.0.1:6379> get key1
"vlhello,zhangsan"
127.0.0.1:6379> strlen key1
(integer) 16
127.0.0.1:6379>
##########################################################
#步长 i+=
127.0.0.1:6379> set views 0 #初始浏览量为0
OK
127.0.0.1:6379> get views
"0"
127.0.0.1:6379> incr views #使值加1(自增1)
(integer) 1
127.0.0.1:6379> incr views
(integer) 2
127.0.0.1:6379> decr views #使值减一(自减1)
(integer) 1
127.0.0.1:6379> decr views
(integer) 0
127.0.0.1:6379> decr views
(integer) -1
127.0.0.1:6379> get views
"-1"
127.0.0.1:6379> incrby views 10 #设置步长,指定增量
(integer) 9
127.0.0.1:6379> incrby views 10
(integer) 19
127.0.0.1:6379> decrby views 5 #设置步长,减少增量
(integer) 14
#############################################################
#字符串范围 : range
127.0.0.1:6379> get key1
"vlhello,zhangsan"
127.0.0.1:6379> getrange key1 2 5 #截取字符串 [2,5]
"hell"
127.0.0.1:6379> getrange key1 2 -1 #从第二个字符开始,获取全部字符串
"hello,zhangsan"
127.0.0.1:6379> getrange key1 0 -1 #获取全部字符串 和 get key是一样的
"vlhello,zhangsan"#替换
127.0.0.1:6379> set key2 asfzxc
OK
127.0.0.1:6379> get key2
"asfzxc"
127.0.0.1:6379> setrange key2 2 www #替换指定位置开始的字符串
(integer) 6
127.0.0.1:6379> get key2
"aswwwc"
#############################################################
# setex (set with expire) #设置过期时间
# setnx (set if not exist) #不存在在设置 (在分布式锁中会常常使用)
127.0.0.1:6379> get key1
"vlhello,zhangsan"
127.0.0.1:6379> setex key1 10 "zhangsan" #设置key1的值为 zhangsan,10秒后过期
OK
127.0.0.1:6379> get key1
"zhangsan"
127.0.0.1:6379> ttl key1
(integer) -2
127.0.0.1:6379> get key1
(nil)
127.0.0.1:6379> setnx key3 "redis" #如果key3不存在,创建key3
(integer) 1
127.0.0.1:6379> setnx key3 "monkey" #如果key3存在,创建失败
(integer) 0
127.0.0.1:6379> get key3
"redis"
#############################################################
mset
mget
127.0.0.1:6379> mset k1 v1 k2 v2 k3 v3 #同时设置多个值
OK
127.0.0.1:6379> keys *
1) "k3"
2) "k2"
3) "k1"
127.0.0.1:6379> mget k1 k2 k3 #同时获取多个值
1) "v1"
2) "v2"
3) "v3"
127.0.0.1:6379> msetnx k1 v1 k5 v5 #msetnx 是一个原子性操作,要么都成功要么都失败
(integer) 0
127.0.0.1:6379> get k5
(nil)#对象
set user:1 {name:zhangsan,age:30} #设置一个user:1对象值为json字符来保存一个对象。
127.0.0.1:6379> set user:1 {name:zhangsan,age:30}
OK
127.0.0.1:6379> get user:1
"{name:zhangsan,age:30}"#这里的key是一个巧妙的设计: user:{id}:{filed},如此设计在redis中是可以的。
127.0.0.1:6379> mset user:1:name lisi user:1:age 2
OK
127.0.0.1:6379> mget user:1:name user:1:age
1) "lisi"
2) "2"
127.0.0.1:6379> get user:1
(nil)
#############################################################
getset #先get然后再set127.0.0.1:6379> get db
(nil)
127.0.0.1:6379> getset db redis
(nil)
127.0.0.1:6379> get db
"redis"
127.0.0.1:6379> getset db monkey #如果存在值,获取原来的值,并设置新的值
"redis"
127.0.0.1:6379> get db
"monkey"
String类似的使用场景:value除了是字符串还可以是数字,用途举例:
- 计数器
- 统计多单位的数量:uid:123666:follow 0
- 粉丝数
- 对象存储缓存
List(列表)
Redis列表是简单的字符串列表,按照插入顺序排序。你可以添加一个元素到列表的头部(左边)或者尾部(右边)
一个列表最多可以包含 2 32 − 1 2^{32} - 1 232−1 个元素 (4294967295, 每个列表超过40亿个元素)。
首先我们列表,可以经过规则定义将其变为队列、栈、双端队列等
正如图Redis中List是可以进行双端操作的,所以命令也就分为了LXXX和RXXX两类,有时候L也表示List例如LLEN
####################################################
127.0.0.1:6379> lpush list one #将一个值或者多个值,插入到列表头部 (左边插入)
(integer) 1
127.0.0.1:6379> lpush list two
(integer) 2
127.0.0.1:6379> lpush list three
(integer) 3
127.0.0.1:6379> lrange list 0 -1 #获取list中的值
1) "three"
2) "two"
3) "one"
127.0.0.1:6379> lrange list 0 1 #通过区间获取具体的值
1) "three"
2) "two"
127.0.0.1:6379> rpush list right #将一个值或者多个值,插入到列表尾部 (右边插入)
(integer) 4
127.0.0.1:6379> lrange list 0 -1
1) "three"
2) "two"
3) "one"
4) "right"
####################################################
LPOP
RPOP
127.0.0.1:6379> lpop list #移除list的第一个元素
"three"
127.0.0.1:6379> lpop list 2 #移除list的前两个元素
1) "two"
2) "one"
127.0.0.1:6379> lrange list 0 -1 #获取list中的值
1) "right"
127.0.0.1:6379> lrange list 0 -1
1) "right"
2) "li1"
3) "li"
127.0.0.1:6379> rpop list 1 #移除list的最后一个元素
1) "li"
127.0.0.1:6379> lrange list 0 -1
1) "right"
2) "li1"
####################################################
Lindex
127.0.0.1:6379> lrange list 0 -1
1) "right"
2) "li1"
127.0.0.1:6379> lindex list 2 #通过下标获得list中的某一个值
(nil)
127.0.0.1:6379> lindex list 0
"right"
####################################################
Llen
127.0.0.1:6379> llen list #返回列表的长度
(integer) 2
####################################################
移除指定的值 Lrem
取关用到: uid
127.0.0.1:6379> lrange list 0 -1
1) "one"
2) "one"
3) "right"
4) "li1"
127.0.0.1:6379> lrem list 1 one #移除list集合中指定个数的value,精确匹配
(integer) 1
127.0.0.1:6379> lrange list 0 -1
1) "one"
2) "right"
3) "li1"
127.0.0.1:6379> lpush list one
(integer) 4
127.0.0.1:6379> lpush list one
(integer) 5
127.0.0.1:6379> lrange list 0 -1
1) "one"
2) "one"
3) "one"
4) "right"
5) "li1"
127.0.0.1:6379> lrem list 3 one #移除三个指定的值
(integer) 3
127.0.0.1:6379> lrange list 0 -1
1) "right"
2) "li1"
####################################################
trim 修剪 。 list截断
127.0.0.1:6379> rpush mulist "hello1"
(integer) 1
127.0.0.1:6379> rpush mulist "hello2"
(integer) 2
127.0.0.1:6379> rpush mulist "hello3"
(integer) 3
127.0.0.1:6379> rpush mulist "hello4"
(integer) 4
127.0.0.1:6379> lrange mulist 0 -1
1) "hello1"
2) "hello2"
3) "hello3"
4) "hello4"
127.0.0.1:6379> ltrim mulist 1 2 #通过下标截取指定的长度,这个list已经被改变了,截断了只剩下截取的元素。
OK
127.0.0.1:6379> lrange mulist 0 -1
1) "hello2"
2) "hello3"
####################################################
rpoplpush #移除列表的最后一个元素,将它移动到新的列表当中
127.0.0.1:6379> lrange mulist 0 -1
1) "hello2"
2) "hello3"
3) "hello4"
4) "hello5"
127.0.0.1:6379> rpoplpush mulist myotherlist #移除列表的最后一个元素,将它移动到新的列表当中
"hello5"
127.0.0.1:6379> lrange mulist 0 -1 #查看原来的列表
1) "hello2"
2) "hello3"
3) "hello4"
127.0.0.1:6379> lrange myotherlist 0 -1 #查看目标列表中,确实存在该值
1) "hello5"
####################################################
lset 将列表中指定下标的值替换为另一个值,相当于更新操作
127.0.0.1:6379> exists lit #判断这个列表是否存在
(integer) 0
127.0.0.1:6379> lset list 0 item #如果不存在列表 我们去更新就会报错
(error) ERR no such key
127.0.0.1:6379> lpush list value1
(integer) 1
127.0.0.1:6379> lrange list 0 0
1) "value1"
127.0.0.1:6379> lset list 0 item #如果存在,更新当前下标的值
OK
127.0.0.1:6379> lrange list 0 0
1) "item"
127.0.0.1:6379> lset list 1 other #如果不存在 则会报错
(error) ERR index out of range
####################################################
linsert #将某个具体的value值插入到列表中某个元素的前面或者后面127.0.0.1:6379> lrange mulist 0 -1
1) "hello2"
2) "hello3"
3) "hello4"
127.0.0.1:6379> linsert mulist before hello3 zhangsan #将zhangsan值插入到mulist列表中hello3的前面
(integer) 4
127.0.0.1:6379> lrange mulist 0 -1
1) "hello2"
2) "zhangsan"
3) "hello3"
4) "hello4"
127.0.0.1:6379> linsert mulist after hello3 zhangsha #将zhangsha值插入到mulist列表中hello3的后面
(integer) 5
127.0.0.1:6379> lrange mulist 0 -1
1) "hello2"
2) "zhangsan"
3) "hello3"
4) "zhangsha"
5) "hello4"###########################################
blpop/brpop: blocked pop
127.0.0.1:6379> lrange list 0 -1
1) "1"
127.0.0.1:6379> blpop list 15
1) "list"
2) "1"
127.0.0.1:6379> blpop list 15
1) "list"
2) "11"
(3.11s)
127.0.0.1:6379> lrange list 0 -1
(empty list or set)
127.0.0.1:6379> blpop list 5
(nil)
(5.05s)
############################################
# 删除指定方向指定数量的元素--lrem key count value: count > 0; count < 0; count = 0.
127.0.0.1:6379> lrange list 0 -1
1) "one"
2) "one"
3) "two"
4) "one"
5) "one"
6) "one"
7) "three"
8) "one"
9) "one"
127.0.0.1:6379> lrem list 1 one
(integer) 1
127.0.0.1:6379> lrange list 0 -1
1) "one"
2) "two"
3) "one"
4) "one"
5) "one"
6) "three"
7) "one"
8) "one"
127.0.0.1:6379> lrem list -1 one
(integer) 1
127.0.0.1:6379> lrange list 0 -1
1) "one"
2) "two"
3) "one"
4) "one"
5) "one"
6) "three"
7) "one"
127.0.0.1:6379> lrem list 0 one
(integer) 5
127.0.0.1:6379> lrange list 0 -1
1) "two"
2) "three"
127.0.0.1:6379> lrem list 10 two
(integer) 1
127.0.0.1:6379> lrange list 0 -1
1) "three"
######################################################
# lindex: 根据 index 获取元素
127.0.0.1:6379> lindex list 2
"zhangsan"
######################################################
# lpushx/rpushx: push if list eXists
127.0.0.1:6379> lrange list 0 -1
1) "one"
2) "zhangsan"
3) "11"
4) "lisi"
5) "three"
6) "four"
127.0.0.1:6379> lpushx list1 val
(integer) 0
127.0.0.1:6379> lpushx list val
(integer) 7
127.0.0.1:6379> rpushx list val
(integer) 8
127.0.0.1:6379> lrange list 0 -1
1) "val"
2) "one"
3) "zhangsan"
4) "11"
5) "lisi"
6) "three"
7) "four"
8) "val"
小结
- 它实际上是一个链表,before node after ,left , right 都可以插入值
- 如果key不存在,创建新的链表
- 如果key存在,新增内容
- 如果移除了所有值,空链表,也代表不存在。
- 在两边插入或者改动值,效率最高,中间元素相对来说效率会低一点
可以做消息排队 消息队列( Lpush Rpop) ,栈(Lpush Lpop)
Set(集合)
Redis的Set是string类型的无序集合。集合成员是唯一的,这就意味着集合中不能出现重复的数据。
Redis 中 集合是通过哈希表实现的,所以添加,删除,查找的复杂度都是O(1)。
集合中最大的成员数为 2 32 − 1 2^{32} - 1 232−1 (4294967295, 每个集合可存储40多亿个成员)。
####################################################
127.0.0.1:6379> sadd myset hello #set集合中添加元素
(integer) 1
127.0.0.1:6379> sadd myset zhangsan
(integer) 1
127.0.0.1:6379> sadd myset lisi
(integer) 1
127.0.0.1:6379> smembers myset #查看指定set的所有值
1) "hello"
2) "lisi"
3) "zhangsan"
127.0.0.1:6379> sismember myset lisi #判断某一个值是不是在set集合中
(integer) 1
127.0.0.1:6379> sismember myset world
(integer) 0####################################################
scard
127.0.0.1:6379> scard myset #获取set集合中内容的元素个数!
(integer) 3
127.0.0.1:6379> sadd myset hello
(integer) 0
####################################################
rem
127.0.0.1:6379> srem myset hello #移除set集合中的指定元素
(integer) 1
127.0.0.1:6379> SMEMBERS myset
1) "lisi"
2) "zhangsan"
127.0.0.1:6379> scard myset
(integer) 2####################################################
set 无序不重复集合。比如:随机抽奖
127.0.0.1:6379> srandmember myset #随机抽选出一个元素
"lisi"
127.0.0.1:6379> srandmember myset
"zhangsan"
127.0.0.1:6379> srandmember myset
"zhangsan"
127.0.0.1:6379> srandmember myset
"lisi"
127.0.0.1:6379> SMEMBERS myset
1) "lisi"
2) "zhangsan"
127.0.0.1:6379> srandmember myset 2 #随机抽选出两个元素
1) "lisi"
2) "zhangsan"####################################################
删除定的key,删除随机的key127.0.0.1:6379> SMEMBERS myset
1) "111"
2) "blue"
3) "lisi"
4) "zhangsan"
127.0.0.1:6379> spop myset #随机删除一些set集合中的元素
"lisi"
127.0.0.1:6379> spop myset
"111"
127.0.0.1:6379> SMEMBERS myset
1) "blue"
2) "zhangsan"####################################################
smove 将一个指定的值,移动到另外一个set集合
127.0.0.1:6379> sadd myset world zhangsan
(integer) 2
127.0.0.1:6379> SMEMBERS myset
1) "hello"
2) "world"
3) "hello2"
4) "hello3"
5) "zhangsan"
127.0.0.1:6379> smove myset myset2 zhangsan #将myset中的zhangsan移动到myset2集合中去
(integer) 1
127.0.0.1:6379> SMEMBERS myset
1) "world"
2) "hello2"
3) "hello3"
4) "hello"
127.0.0.1:6379> SMEMBERS myset2
1) "zhangsan"####################################################
微博,B站中的共同关注!(并集)
- 差集 sdiff
- 交集 sinter
- 并集 sunion
127.0.0.1:6379> sadd key1 a b c d
(integer) 4
127.0.0.1:6379> sadd key2 c d e f
(integer) 4
127.0.0.1:6379> SDIFF key1 key2 #差集
1) "b"
2) "a"
127.0.0.1:6379> SINTER key1 key2 #交集 共同好友就可以这样实现
1) "d"
2) "c"
127.0.0.1:6379> SUNION key1 key2 #并集
1) "b"
2) "c"
3) "e"
4) "a"
5) "d"
6) "f"
127.0.0.1:6379> sadd set1 a b c d
(integer) 4
127.0.0.1:6379> sadd set2 c d e f
(integer) 4
127.0.0.1:6379> sdiffstore set3 set1 set2
(integer) 2
127.0.0.1:6379> smembers set3
1) "a"
2) "b"
Hash(哈希)
Redis hash 是一个string类型的field和value的映射表,hash特别适合用于存储对象。
Set就是一种简化的Hash,只变动key,而value使用默认值填充。可以将一个Hash表作为一个对象进行存储,表中存放对象的信息。
Map集合 , key-map,时候这个值是一个map集合。本质和string类型没有太大区别,还是一个简单的key-value!
####################################################
hset myhash field zhangsan
127.0.0.1:6379> hset myhash field1 zhangsan #set一个具体的 key-value
(integer) 1
127.0.0.1:6379> hget myhash field1 #获取一个字段值
"zhangsan"
127.0.0.1:6379> hmset myhash field1 hello field2 world #设置多个key-value
OK
127.0.0.1:6379> hmget myhash field1 field2 #获取多个字段值
1) "hello"
2) "world"
127.0.0.1:6379> hgetall myhash #获取全部的数据
1) "field1"
2) "hello"
3) "field2"
4) "world"
127.0.0.1:6379> hdel myhash field1 #删除hash指定key字段! 对应的value值也就消失了
(integer) 1
127.0.0.1:6379> HGETALL myhash
1) "field2"
2) "world"
####################################################
hlen #获取hash表的字段数量
127.0.0.1:6379> hmset myhash field hello field3 wufeng field4 lis
OK
127.0.0.1:6379> HGETALL myhash
1) "field2"
2) "world"
3) "field"
4) "hello"
5) "field3"
6) "wufeng"
7) "field4"
8) "lis"
127.0.0.1:6379> hget myhash field
"hello"
127.0.0.1:6379> hlen myhash #获取hash表的字段数量
(integer) 4####################################################
127.0.0.1:6379> HEXISTS myhash field #判断hash中指定字段是否存在
(integer) 1
127.0.0.1:6379> HEXISTS myhash field6
(integer) 0####################################################
#只获得所有filed hkeys
#只获得所有value hvals
127.0.0.1:6379> HKEYS myhash #获得所有filed
1) "field2"
2) "field"
3) "field3"
4) "field4"
127.0.0.1:6379> HVALS myhash #获得所有value
1) "world"
2) "hello"
3) "wufeng"
4) "lis"####################################################
hincrby #加1 hdecrby #减1
127.0.0.1:6379> hset myhash field6 5 #指定增量为5
(integer) 1
127.0.0.1:6379> HINCRBY myhash field6 1 #数值加1
(integer) 6
127.0.0.1:6379> HINCRBY myhash field6 -2 #数值减2
(integer) 3
127.0.0.1:6379> HSETNX myhash field7 hello #如果不存在则可以设置
(integer) 1
127.0.0.1:6379> HSETNX myhash field6 hello #如果存在则可以设置
(integer) 0
hash变更的数据 user name age ,尤其是用户信息之类的,经常变动的信息。 hash更适合于对象的存储,String更加适合字符串的存储。
127.0.0.1:6379> hmset myhash user:1:name zhangsan user:1:age 30 #设置user:{id}:{field}
OK
127.0.0.1:6379> hmget myhash user:1:name user:1:age
1) "zhangsan"
2) "30"
127.0.0.1:6379> HGETALL myhash
1) "field7"
2) "hello"
3) "user:1:name"
4) "zhangsan"
5) "user:1:age"
6) "30"
Zset(有序集合)
不同的是每个元素都会关联一个double类型的分数(score)。redis正是通过分数来为集合中的成员进行从小到大的排序。
score相同:按字典顺序排序
有序集合的成员是唯一的,但分数(score)却可以重复。
在set的基础上,增加一个值,
-
set:
k1 v1
-
zset:
k1 score1 v1
####################################################
127.0.0.1:6379> zadd myset 1 one #添加一个值
(integer) 1
127.0.0.1:6379> zadd myset 2 two 3 three #添加多个值
(integer) 2
127.0.0.1:6379> zrange myset 0 -1
1) "one"
2) "two"
3) "three"####################################################
排序如何实现127.0.0.1:6379> zadd salary 2500 xiaohong #添加三个用户 薪水,名字
(integer) 1
127.0.0.1:6379> zadd salary 5000 zhangsan
(integer) 1
127.0.0.1:6379> zadd salary 1000 lisi
(integer) 1
127.0.0.1:6379> ZRANGEBYSCORE salary -inf +inf #显示全部用户,按薪水从小到大排名 -inf:表示负无穷
1) "lisi"
2) "xiaohong"
3) "zhangsan"
127.0.0.1:6379> zrevrange salary 0 -1 withscores # 从大到小进行排序
1) "zhangsan"
2) "5000"
3) "xiaohong"
4) "2500"
127.0.0.1:6379> ZRANGEBYSCORE salary -inf +inf withscores #查找所有的数据 ,withscores:带有的薪水信息
1) "lisi"
2) "1000"
3) "xiaohong"
4) "2500"
5) "zhangsan"
6) "5000"
127.0.0.1:6379> ZRANGEBYSCORE salary -inf 2500 withscores #显示工资小于2500员工的升序排列
1) "lisi"
2) "1000"
3) "xiaohong"
4) "2500"####################################################
移除:zrem
127.0.0.1:6379> zrange salary 0 -1
1) "lisi"
2) "xiaohong"
3) "zhangsan"
127.0.0.1:6379> zrem salary lisi #移除有序集合中的指定元素
(integer) 1
127.0.0.1:6379> zrange salary 0 -1
1) "xiaohong"
2) "zhangsan"
127.0.0.1:6379> zcard salary #获取集合中的个数
(integer) 2####################################################
127.0.0.1:6379> zrange myset 0 -1
1) "one"
2) "two"
3) "three"
127.0.0.1:6379> zcount myset 0 2 #获取指定分数区间的成员数量
(integer) 2
127.0.0.1:6379> zcount myset 0 5
(integer) 3##################################################
127.0.0.1:6379> zrange myzset 0 -1 withscores
1) "one"
2) "1"
3) "two"
4) "2"
5) "three"
6) "3"
127.0.0.1:6379> zcount myzset 0 2
(integer) 2
127.0.0.1:6379> zincrby myzset 2 one
"3"
127.0.0.1:6379> zrange myzset 0 -1 withscores
1) "two"
2) "2"
3) "one"
4) "3"
5) "three"
6) "3"
127.0.0.1:6379> zscore myzset one
"3"
127.0.0.1:6379> zrank myset one
(error) WRONGTYPE Operation against a key holding the wrong kind of value
127.0.0.1:6379> zrank myzset one
(integer) 1
应用案例:
- set排序 存储班级成绩表 工资表排序!
- 普通消息,1.重要消息 2.带权重进行判断
- 排行榜应用实现,取Top N测试
Geospatial(地理位置)
使用经纬度定位地理坐标并用一个有序集合zset保存,所以zset命令也可以使用
有效经纬度
有效的经度从-180度到180度。
有效的纬度从-85.05112878度到85.05112878度。
指定单位的参数 unit 必须是以下单位的其中一个:
m 表示单位为米。
km 表示单位为千米。
mi 表示单位为英里。
ft 表示单位为英尺。
关于GEORADIUS的参数
通过georadius就可以完成 附近的人功能
withcoord:带上坐标
withdist:带上距离,单位与半径单位相同
COUNT n : 只显示前n个(按距离递增排序)
geoadd
#geoadd 添加地理位置
# 规则: 两极无法直接添加,我们一般会下载城市数据,直接通过java程序一次性导入
#参数 key 值(纬度、经度、名称)#有效的经度从-180度到180度。#有效的纬度从-85.05112878度到85.05112878度。#当坐标位置超出上述指定范围时,该命令将会返回一个错误。
127.0.0.1:6379> geoadd china:city 116.40 39.90 beijing
(integer) 1
127.0.0.1:6379> geoadd china:city 121.47 31.23 shanghai
(integer) 1
127.0.0.1:6379> geoadd china:city 106.50 29.53 chongqing
(integer) 1
127.0.0.1:6379> geoadd china:city 114.08 22.54 shenzhen 116.85 38.31 cangzhou
(integer) 2
127.0.0.1:6379> geoadd china:city 120.15 30.28 hangzhou 125.14 42.92 xian
(integer) 2
geopos
获得当前定位:一定是一个坐标值。
127.0.0.1:6379> geopos china:city beijing #获取指定的城市的经度和纬度
1) 1) "116.39999896287918091"2) "39.90000009167092543"
127.0.0.1:6379> geopos china:city beijing cangzhou
1) 1) "116.39999896287918091"2) "39.90000009167092543"
2) 1) "116.84999853372573853"2) "38.30999992507150864"
geodist
两人之间的距离
单位:
- m 表示单位为米
- km 表示单位为千米
- mi 表示单位为英里
- ft 表示单位为英尺
127.0.0.1:6379> geodist china:city beijing shanghai km #查看北京到上海的直线距离
"1067.3788"
127.0.0.1:6379> GEODIST china:city beijing chongqing km #查看北京到重庆的直线距离
"1464.0708"
georadius 以给定的经纬度为中心,找出某一半径内的元素
我附近的人? (获得所有附近的人的地址,定位!) 通过半径来查询
获得指定数量的人为200个 count 200
所有数据应该都录入 china:city ,才会让结果更加清晰
127.0.0.1:6379> GEORADIUS china:city 110 30 500 km #以110 30这个经度纬度为中心,寻找方圆1000km内的城市
1) "chongqing"
127.0.0.1:6379> GEORADIUS china:city 110 30 1000 km
1) "chongqing"
2) "shenzhen"
3) "hangzhou"
127.0.0.1:6379> GEORADIUS china:city 110 30 1000 km withdist #显示到中间距离的位置
1) 1) "chongqing"2) "341.9374"
2) 1) "shenzhen"2) "923.9364"
3) 1) "hangzhou"2) "976.4868"
127.0.0.1:6379> GEORADIUS china:city 110 30 1000 km withdist withcoord #显示他人的定位信息
1) 1) "chongqing"2) "341.9374"3) 1) "106.49999767541885376"2) "29.52999957900659211"
2) 1) "shenzhen"2) "923.9364"3) 1) "114.08000081777572632"2) "22.53999903789756587"
3) 1) "hangzhou"2) "976.4868"3) 1) "120.15000075101852417"2) "30.2800007575645509"
127.0.0.1:6379> GEORADIUS china:city 110 30 1000 km withdist withcoord count 1
1) 1) "chongqing"2) "341.9374"3) 1) "106.49999767541885376"2) "29.52999957900659211"
127.0.0.1:6379> GEORADIUS china:city 110 30 1000 km withdist withcoord count 2 #筛选出指定的结果!
1) 1) "chongqing"2) "341.9374"3) 1) "106.49999767541885376"2) "29.52999957900659211"
2) 1) "shenzhen"2) "923.9364"3) 1) "114.08000081777572632"2) "22.53999903789756587"
127.0.0.1:6379> 127.0.0.1:6379> georadius china:city 110 30 1000 km withcoord withdist withhash
1) 1) "chongqin"2) "341.9374"3) (integer) 40260420916289844) 1) "106.49999767541885376"2) "29.52999957900659211"
2) 1) "shenzhen"2) "923.9364"3) (integer) 40464322968005464) 1) "114.08000081777572632"2) "22.53999903789756587"
3) 1) "hangzhou"2) "976.4868"3) (integer) 40541342573907834) 1) "120.15000075101852417"2) "30.2800007575645509"
127.0.0.1:6379> georadius china:city 110 30 1000 km withcoord withdist count 2
1) 1) "chongqin"2) "341.9374"3) 1) "106.49999767541885376"2) "29.52999957900659211"
2) 1) "shenzhen"2) "923.9364"3) 1) "114.08000081777572632"2) "22.53999903789756587"
127.0.0.1:6379> georadius china:city 110 30 1000 km withcoord withdist count 2 asc
1) 1) "chongqin"2) "341.9374"3) 1) "106.49999767541885376"2) "29.52999957900659211"
2) 1) "shenzhen"2) "923.9364"3) 1) "114.08000081777572632"2) "22.53999903789756587"
127.0.0.1:6379> georadius china:city 110 30 1000 km withcoord withdist count 2 desc
1) 1) "hangzhou"2) "976.4868"3) 1) "120.15000075101852417"2) "30.2800007575645509"
2) 1) "shenzhen"2) "923.9364"3) 1) "114.08000081777572632"2) "22.53999903789756587"
georadiusbymember
#找出位于指定元素(集合中已经存在)周围的其他元素
127.0.0.1:6379> GEORADIUSBYMEMBER china:city beijing 1000 km
1) "cangzhou"
2) "beijing"
3) "xian"
127.0.0.1:6379> GEORADIUSBYMEMBER china:city shanghai 500km
(error) ERR wrong number of arguments for 'georadiusbymember' command
127.0.0.1:6379> GEORADIUSBYMEMBER china:city shanghai 500 km
1) "hangzhou"
2) "shanghai"
geohash 返回一个或多个位置元素的geohash表示(很少使用到)
该命令将返回11个字符的geohash字符串
#将二维的经纬度转换为一维的字符串,如果两个字符串越近,那么则距离越近。
127.0.0.1:6379> GEOHASH china:city beijing chongqing
1) "wx4fbxxfke0"
2) "wm5xzrybty0"
geo 底层的实现原理其实就是Zset 我们可以使用Zset命令来操作geo
127.0.0.1:6379> ZRANGE china:city 0 -1 #查看地图中全部元素
1) "chongqing"
2) "shenzhen"
3) "hangzhou"
4) "shanghai"
5) "cangzhou"
6) "beijing"
7) "xian"
127.0.0.1:6379> zrem china:city beijing xian #移除指定元素
(integer) 2
127.0.0.1:6379> ZRANGE china:city 0 -1
1) "chongqing"
2) "shenzhen"
3) "hangzhou"
4) "shanghai"
5) "cangzhou"
Hyperloglog(基数统计)
Redis HyperLogLog 是用来做基数统计的算法,HyperLogLog 的优点是,在输入元素的数量或者体积非常非常大时,计算基数所需的空间总是固定的、并且是很小的。
花费 12 KB 内存,就可以计算接近 2^64 个不同元素的基数。
因为 HyperLogLog 只会根据输入元素来计算基数,而不会储存输入元素本身,所以 HyperLogLog 不能像集合那样,返回输入的各个元素。
其底层使用string数据类型
-
什么是基数?
数据集中不重复的元素的个数。
即独立用户的数量,如:
假设我们有一个网站,需要统计每天访问网站的独立用户数量。通常情况下,我们可以使用传统的方法,比如将每个用户的 ID 记录在一个集合中,然后使用集合的大小来统计独立用户数量。但是,当用户量非常大时,这种方法会占用大量内存空间。
这时,我们可以使用 HyperLogLog 来进行基数统计。假设有一天,我们接收到了以下用户访问网站的 ID 数据:
用户ID:[1001, 2002, 3003, 1001, 4004, 1001, 5005, 6006, 2002]
现在,我们使用 HyperLogLog 结构来统计这些用户的独立数量。首先,我们将每个用户 ID 进行哈希映射,得到一个哈希值,然后根据哈希值来估计基数。
经过哈希映射后,得到的哈希值可能类似于:
[0x32, 0x18, 0x7F, 0x32, 0x45, 0x32, 0x92, 0x11, 0x18]接着,我们使用位操作来统计零位前导串(zero leading string)的长度。假设经过统计后,得到的零位前导串长度为:
[5, 4, 6, 5, 3, 5, 7, 2, 4]最后,通过对零位前导串长度的统计结果进行分析,就能够估计出独立用户的数量。
-
应用场景:
-
网页的访问量(UV):一个用户多次访问,也只能算作一个人。
-
传统实现,存储用户的id,然后每次进行比较。当用户变多之后这种方式及其浪费空间,而我们的目的只是计数,Hyperloglog就能帮助我们利用最小的空间完成。
-
127.0.0.1:6379> PFADD mykey a b c d e f g h k #创建第一组元素 mykey
(integer) 1
127.0.0.1:6379> PFCOUNT mykey #统计 mykey 元素的基数数量
(integer) 9
127.0.0.1:6379> PFADD mykey2 q w e r t y u i o #创建第二组元素 mykey2
(integer) 1
127.0.0.1:6379> PFCOUNT mykey2
(integer) 9
127.0.0.1:6379> PFMERGE mykey3 mykey mykey2 # 合并两组 mykey mykey2 => mykey3
OK
127.0.0.1:6379> PFCOUNT mykey3 #看并集的数量 相同的不计
(integer) 16
如果允许容错,那么一定可以使用Hyperloglog !
如果不允许容错,就使用set或者自己的数据类型即可 !
原因:
HyperLogLog 通过概率统计来估计基数是因为其设计的目的是在牺牲一定的精确度的情况下,换取更小的内存占用和更高的性能。这种牺牲精确度来换取效率的策略被称为"概率算法"。
HyperLogLog 的近似值原理基于统计学中的概率方法,利用哈希函数将元素映射到一个固定长度的比特串中,然后根据比特串的特征来估计基数。在实际应用中,HyperLogLog 会对零位前导串(zero leading string)的长度进行统计,并根据统计结果来推测独立元素的数量。
由于 HyperLogLog 使用的是概率方法,因此得到的结果是一个近似值,而不是精确值。这意味着在某些情况下,可能会出现误差,但通常情况下,这种误差是可以接受的。通过牺牲一定的精确度,HyperLogLog 能够以较小的内存消耗来快速估计大型数据集合的基数,适用于需要高效处理大规模数据的场景。
BitMaps(位图)
使用位存储,信息状态只有 0 和 1
Bitmap是一串连续的2进制数字(0或1),每一位所在的位置为偏移(offset),在bitmap上可执行AND,OR,XOR,NOT以及其它位操作。
应用场景: 签到统计、状态统计
统计用户信息,活跃,不活跃!登录,未登录!打卡,未打卡! 两个的状态,都可以使用bitmaps
Bitmaps位图,数据结构!都是操作二进制位来进行记录,就只有0和1两个状态.
365天 = 365bit 1字节 = 8bit 46个字节左右!
使用bitmap来记录 周一到周日的打卡
127.0.0.1:6379> setbit sign 0 1 #周一到周日打卡情况,1代表打卡 0代表未打卡
(integer) 0
127.0.0.1:6379> setbit sign 1 1
(integer) 0
127.0.0.1:6379> setbit sign 3 1
(integer) 0
127.0.0.1:6379> setbit sign 4 0
(integer) 0
127.0.0.1:6379> setbit sign 5 0
(integer) 0
127.0.0.1:6379> setbit sign 6 1
(integer) 0127.0.0.1:6379> GETBIT sign 5 #查看某一天是否打卡
(integer) 0
127.0.0.1:6379> GETBIT sign 6
(integer) 1127.0.0.1:6379> bitcount sign #统计这周的打卡记录
(integer) 4
bitmaps是一串从左到右的二进制串
三、事务
Redis事务本质: 一组命令的集合。 一个事务中的所有命令都会被序列化,在事务执行过程中,会按照顺序执行!
一次性、顺序性、排他性 执行一系列的命令
------ 队列 set set set 执行 ---------
Redis事务没有隔离级别的概念!
所有的命令在事务中,并没有直接被执行,只有发起执行命令的时候才会执行
redis单条命令是保证原子性的,但是事务不保证原子性
Redis的事务:
- 开启事务(
multi
) - 命令入队(…)
- 执行事务(
exec
)
正常执行事务
127.0.0.1:6379> multi #开启事务
OK
127.0.0.1:6379(TX)> set k1 v1 #命令入队
QUEUED
127.0.0.1:6379(TX)> set k2 v2
QUEUED
127.0.0.1:6379(TX)> get k2
QUEUED
127.0.0.1:6379(TX)> set k3 v3
QUEUED
127.0.0.1:6379(TX)> exec #执行事务
1) OK
2) OK
3) "v2"
4) OK
放弃事务 ( discard )
127.0.0.1:6379> multi #开启事务
OK
127.0.0.1:6379(TX)> set k1 v1
QUEUED
127.0.0.1:6379(TX)> set k2 v2
QUEUED
127.0.0.1:6379(TX)> DISCARD #取消事务
OK
127.0.0.1:6379> get k2 #事务队列中命令都不会被执行
(nil)
127.0.0.1:6379> get k1
(nil)
编译型异常(代码有问题 ,及命令有错) ,事务中所有的命令都不会被执行
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> set k1 v1
QUEUED
127.0.0.1:6379(TX)> set k2 v2
QUEUED
127.0.0.1:6379(TX)> getset k2 #错误的命令
(error) ERR wrong number of arguments for 'getset' command
127.0.0.1:6379(TX)> set k3 v3
QUEUED
127.0.0.1:6379(TX)> exec #执行事务报错
(error) EXECABORT Transaction discarded because of previous errors.
127.0.0.1:6379> get k3 #所有的命令都不执行
(nil)
127.0.0.1:6379> get k1
(nil)
运行时异常(1/0),如果事务队列中存在语法性,那么执行命令的时候,其他命令是可以正常执行的,错误命令抛出异常
127.0.0.1:6379> set k1 v1
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> set k2 v2
QUEUED
127.0.0.1:6379(TX)> set k3 v3
QUEUED
127.0.0.1:6379(TX)> incr k1 #执行的时候会失败
QUEUED
127.0.0.1:6379(TX)> get k2
QUEUED
127.0.0.1:6379(TX)> exec
1) OK
2) OK
3) (error) ERR value is not an integer or out of range #虽然第一条命令报错了,但是依旧正常执行成功了。
4) "v2"
127.0.0.1:6379> get k3
"v3"
悲观锁、乐观锁
监控 watch (面试常问)
悲观锁:
- 很悲观,认为什么时候都会出现问题,无论做什么都会加锁
乐观锁:
- 很乐观,认为什么时候都不会出现问题,所以不会上锁!更新数据的时候去判断一下,在此期间是否有人修改过这个数据
- 获取version
- 更新的时候比较version
redis监视测试
正常执行成功
127.0.0.1:6379> set money 100 # 设置余额:100
OK
127.0.0.1:6379> set out 0 # 支出使用:0
OK
127.0.0.1:6379> watch money #监视money 对象
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> DECRBY money 20
QUEUED
127.0.0.1:6379(TX)> INCRBY out 20
QUEUED
127.0.0.1:6379(TX)> exec #事务正常结束,数据期间没有发生变动,这个时候就正常执行成功
1) (integer) 80
2) (integer) 20
测试多线程修改值,使用watch 可以当做redis的乐观锁操作
#第一个线程
127.0.0.1:6379> watch money #监控 money
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> DECRBY money 20
QUEUED
127.0.0.1:6379(TX)> INCRBY out 20
QUEUED
127.0.0.1:6379(TX)> exec #执行之前,另外一个线程,修改了我们的值,这个时候就会导致食物执行失败。
(nil)#第二个线程
127.0.0.1:6379> get money
"80"
127.0.0.1:6379> INCRby money 100
(integer) 180
127.0.0.1:6379> get money
"180"
unwatch
进行解锁。
如果修改失败,获取最新的值就好
注意:每次提交执行exec后都会自动释放锁,不管是否成功
四、Jedis
使用Java来操作Redis,Jedis是Redis官方推荐使用的Java连接redis的客户端的开发工具,使用java操作redis中间件。
测试
1.导入对应的依赖
<dependencies><!--导入jedis的包--><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>3.2.0</version></dependency><!--fastjson 存一些数据用的--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.48</version></dependency></dependencies>
2.编码测试:
- 连接数据库
- 操作命令
- 断开连接
若连接本地:
public class TestPing {public static void main(String[] args) {Jedis jedis = new Jedis("127.0.0.1", 6379);System.out.println(jedis.ping());} }
若连接阿里云服务器:
public class TestPing {public static void main(String[] args) {Jedis jedis = new Jedis("47.120.xxx.xxx", 6379);jedis.auth("19991001");System.out.println(jedis.ping());} }
package com.jihu;
import redis.clients.jedis.Jedis;
public class TestPing {public static void main(String[] args) {//1. new jedis 对象即可Jedis jedis = new Jedis("192.168.56.130",6379); //自己虚拟机的ip地址jedis.auth("123456");// jedis 所有的命令就是我们之前学习的所有指令System.out.println(jedis.ping());}
}
输出结果:
常用的API
String 、list、set 、hash、Zset等 所有的api命令,就是我们对应的上面学习的指令,一个都没有变化!
判断keys * 以及一些基本方法
package com.jihu.base;import redis.clients.jedis.Jedis;import java.util.Set;public class TestKey {public static void main(String[] args) {Jedis jedis = new Jedis("192.168.56.130",6379);jedis.auth("123456");System.out.println("清空数据:"+jedis.flushDB());System.out.println("判断某个键是否存在:"+jedis.exists("username"));System.out.println("新增<'username','kuangshen'>的键值对:"+jedis.set("username", "kuangshen"));System.out.println("新增<'password','password'>的键值对:"+jedis.set("password", "password"));System.out.print("系统中所有的键如下:");Set<String> keys = jedis.keys("*");System.out.println(keys);System.out.println("删除键password:"+jedis.del("password"));System.out.println("判断键password是否存在:"+jedis.exists("password"));System.out.println("查看键username所存储的值的类型:"+jedis.type("username"));System.out.println("随机返回key空间的一个:"+jedis.randomKey());System.out.println("重命名key:"+jedis.rename("username","name"));System.out.println("取出改后的name:"+jedis.get("name"));System.out.println("按索引查询:"+jedis.select(0));System.out.println("删除当前选择数据库中的所有key:"+jedis.flushDB());System.out.println("返回当前数据库中key的数目:"+jedis.dbSize());System.out.println("删除所有数据库中的所有key:"+jedis.flushAll());}
}
对String操作的命令
package com.jihu.base;
import redis.clients.jedis.Jedis;
import java.util.concurrent.TimeUnit;public class TestString {public static void main(String[] args) {Jedis jedis = new Jedis("192.168.56.130",6379);jedis.auth("123456");jedis.flushDB();System.out.println("===========增加数据===========");System.out.println(jedis.set("key1","value1"));System.out.println(jedis.set("key2","value2"));System.out.println(jedis.set("key3", "value3"));System.out.println("删除键key2:"+jedis.del("key2"));System.out.println("获取键key2:"+jedis.get("key2"));System.out.println("修改key1:"+jedis.set("key1", "value1Changed"));System.out.println("获取key1的值:"+jedis.get("key1"));System.out.println("在key3后面加入值:"+jedis.append("key3", "End"));System.out.println("key3的值:"+jedis.get("key3"));System.out.println("增加多个键值对:"+jedis.mset("key01","value01","key02","value02","key03","value03"));System.out.println("获取多个键值对:"+jedis.mget("key01","key02","key03"));System.out.println("获取多个键值对:"+jedis.mget("key01","key02","key03","key04"));System.out.println("删除多个键值对:"+jedis.del("key01","key02"));System.out.println("获取多个键值对:"+jedis.mget("key01","key02","key03"));jedis.flushDB();System.out.println("===========新增键值对防止覆盖原先值==============");System.out.println(jedis.setnx("key1", "value1"));System.out.println(jedis.setnx("key2", "value2"));System.out.println(jedis.setnx("key2", "value2-new"));System.out.println(jedis.get("key1"));System.out.println(jedis.get("key2"));System.out.println("===========新增键值对并设置有效时间=============");System.out.println(jedis.setex("key3", 2, "value3"));System.out.println(jedis.get("key3"));try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(jedis.get("key3"));System.out.println("===========获取原值,更新为新值==========");System.out.println(jedis.getSet("key2", "key2GetSet"));System.out.println(jedis.get("key2"));System.out.println("获得key2的值的字串:"+jedis.getrange("key2", 2, 4));}
}
对List操作的命令
package com.jihu.base;
import redis.clients.jedis.Jedis;
public class TestList {public static void main(String[] args) {Jedis jedis = new Jedis("192.168.56.130",6379);jedis.auth("123456");jedis.flushDB();System.out.println("===========添加一个list===========");jedis.lpush("collections", "ArrayList", "Vector", "Stack", "HashMap", "WeakHashMap", "LinkedHashMap");jedis.lpush("collections", "HashSet");jedis.lpush("collections", "TreeSet");jedis.lpush("collections", "TreeMap");System.out.println("collections的内容:"+jedis.lrange("collections", 0, -1));//-1代表倒数第一个元素,-2代表倒数第二个元素,end为-1表示查询全部System.out.println("collections区间0-3的元素:"+jedis.lrange("collections",0,3));System.out.println("===============================");// 删除列表指定的值 ,第二个参数为删除的个数(有重复时),后add进去的值先被删,类似于出栈System.out.println("删除指定元素个数:"+jedis.lrem("collections", 2, "HashMap"));System.out.println("collections的内容:"+jedis.lrange("collections", 0, -1));System.out.println("删除下表0-3区间之外的元素:"+jedis.ltrim("collections", 0, 3));System.out.println("collections的内容:"+jedis.lrange("collections", 0, -1));System.out.println("collections列表出栈(左端):"+jedis.lpop("collections"));System.out.println("collections的内容:"+jedis.lrange("collections", 0, -1));System.out.println("collections添加元素,从列表右端,与lpush相对应:"+jedis.rpush("collections", "EnumMap"));System.out.println("collections的内容:"+jedis.lrange("collections", 0, -1));System.out.println("collections列表出栈(右端):"+jedis.rpop("collections"));System.out.println("collections的内容:"+jedis.lrange("collections", 0, -1));System.out.println("修改collections指定下标1的内容:"+jedis.lset("collections", 1, "LinkedArrayList"));System.out.println("collections的内容:"+jedis.lrange("collections", 0, -1));System.out.println("===============================");System.out.println("collections的长度:"+jedis.llen("collections"));System.out.println("获取collections下标为2的元素:"+jedis.lindex("collections", 2));System.out.println("===============================");jedis.lpush("sortedList", "3","6","2","0","7","4");System.out.println("sortedList排序前:"+jedis.lrange("sortedList", 0, -1));System.out.println(jedis.sort("sortedList"));System.out.println("sortedList排序后:"+jedis.lrange("sortedList", 0, -1));}
}
对set操作的命令
package com.jihu.base;import redis.clients.jedis.Jedis;public class TestSet {public static void main(String[] args) {Jedis jedis = new Jedis("192.168.56.130",6379);jedis.auth("123456");jedis.flushDB();System.out.println("============向集合中添加元素(不重复)============");System.out.println(jedis.sadd("eleSet", "e1","e2","e4","e3","e0","e8","e7","e5"));System.out.println(jedis.sadd("eleSet", "e6"));System.out.println(jedis.sadd("eleSet", "e6"));System.out.println("eleSet的所有元素为:"+jedis.smembers("eleSet"));System.out.println("删除一个元素e0:"+jedis.srem("eleSet", "e0"));System.out.println("eleSet的所有元素为:"+jedis.smembers("eleSet"));System.out.println("删除两个元素e7和e6:"+jedis.srem("eleSet", "e7","e6"));System.out.println("eleSet的所有元素为:"+jedis.smembers("eleSet"));System.out.println("随机的移除集合中的一个元素:"+jedis.spop("eleSet"));System.out.println("随机的移除集合中的一个元素:"+jedis.spop("eleSet"));System.out.println("eleSet的所有元素为:"+jedis.smembers("eleSet"));System.out.println("eleSet中包含元素的个数:"+jedis.scard("eleSet"));System.out.println("e3是否在eleSet中:"+jedis.sismember("eleSet", "e3"));System.out.println("e1是否在eleSet中:"+jedis.sismember("eleSet", "e1"));System.out.println("e1是否在eleSet中:"+jedis.sismember("eleSet", "e5"));System.out.println("=================================");System.out.println(jedis.sadd("eleSet1", "e1","e2","e4","e3","e0","e8","e7","e5"));System.out.println(jedis.sadd("eleSet2", "e1","e2","e4","e3","e0","e8"));System.out.println("将eleSet1中删除e1并存入eleSet3中:"+jedis.smove("eleSet1", "eleSet3", "e1"));//移到集合元素System.out.println("将eleSet1中删除e2并存入eleSet3中:"+jedis.smove("eleSet1", "eleSet3", "e2"));System.out.println("eleSet1中的元素:"+jedis.smembers("eleSet1"));System.out.println("eleSet3中的元素:"+jedis.smembers("eleSet3"));System.out.println("============集合运算=================");System.out.println("eleSet1中的元素:"+jedis.smembers("eleSet1"));System.out.println("eleSet2中的元素:"+jedis.smembers("eleSet2"));System.out.println("eleSet1和eleSet2的交集:"+jedis.sinter("eleSet1","eleSet2"));System.out.println("eleSet1和eleSet2的并集:"+jedis.sunion("eleSet1","eleSet2"));System.out.println("eleSet1和eleSet2的差集:"+jedis.sdiff("eleSet1","eleSet2"));//eleSet1中有,eleSet2中没有jedis.sinterstore("eleSet4","eleSet1","eleSet2");//求交集并将交集保存到dstkey的集合System.out.println("eleSet4中的元素:"+jedis.smembers("eleSet4"));}
}
对hash操作的命令
package com.jihu.base;import redis.clients.jedis.Jedis;import java.util.HashMap;
import java.util.Map;public class TestHash {public static void main(String[] args) {Jedis jedis = new Jedis("192.168.56.130",6379);jedis.auth("123456");jedis.flushDB();Map<String,String> map = new HashMap<String,String>();map.put("key1","value1");map.put("key2","value2");map.put("key3","value3");map.put("key4","value4");//添加名称为hash(key)的hash元素jedis.hmset("hash",map);//向名称为hash的hash中添加key为key5,value为value5元素jedis.hset("hash", "key5", "value5");System.out.println("散列hash的所有键值对为:"+jedis.hgetAll("hash"));//return Map<String,String>System.out.println("散列hash的所有键为:"+jedis.hkeys("hash"));//return Set<String>System.out.println("散列hash的所有值为:"+jedis.hvals("hash"));//return List<String>System.out.println("将key6保存的值加上一个整数,如果key6不存在则添加key6:"+jedis.hincrBy("hash", "key6", 6));System.out.println("散列hash的所有键值对为:"+jedis.hgetAll("hash"));System.out.println("将key6保存的值加上一个整数,如果key6不存在则添加key6:"+jedis.hincrBy("hash", "key6", 3));System.out.println("散列hash的所有键值对为:"+jedis.hgetAll("hash"));System.out.println("删除一个或者多个键值对:"+jedis.hdel("hash", "key2"));System.out.println("散列hash的所有键值对为:"+jedis.hgetAll("hash"));System.out.println("散列hash中键值对的个数:"+jedis.hlen("hash"));System.out.println("判断hash中是否存在key2:"+jedis.hexists("hash","key2"));System.out.println("判断hash中是否存在key3:"+jedis.hexists("hash","key3"));System.out.println("获取hash中的值:"+jedis.hmget("hash","key3"));System.out.println("获取hash中的值:"+jedis.hmget("hash","key3","key4"));}
}
对Zset操作的命令
public class TestZSet {public static void main(String[] args) {Jedis jedis = JedisEnumSingleton.INSTANCE.getJedis();System.out.println("jedis.flushDB(): " + jedis.flushDB());System.out.println("==================向集合中添加元素==================");System.out.println(jedis.zadd("myset", 1, "one"));HashMap<String, Double> scoreMembers = new HashMap<String, Double>();scoreMembers.put("two", 2.0);scoreMembers.put("three", 3.0);System.out.println(jedis.zadd("myset", scoreMembers));System.out.println("集合中所有元素: " + jedis.zrange("myset", 0, -1));System.out.println("===================排序============================");jedis.zadd("salary", 2500, "zhangsan");jedis.zadd("salary", 1500, "lisi");jedis.zadd("salary", 4500, "wangwu");System.out.println("jedis.zrangeWithScores:" + jedis.zrangeWithScores("salary",0, -1));System.out.println("jedis.zrangeByScore:" + jedis.zrangeByScore("salary",Integer.MIN_VALUE, Integer.MAX_VALUE));System.out.println("jedis.zrangeByScore:" + jedis.zrangeByScoreWithScores("salary",Integer.MIN_VALUE, Integer.MAX_VALUE));System.out.println("jedis.zrevrange:" + jedis.zrevrange("salary",0, -1));System.out.println("jedis.zrevrangeByScore:" + jedis.zrevrangeByScore("salary",Integer.MAX_VALUE, Integer.MIN_VALUE));System.out.println("jedis.zrevrangeByScoreWithScores:" + jedis.zrevrangeByScoreWithScores("salary",Integer.MAX_VALUE, Integer.MIN_VALUE));}
}
事务
package com.jihu;
import com.alibaba.fastjson.JSONObject;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;public class TestTX {public static void main(String[] args) {Jedis jedis = new Jedis("192.168.56.130",6379);jedis.auth("123456");jedis.flushDB();JSONObject jsonObject = new JSONObject();jsonObject.put("hello","world");jsonObject.put("name","jihu");//开启事务Transaction multi = jedis.multi();String result = jsonObject.toJSONString();//jedis.watch(result) //给result加乐观锁try {multi.set("user1",result);multi.set("user2",result);int i = 1/0; //代码抛出异常,事务执行失败multi.exec(); //执行事务} catch (Exception exception) {multi.discard(); //放弃事务exception.printStackTrace();}finally {System.out.println(jedis.get("user1"));System.out.println(jedis.get("user2"));jedis.close(); //关闭连接}}
}
在 Multi 中时无法使用 Jedis:
Exception in thread "main" redis.clients.jedis.exceptions.JedisDataException: Cannot use Jedis when in Multi. Please use Transaction or reset jedis state.
五、SpringBoot整合
SpringBoot 操作数据:spring-data jpa jdbc mongodb redis!
SpringData 也是和 SpringBoot 齐名的项目!
说明: 在 SpringBoot2.x 之后,原来使用的jedis 被替换为了 lettuce?
jedis : 采用的直连,多个线程操作的话,是不安全的,如果想要避免不安全的,使用 jedis pool 连接池! 更像 BIO 模式
lettuce : 采用netty,实例可以再多个线程中进行共享,不存在线程不安全的情况!可以减少线程数据了,更像 NIO 模式
源码分析
springboot所有的配置类都有一个自动配置类 :RedisAutoConfiguration:
@Bean // 我们可以自己定义一个redisTemplate来替换这个默认的!
@ConditionalOnMissingBean(name = "redisTemplate") // 不存在才生效
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
// 默认的 RedisTemplate 没有过多的设置,redis 对象都是需要序列化!
// 两个泛型都是 Object, Object 的类型,我们后使用需要强制转换 <String, Object>RedisTemplate<Object, Object> template = new RedisTemplate<>();template.setConnectionFactory(redisConnectionFactory);return template;
}@Bean
@ConditionalOnMissingBean // 由于 String 是redis中最常使用的类型,所以说单独提出来了一个bean!
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {StringRedisTemplate template = new StringRedisTemplate();template.setConnectionFactory(redisConnectionFactory);return template;
}
整合测试
1.导入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2.配置连接
#springboot所有的配置类都有一个自动配置类 :RedisAutoConfiguration
# (RedisAutoConfiguration它在RedisAutoConfiguration/META-INF/spring.factories文件中)
#自动配置类都会绑定一个 properties配置文件 RedisProperties#配置Redis
spring.redis.host=192.168.56.130
spring.redis.port=6379
spring.redis.password=123456
3.测试
package com.jihu;import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisTemplate;@SpringBootTest
class Redis02SpringbootApplicationTests {@Autowiredprivate RedisTemplate redisTemplate;@Testvoid contextLoads() {//redisTemplate 操作不同的数据类型,api和我们的指令是一样的//redisTemplate.opsForValue() 表示操作字符串 类似String//redisTemplate.opsForList() 表示操作List ,类似list//等等
// redisTemplate.opsForValue();
// redisTemplate.opsForList();
// redisTemplate.opsForSet();
// redisTemplate.opsForZSet();
// redisTemplate.opsForHash();
// redisTemplate.opsForGeo();
// redisTemplate.opsForHyperLogLog();redisTemplate.opsForList();//除了基本的操作,我们常用的方法都可以直接通过redisTemplate操作,比如事务和基本的CRUD//获取redis的连接对象//RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();//connection.flushDb();//connection.flushAll();redisTemplate.opsForValue().set("mykey","jihu");System.out.println(redisTemplate.opsForValue().get("mykey"));}
}
4.测试结果
此时我们回到Redis查看数据时候,惊奇发现全是乱码,可是程序中可以正常输出。这时候就关系到存储对象的序列化问题,在网络中传输的对象也是一样需要序列化,否者就全是乱码。
RedisTemplate内部的序列化配置是这样的
默认的序列化器是采用JDK序列化器
对呀对象的保存
如果不序列化这样会报错
这里序列化之后就好了
自定义RedisTemplate 序列化配置
1.自定义RedisTemplate序列化配置
我们来编写一个自己的 RedisTemplete
package com.fatfish.config;import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;@Configuration
public class RedisConfig {// 这是我给大家写好的一个固定模板,大家在企业中,拿去就可以直接使用!// 自己定义了一个 RedisTemplate@Bean@SuppressWarnings("all")public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {// 我们为了自己开发方便,一般直接使用 <String, Object>RedisTemplate<String, Object> template = new RedisTemplate<String,Object>();template.setConnectionFactory(factory);// Json序列化配置Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = newJackson2JsonRedisSerializer(Object.class);ObjectMapper om = new ObjectMapper();om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);jackson2JsonRedisSerializer.setObjectMapper(om);// String 的序列化StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();// key采用String的序列化方式template.setKeySerializer(stringRedisSerializer);// hash的key也采用String的序列化方式template.setHashKeySerializer(stringRedisSerializer);// value序列化方式采用jacksontemplate.setValueSerializer(jackson2JsonRedisSerializer);// hash的value序列化方式采用jacksontemplate.setHashValueSerializer(jackson2JsonRedisSerializer);template.afterPropertiesSet();return template;}
}
再查看就不是乱码了
@Resource@Qualifier("redisTemplate")private RedisTemplate redisTemplate;
@Qualifier("redisTemplate"
: 区分多个同名字的 Bean
2.RedisUtil配置(CRUD操作string,map,list,set)
package com.jihu.config.utils;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;//在我们真实的开发中,或者你们的公司,一般都可以看到一个公司自己封装的RedisUtil
@Component
public final class RedisUtil {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;// =============================common============================/*** 指定缓存失效时间* @param key 键* @param time 时间(秒)*/public boolean expire(String key, long time) {try {if (time > 0) {redisTemplate.expire(key, time, TimeUnit.SECONDS);}return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 根据key 获取过期时间* @param key 键 不能为null* @return 时间(秒) 返回0代表为永久有效*/public long getExpire(String key) {return redisTemplate.getExpire(key, TimeUnit.SECONDS);}/*** 判断key是否存在* @param key 键* @return true 存在 false不存在*/public boolean hasKey(String key) {try {return redisTemplate.hasKey(key);} catch (Exception e) {e.printStackTrace();return false;}}/*** 删除缓存* @param key 可以传一个值 或多个*/@SuppressWarnings("unchecked")public void del(String... key) {if (key != null && key.length > 0) {if (key.length == 1) {redisTemplate.delete(key[0]);} else {redisTemplate.delete((Collection<String>) CollectionUtils.arrayToList(key));}}}// ============================String=============================/*** 普通缓存获取* @param key 键* @return 值*/public Object get(String key) {return key == null ? null : redisTemplate.opsForValue().get(key);}/*** 普通缓存放入* @param key 键* @param value 值* @return true成功 false失败*/public boolean set(String key, Object value) {try {redisTemplate.opsForValue().set(key, value);return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 普通缓存放入并设置时间* @param key 键* @param value 值* @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期* @return true成功 false 失败*/public boolean set(String key, Object value, long time) {try {if (time > 0) {redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);} else {set(key, value);}return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 递增* @param key 键* @param delta 要增加几(大于0)*/public long incr(String key, long delta) {if (delta < 0) {throw new RuntimeException("递增因子必须大于0");}return redisTemplate.opsForValue().increment(key, delta);}/*** 递减* @param key 键* @param delta 要减少几(小于0)*/public long decr(String key, long delta) {if (delta < 0) {throw new RuntimeException("递减因子必须大于0");}return redisTemplate.opsForValue().increment(key, -delta);}// ================================Map=================================/*** HashGet* @param key 键 不能为null* @param item 项 不能为null*/public Object hget(String key, String item) {return redisTemplate.opsForHash().get(key, item);}/*** 获取hashKey对应的所有键值* @param key 键* @return 对应的多个键值*/public Map<Object, Object> hmget(String key) {return redisTemplate.opsForHash().entries(key);}/*** HashSet* @param key 键* @param map 对应多个键值*/public boolean hmset(String key, Map<String, Object> map) {try {redisTemplate.opsForHash().putAll(key, map);return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** HashSet 并设置时间* @param key 键* @param map 对应多个键值* @param time 时间(秒)* @return true成功 false失败*/public boolean hmset(String key, Map<String, Object> map, long time) {try {redisTemplate.opsForHash().putAll(key, map);if (time > 0) {expire(key, time);}return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 向一张hash表中放入数据,如果不存在将创建** @param key 键* @param item 项* @param value 值* @return true 成功 false失败*/public boolean hset(String key, String item, Object value) {try {redisTemplate.opsForHash().put(key, item, value);return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 向一张hash表中放入数据,如果不存在将创建** @param key 键* @param item 项* @param value 值* @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间* @return true 成功 false失败*/public boolean hset(String key, String item, Object value, long time) {try {redisTemplate.opsForHash().put(key, item, value);if (time > 0) {expire(key, time);}return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 删除hash表中的值** @param key 键 不能为null* @param item 项 可以使多个 不能为null*/public void hdel(String key, Object... item) {redisTemplate.opsForHash().delete(key, item);}/*** 判断hash表中是否有该项的值** @param key 键 不能为null* @param item 项 不能为null* @return true 存在 false不存在*/public boolean hHasKey(String key, String item) {return redisTemplate.opsForHash().hasKey(key, item);}/*** hash递增 如果不存在,就会创建一个 并把新增后的值返回** @param key 键* @param item 项* @param by 要增加几(大于0)*/public double hincr(String key, String item, double by) {return redisTemplate.opsForHash().increment(key, item, by);}/*** hash递减** @param key 键* @param item 项* @param by 要减少记(小于0)*/public double hdecr(String key, String item, double by) {return redisTemplate.opsForHash().increment(key, item, -by);}// ============================set=============================/*** 根据key获取Set中的所有值* @param key 键*/public Set<Object> sGet(String key) {try {return redisTemplate.opsForSet().members(key);} catch (Exception e) {e.printStackTrace();return null;}}/*** 根据value从一个set中查询,是否存在** @param key 键* @param value 值* @return true 存在 false不存在*/public boolean sHasKey(String key, Object value) {try {return redisTemplate.opsForSet().isMember(key, value);} catch (Exception e) {e.printStackTrace();return false;}}/*** 将数据放入set缓存** @param key 键* @param values 值 可以是多个* @return 成功个数*/public long sSet(String key, Object... values) {try {return redisTemplate.opsForSet().add(key, values);} catch (Exception e) {e.printStackTrace();return 0;}}/*** 将set数据放入缓存** @param key 键* @param time 时间(秒)* @param values 值 可以是多个* @return 成功个数*/public long sSetAndTime(String key, long time, Object... values) {try {Long count = redisTemplate.opsForSet().add(key, values);if (time > 0)expire(key, time);return count;} catch (Exception e) {e.printStackTrace();return 0;}}/*** 获取set缓存的长度** @param key 键*/public long sGetSetSize(String key) {try {return redisTemplate.opsForSet().size(key);} catch (Exception e) {e.printStackTrace();return 0;}}/*** 移除值为value的** @param key 键* @param values 值 可以是多个* @return 移除的个数*/public long setRemove(String key, Object... values) {try {Long count = redisTemplate.opsForSet().remove(key, values);return count;} catch (Exception e) {e.printStackTrace();return 0;}}// ===============================list=================================/*** 获取list缓存的内容** @param key 键* @param start 开始* @param end 结束 0 到 -1代表所有值*/public List<Object> lGet(String key, long start, long end) {try {return redisTemplate.opsForList().range(key, start, end);} catch (Exception e) {e.printStackTrace();return null;}}/*** 获取list缓存的长度** @param key 键*/public long lGetListSize(String key) {try {return redisTemplate.opsForList().size(key);} catch (Exception e) {e.printStackTrace();return 0;}}/*** 通过索引 获取list中的值** @param key 键* @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推*/public Object lGetIndex(String key, long index) {try {return redisTemplate.opsForList().index(key, index);} catch (Exception e) {e.printStackTrace();return null;}}/*** 将list放入缓存** @param key 键* @param value 值*/public boolean lSet(String key, Object value) {try {redisTemplate.opsForList().rightPush(key, value);return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 将list放入缓存* @param key 键* @param value 值* @param time 时间(秒)*/public boolean lSet(String key, Object value, long time) {try {redisTemplate.opsForList().rightPush(key, value);if (time > 0)expire(key, time);return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 将list放入缓存** @param key 键* @param value 值* @return*/public boolean lSet(String key, List<Object> value) {try {redisTemplate.opsForList().rightPushAll(key, value);return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 将list放入缓存** @param key 键* @param value 值* @param time 时间(秒)* @return*/public boolean lSet(String key, List<Object> value, long time) {try {redisTemplate.opsForList().rightPushAll(key, value);if (time > 0)expire(key, time);return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 根据索引修改list中的某条数据** @param key 键* @param index 索引* @param value 值* @return*/public boolean lUpdateIndex(String key, long index, Object value) {try {redisTemplate.opsForList().set(key, index, value);return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 移除N个值为value** @param key 键* @param count 移除多少个* @param value 值* @return 移除的个数*/public long lRemove(String key, long count, Object value) {try {Long remove = redisTemplate.opsForList().remove(key, count, value);return remove;} catch (Exception e) {e.printStackTrace();return 0;}}}
六、Redis.conf 详解
1.容量单位不区分大小写,G和GB有区别
2.可以使用 include 组合多个配置文件
3.网络配置
protected-mode no #保护模式
port 6379 #端口设置
4.通用配置(GENERAL)
daemonize yes #以守护进程的方式运行,默认是no ,我们需要自己开启为yespidfile /var/run/redis_6379.pid #如果以后台的方式运行,我们就需要指定一个pid文件# Specify the server verbosity level.
# This can be one of:
# debug (a lot of information, useful for development/testing)
# verbose (many rarely useful info, but not a mess like the debug level)
# notice (moderately verbose, what you want in production probably)
# warning (only very important / critical messages are logged)
loglevel notice #日志级别
logfile "" #日志的文件位置名
databases 16 #数据库的数量,默认是16个数据库
always-show-logo no #是否总是显示log
5.快照(SNAPSHOTTING) rdb配置
持久化,在规定的时间内,执行了多少次操作,则会持久化到文件 .rdb .aof
redis是内存数据库,如果没有持久化,那么数据断电及失。
#在3600s内,如果至少有 1 key进行了修改,我们将进行持久化操作save 3600 1#在300s内,如果至少有 100 key进行了修改,我们将进行持久化操作save 300 100#在 60s 内,如果至少有 10000 key进行了修改,我们将进行持久化操作save 60 10000#我们之后学习持久化,会自己定义这个测试stop-writes-on-bgsave-error yes #持久化如果出错,是否还需要继续工作。rdbcompression yes #是否压缩rdb文件,需要消耗一些 cpu资源rdbchecksum yes #保存rdb文件的视觉,进行错误的检查效验dir ./ #rdb文件保存的目录
REPLICATION 主从复制,我们后面讲解主从复制的时候再进行详解
在Redis的配置文件(redis.conf)中,可以配置RDB持久化相关的参数:
save m n
:设置触发RDB持久化的条件和频率。例如,save 900 1
表示在900秒(15分钟)内如果至少有1个key被修改,则执行BGSAVE命令生成快照。stop-writes-on-bgsave-error yes/no
:当BGSAVE命令执行失败时,是否停止写入操作。rdbcompression yes/no
:是否对RDB文件进行压缩存储,节省磁盘空间但会增加CPU消耗。rdbchecksum yes/no
:在写入RDB文件时是否进行数据校验。dir /path/to/directory
:指定RDB文件的存储目录。dbfilename dump.rdb
:指定RDB文件的名称。
6.SECURITY 安全
1.可以再redis.conf配置文件里面修改密码requirepass 123456 #可以在这里设置redis的密码,默认是没有密码的2.可以用命令设置密码
127.0.0.1:6379> ping
(error) NOAUTH Authentication required. #设置密码后发现所有的命令都没有权限了
127.0.0.1:6379> auth 123456 #使用密码进行登录
OK
127.0.0.1:6379> config get requirepass #查看redis的密码
1) "requirepass"
2) "123456"
127.0.0.1:6379> config set requirepass "1234567" #用命令设置redis的密码
7.CLIENTS 限制(客户端连接相关)
maxclients 10000 #设置能连上redis的最大客户端的数量maxmemory <bytes> # redis 配置最大的内存容量maxmemory-policy noeviction # 内存到达上限之后的处理策略
6种策略1、volatile-lru:只对设置了过期时间的key进行LRU(默认值)2、allkeys-lru : 删除lru算法的key3、volatile-random:随机删除即将过期key4、allkeys-random:随机删除5、volatile-ttl : 删除即将过期的6、noeviction : 永不过期,返回错误# volatile-lru -> 在具有过期设置的键中使用近似 LRU 进行驱逐。
# allkeys-lru -> 使用近似 LRU 驱逐任何键。
# volatile-lfu -> 在具有过期设置的键中使用近似 LFU 进行驱逐。
# allkeys-lfu -> 使用近似 LFU 驱逐任何键。
# volatile-random -> 从设置了过期时间的密钥中删除一个随机密钥。
# allkeys-random -> 删除随机密钥,任何密钥。
# volatile-ttl -> 删除过期时间最近的密钥(次要 TTL)
# noeviction -> 不驱逐任何东西,只是在写操作上返回错误。
8.APPEND ONLY 模式 aof配置
appendonly no # 默认是不开启aof模式的,默认是使用rdb方式持久化的,在大部分所有的情况下,rdb完全够用!
appendfilename "appendonly.aof" # 持久化的文件的名字# appendfsync always # 每次修改都会 sync。消耗性能
appendfsync everysec # 每秒执行一次 sync,可能会丢失这1s的数据!
# appendfsync no # 不执行 sync,这个时候操作系统自己同步数据,速度最快!
七、Redis持久化——RDB
面试和工作,持久化都是重点!
Redis 是内存数据库,如果不将内存中的数据库状态保存到磁盘,那么一旦服务器进程退出,服务器中的数据库状态也会消失。所以 Redis 提供了持久化功能!
持久化:在指定时间间隔内将内存数据存入磁盘中,断电也能恢复数据,使用快照文件读到内存中。
RDB:读写文件
RDB(Redis DataBase)
什么是RDB
用在主从复制中,rdb就是备用的,在从机上面。
在指定的时间间隔内将内存中的数据集快照写入磁盘,也就是行话讲的Snapshot快照,它恢复时是将快 照文件直接读到内存里。
Redis会单独创建(fork)一个子进程来进行持久化,会先将数据写入到一个临时文件中,待持久化过程 都结束了,再用这个临时文件替换上次持久化好的文件。整个过程中,主进程是不进行任何IO操作的。 这就确保了极高的性能。如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那 RDB方式要比AOF方式更加的高效。RDB的缺点是最后一次持久化后的数据可能丢失。我们默认的就是 RDB,一般情况下不需要修改这个配置!
有时候在生产环境我们会将这个文件进行备份!
rdb保存的文件是dump.rdb 都是在我们的配置文件中快照中进行配置的!
触发机制
- save的规则满足的情况下,会自动触发rdb原则
- 执行flushall命令,也会触发我们的rdb原则
- 退出redis,也会自动产生rdb文件
备份就自动生成一个 dump.rdb
1.save
使用 save 命令,会立刻对当前内存中的数据进行持久化 ,但是会阻塞,也就是不接受其他操作了;
由于 save 命令是同步命令,会占用Redis的主进程。若Redis数据非常多时,save命令执行速度会非常慢,阻塞所有客户端的请求。
示意图:
2.flushall命令
flushall 命令也会触发持久化 ;
触发持久化规则
满足配置条件中的触发条件 ;
可以通过配置文件对 Redis 进行设置, 让它在“ N 秒内数据集至少有 M 个改动”这一条件被满足时, 自动进行数据集保存操作。
3.bgsave
bgsave 是异步进行,进行持久化的时候,redis 还可以将继续响应客户端请求 ;
示意图:
bgsave和save对比
命令 | save | bgsave |
---|---|---|
IO类型 | 同步 | 异步 |
阻塞? | 是 | 是(阻塞发生在fock(),通常非常快) |
复杂度 | O(n) | O(n) |
优点 | 不会消耗额外的内存 | 不阻塞客户端命令 |
缺点 | 阻塞客户端命令 | 需要fock子进程,消耗内存 |
- SAVE:
- 描述:
SAVE
命令会阻塞 Redis 服务器进程,直到持久化过程完成为止。它会将当前的数据集以同步阻塞方式保存到磁盘上的一个文件中,这个过程会在主进程中进行。- 特点:
- 阻塞:
SAVE
命令会阻塞 Redis 服务器进程,直到持久化过程完成。- 同步:持久化操作是同步执行的,因此会导致服务器停止服务一段时间。
- 优点:
- 简单:
SAVE
是一个简单的命令,容易理解和使用。- 数据一致性:在持久化期间,Redis 不接受任何请求,因此可以保证数据一致性。
- 缺点:
- 阻塞:持久化过程会阻塞 Redis 服务器,导致服务器暂时无法处理请求,影响性能。
- 性能问题:对于大型数据库来说,保存整个数据集可能会消耗大量时间和资源。
- 适用场合:适用于小型数据库,或者在数据量不大且可以容忍服务器停止服务一段时间的情况下。
- BGSAVE:
- 描述:
BGSAVE
命令会派生一个子进程,由子进程负责将数据集保存到磁盘上的一个文件中。父进程继续处理请求,不会被阻塞。- 特点:
- 后台执行:
BGSAVE
是一个后台执行的命令,不会阻塞 Redis 服务器主进程。- 异步:持久化操作是异步执行的,不会影响服务器的响应速度。
- 优点:
- 非阻塞:
BGSAVE
不会阻塞服务器进程,因此不会影响 Redis 服务器的性能。- 异步:持久化过程在后台执行,不会影响服务器响应请求的能力。
- 缺点:
- 内存占用:
BGSAVE
使用子进程进行持久化操作,可能会消耗一定的内存资源。- 可能会失败:在某些情况下,
BGSAVE
操作可能会因为子进程出错或者其他原因失败。- 适用场合:适用于大型数据库,或者在不能容忍服务器停止服务的情况下,因为
BGSAVE
不会阻塞 Redis 服务器的主进程。总的来说,如果你的数据库较小,且可以容忍短暂的服务停止,那么使用
SAVE
是一个简单的选择。但如果你的数据库较大,或者不能容忍服务停止,那么最好使用BGSAVE
。
如果恢复rdb文件!
1、只需要将rdb文件放在我们redis启动目录就可以,redis启动的时候会自动检查dump.rdb 恢复其中的数据!
2、查看需要存在的位置
127.0.0.1:6379> config get dir
1) "dir"
2) "/usr/local/bin" # 如果在这个目录下存在 dump.rdb 文件,启动就会自动恢复其中的数据
优缺点
优点:
- 适合大规模的数据恢复
- 对数据的完整性要求不高
缺点:
- 需要一定的时间间隔进行操作,如果redis意外宕机了,这个最后一次修改的数据就没有了。
- fork进程的时候,会占用一定的内容空间。
优点:
- RDB持久化生成的快照文件紧凑,适合用于备份和恢复数据。
- 在数据恢复时,由于快照是全量的,恢复速度相对较快。
缺点:
- RDB持久化是全量备份,如果Redis进程意外终止,可能会丢失最后一次快照生成后的数据。
- 对于大型数据集,生成快照会消耗较多的CPU和IO资源,可能会影响Redis的性能。
- RDB持久化无法实现实时备份,只能通过定期生成快照的方式来保证数据的持久化。
总的来说,RDB持久化适合对数据一致性要求不是特别高、需要定期备份的场景。对于要求实时持久化并且能够容忍一定数据丢失的场景,AOF持久化更为适合。
八、Redis 持久化——AOF
AOF(Append-Only File)(记录文件)
将我们的所有命令都记录下来,history,恢复的时候就把这个文件全部在执行一遍!
AOF 是什么
以日志的形式来记录每个写操作,将Redis执行过的所有指令记录下来(读操作不记录),只许追加文件 但不可以改写文件,redis启动之初会读取该文件重新构建数据,换言之,redis重启的话就根据日志文件 的内容将写指令从前到后执行一次以完成数据的恢复工作
Aof保存的是 appendonly.aof 文件
append
默认是不开启的,我们需要手动进行配置!我们只需要将 appendonly 改为yes就开启了 aof!
重启,redis 就可以生效了! 如果这个 aof 文件有错位,这时候 redis 是启动不起来的吗,我们需要修复这个aof文件
开启后会生成appendonly.aof文件
在里面写入数据后
看 appendonly.aof 里面会有 写数据的记录
如果这个aof文件有错误,这时候redis是启动不起来的,需要我们修复这个aof文件
redis给我们提供了一个工具 redis-check-aof --fix
aof配置文件出错,连接时会出现这个问题
用redis-check-aof --fix 进行恢复
如果文件正常,重启就可以直接恢复了
重写规则说明
aof 默认就是文件的无限追加,文件会越来越大!
如果 aof 文件大于 64m,太大了! fork一个新的进程来将我们的文件进行重写!
优点和缺点!
appendonly no # 默认是不开启aof模式的,默认是使用rdb方式持久化的,在大部分所有的情况下,
rdb完全够用!
appendfilename "appendonly.aof" # 持久化的文件的名字
# appendfsync always # 每次修改都会 sync。消耗性能
appendfsync everysec # 每秒执行一次 sync,可能会丢失这1s的数据!
# appendfsync no # 不执行 sync,这个时候操作系统自己同步数据,速度最快!
# rewrite 重写,
优点
1、每一次修改都同步,文件的完整会更加好!
2、每秒同步一次,可能会丢失一秒的数据
3、从不同步,效率最高的!
缺点
1、相对于数据文件来说,aof远远大于 rdb,修复的速度也比 rdb慢!
2、Aof 运行效率也要比 rdb 慢,所以我们redis默认的配置就是rdb持久化!
扩展:
1、RDB 持久化方式能够在指定的时间间隔内对你的数据进行快照存储
2、AOF 持久化方式记录每次对服务器写的操作,当服务器重启的时候会重新执行这些命令来恢复原始 的数据,AOF命令以Redis 协议追加保存每次写的操作到文件末尾,Redis还能对AOF文件进行后台重 写,使得AOF文件的体积不至于过大。
3、只做缓存,如果你只希望你的数据在服务器运行的时候存在,你也可以不使用任何持久化
4、同时开启两种持久化方式
- 在这种情况下,当redis重启的时候会优先载入AOF文件来恢复原始的数据,因为在通常情况下AOF 文件保存的数据集要比RDB文件保存的数据集要完整。
- RDB 的数据不实时,同时使用两者时服务器重启也只会找AOF文件,那要不要只使用AOF呢?作者 建议不要,因为RDB更适合用于备份数据库(AOF在不断变化不好备份),快速重启,而且不会有 AOF可能潜在的Bug,留着作为一个万一的手段。
5、性能建议
- 因为RDB文件只用作后备用途,建议只在Slave上持久化RDB文件,而且只要15分钟备份一次就够 了,只保留 save 900 1 这条规则。
- 如果Enable AOF ,好处是在最恶劣情况下也只会丢失不超过两秒数据,启动脚本较简单只load自 己的AOF文件就可以了,代价一是带来了持续的IO,二是AOF rewrite 的最后将 rewrite 过程中产 生的新数据写到新文件造成的阻塞几乎是不可避免的。只要硬盘许可,应该尽量减少AOF rewrite 的频率,AOF重写的基础大小默认值64M太小了,可以设到5G以上,默认超过原大小100%大小重 写可以改到适当的数值。
- 如果不Enable AOF ,仅靠 Master-Slave Repllcation 实现高可用性也可以,能省掉一大笔IO,也 减少了rewrite时带来的系统波动。代价是如果Master/Slave 同时倒掉,会丢失十几分钟的数据, 启动脚本也要比较两个 Master/Slave 中的 RDB文件,载入较新的那个,微博就是这种架构。
优点:
- 可读性高:AOF文件是以文本形式保存的,易于理解和调试。
- 容灾性强:AOF文件记录了所有写操作命令,因此可以更可靠地恢复数据。
- 适用于追加操作:AOF采用追加方式记录写操作,因此对于磁盘的IO消耗相对较低。
缺点:
- 文件较大:由于AOF文件保存了每个写操作命令,因此可能会比RDB生成的快照文件更大。
- 恢复速度较慢:由于Redis在启动时需要重新执行AOF文件中的所有写操作,因此恢复速度可能比RDB快照方式慢。
- 重写可能会耗时:AOF重写过程可能会耗费一定的时间和资源,尤其是在数据集较大时。
配置AOF持久化
在Redis的配置文件中,可以通过以下参数配置AOF持久化:
appendonly yes/no
:是否开启AOF持久化,默认为no。appendfilename
:AOF文件的文件名,默认为appendonly.aof。appendfsync always/everysec/no
:控制何时将写入操作同步到磁盘,可选值有always(每个写入操作都会同步到磁盘)、everysec(每秒同步一次)和no(不同步)。auto-aof-rewrite-percentage
:触发AOF重写的百分比,默认为100。auto-aof-rewrite-min-size
:AOF文件大小超过该值时,触发AOF重写,默认为64MB。适用场景
- 数据可靠性要求高:AOF持久化记录了每个写操作命令,因此在数据可靠性要求较高的场景下更为适用。
- 追加操作频繁:AOF持久化采用追加方式记录写操作,适用于写操作频繁的场景。
- 可读性要求高:AOF文件是以文本形式保存的,易于理解和调试,适用于需要对持久化数据进行查看和修改的场景。
AOF重写是Redis中的一项机制,旨在解决AOF持久化过程中可能出现的文件膨胀和性能问题。在AOF持久化过程中,随着Redis接收到写操作命令的不断累积,AOF文件会不断增长,可能会导致文件体积过大,影响性能和占用过多的磁盘空间。AOF重写机制的作用是生成一个新的AOF文件,该文件与当前数据集的状态相同,但是文件体积更小,仍然能够完整地恢复数据集。
AOF重写的工作原理
- 生成新的AOF文件:
- AOF重写过程由后台线程完成,不会阻塞主进程。该线程会遍历当前数据集的所有键值对,并生成一系列写命令,这些命令可以完整地重现当前数据集的状态。
- 记录生成过程:
- 在生成新的AOF文件时,Redis会记录当前服务器接收到的所有写命令。这些命令不是原始的客户端写命令,而是生成新AOF文件所需的最少的写命令序列。
- 保留最近一次完整写命令序列:
- 在AOF重写过程中,Redis会记录生成新AOF文件所需的写命令序列。一旦重写完成,Redis会将这个写命令序列写入新的AOF文件中,代替旧的AOF文件。
- 切换AOF文件:
- 当新AOF文件生成完毕并写入完成后,Redis会原子性地将旧的AOF文件替换为新的AOF文件。这样就完成了AOF重写。
AOF重写的优点
- 降低AOF文件体积:由于AOF重写只记录当前数据集的状态,因此生成的新AOF文件体积通常会小于原始AOF文件。
- 减少写操作记录:AOF重写只记录生成新AOF文件所需的最少的写命令序列,而不是全部历史写命令,因此能够降低AOF文件的体积。
- 提升性能:AOF重写过程由后台线程完成,不会阻塞主进程,因此不会影响Redis的正常响应速度。
auto-aof-rewrite-percentage:触发AOF重写的AOF文件体积增长百分比,默认为100%。
这个配置项指定了触发AOF重写的AOF文件体积增长的百分比阈值。默认情况下,这个值是100%,意味着当AOF文件的体积增长到原始AOF文件的大小的两倍时,Redis就会触发AOF重写过程。
举个例子来说,假设初始时AOF文件的大小是100MB。如果设置了默认的100%阈值,那么当AOF文件的大小增长到200MB时,就会触发AOF重写。这样做的目的是为了在AOF文件不断增长时,及时触发AOF重写,以防止AOF文件变得过大影响性能或占用过多的磁盘空间。
九、RDB和AOF选择
RDB 和 AOF 对比
RDB | AOF | |
---|---|---|
启动优先级 | 低 | 高 |
体积 | 小 | 大 |
恢复速度 | 快 | 慢 |
数据安全性 | 丢数据 | 根据策略决定 |
如何选择使用哪种持久化方式?
一般来说, 如果想达到足以媲美 PostgreSQL 的数据安全性, 你应该同时使用两种持久化功能。
如果你非常关心你的数据, 但仍然可以承受数分钟以内的数据丢失, 那么你可以只使用 RDB 持久化。
有很多用户都只使用 AOF 持久化, 但并不推荐这种方式: 因为定时生成 RDB 快照(snapshot)非常便于进行数据库备份, 并且 RDB 恢复数据集的速度也要比 AOF 恢复的速度要快。
首先,需要创建 RDB 文件的备份,通常是最新的
dump.rdb
文件。这是为了确保数据安全。将备份文件保存在一个安全的地方,以防意外情况发生时需要还原数据。
通过执行以下两个命令来完成切换持久化方式:首先,使用
CONFIG SET appendonly yes
命令开启 AOF 功能;然后,使用CONFIG SET save ""
命令关闭 RDB 功能。这样做之后,Redis 就会开始将写入命令追加到 AOF 文件中,并不再执行 RDB 快照持久化。确保执行完上述命令后,数据库中的键数量没有发生变化,以确保数据持久化的完整性。
确保写入命令会被正确地追加到 AOF 文件的末尾,以确保持久化的有效性。
在步骤3中,第一个命令执行后,Redis 会阻塞直到初始 AOF 文件创建完成为止。这意味着 Redis 会等待 AOF 文件完全初始化之后才会继续处理其他命令请求。一旦 AOF 文件初始化完成,Redis 就会开始将写入命令追加到 AOF 文件末尾。
第二个命令则是可选的,用于关闭 RDB 功能。如果你愿意的话,你可以同时使用 RDB 和 AOF 这两种持久化功能。然而,需要确保在
redis.conf
配置文件中打开 AOF 功能,以防止服务器重启后之前通过CONFIG SET
命令设置的配置被遗忘,导致服务器按照原来的配置启动。
十、Redis订阅发布
Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。如:微信、 微博、关注系统!
Redis 客户端可以订阅任意数量的频道。
订阅/发布消息图:
第一个:消息发送者, 第二个:频道 第三个:消息订阅者!
下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的 关系:
当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:
命令
这些命令被广泛用于构建即时通信应用,比如网络聊天室(chatroom)和实时广播、实时提醒等。
测试
订阅端:
127.0.0.1:6379> SUBSCRIBE jihu2 #订阅一个频道jihu2
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "jihu2"
3) (integer) 1
#等待读取推送的信息
1) "message"
2) "jihu2"
3) "hello beauty women" #自动接收jihu2发布的信息
1) "message"
2) "jihu2"
3) "hello redis"
发送端:
[root@localhost bin]# redis-cli -p 6379 #发布者发布消息到频道
127.0.0.1:6379> PUBLISH jihu2 "hello beauty women" #发布者发布消息到频道
(integer) 1
127.0.0.1:6379> PUBLISH jihu2 "hello redis"
(integer) 1
原理
Redis是使用C实现的,通过分析 Redis 源码里的 pubsub.c 文件,了解发布和订阅机制的底层实现,籍此加深对 Redis 的理解。
Redis 通过 PUBLISH 、SUBSCRIBE 和 PSUBSCRIBE 等命令实现发布和订阅功能。
微信:
通过 SUBSCRIBE 命令订阅某频道后,redis-server 里维护了一个字典,字典的键就是一个个 频道!, 而字典的值则是一个链表,链表中保存了所有订阅这个 channel 的客户端。SUBSCRIBE 命令的关键, 就是将客户端添加到给定 channel 的订阅链表中。
通过 PUBLISH 命令向订阅者发送消息,redis-server 会使用给定的频道作为键,在它所维护的 channel 字典中查找记录了订阅这个频道的所有客户端的链表,遍历这个链表,将消息发布给所有订阅者。
缺点
- 如果一个客户端订阅了频道,但自己读取消息的速度却不够快的话,那么不断积压的消息会使redis输出缓冲区的体积变得越来越大,这可能使得redis本身的速度变慢,甚至直接崩溃。
- 这和数据传输可靠性有关,如果在订阅方断线,那么他将会丢失所有在短线期间发布者发布的消息。
使用场景
1、实时消息系统!
2、事实聊天!(频道当做聊天室,将信息回显给所有人即可!)
3、订阅,关注系统都是可以的!
稍微复杂的场景,我们就会使用消息中间件MQ处理。
当使用 Java 实现 Redis 的发布订阅功能时,我们需要使用 Redis 官方提供的 Java 客户端库 Jedis。确保你已经将 Jedis 添加到项目依赖中。
下面我将编写两个 Java 类,一个作为发布者,另一个作为订阅者。
发布者(Publisher)import redis.clients.jedis.Jedis;public class Publisher {public static void main(String[] args) {// 连接到本地 Redis 服务器Jedis jedis = new Jedis("localhost");// 模拟发布消息到频道for (int i = 0; i < 5; i++) {String message = "News Update " + i;jedis.publish("news", message);System.out.println("Published '" + message + "' to channel 'news'");try {Thread.sleep(1000); // 模拟消息发布间隔} catch (InterruptedException e) {e.printStackTrace();}}// 关闭连接jedis.close();} }
这段代码连接到本地 Redis 服务器,然后模拟发布了 5 条消息到名为 ‘news’ 的频道,并且在控制台上打印了发布的消息内容。
订阅者(Subscriber)import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPubSub;public class Subscriber {public static void main(String[] args) {// 连接到本地 Redis 服务器Jedis jedis = new Jedis("localhost");// 订阅消息jedis.subscribe(new JedisPubSub() {@Overridepublic void onMessage(String channel, String message) {System.out.println("Received message: " + message);}}, "news");} }
这段代码连接到本地 Redis 服务器,然后订阅了名为 ‘news’ 的频道,并且实现了 onMessage 方法来处理接收到的消息,在控制台上打印出来。
运行代码
确保 Redis 服务器正在运行,然后在不同的终端中分别运行发布者和订阅者的 Java 类,你会看到订阅者接收到发布者发布的消息。
通过这种方式,你可以在 Java 中使用 Redis 的发布订阅功能,实现消息的发布和订阅。
十一、Redis主从复制
概念
主从复制,是指将一台Redis服务器的数据,复制到其他的Redis服务器。前者称为主节点 (master/leader),后者称为从节点(slave/follower);数据的复制是单向的,只能由主节点到从节点。 Master以写为主,Slave 以读为主。
默认情况下,每台Redis服务器都是主节点;
且一个主节点可以有多个从节点(或没有从节点),但一个从节点只能有一个主节点。
主从复制的作用主要包括:
1、数据冗余:主从复制实现了数据的热备份,是持久化之外的一种数据冗余方式。
2、故障恢复:当主节点出现问题时,可以由从节点提供服务,实现快速的故障恢复;实际上是一种服务 的冗余。
3、负载均衡:在主从复制的基础上,配合读写分离,可以由主节点提供写服务,由从节点提供读服务 (即写Redis数据时应用连接主节点,读Redis数据时应用连接从节点),分担服务器负载;尤其是在写 少读多的场景下,通过多个从节点分担读负载,可以大大提高Redis服务器的并发量。
4、高可用(集群)基石:除了上述作用以外,主从复制还是哨兵和集群能够实施的基础,因此说主从复 制是Redis高可用的基础
一般来说,要将Redis运用于工程项目中,只使用一台Redis是万万不能的(宕机),原因如下:
1、从结构上,单个Redis服务器会发生单点故障,并且一台服务器需要处理所有的请求负载,压力较 大;
2、从容量上,单个Redis服务器内存容量有限,就算一台Redis服务器内存容量为256G,也不能将所有 内存用作Redis存储内存,一般来说,单台Redis最大使用内存不应该超过20G。
电商网站上的商品,一般都是一次上传,无数次浏览的,说专业点也就是"多读少写"。
对于这种场景,我们可以使如下这种架构:
主从复制,读写分离! 80% 的情况下都是在进行读操作!减缓服务器的压力!架构中经常使用! 一主 二从!
只要在公司中,主从复制就是必须要使用的,因为在真实的项目中不可能单机使用Redis!
环境配置
只配置从库,不用配置主库!
127.0.0.1:6379> info replication #查看当前库的信息
# Replication
role:master #角色 master
connected_slaves:0 # 0 表示没有从机
master_failover_state:no-failover
master_replid:f80a04ead679af98a1c84518b78e529a0a33c986
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:0
second_repl_offset:-1
repl_backlog_active:0
repl_backlog_size:1048576
repl_backlog_first_byte_offset:0
repl_backlog_histlen:0
复制3个配置文件,然后修改对应的信息
要想改一个启动项
要改以下四步: ( redis80.confshu )
- 端口 2.pid名字 3.log文件名字 4.dump.rdb 名字
修改完毕后,启动我们的3个redis服务器,可以通过进程信息查看
一主二从
默认情况下,每台Redis服务器都是主节点; 我们一般情况下只用配置从机就好了。
主(79) 二从( 80 , 81 )
从机中查看:
127.0.0.1:6380> SLAVEOF 127.0.0.1 6379 #SLAVEOF host 6379 找谁当自己的老的
OK
127.0.0.1:6380> info repilcation
127.0.0.1:6380> info replication
# Replication
role:slave #当前角色是从机
master_host:127.0.0.1 #可以看到主机的信息
master_port:6379
master_link_status:up
master_last_io_seconds_ago:3
master_sync_in_progress:0
slave_read_repl_offset:28
slave_repl_offset:28
slave_priority:100
slave_read_only:1
replica_announced:1
connected_slaves:0
master_failover_state:no-failover
master_replid:a0e8a30728e95d67d6b191795452ea27b934ba67
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:28
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:28
主机中查看(此时两个从机都配置完毕了):
127.0.0.1:6379> info replication
# Replication
role:master
connected_slaves:2 #多了从机的配置
slave0:ip=127.0.0.1,port=6380,state=online,offset=196,lag=0 #多了从机的配置
slave1:ip=127.0.0.1,port=6381,state=online,offset=196,lag=1
master_failover_state:no-failover
master_replid:a0e8a30728e95d67d6b191795452ea27b934ba67
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:196
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:196
真实的主从配置应该在配置文件中配置,这样的话是永久的,我们这里使用的是命令是暂时的。
这里是在配置文件中配置 主从信息
细节
主机可以写,从机不能写只能读。 主机中的所有信息和数据,都会自动被从机保存。
主机写:
从机只能读取内容
测试:主机断开连接,从机依旧连接到主机的,但是没有写操作,这个时候,主机如果回来了,从机依 旧可以直接获取到主机写的信息!
如果是使用命令行,来配置的主从,这个时候如果重启了,就会变回主机!只要变为从机,立马就会从 主机中获取值!
赋值原理
Slave 启动成功连接到 master 后会发送一个sync同步命令
Master 接到命令,启动后台的存盘进程,同时收集所有接收到的用于修改数据集命令,在后台进程执行 完毕之后,master将传送整个数据文件到slave,并完成一次完全同步。
全量复制:而slave服务在接收到数据库文件数据后,将其存盘并加载到内存中。
增量复制:Master 继续将新的所有收集到的修改命令依次传给slave,完成同步
但是只要是重新连接master,一次完全同步(全量复制)将被自动执行! 我们的数据一定可以在从机中看到!
层层链路
上一个M链接下一个 S!
这时候也可以完成我们的主从复制!
如果没有老大了,这个时候能不能选择一个老大出来呢? 需手动设置!
谋朝篡位
如果主机断开了连接,我们可以使用SLAVEOF no one
让自己变成主机!其他的节点就可以手动连 接到最新的这个主节点(手动)! 如果这个时候老大修复了,那就重新连接!
十二、哨兵模式
(自动选举老大的模式)
概述
主从切换技术的方法是:当主服务器宕机后,需要手动把一台从服务器切换为主服务器,这就需要人工 干预,费事费力,还会造成一段时间内服务不可用。这不是一种推荐的方式,更多时候,我们优先考虑 哨兵模式。Redis从2.8开始正式提供了Sentinel(哨兵) 架构来解决这个问题。
谋朝篡位的自动版,能够后台监控主机是否故障,如果故障了根据投票数自动将从库转换为主库。
哨兵模式是一种特殊的模式,首先Redis提供了哨兵的命令,哨兵是一个独立的进程,作为进程,它会独 立运行。其原理是哨兵通过发送命令,等待Redis服务器响应,从而监控运行的多个Redis实例。
Redis中的哨兵模式是一种用于实现高可用性(high availability)的机制。在Redis中,哨兵(Sentinel)是一个独立的进程,负责监控主数据库(master)和从数据库(slave),并在主数据库失效时自动将一个从数据库提升为新的主数据库,以确保系统的持续可用性。
以下是Redis中哨兵模式的主要组成部分和工作原理:
哨兵(Sentinel):哨兵是一个独立的进程,负责监控Redis中的主数据库和从数据库。哨兵会定期向主数据库和从数据库发送PING命令来检测它们的状态,并在发现主数据库不可用时,选举出一个新的主数据库。
主数据库(Master):主数据库是Redis中的主要数据存储节点,负责处理所有写操作,并将数据同步到从数据库。
从数据库(Slave):从数据库是主数据库的备份节点,负责接收来自主数据库的数据复制,并在主数据库失效时,通过选举成为新的主数据库。
工作流程如下:
- 初始阶段,哨兵监控着系统中的主数据库和从数据库。
- 如果主数据库失效(比如宕机),哨兵会开始进行故障检测。
- 哨兵会根据预设的条件(例如,主数据库不可用的连续次数)来决定是否宣布主数据库失效。
- 一旦主数据库失效,哨兵会从当前的从数据库中选举出一个新的主数据库。
- 选举出的新主数据库将会向哨兵报告自己的状态,并成为系统中的主数据库。
- 哨兵会更新系统中所有的从数据库,使它们成为新主数据库的从数据库,并开始将数据复制到它们。
在Redis Sentinel(哨兵)中,用于选举主节点的算法主要是基于哨兵之间的通信和协作。哨兵通过相互通信来达成一致,选择一个合适的从节点作为新的主节点。主要的选举算法包括:
- 投票选举算法:
- 在Redis Sentinel中,当哨兵检测到主节点不可用时,它会发起一次选举过程。
- 每个哨兵都会投票给自己认为适合成为新主节点的从节点。
- 哨兵将投票结果进行汇总,并选择得到最多票数的从节点作为新的主节点。
- 如果存在多个从节点获得相同数量的选票,则根据一定的优先级规则进行进一步的决定,例如根据从节点的复制偏移量(复制进度)或者ID等因素进行排序。
- 加权投票选举算法:
- 类似于投票选举算法,但是不同哨兵的投票可能具有不同的权重,这可以根据哨兵的配置或性能来调整。
- 通过给具有更高权重的哨兵赋予更多的投票权,可以在选举过程中更加重视它们的投票结果。
- 基于节点健康度的选举算法:
- 在进行选举时,哨兵可能会考虑到每个从节点的健康度。
- 如果一个从节点被认为更健康或者复制进度更接近于主节点,则它可能会获得更多的选票,从而有更大的机会成为新的主节点。
这里的哨兵有两个作用
- 通过发送命令,让Redis服务器返回监控其运行状态,包括主服务器和从服务器。
- 当哨兵监测到master宕机,会自动将slave切换成master,然后通过发布订阅模式通知其他的从服 务器,修改配置文件,让它们切换主机。
然而一个哨兵进程对Redis服务器进行监控,可能会出现问题,为此,我们可以使用多个哨兵进行监控。 各个哨兵之间还会进行监控,这样就形成了多哨兵模式。
假设主服务器宕机,哨兵1先检测到这个结果,系统并不会马上进行failover过程,仅仅是哨兵1主观的认 为主服务器不可用,这个现象成为主观下线。当后面的哨兵也检测到主服务器不可用,并且数量达到一 定值时,那么哨兵之间就会进行一次投票,投票的结果由一个哨兵发起,进行failover[故障转移]操作。 切换成功后,就会通过发布订阅模式,让各个哨兵把自己监控的从服务器实现切换主机,这个过程称为 客观下线。
测试
我们目前的状态是 一主二从!
1、配置哨兵配置文件 sentinel.conf
# sentinel monitor 被监控的名称 host port 1
sentinel monitor myredis 127.0.0.1 6379 1
#后面的这个数字1 ,代表主机挂了,从机投票看让谁接替成为主机,票数最多的,就会成为主机。
sentinel:这是Redis Sentinel的命令行工具。
monitor:这是sentinel命令的一个子命令,用于指示哨兵开始监视一个Redis实例。
myredis:这是要监视的Redis实例的名称。它是一个用户定义的标识符,用于在监视多个Redis实例时进行区分。
127.0.0.1:这是要监视的Redis实例的主机地址。在这里,使用的是本地主机(localhost)的IP地址。
6379:这是要监视的Redis实例的端口号。在这里,Redis实例使用的是默认的6379端口。
1:这是指定了要求多少个哨兵同意(包括自身)才将一个节点标记为主节点。在这里,使用的是"1",表示只需要一个哨兵同意即可。
综合起来,该命令告诉Redis Sentinel开始监视一个名为 “myredis” 的Redis实例,该实例位于本地主机的6379端口,如果至少有一个哨兵同意,它将标记一个节点为主节点。
2.启动哨兵
[root@localhost bin]# redis-sentinel kconfig/sentinel.conf
如果Master 节点断开了,这个时候就会从从机中随机选择一个服务器! (这里面有一个投票算法!)
哨兵日志! (指定6381为主节点)
哨兵模式
如果主机此时回来了,只能归并到新的主机下,当做从机,这就是哨兵模式的规则!
哨兵模式优缺点
优点:
- 哨兵集群,基于主从复制模式,所有主从复制的优点,它都有
- 主从可以切换,故障可以转移,系统的可用性更好
- 哨兵模式是主从模式的升级,手动到自动,更加健壮
缺点:
- Redis不好在线扩容,集群容量一旦达到上限,在线扩容就十分麻烦
- 实现哨兵模式的配置其实是很麻烦的,里面有很多配置项
哨兵模式的全部配置
完整的哨兵模式配置文件 sentinel.conf
# Example sentinel.conf# 哨兵sentinel实例运行的端口 默认26379
port 26379# 哨兵sentinel的工作目录
dir /tmp# 哨兵sentinel监控的redis主节点的 ip port
# master-name 可以自己命名的主节点名字 只能由字母A-z、数字0-9 、这三个字符".-_"组成。
# quorum 当这些quorum个数sentinel哨兵认为master主节点失联 那么这时 客观上认为主节点失联了
# sentinel monitor <master-name> <ip> <redis-port> <quorum>
sentinel monitor mymaster 127.0.0.1 6379 1# 当在Redis实例中开启了requirepass foobared 授权密码 这样所有连接Redis实例的客户端都要提供密码
# 设置哨兵sentinel 连接主从的密码 注意必须为主从设置一样的验证密码
# sentinel auth-pass <master-name> <password>
sentinel auth-pass mymaster MySUPER--secret-0123passw0rd# 指定多少毫秒之后 主节点没有应答哨兵sentinel 此时 哨兵主观上认为主节点下线 默认30秒
# sentinel down-after-milliseconds <master-name> <milliseconds>
sentinel down-after-milliseconds mymaster 30000# 这个配置项指定了在发生failover主备切换时最多可以有多少个slave同时对新的master进行 同步,
这个数字越小,完成failover所需的时间就越长,
但是如果这个数字越大,就意味着越 多的slave因为replication而不可用。
可以通过将这个值设为 1 来保证每次只有一个slave 处于不能处理命令请求的状态。
# sentinel parallel-syncs <master-name> <numslaves>
sentinel parallel-syncs mymaster 1# 故障转移的超时时间 failover-timeout 可以用在以下这些方面:
#1. 同一个sentinel对同一个master两次failover之间的间隔时间。
#2. 当一个slave从一个错误的master那里同步数据开始计算时间。直到slave被纠正为向正确的master那里同步数据时。
#3.当想要取消一个正在进行的failover所需要的时间。
#4.当进行failover时,配置所有slaves指向新的master所需的最大时间。不过,即使过了这个超时,slaves依然会被正确配置为指向master,但是就不按parallel-syncs所配置的规则来了
# 默认三分钟
# sentinel failover-timeout <master-name> <milliseconds>
sentinel failover-timeout mymaster 180000# SCRIPTS EXECUTION#配置当某一事件发生时所需要执行的脚本,可以通过脚本来通知管理员,例如当系统运行不正常时发邮件通知相关人员。
#对于脚本的运行结果有以下规则:
#若脚本执行后返回1,那么该脚本稍后将会被再次执行,重复次数目前默认为10
#若脚本执行后返回2,或者比2更高的一个返回值,脚本将不会重复执行。
#如果脚本在执行过程中由于收到系统中断信号被终止了,则同返回值为1时的行为相同。
#一个脚本的最大执行时间为60s,如果超过这个时间,脚本将会被一个SIGKILL信号终止,之后重新执行。#通知型脚本:当sentinel有任何警告级别的事件发生时(比如说redis实例的主观失效和客观失效等等),将会去调用这个脚本,
#这时这个脚本应该通过邮件,SMS等方式去通知系统管理员关于系统不正常运行的信息。调用该脚本时,将传给脚本两个参数,
#一个是事件的类型,
#一个是事件的描述。
#如果sentinel.conf配置文件中配置了这个脚本路径,那么必须保证这个脚本存在于这个路径,并且是可执行的,否则sentinel无法正常启动成功。
#通知脚本 用shell编写
# sentinel notification-script <master-name> <script-path>sentinel notification-script mymaster /var/redis/notify.sh# 客户端重新配置主节点参数脚本
# 当一个master由于failover而发生改变时,这个脚本将会被调用,通知相关的客户端关于master地址已经发生改变的信息。
# 以下参数将会在调用脚本时传给脚本:
# <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port>
# 目前<state>总是“failover”,
# <role>是“leader”或者“observer”中的一个。
# 参数 from-ip, from-port, to-ip, to-port是用来和旧的master和新的master(即旧的slave)通信的
# 这个脚本应该是通用的,能被多次调用,不是针对性的。
# sentinel client-reconfig-script <master-name> <script-path>
sentinel client-reconfig-script mymaster /var/redis/reconfig.sh #一般都是由运维来配置。
sentinel parallel-syncs mymaster 1
:这条Redis Sentinel配置命令指定了在执行故障转移期间,要将从节点重新配置为新的主节点所需的并行复制过程的最大数量。这里的参数解释如下:
- sentinel:Redis Sentinel命令前缀。
- parallel-syncs:这是配置项的名称,用于指定并行同步的数量。
- mymaster:这是要设置并行同步的Redis主节点的名称。
- 1:这是指定了在执行故障转移期间所允许的最大并行同步数量。
换句话说,当发生故障转移并将一个从节点提升为新的主节点时,这个参数指定了可以同时对多少个从节点执行同步操作。在这种情况下,配置为
1
表示在进行故障转移时只允许一个从节点与新的主节点进行并行同步。这可以避免对主节点的负载过大,因为在同步期间会消耗大量的网络带宽和主节点的处理能力。如果需要更高的可用性,可以增加并行同步的数量,但要注意确保主节点的资源足够支持这些并行同步操作,以免影响主节点的性能。
十三、Redis缓存穿透和雪崩
缓存就是一堵墙
穿透就是越过墙(查不到),
击穿就是墙破了个洞(热点过期,点),
雪崩就是墙倒了(缓存集中过期 / Redis宕机,面)
服务的高可用问题!
Redis缓存的使用,极大的提升了应用程序的性能和效率,特别是数据查询方面。但同时,它也带来了一 些问题。其中,最要害的问题,就是数据的一致性问题,从严格意义上讲,这个问题无解。如果对数据 的一致性要求很高,那么就不能使用缓存。
另外的一些典型问题就是,缓存穿透、缓存雪崩和缓存击穿。目前,业界也都有比较流行的解决方案。
缓存穿透(查不到)
概念
缓存穿透的概念很简单,用户想要查询一个数据,发现redis内存数据库没有,也就是缓存没有命中,于 是向持久层数据库查询。发现也没有,于是本次查询失败。当用户很多的时候,缓存都没有命中(秒 杀!),于是都去请求了持久层数据库。这会给持久层数据库造成很大的压力,这时候就相当于出现了 缓存穿透。
解决方案
布隆过滤器
布隆过滤器是一种数据结构,对所有可能查询的参数以hash形式存储,在控制层先进行校验,不符合则 丢弃,从而避免了对底层存储系统的查询压力;
缓存空对象
当存储层不命中后,即使返回的空对象也将其缓存起来,同时会设置一个过期时间,之后再访问这个数 据将会从缓存中获取,保护了后端数据源;
但是这种方法会存在两个问题:
1、如果空值能够被缓存起来,这就意味着缓存需要更多的空间存储更多的键,因为这当中可能会有很多 的空值的键;
2、即使对空值设置了过期时间,还是会存在缓存层和存储层的数据会有一段时间窗口的不一致,这对于 需要保持一致性的业务会有影响。
缓存击穿(量太大,缓存过期!)
概述
这里需要注意和缓存击穿的区别,缓存击穿,是指一个key非常热点,在不停的扛着大并发,大并发集中 对这一个点进行访问,当这个key在失效的瞬间,持续的大并发就穿破缓存,直接请求数据库,就像在一 个屏障上凿开了一个洞。
当某个key在过期的瞬间,有大量的请求并发访问,这类数据一般是热点数据,由于缓存过期,会同时访 问数据库来查询最新数据,并且回写缓存,会导使数据库瞬间压力过大。
解决方案
设置热点数据永不过期
从缓存层面来看,没有设置过期时间,所以不会出现热点 key 过期后产生的问题。
加互斥锁 (分布式锁)
分布式锁:使用分布式锁,保证对于每个key同时只有一个线程去查询后端服务,其他线程没有获得分布 式锁的权限,因此只需要等待即可。这种方式将高并发的压力转移到了分布式锁,因此对分布式锁的考验很大。
缓存雪崩
概念
缓存雪崩,是指在某一个时间段,缓存集中过期失效。Redis 宕机!
产生雪崩的原因之一,比如在写本文的时候,马上就要到双十二零点,很快就会迎来一波抢购,这波商 品时间比较集中的放入了缓存,假设缓存一个小时。那么到了凌晨一点钟的时候,这批商品的缓存就都 过期了。而对这批商品的访问查询,都落到了数据库上,对于数据库而言,就会产生周期性的压力波 峰。于是所有的请求都会达到存储层,存储层的调用量会暴增,造成存储层也会挂掉的情况。
其实集中过期,倒不是非常致命,比较致命的缓存雪崩,是缓存服务器某个节点宕机或断网。因为自然 形成的缓存雪崩,一定是在某个时间段集中创建缓存,这个时候,数据库也是可以顶住压力的。无非就 是对数据库产生周期性的压力而已。而缓存服务节点的宕机,对数据库服务器造成的压力是不可预知 的,很有可能瞬间就把数据库压垮。
解决方案
redis高可用
这个思想的含义是,既然redis有可能挂掉,那我多增设几台redis,这样一台挂掉之后其他的还可以继续 工作,其实就是搭建的集群。(异地多活!)
限流降级(在SpringCloud讲解过!)
这个解决方案的思想是,在缓存失效后,通过加锁或者队列来控制读数据库写缓存的线程数量。比如对 某个key只允许一个线程查询数据和写缓存,其他线程等待。
数据预热
数据加热的含义就是在正式部署之前,我先把可能的数据先预先访问一遍,这样部分可能大量访问的数 据就会加载到缓存中。在即将发生大并发访问前手动触发加载缓存不同的key,设置不同的过期时间,让 缓存失效的时间点尽量均匀。
十四、布隆过滤器
1. 什么是布隆过滤器
布隆过滤器(Bloom Filter),是1970年,由一个叫布隆的小伙子提出的,距今已经五十年了,和老哥一样老。
它实际上是一个很长的二进制向量和一系列随机映射函数,二进制大家应该都清楚,存储的数据不是0就是1,默认是0。
主要用于判断一个元素是否在一个集合中,0代表不存在
某个数据,1代表存在
某个数据。
懂了吗?作为暖男
的老哥在给你们画张图来帮助理解:
2. 布隆过滤器用途
- 解决Redis缓存穿透(今天重点讲解)
- 在爬虫时,对爬虫网址进行过滤,已经存在布隆中的网址,不在爬取。
- 垃圾邮件过滤,对每一个发送邮件的地址进行判断是否在布隆的黑名单中,如果在就判断为垃圾邮件。
3. 布隆过滤器原理
存入过程
布隆过滤器上面说了,就是一个二进制数据的集合。当一个数据加入这个集合时,经历如下洗礼(这里有缺点,下面会讲):
- 通过K个哈希函数计算该数据,返回K个计算出的hash值
- 这些K个hash值映射到对应的K个二进制的数组下标
- 将K个下标对应的二进制数据改成1。
例如,第一个哈希函数返回x,第二个第三个哈希函数返回y与z,那么:X、Y、Z对应的二进制改成1。
查询过程
布隆过滤器主要作用就是查询一个数据,在不在这个二进制的集合中,查询过程如下
- 通过K个哈希函数计算该数据,对应计算出的K个hash值
- 通过hash值找到对应的二进制的数组下标
- 判断:如果存在一处位置的二进制数据是0,那么该数据不存在。如果都是1,该数据存在集合中。(这里有缺点,下面会讲)
删除过程
一般不能删除布隆过滤器里的数据,这是一个缺点之一,我们下面会分析
布隆过滤器的优缺点
优点
- 由于存储的是二进制数据,所以占用的空间很小
- 它的插入和查询速度是非常快的,时间复杂度是O(K),可以联想一下HashMap的过程
- 保密性很好,因为本身不存储任何原始数据,只有二进制数据
缺点
这就要回到我们上面所说的那些缺点了。
添加数据是通过计算数据的hash值,那么很有可能存在这种情况:两个不同的数据计算得到相同的hash值。
例如图中的“你好
”和“hello
”,假如最终算出hash值相同,那么他们会将同一个下标的二进制数据改为1。
这个时候,你就不知道下标为2的二进制,到底是代表“你好
”还是“hello
”。
由此得出如下缺点
一、存在误判
假如上面的图没有存"hello
",只存了"你好
",那么用"hello
"来查询的时候,会判断"hello
"存在集合中。
因为“你好
”和“hello
”的hash值是相同的,通过相同的hash值,找到的二进制数据也是一样的,都是1。
二、删除困难
到这里我不说大家应该也明白为什么吧,作为你们的暖男老哥
,还是讲一下吧。
还是用上面的举例,因为“你好
”和“hello
”的hash值相同,对应的数组下标也是一样的。
这时候老哥想去删除“你好
”,将下标为2里的二进制数据,由1改成了0。
那么我们是不是连“hello
”都一起删了呀。(0代表有这个数据,1代表没有这个数据)
4. 实现布隆过滤器
有很多种实现方式,其中一种就是Guava
提供的实现方式。
一、引入Guava pom配置
<dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>29.0-jre</version>
</dependency>
二、代码实现
这里我们顺便测一下它的误判率。
package com.fatfish.bloomfilter;import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;public class MyBloomFilter {private static int size = 1000000;private static double fpp = 0.01;private static BloomFilter<Integer> bloomFilter = BloomFilter.create(Funnels.integerFunnel(), size, fpp);public static void main(String[] args) {for (int i = 0; i < size; i++) {bloomFilter.put(i);}int count = 0;for (int i = size; i < size + 1000000; i++) {if (bloomFilter.mightContain(i)) {count++;System.out.println(i + "误判了");}}System.out.println("总共的误判数目:" + count);System.out.printf("误判率:%.3f", 1.0*count/size);}
}
运行结果:
10万数据里有947个误判,约等于0.01%,也就是我们代码里设置的误判率:fpp = 0.01。
深入分析代码
核心BloomFilter.create
方法
@VisibleForTestingstatic <T> BloomFilter<T> create( Funnel<? super T> funnel, long expectedInsertions, double fpp, Strategy strategy) {。。。。
}
这里有四个参数:
funnel
:数据类型(一般是调用Funnels工具类中的)expectedInsertions
:期望插入的值的个数fpp
:误判率(默认值为0.03)strategy
:哈希算法
我们重点讲一下fpp
参数
fpp误判率(False Positive Probability,假阳性概率)
情景一:fpp = 0.01
- 误判个数:947
- 占内存大小:9585058位数
情景二:fpp = 0.03
(默认参数)
- 误判个数:3033
- 占内存大小:7298440位数
情景总结
- 误判率可以通过
fpp
参数进行调节 - fpp越小,需要的内存空间就越大:0.01需要900多万位数,0.03需要700多万位数。
- fpp越小,集合添加数据时,就需要更多的hash函数运算更多的hash值,去存储到对应的数组下标里。(忘了去看上面的布隆过滤存入数据的过程)
上面的numBits
,表示存一百万个int类型数字,需要的位数为7298440,700多万位。理论上存一百万个数,一个int是4字节32位,需要481000000=3200万位。如果使用HashMap去存,按HashMap50%的存储效率,需要6400万位。可以看出BloomFilter的存储空间很小,只有HashMap的1/10左右
上面的numHashFunctions
表示需要几个hash函数运算,去映射不同的下标存这些数字是否存在(0 or 1)。
解决Redis缓存穿透
上面使用Guava实现的布隆过滤器是把数据放在了本地内存中。分布式的场景中就不合适了,无法共享内存。
我们还可以用Redis来实现布隆过滤器,这里使用Redis封装好的客户端工具Redisson。
其底层是使用数据结构bitMap,大家就把它理解成上面说的二进制结构,由于篇幅原因,bitmap不在这篇文章里讲,我们之后写一篇文章介绍。
代码实现
pom配置:
<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.13.4</version>
</dependency>
java代码:
public class RedissonBloomFilter {public static void main(String[] args) {Config config = new Config();config.useSingleServer().setAddress("redis://127.0.0.1:6379");config.useSingleServer().setPassword("1234");//构造RedissonRedissonClient redisson = Redisson.create(config);RBloomFilter<String> bloomFilter = redisson.getBloomFilter("phoneList");//初始化布隆过滤器:预计元素为100000000L,误差率为3%bloomFilter.tryInit(100000000L,0.03);//将号码10086插入到布隆过滤器中bloomFilter.add("10086");//判断下面号码是否在布隆过滤器中//输出falseSystem.out.println(bloomFilter.contains("123456"));//输出trueSystem.out.println(bloomFilter.contains("10086"));}
}
由于Guava那个版本,我们已经很详细的讲了布隆过滤器的那些参数,这里就不重复赘述了。
- Jedis:
- Jedis是Redis官方推荐的Java客户端之一,它是一个直接连接Redis服务器的Java库,提供了简单易用的API,使用起来较为直观。
- Jedis通过创建连接池来管理与Redis服务器的连接,通过连接池可以减少连接创建和销毁的开销,提高性能。
- Jedis的优点是简单易用,适合于对Redis操作较为简单的场景。
- RedisTemplate:
- RedisTemplate是Spring Data Redis提供的一个Redis操作模板,它封装了Redis连接和操作的细节,提供了更高层次的抽象和功能。
- RedisTemplate提供了一种更加灵活和功能丰富的方式来操作Redis,支持多种数据结构和命令,同时与Spring框架集成更为紧密。
- RedisTemplate是基于Jedis或Lettuce等底层连接池实现的,可以灵活切换底层连接方式。
- Redis连接池:
- Redis连接池用于管理与Redis服务器的连接,它可以提高连接的复用率和效率,减少连接创建和销毁的开销。
- 连接池可以通过预先创建一定数量的连接并维护在一个连接池中,客户端需要时从连接池中获取连接并在使用完毕后归还,以实现连接的复用和管理。
- Lettuce:
- Lettuce是一个基于Netty实现的高性能Redis客户端,相比于Jedis,它支持异步操作和响应式编程模型,具有更好的并发性能和可扩展性。
- Lettuce提供了更加灵活和强大的API,支持连接池、集群、哨兵、SSL等特性,同时提供了更丰富的配置选项和监控功能。
- Redisson:
- Redisson是一个基于Redis的Java驻内存数据网格(In-Memory Data Grid)和分布式锁(Distributed Lock)的框架,它提供了丰富的分布式数据结构和分布式服务。
- Redisson不仅提供了对Redis的普通操作,还提供了诸如分布式集合、分布式映射、分布式锁等高级功能,可以简化分布式系统的开发和管理。
- Redisson是一个完整的Redis客户端,它使用了Lettuce作为底层的Redis连接实现,同时提供了更高级别的功能和抽象。
总的来说,Jedis和Lettuce是直接连接Redis服务器的客户端,RedisTemplate是Spring Data Redis提供的操作模板,Redisson是一个完整的Redis客户端框架,而Redis连接池则用于管理客户端与Redis服务器之间的连接。它们之间的选择取决于项目的需求和开发者的偏好,可以根据具体场景选择最适合的客户端。
package com.fatfish.bloomfilter;import com.fatfish.config.RedisConfig; import com.fatfish.utils.JedisEnumSingleton; import com.fatfish.utils.RedisUtil; import com.google.common.base.Charsets; import com.google.common.hash.BloomFilter; import com.google.common.hash.Funnels; import org.redisson.Redisson; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig;import javax.annotation.Resource; import java.util.HashMap; import java.util.Map;@Component public class RedisBloomFilter {// 布隆过滤器零一串大小private static final long EXPECTED_INSERTIONS = 1000000;// 预设的误判率,假阳性概率private static final double FALSE_POSITIVE_PROBABILITY = 0.03;// 主机号private static final String HOST = "127.0.0.1";// 端口号private static final int PORT = 6379;// 布隆过滤器private static BloomFilter<CharSequence> bloomFilter;// Jedisprivate static Jedis jedis;// Jedis Poolprivate static JedisPool jedisPool;// RedisTemplate —— 封装在 RedisUtil 中private static RedisUtil redisUtil;// redissonprivate static RedissonClient redissonClient;// 内存private static Map<String, String> memory;static {// 初始化内存memory = new HashMap<>();memory.put("张三", "北京市");memory.put("李四", "上海市");memory.put("王五", "广州市");// 初始化布隆过滤器bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), 1, 0.01);// 初始化 Jedis PooljedisPool = new JedisPool(new JedisPoolConfig(), HOST, PORT);// redisUtil 初始化redisUtil = new RedisUtil();// redissonClient 初始化Config config = new Config();config.useSingleServer().setAddress("redis://" + HOST + ":" + PORT);redissonClient = Redisson.create(config);// bloomFilter = redissonClient.getBloomFilter("myBloomFilter");// 初始化 Jedis// 方式 1 :Jedis直连jedis = JedisEnumSingleton.INSTANCE.getJedis();// 方式 2 :Jedis pooljedis = jedisPool.getResource();// 方式 3 :RedisTemplate —— 基于Jedis或Lettuce等底层连接池实现// 方式 4 :Redisson}public static void main(String[] args) {// 加载数据库中的相关数据到布隆过滤器loadDataFromDB2BloomFilter();// 查布隆过滤器,判断数据是否存在(Redis 或 内存)// 若数据不存在,则返回// 若数据存在,则继续;String[] querys = new String[]{"张三", "李四", "张三", "王五", "赵六"};for (String query : querys) {System.out.println("=================================");System.out.println("开始查询" + query);boolean mightContain = bloomFilter.mightContain(query);// 数据不存在,则返回if (!mightContain) {System.out.println(query + "不存在,请检查后再操作!");return;}// 数据存在,则继续else {// 查Redis 缓存 // queryRedisByJedis(query); // queryRedisByRedisTemplate(query);queryRedisByRedisson(query);}}if (redissonClient != null && !redissonClient.isShutdown()) {redissonClient.shutdown();}}/*** 查Redis 缓存* Redis 缓存存在,则直接返回* Redis 缓存不存在,则查内存,并将结果也写到Redis 缓存中** @param query 待查询的 key*/private static void queryRedisByRedisson(String query) {boolean exists = redissonClient.getBucket(query).isExists();// Redis 缓存存在,则直接返回if (exists) {String result = redissonClient.getBucket(query).get().toString();System.out.println("结果存在于Redis缓存, 查询结果:" + result);}// Redis 缓存不存在,则查内存,并将结果也写到Redis 缓存中else {// 查内存if (memory.containsKey(query)) {String result = memory.get(query);// 将结果也写到Redis 缓存中redissonClient.getBucket(query).set(result);System.out.println("结果存在于DB, 查询结果:" + result);}}}/*** 查Redis 缓存* Redis 缓存存在,则直接返回* Redis 缓存不存在,则查内存,并将结果也写到Redis 缓存中** @param query 待查询的 key*/private static void queryRedisByRedisTemplate(String query) {boolean exists = redisUtil.hasKey(query);// Redis 缓存存在,则直接返回if (exists) {String result = (String) redisUtil.get(query);System.out.println("结果存在于Redis缓存, 查询结果:" + result);}// Redis 缓存不存在,则查内存,并将结果也写到Redis 缓存中else {// 查内存if (memory.containsKey(query)) {String result = memory.get(query);// 将结果也写到Redis 缓存中redisUtil.set(query, result);System.out.println("结果存在于DB, 查询结果:" + result);}}}/*** 查Redis 缓存* Redis 缓存存在,则直接返回* Redis 缓存不存在,则查内存,并将结果也写到Redis 缓存中** @param query 待查询的 key*/private static void queryRedisByJedis(String query) {Boolean exists = jedis.exists(query);// Redis 缓存存在,则直接返回if (exists) {String result = jedis.get(query);System.out.println("结果存在于Redis缓存, 查询结果:" + result);}// Redis 缓存不存在,则查内存,并将结果也写到Redis 缓存中else {// 查内存if (memory.containsKey(query)) {String result = memory.get(query);// 将结果也写到Redis 缓存中jedis.set(query, result);System.out.println("结果存在于DB, 查询结果:" + result);}}}/*** 加载数据库中的相关数据到布隆过滤器*/private static void loadDataFromDB2BloomFilter() {System.out.println("开始加载数据库中的相关数据到布隆过滤器...");for (Map.Entry<String, String> cache : memory.entrySet()) {String name = cache.getKey();bloomFilter.put(name);}System.out.println("数据加载完成!");} }
十五、Redisson
【进阶篇】Redis实战之Redisson使用技巧详解,干活!-腾讯云开发者社区-腾讯云
Redis分布式锁-这一篇全了解(Redission实现分布式锁完美方案)-CSDN博客
参考
【狂神说Java】Redis最新超详细版教程通俗易懂
Redis笔记总结(狂神说) - 爲誰心殇 - 博客园
狂神说 Redis笔记_狂神说redis笔记-CSDN博客