使用Hazelcast发布和订阅

几周前,我写了一篇有关Hazelcast入门的博客,描述了创建分布式地图,列表和队列是多么简单。 当时我提到Hazelcast还能做很多其他事情。 该博客快速浏览了Hazelcast的另一个功能:基于Publish / Subscribe模式的广播消息系统。 这采用通常的格式,即邮件发件人应用通过该格式发布有关特定主题的邮件。 这些消息并不针对任何特定的客户端,而是可以由对主题感兴趣的任何客户端读取。

发布和订阅的明显场景来自高金融和做市商的世界。 做市商买卖股票之类的金融工具,并通过在通常是电子的市场上宣传买卖价格来竞争业务。 为了使用Hazelcast实现非常简单的做市商方案,我们需要三个类: StockPrice bean, MarketMakerClient

以下代码已添加到我在Github上可用的现有Hazelcast项目中。 无需担心其他POM依赖项。

public class StockPrice implements Serializable { private static final long serialVersionUID = 1L; private final BigDecimal bid; private final BigDecimal ask; private final String code; private final String description; private final long timestamp; /** * Create a StockPrice for the given stock at a given moment */ public StockPrice(BigDecimal bid, BigDecimal ask, String code, String description, long timestamp) { super(); this.bid = bid; this.ask = ask; this.code = code; this.description = description; this.timestamp = timestamp; } public BigDecimal getBid() { return bid; } public BigDecimal getAsk() { return ask; } public String getCode() { return code; } public String getDescription() { return description; } public long getTimestamp() { return timestamp; } @Override public String toString() { StringBuilder sb = new StringBuilder("Stock - "); sb.append(code); sb.append(" - "); sb.append(description); sb.append(" - "); sb.append(description); sb.append(" - Bid: "); sb.append(bid); sb.append(" - Ask: "); sb.append(ask); sb.append(" - "); SimpleDateFormat df = new SimpleDateFormat("HH:MM:SS"); sb.append(df.format(new Date(timestamp))); return sb.toString(); } 
}

StockPrice bean具有所有常用的获取器和设置器,可以在任何给定时间模拟股票的买价和买价(以正常语言进行买卖),并且MarketMaker类使用Hazelcast发布这些bean。

通常,做市商会在一种以上的金融工具中发布价格; 但是,为简单起见, MarketMaker在此演示中仅发布单个价格。

public class MarketMaker implements Runnable { private static Random random = new Random(); private final String stockCode; private final String description; private final ITopic<StockPrice> topic; private volatile boolean running; public MarketMaker(String topicName, String stockCode, String description) { this.stockCode = stockCode; this.description = description; this.topic = createTopic(topicName); running = true; } @VisibleForTesting ITopic<StockPrice> createTopic(String topicName) { HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance(); return hzInstance.getTopic(topicName); } public void publishPrices() { Thread thread = new Thread(this); thread.start(); } @Override public void run() { do { publish(); sleep(); } while (running); } private void publish() { StockPrice price = createStockPrice(); System.out.println(price.toString()); topic.publish(price); } @VisibleForTesting StockPrice createStockPrice() { double price = createPrice(); DecimalFormat df = new DecimalFormat("#.##"); BigDecimal bid = new BigDecimal(df.format(price - variance(price))); BigDecimal ask = new BigDecimal(df.format(price + variance(price))); StockPrice stockPrice = new StockPrice(bid, ask, stockCode, description, System.currentTimeMillis()); return stockPrice; } private double createPrice() { int val = random.nextInt(2010 - 1520) + 1520; double retVal = (double) val / 100; return retVal; } private double variance(double price) { return (price * 0.01); } private void sleep() { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } } public void stop() { running = false; } public static void main(String[] args) throws InterruptedException { MarketMaker bt = new MarketMaker("STOCKS", "BT.L", "British Telecom"); MarketMaker cbry = new MarketMaker("STOCKS", "CBRY.L", "Cadburys"); MarketMaker bp = new MarketMaker("STOCKS", "BP.L", "British Petrolium"); bt.publishPrices(); cbry.publishPrices(); bp.publishPrices(); } }

像往常一样,设置Hazelcast相当简单,上面MarketMaker类中的大多数代码与Hazelcast无关。 该课程分为两部分:建筑价格和出版价格。 构造函数采用三个参数,将其存储起来以备后用。 它还创建一个Hazelcast实例,并通过私有createTopic()方法注册一个名为"STOCKS"的简单主题。 如您所料,创建Hazelcast实例并注册主题需要两行代码,如下所示:

ITopic<StockPrice> createTopic(String topicName) { HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance(); return hzInstance.getTopic(topicName); }

该类的其余部分使用线程来调用MarketMakerrun()方法来运行价格发布机制。 此方法生成随机出价,为关联的股票代码要价,并使用Hazelcast发布。 使用以下单行代码即可完成发布:

topic.publish(price);

MarketMaker类的最后一部分是main()方法,其作用是创建多个MarketMaker实例并使它们运行。

现在,Hazelcast知道了我们不断变化的股价,接下来要做的就是整理客户代码。

public class Client implements MessageListener<StockPrice> { public Client(String topicName) { HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance(); ITopic<StockPrice> topic = hzInstance.getTopic(topicName); topic.addMessageListener(this); } /** * @see com.hazelcast.core.MessageListener#onMessage(com.hazelcast.core.Message) */ @Override public void onMessage(Message<StockPrice> arg0) { System.out.println("Received: " + arg0.getMessageObject().toString()); } public static void main(String[] args) { new Client("STOCKS"); } }

与任何消息传递系统一样,消息发送者代码必须知道呼叫谁和呼叫什么。 客户端通过创建Hazelcast实例并在"STOCKS"主题中注册兴趣来实现“调用什么"STOCKS" ,方法与发布者相同,如下所示:

HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance(); ITopic<StockPrice> topic = hzInstance.getTopic(topicName); topic.addMessageListener(this);

客户端实现Hazelcast的MessageListener接口及其单一方法onMessage()实现“呼叫”

@Override public void onMessage(Message<StockPrice> arg0) { System.out.println("Received: " + arg0.getMessageObject().toString()); }

客户端代码的最后一部分是其main()方法,该方法创建一个客户端实例。

最后要做的是运行代码。 为此,我仅将所有必需的JAR文件放在一个目录中,只需考虑两个:hazel cast-3.1.jar和guava-13.0.1.jar。

屏幕截图2013年12月7日的10.51.43

完成后,我转到项目的classes目录:

cd /Users/Roger/git/captaindebug/hazelcast/target/classes

…并解雇了发布者

java -cp ./:/Users/Roger/tmp/mm/guava-13.0.1.jar:/Users/Roger/tmp/mm/hazelcast-3.1.jar com.captaindebug.hazelcast.pubsub.MarketMaker

……然后是客户。

java -cp ./:/Users/Roger/tmp/mm/guava-13.0.1.jar:/Users/Roger/tmp/mm/hazelcast-3.1.jar com.captaindebug.hazelcast.pubsub.Client

当然,如果您正在使用此粗略且已准备好的技术在计算机上运行此程序,则请记住将其替换
/Users/Roger/tmp/mm以及放置这些JAR文件副本的路径。

如果您在一个终端中运行MarketMaker发布者,并在其他两个终端中运行几个客户,那么您将得到类似的信息,在这里您可以看到正在发布的价格以及客户正在接收更新。

屏幕截图2013年12月22日的17.40.07

关于Hazelcast的一件事要注意的是,“ 集群 ”是指Hazelcast实例的集群,而不是JVM的集群。 在您为每个应用程序请求多个Hazelcast实例之前,这是不明显的。 当其他客户端加入集群时,您将看到类似以下内容:

Members [5] {
Member [192.168.0.7]:5701
Member [192.168.0.7]:5702
Member [192.168.0.7]:5703
Member [192.168.0.7]:5704 this
Member [192.168.0.7]:5705
}

在上面的日志中,有两个侦听器条目,每个侦听器条目一个,每个发布者条目三个,在MarketMakermain()方法中启动的每个MarketMaker实例一个。

屏幕快照2013-12-07 at 11.16.21
这里要考虑的事情是,是否每个对象实例创建一个Hazelcast实例是一种好习惯(就像我在示例代码中所做的那样),还是在代码中有一个static Hazelcast实例更好。 我不确定该答案是什么,因此,如果有任何Hazelcast专家正在阅读此书,请告诉我。

就是这样:Hazelcast可以在发布和订阅模式下愉快地运行,但是我还没有介绍Hazelcast的所有功能。 也许以后再说……

  • 可以在Github上找到此源代码: https : //github.com/roghughe/captaindebug/tree/master/hazelcast

参考: Captain Debug的Blog博客上的JCG合作伙伴 Roger Hughes的Hazelcast发布和订阅 。

翻译自: https://www.javacodegeeks.com/2014/01/publish-and-subscribe-with-hazelcast.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/366054.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

接口自动化测试持续集成--Soapui接口功能测试参数化

按照自动化测试分层实现的原理&#xff0c;每一层的脚本实现都要进行参数化&#xff0c;自动化的目标就是要实现脚本代码与测试数据分离。当测试数据进行调整的时候不会对脚本的实现带来震荡&#xff0c;从而提高脚本的稳定性与灵活度&#xff0c;降低脚本的维护成本。Soapui最…

linux的tcpdump命令详解,tcpdump命令

tcpdump命令是一款sniffer工具&#xff0c;它可以打印所有经过网络接口的数据包的头信息&#xff0c;也可以使用-w选项将数据包保存到文件中&#xff0c;方便以后分析。语法tcpdump(选项)选项-a&#xff1a;尝试将网络和广播地址转换成名称&#xff1b;-c&#xff1a;收到指定的…

Codeforces 1027E Inverse Coloring 【DP】

Codeforces 1027E Inverse Coloring 题目链接 1 #include<bits/stdc.h>2 using namespace std;3 #define N 10104 #define LL long long5 #define Mod 9982443536 int n,k;7 LL dp[N][N],ans0;8 LL sum[N][N];9 int main(){ 10 cin>>n>>k; 11 dp[0][…

合并远程仓库到本地_git远程仓库创建和合并

上周发了一个张佳波小朋友关于git的文章&#xff0c;马上就更多小朋友的回馈。其中周玉涛同志发来了自己对于git之前问题的一些理解和解决方法&#xff0c;希望能帮助更多人解决问题。为了保证周玉涛同学提供材料的完整性&#xff0c;以下将他原文和图片发出&#xff0c;不做其…

swfobject.js视频播放插件

在网页中经常会用到视频播放的功能&#xff0c;下面介绍一下swfobject.js的视频播放应用&#xff1a;html代码结构&#xff1a; <div id"video_content"></div> css样式结构&#xff1a; body{background: #003368}#video_content{width:600px;height:40…

项目学生:JPA标准查询

这是Project Student的一部分。 其他职位包括带有Jersey的Webservice Client&#xff0c;带有Jersey的 Webservice Server &#xff0c; 业务层 &#xff0c; 具有Spring Data的持久性 &#xff0c;分片集成测试数据和Webservice Integration 。 我们已经介绍了CRUD的基本操作…

linux 广播命令,Linux基础命令---ping

pingping指令可以发送ICMP请求到目标地址&#xff0c;如果网络功能正常&#xff0c;目标主机会给出回应信息。ping使用ICMP协议强制发送ECHO_REQUEST报文到目标主机&#xff0c;从主机或网关获取ICMP ECHO_RESPONSE。ECHO_REQUESTt数据报(‘pings’)有一个IP和ICMP报头&#xf…

第5章 Internet协议 [TCP/IP详解 卷1:协议]

IP是TCP/IP协议族中的核心协议。所有TCP、UDP、ICMP和IGMP数据都通过IP数据报传输。IP提供了一种尽力而为、无连接的数据报交付服务。“尽力而为”的含义是不保证IP数据报能成功到达目的地。任何可靠性必须由上层&#xff08;例如TCP&#xff09;提供。IPv4和IPv6都使用这种尽力…

【[SHOI2015]超能粒子炮·改】

就是运用\(Lucas\)推一个柿子 首先是前置芝士\(Lucas\)定理 \[C_{n}^{m}\%pC_{n/p}^{m/p}*C_{n\%p}^{m\%p}\%p\] 至于证明 我建议去问一下Lucas本人 至于这道题&#xff0c;我们要求的是这个柿子 \[\sum_{i0}^kC_{n}^i\%p\] 于是我们设\(f(n,k)\sum_{i0}^kC_{n}^i\) 我们就可以…

Spring REST:异常处理卷。 3

这是该系列中有关Spring REST异常处理的最后一篇文章。 最后&#xff0c;这次我将讨论在表单处理期间可能发生的REST异常的处理。 因此&#xff0c;在本教程中&#xff0c;您将看到与REST&#xff0c;表单和异常处理有关的所有内容。 客户端呢&#xff1f; jQuery将用于反映RES…

centos6安装mysql并远程连接_Ubantu下MySQL安装、部署和远程连接

系统阿里云 ubantu 16.04MySQL 5.0/8.0连接工具 Navicat Premium安装MySQL1、MySQL 5.0直接使用apt命令安装sudo apt install mysql-server输入密码完成安装。安装完mysql-server后&#xff0c;mysql-client就带了&#xff0c;无需单独安装安装成功后输入如下命令检查数据库状态…

linux强制用户改密码,如何在Linux中强制用户在下次登录时更改密码?

由于安全方面的考虑&#xff0c;系统中的用户需要定期更新其密码。在本文中&#xff0c;我们将看到如何强制用户下次登录系统时更改其密码。列出用户首先让我们看一下系统中可用的用户。$ cut -d: -f1 /etc/passwd运行上面的代码给我们以下结果-mailnewsuucpproxywww-databacku…

C# 键盘钩子

C# 键盘钩子 1、键盘钩子&#xff1a; 通过代码将键盘的事件屏蔽掉&#xff0c;达到锁屏的效果。&#xff08;参考地址&#xff1a;https://zhidao.baidu.com/question/135132386108196965.html&#xff09; 2、代码如下&#xff1a; public class Hook : IDisposable{public d…

js中字符串和数组的使用

函数&#xff1a; 函数在调用的时候&#xff0c;会形成一个私有作用域&#xff0c;内部的变量不会被外面访问&#xff0c;这种保护机制叫闭包。这就意味着函数调用完毕&#xff0c;这个函数形成的栈内存会被销毁。 但有时候我们不希望他被销毁。 函数归属谁跟它在哪调用没有关…

Spring REST:异常处理卷。 1个

目录 Spring REST&#xff1a;异常处理卷。 1个 Spring REST&#xff1a;异常处理卷。 2 Spring REST&#xff1a;异常处理卷。 3 大家好&#xff0c;是时候继续在我的博客中发布新文章了。 因此&#xff0c;我很高兴地宣布&#xff0c;我计划编写一系列技术文章。 在当前文…

vue/cli3 配置vux

安装各插件 试过 安装“必须安装”的部分亦可 1、安装vuex npm install vuex --save-dev 2、在项目里面安装vux【必须安装】 npm install vux --save 3、安装vux-loader【必须安装】 npm install vux-loader --save-dev 4、安装less-loader&#xff08;这个是用以正确编译less源…

鼠标右键 移动选定的文件夹到指定位置_怎么把电脑桌面上的文件移动到更加安全的地方...

我们在使用电脑的时候习惯于把各种文档以及其他文件资料随手保存到电脑桌面上&#xff0c;这样操作可以方便以后对这些文档和文件资料的使用、管理&#xff0c;但是由于默认状态下桌面文件位于C盘中&#xff0c;这些文件资料不仅会占用掉C盘的很大的存储空间&#xff0c;并且日…

非常精简的Linux线程池实现(一)——使用互斥锁和条件变量

线程池的含义跟它的名字一样&#xff0c;就是一个由许多线程组成的池子。 有了线程池&#xff0c;在程序中使用多线程变得简单。我们不用再自己去操心线程的创建、撤销、管理问题&#xff0c;有什么要消耗大量CPU时间的任务通通直接扔到线程池里就好了&#xff0c;然后我们的主…

linux vim 执行shell命令行,Linux中vim和shell

在Linux系统中一切皆文件&#xff0c;配置服务其实就是在修改其配置文件的参数&#xff0c;而在日常文件中肯定少不了的就是编辑文档&#xff0c;这就离不开vim&#xff0c;而vim之所以能够得到广大厂商的青睐与用户的认可&#xff0c;原因在于vim编辑器中有三种模式&#xff1…

JS之setTimeOut与clearTimeOut

小练习1&#xff1a;针对HTML&#xff0c;分别使用 setTimeout 和 setInterval 实现以下功能&#xff1a; 点击按钮时&#xff0c;开始改变 fade-obj 的透明度&#xff0c;开始一个淡出&#xff08;逐渐消失&#xff09;动画&#xff0c;直到透明度为0在动画过程中&#xff0c…