1 rabbitmq设计
生产者并不是直接将消息投递到queue,而是发送给exchange,由exchange根据type的规则来选定投递的queue,这样消息设计在生产者和消费者就实现解耦。
rabbitmq会给没有type预定义一些exchage,而实际我们却应该使用自己定义的。
1.1 用户注册设计
用户在官网注册,因为官网与其他各子系统是分库的,因此涉及到用户注册后,用户的账号信息也需要同步各子产品,于是就有了下面的设计。2018的时候SOA设计我还用通过otter进行同步,但是现在觉得还不如使用rabbitmq,因为消息队列有很多作用,而且有些情况是,各个子系统承建时间不一样,各自的创建用户后,也会触发其他的操作,这些otter的小表复制策略就不是那么好了。
1.1.1 生产者
岁月云官网,可以看到这个里面只需要一个exchange名称即可,将对象转成字符串作为消息发送过去即可。
1.1.2 消费者
消费者中定义的监听是针对queue,ignoreDeclarationExceptions是幂等设计,可以确保即使某个实例的声明操作失败(例如,因为另一个实例已经成功声明了相同的资源),整个系统仍然可以正常工作。
fanout是一种广播,绑定到此eayc_user_add_change的queue都可以收到此消息。因为从官网下发的消息,到各子系统都应该收到,并各自创建。
下面是子系统acc的配置
具体消费的代码如下所示,
下面是子系统ps的配置,与acc使用同一个exchange,但queue是不同的。
1.2 死信队列和延时队列
x-message-ttl定义了消息的时间生存期,有了这特性,就可以拓展一些功能,比如高并发的流量控制。
下面通过x-message-ttl设置了一个延迟队列,通过DECLARE_DEAD_ROUTING_KEY与死信交换机declareDeadExchange进行匹配路由。
@Configuration
public class RabbitMQDelayConfig {@Value("${spring.rabbitmq.declare.exchange}")private String DECLARE_EXCHANGE;@Value("${spring.rabbitmq.declare.queue}")private String DECLARE_QUEUE;@Value("${spring.rabbitmq.declare.routing}")private String DECLARE_ROUTING_KEY;@Value("${spring.rabbitmq.declare.deadExchange}")private String DECLARE_EXCHANGE_DEAD;@Value("${spring.rabbitmq.declare.deadQueue}")private String DECLARE_QUEUE_DEAD;@Value("${spring.rabbitmq.declare.deadRouting}")private String DECLARE_DEAD_ROUTING_KEY;@Value("${spring.rabbitmq.declare.ttl}")private int DECLARE_TTL;/*** 申明自动申报业务交换机:*/@Beanpublic DirectExchange declareExchange() {return new DirectExchange(DECLARE_EXCHANGE);}/*** 申明自动申报业务死信交换机:*/@Beanpublic DirectExchange declareDeadExchange() {return new DirectExchange(DECLARE_EXCHANGE_DEAD);}/*** 申明自动申报业务队列* 并绑定死信队列*/@Beanpublic Queue declareQueue() {Map<String, Object> arguments = new HashMap<>(3);// 设置死信交换机arguments.put("x-dead-letter-exchange", DECLARE_EXCHANGE_DEAD);// 设置死信路由键arguments.put("x-dead-letter-routing-key", DECLARE_DEAD_ROUTING_KEY);// 设置过期时间arguments.put("x-message-ttl", DECLARE_TTL);return new Queue(DECLARE_QUEUE, true, false, false, arguments);}/*** 申明自动申报业务死信队列*/@Beanpublic Queue declareDeadQueue() {return new Queue(DECLARE_QUEUE_DEAD);}/*** 绑定交换机和队列*/@Beanpublic Binding declareQueueBinding(@Qualifier("declareQueue") Queue queue, @Qualifier("declareExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with(DECLARE_ROUTING_KEY);}/*** 绑定死信交换机和死信队列*/@Beanpublic Binding declareDeadQueueBinding(@Qualifier("declareDeadQueue") Queue queue, @Qualifier("declareDeadExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with(DECLARE_DEAD_ROUTING_KEY);}}
生产者只需要往业务的exchange投递消息即可
// 发送一条消息到rabbitmq延时队列中,处理申报流程超时的情况message = new HashMap<>();message.put("dataId", taxDeclareDto.getDataId());message.put("batchId", req.getBatchId());rabbitTemplate.convertAndSend(DECLARE_EXCHANGE, DECLARE_ROUTING_KEY, gson.toJson(message));
异常情况是监听死信队列,处理对应的逻辑。
/*** 监听消息队列,处理申报流程超时的申请记录*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "${spring.rabbitmq.declare.deadQueue}"),exchange = @Exchange(name = "${spring.rabbitmq.declare.deadExchange}")))@RabbitHandlerpublic void declareTimeout(Message message){logger.info("收到rabbitMq申报超时消息:{}", message);Map<String, String> map = gson.fromJson((String) message.getPayload(), Map.class);if(CheckEmptyUtil.isNotEmpty(map.get("batchId"))) {// 如果是批量申报超时,中断批次中所有的申报中的请求interruptDeclaresInBatch(map.get("batchId"));} else {String dataId = map.get("dataId");updateTaxDeclareStatus(new TaxDeclareYearStatusUpdateReq(dataId, null,false, StatementConstants.DeclareMessage.TIMEOUT));}}
1.3 重复消费
如果根据高内聚低耦合的设计原则,消费者侧应该作重复消费设计,这个问题并不只是rabbitmq的问题,因为只要出现数据重复推送的问题,就会有重复消费的问题。比如有第三方系统定时同步数据到自己的系统,这个同步数据是由第三方承建的,你无法进行约束,必须在自己的系统进行幂等设计。
springboot默认使用tomcat作为servlet容器,servlet容器使用线程池管理http请求,而controller和service都是单例,是线程不安全的,因此在接收到重复数据的请求时,如果其程序再新启动了异步线程,就会出现重复的情况,如下所示:
主线程接收消息,做一些转换,然后执行交给异步线程处理。
@PostMapping("/xx/batchSync")public ResponseResult xxBatchSync(@RequestBody CommonRequest commonRequest) {log.info("销项发票同步请求:{}",commonRequest.getInfo());XxBatchSyncReq xxBatchSyncReq = JsonUtil.toPojo(commonRequest.getInfo(),XxBatchSyncReq.class);String zyCompanyId = xxBatchSyncReq.getZyCompanyId();if (!CheckEmptyUtil.isEmpty(xxBatchSyncReq.getInvoices())){// 账套信息Integer asId = accAccountSetService.selectByZyCompanyId(xxBatchSyncReq.getZyCompanyId());if (asId==null){throw new RuntimeException(String.format("账套信息不存在,企业id:%s",xxBatchSyncReq.getZyCompanyId() ));}// 异步写入发票数据accInvoice4ZYService.xxBatchSync(asId,xxBatchSyncReq);}return new ResponseResult(true,"销项发票接收成功");}
异步线程的逻辑如下,accInvoiceService.isExist看似基础逻辑没有问题,但是在多线程环境下会有问题,因为线程A添加进入到addInvoice方法添加发票的时候还没有提交,这个时候线程B执行accInvoiceService.isExist的时候判断已经是不存在的,于是他依旧会向下执行。导致出现数据重复写入。由此判断这个重复消费问题并不是消息队列独有的,还是业务处理的问题。
@Override@Async("loadDataExecutor")public void xxBatchSync(Integer asId, XxBatchSyncReq xxBatchSyncReq) {// 发票模板AccInvoiceTemplate accInvoiceTemplate = accInvoiceTemplateService.selectOne(asId, InvoiceConstants.InvoiceTemplateType.SALES);for (XxInvoiceDto xxInvoiceDto:xxBatchSyncReq.getInvoices()){xxInvoiceDto.setAsId(asId);if (!accInvoiceService.isExist(asId,xxInvoiceDto.getFpdm(),xxInvoiceDto.getFphm())){AccInvoiceDto accInvoiceDto = getAccInvoiceDto(xxInvoiceDto, xxBatchSyncReq.getZyCompanyId(),accInvoiceTemplate);addInvoice(accInvoiceDto);}}}
再看事务逻辑愿望是美好的,接收到批量发票,然后一张张提交。这里就很有问题,
@Override@Transactional
// @RedisReentrantLock(key = "'acc_invoice_lock_'+#accInvoiceDto.asId")public void addInvoice(AccInvoiceDto accInvoiceDto) {// 保存发票头accInvoiceService.save(accInvoiceDto);Integer invoiceId = accInvoiceDto.getId();// 保存发票明细信息List<AccInvoiceDetail> accInvoiceDetails = accInvoiceDto.getAccInvoiceDetails();accInvoiceDetails.stream().forEach(accInvoiceDetail -> {accInvoiceDetail.setInvoiceId(invoiceId);});accInvoiceDetailService.saveBatch(accInvoiceDetails);}
代码作如下调整,下面的代码依然会有问题,
@Override@Async("loadDataExecutor")public void xxBatchSync(Integer asId, XxBatchSyncReq xxBatchSyncReq) {// 发票模板AccInvoiceTemplate accInvoiceTemplate = accInvoiceTemplateService.selectOne(asId, InvoiceConstants.InvoiceTemplateType.SALES);for (XxInvoiceDto xxInvoiceDto:xxBatchSyncReq.getInvoices()){xxInvoiceDto.setAsId(asId);addInvoice(xxBatchSyncReq.getZyCompanyId(),xxInvoiceDto,accInvoiceTemplate);}}@Override@Transactional(rollbackFor = Exception.class)public void addInvoice(String zyCompanyId,XxInvoiceDto xxInvoiceDto,AccInvoiceTemplate accInvoiceTemplate){if (!accInvoiceService.isExist(xxInvoiceDto.getAsId(),xxInvoiceDto.getFpdm(),xxInvoiceDto.getFphm())){AccInvoiceDto accInvoiceDto = getAccInvoiceDto(xxInvoiceDto, zyCompanyId,accInvoiceTemplate);addInvoice(accInvoiceDto);}}
用MySQL来模拟一下,就可以看到问题。
另起一个事务,因为判断还是不存在,依旧写入进去,导致数据重复。那么为什么呢?Mysql的Repeatable Read事务隔离级别,不会出现脏读、不会出现不可重复读,而间隙锁又解决了幻读的问题,但这个业务问题却需要自己认为去处理。
解决方案最简的办法就是设置唯一键索引。另外一种办法,可以参考redis——岁月云实战,我们也可以采取加分布式锁的方式来控制数据操作。
2 线上问题
2.1 内存设置问题
k8s部署rabbitmq集群,搭建环境后登录web控制台发现内存飘红。进入到rabbitmq容器中,发现vm_memory_high_watermark.absolute = 100MB,这个就是从其他复制过来没有经过大脑的原因。这个值应该是按照Pod中设置最大内存的75%进行设置
调整为3GB后,恢复正常。