使用Hazelcast发布和订阅

几周前,我写了一篇有关Hazelcast入门的博客,描述了创建分布式地图,列表和队列是多么简单。 当时我提到Hazelcast还能做很多其他事情。 该博客快速浏览了Hazelcast的另一个功能:基于Publish / Subscribe模式的广播消息系统。 这采用通常的格式,即邮件发件人应用通过该格式发布有关特定主题的邮件。 这些消息并不针对任何特定的客户端,而是可以由对主题感兴趣的任何客户端读取。

发布和订阅的明显场景来自高金融和做市商的世界。 做市商买卖股票之类的金融工具,并通过在通常是电子的市场上宣传买卖价格来竞争业务。 为了使用Hazelcast实现非常简单的做市商方案,我们需要三个类: StockPrice bean, MarketMakerClient

以下代码已添加到我在Github上可用的现有Hazelcast项目中。 无需担心其他POM依赖项。

public class StockPrice implements Serializable { private static final long serialVersionUID = 1L; private final BigDecimal bid; private final BigDecimal ask; private final String code; private final String description; private final long timestamp; /** * Create a StockPrice for the given stock at a given moment */ public StockPrice(BigDecimal bid, BigDecimal ask, String code, String description, long timestamp) { super(); this.bid = bid; this.ask = ask; this.code = code; this.description = description; this.timestamp = timestamp; } public BigDecimal getBid() { return bid; } public BigDecimal getAsk() { return ask; } public String getCode() { return code; } public String getDescription() { return description; } public long getTimestamp() { return timestamp; } @Override public String toString() { StringBuilder sb = new StringBuilder("Stock - "); sb.append(code); sb.append(" - "); sb.append(description); sb.append(" - "); sb.append(description); sb.append(" - Bid: "); sb.append(bid); sb.append(" - Ask: "); sb.append(ask); sb.append(" - "); SimpleDateFormat df = new SimpleDateFormat("HH:MM:SS"); sb.append(df.format(new Date(timestamp))); return sb.toString(); } 
}

StockPrice bean具有所有常用的获取器和设置器,可以在任何给定时间模拟股票的买价和买价(以正常语言进行买卖),并且MarketMaker类使用Hazelcast发布这些bean。

通常,做市商会在一种以上的金融工具中发布价格; 但是,为简单起见, MarketMaker在此演示中仅发布单个价格。

public class MarketMaker implements Runnable { private static Random random = new Random(); private final String stockCode; private final String description; private final ITopic<StockPrice> topic; private volatile boolean running; public MarketMaker(String topicName, String stockCode, String description) { this.stockCode = stockCode; this.description = description; this.topic = createTopic(topicName); running = true; } @VisibleForTesting ITopic<StockPrice> createTopic(String topicName) { HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance(); return hzInstance.getTopic(topicName); } public void publishPrices() { Thread thread = new Thread(this); thread.start(); } @Override public void run() { do { publish(); sleep(); } while (running); } private void publish() { StockPrice price = createStockPrice(); System.out.println(price.toString()); topic.publish(price); } @VisibleForTesting StockPrice createStockPrice() { double price = createPrice(); DecimalFormat df = new DecimalFormat("#.##"); BigDecimal bid = new BigDecimal(df.format(price - variance(price))); BigDecimal ask = new BigDecimal(df.format(price + variance(price))); StockPrice stockPrice = new StockPrice(bid, ask, stockCode, description, System.currentTimeMillis()); return stockPrice; } private double createPrice() { int val = random.nextInt(2010 - 1520) + 1520; double retVal = (double) val / 100; return retVal; } private double variance(double price) { return (price * 0.01); } private void sleep() { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } } public void stop() { running = false; } public static void main(String[] args) throws InterruptedException { MarketMaker bt = new MarketMaker("STOCKS", "BT.L", "British Telecom"); MarketMaker cbry = new MarketMaker("STOCKS", "CBRY.L", "Cadburys"); MarketMaker bp = new MarketMaker("STOCKS", "BP.L", "British Petrolium"); bt.publishPrices(); cbry.publishPrices(); bp.publishPrices(); } }

像往常一样,设置Hazelcast相当简单,上面MarketMaker类中的大多数代码与Hazelcast无关。 该课程分为两部分:建筑价格和出版价格。 构造函数采用三个参数,将其存储起来以备后用。 它还创建一个Hazelcast实例,并通过私有createTopic()方法注册一个名为"STOCKS"的简单主题。 如您所料,创建Hazelcast实例并注册主题需要两行代码,如下所示:

ITopic<StockPrice> createTopic(String topicName) { HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance(); return hzInstance.getTopic(topicName); }

该类的其余部分使用线程来调用MarketMakerrun()方法来运行价格发布机制。 此方法生成随机出价,为关联的股票代码要价,并使用Hazelcast发布。 使用以下单行代码即可完成发布:

topic.publish(price);

MarketMaker类的最后一部分是main()方法,其作用是创建多个MarketMaker实例并使它们运行。

现在,Hazelcast知道了我们不断变化的股价,接下来要做的就是整理客户代码。

public class Client implements MessageListener<StockPrice> { public Client(String topicName) { HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance(); ITopic<StockPrice> topic = hzInstance.getTopic(topicName); topic.addMessageListener(this); } /** * @see com.hazelcast.core.MessageListener#onMessage(com.hazelcast.core.Message) */ @Override public void onMessage(Message<StockPrice> arg0) { System.out.println("Received: " + arg0.getMessageObject().toString()); } public static void main(String[] args) { new Client("STOCKS"); } }

与任何消息传递系统一样,消息发送者代码必须知道呼叫谁和呼叫什么。 客户端通过创建Hazelcast实例并在"STOCKS"主题中注册兴趣来实现“调用什么"STOCKS" ,方法与发布者相同,如下所示:

HazelcastInstance hzInstance = Hazelcast.newHazelcastInstance(); ITopic<StockPrice> topic = hzInstance.getTopic(topicName); topic.addMessageListener(this);

客户端实现Hazelcast的MessageListener接口及其单一方法onMessage()实现“呼叫”

@Override public void onMessage(Message<StockPrice> arg0) { System.out.println("Received: " + arg0.getMessageObject().toString()); }

客户端代码的最后一部分是其main()方法,该方法创建一个客户端实例。

最后要做的是运行代码。 为此,我仅将所有必需的JAR文件放在一个目录中,只需考虑两个:hazel cast-3.1.jar和guava-13.0.1.jar。

屏幕截图2013年12月7日的10.51.43

完成后,我转到项目的classes目录:

cd /Users/Roger/git/captaindebug/hazelcast/target/classes

…并解雇了发布者

java -cp ./:/Users/Roger/tmp/mm/guava-13.0.1.jar:/Users/Roger/tmp/mm/hazelcast-3.1.jar com.captaindebug.hazelcast.pubsub.MarketMaker

……然后是客户。

java -cp ./:/Users/Roger/tmp/mm/guava-13.0.1.jar:/Users/Roger/tmp/mm/hazelcast-3.1.jar com.captaindebug.hazelcast.pubsub.Client

当然,如果您正在使用此粗略且已准备好的技术在计算机上运行此程序,则请记住将其替换
/Users/Roger/tmp/mm以及放置这些JAR文件副本的路径。

如果您在一个终端中运行MarketMaker发布者,并在其他两个终端中运行几个客户,那么您将得到类似的信息,在这里您可以看到正在发布的价格以及客户正在接收更新。

屏幕截图2013年12月22日的17.40.07

关于Hazelcast的一件事要注意的是,“ 集群 ”是指Hazelcast实例的集群,而不是JVM的集群。 在您为每个应用程序请求多个Hazelcast实例之前,这是不明显的。 当其他客户端加入集群时,您将看到类似以下内容:

Members [5] {
Member [192.168.0.7]:5701
Member [192.168.0.7]:5702
Member [192.168.0.7]:5703
Member [192.168.0.7]:5704 this
Member [192.168.0.7]:5705
}

在上面的日志中,有两个侦听器条目,每个侦听器条目一个,每个发布者条目三个,在MarketMakermain()方法中启动的每个MarketMaker实例一个。

屏幕快照2013-12-07 at 11.16.21
这里要考虑的事情是,是否每个对象实例创建一个Hazelcast实例是一种好习惯(就像我在示例代码中所做的那样),还是在代码中有一个static Hazelcast实例更好。 我不确定该答案是什么,因此,如果有任何Hazelcast专家正在阅读此书,请告诉我。

就是这样:Hazelcast可以在发布和订阅模式下愉快地运行,但是我还没有介绍Hazelcast的所有功能。 也许以后再说……

  • 可以在Github上找到此源代码: https : //github.com/roghughe/captaindebug/tree/master/hazelcast

参考: Captain Debug的Blog博客上的JCG合作伙伴 Roger Hughes的Hazelcast发布和订阅 。

翻译自: https://www.javacodegeeks.com/2014/01/publish-and-subscribe-with-hazelcast.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/366054.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

接口自动化测试持续集成--Soapui接口功能测试参数化

按照自动化测试分层实现的原理&#xff0c;每一层的脚本实现都要进行参数化&#xff0c;自动化的目标就是要实现脚本代码与测试数据分离。当测试数据进行调整的时候不会对脚本的实现带来震荡&#xff0c;从而提高脚本的稳定性与灵活度&#xff0c;降低脚本的维护成本。Soapui最…

Codeforces 1027E Inverse Coloring 【DP】

Codeforces 1027E Inverse Coloring 题目链接 1 #include<bits/stdc.h>2 using namespace std;3 #define N 10104 #define LL long long5 #define Mod 9982443536 int n,k;7 LL dp[N][N],ans0;8 LL sum[N][N];9 int main(){ 10 cin>>n>>k; 11 dp[0][…

合并远程仓库到本地_git远程仓库创建和合并

上周发了一个张佳波小朋友关于git的文章&#xff0c;马上就更多小朋友的回馈。其中周玉涛同志发来了自己对于git之前问题的一些理解和解决方法&#xff0c;希望能帮助更多人解决问题。为了保证周玉涛同学提供材料的完整性&#xff0c;以下将他原文和图片发出&#xff0c;不做其…

swfobject.js视频播放插件

在网页中经常会用到视频播放的功能&#xff0c;下面介绍一下swfobject.js的视频播放应用&#xff1a;html代码结构&#xff1a; <div id"video_content"></div> css样式结构&#xff1a; body{background: #003368}#video_content{width:600px;height:40…

Spring REST:异常处理卷。 3

这是该系列中有关Spring REST异常处理的最后一篇文章。 最后&#xff0c;这次我将讨论在表单处理期间可能发生的REST异常的处理。 因此&#xff0c;在本教程中&#xff0c;您将看到与REST&#xff0c;表单和异常处理有关的所有内容。 客户端呢&#xff1f; jQuery将用于反映RES…

centos6安装mysql并远程连接_Ubantu下MySQL安装、部署和远程连接

系统阿里云 ubantu 16.04MySQL 5.0/8.0连接工具 Navicat Premium安装MySQL1、MySQL 5.0直接使用apt命令安装sudo apt install mysql-server输入密码完成安装。安装完mysql-server后&#xff0c;mysql-client就带了&#xff0c;无需单独安装安装成功后输入如下命令检查数据库状态…

js中字符串和数组的使用

函数&#xff1a; 函数在调用的时候&#xff0c;会形成一个私有作用域&#xff0c;内部的变量不会被外面访问&#xff0c;这种保护机制叫闭包。这就意味着函数调用完毕&#xff0c;这个函数形成的栈内存会被销毁。 但有时候我们不希望他被销毁。 函数归属谁跟它在哪调用没有关…

Spring REST:异常处理卷。 1个

目录 Spring REST&#xff1a;异常处理卷。 1个 Spring REST&#xff1a;异常处理卷。 2 Spring REST&#xff1a;异常处理卷。 3 大家好&#xff0c;是时候继续在我的博客中发布新文章了。 因此&#xff0c;我很高兴地宣布&#xff0c;我计划编写一系列技术文章。 在当前文…

vue/cli3 配置vux

安装各插件 试过 安装“必须安装”的部分亦可 1、安装vuex npm install vuex --save-dev 2、在项目里面安装vux【必须安装】 npm install vux --save 3、安装vux-loader【必须安装】 npm install vux-loader --save-dev 4、安装less-loader&#xff08;这个是用以正确编译less源…

鼠标右键 移动选定的文件夹到指定位置_怎么把电脑桌面上的文件移动到更加安全的地方...

我们在使用电脑的时候习惯于把各种文档以及其他文件资料随手保存到电脑桌面上&#xff0c;这样操作可以方便以后对这些文档和文件资料的使用、管理&#xff0c;但是由于默认状态下桌面文件位于C盘中&#xff0c;这些文件资料不仅会占用掉C盘的很大的存储空间&#xff0c;并且日…

非常精简的Linux线程池实现(一)——使用互斥锁和条件变量

线程池的含义跟它的名字一样&#xff0c;就是一个由许多线程组成的池子。 有了线程池&#xff0c;在程序中使用多线程变得简单。我们不用再自己去操心线程的创建、撤销、管理问题&#xff0c;有什么要消耗大量CPU时间的任务通通直接扔到线程池里就好了&#xff0c;然后我们的主…

嵌入式linux系统文件,嵌入式Linux文件系统知多少

Nand/Nor Flash在嵌入式Linux产品中&#xff0c;通常使用的存储介质为Nand Flash和Nor Flash&#xff0c;而手机、相机等产品通常使用eMMC、SD Card作为存储介质&#xff0c;导致这种差异的原因主要是成本考量。Nand Flash和Nor Flash具有低成本、高密度存储的优势。但是&#…

三分钟上手Highcharts简易甘特图

根据业务需求&#xff0c;找到了这个很少使用的图形&#xff0c;话不多说&#xff0c;看看该如何使用。首先要引入支持文件&#xff1a;可根据链接下载。 exporting.js&#xff1a;https://img.hcharts.cn/highcharts/modules/exporting.js xrange.js&#xff1a;https://img.h…

WEB语义化

WEB语义化让机器读懂内容&#xff0c;HTML就带有一定「语义」的标签&#xff0c;比如段落&#xff0c;标题&#xff0c;表格和图片等。让机器读懂内容&#xff0c;那么两种方案&#xff1a;第一种让机器变得更人工智能化&#xff0c;也就是现在大火的AI。第二种是人们去发布认可…

关于使用JQ scrollTop方法进行滚动定位

没图我说个锤子&#xff0c;先来个自拍镇楼。 又到了每周周五总结时间。我广州刘德华又来讲故事了。这一周没啥任务&#xff0c;就一个任务&#xff0c;产品口头交代了两句&#xff0c;也没有psd没有设计图没有样式。自由发挥&#xff0c;你自己敲代码做个作品出来。 what&…

ssh密钥登录

方法一:使用下例中ssky-keygen和ssh-copy-id&#xff0c;仅需通过3个步骤的简单设置而无需输入密码就能登录远程Linux主机。 ssh-keygen 创建公钥和密钥。 ssh-copy-id 把本地主机的公钥复制到远程主机的authorized_keys文件上。ssh-copy-id 也会给远程主机的用户主目录&#x…

Spring REST:异常处理卷。 2

这是有关使用Spring进行REST异常处理的系列的第二篇文章。 在我以前的文章中&#xff0c;我描述了如何在REST服务中组织最简单的异常处理。 这次&#xff0c;我将更进一步&#xff0c;我将向您展示何时最好在ControllerAdvice级别上使用异常处理 。 介绍 在开始本文的技术部分…

python 装饰器有哪些_python装饰器有什么用

简言之&#xff0c;python装饰器就是用于拓展原来函数功能的一种函数&#xff0c;这个函数的特殊之处在于它的返回值也是一个函数&#xff0c;使用python装饰器的好处就是在不用更改原函数的代码前提下给函数增加新的功能。 一般而言&#xff0c;我们要想拓展原来函数代码&…

linux中查看相关日志记录,linux重启查看日志及历史记录 查询原因

linux系统文件通常在/var/log中下面是对下面常出现的文件进行解释/var/log/message ---------------------------------------系统启动后的信息和错误日志/var/log/secure ------------------------------------------与安全相关的日志信息/var/log/maillog ------------------…

04,认证、权限、频率

认证组件 Django原生的authentic组件为我们的用户注册与登录提供了认证功能&#xff0c;十分的简介与强大。同样DRF也为我们提供了认证组件&#xff0c;一起来看看DRF里面的认证组件是怎么为我们工作的&#xff01;models.py# 定义一个用户表和一个保存用户Token的表 class Use…