hazelcast入门教程
这是我的Hazelcast系列的第四部分。 如果一个人没有看到其他三个人,我建议一个人去看第1 部分 , 第2 部分和第3部分 。
记录中
日志记录是任何应用程序的重要功能,我的示例也是如此。 System.out.println
可以用作告诉用户控制台应用程序中正在发生什么的有用工具。 但是,让我们面对现实吧,如果您正在阅读如何为分布式应用程序使用工具,那么该人确实不是初学者。 看到一系列日志消息不应吓到任何人。 实际上,对于本文中的示例,他们有必要了解谁在做什么。 毕竟,我们将讨论线程编程。
Hazelcast的好伙伴似乎已经同意日志记录很重要,因此有许多不同的方法来定义正在记录的库。 日志记录框架仅取决于JDK日志记录,并且具有许多适配器,甚至允许自定义日志记录框架。 一个人选择的日志适配器由属性hazelcast.logging.type设置为以下设置:
- JDK日志记录,这是默认设置。
- log4j
- slf4j
- 没有
我使用了Log4j2,所以我选择了slf4j并放入了使其工作所需的四个 jar文件。
旋转分布式线程
像Hazelcast中的许多类一样,IExecutorService实现了Java库ExecutorService的接口。 该接口定义什么是线程池。 该接口是java.util.concurrent包的一部分,自Java 1.5开始就存在。 该程序包还具有其实现,可以从java.util.concurrent.Executors中进行访问。 我希望我在Java 1.4或1.3或1.2或1.1中有类似的东西。 直到发生死锁之前,使线程池变得很有趣。 现在,我可以使用Java库的池了,对我来说已经足够了。
ExecutorService具有有趣的“功能”。 必须关闭它们,否则服务不会消失。 第一次使用它们时,导致内存泄漏并关闭了JVM。 我在自己的测试中发现了该错误,因此客户不必再看到我的学习经验。 IExecutorService的皱纹有所不同。 在所有线程完成之前,该服务不会消失。 这导致许多不正常的关机。 你被警告了!
IExecutorServices可以几种不同的方式共享线程。 这里是它们的详细信息:
任何'Ole实例
这是一个只调用submit(Callable call).
这不仅仅只是将线程随机设置到集群中。 它对该线程进行了一些负载平衡,因此实例不会被线程破坏。
致特定成员
这是通过submit(Callable call, Member member)
方法完成的。 这会将线程发送到集群的特定成员。 这里没有负载平衡; 只是发送给会员。 小心,一个成员很容易过载,并在进行任何处理时真正踩刹车。 我可以将其视为创建自定义负载均衡器的一种方式。
致会员集合
是的,可以将一个线程发送给多个成员。 当我进行示例编码时,所有成员的行为就像他们拥有自己的线程并且不共享线程。 如果将Callable <T>作为其线程实现来实现,则该方法使用成员作为键返回“未来地图”。 如果使用Runnable,则不返回任何内容。
用正确的钥匙致会员
IMap的条目可以在群集中的任何位置。 如果需要对该条目进行处理,则本地线程将必须通过网络上拉该条目。 如果条目很大,可能会出现问题。 更好的方法是将希望较小的线程转移到条目上。 为此,群集需要知道将其发送到哪里。 因此,呼叫submit(Callable call, Object key)
。
致全体会员
这与提交成员集合的方式相同,但是就像集群中的每个成员一样,都是成员。 如果一个集群中有大量成员,这可能会很“有趣”。 我想我在一个集群中听到了多达1000名成员。 确保这是人们想要的,然后再调用它。
使用ExecutionCallback
这基本上是一种发出一些线程并异步返回结果的方法。 如果提交了一个线程,则使用ExecutionCallback。 如果涉及多个成员,则使用MultiExecutionCallback。
范例程式码
在开始之前,请允许我说我没有IExecutorService中每个方法的示例。 但是,对于每种讨论的类型,我都有一个示例。 关于示例代码的另一件事。 出于指导目的,我在以前的文章中做了一些复制和粘贴编码,因此每个示例可以独立存在,并且可以了解所处位置。 我在第3部分中做了很多。 如果没有注意到,请再次查看。
这次我没有这样做,因为将会复制很多代码,并且结果会非常难看。 我使用了一个Enum,我认为结果非常好。 由于示例数量有限,我认为枚举是一个不错的选择,并且使我能够以块的形式显示代码,而如果首先显示框架,则可以理解。
有了这个解释,让我们继续前进!
构架
这是主要的位。 它由主类和线程类组成。 注意主类如何显示调用线程可以提交的每种方式。
主要
package hazelcastservice;import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IExecutorService;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/**** @author Daryl*/
public class Main {private static final Logger logger = LoggerFactory.getLogger(Main.class);public static final String SERVICE_NAME = "spinnerella";public static final int NUM_INSTANCES = 5;/*** @param args the command line arguments*/public static void main(String[] args) {System.setProperty("hazelcast.logging.type", "slf4j");List<HazelcastInstance> instances = new ArrayList<>(NUM_INSTANCES);for(int i = 0; i < NUM_INSTANCES; i++) {instances.add(Hazelcast.newHazelcastInstance());logger.info("instance {} up", i);}IExecutorService spinner = instances.get(0).getExecutorService(SERVICE_NAME);try {HazelcastIExecutorServiceExamples.TO_SOME_MEMBER.example(instances, spinner);HazelcastIExecutorServiceExamples.TO_PARTICULAR_MEMBER.example(instances, spinner);HazelcastIExecutorServiceExamples.ON_THE_KEY_OWNER.example(instances, spinner);HazelcastIExecutorServiceExamples.ON_A_SET_OF_MEMBERS.example(instances, spinner);HazelcastIExecutorServiceExamples.ON_ALL_MEMBERS.example(instances, spinner);HazelcastIExecutorServiceExamples.CALLBACK.example(instances, spinner);HazelcastIExecutorServiceExamples.MULTIPLE_MEMBERS_WITH_CALLBACK.example(instances, spinner);//Lets setup a loop to make sure they are all done (Especially the callback ones)for(HazelcastIExecutorServiceExamples example: HazelcastIExecutorServiceExamples.values()) {while(!example.isDone()) {Thread.sleep(1000);}}} catch(ExecutionException ee) {logger.warn("Can't finish the job", ee);} catch(InterruptedException ie) {logger.warn("Everybody out of the pool", ie);} finally {// time to clean up my toysboolean allClear = false;while(!allClear) {try {Thread.sleep(1000);Hazelcast.shutdownAll();allClear = true;} catch(InterruptedException ie) {//got interrupted. try again} catch(RejectedExecutionException ree) {logger.debug("caught a RejectedExecutionException");allClear = false;}}logger.info("All done");}}
}
线
package hazelcastservice;import java.io.Serializable;
import java.util.Random;
import java.util.concurrent.Callable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** This class was inspired by the song "I Like to Move it" from the movie * Madagascar by Dreamworks. I offer NO apologies for using it. * * To those software developers who like consistent results, I used java.util.Random to* make it loop inconsistently each time call is called. * * Sometimes you need to make your own entertainment.* @author Daryl*/
public class MoveItMoveIt implements Callable<Integer>, Serializable {private static final Logger logger = LoggerFactory.getLogger(MoveItMoveIt.class);private static final int UPPER_BOUND = 15;@Overridepublic Integer call() throws Exception {Random random = new Random();int howMany = random.nextInt(UPPER_BOUND);
// int howMany = 2;for(int i = 0; i < howMany; i++) {logger.info("I like to Move it Move it!");}logger.info("Move it!");return howMany;}
}
细节
在这里,我将展示所讨论的不同类型的呼叫。 请记住,这些是Enum类的块。 done
是一个受保护的变量,并且需要实现public void example(List<HazelcastInstance> instances, IExecutorService spinner)
。
任何'Ole实例
TO_SOME_MEMBER() {@Overridepublic void example(List<HazelcastInstance> instances, IExecutorService spinner)throws ExecutionException, InterruptedException {logger.info("Submit to some member.");Future<Integer> howMany = spinner.submit(new MoveItMoveIt());logger.info("It moved it {} times", howMany.get());done = true;}}
致特定成员
TO_PARTICULAR_MEMBER {@Overridepublic void example(List<HazelcastInstance> instances, IExecutorService spinner)throws ExecutionException, InterruptedException {logger.info("Submit to a particular member.");Member member = getRandomMember(instances);logger.debug("member is {}", member);Future<Integer> howMany = spinner.submitToMember(new MoveItMoveIt(), member);logger.info("It moved it {} times.", howMany.get());done = true;}private Member getRandomMember(List<HazelcastInstance> instances) {Set<Member> members = instances.get(0).getCluster().getMembers();int i = 0;int max = new Random().nextInt(instances.size());Iterator<Member> iterator = members.iterator();Member member = iterator.next();while(iterator.hasNext() && (i < max)) {member = iterator.next();i++;}return member;}}
致会员集合
ON_A_SET_OF_MEMBERS {@Overridepublic void example(List<HazelcastInstance> instances, IExecutorService spinner)throws ExecutionException, InterruptedException {logger.info("Send to some of the members");Set<Member> randomMembers = getRandomMembers(instances);Map<Member, Future<Integer>> results = spinner.submitToMembers(new MoveItMoveIt(), randomMembers);for(Future<Integer> howMany: results.values()) {logger.info("It moved {} times", howMany.get());}done = true;}private Set<Member> getRandomMembers(List<HazelcastInstance> instances) {int max = new Random().nextInt(instances.size());Set<Member> newSet = new HashSet<>(instances.size());int k = 0;Iterator<Member> i = instances.get(0).getCluster().getMembers().iterator();while(i.hasNext() && k < max) {newSet.add(i.next());k++;}return newSet;}}
用正确的钥匙致会员
ON_THE_KEY_OWNER {@Overridepublic void example(List<HazelcastInstance> instances, IExecutorService spinner)throws ExecutionException, InterruptedException {logger.info("Send to the one owning the key");HazelcastInstance randomInstance = getRandomInstance(instances);IMap<Long, Boolean> map = randomInstance.getMap("default");Long one = 1L;map.put(one, Boolean.TRUE);Future<Integer> howMany = spinner.submitToKeyOwner(new MoveItMoveIt(), one);logger.info("It moved it {} times.", howMany.get());done = true;}private HazelcastInstance getRandomInstance(List<HazelcastInstance> instances) {return instances.get(new Random().nextInt(instances.size()));}}
致全体会员
ON_ALL_MEMBERS {@Overridepublic void example(List<HazelcastInstance> instances, IExecutorService spinner)throws ExecutionException, InterruptedException {logger.info("Send to all members");Map<Member, Future<Integer>> results = spinner.submitToAllMembers(new MoveItMoveIt());for(Future<Integer> howMany: results.values()) {logger.info("It moved {} times", howMany.get());}done = true;}}
使用ExecutionCallback
此示例代码包含两段代码,分别显示一个回调和多个回调。
CALLBACK {@Overridepublic void example(List<HazelcastInstance> instances, IExecutorService spinner)throws ExecutionException, InterruptedException {logger.info("example with a callback");spinner.submit(new MoveItMoveIt(), new ExecutionCallback<Integer>() {@Overridepublic void onResponse(Integer response) {logger.info("It moved {} times", response);done = true;}@Overridepublic void onFailure(Throwable thrwbl) {logger.error("trouble in the callback", thrwbl);done = true;}});} },MULTIPLE_MEMBERS_WITH_CALLBACK {@Overridepublic void example(List<HazelcastInstance> instances, IExecutorService spinner)throws ExecutionException, InterruptedException {logger.info("running on multiple members with callback");spinner.submitToAllMembers(new MoveItMoveIt(), new MultiExecutionCallback() {@Overridepublic void onResponse(Member member, Object o) {logger.info("member finished with {} moves", o);}@Overridepublic void onComplete(Map<Member, Object> map) {logger.info("All members completed");for(Object value: map.values()) {logger.info("It moved {} times", value);}done = true;}});}
结论
再次在自己的博客上发布自己的代码/想法非常好。 我快速浏览了Hazelcast的IExecutorService的功能。 我的示例代码遵循DRY原理。 完整的代码可以在这里找到。
参考资料
与我的Hazelcast指南一样,我的信息来自Hazelcast文档,可在此处找到。
翻译自: https://www.javacodegeeks.com/2014/10/beginners-guide-to-hazelcast-part-4.html
hazelcast入门教程