文章目录
- 一、简单对象
- 1. 依赖
- 2. 生产者
- 3. 消费者
- 4. 配置文件
- 5. spring版本
- 二、复杂对象
- 2.1. 生产者
- 2.2. 消费者
一、简单对象
1. 依赖
<!--spring整合rabbitmq--><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>2.0.1.RELEASE</version></dependency>
注:maven方式,这一个依赖即可,如果是非maven项目,需要引入5个jar如下:
推荐使用mavne方式,简单,非Maven项目,先用maven把以来下载本地仓库,复制到非maven的项目中即可。
2. 生产者
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit-2.0.xsdhttp://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-3.1.xsd"><!--生产者者配置如下:--><!-- 定义RabbitMQ的连接工厂 --><rabbit:connection-factory id="connectionFactory"host="${rabbitmq.host}"port="${rabbitmq.port}"username="${rabbitmq.username}"password="${rabbitmq.password}"virtual-host="${rabbitmq.vhost}"connection-timeout="${rabbitmq.conTimeout}"publisher-confirms="${rabbitmq.publisher-confirms}"publisher-returns="${rabbitmq.publisher-returns}"/><!-- 管理消息队列 --><rabbit:admin connection-factory="connectionFactory"/><!--此处为配置文件方式 管控台配置模式需要注释 默认模式管控台 Start--><!-- 定义一个队列或者多个队列 自动声明--><rabbit:queue name="Queue-1" auto-declare="true" durable="true"/><rabbit:topic-exchange name="exchange-1"><rabbit:bindings><!-- 可绑定多个队列,发送的时候指定key进行发送 --><rabbit:binding queue="Queue-1" pattern="ws.tjqb"/></rabbit:bindings></rabbit:topic-exchange><!--此处为配置文件方式 管控台配置模式需要注释 默认模式管控台 End--><!-- 定义交换机 自动声明--><rabbit:topic-exchange name="exchange-1"auto-declare="true" durable="true"/><!-- 5. 配置消息对象json转换类 --><bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /><!-- 定义MQ消息模板1. id : 定义消息模板ID2.connection-factory : 把定义的连接工厂放到消息模板中3.confirm-callback : confirm确认机制4.return-callback : return确认机制5.mandatory :#有2种状态设置为 true 后 消费者在消息没有被路由到合适队列情况下会被return监听,而不会自动删除;设置为 false 后 消费者在消息没有被路由到合适队列情况下会自动删除--><rabbit:template id="rabbitTemplate"connection-factory="connectionFactory"exchange="exchange-1"confirm-callback="confirmCallBackListener"return-callback="returnCallBackListener"mandatory="true"message-converter="jsonMessageConverter"/>
</beans>
package com.gblfy.order.controller;import com.gblfy.order.pojo.FisCallingTrace;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;@RestController
public class Send {public static final String EXCHANGE = "exchange-1";@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/test")public String test() {String uuidStr = UUID.randomUUID().toString();CorrelationData correlationId = new CorrelationData(uuidStr);// 发送消息Map<String, String> map = new HashMap<>();map.put("email", "550731230@qq.com");rabbitTemplate.convertAndSend(EXCHANGE, "ws.tjqb", map, correlationId);return "success";}
3. 消费者
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit-2.0.xsdhttp://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-3.1.xsd"><!--消费者配置如下:--><!-- 定义RabbitMQ的连接工厂 --><rabbit:connection-factory id="connectionFactory"host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}"password="${rabbitmq.password}" virtual-host="${rabbitmq.vhost}"connection-timeout="${rabbitmq.conTimeout}"publisher-confirms="${rabbitmq.publisher-confirms}"publisher-returns="${rabbitmq.publisher-returns}"/><!-- 管理消息队列 --><rabbit:admin connection-factory="connectionFactory"/><!-- 声明多个消费者对象 --><bean id="emailMessageListener" class="com.gblfy.order.mqhandler.EmailMessageListener"/><!-- 监听队列1. connectionFactory 连接工厂2. manual 手动签收3. ref="" 消费者监听--><rabbit:listener-container connection-factory="connectionFactory"acknowledge="manual"concurrency="${rabbitmq.concurrency}"max-concurrency="${rabbitmq.max-concurrency}"><rabbit:listener ref="emailMessageListener" method="onMessage" queue-names="Queue-1"/></rabbit:listener-container>
</beans>
package com.gblfy.order.mqhandler;import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;import java.io.IOException;
@Component
public class EmailMessageListener implements MessageListener {private static final ObjectMapper MAPPER = new ObjectMapper();@Overridepublic void onMessage(Message message) {try {JsonNode jsonNode = MAPPER.readTree(message.getBody());String email = jsonNode.get("email").asText();System.out.println("获取队列中消息:" + email);} catch (IOException e) {e.printStackTrace();}}
}
4. 配置文件
#RabbitMQ 连接信息
#IP地址
rabbitmq.host=192.168.0.114
#端口
rabbitmq.port=5672
#用户名
rabbitmq.username=fis
#密码
rabbitmq.password=ncl@1234
#虚拟主机
rabbitmq.vhost=/app/fisMQ
#连接超时时间
rabbitmq.conTimeout=15000
#发送确认 对应RabbitTemplate.ConfirmCallback接口
#消息发送成功 有2个重要参数
# ack 状态为true correlationId 全局唯一ID用于标识每一支队列
rabbitmq.publisher-confirms=true
#发送失败回退,对应RabbitTemplate.ReturnCallback接口
rabbitmq.publisher-returns=true
#默认消费者数量
rabbitmq.concurrency=10
#最大消费者数量
rabbitmq.max-concurrency=20
5. spring版本
目前适配的spring版本4.2.3.RELEASE
二、复杂对象
声明:配置文件不变
2.1. 生产者
package com.gblfy.order.controller;import com.gblfy.order.pojo.FisCallingTrace;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;@RestController
public class Send {public static final String EXCHANGE = "exchange-1";@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/test")public String test() {FisCallingTrace f = getFisCallingTrace();String uuidStr = UUID.randomUUID().toString();CorrelationData correlationId = new CorrelationData(uuidStr);// 发送消息Map<String, Object> map = new HashMap<>();map.put("mReqXml", "请求报文");map.put("mResXml", "响应报文");map.put("mUUID", uuidStr);map.put("serviceName", "NYHC");map.put("fisCallingTrace", f);rabbitTemplate.convertAndSend(EXCHANGE, "ws.tjqb", map, correlationId);return "success";}// @RequestMapping("/test")// public String test() {// String uuidStr = UUID.randomUUID().toString();// CorrelationData correlationId = new CorrelationData(uuidStr);// // 发送消息// Map<String, String> map = new HashMap<>();// map.put("email", "550731230@qq.com");// rabbitTemplate.convertAndSend(EXCHANGE, "ws.tjqb", map, correlationId);// return "success";// }private FisCallingTrace getFisCallingTrace() {FisCallingTrace f = new FisCallingTrace();f.setServicename("tjqb");f.setServicetype("1");f.setInterfacetype("2");f.setResstatus("1");f.setResremark("纽约数据回传接口");f.setReqdate(new Date());f.setReqtime("10:00:00");f.setResdate(new Date());f.setRestime("10:00:00");f.setReqxml("请求报文");f.setResxml("响应报文");return f;}}
2.2. 消费者
package com.gblfy.order.mqhandler;import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.gblfy.order.pojo.FisCallingTrace;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;import java.io.IOException;@Slf4j
@Component
public class EmailMessageListener implements ChannelAwareMessageListener {private static final ObjectMapper MAPPER = new ObjectMapper();@Overridepublic void onMessage(Message message, Channel channel) throws Exception {try {JsonNode jsonNode = MAPPER.readTree(message.getBody());String mReqXml = jsonNode.get("mReqXml").asText();String mResXml = jsonNode.get("mResXml").asText();String mUUID = jsonNode.get("mUUID").asText();String serviceName = jsonNode.get("serviceName").asText();System.out.println("获取队列中消息:" + mReqXml);System.out.println("获取队列中消息:" + mResXml);System.out.println("获取队列中消息:" + mUUID);System.out.println("获取队列中消息:" + serviceName);JsonNode jsonNode1 = jsonNode.get("fisCallingTrace");String jsonStr = MAPPER.writeValueAsString(jsonNode1);FisCallingTrace f= MAPPER.readValue(jsonStr , FisCallingTrace.class);System.out.println("获取队列中消息:" + f.getReqxml());System.out.println("获取队列中消息:" + f.getResxml());// 消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (IOException e) {e.printStackTrace();}log.info("解析操作");log.info("落库操作");}
}