🧑‍💻作者名称:DaenCode
🎤作者简介:啥技术都喜欢捣鼓捣鼓,喜欢分享技术、经验、生活。
😎人生感悟:尝尽人生百味,方知世间冷暖。
📖所属专栏:项目所感所想
文章目录
- 🌟架构图
- 🌟application.properties
- 🌟RabbitMQ配置
- 🌟消息协议封装
- 🌟消息类型封装
- 🌟C端消费者
- 🌟B端消费者
- 🌟发送消息与处理消息
- 🌟最后
🌟架构图
🌟application.properties
# Exchange that redundancy (dual-write) events are published to.
# RabbitMqForRedundancyConfig declares it as a topic exchange.
redundancy.mq.redundancy-event-exchange=redundancy.event.exchange
# Routing key used when publishing an "add" event. It matches BOTH binding
# patterns below ("*" stands for exactly one dot-separated word on a topic
# exchange), so one publish is delivered to the B-side and the C-side queue.
redundancy.mq.add-routing-key=redundancy.add.business.consumer.routing.key
# Binding pattern for the B-side (business) queue.
redundancy.mq.add-business-binding-key=redundancy.add.business.*.routing.key
# Binding pattern for the C-side (consumer) queue.
redundancy.mq.add-consumer-binding-key=redundancy.add.*.consumer.routing.key
# Name of the B-side (business) queue for "add" events.
redundancy.mq.add-business-queue=redundancy.add.business.queue
# Name of the C-side (consumer) queue for "add" events.
redundancy.mq.add-consumer-queue=redundancy.add.consumer.queue
🌟RabbitMQ配置
package top.daencode.config;import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** Copyright (C) 2023-11-29 智源恩创网络科技工作室** @BelongsProject: architecture-solution* @BelongsPackage: top.daencode.mq* @author: DaenCode* @createTime: 2023-11-29 15:08* @description: TODO* @version: 1.0*/
@Configuration
@Slf4j
@Data
@ConfigurationProperties(prefix = "redundancy.mq")
public class RabbitMqForRedundancyConfig {/*** 交换机*/private String redundancyEventExchange;/*** 添加路由key*/private String addRoutingkey;/*** B端添加绑定key*/private String addBusinessBindingKey;/*** C端添加绑定key*/private String addConsumerBindingKey;/*** B端添加队列*/private String addBusinessQueue;/*** C端添加队列*/private String addConsumerQueue;/*** 创建冗余双写交换机* @return*/@Beanpublic Exchange redundancyEventExchange(){return new TopicExchange(redundancyEventExchange);}/*** 创建B端添加队列* @return*/@Beanpublic Queue addBusinessQueue(){return new Queue(addBusinessQueue,true,false,false);}/*** 创建C端添加队列* @return*/@Beanpublic Queue addConsumerQueue(){return new Queue(addConsumerQueue,true,false,false);}/*** B端绑定关系*/@Beanpublic Binding addBusinessBinding(){return new Binding(addBusinessQueue, Binding.DestinationType.QUEUE,redundancyEventExchange,addBusinessBindingKey,null);}/*** C端交换机绑定到队列* @return*/@Beanpublic Binding addConsumerBinding(){return new Binding(addConsumerQueue, Binding.DestinationType.QUEUE,redundancyEventExchange,addConsumerBindingKey,null);}
}
🌟消息协议封装
/**
 * Envelope for messages exchanged over the message queue.
 *
 * <p>Accessors, builder and constructors are generated by Lombok. The field
 * order is significant because it defines the all-args constructor signature.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class EventMessage implements Serializable {

    /** Identifier of this message. */
    private String messageId;

    /** Event type; callers in this file store the name of an {@code EventMessageTypeEnum} constant. */
    private String eventMessageType;

    /** Business identifier associated with the message. */
    private String bizId;

    /** Message body. */
    private String content;

    /** Free-text remark describing an error, if any. */
    private String remark;
}
🌟消息类型封装
/**
 * Event types carried in an event message.
 *
 * <p>Each operation (add / delete / update) has a generic constant plus one
 * constant per side (business, consumer). Declaration order is preserved.
 */
public enum EventMessageTypeEnum {

    /** Add. */
    REDUNDANCY_ADD,
    /** Add, B-side (business). */
    REDUNDANCY_ADD_BUSINESS,
    /** Add, C-side (consumer). */
    REDUNDANCY_ADD_CONSUMER,

    /** Delete. */
    REDUNDANCY_DEL,
    /** Delete, B-side (business). */
    REDUNDANCY_DEL_BUSINESS,
    /** Delete, C-side (consumer). */
    REDUNDANCY_DEL_CONSUMER,

    /** Update. */
    REDUNDANCY_UPDATE,
    /** Update, B-side (business). */
    REDUNDANCY_UPDATE_BUSINESS,
    /** Update, C-side (consumer). */
    REDUNDANCY_UPDATE_CONSUMER
}
🌟C端消费者
@Component
@Slf4j
@RabbitListener(queuesToDeclare = {@Queue("redundancy.add.consumer.queue")})
public class RedundancyAddConsumerMQListener {@Autowiredprivate DetailService detailService;/*** 消费消息* @param eventMessage* @param message* @param channel*/@RabbitHandlerpublic void handleAddConsumer(EventMessage eventMessage, Message message, Channel channel){try {eventMessage.setEventMessageType(EventMessageTypeEnum.REDUNDANCY_ADD_CONSUMER.name());boolean flag= detailService.handleAddDetail(eventMessage);} catch (Exception e) {log.error("handleAddConsumer--消费失败{}",eventMessage);}}
}
🌟B端消费者
@Component
@Slf4j
@RabbitListener(queuesToDeclare = {@Queue("redundancy.add.business.queue")})
public class RedundancyAddBusinessMQListener {@Autowiredprivate DetailService detailService;/*** 消费消息* @param eventMessage* @param message* @param channel*/@RabbitHandlerpublic void handleAddBusiness(EventMessage eventMessage, Message message, Channel channel){try {eventMessage.setEventMessageType(EventMessageTypeEnum.REDUNDANCY_ADD_BUSINESS.name());boolean flag= detailService.handleAddDetail(eventMessage);} catch (Exception e) {log.error("handleAddBusiness--消费失败{}",eventMessage);}}
}
🌟发送消息与处理消息
/*** 发送新增消息* @param detailRequest*/@Overridepublic void addDetail(DetailRequest detailRequest) {detailRequest.setBId(IDUtil.generateRandomNumber(5));detailRequest.setCId(IDUtil.generateRandomNumber(5));//构造消息EventMessage eventMessage = EventMessage.builder().messageId(IDUtil.generateRandomNumber(5).toString()).content(JsonUtil.obj2Json(detailRequest)).eventMessageType(EventMessageTypeEnum.REDUNDANCY_ADD.name()).build();rabbitTemplate.convertAndSend(rabbitMqForRedundancyConfig.getRedundancyEventExchange(),rabbitMqForRedundancyConfig.getAddRoutingkey(),eventMessage);}
/**
 * Handles an "add" event by inserting the detail row for the side named in
 * the message's event type.
 *
 * <p>For {@code REDUNDANCY_ADD_CONSUMER} the C-side table is checked for an
 * existing row with the same {@code c_id} and {@code detail}; for
 * {@code REDUNDANCY_ADD_BUSINESS} the B-side table is checked for an existing
 * row with the same {@code b_id} and {@code detail}. If no such row exists a
 * new one is inserted; otherwise the duplicate is logged and nothing is
 * written.
 *
 * @param eventMessage the message whose content is a JSON-encoded {@code DetailRequest}
 * @return {@code true} if a row was inserted; {@code false} if the row already
 *         existed or the event type is not one of the two handled types
 */
@Override
public boolean handleAddDetail(EventMessage eventMessage) {
    String messageType = eventMessage.getEventMessageType();
    DetailRequest detailRequest = JsonUtil.json2Obj(eventMessage.getContent(), DetailRequest.class);

    // Constant-first equals: a missing event type falls through to "not handled"
    // instead of throwing a NullPointerException.
    if (EventMessageTypeEnum.REDUNDANCY_ADD_CONSUMER.name().equals(messageType)) {
        CDetailDO cDetailDOIndb = cDetailMapper.selectOne(new QueryWrapper<CDetailDO>()
                .eq("c_id", detailRequest.getCId())
                .eq("detail", detailRequest.getDetail()));
        if (cDetailDOIndb != null) {
            log.error("handleAddDetail---REDUNDANCY_ADD_CONSUMER重复{}", eventMessage);
            return false;
        }
        CDetailDO cDetailDO = CDetailDO.builder()
                .bId(detailRequest.getBId())
                .cId(detailRequest.getCId())
                .detail(detailRequest.getDetail())
                .build();
        cDetailMapper.insert(cDetailDO);
        return true;
    }

    if (EventMessageTypeEnum.REDUNDANCY_ADD_BUSINESS.name().equals(messageType)) {
        // The B-side duplicate check must compare b_id against the B id; the
        // previous code compared it against the C id, so duplicates were missed.
        BDetailDO bDetailDOIndb = bDetailMapper.selectOne(new QueryWrapper<BDetailDO>()
                .eq("b_id", detailRequest.getBId())
                .eq("detail", detailRequest.getDetail()));
        if (bDetailDOIndb != null) {
            log.error("handleAddDetail---REDUNDANCY_ADD_BUSINESS重复{}", eventMessage);
            return false;
        }
        BDetailDO bDetailDO = BDetailDO.builder()
                .bId(detailRequest.getBId())
                .cId(detailRequest.getCId())
                .detail(detailRequest.getDetail())
                .build();
        bDetailMapper.insert(bDetailDO);
        return true;
    }

    // Event type is neither of the two handled "add" types: nothing written.
    return false;
}
🌟最后
最后,感谢大家对本文的阅读,希望对大家有帮助。