文章目录
- 一、依赖配置
- 1. 引入依赖
- 2. 配置文件
- 3. 主配置
- 二、生产者代码代码Conding
- 2.1. 发送客户端
- 2.2. 确认机制
- 2.3. 消息 return机制
- 2.4. controller
- 2.5. MQ工具类
- 2.6. 常量类
- 三、消费端
- 3.2. 消费者代码
- 3.2. RabbitMQ常用命令
一、依赖配置
1. 引入依赖
<!--服务注册发现--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency><!-- SpringCloud Alibaba Nacos Config --><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId></dependency><!--springboot整合RabbitMQ依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2. 配置文件
项目内部配置bootstrap.yml,
server:port: 8001
spring:application:# 应用名称name: ly-rabbitmqprofiles:# 环境配置active: devcloud:nacos:discovery:# 服务注册地址server-addr: nacos.server.com:8848config:# 配置中心地址server-addr: nacos.server.com:8848# 配置文件格式file-extension: yml# 共享配置shared-configs:- application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
nacos-config服务端配置
在这里插入代码片
3. 主配置
package com.gblfy.lyrabbitmq.config;import com.gblfy.lyrabbitmq.consts.MQPrefixConst;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** RabbitMQ 交换机和队列绑定配置类** @author gblfy* @Date 2021-09-28 9:59*/
@Configuration
public class RabbitTopicConfig {@BeanTopicExchange topicExchange() {return new TopicExchange(MQPrefixConst.WS_EXCEHANGE, true, false);}@BeanQueue hisUQ() {return new Queue("ly_mq_fai_q");}@BeanBinding hisUQBinding() {//ly-his.# 代表路由规则 表示如果路由的 routingKey 是以ly-his 开头就会发送到 ly_mq_his_u_q 这个队列上return BindingBuilder.bind(hisUQ()).to(topicExchange()).with("ly-fai.#");}
}
二、生产者代码代码Conding
2.1. 发送客户端
package com.gblfy.lyrabbitmq.provider;import com.alibaba.fastjson.JSON;
import com.gblfy.common.entity.Order;
import com.gblfy.lyrabbitmq.consts.MQPrefixConst;
import com.gblfy.lyrabbitmq.utils.MQSendMsgUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.time.LocalDateTime;/*** RabbitMQ发送消息** @author gblfy* @Date 2021-09-28 9:59*/
@Service
public class RabbitMQProvider {private final static Logger log = LoggerFactory.getLogger(RabbitMQProvider.class);@Autowiredprivate MQSendMsgUtils mqSendMsgUtils;/*** 发送补偿MQ消息** @param orderId 订单编码* @param orderNum 订单数量* @param createTime 订单时间* @return 发送标识*/public void sendMQContent(long orderId, String orderNum, LocalDateTime createTime) {Order order = Order.builder().orderId(orderId).orderNum(orderNum).createTime(createTime).build();// 发送MQ消息到交换机通过指定消息路由key路由到指定队列中mqSendMsgUtils.snedStrMQMsg(MQPrefixConst.LY_MQ_FAI_QUERY, JSON.toJSONString(order));log.info("MQ消息发送成功");}
}
2.2. 确认机制
package com.gblfy.lyrabbitmq.confirms;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;/*** Confirm 消息确认机制* 消息发送成功和失败都会记录** @author gblfy* @Date 2021-09-28 9:59*/
@Component("confirmCallback")
public class ConfirmCallBackListener implements RabbitTemplate.ConfirmCallback {//日志输出private final static Logger log = LoggerFactory.getLogger(ConfirmCallBackListener.class);/*** 生产者消息发送成功与失败确认机制* <p>* 1. ack* true : 标志生产者将消息发出成功* false: 标志生产者将消息发出失败* 2. ack :true 意味着消息发送成功 有2种场景* 第一种:生产者将消息成功发送到指定队列中,等待消费者消费消息* 第两种:生产者将消息发送成功,但是,由于无法路由到指定的消息* 队列,这种场景的消息,会被return机制监听到,后续进行补偿机制,做消息补发处理* </p>** @param correlationData 队列消息的唯一标识ID,消息做补偿机制会用到* @param ack ack 消息是否发送成功的标识* @param cause 消息发送失败的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info("消息队列标识ID: {}", correlationData.getId());/***对消息发送成功/失败状态做不同的处理* 1. 第一种场景 发送消息成功* 1>发送消息成功,交换机路由队列成功* 2>发送消息成功,交换机路由队列不成功* 2. 发送消息失败*/if (ack) {log.info("发送消息成功: {}", ack);} else {log.info("发送消息失败: {}", ack);}}
}
/*** !ack 场景结果示例:* <p>* correlationData: CorrelationData [id=a37285dc-5dd6-4e22-8cc4-5c0fbf67b568]* ack: false* 异常处理....* 消息: CorrelationData [id=a37285dc-5dd6-4e22-8cc4-5c0fbf67b568],* nack,失败原因是:channel error;* protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'FIS-TRACE-COMMON-EXCHANGE' in vhost '/admin',* class-id=60, method-id=40)*/
2.3. 消息 return机制
package com.gblfy.lyrabbitmq.returns;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;/*** 消息 return机制* 路由失败的消息会先走这,然后到ConfirmCallBackListener记录异常错误信息** @author gblfy* @Date 2021-09-28 9:59*/
@Component("returnCallBackListener")
public class ReturnCallBackListener implements RabbitTemplate.ReturnCallback {//打印日志 实时定位private final static Logger log = LoggerFactory.getLogger(ReturnCallBackListener.class);/*** 消息无法路由 触发消息 return机制* <p>* 1. 消费者在消息没有被路由到合适队列情况下会被return监听,而不会自动删除* 2. 会监听到生产者发送消息的关键信息* 3. 根据关键信息,后续进行补偿机制,做消息补发处理* </p>** @param message 消息实体* @param replyCode 应答码312* @param replyText NO_ROUTE* @param exchange 交换机* @param routingKey 路由routingKey*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("DeliveryTag: {}", message.getMessageProperties().getDeliveryTag());log.info("ContentType: {}", message.getMessageProperties().getContentType());log.info("ContentEncoding: {}", message.getMessageProperties().getContentEncoding());log.info("消息发送的指定交换机: {}", exchange);log.info("队列路由的routingKey: {}", routingKey);log.info("队列的响应码replyCode: {}", replyCode);log.info("队列的响应信息: {}", replyText);//TODO 消息发送交换机成功 路由失败 保存轨迹记录}
}
/*** 场景结果示例:* return exchange: FIS-TRACE-COMMON-EXCHANGE, routingKey: fis-str.user, replyCode: 312, replyText: NO_ROUTE* correlationData: CorrelationData [id=30d924db-77b4-41df-bbe6-9a8f0eb3fe7a]* ack: true* 消息: CorrelationData [id=30d924db-77b4-41df-bbe6-9a8f0eb3fe7a],已经被ack成功*/
2.4. controller
package com.gblfy.lyrabbitmq.controller;import com.gblfy.lyrabbitmq.provider.RabbitMQProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.time.LocalDateTime;@RestController
@RequestMapping("/mq")
public class MQProviderController {@Autowiredprivate RabbitMQProvider mqProvider;@GetMapping("/sendMQ")public String sendMQContent() {mqProvider.sendMQContent(0001, "10", LocalDateTime.now());return "OK";}
}
2.5. MQ工具类
package com.gblfy.lyrabbitmq.utils;import com.gblfy.lyrabbitmq.confirms.ConfirmCallBackListener;
import com.gblfy.lyrabbitmq.consts.MQPrefixConst;
import com.gblfy.lyrabbitmq.returns.ReturnCallBackListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.UUID;/*** MQ发送消息(公用工具类)** @author gblfy* @Date 2021-09-28 9:59*/
@Component
public class MQSendMsgUtils {private final static Logger log = LoggerFactory.getLogger(MQSendMsgUtils.class);@Autowired//注入发送消息模板private RabbitTemplate rabbitTemplate;@Autowiredprivate ConfirmCallBackListener confirmCallback;@Autowiredprivate ReturnCallBackListener returnCallback;/*** 发送MQ STRING类型消息 第1种** @param queueRouteKey 路由routingKey* @param msg MQ STRING类型消息*/public void snedStrMQMsg(String queueRouteKey, String msg) {try {log.info("交换机名称: {}, 路由routingKey: {}, 发送的消息: {} ", "EXCHANGE-CMIIP", queueRouteKey, msg);String mID = UUID.randomUUID().toString();CorrelationData correlationId = new CorrelationData(mID);// Confirm 消息确认策略rabbitTemplate.setConfirmCallback(confirmCallback);// Return 消息确认策略rabbitTemplate.setReturnCallback(returnCallback);log.info("发送消息的路由key: {}", queueRouteKey);log.info("发送消息的标识ID: {}", mID);//发送消息到MQ的交换机,通知其他系统rabbitTemplate.convertAndSend(MQPrefixConst.WS_EXCEHANGE, queueRouteKey, msg, correlationId);} catch (Exception e) {e.printStackTrace();}}
}
2.6. 常量类
package com.gblfy.lyrabbitmq.consts;/*** 消息路由规则前缀(常量类)** @author gblfy* @Date 2021-09-28 9:59*/
public class MQPrefixConst {//交换机名称//回归环境public static final String WS_EXCEHANGE = "LY-REPORT-EXCHANGE";// 路由keypublic static final String LY_MQ_FAI_QUERY = "ly-fai.query";
}
三、消费端
3.2. 消费者代码
package com.gblfy.lyrabbitmq.consumer;import com.gblfy.lyrabbitmq.consts.MQPrefixConst;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;/*** RabbitMQ消费端处理** @author gblfy* @Date 2021-09-28 9:59*/
@Component
public class RabbitMQHandler implements ChannelAwareMessageListener {//打印日志 实时定位private final static Logger log = LoggerFactory.getLogger(RabbitMQHandler.class);/*** 接收字符串类型MQ消息** @param message* @param channel* @throws Exception*/@Override@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "${spring.rabbitmq.listener.str1.queue.name}",durable = "${spring.rabbitmq.listener.str1.queue.durable}"),exchange = @Exchange(value = "${spring.rabbitmq.listener.str1.exchange.name}",durable = "${spring.rabbitmq.listener.str1.exchange.durable}",type = "${spring.rabbitmq.listener.str1.exchange.type}",ignoreDeclarationExceptions = "${spring.rabbitmq.listener.str1.exchange.ignoreDeclarationExceptions}"),key = "${spring.rabbitmq.listener.str1.key}"))@RabbitHandlerpublic void onMessage(Message message, Channel channel) throws Exception {//TODO 接收消息成功 创建一个消费端轨迹表来存储消息的轨迹数据String jsonMsg = new String(message.getBody());log.info("响应报文 mResXml: {}", jsonMsg);// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 反馈消息的消费状态channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);// 反馈消息的消费状态System.err.println("--------------------------------------");//------------------------------根据约定解析指定的标签--------------------------------------------// JSONObject jsonObject = new JSONObject();// jsonObject = JSON.parseObject(jsonMsg);// String msgID = jsonObject.getString("msgID");// log.info("接收的消息ID: {}", msgID);//// String tResXml = jsonObject.getString("tResXml");// log.info("解析后的zip路径: {}", tResXml);String queueRouteKey = message.getMessageProperties().getReceivedRoutingKey();log.info("接收的路由key: {}", queueRouteKey);if (MQPrefixConst.LY_MQ_FAI_QUERY.equals(queueRouteKey)) {//TODO 监听查询接口逻辑//TODO 保存数据到数据库} else {log.error("无此路由key: {}", queueRouteKey);}}
}
3.2. RabbitMQ常用命令
# 启动MQ
rabbitmq-server -detatched