activemq 发两条只收到一条_浅谈ActiveMQ与使用

更多大数据架构、实战经验,欢迎关注【大数据每日哔哔】,期待与你一起成长!

本文将介绍一下 ActiveMQ 的安装、原理和简单实战。

一、什么是消息中间件

消息中间件顾名思义实现的就是在两个系统或两个客户端之间进行消息传送

5ce8ad8e45d23d2669fd178982e81226.png

二、什么是ActiveMQ

ActiveMQ是一种开源的基于JMS(Java Message Servie)规范的一种消息中间件的实现,ActiveMQ的设计目标是提供标准的,面向消息的,能够跨越多语言和多系统的应用集成消息通信中间件。

三、什么时候需要用ActiveMQ

ActiveMQ常被应用与系统业务的解耦,异步消息的推送,增加系统并发量,提高用户体验。例如以我在工作中的使用,在比较耗时且异步的远程开锁操作时

49e3953af0015d4f12e166f23bc7e213.png

四、如何使用ActiveMQ

1.AcitveMQ的数据传送流程

ec66838c8568d2088f637b62256bfd03.png

2.ActiveMQ的两种消息传递类型

(1)点对点传输,即一个生产者对应一个消费者,生产者向broke推送数据,数据存储在broke的一个队列中,当消费者接受该条队列里的数据。

(2)基于发布/订阅模式的传输,即根据订阅话题来接收相应数据,一个生产者可向多个消费者推送数据,与MQTT协议的实现是类似的,对MQTT协议有兴趣的可跳转到https://www.cnblogs.com/xiguadadage/p/11216463.html

两种消息传递类型的不同,点对点传输消费者可以接收到在连接之前生产者所推送的数据,而基于发布/订阅模式的传输方式消费者只能接收到连接之后生产者推送的数据。

3.ActiveMQ的安装与启动

(1)官网下载对应服务器版本

4cbfe622234ddb3c25add8846f07d5e7.png

(2)解压后进入apache-activemq-5.15.9/bin目录

(3)执行./activemq start启动ActiveMQ

18af0e80684c892d33203add367ba13f.png

(4)浏览器输入ActiveMQ启动的服务器ip:8161便可进入web界面,点击Manage ActiveMQ broker可以查看消息推送的状态,默认账号密码为admin,admin

a31fb2c9935d7d814e74400488c7091f.png

(5)启动错误分析

进入/root/apache-activemq-5.15.9/data目录查看activemq.log文件,根据错误提示信息修改,例如端口号被占用等。

4.ActiveMQ的代码测试

(1)构建maven项目,引入依赖

org.apache.activemq    activemq-all    5.9.0

(2)生产者类

/** * @Description 生产者 * @Date 2019/7/20 * @Created by yqh */public class MyProducer {    private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";    public static void main(String[] args) throws JMSException {        // 创建连接工厂        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);        // 创建连接        Connection connection = activeMQConnectionFactory.createConnection();        // 打开连接        connection.start();        // 创建会话        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据        Destination destination = session.createQueue("myQueue");        // 创建一个生产者        MessageProducer producer = session.createProducer(destination);        // 向队列推送10个文本消息数据        for (int i = 1 ; i <= 10 ; i++){            // 创建文本消息            TextMessage message = session.createTextMessage("第" + i + "个文本消息");            //发送消息            producer.send(message);            //在本地打印消息            System.out.println("已发送的消息:" + message.getText());        }        //关闭连接        connection.close();    }}

运行结果:

已发送的消息:第1个文本消息已发送的消息:第2个文本消息已发送的消息:第3个文本消息已发送的消息:第4个文本消息已发送的消息:第5个文本消息已发送的消息:第6个文本消息已发送的消息:第7个文本消息已发送的消息:第8个文本消息已发送的消息:第9个文本消息已发送的消息:第10个文本消息

测试查看web后台显示,有10条消息在队列中等待消费

741f805875676838d69010b9abfc15d1.png

(3)消费者类

/** * @Description 消费者类 * @Date 2019/7/20 0020 * @Created by yqh */public class MyConsumer {    private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";    public static void main(String[] args) throws JMSException {        // 创建连接工厂        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);        // 创建连接        Connection connection = activeMQConnectionFactory.createConnection();        // 打开连接        connection.start();        // 创建会话        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据        Destination destination = session.createQueue("myQueue");        // 创建消费者        MessageConsumer consumer = session.createConsumer(destination);        // 创建消费的监听        consumer.setMessageListener(new MessageListener() {            public void onMessage(Message message) {                TextMessage textMessage = (TextMessage) message;                try {                    System.out.println("消费的消息:" + textMessage.getText());                } catch (JMSException e) {                    e.printStackTrace();                }            }        });    }}

测试结果:

消费的消息:第1个文本消息消费的消息:第2个文本消息消费的消息:第3个文本消息消费的消息:第4个文本消息消费的消息:第5个文本消息消费的消息:第6个文本消息消费的消息:第7个文本消息消费的消息:第8个文本消息消费的消息:第9个文本消息消费的消息:第10个文本消息

web后台显示有一个消费者处于连接状态,且已消费了10个message,而该条队列已没有message待消费了

70affd879daf184562b635fee86edd35.png

(4)当我们运行两个消费者类,消息又是怎么被消费的呢?是两个消费者都能收到生产者生产的message,还是只有其中一个消费者能消费呢?

我们先运行两个消费者,在运行一个生产者对目标队列生产10个message,会发现有以下情况

// Consumer1控制台消费的消息:第1个文本消息消费的消息:第3个文本消息消费的消息:第5个文本消息消费的消息:第7个文本消息消费的消息:第9个文本消息
// Consumer2控制台消费的消息:第2个文本消息消费的消息:第4个文本消息消费的消息:第6个文本消息消费的消息:第8个文本消息消费的消息:第10个文本消息

即队列中的数据会平均的分给每一个消费者消费,且每一条数据只能被消费一次

(5)以上是基于队列点对点的传输类型,以下是基于发布/订阅模式传输的类型测试

/** * @Description 基于发布/订阅模式传输类型的生产者测试 * @Date 2019/7/20 0020 * @Created by yqh */public class MyProducerForTopic {    private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";    public static void main(String[] args) throws JMSException {        // 创建连接工厂        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);        // 创建连接        Connection connection = activeMQConnectionFactory.createConnection();        // 打开连接        connection.start();        // 创建会话        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据        Destination destination = session.createTopic("topicTest");        // 创建一个生产者        MessageProducer producer = session.createProducer(destination);        // 向队列推送10个文本消息数据        for (int i = 1 ; i <= 10 ; i++){            // 创建文本消息            TextMessage message = session.createTextMessage("第" + i + "个文本消息");            //发送消息            producer.send(message);            //在本地打印消息            System.out.println("已发送的消息:" + message.getText());        }        //关闭连接        connection.close();    }}
/** * @Description 基于发布/订阅模式传输类型的消费者测试 * @Date 2019/7/20 0020 * @Created by yqh */public class MyConsumerForTopic {    private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";    public static void main(String[] args) throws JMSException {        // 创建连接工厂        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);        // 创建连接        Connection connection = activeMQConnectionFactory.createConnection();        // 打开连接        connection.start();        // 创建会话        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据        Destination destination = session.createTopic("topicTest");        // 创建消费者        MessageConsumer consumer = session.createConsumer(destination);        // 创建消费的监听        consumer.setMessageListener(new MessageListener() {            public void onMessage(Message message) {                TextMessage textMessage = (TextMessage) message;                try {                    System.out.println("消费的消息:" + textMessage.getText());                } catch (JMSException e) {                    e.printStackTrace();                }            }        });    }}

现在如果我们先启动生产者,再启动消费者,会发现消费者是无法接收到之前生产者之前所生产的数据,只有消费者先启动,再让生产者消费才可以正常接收数据,这也是发布/订阅的主题模式与点对点的队列模式的一个明显区别。

而如果启动两个消费者,那么每一个消费者都能完整的接收到生产者生产的数据,即每一条数据都被消费了两次,这是发布/订阅的主题模式与点对点的队列模式的另一个明显区别。

更多大数据架构、实战经验,欢迎关注【大数据每日哔哔】,期待与你一起成长!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/276727.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ZZ:深入理解new

new的过程当我们使用关键字new在堆上动态创建一个对象时&#xff0c;它实际上做了三件事&#xff1a;获得一块内存空间、调用构造函数、返回正确的指针。当然&#xff0c;如果我们创建的是简单类型的变量&#xff0c;那么第二步会被省略。假如我们定义了如下一个类A&#xff1a…

大数据小白系列——HDFS(1)

【注1&#xff1a;结尾有大福利&#xff01;】 【注2&#xff1a;想写一个大数据小白系列&#xff0c;介绍大数据生态系统中的主要成员&#xff0c;理解其原理&#xff0c;明白其用途&#xff0c;万一有用呢&#xff0c;对不对。】 大数据是什么&#xff1f;抛开那些高大上但笼…

html select选择事件_一道搜狗面试题:IO多路复用中select、poll、epoll之间的区别...

(1)select>时间复杂度O(n)它仅仅知道了&#xff0c;有I/O事件发生了&#xff0c;却并不知道是哪那几个流(可能有一个&#xff0c;多个&#xff0c;甚至全部)&#xff0c;我们只能无差别轮询所有流&#xff0c;找出能读出数据&#xff0c;或者写入数据的流&#xff0c;对他们…

delphi tclientsocket接收不到返回数据_RS—485中教你主站发送报文结构、从站返回报文结构?系列11...

作者&#xff1a;马乐1.主站发送报文结构大家可以看到我之前写的文章中的程序都是没有什么具体功能的&#xff0c;都是两个站点之间互相传递数据&#xff0c;这些数据我们只是看看是否可以正常接收发送&#xff0c;数据本身是没有任何含义的。很明显在实际使用过程中我们是不会…

MybatisPlus 通用枚举无法正确取值

正常使用mybatisplus <dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.0.4</version></dependency> 使用后发现项目中原先对枚举值的转换存在异常&#xff1a; ER…

python input 文件名_Python播放音频与录音

这一讲主要介绍些音频基本处理方式&#xff0c;为接下来的语音识别打基础。三种播放音频的方式使用 python 播放音频有以下几种方式&#xff1a;os.system()os.system(file) 调用系统应用来打开文件&#xff0c;file 可为图片或者音频文件。缺点&#xff1a;要打开具体的应用&a…

ActionScript 3.0 Step By Step系列(四):来自面向对象开发之前的呐喊:“学会写可重用的代码”...

增强代码的可重用能力&#xff0c;从创建可重用的代码开始&#xff0c;可重用的代码则是通过从现有代码中重构加以封装,使其成为功能单一的可复用代码块。这句话笼统点说便是“封装”或“抽象”。 在实际的编程开发中&#xff0c;要实现代码重用&#xff0c;而不是每次都去Copy…

express利用nodemailer发送邮件(163邮箱)

Nodemailer 是一个简单易用的Node.js邮件发送组件 首先安装这个组件 npm install nodemailer --save安装之后&#xff0c;可以在某个get请求下&#xff0c;发送邮件&#xff0c;具体路由代码&#xff1a; const express require("express"); const nodemailer requ…

使用 Solid 私有化存储 IPFS 文件哈希值

背景 星际文件系统 IPFS&#xff08;InterPlanetary File System&#xff09;是一个面向全球的、点对点的分布式文件系统&#xff0c;目标是为了补充&#xff08;甚至是取代&#xff09;目前统治互联网的超文本传输协议&#xff08;HTTP&#xff09;&#xff0c;将所有具有相同…

appium启动app失败_Appium-Desktop Capability 配置及启动App演示

Appium-Desktop Capability配置介绍desired capability的功能是配置Appium会话。为什么要配置capability&#xff0c;目的就是为了告诉Appium服务器您想要自动化的平台和应用程序。Desired Capabilities是一组设置的键值对的集合&#xff0c;其中键对应设置的名称&#xff0c;而…

以Windows服务方式运行.NET Core程序

原文:以Windows服务方式运行.NET Core程序在之前一篇博客《以Windows服务方式运行ASP.NET Core程序》中我讲述了如何把ASP.NET Core程序作为Windows服务运行的方法&#xff0c;而今&#xff0c;我们又遇到了新的问题&#xff0c;那就是&#xff1a;我们的控制台程序&#xff0c…

好用的shell工具_精选5个酷毙的Python工具

来自&#xff1a;Python之禅工欲善其事必先利其器&#xff0c;一个好的工具能让起到事半功倍的效果&#xff0c;Python社区提供了足够多的优秀工具来帮助开发者更方便的实现某些想法&#xff0c;下面这几个工具给我的工作也带来了很多便利&#xff0c;推荐给追求美好事物的你。…

承载辉煌历史 畅想无线未来

看了JustDI的文章“手机也能当电脑用&#xff1f;&#xff0d;&#xff0d;谈谈未来智能手机操作系统的走向”&#xff0c;作为嵌入式爱好者&#xff0c;我也想谈谈自己的看法。首先&#xff0c;从网络发展的角度看&#xff0c;移动互联网的宽带化&#xff0c;宽带互联网的移动…

6款国内外SNS开源软件 搭建社交网站利器

SNS(Social Network Service)&#xff0c;有时称为社交网络&#xff0c;有时称为社会化网络&#xff0c;专指旨在帮助人们建立社会性网络的互联网应用服务。如果对SNS概念还很模糊&#xff0c;说到人人网、开心网你就明白了。 去年360圈、蚂蚁网接连关站给SNS前景蒙上一层阴影&…

aop实现原理_从宏观的实现原理和设计本质入手,带你理解 AOP 框架的原理

点击上方“Java知音”&#xff0c;选择“置顶公众号”技术文章第一时间送达&#xff01;作者&#xff1a;FeelsChaoticjuejin.im/post/5c57b2d5e51d457ffd56ffbb前言本文将从另一个角度讲解 AOP&#xff0c;从宏观的实现原理和设计本质入手。大部分讲 AOP 的博文都是一上来就罗…

xxl-job源码分析

xxl-job源码分析 xxl-job 系统说明 安装 安装部署参考文档&#xff1a;分布式任务调度平台xxl-job 功能 定时调度、服务解耦、灵活控制跑批时间&#xff08;停止、开启、重新设定时间、手动触发&#xff09; XXL-JOB是一个轻量级分布式任务调度平台&#xff0c;其核心设计目标是…

定制jQuery File Upload为微博式单文件上传

原文链接&#xff1a;http://avnpc.com/pages/single-file-upload-component-by-jquery-file-upload jQuery File Upload是一个非常优秀的上传组件&#xff0c;主要使用了XHR作为上传方式&#xff0c;并且利用了相当多的现代浏览器功能&#xff0c;所以可以实现诸如批量上传、超…

vb趣味编程弹球小游戏_最好玩的微信小游戏集合,总有一款是你没玩过的

大家好&#xff0c;这里是小雅龙生活趣味时间&#xff0c;自从17年微信推出小游戏程序以来&#xff0c;微信小游戏行业可谓是炙手可热&#xff0c;知道2019年不断有许许多多的微信小游戏如雨后春笋般的生根发芽。下面就由我带大家来看看今年最好玩&#xff0c;最受欢迎的微信小…

Golang——垃圾回收GC(2)

1 垃圾回收中的重要概念 1.1 定义 In computer science, garbage collection (GC) is a form of automatic memory management. The garbage collector, or just collector, attempts to reclaim garbage, or memory occupied by objects that are no longer in use by the pro…

java gui框架_推荐!程序员整理的Java资源大全

构建这里搜集了用来构建应用程序的工具。Apache Maven&#xff1a;Maven使用声明进行构建并进行依赖管理&#xff0c;偏向于使用约定而不是配置进行构建。Maven优于Apache Ant。后者采用了一种过程化的方式进行配置&#xff0c;所以维护起来相当困难。Gradle&#xff1a;Gradle…