JMS(Java消息服务)与消息队列ActiveMQ基本使用(一)

最近的项目中用到了mq,之前自己一直在码农一样的照葫芦画瓢。最近几天研究了下,把自己所有看下来的文档和了解总结一下。

一. 认识JMS

1.概述

对于JMS,百度百科,是这样介绍的:JMS即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

简短来说,JMS是一种与厂商无关的 API,用来访问消息收发系统消息。它类似于JDBC(Java Database Connectivity),提供了应用程序之间异步通信的功能。

JMS1.0是jsr 194里规定的规范(关于jsr规范,请点击)。目前最新的规范是JSR 343,JMS2.0。

好了,说了这么多,其实只是在说,JMS只是sun公司为了统一厂商的接口规范,而定义出的一组api接口。

2. JMS体系结构

描述如下:

  • JMS提供者(JMS的实现者,比如activemq jbossmq等)
  • JMS客户(使用提供者发送消息的程序或对象,例如在12306中,负责发送一条购票消息到处理队列中,用来解决购票高峰问题,那么,发送消息到队列的程序和从队列获取消息的程序都叫做客户)
  • JMS生产者,JMS消费者(生产者及负责创建并发送消息的客户,消费者是负责接收并处理消息的客户)
  • JMS消息(在JMS客户之间传递数据的对象)
  • JMS队列(一个容纳那些被发送的等待阅读的消息的区域)
  • JMS主题(一种支持发送消息给多个订阅者的机制)

3. JMS对象模型

  • 连接工厂(connectionfactory)客户端使用JNDI查找连接工厂,然后利用连接工厂创建一个JMS连接。
  • JMS连接 表示JMS客户端和服务器端之间的一个活动的连接,是由客户端通过调用连接工厂的方法建立的。
  • JMS会话 session 标识JMS客户端和服务端的会话状态。会话建立在JMS连接上,标识客户与服务器之间的一个会话进程。
  • JMS目的 Destinatio 又称为消息队列,是实际的消息源
  • 生产者和消费者
  • 消息类型,分为队列类型(优先先进先出)以及订阅类型

二. ActiveMQ

1. ActiveMQ的安装

  1. 从官网下载安装包,http://activemq.apache.org/download.html
  2. 赋予运行权限 chmod +x,windows可以忽略此步
  3. 运行 ./active start | stop

启动后,activeMQ会占用两个端口,一个是负责接收发送消息的tcp端口:61616,一个是基于web负责用户界面化管理的端口:8161。这两个端口可以在conf下面的xml中找到。http服务器使用了jettry。
这里有个问题是启动mq后,很长时间管理界面才可以显示出来。

2. 用Java访问ActiveMQ

先附上Bean代码:

public class MqBean implements Serializable{ private Integer age; private String name; public Integer getAge() { return age; } public void setAge(Integer age) { this.age = age; } public String getName() { return name; } public void setName(String name) { this.name = name; } }

2.1 队列消息的发送:

public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection; Session session; Destination destination; MessageProducer producer; connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616"); try { connection = connectionFactory.createConnection(); connection.start(); //第一个参数是是否是事务型消息,设置为true,第二个参数无效 //第二个参数是 //Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。异常也会确认消息,应该是在执行之前确认的 //Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会删除消息。可以在失败的 //时候不确认消息,不确认的话不会移出队列,一直存在,下次启动继续接受。接收消息的连接不断开,其他的消费者也不会接受(正常情况下队列模式不存在其他消费者) //DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。在需要考虑资源使用时,这种模式非常有效。 //待测试 session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); destination = session.createQueue("test-queue"); producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //优先级不能影响先进先出。。。那这个用处究竟是什么呢呢呢呢 MqBean bean = new MqBean(); bean.setAge(13); for(int i=0;i<100;i++){ bean.setName("小黄"+i); producer.send(session.createObjectMessage(bean)); } producer.close(); System.out.println("呵呵"); } catch (JMSException e) { e.printStackTrace(); } }

注:在上面的代码中,确认模式有三种,里面的DUPS_OK_ACKNOWLEDGE和AUTO_ACKNOWLEDGE一直没明白有什么区别。因为无法测试。不过大概也明白了一些。其实主要是MQ处理消息的流程决定的:

  1. 消息从生成方客户端传送到消息服务器。
  2. 消息服务器读取消息。
  3. 消息被放置到持久性存储器当中(出于可靠性的考虑)。
  4. 消息服务器确认收到消息(出于可靠性的考虑)。
  5. 消息服务器确定消息的路由。
  6. 消息服务器写出消息。
  7. 消息从消息服务器传送到使用方客户端。
  8. 使用方客户端确认收到消息(出于可靠性的考虑)。
  9. 消息服务器处理客户端确认(出于可靠性的考虑)。
  10. 消息服务器确定已经处理客户端确认。

这些步骤是连续的,所以任何步骤都可能成为消息从生成方客户端到使用方客户端的传送过程的瓶颈。这些步骤中的大多数都取决于消息传送系统的物理特征:网络带宽、计算机处理速度和消息服务器体系结构等等。但是,有一些步骤还取决于消息传送应用程序的特征和该应用程序要求的可靠性级别。
其实就是基于可靠性还是性能的选择.

2.2 队列消息的接收:

public static void main(String[] args) { ConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS Provider 的连接 Connection connection = null; // Session: 一个发送或接收消息的线程 Session session; // Destination :消息的目的地;消息发送给谁. Destination destination; // 消费者,消息接收者 MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616"); try { // 构造从工厂得到连接对象 connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操作连接 //这个最好还是有事务 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置 destination = session.createQueue("test-queue"); consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { MqBean bean = (MqBean) ((ObjectMessage)message).getObject(); System.out.println(bean); if (null != message) { System.out.println("收到消息" + bean.getName()); } } catch (Exception e) { // TODO: handle exception } } }); } catch (Exception e) { e.printStackTrace(); } }

注:对于队列来说,比较简单的优化策略,应该就是队列分载了。由于每个消费者都是单线程的,所以可以设置多个消费者来提高速度。
大家可以复制个消费者自己测试下,在消费者中添加sleep测试下效果。

2.3 订阅消息的发送

public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection; Session session; Destination destination; MessageProducer producer; connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616"); try { connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); destination = session.createTopic("test-topic"); producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //优先级不能影响先进先出。。。那这个用处究竟是什么呢呢呢呢 MqBean bean = new MqBean(); bean.setAge(13); for(int i=0;i<100;i++){ Thread.sleep(1000); bean.setName("小黄"+i); producer.send(session.createObjectMessage(bean)); } producer.close(); System.out.println("呵呵"); } catch (Exception e) { e.printStackTrace(); } }

2.4 订阅消息的接收

public static void main(String[] args) { ConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS Provider 的连接 Connection connection = null; // Session: 一个发送或接收消息的线程 Session session; // Destination :消息的目的地;消息发送给谁. Destination destination; // 消费者,消息接收者 MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616"); try { // 构造从工厂得到连接对象 connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操作连接 //这个最好还是有事务 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置 destination = session.createQueue("test-queue"); consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { MqBean bean = (MqBean) ((ObjectMessage)message).getObject(); System.out.println(bean); if (null != message) { System.out.println("收到消息" + bean.getName()); } } catch (Exception e) { // TODO: handle exception } } }); } catch (Exception e) { e.printStackTrace(); } }

以上的消息发送后,如果没有接收到,可以登录自己的MQ管理页面:http://192.168.3.159:8161/admin/ ,默认帐号密码都是admin,查看队列中的消息

enter image description here

Number Of Pending Messages 等待消费的消息 这个是当前未出队列的数量。可以理解为总接收数-总出队列数
Messages Enqueued 进入队列的消息 进入队列的总数量,包括出队列的。 这个数量只增不减
Messages Dequeued 出了队列的消息 可以理解为是消费这消费掉的数量

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/539225.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python单词反转_python文本 字符串逐字符反转以及逐单词反转

python文本 字符串逐字符反转以及逐单词反转 场景&#xff1a; 字符串逐字符反转以及逐单词反转 首先来看字符串逐字符反转&#xff0c;由于python提供了非常有用的切片&#xff0c;所以只需要一句就可以搞定了 >>> aabc edf degd >>> a[::-1] dged fde cba …

hive复合数据类型之struct

概述 STRUCT&#xff1a;STRUCT可以包含不同数据类型的元素。这些元素可以通过”点语法”的方式来得到所需要的元素&#xff0c;比如user是一个STRUCT类型&#xff0c;那么可以通过user.address得到这个用户的地址。 操作实例 1、创建表 create table student_test(id int,in…

pycharm 运行celery_Celery全面学习笔记

来源介绍Celery 是 Distributed Task Queue&#xff0c;分布式任务队列。分布式决定了可以有多个 worker 的存在&#xff0c;队列表示其是异步操作。Celery 核心模块Celery有一下5个核心角色Task就是任务&#xff0c;有异步任务和定时任务Broker中间人&#xff0c;接收生产者发…

hive复合数据类型之array

概述 ARRAY&#xff1a;ARRAY类型是由一系列相同数据类型的元素组成&#xff0c;这些元素可以通过下标来访问。比如有一个ARRAY类型的变量fruits&#xff0c;它是由[apple,orange,mango]组成&#xff0c;那么我们可以通过fruits[1]来访问元素orange&#xff0c;因为ARRAY类型的…

Exploit开发系列教程-Mona 2 SEH

P3nro5e 2015/07/10 10:580x00 Mona 2 前言 & 准备Mona 2是一种非常有用的插件&#xff0c;它由Corelan Team开发。起初是为Immunity Debugger写的&#xff0c;现在它适用于WinDbg调试器。你将需要为WinDbg x86 和 WinDbg x64安装一些工具&#xff1a;安装Python 2.7 (从这…

python集合的元素可以是_Python集合的元素中,为什么不可以是包含嵌套列表的元组?...

你有一个误解&#xff0c;hash算法针对的是元素的内容&#xff0c;并不是针对指针&#xff0c;所以指针不变不等于可hash。 如果你想深究细节的话&#xff0c;可以看tuple的源码&#xff1a; static Py_hash_t tuplehash(PyTupleObject *v) { Py_uhash_t x; /* Unsigned for de…

python lib库_python_lib基础库

1&#xff1a;argv传递给python脚本的命令行参数列表&#xff0c;argv[0]是脚本的名字(他是平台独立的&#xff0c;不管他是一个路径全名或不是)&#xff0c;如果使用了-c参数选项&#xff0c;argv[0]会被设置为字符串-c&#xff0c;如果没有脚本名传递给python解释器&#xff…

hive复合数据类型之map

概述 MAP&#xff1a;MAP包含key->value键值对&#xff0c;可以通过key来访问元素。比如”userlist”是一个map类型&#xff0c;其中username是key&#xff0c;password是value&#xff1b;那么我们可以通过userlist[username]来得到这个用户对应的password&#xff1b; 操…

Beego框架使用

为什么80%的码农都做不了架构师&#xff1f;>>> Beego Web项目目录结构 new 命令是新建一个 Web 项目&#xff0c;我们在命令行下执行 bee new <项目名> 就可以创建一个新的项目。但是注意该命令必须在 $GOPATH/src 下执行。最后会在 $GOPATH/src 相应目录下…

oracle下lag和lead分析函数

Lag和Lead分析函数可以在同一次查询中取出同一字段的前N行的数据(Lag)和后N行的数据(Lead)作为独立的列。 这种操作可以代替表的自联接&#xff0c;并且LAG和LEAD有更高的效率。 语法&#xff1a; [sql] view plaincopy /*语法*/ lag(exp_str,offset,defval) over() Lead(…

802d简明调试手册_SINUMERIK-828D简明调试手册.pdf

SINUMERIK 828D / 828D BASIC简明调试手册SINUMERIKAnswers for industry. SIEMENSABC01.2012 ASINUMERIK 828D / 828D BASIC V04.04SP01123PLC 45NC 67PLC 891011121314151617PLC 18i1 11.1 11.1.1 NC 31.1.2 31.2

jtessboxeditorfx 界面显示不出来_macOS 使用 XQuartz 支持 X11 实现 Linux 图形化界面显示...

更多奇技淫巧欢迎订阅博客&#xff1a;https://fuckcloudnative.io前言在 Windows 中相信大家已经很熟悉使用 Xmanager(Xshell), MobaXterm, SecureCRT 通过 X11 实现 Linux 图形化界面显示&#xff0c;我的需求是在 macOS 下使用 iTerm2 作为 Terminal 实现 X11 图形化界面显示…

EntityFramework Core 2.0 Explicitly Compiled Query(显式编译查询)

前言 EntityFramework Core 2.0引入了显式编译查询&#xff0c;在查询数据时预先编译好LINQ查询便于在请求数据时能够立即响应。显式编译查询提供了高可用场景&#xff0c;通过使用显式编译的查询可以提高查询性能。EF Core已经使用查询表达式的散列来表示自动编译和缓存查询&a…

Oracle Minus关键字 不包含 取差集

Oracle Minus关键字   SQL中的MINUS关键字   SQL中有一个MINUS关键字&#xff0c;它运用在两个SQL语句上&#xff0c;它先找出第一条SQL语句所产生的结果&#xff0c;然后看这些结果有没有在第二个SQL语句的结果 中。如果有的话&#xff0c;那这一笔记录就被去除&#xff0…

python扫描器甄别操作系统类型_20189317 《网络攻防技术》 第三周作业

一.教材内容总结1.网络踩点&#xff1a;web搜索与挖掘、DNS和IP查询、网络拓扑侦察(1)网络踩点目标确定(2)技术手段&#xff1a;web信息搜索与挖掘、DNS和IP查询、网络拓扑侦察(3)web信息搜索与挖掘&#xff1a;基本搜索与挖掘技巧、高级搜索与挖掘技巧、编程实现google搜索、元…

python 网页重定向_小试牛刀:python爬虫爬取springer开放电子书.

首先声明,本文旨在记录反思,并没有资源,代码也不具有借鉴意义(水平实在不行.某天,水群的时候发现群友发了一个文件,里面是疫情时期springer开放的免费电子书名单,同时还附有下载链接,总共有400多本,这要是一个一个下载不得累死个人,只下载自己感兴趣的书也是一个好主意,但是,我…

直面桌面云带来的现状优势

在桌面云解决方案里&#xff0c;首先&#xff0c;所有的数据以及运算都在服务器端进行&#xff0c;客户端只是显示其变化的影像而已&#xff0c;所以在不需要担心客户端来非法窃取资料&#xff0c;我们在电影里面看到的商业间谍拿着 U 盘疯狂的拷贝公司商业机密的情况再也不会出…

ORA-28001: the password has expired解决方法

Oracle提示错误消息ORA-28001: the password has expired&#xff0c;是由于Oracle11G的新特性所致&#xff0c; Oracle11G创建用户时缺省密码过期限制是180天&#xff08;即6个月&#xff09;&#xff0c; 如果超过180天用户密码未做修改则该用户无法登录。 Oracle公司是为了数…

.net 导出excel_Qt编写的项目作品18-数据导出到Excel及Pdf和打印数据

一、功能特点原创导出数据机制&#xff0c;不依赖任何office组件或者操作系统等第三方库&#xff0c;尤其是支持嵌入式linux。10万行数据9个字段只需要2秒钟完成。只需要四个步骤即可开始急速导出大量数据到Excel。同时提供直接写入数据接口和多线程写入数据接口&#xff0c;不…

hive数据库定义

默认数据库"default" 可以显式切换数据库&#xff1a;hive> use 数据库名; 创建 hive>CREATE DATABASE [IF NOT EXISTS] mydb [LOCATION] /....... [COMMENT] ....; 实例 hive (default)> create database test_db comment test database; OK Ti…