rabbitmq——岁月云实战笔记

1 rabbitmq设计

        生产者并不是直接将消息投递到queue,而是发送给exchange,由exchange根据type的规则来选定投递的queue,这样消息设计在生产者和消费者就实现解耦。

        rabbitmq会给没有type预定义一些exchage,而实际我们却应该使用自己定义的。

1.1 用户注册设计

        用户在官网注册,因为官网与其他各子系统是分库的,因此涉及到用户注册后,用户的账号信息也需要同步各子产品,于是就有了下面的设计。2018的时候SOA设计我还用通过otter进行同步,但是现在觉得还不如使用rabbitmq,因为消息队列有很多作用,而且有些情况是,各个子系统承建时间不一样,各自的创建用户后,也会触发其他的操作,这些otter的小表复制策略就不是那么好了。

1.1.1 生产者        

        岁月云官网,可以看到这个里面只需要一个exchange名称即可,将对象转成字符串作为消息发送过去即可。

1.1.2 消费者

        消费者中定义的监听是针对queue,ignoreDeclarationExceptions是幂等设计,可以确保即使某个实例的声明操作失败(例如,因为另一个实例已经成功声明了相同的资源),整个系统仍然可以正常工作。

        fanout是一种广播,绑定到此eayc_user_add_change的queue都可以收到此消息。因为从官网下发的消息,到各子系统都应该收到,并各自创建。

        下面是子系统acc的配置

        具体消费的代码如下所示,

        下面是子系统ps的配置,与acc使用同一个exchange,但queue是不同的。

1.2 死信队列和延时队列

x-message-ttl定义了消息的时间生存期,有了这特性,就可以拓展一些功能,比如高并发的流量控制。

        下面通过x-message-ttl设置了一个延迟队列,通过DECLARE_DEAD_ROUTING_KEY与死信交换机declareDeadExchange进行匹配路由。

@Configuration
public class RabbitMQDelayConfig {@Value("${spring.rabbitmq.declare.exchange}")private String DECLARE_EXCHANGE;@Value("${spring.rabbitmq.declare.queue}")private String DECLARE_QUEUE;@Value("${spring.rabbitmq.declare.routing}")private String DECLARE_ROUTING_KEY;@Value("${spring.rabbitmq.declare.deadExchange}")private String DECLARE_EXCHANGE_DEAD;@Value("${spring.rabbitmq.declare.deadQueue}")private String DECLARE_QUEUE_DEAD;@Value("${spring.rabbitmq.declare.deadRouting}")private String DECLARE_DEAD_ROUTING_KEY;@Value("${spring.rabbitmq.declare.ttl}")private int DECLARE_TTL;/*** 申明自动申报业务交换机:*/@Beanpublic DirectExchange declareExchange() {return new DirectExchange(DECLARE_EXCHANGE);}/*** 申明自动申报业务死信交换机:*/@Beanpublic DirectExchange declareDeadExchange() {return new DirectExchange(DECLARE_EXCHANGE_DEAD);}/*** 申明自动申报业务队列* 并绑定死信队列*/@Beanpublic Queue declareQueue() {Map<String, Object> arguments = new HashMap<>(3);// 设置死信交换机arguments.put("x-dead-letter-exchange", DECLARE_EXCHANGE_DEAD);// 设置死信路由键arguments.put("x-dead-letter-routing-key", DECLARE_DEAD_ROUTING_KEY);// 设置过期时间arguments.put("x-message-ttl", DECLARE_TTL);return new Queue(DECLARE_QUEUE, true, false, false, arguments);}/*** 申明自动申报业务死信队列*/@Beanpublic Queue declareDeadQueue() {return new Queue(DECLARE_QUEUE_DEAD);}/*** 绑定交换机和队列*/@Beanpublic Binding declareQueueBinding(@Qualifier("declareQueue") Queue queue, @Qualifier("declareExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with(DECLARE_ROUTING_KEY);}/*** 绑定死信交换机和死信队列*/@Beanpublic Binding declareDeadQueueBinding(@Qualifier("declareDeadQueue") Queue queue, @Qualifier("declareDeadExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with(DECLARE_DEAD_ROUTING_KEY);}}

        生产者只需要往业务的exchange投递消息即可

// 发送一条消息到rabbitmq延时队列中,处理申报流程超时的情况message = new HashMap<>();message.put("dataId", taxDeclareDto.getDataId());message.put("batchId", req.getBatchId());rabbitTemplate.convertAndSend(DECLARE_EXCHANGE, DECLARE_ROUTING_KEY, gson.toJson(message));

         异常情况是监听死信队列,处理对应的逻辑。

   /*** 监听消息队列,处理申报流程超时的申请记录*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "${spring.rabbitmq.declare.deadQueue}"),exchange = @Exchange(name = "${spring.rabbitmq.declare.deadExchange}")))@RabbitHandlerpublic void declareTimeout(Message message){logger.info("收到rabbitMq申报超时消息:{}", message);Map<String, String> map = gson.fromJson((String) message.getPayload(), Map.class);if(CheckEmptyUtil.isNotEmpty(map.get("batchId"))) {// 如果是批量申报超时,中断批次中所有的申报中的请求interruptDeclaresInBatch(map.get("batchId"));} else {String dataId = map.get("dataId");updateTaxDeclareStatus(new TaxDeclareYearStatusUpdateReq(dataId, null,false, StatementConstants.DeclareMessage.TIMEOUT));}}

1.3 重复消费

        如果根据高内聚低耦合的设计原则,消费者侧应该作重复消费设计,这个问题并不只是rabbitmq的问题,因为只要出现数据重复推送的问题,就会有重复消费的问题。比如有第三方系统定时同步数据到自己的系统,这个同步数据是由第三方承建的,你无法进行约束,必须在自己的系统进行幂等设计。

        springboot默认使用tomcat作为servlet容器,servlet容器使用线程池管理http请求,而controller和service都是单例,是线程不安全的,因此在接收到重复数据的请求时,如果其程序再新启动了异步线程,就会出现重复的情况,如下所示:

        主线程接收消息,做一些转换,然后执行交给异步线程处理。

 @PostMapping("/xx/batchSync")public ResponseResult xxBatchSync(@RequestBody CommonRequest commonRequest) {log.info("销项发票同步请求:{}",commonRequest.getInfo());XxBatchSyncReq xxBatchSyncReq = JsonUtil.toPojo(commonRequest.getInfo(),XxBatchSyncReq.class);String zyCompanyId = xxBatchSyncReq.getZyCompanyId();if (!CheckEmptyUtil.isEmpty(xxBatchSyncReq.getInvoices())){// 账套信息Integer asId = accAccountSetService.selectByZyCompanyId(xxBatchSyncReq.getZyCompanyId());if (asId==null){throw new RuntimeException(String.format("账套信息不存在,企业id:%s",xxBatchSyncReq.getZyCompanyId() ));}// 异步写入发票数据accInvoice4ZYService.xxBatchSync(asId,xxBatchSyncReq);}return new ResponseResult(true,"销项发票接收成功");}

        异步线程的逻辑如下,accInvoiceService.isExist看似基础逻辑没有问题,但是在多线程环境下会有问题,因为线程A添加进入到addInvoice方法添加发票的时候还没有提交,这个时候线程B执行accInvoiceService.isExist的时候判断已经是不存在的,于是他依旧会向下执行。导致出现数据重复写入。由此判断这个重复消费问题并不是消息队列独有的,还是业务处理的问题

    @Override@Async("loadDataExecutor")public void xxBatchSync(Integer asId, XxBatchSyncReq xxBatchSyncReq) {// 发票模板AccInvoiceTemplate accInvoiceTemplate = accInvoiceTemplateService.selectOne(asId, InvoiceConstants.InvoiceTemplateType.SALES);for (XxInvoiceDto xxInvoiceDto:xxBatchSyncReq.getInvoices()){xxInvoiceDto.setAsId(asId);if (!accInvoiceService.isExist(asId,xxInvoiceDto.getFpdm(),xxInvoiceDto.getFphm())){AccInvoiceDto accInvoiceDto = getAccInvoiceDto(xxInvoiceDto, xxBatchSyncReq.getZyCompanyId(),accInvoiceTemplate);addInvoice(accInvoiceDto);}}}

        再看事务逻辑愿望是美好的,接收到批量发票,然后一张张提交。这里就很有问题,

    @Override@Transactional
//    @RedisReentrantLock(key = "'acc_invoice_lock_'+#accInvoiceDto.asId")public void addInvoice(AccInvoiceDto accInvoiceDto) {// 保存发票头accInvoiceService.save(accInvoiceDto);Integer invoiceId = accInvoiceDto.getId();// 保存发票明细信息List<AccInvoiceDetail> accInvoiceDetails = accInvoiceDto.getAccInvoiceDetails();accInvoiceDetails.stream().forEach(accInvoiceDetail -> {accInvoiceDetail.setInvoiceId(invoiceId);});accInvoiceDetailService.saveBatch(accInvoiceDetails);}

         代码作如下调整,下面的代码依然会有问题,

    @Override@Async("loadDataExecutor")public void xxBatchSync(Integer asId, XxBatchSyncReq xxBatchSyncReq) {// 发票模板AccInvoiceTemplate accInvoiceTemplate = accInvoiceTemplateService.selectOne(asId, InvoiceConstants.InvoiceTemplateType.SALES);for (XxInvoiceDto xxInvoiceDto:xxBatchSyncReq.getInvoices()){xxInvoiceDto.setAsId(asId);addInvoice(xxBatchSyncReq.getZyCompanyId(),xxInvoiceDto,accInvoiceTemplate);}}@Override@Transactional(rollbackFor = Exception.class)public void addInvoice(String zyCompanyId,XxInvoiceDto xxInvoiceDto,AccInvoiceTemplate accInvoiceTemplate){if (!accInvoiceService.isExist(xxInvoiceDto.getAsId(),xxInvoiceDto.getFpdm(),xxInvoiceDto.getFphm())){AccInvoiceDto accInvoiceDto = getAccInvoiceDto(xxInvoiceDto, zyCompanyId,accInvoiceTemplate);addInvoice(accInvoiceDto);}}

        用MySQL来模拟一下,就可以看到问题。

        另起一个事务,因为判断还是不存在,依旧写入进去,导致数据重复。那么为什么呢?Mysql的Repeatable Read事务隔离级别,不会出现脏读、不会出现不可重复读,而间隙锁又解决了幻读的问题,但这个业务问题却需要自己认为去处理。

         解决方案最简的办法就是设置唯一键索引。另外一种办法,可以参考redis——岁月云实战,我们也可以采取加分布式锁的方式来控制数据操作。

2 线上问题

2.1 内存设置问题

k8s部署rabbitmq集群,搭建环境后登录web控制台发现内存飘红。进入到rabbitmq容器中,发现vm_memory_high_watermark.absolute = 100MB,这个就是从其他复制过来没有经过大脑的原因。这个值应该是按照Pod中设置最大内存的75%进行设置

        调整为3GB后,恢复正常。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/65305.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2.系统学习-逻辑回归

逻辑回归 前言最大似然估计概率似然函数(likelihood function)最大似然估计 逻辑回归逻辑回归的似然函数与梯度 分类问题常用评价指标项目案例拓展内容作业 前言 逻辑回归与线性回归均属于广义线性模型&#xff0c;区别在于线性回归用于解决回归问题&#xff0c;例如身高、销量…

记录一次电脑被入侵用来挖矿的过程(Trojan、Miner、Hack、turminoob)

文章目录 0、总结1、背景2、端倪3、有个微软的系统更新&#xff0c;就想着更新看看&#xff08;能否冲掉问题&#xff09;4、更新没成功&#xff0c;自动重启电脑5、风险文件&#xff08;好家伙命名还挺规范&#xff0c;一看名字就知道出问题了&#xff09;6、开机有一些注册表…

阿里云 人工智能与机器学习

阿里云的 人工智能&#xff08;AI&#xff09;与机器学习&#xff08;ML&#xff09; 服务为企业提供了全面的AI解决方案&#xff0c;帮助用户在多个行业实现数据智能化&#xff0c;提升决策效率&#xff0c;推动业务创新。阿里云通过先进的技术和丰富的工具&#xff0c;支持用…

Structured-Streaming集成Kafka

一、上下文 《Structured-Streaming初识》博客中已经初步认识了Structured-Streaming&#xff0c;Kafka作为目前最流行的一个分布式的实时流消息系统&#xff0c;是众多实时流处理框架的最优数据源之一。下面我们就跟着官方例子来看看Structured-Streaming是如何集成Kafka的&a…

生物医学信号处理--绪论

前言 参考书籍&#xff1a;刘海龙&#xff0c;生物医学信号处理&#xff0c;化学工业出版社 生物医学信号分类 1、由生理过程自发或者诱发产生的电生理信号和非电生理信号 • 电生理信号&#xff1a;ECG/心电、EEG/脑电、EMG/肌电、 EGG/胃电、 EOG/眼电 • 非电生理信号&am…

unity 播放 序列帧图片 动画

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、方法一&#xff1a;代码控制播放序列帧1、设置图片属性2、创建Image组件3、简单的代码控制4、挂载代码并赋值 二、方法二&#xff1a;直接使用1.Image上添加…

小程序与物联网(IoT)融合:开启智能生活新篇章

一、引言 随着移动互联网技术的飞速发展&#xff0c;小程序作为一种轻量级的应用形式&#xff0c;凭借其无需下载安装、即用即走的特点&#xff0c;迅速渗透到人们生活的各个领域。与此同时&#xff0c;物联网&#xff08;IoT&#xff09;技术也在不断进步&#xff0c;将各种物…

如何很快将文件转换成另外一种编码格式?编码?按指定编码格式编译?如何检测文件编码格式?Java .class文件编码和JVM运行期内存编码?

如何很快将文件转换成另外一种编码格式? 利用VS Code右下角的"选择编码"功能&#xff0c;选择"通过编码保存"可以很方便将文件转换成另外一种编码格式。尤其&#xff0c;在测试w/ BOM或w/o BOM, 或者ANSI编码和UTF编码转换&#xff0c;特别方便。VS文件另…

PCL点云库入门——PCL库点云特征之PFH点特征直方图(Point Feature Histograms -PHF)

1、算法原理 PFH点&#xff08;Point Feature Histogram&#xff09;特征直方图的原理涉及利用参数化查询点与邻域点之间的空间差异&#xff0c;并构建一个多维直方图以捕捉点的k邻域几何属性。这个高维超空间为特征表示提供了一个可度量的信息空间&#xff0c;对于点云对应曲面…

5. CSS引入方式

5.1 CSS的三种样式 按照 CSS 样式书写的位置(或者引入的方式)&#xff0c;CSS样式表可以分为三大类&#xff1a; 1.行内样式表&#xff08;行内式&#xff09; 2.内部样式表&#xff08;嵌入式&#xff09; 3. 外部样式表&#xff08;链接式&#xff09; 5.2 内部样式表 …

为什么ip属地一会河南一会江苏

在使用互联网的过程中&#xff0c;许多用户可能会遇到这样一个问题&#xff1a;自己的IP属地一会儿显示为河南&#xff0c;一会儿又变成了江苏。这种现象可能会让人感到困惑&#xff0c;甚至产生疑虑&#xff0c;担心自己的网络活动是否受到了某种影响。为了解答这一疑问&#…

jmeter性能测试例子

目录 一、介绍 二、操作例子 设置线程数 添加同步定时器 添加聚合报告 一、介绍 在软件测试中&#xff0c;一般用jmeter来对接口做性能测试&#xff0c;对对接口进行一个压力的测试。 简述&#xff1a; 在接口的线程中设置线程的数量和时间&#xff0c;添加一个定时器…

PDFelement 特别版

Wondershare PDFelement Pro 是一款非常强大的PDF编辑软件&#xff0c;它允许用户轻松地编辑、转换、创建和管理PDF文件。这个中文特别版的软件具有许多令人印象深刻的功能&#xff0c;PDFelement Pro 提供了丰富的编辑功能&#xff0c;可以帮助用户直接在PDF文件中添加、删除、…

【OpenCV】使用Python和OpenCV实现火焰检测

1、 项目源码和结构&#xff08;转&#xff09; https://github.com/mushfiq1998/fire-detection-python-opencv 2、 运行环境 # 安装playsound&#xff1a;用于播放报警声音 pip install playsound # 安装opencv-python&#xff1a;cv2用于图像和视频处理&#xff0c;特别是…

深入理解Mybatis原理》MyBatis的sqlSessi

sqlSessionFactory 与 SqlSession 正如其名&#xff0c;Sqlsession对应着一次数据库会话。由于数据库会话不是永久的&#xff0c;因此Sqlsession的生命周期也不应该是永久的&#xff0c;相反&#xff0c;在你每次访问数据库时都需要创建它&#xff08;当然并不是说在Sqlsession…

《HarmonyOS第一课》焕新升级,赋能开发者快速掌握鸿蒙应用开发

随着HarmonyOS NEXT发布&#xff0c;鸿蒙生态日益壮大&#xff0c;广大开发者对于系统化学习平台和课程的需求愈发强烈。近日&#xff0c;华为精心打造的《HarmonyOS第一课》全新上线&#xff0c;集“学、练、考”于一体&#xff0c;凭借多维融合的教学模式与系统课程设置&…

springboot集成整合工作流,activiti审批流,整合实际案例,流程图设计,流程自定义,表单配置自定义,代码demo流程

前言 activiti工作流引擎项目&#xff0c;企业erp、oa、hr、crm等企事业办公系统轻松落地&#xff0c;一套完整并且实际运用在多套项目中的案例&#xff0c;满足日常业务流程审批需求。 一、项目形式 springbootvueactiviti集成了activiti在线编辑器&#xff0c;流行的前后端…

《探秘计算机视觉与深度学习:开启智能视觉新时代》

《探秘计算机视觉与深度学习&#xff1a;开启智能视觉新时代》 一、追溯起源&#xff1a;从萌芽到崭露头角二、核心技术&#xff1a;解锁智能视觉的密码&#xff08;一&#xff09;卷积神经网络&#xff08;CNN&#xff09;&#xff1a;图像识别的利器&#xff08;二&#xff0…

Vmware安装centos

用来记录自己安装的过程 一、创建虚拟机安装centos镜像 点击完成后&#xff0c;等待一会会进入centos的系统初始化界面 二、centos初始化配置 三、配置网络 1、虚拟网络编辑器&#xff0c;开启VMnet1、VMnet8的DHCP vmware左上角工具栏&#xff0c;点击【编辑】->【虚拟网…