接上一篇:(企业内部需求实战_进阶_01)SSM集成RabbitMQ 关键代码讲解、开发、测试
https://gblfy.blog.csdn.net/article/details/104197309
文章目录
- 一、RabbitMQ配置文件
- 1. RabbitMQ生产者配置文件
- 2. RabbitMQ消费者配置文件
- 3. 连接配置文件
- 二、生产者Java代码Conding
- 2.1. 生产者代码①
- 2.2. 生产者代码②
- 三、消费者Java代码Conding
- 3.1. 生产者代码①
- 3.2. 生产者代码②
- 四、项目准备
- 4.1. 启动项目
- 4.2. 清空控制台
- 五、管控台 队列绑定交换机
- 5.1. 启动 RabbitMQ
- 5.2. 管控台总览
- 5.3. 队列①绑定
- 5.4. 队列②绑定
- 六、管控台绑定后纵览
- 6.1. 在交换机菜单查看 绑定后的队列
- 6.2. 在队列①菜单中 查看 绑定后的交换机
- 6.3. 在队列②菜单中 查看 绑定后的交换机
- 七、生产者请求测试
- 7.1. 生产者①请求
- 7.2. 生产者②请求
- 7.3. 消费者①请求
- 7.4. 消费者②请求
一、RabbitMQ配置文件
1. RabbitMQ生产者配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsdhttp://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-3.1.xsd"><!--生产者者配置如下:--><!-- 定义RabbitMQ的连接工厂 --><rabbit:connection-factory id="connectionFactory"host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}"password="${rabbitmq.password}" virtual-host="${rabbitmq.vhost}"/><!-- 管理消息队列 --><rabbit:admin connection-factory="connectionFactory"/><!-- 定义交换机 自动声明--><rabbit:direct-exchange name="ORDER-TRACE-EXCHANGE"auto-declare="true" durable="true"/><!-- 定义MQ消息模板 --><rabbit:template id="rabbitTemplate"connection-factory="connectionFactory" exchange="ORDER-TRACE-EXCHANGE"/>
</beans>
2. RabbitMQ消费者配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsdhttp://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans-3.1.xsd"><!--消费者配置如下:--><!-- 定义RabbitMQ的连接工厂 --><rabbit:connection-factory id="connectionFactory"host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}"password="${rabbitmq.password}" virtual-host="${rabbitmq.vhost}"/><!-- 管理消息队列 --><rabbit:admin connection-factory="connectionFactory"/><!-- 定义一个队列或者多个队列 --><rabbit:queue name="FIS-TRACE-QUEUE" auto-declare="true" durable="true"/><rabbit:queue name="FIS-TRACE-MONITOR-QUEUE" auto-declare="true" durable="true"/><rabbit:queue name="ORDER-MENU-CATEGORY-QUEUE" auto-declare="true" durable="true"/><!-- 声明多个消费者对象 --><bean id="fisMQMsgHandler" class="com.gblfy.order.mqhandler.FisMQMsgHandler"/><bean id="fisMQMonitorMsgHandler" class="com.gblfy.order.mqhandler.FisMQMonitorMsgHandler"/><bean id="mQSimpleMsgHandler" class="com.gblfy.order.mqhandler.MQSimpleMsgHandler"/><!-- 监听队列 --><rabbit:listener-container connection-factory="connectionFactory"><rabbit:listener ref="fisMQMsgHandler" method="execute" queue-names="FIS-TRACE-QUEUE"/><rabbit:listener ref="fisMQMonitorMsgHandler" method="onMessage" queue-names="FIS-TRACE-MONITOR-QUEUE"/><rabbit:listener ref="mQSimpleMsgHandler" method="execute" queue-names="ORDER-MENU-CATEGORY-QUEUE"/></rabbit:listener-container>
</beans>
3. 连接配置文件
rabbitmq.host=127.0.0.1
rabbitmq.port=5672
rabbitmq.username=admin
rabbitmq.password=admin
rabbitmq.vhost=/admin
二、生产者Java代码Conding
此次案例:主要演示对MQ消息的两种不同方式。
有2个生产者和2个消费者,2个队列分别对应一个交换机,路由key和队列名称不一样,消费者处理MQ的消息的2种不同步处理方法而已!
2.1. 生产者代码①
package com.gblfy.order.controller;import com.gblfy.order.pojo.FisCallingTrace;
import com.gblfy.order.utils.MQSendMsgUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;@Controller
@Slf4j
public class FisSendMQControllor {public static final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");// 日期格式public static final DateFormat timeFormat = new SimpleDateFormat("HH:mm:ss");// 日期格式@Autowiredprivate MQSendMsgUtils mqSendMsgUtils;/*** 发送轨迹数据 MQ异步存储轨迹** @return*/@RequestMapping(value = "/sendMQObjMsg", method = RequestMethod.GET)@ResponseBodypublic String sendObj() throws Exception {Date tStartDate = new Date();// 记录转发结束时间Date tEndDate = new Date();// 记录转发结束时间//模拟请求和响应报文String reqXml = "my name is reqXml";String resXml = "my name is resXml";String uuid = UUID.randomUUID().toString();//模拟 轨迹储存数据FisCallingTrace mFisCallingTrace = new FisCallingTrace().builder().servicename("myServiceNme is A").servicetype("2").interfacetype("2").resstatus("1").resremark("1").reqdate(dateFormat.parse(dateFormat.format(tStartDate))).reqtime(timeFormat.format(tStartDate)).resdate(dateFormat.parse(dateFormat.format(tEndDate))).restime(timeFormat.format(tEndDate)).reqxml("").resxml("").build();//定义路由routingKeyString routingKey = "trace";//调用MQ松松消息公共方法mqSendMsgUtils.sendMsg(mFisCallingTrace, routingKey, reqXml, resXml, uuid);return "send sendMQObjMsg success !!!";}
}
2.2. 生产者代码②
package com.gblfy.order.controller;import com.gblfy.order.pojo.FisCallingTrace;
import com.gblfy.order.utils.MQSendMsgUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;@Controller
@Slf4j
public class FisSendMQMsgControllor {public static final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");// 日期格式public static final DateFormat timeFormat = new SimpleDateFormat("HH:mm:ss");// 日期格式@Autowiredprivate MQSendMsgUtils mqSendMsgUtils;/*** 发送轨迹数据 MQ异步存储轨迹** @return*/@RequestMapping(value = "/sendMQObjMsg2", method = RequestMethod.GET)@ResponseBodypublic String sendObj() throws Exception {Date tStartDate = new Date();// 记录转发结束时间Date tEndDate = new Date();// 记录转发结束时间//模拟请求和响应报文String reqXml = "my name is reqXml";String resXml = "my name is resXml";String uuid = UUID.randomUUID().toString();//模拟 轨迹储存数据FisCallingTrace mFisCallingTrace = new FisCallingTrace().builder().servicename("myServiceNme is A").servicetype("2").interfacetype("2").resstatus("1").resremark("1").reqdate(dateFormat.parse(dateFormat.format(tStartDate))).reqtime(timeFormat.format(tStartDate)).resdate(dateFormat.parse(dateFormat.format(tEndDate))).restime(timeFormat.format(tEndDate)).reqxml("").resxml("").build();//定义路由routingKeyString routingKey = "trace2";//调用MQ松松消息公共方法mqSendMsgUtils.sendMsg(mFisCallingTrace, routingKey, reqXml, resXml, uuid);return "send sendMQObjMsg success !!!";}
}
三、消费者Java代码Conding
3.1. 生产者代码①
package com.gblfy.order.mqhandler;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.gblfy.order.pojo.FisCallingTrace;
import com.gblfy.order.pojo.RequestInfo;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class FisMQMsgHandler {/*** 接收MQ消息,保存轨迹** @param msg*/public void execute(String msg) {try {//通过 判断路由routingKey是否等于trace相同即可//fastjson解析MQ接收的json字符串 转换成RequestInfo对象JSONObject jsonObject = JSON.parseObject(msg);RequestInfo requestInfo = JSON.toJavaObject(jsonObject, RequestInfo.class);log.info("请求报文 mReqXml:" + requestInfo.getMReqXml());log.info("响应报文 mResXml:" + requestInfo.getMResXml());log.info("接口名称 serviceName:" + requestInfo.getServiceName());log.info("路由routingKey:" + requestInfo.getType());log.info("生成的 mUUID:" + requestInfo.getMUUID());/*** 1.从requestInfo对象中,获取fisCallingTrace轨迹对象* 2.请求报文和响应报文需要添加进去 fisCallingTrace对象中的请求报文和响应报文默认是空字符串* 3.将fisCallingTrace 轨技数据保存数据库*/FisCallingTrace fisCallingTrace = requestInfo.getFisCallingTrace();fisCallingTrace.setTraceId(requestInfo.getMUUID());fisCallingTrace.setReqxml(requestInfo.getMReqXml());fisCallingTrace.setResxml(requestInfo.getMResXml());log.info("从MQ接收消息并封装完成!!!");log.info("开始进行插入数据库操作!!!");//把MQ接收消息的数据进行 保存轨迹数据库操作 todo//注入mqpper 插入数据库 todo} catch (Exception e) {log.info("如果对象中没有,指定的元素,一般会导致空指针异常!!!");e.printStackTrace();}}
}
3.2. 生产者代码②
package com.gblfy.order.mqhandler;import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class MQSimpleMsgHandler {private static final ObjectMapper MAPPER = new ObjectMapper();/*** 接收MQ消息** @param msg*/public void execute(String msg) {try {JsonNode jsonNode = MAPPER.readTree(msg);String serviceName = jsonNode.get("serviceName").asText();String routingKey = jsonNode.get("routingKey").asText();String currentDate = jsonNode.get("currentDate").asText();log.info("接口名称:" + serviceName);log.info("路由routingKey:" + routingKey);log.info("当前时间:" + currentDate);} catch (Exception e) {e.printStackTrace();}}
}
四、项目准备
4.1. 启动项目
4.2. 清空控制台
五、管控台 队列绑定交换机
5.1. 启动 RabbitMQ
双击运行
5.2. 管控台总览
5.3. 队列①绑定
5.4. 队列②绑定
六、管控台绑定后纵览
6.1. 在交换机菜单查看 绑定后的队列
6.2. 在队列①菜单中 查看 绑定后的交换机
6.3. 在队列②菜单中 查看 绑定后的交换机
七、生产者请求测试
7.1. 生产者①请求
7.2. 生产者②请求
其他和生产①一样的,知识路由key值不一样
7.3. 消费者①请求
7.4. 消费者②请求