前言
之所以使用 JNDI,是出于通用性考虑:本例只使用 JMS 规范提供的通用接口,而不使用具体 JMS 提供者的接口,这样可以保证我们编写的程序适用于任何一种 JMS 实现(ActiveMQ、HornetQ 等)。
什么是 JNDI:JNDI(Java Naming and Directory Interface)是一个标准规范,类似于 JDBC、JMS 等规范,为开发人员提供了查找和访问各种命名和目录服务的通用、统一的接口。J2EE 规范要求所有 J2EE 容器都提供 JNDI 规范的实现;Tomcat 虽然只是 Servlet 容器,但同样内置了 JNDI 实现,因此可以直接在 Tomcat 中配置并查找 JMS 资源。
PTP(Point to point)消息模式(JMS的点对点消息传送)
1、使用Tomcat配置JNDI
找到Tomcat安装路径下的conf文件夹,打开context.xml,添加如下配置:
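(原文此处的配置代码在页面折叠后丢失;以下是与后文代码中 JNDI 名称相对应的参考配置,brokerURL 与 physicalName 请按实际环境调整。)
<Resource name="queue/connectionFactory"
          auth="Container"
          type="org.apache.activemq.ActiveMQConnectionFactory"
          description="JMS Connection Factory"
          factory="org.apache.activemq.jndi.JNDIReferenceFactory"
          brokerURL="tcp://localhost:61616"
          brokerName="LocalActiveMQBroker" />
<Resource name="queue/queue0"
          auth="Container"
          type="org.apache.activemq.command.ActiveMQQueue"
          description="JMS Queue"
          factory="org.apache.activemq.jndi.JNDIReferenceFactory"
          physicalName="queue0" />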
2、启动ActiveMQ
3、编写一个Web工程
Eclipse上新建web工程,添加ActiveMQ依赖的jar包,然后开始编写两个Servlet,一个用于生产消息,另一个用于消费消息,如下代码:
消息生产者Servlet:
import java.io.IOException;
import java.io.PrintWriter;
import javax.jms.DeliveryMode;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;/**
* Servlet implementation class JMSTest*/@WebServlet("/Send")public classSend extends HttpServlet {private static final long serialVersionUID = 1L;/**
* @see HttpServlet#HttpServlet()*/
publicSend() {
super();//TODO Auto-generated constructor stub
}/**
* @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse
* response)*/
protected voiddoGet(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
PrintWriterout =response.getWriter();try{//get the initial context
InitialContext context = newInitialContext();//lookup the queue object
Queue queue = (Queue) context.lookup("java:comp/env/queue/queue0");//lookup the queue connection factory
QueueConnectionFactory conFactory =(QueueConnectionFactory) context
.lookup("java:comp/env/queue/connectionFactory");//create a queue connection
QueueConnection queConn =conFactory.createQueueConnection();//create a queue session
QueueSession queSession = queConn.createQueueSession(false,
Session.DUPS_OK_ACKNOWLEDGE);//create a queue sender
QueueSender queSender =queSession.createSender(queue);
queSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//create a simple message to say "Hello World"
TextMessage message = queSession.createTextMessage("Hello World");//send the message
queSender.send(message);//print what we did
out.write("Message Sent:" +message.getText());//close the queue connection
queConn.close();
}catch(Exception e) {//TODO Auto-generated catch block
e.printStackTrace();
}
}/**
* @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse
* response)*/
protected voiddoPost(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {//TODO Auto-generated method stub
}
}
View Code
消息消费者Servlet:
import java.io.IOException;
import java.io.PrintWriter;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;/**
* Servlet implementation class Receive*/@WebServlet("/Receive")public classReceive extends HttpServlet {private static final long serialVersionUID = 1L;/**
* @see HttpServlet#HttpServlet()*/
publicReceive() {
super();//TODO Auto-generated constructor stub
}/**
* @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse
* response)*/
protected voiddoGet(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
PrintWriterout =response.getWriter();try{//get the initial context
InitialContext context = newInitialContext();//lookup the queue object
Queue queue = (Queue) context.lookup("java:comp/env/queue/queue0");//lookup the queue connection factory
QueueConnectionFactory conFactory =(QueueConnectionFactory) context
.lookup("java:comp/env/queue/connectionFactory");//create a queue connection
QueueConnection queConn =conFactory.createQueueConnection();//create a queue session
QueueSession queSession = queConn.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);//create a queue receiver
QueueReceiver queReceiver =queSession.createReceiver(queue);//start the connection
queConn.start();//receive a message
TextMessage message =(TextMessage) queReceiver.receive();//print the message
out.write("Message Received:" +message.getText());//close the queue connection
queConn.close();
}catch(Exception e) {//TODO Auto-generated catch block
e.printStackTrace();
}
}/**
* @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse
* response)*/
protected voiddoPost(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {//TODO Auto-generated method stub
}
}
View Code
4、验证结果
在Tomcat里运行该Web工程,执行消息生产者Servlet,返回消息发送成功标志,同时我们可以在http://localhost:8161/admin/queues.jsp查看到该消息,如下图所示
继续执行消息消费者Servlet,返回消息接收成功标志,同时我们可以打开http://localhost:8161/admin/queues.jsp页面,发现刚才的消息已经不见了,如下图所示
Pub/Sub消息模式(JMS发布/订阅消息传送)
1、在Tomcat中配置JNDI
配置连接工厂和话题:
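(原文此处的配置代码在页面折叠后丢失;以下是与后文代码中 JNDI 名称相对应的参考配置,brokerURL 与 physicalName 请按实际环境调整。)
<Resource name="topic/connectionFactory"
          auth="Container"
          type="org.apache.activemq.ActiveMQConnectionFactory"
          description="JMS Connection Factory"
          factory="org.apache.activemq.jndi.JNDIReferenceFactory"
          brokerURL="tcp://localhost:61616"
          brokerName="LocalActiveMQBroker" />
<Resource name="topic/topic0"
          auth="Container"
          type="org.apache.activemq.command.ActiveMQTopic"
          description="JMS Topic"
          factory="org.apache.activemq.jndi.JNDIReferenceFactory"
          physicalName="topic0" />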
2、启动ActiveMQ
3、在Web工程中编写代码
新建一个发布者Servlet:
package pubSub;
import java.io.IOException;
import java.io.PrintWriter;
import javax.naming.InitialContext;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.jms.Topic;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicPublisher;
import javax.jms.DeliveryMode;
import javax.jms.TopicSession;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;/**
* Servlet implementation class JMSTest*/@WebServlet("/Publish")public classPublisher extends HttpServlet {private static final long serialVersionUID = 1L;/**
* @see HttpServlet#HttpServlet()*/
publicPublisher() {
super();//TODO Auto-generated constructor stub
}/**
* @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse
* response)*/
protected voiddoGet(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
PrintWriterout =response.getWriter();try{//get the initial context
InitialContext ctx = newInitialContext();//lookup the topic object
Topic topic = (Topic) ctx.lookup("java:comp/env/topic/topic0");//lookup the topic connection factory
TopicConnectionFactory connFactory =(TopicConnectionFactory) ctx
.lookup("java:comp/env/topic/connectionFactory");//create a topic connection
TopicConnection topicConn =connFactory.createTopicConnection();//create a topic session
TopicSession topicSession = topicConn.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);//create a topic publisher
TopicPublisher topicPublisher =topicSession.createPublisher(topic);
topicPublisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//create the "Hello World" message
TextMessage message =topicSession.createTextMessage();
message.setText("Hello World");//publish the messages
topicPublisher.publish(message);//print what we did
out.write("Message published:" +message.getText());//close the topic connection
topicConn.close();
}catch(Exception e) {//TODO Auto-generated catch block
e.printStackTrace();
}
}/**
* @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse
* response)*/
protected voiddoPost(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {//TODO Auto-generated method stub
}
}
View Code
新建一个订阅者Servlet:
package pubSub;
import java.io.IOException;
import java.io.PrintWriter;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.InitialContext;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;/**
* Servlet implementation class Receive*/@WebServlet("/Subscribe")public classSubscriber extends HttpServlet {private static final long serialVersionUID = 1L;/**
* @see HttpServlet#HttpServlet()*/
publicSubscriber() {
super();//TODO Auto-generated constructor stub
}/**
* @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse
* response)*/
protected voiddoGet(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
PrintWriterout =response.getWriter();try{//get the initial context
InitialContext ctx = newInitialContext();//lookup the topic object
Topic topic = (Topic) ctx.lookup("java:comp/env/topic/topic0");//lookup the topic connection factory
TopicConnectionFactory connFactory =(TopicConnectionFactory) ctx
.lookup("java:comp/env/topic/connectionFactory");//create a topic connection
TopicConnection topicConn =connFactory.createTopicConnection();//create a topic session
TopicSession topicSession = topicConn.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);//create a topic subscriber
TopicSubscriber topicSubscriber =topicSession
.createSubscriber(topic);//start the connection
topicConn.start();//receive the message
TextMessage message =(TextMessage) topicSubscriber.receive();//print the message
out.write("Message received:" +message.getText());//close the topic connection
topicConn.close();
}catch(Exception e) {//TODO Auto-generated catch block
e.printStackTrace();
}
}/**
* @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse
* response)*/
protected voiddoPost(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {//TODO Auto-generated method stub
}
}
View Code
4、验证结果
运行Web工程,分别打开多个标签访问订阅servlet,然后访问发布servlet,结果如下:
在订阅者订阅消息的时候,一开始没接收到消息,一旦发布者发布消息后,订阅者马上收到消息。