ActiveMq使用笔记

java JMS技术

.1.   什么是JMS

         JMS即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

         JMS是一种与厂商无关的 API,用来访问消息收发系统消息。它类似于JDBC(Java Database Connectivity):这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。许多厂商都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,这只是几个例子。 JMS 使您能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另一个 JMS客户机发送消息。消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。根据有效负载的类型来划分,可以将消息分为几种类型,它们分别携带:简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。

.2.   JMS规范

.2.1.    专业技术规范

JMS(Java Messaging Service)是Java平台上有关面向消息中间件(MOM)的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发,翻译为Java消息服务。

.2.2.    体系架构

JMS由以下元素组成。

JMS提供者provider:连接面向消息中间件的,JMS接口的一个实现。提供者可以是Java平台的JMS实现,也可以是非Java平台的面向消息中间件的适配器。

JMS客户:生产或消费基于消息的Java的应用程序或对象。

JMS生产者:创建并发送消息的JMS客户。

JMS消费者:接收消息的JMS客户。

JMS消息:包括可以在JMS客户之间传递的数据的对象

JMS队列:一个容纳那些被发送的等待阅读的消息的区域。与队列名字所暗示的意思不同,消息的接受顺序并不一定要与消息的发送顺序相同。一旦一个消息被阅读,该消息将被从队列中移走。

JMS主题:一种支持发送消息给多个订阅者的机制。

.2.3.    Java消息服务应用程序结构支持两种模型

1、  点对点或队列模型

在点对点或队列模型下,一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费者的队列,并直接将消息发送到消费者的队列。

这种模式被概括为:

只有一个消费者将获得消息

生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态。

每一个成功处理的消息都由接收者签收

2、发布者/订阅者模型

发布者/订阅者模型支持向一个特定的消息主题发布消息。0或多个订阅者可能对接收来自特定消息主题的消息感兴趣。在这种模型下,发布者和订阅者彼此不知道对方。这种模式好比是匿名公告板。

 

这种模式被概括为:

多个消费者可以获得消息

在发布者和订阅者之间存在时间依赖性。发布者需要建立一个订阅(subscription),以便客户能够订阅。订阅者必须保持持续的活动状态以接收消息,除非订阅者建立了持久的订阅。在那种情况下,在订阅者未连接时发布的消息将在订阅者重新连接时重新发布。

 

1.下载ActiveMQ

去官方网站下载:http://activemq.apache.org/

2.运行ActiveMQ

解压缩apache-activemq-5.5.1-bin.zip,

修改配置文件activeMQ.xml,将0.0.0.0修改为localhost

默认的activeMQ.xml文件如下:

修改后:

<transportConnectors><transportConnector name="openwire" uri="tcp://localhost:61616"/><transportConnector name="ssl"     uri="ssl://localhost:61617"/><transportConnector name="stomp"   uri="stomp://localhost:61613"/><transportConnector uri="http://localhost:8081"/><transportConnector uri="udp://localhost:61618"/>
</transportConnectors>

然后双击apache-activemq-5.5.1\bin\activemq.bat运行ActiveMQ程序。

访问的时候如果需要用户名和密码 都是admin admin...

启动topic的相关的生产者和消费者:

生产者代码:

ProducerTest.java

import java.util.Random;import javax.jms.JMSException;      public class ProducerTest {      /**    * @param args    */     public static void main(String[] args) throws JMSException, Exception {      ProducerTool producer = new ProducerTool(); Random random = new Random();for(int i=0;i<20;i++){Thread.sleep(random.nextInt(10)*1000);producer.produceMessage("Hello, world!--"+i);      producer.close();}}      
}      

 ProducerTool.java

import javax.jms.Connection;      
import javax.jms.DeliveryMode;      
import javax.jms.Destination;      
import javax.jms.JMSException;      
import javax.jms.MessageProducer;      
import javax.jms.Session;      
import javax.jms.TextMessage;      import org.apache.activemq.ActiveMQConnection;      
import org.apache.activemq.ActiveMQConnectionFactory;      public class ProducerTool {        private String user = ActiveMQConnection.DEFAULT_USER;         private String password = ActiveMQConnection.DEFAULT_PASSWORD;       private String url = ActiveMQConnection.DEFAULT_BROKER_URL;       private String subject = "mytopic";      private Destination destination = null;      private Connection connection = null;      private Session session = null;      private MessageProducer producer = null;// 初始化      private void initialize() throws JMSException, Exception {      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(      user, password, url);      connection = connectionFactory.createConnection();      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);      destination = session.createTopic(subject);      producer = session.createProducer(destination);      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);      }// 发送消息      public void produceMessage(String message) throws JMSException, Exception {      initialize();      TextMessage msg = session.createTextMessage(message);      connection.start();      System.out.println("Producer:->Sending message: " + message);      producer.send(msg);      System.out.println("Producer:->Message sent complete!");      }// 关闭连接      public void close() throws JMSException {      System.out.println("Producer:->Closing connection");      if (producer != null){producer.close();      }      if (session != null){session.close();      }      if (connection != null){connection.close();      }      }      
} 

消费者代码:

ConsumerTest.java

import javax.jms.JMSException;public class ConsumerTest implements Runnable {static Thread t1 = null;/*** @param args* @throws InterruptedException* @throws InterruptedException* @throws JMSException* @throws InterruptedException*/public static void main(String[] args) throws InterruptedException {t1 = new Thread(new ConsumerTest());t1.setDaemon(false);t1.start();/*** 如果发生异常,则重启consumer*//*while (true) {System.out.println(t1.isAlive());if (!t1.isAlive()) {t1 = new Thread(new ConsumerTest());t1.start();System.out.println("重新启动");}Thread.sleep(5000);}*/// 延时500毫秒之后停止接受消息// Thread.sleep(500);// consumer.close();
    }public void run() {try {ConsumerTool consumer = new ConsumerTool();consumer.consumeMessage();while (ConsumerTool.isconnection) {    }} catch (Exception e) {}}
}

ConsumerTool.java

import javax.jms.Connection;      
import javax.jms.Destination;      
import javax.jms.ExceptionListener;
import javax.jms.JMSException;      
import javax.jms.MessageConsumer;      
import javax.jms.Session;      
import javax.jms.MessageListener;      
import javax.jms.Message;      
import javax.jms.TextMessage;      import org.apache.activemq.ActiveMQConnection;      
import org.apache.activemq.ActiveMQConnectionFactory;      
/*** 消费者的模板     * @author ABC**/
public class ConsumerTool implements MessageListener,ExceptionListener {      private String user = ActiveMQConnection.DEFAULT_USER;      private String password = ActiveMQConnection.DEFAULT_PASSWORD;      private String url =ActiveMQConnection.DEFAULT_BROKER_URL;      private String subject = "mytopic";      private Destination destination = null;      private Connection connection = null;      private Session session = null;      private MessageConsumer consumer = null;  public static Boolean isconnection=false;// 初始化      private void initialize() throws JMSException, Exception {      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(      user, password, url);      connection = connectionFactory.createConnection();      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);      destination = session.createTopic(subject);      consumer = session.createConsumer(destination);     }      // 消费消息      public void consumeMessage() throws JMSException, Exception {      initialize();      connection.start();consumer.setMessageListener(this);    //注册一个消息监听器,有消息就执行onMessage()方法connection.setExceptionListener(this);//注册一个异常监听器,有异常就执行onException()方法isconnection=true;System.out.println("Consumer:->Begin listening...");      // 开始监听  // Message message = consumer.receive();      
    }// 关闭连接      public void close() throws JMSException {      System.out.println("Consumer:->Closing connection");      if (consumer != null)      consumer.close();      if (session != null)      session.close();      if (connection != null)      connection.close();      }// 消息处理函数      public void onMessage(Message message) {      try {      if (message instanceof TextMessage) {      TextMessage txtMsg = (TextMessage) message;      String msg = txtMsg.getText();      System.out.println("Consumer:->Received: " + msg);      } else {      System.out.println("Consumer:->Received: " + message);      }      } catch (JMSException e) {      // TODO Auto-generated catch block      
            e.printStackTrace();      }      }public void onException(JMSException arg0) {isconnection=false;//出现异常把isconnection设置成false
    }      
} 

只启动ProducerTest.java

如果这个时候把ActiveMq 关闭再开启....重新访问

 

之前的主题 mytopic产生的数据就没有了.....

ActiveMq默认是没有做持久化的,如果是Kafka只要是发过去的消息,都会一直存在,也可以设置一个过期的时间.到了期限,那些消息也是可以清除掉.否则就会一直都在.

ActiveMq一般是用在JavaEE中的....Kafka是用在大数据领域的.

再运行生产者的模板代码: ConsumerTest.java

生产者生产的数据:

再运行生产者的模板代码: ConsumerTest.java

生产者生产的数据:

消费者消费到数据:

 

看WEBUI

 

其他常用的JMS实现

要使用Java消息服务,你必须要有一个JMS提供者,管理会话和队列。既有开源的提供者也有专有的提供者。

开源的提供者包括:

Apache ActiveMQ

JBoss 社区所研发的 HornetQ

Joram

Coridan的MantaRay

The OpenJMS Group的OpenJMS

专有的提供者包括:

BEA的BEA WebLogic Server JMS

TIBCO Software的EMS

GigaSpaces Technologies的GigaSpaces

Softwired 2006的iBus

IONA Technologies的IONA JMS

SeeBeyond的IQManager(2005年8月被Sun Microsystems并购)

webMethods的JMS+ -

my-channels的Nirvana

Sonic Software的SonicMQ

SwiftMQ的SwiftMQ

IBM的WebSphere MQ

 ========================================================

附关于ActiveMq处理queue的模板代码:

ProducerTest.java

import java.util.Random;import javax.jms.JMSException;      public class ProducerTest {      /**    * @param args    * @throws Exception * @throws JMSException */     public static void main(String[] args) throws JMSException, Exception{      ProducerTool producer = new ProducerTool();Random random = new Random();for(int i=0;i<20;i++){Thread.sleep(random.nextInt(10)*1000);producer.produceMessage("Hello, world!--"+i);      producer.close();}}      
}

ProducerTool.java

import javax.jms.Connection;      
import javax.jms.DeliveryMode;      
import javax.jms.Destination;      
import javax.jms.JMSException;      
import javax.jms.MessageProducer;      
import javax.jms.Session;      
import javax.jms.TextMessage;      import org.apache.activemq.ActiveMQConnection;      
import org.apache.activemq.ActiveMQConnectionFactory;      public class ProducerTool {        private String user = ActiveMQConnection.DEFAULT_USER;         private String password = ActiveMQConnection.DEFAULT_PASSWORD;       private String url = ActiveMQConnection.DEFAULT_BROKER_URL;       private String subject = "myqueue";      private Destination destination = null;      private Connection connection = null;      private Session session = null;      private MessageProducer producer = null;// 初始化   private void initialize() throws JMSException, Exception {      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(      user, password, url);      connection = connectionFactory.createConnection();      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);      destination = session.createQueue(subject);      producer = session.createProducer(destination);      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);      }// 发送消息 public void produceMessage(String message) throws JMSException, Exception {      initialize();      TextMessage msg = session.createTextMessage(message);      connection.start();      System.out.println("Producer:->Sending message: " + message);      producer.send(msg);      System.out.println("Producer:->Message sent complete!");      }// 关闭连接      public void close() throws JMSException {      System.out.println("Producer:->Closing connection");      if (producer != null){producer.close();      }      if (session != null){session.close();      }      if (connection != null){connection.close();      }      }      
}    

CustomerTest.java

public class ConsumerTest implements Runnable {static Thread t1 = null;public static void main(String[] args) throws InterruptedException {t1 = new Thread(new ConsumerTest());t1.start();
//        while (true) {
//            System.out.println(t1.isAlive());
//            if (!t1.isAlive()) {
//                t1 = new Thread(new ConsumerTest());
//                t1.start();
//                System.out.println("重新启动");
//            }
//            Thread.sleep(5000);
//        }// 延时500毫秒之后停止接受消息// Thread.sleep(500);// consumer.close();
    }public void run() {try {ConsumerTool consumer = new ConsumerTool();consumer.consumeMessage();while (ConsumerTool.isconnection) {    //System.out.println(123);
            }} catch (Exception e) {}}
}

CustomerTool.java

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.MessageListener;
import javax.jms.Message;
import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;public class ConsumerTool implements MessageListener,ExceptionListener {private String user = ActiveMQConnection.DEFAULT_USER;private String password = ActiveMQConnection.DEFAULT_PASSWORD;private String url = ActiveMQConnection.DEFAULT_BROKER_URL;private String subject = "myqueue";private Destination destination = null;private Connection connection = null;private Session session = null;private MessageConsumer consumer = null;private ActiveMQConnectionFactory connectionFactory=null;public static Boolean isconnection=false;// 初始化private void initialize() throws JMSException {connectionFactory= new ActiveMQConnectionFactory(user, password, url);connection = connectionFactory.createConnection();session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);destination = session.createQueue(subject);consumer = session.createConsumer(destination);}// 消费消息public void consumeMessage() throws JMSException {initialize();connection.start();consumer.setMessageListener(this);connection.setExceptionListener(this);System.out.println("Consumer:->Begin listening...");isconnection=true;// 开始监听Message message = consumer.receive();System.out.println(message.getJMSMessageID());}// 关闭连接public void close() throws JMSException {System.out.println("Consumer:->Closing connection");if (consumer != null){consumer.close();}if (session != null){session.close();}if (connection != null){connection.close();}}// 消息处理函数public void onMessage(Message message) {try {if (message instanceof TextMessage) {TextMessage txtMsg = (TextMessage) message;String msg = txtMsg.getText();System.out.println("Consumer:->Received: " + msg);} else {System.out.println("Consumer:->Received: " + message);}} catch (JMSException e) {e.printStackTrace();}}public void onException(JMSException arg0){isconnection=false;}
}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/456369.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

导入Anaconda中的第三方库运行时报错:ImportError: Missing required dependencies ['pandas']

今天碰到一个大坑&#xff0c;花了大半天才搞明白问题出在哪来。事情的经过是这样的&#xff1a;博主下午手贱把已将装好anaconda2给卸载了&#xff08;同时装了2和3&#xff09;&#xff0c;然后再次安装anconda2后&#xff0c;却发现配置完pycharm的解释器后&#xff0c;代码…

BZOJ2005 NOI2010 能量采集 欧拉函数

题意&#xff1a;求$\sum\limits_{i 1}^N {\sum\limits_{j 1}^M {f(i,j)} } $&#xff0c;其中f(i,j)(0,0)与(i,j)连线上点的数量 题解&#xff1a; 如果一个点(x,y)在(0,0)与(x,y)的连线上&#xff0c;则有gcd(x,y)gcd(x,y)。因此f(i,j)(gcd(i,j)gcd(i,j))且i<i,j<j的…

python子类继承父类特性,pycharm上面已经提示继承了,为什么会报没有该特性的错误?

因为在子类里覆盖了父类的__init__ 如果需要调用父类用super class A(object):def __init__(self):self.a 1def fun(self):print self.aclass B(A):def __init__(self):self.b 2super(B, self).__init__()def fun(self):print self.aprint self.bB().fun()

Hadoop伪分布安装详解(一)

注&#xff1a;以下截图针对Ubuntu操作系统&#xff0c;对Centos步骤类似。请读者选择不同镜像即可。 第一部分&#xff1a;VMware WorkStation10 安装 1.安装好VMware10虚拟机软件并下载好Ubuntu16.04 LTS 64位版的镜像包 2.打开VMware10虚拟机软件&#xff0c;选择“创建新的…

C++_const常成员作用

介绍 常成员是什么 1.常成员关键词为&#xff1a;const 2.常成员有&#xff1a;常成员变量、常成员函数、常成员对象 常成员有什么用 1.常成员变量&#xff1a;用于在程序中定义不可修改内部成员变量的函数 2.常成员函数&#xff1a;只能够访问成员变量&#xff0c;不可以修改成…

Unlicensed ARC session – terminating!

问题描述 近日&#xff0c;发现ArcGIS10.4中存在很多bug&#xff0c;而且费了好多时间去测试它&#xff0c;最终决定改用10.1。在降级程序时遇到许可问题。 重装ArcGIS10.1后&#xff0c;打开工程&#xff0c;所有引用都自动映射&#xff0c;没报任何错误&#xff0c;清理重新生…

SQLAlchemy - Column详解

SQLAlchemy - Column详解 Column常用参数&#xff1a; default&#xff1a;默认值 nullable&#xff1a;是否可有 primary_key&#xff1a;是否为主键 unique&#xff1a;是否唯一 autoincrement&#xff1a;是否自动增长 onupdate&#xff1a;更新的时候执行的函数 name&…

Linux命令三剑客:grep、sed、awk总结

文章目录前言一、grep命令语法实例grep结合pattern正则二、sed命令语法案例三、awk命令语法实例前言 最近看到了几篇关于linux命令grep、sed、awk的文章&#xff0c;这里总结下&#xff0c;方便后面使用。 一、grep grep命令&#xff08;grep的全称&#xff1a;Global searc…

Git常用指令及功能总结

文章目录前言&#xff1a;1、常用的git指令2、常用git功能及操作2.1、下载代码&#xff1a;2.2、当前分支和master保持一致2.3、修改代码后提交代码到指定分支2.4、版本回退&#xff08;时空穿梭机&#xff09;2.5、概念工作区和暂存区2.6、添加远程库2.7、分支管理2.8、标签管…

MacOS下MySQL配置

先去官网下载一个 MySQL for mac http://www.cnblogs.com/xiaobo-Linux/ 命令行运行终端&#xff0c;运行下面两条命令&#xff1a; 12alias mysql/usr/local/mysql/bin/mysqlalias mysqladmin/usr/local/mysql/bin/mysqladmin方便终端直接输入mysql命令&#xff0c;而不是必须…

SparkSQL-从0到1认识Catalyst

文章目录前言正文预备知识&#xff0d;Tree&RuleCatalyst工作流程ParserAnalyzerOptimizerSparkSQL执行计划前言 这篇文章是转载一位大神的文章&#xff0c;为什么要转载的&#xff0c;实在是因为写的太经典了&#xff0c;所以忍不住希望能有更多的人可以看到。后续还会转…

为什么程序员一定要加班?

摘要&#xff1a; 一提到程序员&#xff0c;大多数人的印象大概就是死宅、无趣、没有私人生活&#xff0c;除了上班写写写代码&#xff0c;加班写代码更是标配。似乎在深夜顶着鸡窝头&#xff0c;目光呆滞&#xff0c;面无表情敲键盘的场景才是一个程序员的真实写照。 当然&…

SparkSQL之Join原理

文章目录前言&#xff1a;Join背景介绍Join常见分类以及基本实现机制Hash JoinBroadcast Hash JoinShuffle Hash JoinSort-Merge Join总结前言&#xff1a; 写SQL的时候很多时候都有用到join语句&#xff0c;但是我们真的有仔细想过数据在join的过程到底是怎么样的吗&#xff…

SQLAlchemy中filter_by()和filter()的用法不同

filter_by() 和 filter() 的最主要的区别&#xff1a; 模块语法><&#xff08;大于和小于&#xff09;查询and_和or_查询filter_by()直接用属性名&#xff0c;比较用不支持不支持filter()用类名.属性名&#xff0c;比较用支持支持 谈 filter_by() 的语法之前先看下 filt…

python爬虫从入门到放弃(六)之 BeautifulSoup库的使用

上一篇文章的正则&#xff0c;其实对很多人来说用起来是不方便的&#xff0c;加上需要记很多规则&#xff0c;所以用起来不是特别熟练&#xff0c;而这节我们提到的beautifulsoup就是一个非常强大的工具&#xff0c;爬虫利器。 beautifulSoup “美味的汤&#xff0c;绿色的浓汤…

SparkHiveSQL中Join操作的谓词下推?

前言&#xff1a; SparkSQL和HiveSQL的Join操作中也有谓词下推&#xff1f;今天就通过大神的文章来了解下。同样&#xff0c;如有冒犯&#xff0c;请联系。 正文 上文简要介绍了Join在大数据领域中的使用背景以及常用的几种算法&#xff0d;broadcast hash join 、shuffle h…

六种方式实现生产者消费者(未完)

2019独角兽企业重金招聘Python工程师标准>>> 一、利用Object对象是wait和notify\notifyAll package com.jv.parallel.consumerandproducer.objectwait;public class Car {private volatile int flag 0;public void showConsumer(){System.out.println("I am a…

SQL中基于代价的优化

还记得笔者在上篇文章无意中挖的一个坑么&#xff1f;如若不知&#xff0c;强烈建议看官先行阅读前面两文&#xff0d;《SparkSQL Join原理》和《Join中竟然也有谓词下推?》 第一篇文章主要分析了大数据领域Join的三种基础算法以及各自的适用场景&#xff0c;第二篇文章在第一…

Java Map 怎样实现Key 的唯一性?

大家都知道。在Map和Set不可存在反复元素&#xff1f; 可是对于内部的细节我们并不了解。今天我们就一块来 探讨一下&#xff01; 1 对于 HashMap HashSet 他们的底层数据结构的实现是&#xff1a;维护了一张 HashTable 。容器中的元素所有存储在Hashtable 中。他们再加入…

win10下安装pyspark及碰到的问题

文章目录前言安装过程Q1总结&#xff1a;前言 最近由于工作需要&#xff0c;需要了解下pyspark&#xff0c;所以就在win10环境下装了下&#xff0c;然后在pycharm中使用的时候碰到了一些问题。整个过程可谓是一波三折。下面一一道来。 安装过程 安装过程就不详细说了&#x…