ActiveMQ 下载、安装、运行、实战需求
文章目录
- 一、ActiveMQ简介
- 1. 什么是ActiveMQ?
- 2. ActiveMQ能干什么?
- 3. ActiveMQ特点
- 4. MOM基本功能
- 5. MOM主要特点
- 6. MOM的应用场景_前
- 7. MOM的应用场景_后
- 二、ActiveMQ下载/安装
- 2.1 ActiveMQ下载
- 2.2 ActiveMQ安装环境(jdk)
- 2.3 ActiveMQ 解压缩/运行
- 2.4 ActiveMQ目录介绍
- 2.5 ActiveMQ后台页面
- 三、ActiveMQ消息形式
- 3.1. 点对点(Point-to-Point)
- 3.2. 广播TOPIC(publish-subscribe)
- 四、ActiveMQ消息类型
- 五、基础创建流程+代码构建
- 5.1. MQ基础流程总览
- 5.2. MQ_点对点_代码案例
- 5.2.1. MQ_点对点_生产者_图示
- 5.2.2. MQ_点对点_生产者_代码
- 5.2.3. MQ_点对点_消费者_图示
- 5.2.4. MQ_点对点_消费者_代码
- 5.3. MQ_TOPIC_代码案例
- 5.3.1. MQ_广播_生产者_图示
- 5.3.2. MQ_广播_生产者_代码
- 5.3.3. MQ_广播_生产者_图示
- 5.3.4. MQ_广播_生产者_代码
- 六、实战ActiveMQ
- 6.1. 解压ActiveMQ/附目录权限/启动
- 6.2. 关闭防火墙或者开放8161端口(任选1种)
- 6.2.1. 关闭防火墙
- 6.2.2. 开放8161端口
- 6.3. 登录后台
- 七、点对点_实战发送/接收消息_场景01
- 7.1. 创建一个maven项目
- 7.2. 引入依赖
- 7.3. 创建一个点对点的生产者
- 7.4. 创建一个点对点的消费者
- 7.5. 启动生产者
- 7.5.1. 运行main方法即可
- 7.5.2. Connection timed out: connect
- 7.5.3. 解决方案
- 7.5.4. 再次启动项目
- 7.5.5. 查看MQ控制台
- 7.5.6. MQ名称讲解
- 7.6. 启动消费者
- 八、点对点_实战发送/接收消息_场景2
- 8.1. 启动一个生产者,投递一次消息
- 8.2. 启动消费者1号
- 8.3. 启动消费者2号
- 8.3. 现象分析/总结
- 8.4. 总结
- 8.5. 效果图:
- 九、广播_实战发送/接收消息_场景02
- 9.1. 创建一个topic的生产者
- 9.2. 创建一个topic的消费者
- 9.3. 启动一个生产者
- 9.4. 登录后台查看
- 9.5. 数据展示
- 9.6. 启动消费者1号和消费者2号
- 9.7. 结果分析
- 9.8. topic 广播形式,如果先启动生产者在启动消费者,消息失效会丢失
- 9.9. 再次测试
- 十、总结归纳
一、ActiveMQ简介
1. 什么是ActiveMQ?
2. ActiveMQ能干什么?
3. ActiveMQ特点
4. MOM基本功能
5. MOM主要特点
6. MOM的应用场景_前
7. MOM的应用场景_后
二、ActiveMQ下载/安装
2.1 ActiveMQ下载
官网链接:http://activemq.apache.org/download.html
2.2 ActiveMQ安装环境(jdk)
2.3 ActiveMQ 解压缩/运行
2.4 ActiveMQ目录介绍
2.5 ActiveMQ后台页面
三、ActiveMQ消息形式
3.1. 点对点(Point-to-Point)
3.2. 广播TOPIC(publish-subscribe)
四、ActiveMQ消息类型
五、基础创建流程+代码构建
5.1. MQ基础流程总览
5.2. MQ_点对点_代码案例
5.2.1. MQ_点对点_生产者_图示
5.2.2. MQ_点对点_生产者_代码
package com.gblfy.jms;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class QueueProducer {public static void main(String[] args) throws JMSException {//1.创建连接工厂ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.135:61616");//2.创建连接Connection connection = connectionFactory.createConnection();//3.启动连接connection.start();//4.获取session(会话对象) 参数1:是否启动事务 参数2:消息确认方式Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5.创建队列对象Queue queue = session.createQueue("test-queue");//6.创建消息生产者对象MessageProducer producer = session.createProducer(queue);//7.创建消息对象(文本消息)TextMessage textMessage = session.createTextMessage("欢迎来到神奇的gblfy世界");//8.发送消息producer.send(textMessage);//9.关闭资源producer.close();session.close();connection.close();}
}
5.2.3. MQ_点对点_消费者_图示
5.2.4. MQ_点对点_消费者_代码
package com.gblfy.jms;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;
import java.io.IOException;public class QueueConsumer {public static void main(String[] args) throws JMSException, IOException {//1.创建连接工厂ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.135:61616");//2.创建连接Connection connection = connectionFactory.createConnection();//3.启动连接connection.start();//4.获取session(会话对象) 参数1:是否启动事务 参数2:消息确认方式Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5.创建队列对象Queue queue = session.createQueue("test-queue");//6.创建消息消费者对象MessageConsumer consumer = session.createConsumer(queue);//7.设置监听consumer.setMessageListener(new MessageListener() {public void onMessage(Message message) {TextMessage textMessage=(TextMessage)message;try {System.out.println("提取的消息:"+ textMessage.getText() );} catch (JMSException e) { e.printStackTrace();}}});//8.等待键盘输入System.in.read();//9.关闭资源consumer.close();session.close();connection.close();}}
5.3. MQ_TOPIC_代码案例
5.3.1. MQ_广播_生产者_图示
5.3.2. MQ_广播_生产者_代码
package com.gblfy.jms;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;
/*** 发布订阅模式(消息生产者)* @author gblfy**/
public class TopicProducer {public static void main(String[] args) throws JMSException {//1.创建连接工厂ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.135:61616");//2.创建连接Connection connection = connectionFactory.createConnection();//3.启动连接connection.start();//4.获取session(会话对象) 参数1:是否启动事务 参数2:消息确认方式Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5.创建主题对象Topic topic = session.createTopic("test-topic"); //6.创建消息生产者对象MessageProducer producer = session.createProducer(topic);//7.创建消息对象(文本消息)TextMessage textMessage = session.createTextMessage("欢迎来到申请的品优购世界");//8.发送消息producer.send(textMessage);//9.关闭资源producer.close();session.close();connection.close();}}
5.3.3. MQ_广播_生产者_图示
5.3.4. MQ_广播_生产者_代码
package com.gblfy.jms;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;
import java.io.IOException;public class TopicConsumer {public static void main(String[] args) throws JMSException, IOException {//1.创建连接工厂ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.135:61616");//2.创建连接Connection connection = connectionFactory.createConnection();//3.启动连接connection.start();//4.获取session(会话对象) 参数1:是否启动事务 参数2:消息确认方式Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5.创建主题对象Topic topic = session.createTopic("test-topic");//6.创建消息消费者对象MessageConsumer consumer = session.createConsumer(topic);//7.设置监听consumer.setMessageListener(new MessageListener() {public void onMessage(Message message) {TextMessage textMessage = (TextMessage) message;try {System.out.println("提取的消息:" + textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}});//8.等待键盘输入System.in.read();//9.关闭资源consumer.close();session.close();connection.close();}
}
六、实战ActiveMQ
6.1. 解压ActiveMQ/附目录权限/启动
- (1)将apache-activemq-5.12.0-bin.tar.gz 上传至服务器
- (2)解压此文件
tar zxvf apache-activemq-5.12.0-bin.tar.gz
- (3)为apache-activemq-5.12.0目录赋权
chmod 777 apache-activemq-5.12.0
- (4)进入apache-activemq-5.12.0\bin目录
- (5)赋与执行权限
chmod 755 activemq
- (6)启动mq
./activemq start
6.2. 关闭防火墙或者开放8161端口(任选1种)
6.2.1. 关闭防火墙
/etc/init.d/iptables stop
6.2.2. 开放8161端口
vim /etc/sysconfig/iptables
-A INPUT -m state --state NEW -m tcp -p tcp --dport 8161 -j ACCEPT
:wq
service iptables restart
service iptables status
6.3. 登录后台
七、点对点_实战发送/接收消息_场景01
7.1. 创建一个maven项目
7.2. 引入依赖
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.gblfy.jms</groupId><artifactId>jmsDemo</artifactId><version>0.0.1-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-client</artifactId><version>5.13.4</version></dependency></dependencies></project>
7.3. 创建一个点对点的生产者
//1.创建连接工厂ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.135:61616");//2.创建连接Connection connection = connectionFactory.createConnection();//3.启动连接connection.start();//4.获取session(会话对象) 参数1:是否启动事务 参数2:消息确认方式Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5.创建队列对象Queue queue = session.createQueue("test-queue");//6.创建消息生产者对象MessageProducer producer = session.createProducer(queue);//7.创建消息对象(文本消息)TextMessage textMessage = session.createTextMessage("欢迎来到神奇的gblfy世界");//8.发送消息producer.send(textMessage);//9.关闭资源producer.close();session.close();connection.close();}
7.4. 创建一个点对点的消费者
//1.创建连接工厂ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.135:61616");//2.创建连接Connection connection = connectionFactory.createConnection();//3.启动连接connection.start();//4.获取session(会话对象) 参数1:是否启动事务 参数2:消息确认方式Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5.创建队列对象Queue queue = session.createQueue("test-queue");//6.创建消息消费者对象MessageConsumer consumer = session.createConsumer(queue);//7.设置监听consumer.setMessageListener(new MessageListener() {public void onMessage(Message message) {TextMessage textMessage=(TextMessage)message;try {System.out.println("提取的消息:"+ textMessage.getText() );} catch (JMSException e) { e.printStackTrace();}}});//8.等待键盘输入System.in.read();//9.关闭资源consumer.close();session.close();connection.close();}
7.5. 启动生产者
7.5.1. 运行main方法即可
7.5.2. Connection timed out: connect
7.5.3. 解决方案
- 关闭防火墙,或者开放61616端口
过程上面一样,如果开发防火墙,记得要重启防火墙!!!
-A INPUT -m state --state NEW -m tcp -p tcp --dport 61616 -j ACCEPT
7.5.4. 再次启动项目
7.5.5. 查看MQ控制台
- 点击Queues
发现消息已经成功发送成功!!!
7.5.6. MQ名称讲解
- ① 队列名
- ② 未消费消息数量
- ③ 活跃的消费者数量
- ④ 一共投递消息总量
- ⑤消费消息数量
- ⑥ atom,可以查看具体消息
- ⑦ 删除消息按钮
7.6. 启动消费者
消费者已经成功接收到投递的消息
查看后台MQ后台:
名称 | 数量 |
---|---|
未消费的消息 | 0 |
活跃消费者 | 1 |
消息投递总量 | 1 |
已经消费消息 | 1 |
八、点对点_实战发送/接收消息_场景2
8.1. 启动一个生产者,投递一次消息
启动一个生产者,投递一次消息,启动2个消费者
8.2. 启动消费者1号
8.3. 启动消费者2号
8.3. 现象分析/总结
只有消费者2号,接收到生产者投递的消息
8.4. 总结
不管有多少消费者,消息只能被一个接收,消费了也就没了,先来先得
8.5. 效果图:
名称 | 数量 |
---|---|
队列 | 1 |
未消费消息 | 0 |
活跃消费者 | 2 |
投递消息总量 | 2 |
消费消息数量 | 2 |
注:此次测试建立在刚才第一次测试基础上
正常如下:
名称 | 数量 |
---|---|
队列 | 1 |
未消费消息 | 0 |
活跃消费者 | 2 |
投递消息总量 | 1 |
消费消息数量 | 1 |
九、广播_实战发送/接收消息_场景02
9.1. 创建一个topic的生产者
package com.gblfy.jms;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;
/*** 发布订阅模式(消息生产者)* @author gblfy**/
public class TopicProducer {public static void main(String[] args) throws JMSException {//1.创建连接工厂ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.135:61616");//2.创建连接Connection connection = connectionFactory.createConnection();//3.启动连接connection.start();//4.获取session(会话对象) 参数1:是否启动事务 参数2:消息确认方式Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5.创建主题对象Topic topic = session.createTopic("test-topic"); //6.创建消息生产者对象MessageProducer producer = session.createProducer(topic);//7.创建消息对象(文本消息)TextMessage textMessage = session.createTextMessage("欢迎来到申请的品优购世界");//8.发送消息producer.send(textMessage);//9.关闭资源producer.close();session.close();connection.close();}}
9.2. 创建一个topic的消费者
package com.gblfy.jms;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;
import java.io.IOException;public class TopicConsumer {public static void main(String[] args) throws JMSException, IOException {//1.创建连接工厂ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.135:61616");//2.创建连接Connection connection = connectionFactory.createConnection();//3.启动连接connection.start();//4.获取session(会话对象) 参数1:是否启动事务 参数2:消息确认方式Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5.创建主题对象Topic topic = session.createTopic("test-topic");//6.创建消息消费者对象MessageConsumer consumer = session.createConsumer(topic);//7.设置监听consumer.setMessageListener(new MessageListener() {public void onMessage(Message message) {TextMessage textMessage = (TextMessage) message;try {System.out.println("提取的消息:" + textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}});//8.等待键盘输入System.in.read();//9.关闭资源consumer.close();session.close();connection.close();}
}
9.3. 启动一个生产者
9.4. 登录后台查看
9.5. 数据展示
名称 | 数量 |
---|---|
活跃消费者数量 | 0 |
消息投递总量 | 1 |
已消费消息 | 0 |
9.6. 启动消费者1号和消费者2号
9.7. 结果分析
发现消费者1号和消费者2号都没有接收到消息
9.8. topic 广播形式,如果先启动生产者在启动消费者,消息失效会丢失
因此,需要先启动消费者在启动生产者,再启动生产者。
9.9. 再次测试
从截图中可以看出,生产者投递一次消息,2个消费者都可以接收到消息
十、总结归纳
MQ的需要根据应用的业务场景,选择不同的消息投递方式,一次的选择点对点,消息同一个,有多个消费者的选择广播形式。