参考:http://t.csdnimg.cn/DxjMm
ActiveMQ的安装
官方的下载地址:http://activemq.apache.org/components/classic/download
(1)运行:解压后,进入bin目录,执行对应版本的 activemq.bat
管理页面:ActiveMQ 管理控制台的默认端口是8161(消息服务的默认端口是61616),通过http://localhost:8161/admin/ 可以进入管理页面
消息生产者程序
依赖
<!-- ActiveMQ client library used by the plain JMS producer/consumer demos -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <version>5.7.0</version>
</dependency>
生产者
package com.example.activiti.mq;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class ProducerDemo {public static void main(String[] args) {//创建mq连接工厂//管理路径8161 生产路径61616ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");Connection conn = null;try {//打开mq连接conn = connectionFactory.createConnection();conn.start();//创建mq会话Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);//创建消息队列Destination destination = session.createQueue("text-queue-1");//基于队列创建消息发布者MessageProducer msgProducer = session.createProducer(destination);msgProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//向消息队列发送信息for (int i = 1; i <= 5; i++) {TextMessage msg = session.createTextMessage();msg.setText("消息,"+ i);msgProducer.send(msg);System.out.println("生产者: "+ msg.getText());}} catch (JMSException e) {throw new RuntimeException(e);}finally {if (conn != null){try {conn.close();}catch (JMSException e){e.printStackTrace();}}}}
}
消费者
package com.example.activiti.mq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * Minimal JMS consumer demo: connects to a local ActiveMQ broker and prints
 * every text message received from the queue {@code text-queue-1}.
 */
public class ConsumerDemo {

    /**
     * Receives messages in a loop until {@code receive()} returns {@code null},
     * then closes the connection.
     *
     * @param args unused
     * @throws RuntimeException wrapping any {@link JMSException} raised while connecting or receiving
     */
    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection conn = null;
        try {
            conn = connectionFactory.createConnection();
            conn.start();

            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("text-queue-1");
            MessageConsumer msgConsumer = session.createConsumer(destination);

            while (true) {
                // receive() without a timeout blocks until a message is available.
                Message received = msgConsumer.receive();
                if (received == null) {
                    break;
                }
                // Guard the cast: a non-text message on the queue must not
                // terminate the consumer with a ClassCastException.
                if (!(received instanceof TextMessage)) {
                    System.out.println("消费者: 跳过非文本消息 " + received.getJMSMessageID());
                    continue;
                }
                TextMessage msg = (TextMessage) received;
                System.out.println("消费者"+msg.getText());
            }
        } catch (JMSException e) {
            throw new RuntimeException(e);
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (JMSException e) {
                    // Best-effort close: report the failure but do not mask an earlier exception.
                    e.printStackTrace();
                }
            }
        }
    }
}
使用 Spring Boot 简化JMS开发:发送字符串消息
依赖
<!--ActiveMQ-->
<!-- No <version> element is given here; it is presumably managed by the Spring Boot parent/BOM. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
yml
spring:
  activemq:
    # Address of the ActiveMQ broker the application connects to.
    broker-url: tcp://127.0.0.1:61616
    # Credentials used when connecting to the broker.
    user: root
    password: 1234
    packages:
      # Packages whose classes are trusted when object messages are deserialized.
      trusted:
        - com.example.activemq.pojo
启动类
import jakarta.jms.Queue;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

/**
 * Spring Boot application class that also declares the JMS destinations
 * used by the demo.
 *
 * NOTE(review): no {@code main} method is visible in this snippet; the entry
 * point is presumably omitted from the tutorial excerpt - confirm.
 */
@SpringBootApplication
public class BootJmsDemoApplication {

    /**
     * Creates the destination (a queue) in the Spring configuration class.
     *
     * @return a queue named {@code boot-queue-msg}, registered as bean {@code msgQueue}
     */
    @Bean(name = "msgQueue")
    public Queue msgQueue() {
        return new ActiveMQQueue("boot-queue-msg");
    }
}
生产者
package com.example.activemq.controller;import jakarta.jms.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST controller that publishes a string message to the {@code msgQueue}
 * destination when {@code GET /send-msg} is called.
 */
@RestController
public class MsgProducerController {

    /** Destination bean registered under the name {@code msgQueue}. */
    @Autowired
    @Qualifier("msgQueue")
    private Queue msgQueue;

    // Spring provides JmsMessagingTemplate to simplify JMS calls; it can send
    // a message directly to a given queue.
    @Autowired
    private JmsMessagingTemplate jmsTemplate;

    /**
     * Sends a fixed test string to {@link #msgQueue}.
     */
    @GetMapping("/send-msg")
    public void sendMsg() {
        jmsTemplate.convertAndSend(msgQueue, "测试发送消息");
    }
}
消费者
package com.example.activemq.controller;import jakarta.jms.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Consumer for string messages arriving on the {@code boot-queue-msg} queue.
 */
@RestController
public class MsgConsumerController {

    /**
     * Prints each received message to standard output.
     *
     * Spring provides the {@code @JmsListener} annotation to mark a method as
     * the consumer of messages from a specific queue.
     *
     * @param msg the message payload as a string
     */
    @JmsListener(destination = "boot-queue-msg")
    public void receive(String msg) {
        System.out.println("消费者,接收到:" + msg);
    }
}
发送对象消息
使用JmsMessagingTemplate还可以从生产者向消费者发送对象,对象实际上会被序列化后存入消息队列中(因此被发送对象的类需要实现 Serializable 接口)。
启动类
/**
 * Declares the queue used for object (User) messages.
 *
 * @return a queue named {@code boot-queue-user}, registered as bean {@code userQueue}
 */
@Bean(name = "userQueue")
public Queue userQueue() {
    Queue queue = new ActiveMQQueue("boot-queue-user");
    return queue;
}
生产者
package com.example.activemq.controller;import com.example.activemq.pojo.User;
import jakarta.jms.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST controller that publishes a {@code User} object to the
 * {@code userQueue} destination when {@code GET /send-user} is called.
 */
@RestController
public class UserProducerController {

    /** Destination bean registered under the name {@code userQueue}. */
    @Autowired
    @Qualifier("userQueue")
    private Queue userQueue;

    /** Template used to convert the payload and send it to the queue. */
    @Autowired
    private JmsMessagingTemplate jmsTemplate;

    /**
     * Builds a sample user and sends it to {@link #userQueue}.
     */
    @GetMapping("/send-user")
    public void sendUser() {
        jmsTemplate.convertAndSend(userQueue, new User(1, "zhangsan", "张三"));
    }
}
消费者
package com.example.activemq.controller;import com.example.activemq.pojo.User;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.web.bind.annotation.RestController;

/**
 * Consumer for {@code User} messages arriving on the {@code boot-queue-user} queue.
 */
@RestController
public class UserConsumerController {

    /**
     * Prints each received user to standard output.
     *
     * @param user the message payload converted to a {@code User}
     */
    @JmsListener(destination = "boot-queue-user")
    public void receive(User user) {
        String line = "消费者,接收到:" + user;
        System.out.println(line);
    }
}