背景
我们核心业务中订单完成时,需要完成后续的连带业务,扣件库存库存、增加积分、通知商家等。
如下图的架构:
这样设计出来导致我们的核心业务和其他业务耦合,每次新增连带业务或者去掉连带业务都需要修改核心业务。
一方面,不合符软件设计的OCP原则;二方面,修改核心业务风险、成本也是很大的。
方案
基于上述方案的问题,我们设计了新的方案。新的方案,可以动态接入新的连带业务,不会入侵核心业务,降低了变动的风险和成本。
1. 订单下发统一的ORDER_CREATED事件消息;
消息格式如下:
{
"enterEvent":"ORDER_CREATED",
"data":{
"order_id":"OR2023111000000001"
},
"source":"ORDER",
"datetime":"2023-11-10 21:40:52"
}
2. 所有核心业务发送到MQ的消息,统一发送到分发中心DISPATCHER_CENTER。
消息统一中心获取配置的路由信息,将消息发送到MQ。
*** @author darmi*/
@Component
public class KafkaEventListener {@Autowiredprivate MsgDispatcherCenterRepository msgDispatcherCenterRepository;@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@Autowired@Qualifier(value = "eventExecutor")private Executor eventExecutor;@KafkaListener(topics = {"DISPATCHER_CENTER"})public void dispatchMsg(String event) {DispatcherCenterEvent dispatcherCenterEvent = DispatcherCenterEvent.getObject(event);eventExecutor.execute(() ->{msgDispatcherCenterRepository.findMsgDispatcherCenterByCenterEventAndActive(dispatcherCenterEvent.getCenterEvent(), Boolean.TRUE).forEach(e -> kafkaTemplate.send(e.getRouteEvent(), dispatcherCenterEvent.getData()));});}}
Mysql的消息路由表设计如下:
CREATE TABLE `tb_msg_dispatcher_ center` (`id` int NOT NULL AUTO_INCREMENT,`center_event` varchar(255) NOT NULL,`route_event` varchar(255) NOT NULL,`active` tinyint NOT NULL DEFAULT '0',`created` timestamp NOT NULL,`updated` timestamp NOT NULL,PRIMARY KEY (`id`),KEY `idx_center_event_active` (`center_event`,`active`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
3. 各个业务监听消息,处理自己的业务。
@KafkaListener(topics = {"ADD_POINT"})public void addPoint(String event) {// 添加积分}@KafkaListener(topics = {"REDUCE_STOCK"})public void reduceStock(String event) {// 扣件库存}@KafkaListener(topics = {"NOTICE_MERCHANT"})public void noticeMerchant(String event) {// 通知商家}
总结
这个方案是一个简易可行的方案,符合快速上手并实施。在人力、时间、成本不充足的情况下,基本能满足我们的需求。
如果想让它作为平台级的技术组件推广,还有一些细节的点可以优化。
- 核心的业务是否也可以分离出来,通过平台配置的方式自动分发数据到消息中心。
- 消息中心每次都会从数据库拉去路由表信息,这样性能不好,可以放在分布式缓存或本地内存。这时需要注意缓存数据的一致性问题。
- 分发中心是否存在性能瓶颈、集群化等。
- 连带业务是否也可以通过配置,自动拉取MQ的消息。