activemq 发两条只收到一条_浅谈ActiveMQ与使用

更多大数据架构、实战经验,欢迎关注【大数据每日哔哔】,期待与你一起成长!

本文将介绍一下 ActiveMQ 的安装、原理和简单实战。

一、什么是消息中间件

消息中间件顾名思义实现的就是在两个系统或两个客户端之间进行消息传送

5ce8ad8e45d23d2669fd178982e81226.png

二、什么是ActiveMQ

ActiveMQ是一种开源的基于JMS(Java Message Servie)规范的一种消息中间件的实现,ActiveMQ的设计目标是提供标准的,面向消息的,能够跨越多语言和多系统的应用集成消息通信中间件。

三、什么时候需要用ActiveMQ

ActiveMQ常被应用与系统业务的解耦,异步消息的推送,增加系统并发量,提高用户体验。例如以我在工作中的使用,在比较耗时且异步的远程开锁操作时

49e3953af0015d4f12e166f23bc7e213.png

四、如何使用ActiveMQ

1.AcitveMQ的数据传送流程

ec66838c8568d2088f637b62256bfd03.png

2.ActiveMQ的两种消息传递类型

(1)点对点传输,即一个生产者对应一个消费者,生产者向broke推送数据,数据存储在broke的一个队列中,当消费者接受该条队列里的数据。

(2)基于发布/订阅模式的传输,即根据订阅话题来接收相应数据,一个生产者可向多个消费者推送数据,与MQTT协议的实现是类似的,对MQTT协议有兴趣的可跳转到https://www.cnblogs.com/xiguadadage/p/11216463.html

两种消息传递类型的不同,点对点传输消费者可以接收到在连接之前生产者所推送的数据,而基于发布/订阅模式的传输方式消费者只能接收到连接之后生产者推送的数据。

3.ActiveMQ的安装与启动

(1)官网下载对应服务器版本

4cbfe622234ddb3c25add8846f07d5e7.png

(2)解压后进入apache-activemq-5.15.9/bin目录

(3)执行./activemq start启动ActiveMQ

18af0e80684c892d33203add367ba13f.png

(4)浏览器输入ActiveMQ启动的服务器ip:8161便可进入web界面,点击Manage ActiveMQ broker可以查看消息推送的状态,默认账号密码为admin,admin

a31fb2c9935d7d814e74400488c7091f.png

(5)启动错误分析

进入/root/apache-activemq-5.15.9/data目录查看activemq.log文件,根据错误提示信息修改,例如端口号被占用等。

4.ActiveMQ的代码测试

(1)构建maven项目,引入依赖

org.apache.activemq    activemq-all    5.9.0

(2)生产者类

/** * @Description 生产者 * @Date 2019/7/20 * @Created by yqh */public class MyProducer {    private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";    public static void main(String[] args) throws JMSException {        // 创建连接工厂        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);        // 创建连接        Connection connection = activeMQConnectionFactory.createConnection();        // 打开连接        connection.start();        // 创建会话        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据        Destination destination = session.createQueue("myQueue");        // 创建一个生产者        MessageProducer producer = session.createProducer(destination);        // 向队列推送10个文本消息数据        for (int i = 1 ; i <= 10 ; i++){            // 创建文本消息            TextMessage message = session.createTextMessage("第" + i + "个文本消息");            //发送消息            producer.send(message);            //在本地打印消息            System.out.println("已发送的消息:" + message.getText());        }        //关闭连接        connection.close();    }}

运行结果:

已发送的消息:第1个文本消息已发送的消息:第2个文本消息已发送的消息:第3个文本消息已发送的消息:第4个文本消息已发送的消息:第5个文本消息已发送的消息:第6个文本消息已发送的消息:第7个文本消息已发送的消息:第8个文本消息已发送的消息:第9个文本消息已发送的消息:第10个文本消息

测试查看web后台显示,有10条消息在队列中等待消费

741f805875676838d69010b9abfc15d1.png

(3)消费者类

/** * @Description 消费者类 * @Date 2019/7/20 0020 * @Created by yqh */public class MyConsumer {    private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";    public static void main(String[] args) throws JMSException {        // 创建连接工厂        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);        // 创建连接        Connection connection = activeMQConnectionFactory.createConnection();        // 打开连接        connection.start();        // 创建会话        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据        Destination destination = session.createQueue("myQueue");        // 创建消费者        MessageConsumer consumer = session.createConsumer(destination);        // 创建消费的监听        consumer.setMessageListener(new MessageListener() {            public void onMessage(Message message) {                TextMessage textMessage = (TextMessage) message;                try {                    System.out.println("消费的消息:" + textMessage.getText());                } catch (JMSException e) {                    e.printStackTrace();                }            }        });    }}

测试结果:

消费的消息:第1个文本消息消费的消息:第2个文本消息消费的消息:第3个文本消息消费的消息:第4个文本消息消费的消息:第5个文本消息消费的消息:第6个文本消息消费的消息:第7个文本消息消费的消息:第8个文本消息消费的消息:第9个文本消息消费的消息:第10个文本消息

web后台显示有一个消费者处于连接状态,且已消费了10个message,而该条队列已没有message待消费了

70affd879daf184562b635fee86edd35.png

(4)当我们运行两个消费者类,消息又是怎么被消费的呢?是两个消费者都能收到生产者生产的message,还是只有其中一个消费者能消费呢?

我们先运行两个消费者,在运行一个生产者对目标队列生产10个message,会发现有以下情况

// Consumer1控制台消费的消息:第1个文本消息消费的消息:第3个文本消息消费的消息:第5个文本消息消费的消息:第7个文本消息消费的消息:第9个文本消息
// Consumer2控制台消费的消息:第2个文本消息消费的消息:第4个文本消息消费的消息:第6个文本消息消费的消息:第8个文本消息消费的消息:第10个文本消息

即队列中的数据会平均的分给每一个消费者消费,且每一条数据只能被消费一次

(5)以上是基于队列点对点的传输类型,以下是基于发布/订阅模式传输的类型测试

/** * @Description 基于发布/订阅模式传输类型的生产者测试 * @Date 2019/7/20 0020 * @Created by yqh */public class MyProducerForTopic {    private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";    public static void main(String[] args) throws JMSException {        // 创建连接工厂        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);        // 创建连接        Connection connection = activeMQConnectionFactory.createConnection();        // 打开连接        connection.start();        // 创建会话        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据        Destination destination = session.createTopic("topicTest");        // 创建一个生产者        MessageProducer producer = session.createProducer(destination);        // 向队列推送10个文本消息数据        for (int i = 1 ; i <= 10 ; i++){            // 创建文本消息            TextMessage message = session.createTextMessage("第" + i + "个文本消息");            //发送消息            producer.send(message);            //在本地打印消息            System.out.println("已发送的消息:" + message.getText());        }        //关闭连接        connection.close();    }}
/** * @Description 基于发布/订阅模式传输类型的消费者测试 * @Date 2019/7/20 0020 * @Created by yqh */public class MyConsumerForTopic {    private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";    public static void main(String[] args) throws JMSException {        // 创建连接工厂        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);        // 创建连接        Connection connection = activeMQConnectionFactory.createConnection();        // 打开连接        connection.start();        // 创建会话        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据        Destination destination = session.createTopic("topicTest");        // 创建消费者        MessageConsumer consumer = session.createConsumer(destination);        // 创建消费的监听        consumer.setMessageListener(new MessageListener() {            public void onMessage(Message message) {                TextMessage textMessage = (TextMessage) message;                try {                    System.out.println("消费的消息:" + textMessage.getText());                } catch (JMSException e) {                    e.printStackTrace();                }            }        });    }}

现在如果我们先启动生产者,再启动消费者,会发现消费者是无法接收到之前生产者之前所生产的数据,只有消费者先启动,再让生产者消费才可以正常接收数据,这也是发布/订阅的主题模式与点对点的队列模式的一个明显区别。

而如果启动两个消费者,那么每一个消费者都能完整的接收到生产者生产的数据,即每一条数据都被消费了两次,这是发布/订阅的主题模式与点对点的队列模式的另一个明显区别。

更多大数据架构、实战经验,欢迎关注【大数据每日哔哔】,期待与你一起成长!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/276727.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

php发送get、post请求的几种方法

方法1: 用file_get_contents 以get方式获取内容 <?php $urlhttp://www.domain.com/; $html file_get_contents($url); echo $html; ?>方法2: 用fopen打开url, 以get方式获取内容<?php $fp fopen($url, r); stream_get_meta_data($fp); while(!feof($fp)) { $res…

ZZ:深入理解new

new的过程当我们使用关键字new在堆上动态创建一个对象时&#xff0c;它实际上做了三件事&#xff1a;获得一块内存空间、调用构造函数、返回正确的指针。当然&#xff0c;如果我们创建的是简单类型的变量&#xff0c;那么第二步会被省略。假如我们定义了如下一个类A&#xff1a…

mysql数据库的优缺点

优点1. 通常存储过程 标题有助于提高应用程序的性能。因为当你创建他的时候就已经编译了&#xff0c;只不过是按需编译的。2.存储过程有助于减少应用程序和数据库服务器之间的流量&#xff0c;因为应用程序不必发送多个冗长的SQL语句&#xff0c;而只能发送存储过程的名称和参数…

大数据小白系列——HDFS(1)

【注1&#xff1a;结尾有大福利&#xff01;】 【注2&#xff1a;想写一个大数据小白系列&#xff0c;介绍大数据生态系统中的主要成员&#xff0c;理解其原理&#xff0c;明白其用途&#xff0c;万一有用呢&#xff0c;对不对。】 大数据是什么&#xff1f;抛开那些高大上但笼…

PHP检测远端文件是否存在

简单解释一下上面的代码。get_headers的作用就是访问一个远程地址&#xff0c;把服务器发送的HTTP头以数组形式返回。而$header[0]则是服务器返回的状态码&#xff08;如果不出意外的话状态码应该都是第一个返回的&#xff09;。 要确定一个文件在远端服务器上存在&#xff0c…

C#中使用DTS来导入数据及相关问题

向Sql 中导入Excel数据时&#xff0c;使用MS SQL的DTS功能 可以很方便的导入&#xff0c;同时引用Dll文件&#xff0c;可以在程序中对导入过程进行控制。 创建DTS包的过程如下&#xff1a; &#xff11;。在&#xff33;&#xff31;&#xff2c;企业管理器中&#xff0c;工具菜…

html select选择事件_一道搜狗面试题:IO多路复用中select、poll、epoll之间的区别...

(1)select>时间复杂度O(n)它仅仅知道了&#xff0c;有I/O事件发生了&#xff0c;却并不知道是哪那几个流(可能有一个&#xff0c;多个&#xff0c;甚至全部)&#xff0c;我们只能无差别轮询所有流&#xff0c;找出能读出数据&#xff0c;或者写入数据的流&#xff0c;对他们…

【MySQL】redo log --- 刷入磁盘过程

1、redo log基本概念 redo log的相关概念这里就不再过多阐述&#xff0c;网上有非常多的好的资料&#xff0c;可以看下缥缈大神的文章&#xff1a;https://www.cnblogs.com/cuisi/p/6525077.html&#xff0c;个人感觉介绍的非常详细。 2、数据更改过程简述 MySQL 在更新数据的时…

WPF DataGrid根据内容设置行颜色

转&#xff1a; https://code.4noobz.net/wpf-change-color-of-a-row-in-a-datagrid-depending-on-the-value/ 转载于:https://www.cnblogs.com/Mindy-hym/p/11475024.html

QQ web api

QQ的很多功能和信息可以通过web方式获得&#xff5e;以下链接&#xff0c;星号应换成你要查询的QQ号一、Activities Previewhttp://labs.qq.com/ie8/preview/?uin******二、QQ空间访问次数查询&#xff08;需权限&#xff09;http://g.qzone.qq.com/fcg-bin/cgi_emotion_list.…

delphi tclientsocket接收不到返回数据_RS—485中教你主站发送报文结构、从站返回报文结构?系列11...

作者&#xff1a;马乐1.主站发送报文结构大家可以看到我之前写的文章中的程序都是没有什么具体功能的&#xff0c;都是两个站点之间互相传递数据&#xff0c;这些数据我们只是看看是否可以正常接收发送&#xff0c;数据本身是没有任何含义的。很明显在实际使用过程中我们是不会…

MybatisPlus 通用枚举无法正确取值

正常使用mybatisplus <dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.0.4</version></dependency> 使用后发现项目中原先对枚举值的转换存在异常&#xff1a; ER…

零基础学习 Python 之条件语句

写在之前 我们写程序&#xff0c;就好比学生时代写作文一样&#xff0c;由 “字” 到 “词” 到 “句” 最后到 “文章” 。此前我们学会了一些词语&#xff08;对象类型&#xff09;&#xff0c;我们接下来就是学如何造句&#xff0c;而在编程语言里&#xff0c;句子被叫做语句…

python input 文件名_Python播放音频与录音

这一讲主要介绍些音频基本处理方式&#xff0c;为接下来的语音识别打基础。三种播放音频的方式使用 python 播放音频有以下几种方式&#xff1a;os.system()os.system(file) 调用系统应用来打开文件&#xff0c;file 可为图片或者音频文件。缺点&#xff1a;要打开具体的应用&a…

jQuery选择器和方法的等价关系

层级选择器 1、ancestor descendant &#xff08;后代选择器&#xff09; 在给定的祖先元素下匹配所有的后代元素 $(“form input”) $(.div span)选取<div>里的所有的<span>元素 2、parent child &#xff08;子选择器&#xff09;在给定的父元素下匹配所有…

ActionScript 3.0 Step By Step系列(四):来自面向对象开发之前的呐喊:“学会写可重用的代码”...

增强代码的可重用能力&#xff0c;从创建可重用的代码开始&#xff0c;可重用的代码则是通过从现有代码中重构加以封装,使其成为功能单一的可复用代码块。这句话笼统点说便是“封装”或“抽象”。 在实际的编程开发中&#xff0c;要实现代码重用&#xff0c;而不是每次都去Copy…

express利用nodemailer发送邮件(163邮箱)

Nodemailer 是一个简单易用的Node.js邮件发送组件 首先安装这个组件 npm install nodemailer --save安装之后&#xff0c;可以在某个get请求下&#xff0c;发送邮件&#xff0c;具体路由代码&#xff1a; const express require("express"); const nodemailer requ…

使用 Solid 私有化存储 IPFS 文件哈希值

背景 星际文件系统 IPFS&#xff08;InterPlanetary File System&#xff09;是一个面向全球的、点对点的分布式文件系统&#xff0c;目标是为了补充&#xff08;甚至是取代&#xff09;目前统治互联网的超文本传输协议&#xff08;HTTP&#xff09;&#xff0c;将所有具有相同…

使用window.postMessage实现跨域通信

JavaScript由于同源策略的限制,跨域通信一直是棘手的问题。当然解决方案也有很多&#xff1a; document.domainiframe的设置&#xff0c;应用于主域相同而子域不同&#xff1b;利用iframe和location.hash&#xff0c;数据直接暴露在了url中&#xff0c;数据容量和类型都有限Fla…

appium启动app失败_Appium-Desktop Capability 配置及启动App演示

Appium-Desktop Capability配置介绍desired capability的功能是配置Appium会话。为什么要配置capability&#xff0c;目的就是为了告诉Appium服务器您想要自动化的平台和应用程序。Desired Capabilities是一组设置的键值对的集合&#xff0c;其中键对应设置的名称&#xff0c;而…