生产者:
package com.111.activemq;import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory;public class JMSProducer {//默认连接用户名private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接密码private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接地址private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//发送的消息数量private static final int SENDNUM = 10;public static void main(String[] args) {//连接工厂 ConnectionFactory connectionFactory;//连接Connection connection = null;//会话 接受或者发送消息的线程Session session = null;//消息的目的地 Destination destination;//消息生产者 MessageProducer messageProducer;//消息队列名称String queueName = "helloWord"; //实例化连接工厂connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);try {//通过连接工厂获取连接connection = connectionFactory.createConnection();//启动连接 connection.start();//创建sessionsession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//创建一个连接自定义队列名称的消息队列destination = session.createQueue(queueName);//创建消息生产者messageProducer = session.createProducer(destination);//发送消息 sendMessage(session, messageProducer);session.commit();} catch (Exception e) {e.printStackTrace();}finally{if(connection != null){try {session.close();connection.close();} catch (JMSException e) {e.printStackTrace();}}}}/*** 发送消息* @param session* @param messageProducer 消息生产者* @throws Exception*/public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{for (int i = 0; i < SENDNUM; i++) {//创建一条文本消息 TextMessage message = session.createTextMessage("ActiveMQ 发送消息" +i);System.out.println("发送消息:Activemq 发送消息" + i);//通过消息生产者发出消息 messageProducer.send(message);}} }
消费者:
package com.111.activemq;import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class JMSConsumer {//默认连接用户名private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接密码private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接地址private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;public static void main(String[] args) {//连接工厂 ConnectionFactory connectionFactory;//连接Connection connection = null;//会话 接受或者发送消息的线程 Session session;//消息的目的地 Destination destination;//消息的消费者 MessageConsumer messageConsumer;//消息队列名称String queueName = "helloWord";//实例化连接工厂connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);try {//通过连接工厂获取连接connection = connectionFactory.createConnection();//启动连接 connection.start();//创建sessionsession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建一个连接自定义队列名称的消息队列destination = session.createQueue(queueName);//创建消息消费者messageConsumer = session.createConsumer(destination);while (true) {TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);if(textMessage != null){System.out.println("收到的消息:" + textMessage.getText());}else {break;}}} catch (JMSException e) {e.printStackTrace();}} }
多线程生产者:
package com.111.activemq;import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory;public class JMSProducerMultithreading implements Runnable{//默认连接用户名private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接密码private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接地址private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//发送的消息数量private static final int SENDNUM = 3;/*** 发送消息* @param session* @param messageProducer 消息生产者* @throws Exception*/public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{for (int i = 0; i < SENDNUM; i++) {//获取当前线程idString threadId = Thread.currentThread().getId()+"";//创建一条文本消息 TextMessage message = session.createTextMessage("ActiveMQ 发送消息" +i+"生产者线程编号="+threadId);//控制台打印System.out.println("ActiveMQ 发送消息" +i+"生产者线程编号="+threadId);//通过消息生产者发出消息 messageProducer.send(message);}}@Overridepublic void run() {//连接工厂 ConnectionFactory connectionFactory;//连接Connection connection = null;//会话 接受或者发送消息的线程Session session = null;//消息的目的地 Destination destination;//消息生产者 MessageProducer messageProducer;//消息队列名称String queueName = "Multithreading";//实例化连接工厂connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);try {//通过连接工厂获取连接connection = connectionFactory.createConnection();//启动连接 connection.start();//创建sessionsession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//创建一个名称为HelloWorld的消息队列destination = session.createQueue(queueName);//创建消息生产者messageProducer = session.createProducer(destination);//发送消息 sendMessage(session, messageProducer);session.commit();} catch (Exception e) {e.printStackTrace();}finally{if(connection != null){try {session.close();connection.close();} catch (JMSException e) {e.printStackTrace();}}}} }
多线程消费者:
package com.111.activemq;import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory;public class JMSConsumerMultithreading implements Runnable{//默认连接用户名private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接密码private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接地址private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;@Overridepublic void run() {ConnectionFactory connectionFactory;//连接工厂Connection connection = null;//连接 Session session;//会话 接受或者发送消息的线程Destination destination;//消息的目的地 MessageConsumer messageConsumer;//消息的消费者//消息队列名称String queueName = "Multithreading";//实例化连接工厂connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);try {//通过连接工厂获取连接connection = connectionFactory.createConnection();//启动连接 connection.start();//创建sessionsession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建一个连接HelloWorld的消息队列destination = session.createQueue(queueName);//创建消息消费者messageConsumer = session.createConsumer(destination);String threadId = Thread.currentThread().getId()+""; while (true) {TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);if(textMessage != null){System.out.println("收到的消息:" + textMessage.getText()+" 消费者线程编号="+threadId);}else {break;}}} catch (JMSException e) {e.printStackTrace();}} }
多线程生产者测试类:
package com.111.activemq;public class JMSProducerMultithreadingTest {public static void main(String[] args) {JMSProducerMultithreading jpm = new JMSProducerMultithreading();//启动10个生产者线程for(int i = 0 ; i < 10 ; i++){Thread t = new Thread(jpm);t.start();}} }
多线程消费者测试类:
package com.111.activemq;public class JMSConsumerMultithreadingTest {public static void main(String[] args) {JMSConsumerMultithreading jcm = new JMSConsumerMultithreading();//启动3个消费者者线程for(int i = 0 ; i < 3 ; i++){Thread t = new Thread(jcm);t.start();}} }