1:自定义mq信息类(我的交换机这些信息都是直接从nacos上取的,怎么从nacos取配置信息请看上篇文章):
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;

/**
 * Serializable value holder describing the RabbitMQ destination of a message:
 * exchange type, exchange name, queue name and routing key.
 *
 * <p>Accessors, constructors and the builder are generated by Lombok.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class MqInfoEntity implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Exchange type code
     * (0: DIRECT exchange; 1: TOPIC exchange; 2: FANOUT exchange; 3: HEADERS exchange).
     */
    private Integer exchangeType;

    /** Exchange name. */
    private String exchangeName;

    /** Queue name. */
    private String queueName;

    /** Binding (routing) key. */
    private String routingKey;
}
2:自定义生产者类:
import com.alibaba.fastjson.JSONArray;
import com.fescotech.ordercommon.model.MqInfoEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;@Component
@Slf4j
public class SendRequestParamsProducer implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{@Autowiredprivate RabbitTemplate rabbitTemplate;//这两个变量是用来将交换机发送消息失败异常的原因在service里抛异常用public boolean ack1 = true;public String reason;@PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);rabbitTemplate.setMandatory(true);}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String deliveryNumber = correlationData.getId();if (ack){ack1=true;//记录发送记录log.info("向交换机发送消息成功,派单编号:{}",deliveryNumber);} else {ack1=false;reason = cause;log.error("向交换机发送消息失败,派单编号:{},原因为:{},", deliveryNumber, cause);}}/*** 推送消息* @author fu* @time 2023/7/18 14:28* @param mqInfoEntity* @param data为要推送的数据* @param correlationDataId调用时赋值一个唯一值就行* @return void*/public void sendRabbitMq(MqInfoEntity mqInfoEntity, JSONArray data ,String correlationDataId) {// rabbitTemplate.convertAndSend(mqInfoEntity.getExchangeName(), mqInfoEntity.getRoutingKey(), data, new CorrelationData(correlationDataId));rabbitTemplate.convertSendAndReceive(mqInfoEntity.getExchangeName(), mqInfoEntity.getRoutingKey(), data, new CorrelationData(correlationDataId));}@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {String json = new String(message.getBody());log.info("消息本身:" + json);log.info("退回的replyCode是:" + replyCode);log.info("退回的replyText是:" + replyText);log.info("退回的exchange是:" + exchange);log.info("退回的routingKey是:" + routingKey);String errorDesc = "未匹配到队列,交换机:"+ exchange +",routingKey:"+ routingKey;}
}
3:业务逻辑中推送数据至mq(前三步是我个人业务需求的处理,你们可以从第四步开始看):
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSONArray;
import com.fescotech.ft.common.model.vo.Result;
import com.fescotech.ft.common.util.FtResultUtil;
import com.fescotech.ordercommon.constants.ExchangeTypeEnum;
import com.fescotech.ordercommon.model.MqInfoEntity;
import com.fescotech.ordercommon.model.param.SendMsgParam;
import com.fescotech.orderservice.config.limit.CommonMqMsgConfig;
import com.fescotech.orderservice.mq.producer.SendRequestParamsProducer;
import com.fescotech.orderservice.service.CommonMqMsgService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.Map;@Service
@Slf4j
public class CommonMqMsgServiceImpl implements CommonMqMsgService {@Autowiredprivate SendRequestParamsProducer sendRequestParamsProducer;@Autowiredprivate CommonMqMsgConfig commonMqMsgConfig;@Overridepublic Result sendMsg(SendMsgParam sendMsgParam) {//1.拼接faceCode_sysChannelString faceCodeSysChannel = sendMsgParam.getFaceCode()+"_"+sendMsgParam.getSysChannel();//2.获取nacos配置文件Map<String,CommonMqMsgConfig.SystemMap> map = commonMqMsgConfig.getSystemMap();//3.校验faceCode_sysChannel在nacos上是否有配置if(!map.keySet().contains(faceCodeSysChannel)){return FtResultUtil.result(Result.FAIL,null,"未找到"+sendMsgParam.getFaceCode()+"_"+sendMsgParam.getSysChannel()+"配置",null);}//4.将要推送的数据转至json串(我要推送json串,你们推啥数据用啥数据就行)JSONArray jsonArray = JSONArray.parseArray(sendMsgParam.getRequestParams()) ;log.info("发送数据:{}",jsonArray);//5.取mq队列数据(我这是从nacos上取交换机名称、队列名称、绑定关系值,你们有现成的直接在第6步赋值即可)CommonMqMsgConfig.SystemMap systemMap = map.get(faceCodeSysChannel);//6.构建发送信息MqInfoEntity mqInfoEntity = MqInfoEntity.builder().exchangeName(systemMap.getExchange_name()).queueName(systemMap.getQueue_name()).routingKey(systemMap.getRouting_key()).exchangeType(ExchangeTypeEnum.TOPIC.getCode()).build();log.info("推送MQ,{},data:{}", JSONUtil.toJsonStr(mqInfoEntity),jsonArray);//7.推送mqsendRequestParamsProducer.sendRabbitMq(mqInfoEntity,jsonArray,sendMsgParam.getFaceCode()+"_"+sendMsgParam.getSysChannel());//8.捕捉交换机未收到消息异常原因boolean flag = sendRequestParamsProducer.ack1;if(flag){return FtResultUtil.result(Result.SUCCESS, "推送成功", null, null);}else {log.error("向交换机发送消息失败,编号:{},原因为:{},", faceCodeSysChannel, sendRequestParamsProducer.reason);return FtResultUtil.result(Result.FAIL,null,"推送失败,失败原因:"+sendRequestParamsProducer.reason,null);}}
}
推送成功: