Redisson—分布式集合详述

7.1. 映射(Map)

基于Redis的Redisson的分布式映射结构的RMap Java对象实现了java.util.concurrent.ConcurrentMap接口和java.util.Map接口。与HashMap不同的是,RMap保持了元素的插入顺序。该对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。

除了同步接口外,还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。如果你想用Redis Map来保存你的POJO的话,可以考虑使用分布式实时对象(Live Object)服务。

在特定的场景下,映射缓存(Map)上的高度频繁的读取操作,使网络通信都被视为瓶颈时,可以使用Redisson提供的带有本地缓存功能的映射。

RMap<String, SomeObject> map = redisson.getMap("anyMap");
SomeObject prevObject = map.put("123", new SomeObject());
SomeObject currentObject = map.putIfAbsent("323", new SomeObject());
SomeObject obj = map.remove("123");map.fastPut("321", new SomeObject());
map.fastRemove("321");RFuture<SomeObject> putAsyncFuture = map.putAsync("321");
RFuture<Void> fastPutAsyncFuture = map.fastPutAsync("321");map.fastPutAsync("321", new SomeObject());
map.fastRemoveAsync("321");

映射的字段锁的用法:

RMap<MyKey, MyValue> map = redisson.getMap("anyMap");
MyKey k = new MyKey();
RLock keyLock = map.getLock(k);
keyLock.lock();
try {MyValue v = map.get(k);// 其他业务逻辑
} finally {keyLock.unlock();
}RReadWriteLock rwLock = map.getReadWriteLock(k);
rwLock.readLock().lock();
try {MyValue v = map.get(k);// 其他业务逻辑
} finally {keyLock.readLock().unlock();
}

7.1.1. 映射(Map)的元素淘汰(Eviction),本地缓存(LocalCache)和数据分片(Sharding)

Redisson提供了一系列的映射类型的数据结构,这些结构按特性主要分为三大类:

  • 元素淘汰(Eviction) 类 -- 带有元素淘汰(Eviction)机制的映射类允许针对一个映射中每个元素单独设定 有效时间最长闲置时间
  • 本地缓存(LocalCache) 类 -- 本地缓存(Local Cache)也叫就近缓存(Near Cache)。这类映射的使用主要用于在特定的场景下,映射缓存(MapCache)上的高度频繁的读取操作,使网络通信都被视为瓶颈的情况。Redisson与Redis通信的同时,还将部分数据保存在本地内存里。这样的设计的好处是它能将读取速度提高最多 45倍 。 所有同名的本地缓存共用一个订阅发布话题,所有更新和过期消息都将通过该话题共享。
  • 数据分片(Sharding) 类 -- 数据分片(Sharding)类仅适用于Redis集群环境下,因此带有数据分片(Sharding)功能的映射也叫集群分布式映射。它利用分库的原理,将单一一个映射结构切分为若干个小的映射,并均匀的分布在集群中的各个槽里。这样的设计能使一个单一映射结构突破Redis自身的容量限制,让其容量随集群的扩大而增长。在扩容的同时,还能够使读写性能和元素淘汰处理能力随之成线性增长。

以下列表是Redisson提供的所有映射的名称及其特性:

接口名称
中文名称

RedissonClient
对应的构造方法

本地缓存功能
Local Cache

数据分片功能
Sharding

元素淘汰功能
Eviction

RMap
映射

getMap()

No

No

No

RMapCache
映射缓存

getMapCache()

No

No

Yes

RLocalCachedMap
本地缓存映射

getLocalCachedMap()

Yes

No

No

RLocalCachedMap
Cache
本地缓存映射缓存
仅限于Redisson PRO版本

getLocalCachedMapCache()

Yes

No

Yes

RClusteredMap
集群分布式映射存
仅限于Redisson PRO版本

getClusteredMap()

No

Yes

No

RClusteredMapCache
集群分布式映射缓存存
仅限于Redisson PRO版本

getClusteredMapCache()

No

Yes

Yes

RClusteredLocal
CachedMap
集群分布式本地缓存映射存
仅限于Redisson PRO版本

getClusteredLocal
CachedMap()

Yes

Yes

No

RClusteredLocal
CachedMapCache
集群分布式本地缓存映射缓存存
仅限于Redisson PRO版本

getClusteredLocal
CachedMapCache()

Yes

Yes

Yes

除此以外,Redisson还提供了Spring Cache和JCache的实现。

元素淘汰功能(Eviction)

Redisson的分布式的RMapCache Java对象在基于RMap的前提下实现了针对单个元素的淘汰机制。同时仍然保留了元素的插入顺序。由于RMapCache是基于RMap实现的,使它同时继承了java.util.concurrent.ConcurrentMap接口和java.util.Map接口。Redisson提供的Spring Cache整合以及JCache正是基于这样的功能来实现的。

目前的Redis自身并不支持散列(Hash)当中的元素淘汰,因此所有过期元素都是通过org.redisson.EvictionScheduler实例来实现定期清理的。为了保证资源的有效利用,每次运行最多清理300个过期元素。任务的启动时间将根据上次实际清理数量自动调整,间隔时间趋于1秒到1小时之间。比如该次清理时删除了300条元素,那么下次执行清理的时间将在1秒以后(最小间隔时间)。一旦该次清理数量少于上次清理数量,时间间隔将增加1.5倍。

RMapCache<String, SomeObject> map = redisson.getMapCache("anyMap");
// 有效时间 ttl = 10分钟
map.put("key1", new SomeObject(), 10, TimeUnit.MINUTES);
// 有效时间 ttl = 10分钟, 最长闲置时间 maxIdleTime = 10秒钟
map.put("key1", new SomeObject(), 10, TimeUnit.MINUTES, 10, TimeUnit.SECONDS);// 有效时间 = 3 秒钟
map.putIfAbsent("key2", new SomeObject(), 3, TimeUnit.SECONDS);
// 有效时间 ttl = 40秒钟, 最长闲置时间 maxIdleTime = 10秒钟
map.putIfAbsent("key2", new SomeObject(), 40, TimeUnit.SECONDS, 10, TimeUnit.SECONDS);

本地缓存功能(Local Cache)

在特定的场景下,映射(Map)上的高度频繁的读取操作,使网络通信都被视为瓶颈时,使用Redisson提供的带有本地缓存功能的分布式本地缓存映射RLocalCachedMapJava对象会是一个很好的选择。它同时实现了java.util.concurrent.ConcurrentMap和java.util.Map两个接口。本地缓存功能充分的利用了JVM的自身内存空间,对部分常用的元素实行就地缓存,这样的设计让读取操作的性能较分布式映射相比提高最多 45倍 。以下配置参数可以用来创建这个实例:

LocalCachedMapOptions options = LocalCachedMapOptions.defaults()// 用于淘汰清除本地缓存内的元素// 共有以下几种选择:// LFU - 统计元素的使用频率,淘汰用得最少(最不常用)的。// LRU - 按元素使用时间排序比较,淘汰最早(最久远)的。// SOFT - 元素用Java的WeakReference来保存,缓存元素通过GC过程清除。// WEAK - 元素用Java的SoftReference来保存, 缓存元素通过GC过程清除。// NONE - 永不淘汰清除缓存元素。.evictionPolicy(EvictionPolicy.NONE)// 如果缓存容量值为0表示不限制本地缓存容量大小.cacheSize(1000)// 以下选项适用于断线原因造成了未收到本地缓存更新消息的情况。// 断线重连的策略有以下几种:// CLEAR - 如果断线一段时间以后则在重新建立连接以后清空本地缓存// LOAD - 在服务端保存一份10分钟的作废日志//        如果10分钟内重新建立连接,则按照作废日志内的记录清空本地缓存的元素//        如果断线时间超过了这个时间,则将清空本地缓存中所有的内容// NONE - 默认值。断线重连时不做处理。.reconnectionStrategy(ReconnectionStrategy.NONE)// 以下选项适用于不同本地缓存之间相互保持同步的情况// 缓存同步策略有以下几种:// INVALIDATE - 默认值。当本地缓存映射的某条元素发生变动时,同时驱逐所有相同本地缓存映射内的该元素// UPDATE - 当本地缓存映射的某条元素发生变动时,同时更新所有相同本地缓存映射内的该元素// NONE - 不做任何同步处理.syncStrategy(SyncStrategy.INVALIDATE)// 每个Map本地缓存里元素的有效时间,默认毫秒为单位.timeToLive(10000)// 或者.timeToLive(10, TimeUnit.SECONDS)// 每个Map本地缓存里元素的最长闲置时间,默认毫秒为单位.maxIdle(10000)// 或者.maxIdle(10, TimeUnit.SECONDS);
RLocalCachedMap<String, Integer> map = redisson.getLocalCachedMap("test", options);String prevObject = map.put("123", 1);
String currentObject = map.putIfAbsent("323", 2);
String obj = map.remove("123");// 在不需要旧值的情况下可以使用fast为前缀的类似方法
map.fastPut("a", 1);
map.fastPutIfAbsent("d", 32);
map.fastRemove("b");RFuture<String> putAsyncFuture = map.putAsync("321");
RFuture<Void> fastPutAsyncFuture = map.fastPutAsync("321");map.fastPutAsync("321", new SomeObject());
map.fastRemoveAsync("321");

当不再使用Map本地缓存对象的时候应该手动销毁,如果Redisson对象被关闭(shutdown)了,则不用手动销毁。

RLocalCachedMap<String, Integer> map = ...
map.destroy();

如何通过加载数据的方式来降低过期淘汰事件发布信息对网络的影响

代码范例:

public void loadData(String cacheName, Map<String, String> data) {RLocalCachedMap<String, String> clearMap = redisson.getLocalCachedMap(cacheName, LocalCachedMapOptions.defaults().cacheSize(1).syncStrategy(SyncStrategy.INVALIDATE));RLocalCachedMap<String, String> loadMap = redisson.getLocalCachedMap(cacheName, LocalCachedMapOptions.defaults().cacheSize(1).syncStrategy(SyncStrategy.NONE));loadMap.putAll(data);clearMap.clearLocalCache();
}

数据分片功能(Sharding)

Map数据分片是Redis集群模式下的一个功能。Redisson提供的分布式集群映射RClusteredMap Java对象也是基于RMap实现的。它同时实现了java.util.concurrent.ConcurrentMap和java.util.Map两个接口。在这里可以获取更多的内部信息。

RClusteredMap<String, SomeObject> map = redisson.getClusteredMap("anyMap");SomeObject prevObject = map.put("123", new SomeObject());
SomeObject currentObject = map.putIfAbsent("323", new SomeObject());
SomeObject obj = map.remove("123");map.fastPut("321", new SomeObject());
map.fastRemove("321");

7.1.2. 映射持久化方式(缓存策略)

Redisson供了将映射中的数据持久化到外部储存服务的功能。主要场景有一下几种:

  1. 将Redisson的分布式映射类型作为业务和外部储存媒介之间的缓存。
  2. 或是用来增加Redisson映射类型中数据的持久性,或是用来增加已被驱逐的数据的寿命。
  3. 或是用来缓存数据库,Web服务或其他数据源的数据。

Read-through策略

通俗的讲,如果一个被请求的数据不存在于Redisson的映射中的时候,Redisson将通过预先配置好的MapLoader对象加载数据。

Write-through(数据同步写入)策略

在遇到映射中某条数据被更改时,Redisson会首先通过预先配置好的MapWriter对象写入到外部储存系统,然后再更新Redis内的数据。

Write-behind(数据异步写入)策略

对映射的数据的更改会首先写入到Redis,然后再使用异步的方式,通过MapWriter对象写入到外部储存系统。在并发环境下可以通过writeBehindThreads参数来控制写入线程的数量,已达到对外部储存系统写入并发量的控制。

以上策略适用于所有实现了RMap、RMapCache、RLocalCachedMap和RLocalCachedMapCache接口的对象。

配置范例:

MapOptions<K, V> options = MapOptions.<K, V>defaults().writer(myWriter).loader(myLoader);RMap<K, V> map = redisson.getMap("test", options);
// 或
RMapCache<K, V> map = redisson.getMapCache("test", options);
// 或
RLocalCachedMap<K, V> map = redisson.getLocalCachedMap("test", options);
// 或
RLocalCachedMapCache<K, V> map = redisson.getLocalCachedMapCache("test", options);

7.1.3. 映射监听器(Map Listener)

Redisson为所有实现了RMapCache或RLocalCachedMapCache接口的对象提供了监听以下事件的监听器:

事件 | 监听器 元素 添加 事件 | org.redisson.api.map.event.EntryCreatedListener
元素 过期 事件 | org.redisson.api.map.event.EntryExpiredListener
元素 删除 事件 | org.redisson.api.map.event.EntryRemovedListener
元素 更新 事件 | org.redisson.api.map.event.EntryUpdatedListener

使用范例:

RMapCache<String, Integer> map = redisson.getMapCache("myMap");
// 或
RLocalCachedMapCache<String, Integer> map = redisson.getLocalCachedMapCache("myMap", options);int updateListener = map.addListener(new EntryUpdatedListener<Integer, Integer>() {@Overridepublic void onUpdated(EntryEvent<Integer, Integer> event) {event.getKey(); // 字段名event.getValue() // 新值event.getOldValue() // 旧值// ...}
});int createListener = map.addListener(new EntryCreatedListener<Integer, Integer>() {@Overridepublic void onCreated(EntryEvent<Integer, Integer> event) {event.getKey(); // 字段名event.getValue() // 值// ...}
});int expireListener = map.addListener(new EntryExpiredListener<Integer, Integer>() {@Overridepublic void onExpired(EntryEvent<Integer, Integer> event) {event.getKey(); // 字段名event.getValue() // 值// ...}
});int removeListener = map.addListener(new EntryRemovedListener<Integer, Integer>() {@Overridepublic void onRemoved(EntryEvent<Integer, Integer> event) {event.getKey(); // 字段名event.getValue() // 值// ...}
});map.removeListener(updateListener);
map.removeListener(createListener);
map.removeListener(expireListener);
map.removeListener(removeListener);

7.1.4. LRU有界映射

Redisson提供了基于Redis的以LRU为驱逐策略的分布式LRU有界映射对象。顾名思义,分布式LRU有界映射允许通过对其中元素按使用时间排序处理的方式,主动移除超过规定容量限制的元素。

RMapCache<String, String> map = redisson.getMapCache("map");
// 尝试将该映射的最大容量限制设定为10
map.trySetMaxSize(10);// 将该映射的最大容量限制设定或更改为10
map.setMaxSize(10);map.put("1", "2");
map.put("3", "3", 1, TimeUnit.SECONDS);

7.2. 多值映射(Multimap)

基于Redis的Redisson的分布式RMultimap Java对象允许Map中的一个字段值包含多个元素。 字段总数受Redis限制,每个Multimap最多允许有4 294 967 295个不同字段。

7.2.1. 基于集(Set)的多值映射(Multimap)

基于Set的Multimap不允许一个字段值包含有重复的元素。

RSetMultimap<SimpleKey, SimpleValue> map = redisson.getSetMultimap("myMultimap");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("0"), new SimpleValue("2"));
map.put(new SimpleKey("3"), new SimpleValue("4"));Set<SimpleValue> allValues = map.get(new SimpleKey("0"));List<SimpleValue> newValues = Arrays.asList(new SimpleValue("7"), new SimpleValue("6"), new SimpleValue("5"));
Set<SimpleValue> oldValues = map.replaceValues(new SimpleKey("0"), newValues);Set<SimpleValue> removedValues = map.removeAll(new SimpleKey("0"));

7.2.2. 基于列表(List)的多值映射(Multimap)

基于List的Multimap在保持插入顺序的同时允许一个字段下包含重复的元素。

RListMultimap<SimpleKey, SimpleValue> map = redisson.getListMultimap("test1");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("0"), new SimpleValue("2"));
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("3"), new SimpleValue("4"));List<SimpleValue> allValues = map.get(new SimpleKey("0"));Collection<SimpleValue> newValues = Arrays.asList(new SimpleValue("7"), new SimpleValue("6"), new SimpleValue("5"));
List<SimpleValue> oldValues = map.replaceValues(new SimpleKey("0"), newValues);List<SimpleValue> removedValues = map.removeAll(new SimpleKey("0"));

7.2.3. 多值映射(Multimap)淘汰机制(Eviction)

Multimap对象的淘汰机制是通过不同的接口来实现的。它们是RSetMultimapCache接口和RListMultimapCache接口,分别对应的是Set和List的Multimaps。

所有过期元素都是通过org.redisson.EvictionScheduler实例来实现定期清理的。为了保证资源的有效利用,每次运行最多清理100个过期元素。任务的启动时间将根据上次实际清理数量自动调整,间隔时间趋于1秒到2小时之间。比如该次清理时删除了100条元素,那么下次执行清理的时间将在1秒以后(最小间隔时间)。一旦该次清理数量少于上次清理数量,时间间隔将增加1.5倍。

RSetMultimapCache的使用范例:

RSetMultimapCache<String, String> multimap = redisson.getSetMultimapCache("myMultimap");
multimap.put("1", "a");
multimap.put("1", "b");
multimap.put("1", "c");multimap.put("2", "e");
multimap.put("2", "f");multimap.expireKey("2", 10, TimeUnit.MINUTES);

7.3. 集(Set)

基于Redis的Redisson的分布式Set结构的RSet Java对象实现了java.util.Set接口。通过元素的相互状态比较保证了每个元素的唯一性。该对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。

RSet<SomeObject> set = redisson.getSet("anySet");
set.add(new SomeObject());
set.remove(new SomeObject());

Redisson PRO版本中的Set对象还可以在Redis集群环境下支持单集合数据分片。

7.3.1. 集(Set)淘汰机制(Eviction)

基于Redis的Redisson的分布式RSetCache Java对象在基于RSet的前提下实现了针对单个元素的淘汰机制。由于RSetCache是基于RSet实现的,使它还集成了java.util.Set接口。

目前的Redis自身并不支持Set当中的元素淘汰,因此所有过期元素都是通过org.redisson.EvictionScheduler实例来实现定期清理的。为了保证资源的有效利用,每次运行最多清理100个过期元素。任务的启动时间将根据上次实际清理数量自动调整,间隔时间趋于1秒到2小时之间。比如该次清理时删除了100条元素,那么下次执行清理的时间将在1秒以后(最小间隔时间)。一旦该次清理数量少于上次清理数量,时间间隔将增加1.5倍。

RSetCache<SomeObject> set = redisson.getSetCache("anySet");
// ttl = 10 seconds
set.add(new SomeObject(), 10, TimeUnit.SECONDS);

7.3.2. 集(Set)数据分片(Sharding)

Set数据分片是Redis集群模式下的一个功能。Redisson提供的分布式RClusteredSet Java对象也是基于RSet实现的。在这里可以获取更多的信息。

RClusteredSet<SomeObject> set = redisson.getClusteredSet("anySet");
set.add(new SomeObject());
set.remove(new SomeObject());

除了RClusteredSet以外,Redisson还提供了另一种集群模式下的分布式集(Set),它不仅提供了透明的数据分片功能,还为每个元素提供了淘汰机制。RClusteredSetCache 类分别同时提供了RClusteredSet 和RSetCache 这两个接口的实现。当然这些都是基于java.util.Set的接口实现上的。

该功能仅限于Redisson PRO版本。

7.4. 有序集(SortedSet)

基于Redis的Redisson的分布式RSortedSet Java对象实现了java.util.SortedSet接口。在保证元素唯一性的前提下,通过比较器(Comparator)接口实现了对元素的排序。

RSortedSet<Integer> set = redisson.getSortedSet("anySet");
set.trySetComparator(new MyComparator()); // 配置元素比较器
set.add(3);
set.add(1);
set.add(2);set.removeAsync(0);
set.addAsync(5);

7.5. 计分排序集(ScoredSortedSet)

基于Redis的Redisson的分布式RScoredSortedSet Java对象是一个可以按插入时指定的元素评分排序的集合。它同时还保证了元素的唯一性。

RScoredSortedSet<SomeObject> set = redisson.getScoredSortedSet("simple");set.add(0.13, new SomeObject(a, b));
set.addAsync(0.251, new SomeObject(c, d));
set.add(0.302, new SomeObject(g, d));set.pollFirst();
set.pollLast();int index = set.rank(new SomeObject(g, d)); // 获取元素在集合中的位置
Double score = set.getScore(new SomeObject(g, d)); // 获取元素的评分

7.6. 字典排序集(LexSortedSet)

基于Redis的Redisson的分布式RLexSortedSet Java对象在实现了java.util.Set<String>接口的同时,将其中的所有字符串元素按照字典顺序排列。它公式还保证了字符串元素的唯一性。

RLexSortedSet set = redisson.getLexSortedSet("simple");
set.add("d");
set.addAsync("e");
set.add("f");set.lexRangeTail("d", false);
set.lexCountHead("e");
set.lexRange("d", true, "z", false);

7.7. 列表(List)

基于Redis的Redisson分布式列表(List)结构的RList Java对象在实现了java.util.List接口的同时,确保了元素插入时的顺序。该对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。

RList<SomeObject> list = redisson.getList("anyList");
list.add(new SomeObject());
list.get(0);
list.remove(new SomeObject());

7.8. 队列(Queue)

基于Redis的Redisson分布式无界队列(Queue)结构的RQueue Java对象实现了java.util.Queue接口。尽管RQueue对象无初始大小(边界)限制,但对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。

RQueue<SomeObject> queue = redisson.getQueue("anyQueue");
queue.add(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();

7.9. 双端队列(Deque)

基于Redis的Redisson分布式无界双端队列(Deque)结构的RDeque Java对象实现了java.util.Deque接口。尽管RDeque对象无初始大小(边界)限制,但对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。

RDeque<SomeObject> queue = redisson.getDeque("anyDeque");
queue.addFirst(new SomeObject());
queue.addLast(new SomeObject());
SomeObject obj = queue.removeFirst();
SomeObject someObj = queue.removeLast();

7.10. 阻塞队列(Blocking Queue)

基于Redis的Redisson分布式无界阻塞队列(Blocking Queue)结构的RBlockingQueue Java对象实现了java.util.concurrent.BlockingQueue接口。尽管RBlockingQueue对象无初始大小(边界)限制,但对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。

RBlockingQueue<SomeObject> queue = redisson.getBlockingQueue("anyQueue");
queue.offer(new SomeObject());SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
SomeObject ob = queue.poll(10, TimeUnit.MINUTES);

poll, pollFromAny, pollLastAndOfferFirstTo和take方法内部采用话题订阅发布实现,在Redis节点故障转移(主从切换)或断线重连以后,内置的相关话题监听器将自动完成话题的重新订阅。

7.11. 有界阻塞队列(Bounded Blocking Queue)

基于Redis的Redisson分布式有界阻塞队列(Bounded Blocking Queue)结构的RBoundedBlockingQueue Java对象实现了java.util.concurrent.BlockingQueue接口。该对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。队列的初始容量(边界)必须在使用前设定好。

RBoundedBlockingQueue<SomeObject> queue = redisson.getBoundedBlockingQueue("anyQueue");
// 如果初始容量(边界)设定成功则返回`真(true)`,
// 如果初始容量(边界)已近存在则返回`假(false)`。
queue.trySetCapacity(2);queue.offer(new SomeObject(1));
queue.offer(new SomeObject(2));
// 此时容量已满,下面代码将会被阻塞,直到有空闲为止。
queue.put(new SomeObject());SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
SomeObject ob = queue.poll(10, TimeUnit.MINUTES);

poll, pollFromAny, pollLastAndOfferFirstTo和take方法内部采用话题订阅发布实现,在Redis节点故障转移(主从切换)或断线重连以后,内置的相关话题监听器将自动完成话题的重新订阅。

7.12. 阻塞双端队列(Blocking Deque)

基于Redis的Redisson分布式无界阻塞双端队列(Blocking Deque)结构的RBlockingDeque Java对象实现了java.util.concurrent.BlockingDeque接口。尽管RBlockingDeque对象无初始大小(边界)限制,但对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。

RBlockingDeque<Integer> deque = redisson.getBlockingDeque("anyDeque");
deque.putFirst(1);
deque.putLast(2);
Integer firstValue = queue.takeFirst();
Integer lastValue = queue.takeLast();
Integer firstValue = queue.pollFirst(10, TimeUnit.MINUTES);
Integer lastValue = queue.pollLast(3, TimeUnit.MINUTES);

poll, pollFromAny, pollLastAndOfferFirstTo和take方法内部采用话题订阅发布实现,在Redis节点故障转移(主从切换)或断线重连以后,内置的相关话题监听器将自动完成话题的重新订阅。

7.13. 阻塞公平队列(Blocking Fair Queue)

基于Redis的Redisson分布式无界阻塞公平队列(Blocking Fair Queue)结构的RBlockingFairQueue Java对象在实现Redisson分布式无界阻塞队列(Blocking Queue)结构RBlockingQueue接口的基础上,解决了多个队列消息的处理者在复杂的网络环境下,网络延时的影响使“较远”的客户端最终收到消息数量低于“较近”的客户端的问题。从而解决了这种现象引发的个别处理节点过载的情况。

以分布式无界阻塞队列为基础,采用公平获取消息的机制,不仅保证了poll、pollFromAny、pollLastAndOfferFirstTo和take方法获取消息的先入顺序,还能让队列里的消息被均匀的发布到处在复杂分布式环境中的各个处理节点里。

RBlockingFairQueue queue = redisson.getBlockingFairQueue("myQueue");
queue.offer(new SomeObject());SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
SomeObject ob = queue.poll(10, TimeUnit.MINUTES);

该功能仅限于Redisson PRO版本。

7.14. 阻塞公平双端队列(Blocking Fair Deque)

基于Redis的Redisson分布式无界阻塞公平双端队列(Blocking Fair Deque)结构的RBlockingFairDeque Java对象在实现Redisson分布式无界阻塞双端队列(Blocking Deque)结构RBlockingDeque接口的基础上,解决了多个队列消息的处理者在复杂的网络环境下,网络延时的影响使“较远”的客户端最终收到消息数量低于“较近”的客户端的问题。从而解决了这种现象引发的个别处理节点过载的情况。

以分布式无界阻塞双端队列为基础,采用公平获取消息的机制,不仅保证了poll、take、pollFirst、takeFirst、pollLast和takeLast方法获取消息的先入顺序,还能让队列里的消息被均匀的发布到处在复杂分布式环境中的各个处理节点里。

RBlockingFairDeque deque = redisson.getBlockingFairDeque("myDeque");
deque.offer(new SomeObject());SomeObject firstElement = queue.peekFirst();
SomeObject firstElement = queue.pollFirst();
SomeObject firstElement = queue.pollFirst(10, TimeUnit.MINUTES);
SomeObject firstElement = queue.takeFirst();SomeObject lastElement = queue.peekLast();
SomeObject lastElement = queue.pollLast();
SomeObject lastElement = queue.pollLast(10, TimeUnit.MINUTES);
SomeObject lastElement = queue.takeLast();

该功能仅限于Redisson PRO版本。

7.15. 延迟队列(Delayed Queue)

基于Redis的Redisson分布式延迟队列(Delayed Queue)结构的RDelayedQueue Java对象在实现了RQueue接口的基础上提供了向队列按要求延迟添加项目的功能。该功能可以用来实现消息传送延迟按几何增长或几何衰减的发送策略。

RQueue<String> distinationQueue = ...
RDelayedQueue<String> delayedQueue = getDelayedQueue(distinationQueue);
// 10秒钟以后将消息发送到指定队列
delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
// 一分钟以后将消息发送到指定队列
delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);

在该对象不再需要的情况下,应该主动销毁。仅在相关的Redisson对象也需要关闭的时候可以不用主动销毁。

RDelayedQueue<String> delayedQueue = ...
delayedQueue.destroy();

7.16. 优先队列(Priority Queue)

基于Redis的Redisson分布式优先队列(Priority Queue)Java对象实现了java.util.Queue的接口。可以通过比较器(Comparator)接口来对元素排序。

RPriorityQueue<Integer> queue = redisson.getPriorityQueue("anyQueue");
queue.trySetComparator(new MyComparator()); // 指定对象比较器
queue.add(3);
queue.add(1);
queue.add(2);queue.removeAsync(0);
queue.addAsync(5);queue.poll();

7.17. 优先双端队列(Priority Deque)

基于Redis的Redisson分布式优先双端队列(Priority Deque)Java对象实现了java.util.Deque的接口。可以通过比较器(Comparator)接口来对元素排序。

RPriorityDeque<Integer> queue = redisson.getPriorityDeque("anyQueue");
queue.trySetComparator(new MyComparator()); // 指定对象比较器
queue.addLast(3);
queue.addFirst(1);
queue.add(2);queue.removeAsync(0);
queue.addAsync(5);queue.pollFirst();
queue.pollLast();

7.18. 优先阻塞队列(Priority Blocking Queue)

基于Redis的分布式无界优先阻塞队列(Priority Blocking Queue)Java对象的结构与java.util.concurrent.PriorityBlockingQueue类似。可以通过比较器(Comparator)接口来对元素排序。PriorityBlockingQueue的最大容量是4 294 967 295个元素。

RPriorityBlockingQueue<Integer> queue = redisson.getPriorityBlockingQueue("anyQueue");
queue.trySetComparator(new MyComparator()); // 指定对象比较器
queue.add(3);
queue.add(1);
queue.add(2);queue.removeAsync(0);
queue.addAsync(5);queue.take();

当Redis服务端断线重连以后,或Redis服务端发生主从切换,并重新建立连接后,断线时正在使用poll,pollLastAndOfferFirstTo或take方法的对象Redisson将自动再次为其订阅相关的话题。

7.19. 优先阻塞双端队列(Priority Blocking Deque)

基于Redis的分布式无界优先阻塞双端队列(Priority Blocking Deque)Java对象实现了java.util.Deque的接口。addLast、 addFirst、push方法不能再这个对里使用。PriorityBlockingDeque的最大容量是4 294 967 295个元素。

RPriorityBlockingDeque<Integer> queue = redisson.getPriorityBlockingDeque("anyQueue");
queue.trySetComparator(new MyComparator()); // 指定对象比较器
queue.add(2);queue.removeAsync(0);
queue.addAsync(5);queue.pollFirst();
queue.pollLast();
queue.takeFirst();
queue.takeLast();

当Redis服务端断线重连以后,或Redis服务端发生主从切换,并重新建立连接后,断线时正在使用poll,pollLastAndOfferFirstTo或take方法的对象Redisson将自动再次为其订阅相关的话题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/95850.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Pyhon-每日一练(1)

&#x1f308;write in front&#x1f308; &#x1f9f8;大家好&#xff0c;我是Aileen&#x1f9f8;.希望你看完之后&#xff0c;能对你有所帮助&#xff0c;不足请指正&#xff01;共同学习交流. &#x1f194;本文由Aileen_0v0&#x1f9f8; 原创 CSDN首发&#x1f412; 如…

Vue中如何进行分布式路由配置与管理

Vue中的分布式路由配置与管理 随着现代Web应用程序的复杂性不断增加&#xff0c;分布式路由配置和管理成为了一个重要的主题。Vue.js作为一种流行的前端框架&#xff0c;提供了多种方法来管理Vue应用程序的路由。本文将深入探讨在Vue中如何进行分布式路由配置与管理&#xff0…

OpenWRT、Yocto 、Buildroot和Ubuntu有什么区别

OpenWRT&#xff1a; 用途&#xff1a;OpenWRT 是一个专注于路由器和嵌入式网络设备的Linux发行版。它提供了一个优化的Linux环境&#xff0c;旨在将网络设备变成功能丰富、高度可定制的路由器。 包管理器&#xff1a;OpenWRT 使用 opkg 包管理器&#xff0c;它是一个轻量级的…

Spring Boot中的@Controller使用教程

一 Controller使用方法&#xff0c;如下所示&#xff1a; Controller是SpringBoot里最基本的组件&#xff0c;他的作用是把用户提交来的请求通过对URL的匹配&#xff0c;分配个不同的接收器&#xff0c;再进行处理&#xff0c;然后向用户返回结果。下面通过本文给大家介绍Spr…

linux内核分析:虚拟化

https://www.cnblogs.com/wujuntian/p/16294898.html 三种虚拟化方式 1. 对于虚拟机内核来讲,只要将标志位设为虚拟机状态,我们就可以直接在 CPU 上执行大部分的指令,不需要虚拟化软件在中间转述,除非遇到特别敏感的指令,才需要将标志位设为物理机内核态运行,这样大大…

minio分布式文件存储

基本介绍 什么是 MinIO MinIO 是一款基于 Go 语言的高性能、可扩展、云原生支持、操作简单、开源的分布式对象存储产品。基于 Apache License v2.0 开源协议&#xff0c;虽然轻量&#xff0c;却拥有着不错的性能。它兼容亚马逊S3云存储服务接口。可以很简单的和其他应…

Java方法:重复使用的操作可以写成方法哦

&#x1f451;专栏内容&#xff1a;Java⛪个人主页&#xff1a;子夜的星的主页&#x1f495;座右铭&#xff1a;前路未远&#xff0c;步履不停 目录 一、方法的概念1、什么是方法&#xff1f;2、方法的定义3、方法调用的过程 二、方法重载1、重载的概念2、方法签名 在日常生活中…

使用 python 检测泛洪攻击的案例

使用 python 检测泛洪攻击的案例 本案例只使用python标准库通过执行命令来监控异常请求, 并封锁IP, 不涉及其他第三方库工具. import os import time from collections import Counter# 1、update 命令, 采集CPU的平均负载 def get_cpu_load():"""uptime 命令…

计算机网络 快速了解网络层次、常用协议、常见物理设备。 掌握程序员必备网络基础知识!!!

文章目录 0 引言1 基础知识的定义1.1 计算机网络层次1.2 网络供应商 ISP1.3 猫、路由器、交换机1.4 IP协议1.5 TCP、UDP协议1.6 HTTP、HTTPS、FTP协议1.7 Web、Web浏览器、Web服务器1.8 以太网和WLAN1.9 Socket &#xff08;网络套接字&#xff09; 2 总结 0 引言 在学习的过程…

【Java-LangChain:使用 ChatGPT API 搭建系统-2】语言模型,提问范式与 Token

第二章 语言模型&#xff0c;提问范式与 Token 在本章中&#xff0c;我们将和您分享大型语言模型&#xff08;LLM&#xff09;的工作原理、训练方式以及分词器&#xff08;tokenizer&#xff09;等细节对 LLM 输出的影响。我们还将介绍 LLM 的提问范式&#xff08;chat format…

KNN(下):数据分析 | 数据挖掘 | 十大算法之一

⭐️⭐️⭐️⭐️⭐️欢迎来到我的博客⭐️⭐️⭐️⭐️⭐️ 🐴作者:秋无之地 🐴简介:CSDN爬虫、后端、大数据领域创作者。目前从事python爬虫、后端和大数据等相关工作,主要擅长领域有:爬虫、后端、大数据开发、数据分析等。 🐴欢迎小伙伴们点赞👍🏻、收藏⭐️、…

postgresql-物化视图

postgresql-物化视图 物化视图创建物化视图刷新物化视图修改物化视图删除物化视图 物化视图 创建物化视图 postgresql使用create materialized view 语句创建视图 create materialized view if not exists name as query [with [NO] data];-- 创建一个包含员工统计信息的物化…

java遇到的问题

java遇到的问题 Tomcat与JDK版本问题 当使用Tomcat10的版本用于springmvc借用浏览器调试时&#xff0c;使用JDK8浏览器会报异常。 需要JDK17&#xff08;可以配置多个JDK环境&#xff0c;切换使用&#xff09;才可以使用&#xff0c;配置为JAVA_HOME路径&#xff0c;否则&a…

【AI视野·今日Robot 机器人论文速览 第四十七期】Wed, 4 Oct 2023

AI视野今日CS.Robotics 机器人学论文速览 Wed, 4 Oct 2023 Totally 40 papers &#x1f449;上期速览✈更多精彩请移步主页 Interesting: &#x1f4da;基于神经网络的多模态触觉感知, classification, position, posture, and force of the grasped object多模态形象的解耦(f…

PostgreSQL进阶

PostgreSQL进阶&#xff1a;发挥强大功能的数据库技巧 PostgreSQL&#xff0c;作为一款强大且高度可定制的关系型数据库管理系统&#xff08;RDBMS&#xff09;&#xff0c;提供了众多高级功能和功能&#xff0c;使其成为开发人员和数据库管理员的首选。在这篇博客中&#xff0…

苹果系统_安装matplotlib__pygame,以pycharm导入模块

为了更便捷、连贯的进行python编程学习&#xff0c;尽量在开始安装python软件时&#xff0c;将编辑器、模块一并安装好&#xff0c;这样能避免以后版本冲突的问题。小白在开始安装pycharm、pip、matplotlib往往会遇到一些问题&#xff0c;文中列示其中部分bug&#xff0c;供大家…

VL53L5CX驱动开发(1)----驱动TOF进行区域检测

VL53L5CX驱动开发----1.驱动TOF进行区域检测 闪烁定义视频教学样品申请源码下载主要特点硬件准备技术规格系统框图应用示意图区域映射生成STM32CUBEMX选择MCU 串口配置IIC配置X-CUBE-TOF1串口重定向代码配置Tera Term配置演示结果 闪烁定义 VL53L5CX是一款先进的飞行感应&…

Reset信号如何同步?

首先来复习一个更加基础的概念&#xff1a;同步reset和异步reset。 同步reset&#xff08;synchronous reset&#xff09;是说&#xff0c;当reset信号为active的时候&#xff0c;寄存器在下一个时钟沿到来后被复位&#xff0c;时钟沿到来之前寄存器还是保持其之前的值。 异步…

xxl-job的原理(2)—调度中心管理注册信息

一、调度中心管理注册信息 1.JobApiController 执行器调用调度中心的url来实现注册、下线、回调等操作&#xff1b;其主要的实现类是JobApiController&#xff0c;调用/api/registry接口注册执行器信息&#xff0c;调用/api/registryRemove接口下线执行器信息&#xff0c;调用…

操作系统和进程相关的认识

目录 冯诺依曼体系结构 冯诺依曼体系结构五大组成部分 为什么数据只能通过存储器进行输入和输出 操作系统 概念一&#xff1a;访问操作系统的请求都是通过系统调用完成的 操作系统如何管理用户信息 概念二&#xff1a;先描述&#xff0c;再组织。 进程的概念 在认识进行相关的知…