1 ActiveMQ简介
1.1 ActiveMQ是什么
ActiveMQ是一个消息队列应用服务器(推送服务器)。支持JMS规范。
1.1.1 JMS概述
全称:Java Message Service ,即为Java消息服务,是一套java消息服务的API标准。(标准即接口)
实现了JMS标准的系统,称之为JMS Provider。
1.1.2 消息队列
1.1.2.1 概念
消息队列是在消息的传输过程中保存消息的容器,提供一种不同进程或者同一进程不同线程直接通讯的方式。
Producer:消息生产者,负责产生和发送消息到Broker;
Broker:消息处理中心。负责消息存储、确认、重试等,一般其中会包含多个queue;
Consumer:消息消费者,负责从Broker中获取消息,并进行相应处理;
1.1.2.2 常见消息队列应用
(1)、ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。
(2)、RabbitMQ
RabbitMQ是一个在AMQP基础上完成的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。开发语言为Erlang。
(3)、RocketMQ
由阿里巴巴定义开发的一套消息队列应用服务。
1.2 ActiveMQ能做什么
(1)实现两个不同应用(程序)之间的消息通讯。
(2)实现同一个应用,不同模块之间的消息通讯。(确保数据发送的稳定性)
1.3 ActiveMQ下载
ActiveMQ下载地址:http://activemq.apache.org/download-archives.html
--可供下载的历史版本
--说明:
ActiveMQ 5.10.x以上版本必须使用JDK1.8才能正常使用。
ActiveMQ 5.9.x及以下版本使用JDK1.7即可正常使用。
--根据操作系统,选择下载版本。(本教程下载Linux版本)
1.4 ActiveMQ主要特点
(1)支持多语言、多协议客户端。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议:OpenWire,Stomp REST,WS Notification,XMPP,AMQP
(2)对Spring的支持,ActiveMQ可以很容易整合到Spring的系统里面去。
(3)支持高可用、高性能的集群模式。
2 入门示例
2.1 需求
使用ActiveMQ实现消息队列模型。
2.2 配置步骤说明
(1)搭建ActiveMQ消息服务器。
(2)创建一个java项目。
(3)创建消息生产者,发送消息。
(4)创建消息消费者,接收消息。
2.3 第一部分:搭建ActiveMQ消息服务器
2.3.1 第一步:下载、上传至Linux
--说明:确保已经安装了jdk
2.3.2 第二步:安装到/usr/local/activemq目录
(1)解压到/usr/local目录下
[root@node07192 ~]# tar -zxvf apache-activemq-5.9.0-bin.tar.gz -C /usr/local
(2)修改名称为activemq
[root@node07192 ~]# cd /usr/local/
[root@node07192 local]# mv apache-activemq-5.9.0/ activemq
2.3.3 第三步:启动ActiveMQ服务器
--说明:ActiveMQ是免安装软件,解压即可启动服务。
[root@node07192 local]# cd activemq/bin
[root@node07192 bin]# ./activemq start
--查看ActiveMQ启动状态
[root@node07192 bin]# ./activemq status
2.3.4 第四步:浏览器访问ActiveMQ管理界面
2.3.4.1 Step1:查看ActiveMQ管理界面的服务端口。在/conf/jetty.xml中
--访问管理控制台的服务端口,默认为:8161
[root@node07192 bin]# cd ../conf
[root@node07192 conf]# vim jetty.xml
2.3.4.2 Step2:查看ActiveMQ用户、密码。在/conf/users.properties中:
--默认的用户名、密码均为amdin
[root@node07192 conf]# vim users.properties
2.3.4.3 Step3:访问ActiveMQ管理控制台。地址:http://ip:8161/
--注意:防火墙是没有配置该服务的端口的。
因此,要访问该服务,必须在防火墙中配置。
(1)修改防火墙,开放8161端口
[root@node07192 conf]# vim /etc/sysconfig/iptables
(2)重启防火墙
[root@node07192 conf]# service iptables restart
(3)登录管理控制台
--登陆,用户名、密码均为admin
--控制台主界面
--搭建ActiveMQ服务器成功!!!
2.4 第二部分:创建java项目,导入jar包
--导包说明:
ActiveMQ的解压包中,提供了运行ActiveMQ的所有jar。
--创建项目
2.5 第三部分:创建消息生成者,发送消息
--说明:ActiveMQ是实现了JMS规范的。在实现消息服务的时候,必须基于API接口规范。
2.5.1 JMS常用的API说明
下述API都是接口类型,定义在javax.jms包中,是JMS标准接口定义。ActiveMQ完全实现这一套api标准。
2.5.1.1 ConnectionFactory
链接工厂, 用于创建链接的工厂类型。
2.5.1.2 Connection
链接,用于建立访问ActiveMQ连接的类型,由链接工厂创建。
2.5.1.3 Session
会话, 一次持久有效、有状态的访问,由链接创建。
2.5.1.4 Destination & Queue & Topic
目的地, 即本次访问ActiveMQ消息队列的地址,由Session会话创建。
(1)interfaceQueue extends Destination
(2)Queue:队列模型,只有一个消费者。消息一旦被消费,默认删除。
(3)Topic:主题订阅中的消息,会发送给所有的消费者同时处理。
2.5.1.5 Message
消息,在消息传递过程中数据载体对象,是所有消息【文本消息TextMessage,对象消息ObjectMessage等】具体类型的顶级接口,可以通过会话创建或通过会话从ActiveMQ服务中获取。
2.5.1.6 MessageProducer
消息生成者, 在一次有效会话中,用于发送消息给ActiveMQ服务的工具,由Session会话创建。
2.5.1.7 MessageCustomer
消息消费者【消息订阅者,消息处理者】, 在一次有效会话中,用于ActiveMQ服务中获取消息的工具,由Session会话创建。
我们定义的消息生产者和消费者,都是基于上面API实现的。
2.5.2 第一步:创建MyProducer类,定义sendMessage方法
package cn.gzsxt.mq.producer;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MyProducer {
// 定义链接工厂
ConnectionFactory connectionFactory = null;
// 定义链接
Connection connection = null;
// 定义会话
Session session = null;
// 定义目的地
Destination destination = null;
// 定义消息生成者
MessageProducer producer = null;
// 定义消息
Message message = null;
public void sendToMQ(){
try{
/*
* 创建链接工厂
* ActiveMQConnectionFactory - 由ActiveMQ实现的ConnectionFactory接口实现类.
* 构造方法: public ActiveMQConnectionFactory(String userName, String password, String brokerURL)
* userName - 访问ActiveMQ服务的用户名,用户名可以通过jetty-realm.properties配置文件配置.
* password - 访问ActiveMQ服务的密码,密码可以通过jetty-realm.properties配置文件配置.
* brokerURL - 访问ActiveMQ服务的路径地址.路径结构为-协议名://主机地址:端口号
* 此链接基于TCP/IP协议.
*/
connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.23.13:61616");
// 创建链接对象
connection = connectionFactory.createConnection();
// 启动链接
connection.start();
/*
* 创建会话对象
* 方法- connection.createSession(boolean transacted,int acknowledgeMode);
* transacted - 是否使用事务,可选值为true|false
* true - 使用事务,当设置此变量值,则acknowledgeMode参数无效,建议传递的acknowledgeMode参数值为
* Session.SESSION_TRANSACTED
* false - 不使用事务,设置此变量值,则acknowledgeMode参数必须设置.
* acknowledgeMode - 消息确认机制,可选值为:
* Session.AUTO_ACKNOWLEDGE - 自动确认消息机制
* Session.CLIENT_ACKNOWLEDGE - 客户端确认消息机制
* Session.DUPS_OK_ACKNOWLEDGE - 有副本的客户端确认消息机制
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地,目的地命名即队列命名,消息消费者需要通过此命名访问对应的队列
destination = session.createQueue("test-mq");
// 创建消息生成者,创建的消息生成者与某目的地对应,即方法参数目的地.
producer = session.createProducer(destination);
// 创建消息对象,创建一个文本消息,此消息对象中保存要传递的文本数据.
message = session.createTextMessage("hello,activeme");
// 发送消息
producer.send(message);
System.out.println("消息发送成功!");
}catch(Exception e){
e.printStackTrace();
System.out.println("访问ActiveMQ服务发生错误!!");
}finally{
try {
// 回收消息发送者资源
if(null != producer)
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
try {
// 回收会话资源
if(null != session)
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
try {
// 回收链接资源
if(null != connection)
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
2.5.3 第二步:创建一个测试类MessageTest
--添加junit类库,快捷键ctrl+1
package cn.gzsxt.mq.test;
import org.junit.Test;
import cn.gzsxt.mq.producer.MyProducer;
public class MessageTest {
@Test
public void sendToMQ(){
MyProducer producer = new MyProducer();
producer.sendToMQ();
}
}
2.5.4 第三步:测试
(1)设置防火墙,配置61616端口。注意修改之后重启防火墙。
(2)测试结果:
--查看控制台
--查看ActiveMQ管理控制界面
--消息发送成功!!!
2.6 第四部分:创建消息消费者,消费消息
2.6.1 第一步:创建MyConsumer类
package cn.gzsxt.mq.consumer;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* @ClassName:MyConsumer
* @Description: 消息消费者代码
*/
public class MyConsumer {
// 定义链接工厂
ConnectionFactory connectionFactory = null;
// 定义链接
Connection connection = null;
// 定义会话
Session session = null;
// 定义目的地
Destination destination = null;
// 定义消息消费者
MessageConsumer consumer = null;
// 定义消息
Message message = null;
public void recieveFromMQ(){
try{
/*
* 创建链接工厂
* ActiveMQConnectionFactory - 由ActiveMQ实现的ConnectionFactory接口实现类.
* 构造方法: public ActiveMQConnectionFactory(String userName, String password, String brokerURL)
* userName - 访问ActiveMQ服务的用户名,用户名可以通过jetty-realm.properties配置文件配置.
* password - 访问ActiveMQ服务的密码,密码可以通过jetty-realm.properties配置文件配置.
* brokerURL - 访问ActiveMQ服务的路径地址.路径结构为-协议名://主机地址:端口号
* 此链接基于TCP/IP协议.
*/
connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.23.13:61616");
// 创建链接对象
connection = connectionFactory.createConnection();
// 启动链接
connection.start();
/*
* 创建会话对象
* 方法- connection.createSession(boolean transacted,int acknowledgeMode);
* transacted - 是否使用事务,可选值为true|false
* true - 使用事务,当设置此变量值,则acknowledgeMode参数无效,建议传递的acknowledgeMode参数值为
* Session.SESSION_TRANSACTED
* false - 不使用事务,设置此变量值,则acknowledgeMode参数必须设置.
* acknowledgeMode - 消息确认机制,可选值为:
* Session.AUTO_ACKNOWLEDGE - 自动确认消息机制
* Session.CLIENT_ACKNOWLEDGE - 客户端确认消息机制
* Session.DUPS_OK_ACKNOWLEDGE - 有副本的客户端确认消息机制
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地,目的地命名即队列命名,消息消费者需要通过此命名访问对应的队列
destination = session.createQueue("test-mq");
// 创建消息消费者,创建的消息消费者与某目的地对应,即方法参数目的地.
consumer = session.createConsumer(destination);
// 从ActiveMQ服务中获取消息
message = consumer.receive();
TextMessage tMsg = (TextMessage) message;
System.out.println("从MQ中获取的消息是:"+tMsg.getText());
}catch(Exception e){
e.printStackTrace();
System.out.println("访问ActiveMQ服务发生错误!!");
}finally{
try {
// 回收消息消费者资源
if(null != consumer)
consumer.close();
} catch (JMSException e) {
e.printStackTrace();
}
try {
// 回收会话资源
if(null != session)
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
try {
// 回收链接资源
if(null != connection)
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
2.6.2 第二步:修改测试类MessageTest,新增测试方法
@Test
public void recieveFromMQ(){
MyConsumer consumer = new MyConsumer();
consumer.recieveFromMQ();
}
2.6.3 第三步:测试
--查看Eclipse控制台
--查看ActiveMQ管理控制界面
--消息被消费了,测试成功!!!
3 ActiveMQ监听器
问题:在前面的示例中,我们发现消费者每次只能消费一条消息。当队列中有多条消息的时候,我们需要多次运行消费者,才能消费完这些消息。很麻烦!!!!如何解决这个问题呢?我们希望一次将所有的消息全部接收。
答:使用ActiveMQ监听器来监听队列,持续消费消息。
3.1 配置步骤说明
(1)创建一个监听器对象。
(2)修改消费者代码,加载监听器。
3.2 配置步骤
3.2.1 第一步:创建监听器MyListener类
--说明:自定义监听器需要实现MessageListener接口
package cn.gzsxt.mq.listener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class MyListener implements MessageListener{
@Override
public void onMessage(Message message) {
if(null!=message){
TextMessage tMsg = (TextMessage) message;
try {
System.out.println("从MQ中获取的消息是:"+tMsg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
3.2.2 第二步:修改MyConsumer代码,加载监听器
--说明:监听器需要持续加载,因此消费程序不能结束。
这里我们使用输入流阻塞消费线程结束。(实际开发中,使用web项目加载)
package cn.gzsxt.mq.consumer;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import cn.gzsxt.mq.listener.MyListener;
/**
* @ClassName:MyConsumer
* @Description: 消息消费者代码
*/
public class MyConsumer {
// 定义链接工厂
ConnectionFactory connectionFactory = null;
// 定义链接
Connection connection = null;
// 定义会话
Session session = null;
// 定义目的地
Destination destination = null;
// 定义消息消费者
MessageConsumer consumer = null;
// 定义消息
Message message = null;
public Message recieveFromMQ(){
try{
/*
* 创建链接工厂
* ActiveMQConnectionFactory - 由ActiveMQ实现的ConnectionFactory接口实现类.
* 构造方法: public ActiveMQConnectionFactory(String userName, String password, String brokerURL)
* userName - 访问ActiveMQ服务的用户名,用户名可以通过jetty-realm.properties配置文件配置.
* password - 访问ActiveMQ服务的密码,密码可以通过jetty-realm.properties配置文件配置.
* brokerURL - 访问ActiveMQ服务的路径地址.路径结构为-协议名://主机地址:端口号
* 此链接基于TCP/IP协议.
*/
connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.23.13:61616");
// 创建链接对象
connection = connectionFactory.createConnection();
// 启动链接
connection.start();
/*
* 创建会话对象
* 方法- connection.createSession(boolean transacted,int acknowledgeMode);
* transacted - 是否使用事务,可选值为true|false
* true - 使用事务,当设置此变量值,则acknowledgeMode参数无效,建议传递的acknowledgeMode参数值为
* Session.SESSION_TRANSACTED
* false - 不使用事务,设置此变量值,则acknowledgeMode参数必须设置.
* acknowledgeMode - 消息确认机制,可选值为:
* Session.AUTO_ACKNOWLEDGE - 自动确认消息机制
* Session.CLIENT_ACKNOWLEDGE - 客户端确认消息机制
* Session.DUPS_OK_ACKNOWLEDGE - 有副本的客户端确认消息机制
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地,目的地命名即队列命名,消息消费者需要通过此命名访问对应的队列
destination = session.createQueue("test-mq");
// 创建消息消费者,创建的消息消费者与某目的地对应,即方法参数目的地.
consumer = session.createConsumer(destination);
// // 从ActiveMQ服务中获取消息
// message = consumer.receive();
//
// TextMessage tMsg = (TextMessage) message;
//
// System.out.println("从MQ中获取的消息是:"+tMsg.getText());
//加载监听器
consumer.setMessageListener(newMyListener());
//监听器需要持续加载,这里我们使用输入流阻塞当前线程结束。
System.in.read();
}catch(Exception e){
e.printStackTrace();
System.out.println("访问ActiveMQ服务发生错误!!");
}finally{
try {
// 回收消息消费者资源
if(null != consumer)
consumer.close();
} catch (JMSException e) {
e.printStackTrace();
}
try {
// 回收会话资源
if(null != session)
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
try {
// 回收链接资源
if(null != connection)
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
return message;
}
}
3.3 测试
(1)多次运行生产者,发送多条消息到队列中。
(2)运行消费者。观察结果
--查看Eclipse控制台,一次消费了3条消息
--查看ActiveMQ管理控制界面,所有消息都被消费了!
--测试成功!!!
4 ActiveMQ消息服务模式
问题:在入门示例中,只能向一个消费者发送消息。但是有一些场景,需求有多个消费者都能接收到消息,比如:美团APP每天的消息推送。该如何实现呢?
答:ActiveMQ是通过不同的服务模式来解决这个问题的。
所以,要搞清楚这个问题,必须知道ActiveMQ有哪些应用模式。
4.1 PTP模式(point to point)
--消息模型
消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。
消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。
Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费、其它的则不能消费此消息了。
当消费者不存在时,消息会一直保存,直到有消费消费
我们的入门示例,就是采用的这种PTP服务模式。
4.2 TOPIC(主题订阅模式)
--消息模型
消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。
和点对点方式不同,发布到topic的消息会被所有订阅者消费。
当生产者发布消息,不管是否有消费者。都不会保存消息
所以,主题订阅模式下,一定要先有消息的消费者(订阅者),后有消息的生产者(发布者)。
我们前面已经实现了PTP模式,下面我们来实现TOPIC模式。
5 Topic模式实现
5.1 配置步骤说明
发表于 2019-08-01 00:00
阅读 ( 411 )