Redisson—分布式服务

一、 分布式远程服务(Remote Service)

基于Redis的Java分布式远程服务,可以用来通过共享接口执行存在于另一个Redisson实例里的对象方法。换句话说就是通过Redis实现了Java的远程过程调用(RPC)。分布式远程服务基于可以用POJO对象,方法的参数和返回类不受限制,可以是任何类型。

分布式远程服务(Remote Service)提供了两种类型的RRemoteService实例:

  • 服务端(远端)实例 - 用来执行远程方法(工作者实例即worker instance). 例如:
RRemoteService remoteService = redisson.getRemoteService();
SomeServiceImpl someServiceImpl = new SomeServiceImpl();// 在调用远程方法以前,应该首先注册远程服务
// 只注册了一个服务端工作者实例,只能同时执行一个并发调用
remoteService.register(SomeServiceInterface.class, someServiceImpl);// 注册了12个服务端工作者实例,可以同时执行12个并发调用
remoteService.register(SomeServiceInterface.class, someServiceImpl, 12);
  • 客户端(本地)实例 - 用来请求远程方法. 例如:
RRemoteService remoteService = redisson.getRemoteService();
SomeServiceInterface service = remoteService.get(SomeServiceInterface.class);String result = service.doSomeStuff(1L, "secondParam", new AnyParam());

客户端和服务端必须使用一样的共享接口,生成两者的Redisson实例必须采用相同的连接配置。客户端和服务端实例可以运行在同一个JVM里,也可以是不同的。客户端和服务端的数量不收限制。(注意:尽管Redisson不做任何限制,但是Redis的限制仍然有效。)

在服务端工作者可用实例数量 大于1 的时候,将并行执行并发调用的远程方法。

并行执行工作者数量计算方法如下: T = R * N

T - 并行执行工作者总数 R - Redisson服务端数量 N - 注册服务端时指定的执行工作者数量

超过该数量的并发请求将在列队中等候执行。

在服务端工作者实例可用数量为 1 时,远程过程调用将会按 顺序执行。这种情况下,每次只有一个请求将会被执行,其他请求将在列队中等候执行。

1. 分布式远程服务工作流程

分布式远程服务为每个注册接口建立了两个列队。一个列队用于请求,由服务端监听,另一个列队用于应答回执和结果回复,由客户端监听。应答回执用于判定该请求是否已经被接受。如果在指定的超时时间内没有被执行工作者执行将会抛出RemoteServiceAckTimeoutException错误。

下图描述了每次发起远程过程调用请求的工作流程。

2. 发送即不管(Fire-and-Forget)模式和应答回执(Ack-Response)模式

分布式远程服务通过org.redisson.core.RemoteInvocationOptions类,为每个远程过程调用提供了一些可配置选项。这些选项可以用来指定和修改请求超时和选择跳过应答回执或结果的发送模式。例如:

// 应答回执超时1秒钟,远程执行超时30秒钟
RemoteInvocationOptions options = RemoteInvocationOptions.defaults();// 无需应答回执,远程执行超时30秒钟
RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noAck();// 应答回执超时1秒钟,不等待执行结果
RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noResult();// 应答回执超时1分钟,不等待执行结果
RemoteInvocationOptions options = RemoteInvocationOptions.defaults().expectAckWithin(1, TimeUnit.MINUTES).noResult();// 发送即不管(Fire-and-Forget)模式,无需应答回执,不等待结果
RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noAck().noResult();RRemoteService remoteService = redisson.getRemoteService();
YourService service = remoteService.get(YourService.class, options);

3. 异步调用

远程过程调用也可以采用异步的方式执行。异步调用需要单独提交一个带有@RRemoteAsync注解(annotation)的异步接口类。异步接口方法签名必须与远程接口的方法签名相符。异步接口的返回类必须是org.redisson.api.RFuture对象或其子对象。在调用RRemoteService.get方法时将对异步接口的方法进行验证。异步接口无须包含所有的远程接口里的方法,只需要包含要求异步执行的方法即可。

// 远程接口
public interface RemoteInterface {Long someMethod1(Long param1, String param2);void someMethod2(MyObject param);MyObject someMethod3();}// 匹配远程接口的异步接口
@RRemoteAsync(RemoteInterface.class)
public interface RemoteInterfaceAsync {RFuture<Long> someMethod1(Long param1, String param2);RFuture<Void> someMethod2(MyObject param);}RRemoteService remoteService = redisson.getRemoteService();
RemoteInterfaceAsync asyncService = remoteService.get(RemoteInterfaceAsync.class);

4. 取消异步调用

通过调用Future.cancel()方法可以非常方便的取消一个异步调用。分布式远程服务允许在三个阶段中任何一个阶段取消异步调用:

  1. 远程调用请求在列队中排队阶段
  2. 远程调用请求已经被分布式远程服务接受,还未发送应答回执,执行尚未开始。
  3. 远程调用请求已经在执行阶段

想要正确的处理第三个阶段,在服务端代码里应该检查Thread.currentThread().isInterrupted()的返回状态。范例如下:

// 远程接口
public interface MyRemoteInterface {Long myBusyMethod(Long param1, String param2);}// 匹配远程接口的异步接口
@RRemoteAsync(MyRemoteInterface.class)
public interface MyRemoteInterfaceAsync {RFuture<Long> myBusyMethod(Long param1, String param2);}// 远程接口的实现
public class MyRemoteServiceImpl implements MyRemoteInterface {public Long myBusyMethod(Long param1, String param2) {for (long i = 0; i < Long.MAX_VALUE; i++) {iterations.incrementAndGet();if (Thread.currentThread().isInterrupted()) {System.out.println("interrupted! " + i);return;}}}}RRemoteService remoteService = redisson.getRemoteService();
ExecutorService executor = Executors.newFixedThreadPool(5);
// 注册远程服务的服务端的同时,通过单独指定的ExecutorService来配置执行线程池
MyRemoteInterface serviceImpl = new MyRemoteServiceImpl();
remoteService.register(MyRemoteInterface.class, serviceImpl, 5, executor);// 异步调用方法
MyRemoteInterfaceAsync asyncService = remoteService.get(MyRemoteInterfaceAsync.class);
RFuture<Long> future = asyncService.myBusyMethod(1L, "someparam");
// 取消异步调用
future.cancel(true);

二、分布式实时对象(Live Object)服务

1. 介绍

一个 分布式实时对象(Live Object) 可以被理解为一个功能强化后的Java对象。该对象不仅可以被一个JVM里的各个线程相引用,还可以被多个位于不同JVM里的线程同时引用。Wikipedia对这种特殊对象的概述是:

Live distributed object (also abbreviated as live object) refers to a running instance of a distributed multi-party (or peer-to-peer) protocol, viewed from the object-oriented perspective, as an entity that has a distinct identity, may encapsulate internal state and threads of execution, and that exhibits a well-defined externally visible behavior.

Redisson分布式实时对象(Redisson Live Object,简称RLO)运用即时生成的代理类(Proxy),将一个指定的普通Java类里的所有字段,以及针对这些字段的操作全部映射到一个Redis Hash的数据结构,实现这种理念。每个字段的get和set方法最终被转译为针对同一个Redis Hash的hget和hset命令,从而使所有连接到同一个Redis节点的所有可以客户端同时对一个指定的对象进行操作。众所周知,一个对象的状态是由其内部的字段所赋的值来体现的,通过将这些值保存在一个像Redis这样的远程共享的空间的过程,把这个对象强化成了一个分布式对象。这个分布式对象就叫做Redisson分布式实时对象(Redisson Live Object,简称RLO)。

通过使用RLO,运行在不同服务器里的多个程序之间,共享一个对象实例变得和在单机程序里共享一个对象实例一样了。同时还避免了针对任何一个字段操作都需要将整个对象序列化和反序列化的繁琐,进而降低了程序开发的复杂性和其数据模型的复杂性:从任何一个客户端修改一个字段的值,处在其他服务器上的客户端(几乎^)即刻便能查看到。而且实现代码与单机程序代码无异。(^连接到从节点的客户端仍然受Redis的最终一致性的特性限制)

鉴于Redis是一个单线程的程序,针对实时对象的所有的字段操作可以理解为全部是原子性操作,也就是说在读取一个字段的过程不会担心被其他线程所修改。

通过使用RLO,可以把Redis当作一个允许被多个JVM同时操作且不受GC影响的共享堆(Heap Space)。

2. 使用方法

Redisson为分布式实时对象提供了一系列不同功能的注解,其中@REntity和@RId两个注解是分布式实时对象的必要条件。

@REntity
public class MyObject {@RIdprivate String id;@RIndexprivate String value;private MyObject parent;public MyObject(String id) {this.id = id;}public MyObject() {}// getters and setters}

在开始使用分布式实时对象以前,需要先通过Redisson服务将指定的对象连接(attach),合并(merge)或持久化(persist)到Redis里。

RLiveObjectService service = redisson.getLiveObjectService();
MyLiveObject myObject = new MyLiveObject();
myObject.setId("1");
// 将myObject对象当前的状态持久化到Redis里并与之保持同步。
myObject = service.persist(myObject);MyLiveObject myObject = new MyLiveObject("1");
// 抛弃myObject对象当前的状态,并与Redis里的数据建立连接并保持同步。
myObject = service.attach(myObject);MyLiveObject myObject = new MyLiveObject();
myObject.setId("1");
// 将myObject对象当前的状态与Redis里的数据合并之后与之保持同步。
myObject = service.merge(myObject);
myObject.setValue("somevalue");// 通过ID获取分布式实时对象
MyLiveObject myObject = service.get(MyLiveObject.class, "1");// 通过索引查找分布式实时对象
Collection<MyLiveObject> myObjects = service.find(MyLiveObject.class, Conditions.in("value", "somevalue", "somevalue2"));Collection<MyLiveObject> myObjects = service.find(MyLiveObject.class, Conditions.and(Conditions.in("value", "somevalue", "somevalue2"), Conditions.eq("secondfield", "test")));

“parent”字段中包含了指向到另一个分布式实时对象的引用,它可以与包含类是同一类型也可以不同。Redisson内部采用了与Java的引用类似的方式保存这个关系,而非将全部对象序列化,可视为与普通的引用同等效果。

//RLO对象:
MyObject myObject = service.get(MyObject.class, "1");
MyObject myParentObject = service.get(MyObject.class, "2");
myObject.setValue(myParentObject);

RLO的字段类型基本上无限制,可以是任何类型。比如Java util包里的集合类,Map类等,也可以是自定义的对象。只要指定的编码解码器能够对其进行编码和解码操作便可。关于编码解码器的详细信息请查阅高级使用方法章节。

尽管RLO的字段类型基本上无限制,个别类型还是受限。注解了RId的字段类型不能是数组类(Array),比如int[],long[],double[],byte[]等等。更多关于限制有关的介绍和原理解释请查阅使用限制 章节。

为了保证RLO的用法和普通Java对象的用法尽可能一直,Redisson分布式实时对象服务自动将以下普通Java对象转换成与之匹配的Redisson分布式对象RObject。

普通Java类

转换后的Redisson类

SortedSet.class

RedissonSortedSet.class

Set.class

RedissonSet.class

ConcurrentMap.class

RedissonMap.class

Map.class

RedissonMap.class

BlockingDeque.class

RedissonBlockingDeque.class

Deque.class

RedissonDeque.class

BlockingQueue.class

RedissonBlockingQueue.class

Queue.class

RedissonQueue.class

List.class

RedissonList.class

类型转换将按照从上至下的顺序匹配类型,例如LinkedList类同时实现了Deque,List和Queue,由于Deque排在靠上的位置,因此它将会被转换成一个RedissonDeque类型。

Redisson的分布式对象也采用类似的方式,将自身的状态储存于Redis当中,(几乎^)所有的状态改变都直接映射到Redis里,不在本地JVM中保留任何赋值。(^本地缓存对象除外,比如RLocalCachedMap)

3. 高级使用方法

正如上述介绍,RLO类其实都是按需实时生成的代理(Proxy)类。生成的代理类和原类都一同缓存Redisson实例里。这个过程会消耗一些时间,在对耗时比较敏感的情况下,建议通过RedissonLiveObjectService提前注册所有的RLO类。这个服务也可以用来注销不再需要的RLO类,也可以用来查询一个类是否已经注册了。

RLiveObjectService service = redisson.getLiveObjectService();
service.registerClass(MyClass.class);
service.unregisterClass(MyClass.class);
Boolean registered = service.isClassRegistered(MyClass.class);

4. 注解(Annotation)使用方法

@REntity

仅适用于类。通过指定@REntity的各个参数,可以详细的对每个RLO类实现特殊定制,以达到改变RLO对象的行为。

  • namingScheme - 命名方案。命名方案规定了每个实例在Redis中对应key的名称。它不仅被用来与已存在的RLO建立关联,还被用来储存新建的RLO实例。默认采用Redisson自带的DefaultNamingScheme对象。
  • codec - 编码解码器。在运行当中,Redisson用编码解码器来对RLO中的每个字段进行编码解码。Redisson内部采用了实例池管理不同类型的编码解码器实例。Redisson提供了多种不同的编码解码器,默认使用JsonJacksonCodec。
  • fieldTransformation - 字段转换模式。如上所述,为了尽可能的保证RLO的用法和普通Java对象一致,Redisson会自动将常用的普通Java对象转换成与其匹配的Redisson分布式对象。这是由于字段转换模式的默认值是ANNOTATION_BASED,修改为IMPLEMENTATION_BASED就可以不转换。

@RId

仅适用于字段。@RId注解只能用在具备区分实例的字段上,这类字段可以理解为一个类的id字段或主键字段。这个字段的值将被命名方案namingScheme用来与事先存在的RLO建立引用。加了该注解的字段是唯一在本地JVM里同时保存赋值的字段。一个类只能有一个字段包含@RId注解。

可以通过指定一个生成器generator策略来实现自动生成这个字段的值。默认不提供生成器。

@RIndex

仅适用于字段。用来指定可用于搜索的字段。可以通过RLiveObjectService.find方法来根据条件精细查找分布式实时对象。查询条件可以是含(IN),或(OR),和(AND)或相等(EQ)以及它们的任意组合。

使用范例如下:

public class MyObject {@RIndexString field1;@RIndexString field2;@RIndexString field3;
}Collection<MyObject> objects = RLiveObjectService.find(MyObject.class, Conditions.or(Conditions.and(Conditions.eq("field1", "value"), Conditions.eq("field2", "value")), Conditions.in("field3", "value1", "value2"));

@RObjectField

仅适用于字段。允许通过该注解中的namingScheme或codec来改变该字段的命名或编码方式,用来区别于@REntity中指定的预设方式。

@RCascade

仅适用于字段。用来指定包含于分布式实时对象字段内其它对象的级联操作方式。

可选的级联操作方式为如下:

RCascadeType.ALL - 执行所有级联操作
RCascadeType.PERSIST - 仅在执行RLiveObjectService.persist()方法时进行级联操作 RCascadeType.DETACH - 仅在执行RLiveObjectService.detach()方法时进行级联操作 RCascadeType.MERGE - 仅在执行RLiveObjectService.merge()方法时进行级联操作 RCascadeType.DELETE - 仅在执行RLiveObjectService.delete()方法时进行级联操作

5. 使用限制

如上所述,带有RId注解字段的类型不能使数组类,这是因为目前默认的命名方案类DefaultNamingScheme还不能正确地将数组类序列化和反序列化。在改善了DefaultNamingScheme类的不足以后会考虑取消这个限制。另外由于带有RId注解的字段是用来指定Redis中映射的key的名称,因此组建一个只含有唯一一个字段的RLO类是毫无意义的。选用RBucket会更适合这样的场景。

三、分布式执行服务(Executor Service)

1. 分布式执行服务概述

Redisson的分布式执行服务实现了java.util.concurrent.ExecutorService接口,支持在不同的独立节点里执行基于java.util.concurrent.Callable接口或java.lang.Runnable接口或Lambda的任务。这样的任务也可以通过使用Redisson实例,实现对储存在Redis里的数据进行操作。Redisson分布式执行服务是最快速和有效执行分布式运算的方法。

2. 任务

Redisson独立节点不要求任务的类在类路径里。他们会自动被Redisson独立节点的ClassLoader加载。因此每次执行一个新任务时,不需要重启Redisson独立节点。

采用Callable任务的范例:

public class CallableTask implements Callable<Long> {@RInjectprivate RedissonClient redissonClient;@Overridepublic Long call() throws Exception {RMap<String, Integer> map = redissonClient.getMap("myMap");Long result = 0;for (Integer value : map.values()) {result += value;}return result;}}

采用Runnable任务的范例:

public class RunnableTask implements Runnable {@RInjectprivate RedissonClient redissonClient;private long param;public RunnableTask() {}public RunnableTask(long param) {this.param = param;}@Overridepublic void run() {RAtomicLong atomic = redissonClient.getAtomicLong("myAtomic");atomic.addAndGet(param);}}

在创建ExecutorService时可以配置以下参数:

ExecutorOptions options = ExecutorOptions.defaults()// 指定重新尝试执行任务的时间间隔。
// ExecutorService的工作节点将等待10分钟后重新尝试执行任务
//
// 设定为0则不进行重试
//
// 默认值为5分钟
options.taskRetryInterval(10, TimeUnit.MINUTES);
RExecutorService executorService = redisson.getExecutorService("myExecutor", options);
executorService.submit(new RunnableTask(123));RExecutorService executorService = redisson.getExecutorService("myExecutor", options);
Future<Long> future = executorService.submit(new CallableTask());
Long result = future.get();

使用Lambda任务的范例:

RExecutorService executorService = redisson.getExecutorService("myExecutor", options);
Future<Long> future = executorService.submit((Callable & Serializable)() -> {System.out.println("task has been executed!");
});
Long result = future.get();

可以通过@RInject注解来为任务实时注入Redisson实例依赖。

3. 取消任务

通过Future.cancel()方法可以很方便的取消所有已提交的任务。通过对Thread.currentThread().isInterrupted()方法的调用可以在已经处于运行状态的任务里实现任务中断:

public class CallableTask implements Callable<Long> {@RInjectprivate RedissonClient redissonClient;@Overridepublic Long call() throws Exception {RMap<String, Integer> map = redissonClient.getMap("myMap");Long result = 0;// map里包含了许多的元素for (Integer value : map.values()) {if (Thread.currentThread().isInterrupted()) {// 任务被取消了return null;}result += value;}return result;}}RExecutorService executorService = redisson.getExecutorService("myExecutor");
Future<Long> future = executorService.submit(new CallableTask());
// 或
RFuture<Long> future = executorService.submitAsync(new CallableTask());
// ...
future.cancel(true);

四、 分布式调度任务服务(Scheduler Service)

1. 分布式调度任务服务概述

Redisson的分布式调度任务服务实现了java.util.concurrent.ScheduledExecutorService接口,支持在不同的独立节点里执行基于java.util.concurrent.Callable接口或java.lang.Runnable接口的任务。Redisson独立节点按顺序运行Redis列队里的任务。调度任务是一种需要在未来某个指定时间运行一次或多次的特殊任务。

2. 设定任务计划

Redisson独立节点不要求任务的类在类路径里。他们会自动被Redisson独立节点的ClassLoader加载。因此每次执行一个新任务时,不需要重启Redisson独立节点。

采用Callable任务的范例:

public class CallableTask implements Callable<Long> {@RInjectprivate RedissonClient redissonClient;@Overridepublic Long call() throws Exception {RMap<String, Integer> map = redissonClient.getMap("myMap");Long result = 0;for (Integer value : map.values()) {result += value;}return result;}}

在创建ExecutorService时可以配置以下参数:

ExecutorOptions options = ExecutorOptions.defaults()// 指定重新尝试执行任务的时间间隔。
// ExecutorService的工作节点将等待10分钟后重新尝试执行任务
//
// 设定为0则不进行重试
//
// 默认值为5分钟
options.taskRetryInterval(10, TimeUnit.MINUTES);
RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor");
ScheduledFuture<Long> future = executorService.schedule(new CallableTask(), 10, TimeUnit.MINUTES);
Long result = future.get();

使用Lambda任务的范例:

RExecutorService executorService = redisson.getExecutorService("myExecutor", options);
ScheduledFuture<Long> future = executorService.schedule((Callable & Serializable)() -> {System.out.println("task has been executed!");
}, 10, TimeUnit.MINUTES);
Long result = future.get();

采用Runnable任务的范例:

public class RunnableTask implements Runnable {@RInjectprivate RedissonClient redissonClient;private long param;public RunnableTask() {}public RunnableTask(long param) {this.param= param;}@Overridepublic void run() {RAtomicLong atomic = redissonClient.getAtomicLong("myAtomic");atomic.addAndGet(param);}}RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor");
ScheduledFuture<?> future1 = executorService.schedule(new RunnableTask(123), 10, TimeUnit.HOURS);
// ...
ScheduledFuture<?> future2 = executorService.scheduleAtFixedRate(new RunnableTask(123), 10, 25, TimeUnit.HOURS);
// ...
ScheduledFuture<?> future3 = executorService.scheduleWithFixedDelay(new RunnableTask(123), 5, 10, TimeUnit.HOURS);

3. 通过CRON表达式设定任务计划

在分布式调度任务中,可以通过CRON表达式来为任务设定一个更复杂的计划。表达式与Quartz的CRON格式完全兼容。

例如:

RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor");
executorService.schedule(new RunnableTask(), CronSchedule.of("10 0/5 * * * ?"));
// ...
executorService.schedule(new RunnableTask(), CronSchedule.dailyAtHourAndMinute(10, 5));
// ...
executorService.schedule(new RunnableTask(), CronSchedule.weeklyOnDayAndHourAndMinute(12, 4, Calendar.MONDAY, Calendar.FRIDAY));

4. 取消计划任务

分布式调度任务服务提供了两张取消任务的方式:通过调用ScheduledFuture.cancel()方法或调用RScheduledExecutorService.cancelScheduledTask方法。通过对Thread.currentThread().isInterrupted()方法的调用可以在已经处于运行状态的任务里实现任务中断:

public class RunnableTask implements Callable<Long> {@RInjectprivate RedissonClient redissonClient;@Overridepublic Long call() throws Exception {RMap<String, Integer> map = redissonClient.getMap("myMap");Long result = 0;// map里包含了许多的元素for (Integer value : map.values()) {if (Thread.currentThread().isInterrupted()) {// 任务被取消了return null;}result += value;}return result;}}RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor");
RScheduledFuture<Long> future = executorService.scheduleAsync(new RunnableTask(), CronSchedule.dailyAtHourAndMinute(10, 5));
// ...
future.cancel(true);
// 或
String taskId = future.getTaskId();
// ...
executorService.cancelScheduledTask(taskId);

五、分布式映射归纳服务(MapReduce)

1. 介绍

Redisson提供了通过映射归纳(MapReduce)编程模式来处理储存在Redis环境里的大量数据的服务。这个想法来至于其他的类似实现方式和谷歌发表的研究。所有 映射(Map)归纳(Reduce) 阶段中的任务都是被分配到各个独立节点(Redisson Node)里并行执行的。以下所有接口均支持映射归纳(MapReduce)功能: RMap、 RMapCache、 RLocalCachedMap、 RSet、 RSetCache、 RList、 RSortedSet、 RScoredSortedSet、 RQueue、 RBlockingQueue、 RDeque、 RBlockingDeque、 RPriorityQueue 和 RPriorityDeque

映射归纳(MapReduce)的功能是通过RMapper、 RCollectionMapper、 RReducer 和 RCollator 这几个接口实现的。

1.RMapper映射器接口适用于映射(Map)类,它用来把映射(Map)中的每个元素转换为另一个作为归纳(Reduce)处理用的键值对。

public interface RMapper<KIn, VIn, KOut, VOut> extends Serializable {void map(KIn key, VIn value, RCollector<KOut, VOut> collector);}

2.RCollectionMapper 映射器接口仅适用于集合(Collection)类型的对象,它用来把集合(Collection)中的元素转换成一组作为归纳(Reduce)处理用的键值对。

public interface RCollectionMapper<VIn, KOut, VOut> extends Serializable {void map(VIn value, RCollector<KOut, VOut> collector);}

3.RReducer归纳器接口用来将上面这些,由映射器生成的键值对列表进行归纳整理。

public interface RReducer<K, V> extends Serializable {V reduce(K reducedKey, Iterator<V> values);}

4.RCollator收集器接口用来把归纳整理以后的结果化简为单一一个对象。

public interface RCollator<K, V, R> extends Serializable {R collate(Map<K, V> resultMap);}

以上每个阶段的任务都可以用@RInject注解的方式来获取RedissonClient实例:

    public class WordMapper implements RMapper<String, String, String, Integer> {@RInjectprivate RedissonClient redissonClient;@Overridepublic void map(String key, String value, RCollector<String, Integer> collector) {// ...redissonClient.getAtomicLong("mapInvocations").incrementAndGet();}}

2. 映射(Map)类型的使用范例

Redisson提供的RMap、 RMapCache和RLocalCachedMap这三种映射(Map)类型的对象均可以使用这种分布式映射归纳(MapReduce)服务。

以下是在映射(Map)类型的基础上采用映射归纳(MapReduce)来实现字数统计的范例:

  1. Mapper对象将每个映射的值用空格且分开。
    public class WordMapper implements RMapper<String, String, String, Integer> {@Overridepublic void map(String key, String value, RCollector<String, Integer> collector) {String[] words = value.split("[^a-zA-Z]");for (String word : words) {collector.emit(word, 1);}}}
  1. Reducer对象计算统计所有单词的使用情况。
    public class WordReducer implements RReducer<String, Integer> {@Overridepublic Integer reduce(String reducedKey, Iterator<Integer> iter) {int sum = 0;while (iter.hasNext()) {Integer i = (Integer) iter.next();sum += i;}return sum;}}
  1. Collator对象统计所有单词的使用情况。
    public class WordCollator implements RCollator<String, Integer, Integer> {@Overridepublic Integer collate(Map<String, Integer> resultMap) {int result = 0;for (Integer count : resultMap.values()) {result += count;}return result;}}
  1. 把上面的各个对象串起来使用:
    RMap<String, String> map = redisson.getMap("wordsMap");map.put("line1", "Alice was beginning to get very tired");map.put("line2", "of sitting by her sister on the bank and");map.put("line3", "of having nothing to do once or twice she");map.put("line4", "had peeped into the book her sister was reading");map.put("line5", "but it had no pictures or conversations in it");map.put("line6", "and what is the use of a book");map.put("line7", "thought Alice without pictures or conversation");RMapReduce<String, String, String, Integer> mapReduce= map.<String, Integer>mapReduce().mapper(new WordMapper()).reducer(new WordReducer());// 统计词频Map<String, Integer> mapToNumber = mapReduce.execute();// 统计字数Integer totalWordsAmount = mapReduce.execute(new WordCollator());

3. 集合(Collection)类型的使用范例

Redisson提供的RSet、 RSetCache、 RList、 RSortedSet、 RScoredSortedSet、 RQueue、 RBlockingQueue、 RDeque、 RBlockingDeque、 RPriorityQueue和RPriorityDeque这几种集合(Collection)类型的对象均可以使用这种分布式映射归纳(MapReduce)服务。

以下是在集合(Collection)类型的基础上采用映射归纳(MapReduce)来实现字数统计的范例:

    public class WordMapper implements RCollectionMapper<String, String, Integer> {@Overridepublic void map(String value, RCollector<String, Integer> collector) {String[] words = value.split("[^a-zA-Z]");for (String word : words) {collector.emit(word, 1);}}}
    public class WordReducer implements RReducer<String, Integer> {@Overridepublic Integer reduce(String reducedKey, Iterator<Integer> iter) {int sum = 0;while (iter.hasNext()) {Integer i = (Integer) iter.next();sum += i;}return sum;}}
    public class WordCollator implements RCollator<String, Integer, Integer> {@Overridepublic Integer collate(Map<String, Integer> resultMap) {int result = 0;for (Integer count : resultMap.values()) {result += count;}return result;}}
    RList<String> list = redisson.getList("myList");list.add("Alice was beginning to get very tired");list.add("of sitting by her sister on the bank and");list.add("of having nothing to do once or twice she");list.add("had peeped into the book her sister was reading");list.add("but it had no pictures or conversations in it");list.add("and what is the use of a book");list.add("thought Alice without pictures or conversation");RCollectionMapReduce<String, String, Integer> mapReduce= list.<String, Integer>mapReduce().mapper(new WordMapper()).reducer(new WordReducer());// 统计词频Map<String, Integer> mapToNumber = mapReduce.execute();// 统计字数Integer totalWordsAmount = mapReduce.execute(new WordCollator());

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/96048.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【JVM】运行时数据区(内存区域划分)详解

文章目录 前言一、JVM 运行时数据区1, 堆2, Java 虚拟机栈3, 本地方法栈4, 程序计数器5, 元数据区 / 方法区 二、内存异常问题1, 栈溢出2, 内存溢出3, 内存泄露 总结 前言 &#x1f4d5;各位读者好, 我是小陈, 这是我的个人主页 &#x1f4d7;小陈还在持续努力学习编程, 努力通…

Minecraft个人服务器搭建自己的皮肤站并实现外置登录更换自定义皮肤组件

Minecraft个人服务器搭建自己的皮肤站并实现外置登录更换自定义皮肤组件 大家好&#xff0c;我是艾西有不少小伙伴非常喜欢我的世界Minecraft游戏&#xff0c;今天小编跟大家分享下Minecraft个人服务器怎么设置皮肤站。 Minecraft皮肤站是什么&#xff1f;其实官网就有皮肤站…

关于智能空气动力学

智能空气动力学是指运用智能科学方法和研究范式研究空气运动&#xff0c;尤其是物体与空气相对运动时空气对物体所施作用力规律、气体的流动规律和伴随发生的物理学变化&#xff0c;解决空气动力学问题的新的交叉学科。在空气动力学三大传统研究手段的基础上&#xff0c;智能空…

【Overload游戏引擎分析】画场景网格的Shader

Overload引擎地址&#xff1a; GitHub - adriengivry/Overload: 3D Game engine with editor 一、栅格绘制基本原理 Overload Editor启动之后&#xff0c;场景视图中有栅格线&#xff0c;这个在很多软件中都有。刚开始我猜测它应该是通过绘制线实现的。阅读代码发现&#xff0…

STM32复习笔记(二):GPIO

目录 &#xff08;一&#xff09;Demo流程 &#xff08;二&#xff09;工程配置 &#xff08;三&#xff09;代码部分 &#xff08;四&#xff09;外部中断&#xff08;EXTI&#xff09; &#xff08;一&#xff09;Demo流程 首先&#xff0c;板子上有4个按键&#xff0c;…

外包做了3个月,技术退步明显。。。。。

先说一下自己的情况&#xff0c;大专生&#xff0c;17年通过校招进入广州某软件公司&#xff0c;干了接近4年的功能测试&#xff0c;今年年初&#xff0c;感觉自己不能够在这样下去了&#xff0c;长时间呆在一个舒适的环境会让一个人堕落!而我已经在一个企业干了四年的功能测试…

网络初识必知会

局域网&#xff1a;把一些设备通过交换机/路由器连接起来 广域网&#xff1a;把更多的局域网也相互连接&#xff0c;当网络规模足够大的 交换机&#xff1a;组网过程中的重要设备&#xff01; 路由器&#xff1a;组网过程中的重要设备&#xff01; IP地址&#xff1a;描述一…

什么,这年头还有人不知道404

写在前面 哥&#xff0c;来帮我看看&#xff0c;这个请求怎么404了&#xff0c;明明接口路径是对的啊&#xff01;一个下午&#xff0c;组里的小哥突然让我帮忙看这个问题&#xff0c;我不禁一惊&#xff0c;啥&#xff0c;这年头了还有人搞不定404&#xff0c;如有还有&#…

FFmpeg:打印音/视频信息(Meta信息)

多媒体文件基本概念 多媒体文件其实是个容器在容器里面有很多流(Stream/Track)每种流是由不同的编码器编码的从流中读出的数据称为包在一个包中包含着一个或多个帧 几个重要的结构体 AVFormatContextAVStreamAVPacket FFmpeg操作流数据的基本步骤 打印音/视频信息(Meta信息…

LVGL_基础控件滚轮roller

LVGL_基础控件滚轮roller 1、创建滚轮roller控件 /* 创建一个 lv_roller 部件(对象) */ lv_obj_t * roller lv_roller_create(lv_scr_act()); // 创建一个 lv_roller 部件(对象),他的父对象是活动屏幕对象// 将部件(对象)添加到组&#xff0c;如果设置了默认组&#xff0c…

不断优化的素数算法

前言&#xff1a;素数判断是算法中重要的一环&#xff0c;掌握优秀的素数判断方法是算法player的必修课。本文介绍的是由简到繁的素数算法&#xff0c;便于初学者从入门到精通。 素数&#xff08;质数&#xff09;&#xff1a;只能被 1 和它本身整除的数称作素数&#xff0c;如…

总结二:linux面经

文章目录 1、 Linux中查看进程运行状态的指令、查看内存使用情况的指令、tar解压文件的参数。2、文件权限怎么修改&#xff1f;3、说说常用的Linux命令&#xff1f;4、说说如何以root权限运行某个程序&#xff1f;5、 说说软链接和硬链接的区别&#xff1f;6、说说静态库和动态…

(四)正点原子STM32MP135移植——u-boot移植

一、概述 u-boot概述就不概述了&#xff0c;u-boot、kernel、dtb三件套&#xff0c;dddd 经过国庆艰苦奋战&#xff0c;已经成功把所有功能移植好了 二、编译官方代码 进入u-boot的目录 2.1 解压源码、打补丁 /* 解压源码 */ tar xf u-boot-stm32mp-v2022.10-stm32mp-r1-r0.…

充分理清限制与条件+构造二分图+最小割:ARC142E

https://www.luogu.com.cn/problem/AT_arc142_e 他的充要条件是是什么&#xff1a; a i , a j ≥ m i n ( b i , b j ) a_i,a_j\ge min(b_i,b_j) ai​,aj​≥min(bi​,bj​)存在 a i ≥ m a x ( b i , b j ) a_i\ge max(b_i,b_j) ai​≥max(bi​,bj​) 第一个条件直接预处理一…

Springcloud支付模块

客户端消费者80 order 微服务提供者8001 payment 订单模块可以调动支付模块 步骤&#xff1a; 1、建moudle 2、改写pom 3、写yml 4、主启类 5、业务类

【LinuxC】时间、时区,相关命令、函数

文章目录 一、序1.1 时间和时区1.11 时间1.12 时区 1.2 查看时间时区的命令1.21 Windows1.22 Linux 二、C语言函数2.1 通用2.11 函数简介2.12 数据类型简介 2.2 windows 和 Linux特有函数2.3 C语言示例 一、序 1.1 时间和时区 1.11 时间 时间是一种用来描述物体运动变化的量…

黑马点评-01基于Redis实现短信登陆的功能

环境准备 当前模型 nginx服务器的作用 手机或者app端向nginx服务器发起请求,nginx基于七层模型走的是HTTP协议,可以实现基于Lua直接绕开tomcat访问Redis nginx也可以作为静态资源服务器,轻松扛下上万并发并负载均衡到下游的tomcat服务器,利用集群支撑起整个项目 使用nginx部…

黑马JVM总结(二十七)

&#xff08;1&#xff09;synchronized代码块 synchronized代码块的底层原理&#xff0c;它是给一个对象进行一个加锁操作&#xff0c;它是如何保证如果你出现了synchronized代码块中出现了问题&#xff0c;它需要给这个对象有一个正确的解锁操作呢&#xff0c;加锁解锁是成对…

【c++_containers】10分钟带你学会list

前言 链表作为一个像是用“链子”链接起来的容器&#xff0c;在数据的存储等方面极为便捷。虽然单链表单独在实际的应用中没用什么作用&#xff0c;但是当他可以结合其他结构&#xff0c;比如哈希桶之类的。不过今天学习的list其实是一个带头双向链表。 言归正传&#xff0c;让…

overleaf在线编辑工具使用教程

文章目录 1 用 orcid注册overleaf获取模板2 使用模板 1 用 orcid注册overleaf获取模板 通常来说&#xff0c;在期刊投稿网站information for author中找template 。下载压缩包后上传到over leaf中。 加入找不到官方模板&#xff0c;用overleaf中的 2 使用模板 .bib文件&…