壁纸网站模板/福州seo建站

壁纸网站模板,福州seo建站,成都建设网站的公司有哪些,国内免费的b2b平台目录 1.出现重复消费的原因 2.解决 2.1 数据库插入法 2.2 使用布隆过滤器 2.2.1 添加hutool的依赖 2.2.2 测试生产者 2.2.2 测试消费者 1.出现重复消费的原因 BROADCASTING(广播) 模式下,所有注册的消费者都会消费,而这些消费者通常是集群部署的…

目录

1.出现重复消费的原因

2.解决

2.1 数据库插入法

2.2 使用布隆过滤器

2.2.1 添加hutool的依赖

2.2.2 测试生产者

2.2.2 测试消费者


1.出现重复消费的原因

  1. BROADCASTING(广播) 模式下,所有注册的消费者都会消费,而这些消费者通常是集群部署的一个个微服务,这样就会多台机器重复消费,当然这个是根据需要来选择。
  2. CLUSTERING(负载均衡)模式下,如果一个 topic 被多个 consumerGroup 消费,也会重复消费。
  3. 即使是在 CLUSTERING 模式下,同一个 consumerGroup 下,一个队列只会分配给一个消费者,看起来好像是不会重复消费。但是,有个特殊情况:一个消费者新上线后,同组的所有消费者要重新负载均衡重平衡 reBalance(反之一个消费者掉线后,也一样)。一个队列所对应的新的消费者要获取之前消费的 offset(偏移量,也就是消息消费的点位),此时之前的消费者可能已经消费了一条消息,但是并没有把 offset 提交给 broker,那么新的消费者可能会重新消费一次。虽然 orderly 模式是前一个消费者先解锁,后一个消费者加锁再消费的模式,比起 concurrently 要严格了,但是加锁的线程和提交offset 的线程不是同一个,所以还是会出现极端情况下的重复消费。
  4. 还有在发送批量消息的时候,会被当做一条消息进行处理,那么如果批量消息中有一条业务处理成功,其他失败了,还是会被重新消费一次。

简单的说:

  1. Consumer 消费完消息并不是实时同步到 Broker 的,而是将 offset 先保存在本地map中,通过定时任务持久化上去。这就导致消息被消费了,但是此时消费者宕机了导致 offset 没提交,下次没提交 offset 的这部分消息会被再次消费
  2. 即使 offset 被提交到了 Broker,在还没来得及持久化的时候 Broker 宕机了,当重启的时候 Broker 会读取consumerOffset.json 中保存的 offset 信息,这就会导致没持久化 offset 的这部分消息会被再次消费

那么如果在CLUSTERING(负载均衡)模式下,并且在同一个消费者组中,不希望一条消息被重复消费,改怎么办呢?我们可以想到去重操作,找到消息唯一的标识,可以是 msgId 也可以是你自定义的唯一的 key,这样就可以去重了

2.解决

我们需要给我们的消费者实现 幂等 ,也就是对同一个消息的处理结果,执行多少次都不变。

幂等性:多次操作产生的影响均和第一次操作产生的影响相同

例如:判断 crud 的幂等性

a. 新增:普通的新增是非幂等,设置了唯一索引的新增是幂等操作

b. 修改:update goods set stock = 10 where id = 1 幂等

               update goods set stock = stock - 1 where id = 1 非幂等

c. 查询:幂等

d. 删除:幂等

那么如何给业务实现幂等呢?这个还是需要结合具体的业务的。你可以使用写入 Redis 来保证,因为Redis 的 key 和 value 就是天然支持幂等的。当然还有使用 数据库插入法 ,基于数据库的唯一键来保证重复数据不会被插入多条。

2.1 数据库插入法

发送方需要给消息带一个唯一标记(根据业务标识)

模拟业务 数据库的订单操作日志表结构(去重表)

给订单号添加唯一索引(订单号存的是 key)

模拟业务,生产者发送了重复的消息

@Test
public void repeatTest() throws Exception {
String key = UUID.randomUUID().toString();
Message<String> msg = MessageBuilder.withPayload("扣减库存 -1").setHeader(RocketMQHeaders.KEYS, key).build();
rocketMQTemplate.syncSend("repeatTopic", msg);
rocketMQTemplate.syncSend("repeatTopic", msg);
}

消费者

@Component
@RocketMQMessageListener(topic = "repeatTopic",consumerGroup = "repeat-consumer-group")
public class RepeatListener implements RocketMQListener<MessageExt> {@Autowiredprivate LogMapper logMapper;@Overridepublic void onMessage(MessageExt messageExt) {// 先拿keyString keys = messageExt.getKeys();// 插入数据库 因为key做了唯一索引OrderOperLog orderOperLog = new OrderOperLog();orderOperLog.setType(1l);orderOperLog.setOrderSn(keys);orderOperLog.setUserId("1003");int insert = logMapper.insert(orderOperLog);System.out.println(keys);System.out.println(new String(messageExt.getBody()));}
}

在消费第二条的时候抛出唯一索引重复 SQLIntegrityConstraintViolationException

数据库只插入一条这样的记录

优化,捕获到异常是 SQLIntegrityConstraintViolationException 时直接将消息签收了,不再进行业务处理,因为之前已经消费了一条同样的消息,这样便可以解决重复消费问题

2.2 使用布隆过滤器

  • 使用去重方案解决,例如将消息的唯一标识存起来,然后每次消费之前先判断是否存在这个唯一标识,如果存在则不消费,如果不存在则消费,并且消费以后将这个标记保存。
  • 想法很好,但是消息的体量是非常大的,可能在生产环境中会到达上千万甚至上亿条,那么我们该如何选择一个容器来保存所有消息的标识,并且又可以快速的判断是否存在呢?

我们可以选择布隆过滤器(BloomFilter)

介绍:

布隆过滤器(英语:Bloom Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。

布隆过滤器的原理是,当一个元素被加入集合时,通过K个散列函数将这个元素映射成一个位数组中的K个点,把它们置为1。检索时,我们只要看看这些点是不是都是1就(大约)知道集合中有没有它了:如果这些点有任何一个0,则被检元素一定不在;如果都是1,则被检元素很可能在。这就是布隆过滤器的基本思想。

2.2.1 添加hutool的依赖

<dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.7.11</version>
</dependency>

2.2.2 测试生产者

public void testRepeatProducer() throws Exception {
// 创建默认的生产者
DefaultMQProducer producer = new DefaultMQProducer("test-group");
// 设置nameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动实例
producer.start();
// 我们可以使用自定义key当做唯一标识
String keyId = UUID.randomUUID().toString();
System.out.println(keyId);
Message msg = new Message("TopicTest", "tagA", keyId, "我是一个测试消息".getBytes());
SendResult send = producer.send(msg);
System.out.println(send);
// 关闭实例
producer.shutdown();
}

发送了两条相同的消息

55d397c9-814f-4931-b0fd-7e142c04759b
SendResult [sendStatus=SEND_OK, msgId=7F00000121C418B4AAC204A76B050000, offsetMsgId=C0A8588200002A9F000000000002C359, messageQueue=MessageQueue [topic=repeatTestTopic, brokerName=broker-a, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F00000121C418B4AAC204A76B050000, offsetMsgId=C0A8588200002A9F000000000002C43F, messageQueue=MessageQueue [topic=repeatTestTopic, brokerName=broker-a, queueId=2], queueOffset=0]

2.2.2 测试消费者

/*** 在boot项目中可以使用@Bean在整个容器中放置一个单利对象*/
public static BitMapBloomFilter bloomFilter = new BitMapBloomFilter(100); // m数组长度@Test
public void testRepeatConsumer() throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("repeat-consumer-group");
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.setNamesrvAddr(MyConstant.NAME_SRV_ADDR);
consumer.subscribe("repeatTestTopic", "*");
// 注册一个消费监听 MessageListenerConcurrently是并发消费
consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {// 拿到消息的keyMessageExt messageExt = msgs.get(0);String keys = messageExt.getKeys();// 判断是否存在布隆过滤器中if (bloomFilter.contains(keys)) {// 直接返回了 不往下处理业务return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}// 这个处理业务,然后放入过滤器中// do sth...bloomFilter.add(keys);System.out.println("keys:" + keys);System.out.println(new String(messageExt.getBody()));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});
consumer.start();
System.in.read();
}

业务只处理了一条

keys:55d397c9-814f-4931-b0fd-7e142c04759b
库存-1

延迟过了后 重复消息被签收

解决重复消费问题

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/114314.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ubuntu20.04下安装nc

前言 nc在网络渗透测试中非常好用&#xff0c;这里的主要记一下Ubuntu20.04中nc的安装 编译安装 第一种方式是自己编译安装&#xff0c;先下载安装包 nc.zip wget http://sourceforge.net/projects/netcat/files/netcat/0.7.1/netcat-0.7.1.tar.gz/download -O netcat-0.7.…

国密https访问

前言 现在的SSL的加密算法实际上主要是国际算法&#xff0c;包括JDK&#xff0c;Go等语言也仅支持国际算法加密&#xff08;毕竟是国外开源项目&#xff09;&#xff0c;hash。随着国密算法的普及&#xff0c;比如openssl就支持国密了&#xff0c;还要新版本的Linux内核也开始…

解决因d3dx9_30.dll丢失程序无法运行,电脑缺失d3dx9_30.dll报错解决方案

我们的生活和工作都离不开电脑。然而&#xff0c;电脑作为一种复杂的工具&#xff0c;也会出现各种各样的问题。其中&#xff0c;丢失d3dx9_30.dll文件是一个常见的问题。d3dx9_30.dll是DirectX的动态链接库文件&#xff0c;如果丢失或损坏&#xff0c;可能会导致许多软件和游戏…

通讯录和内存动态管理

目录 (通讯录)动态增长版 实现效果 找单身狗 题目 源码 思路 三个内存函数的模拟实现 模拟实现strncpy 模拟实现strncat 模拟实现atoi (通讯录)动态增长版 该版本通讯录在原版的基础上增加了检查容量函数&#xff0c;实现了通讯录的动态…

Linux中的shell编程

shell编程 重定向 cat >temp 输入内容到temp文件中&#xff0c;如果存在temp则覆盖&#xff0c;没有则新建 cat >>temp 追加内容 cat temp1>>temp2 将temp1中的内容追加到temp 命令执行控制符号 ; 一个命令行执行多条语句 命令替换符 1.双引号&#…

数据结构--线性表回顾

目录 线性表 1.定义 2.线性表的基本操作 3.顺序表的定义 3.1顺序表的实现--静态分配 3.2顺序表的实现--动态分配 4顺序表的插入、删除 4.1插入操作的时间复杂度 4.2顺序表的删除操作-时间复杂度 5 顺序表的查找 5.1按位查找 5.2 动态分配的方式 5.3按位查找的时间…

Spark简介

文章目录 一、简介二、安装1、简介2、本地部署(Local模式)2.1 安装2.2 官方WordCount实例 3、Standlong模式3.1 简介2.2 安装集群2.3 官方测试案例 4、Yarn模式3.1 安装3.2 配置历史服务器3.3 配置查看历史日志 5、Mesos模式6、几种模式对比7、常用端口 三、Yarn模式详解1、简介…

sql语句数据库查询:如果当前元素已经使用过,下拉框不显示该元素该如何查询?

写宿舍管理系统&#xff0c;做到宿管和楼栋关系时&#xff0c;新增一个宿管&#xff0c;一个宿管管理一栋楼&#xff0c;如果当前楼栋已选择&#xff0c;那么就不能再选&#xff0c;如图所示&#xff1a; 最开始使用的是&#xff1a; SELECT DISTINCT b.building_num,b.TYPE,b…

【Python】图像和办公文档的处理

图像和办公文档处理 用程序来处理图像和办公文档经常出现在实际开发中&#xff0c;Python的标准库中虽然没有直接支持这些操作的模块&#xff0c;但我们可以通过Python生态圈中的第三方模块来完成这些操作。 操作图像 计算机图像相关知识 颜色。如果你有使用颜料画画的经历&…

【计算机毕设选题推荐】口腔助手小程序SpringBoot+Vue+小程序

前言&#xff1a;我是IT源码社&#xff0c;从事计算机开发行业数年&#xff0c;专注Java领域&#xff0c;专业提供程序设计开发、源码分享、技术指导讲解、定制和毕业设计服务 项目名 基于SpringBoot的口腔助手小程序 技术栈 SpringBootVue小程序MySQLMaven 文章目录 一、口腔…

nonaDlA 逻辑分析仪 使用记录

注意事项&#xff0c;很灵敏&#xff0c;不要用手碰&#xff0c;产生误触发 安装软件 github地址 官方提供的淘宝地址与使用说明 1.安装 1.安装程序 &#xff1a;下载githubDLA源码&#xff0c;打开 software\PulseView.exe安装 2.安装驱动&#xff1a;安装完第一步后&a…

系统报错“由于找不到vcomp140.dll无法继续执行代码”的解决方案

在我们日常使用电脑的过程中&#xff0c;可能会遇到一些错误提示&#xff0c;其中之一就是“找不到vcomp140.dll”。这个错误可能让许多用户感到困扰&#xff0c;因为它可能影响到我们的电脑使用。那么&#xff0c;vcomp140.dll是什么意思&#xff1f;当我们遇到这个问题时&…

22下半年下午题

声明&#xff1a;哔哩哔哩视频笔记 源地址 第一大题题目 第一大题解答 第一小问 根据0层数据流图来找&#xff0c;看数据流向和相应的处理模块匹配。并且这个第一问&#xff0c;肯定是能在说明中找到对应短语作为答案的。 第二小问 搞清楚具体存储数据的信息名字&#xff…

《低代码指南》——维格云和Airtable的比较

Airtable​ 什么是Airtable​ Airtable 是一个任务管理应用程序,它合并了电子表格、数据存储和模板,以帮助组织构建他们的工作流程。 适用于哪些企业/组织/人群​ 根据 Airtable 网站,该工具被超过 200,000 个组织的团队使用。 维格表与Airtable相比如何​ Airtable作为…

机器学习中参数优化调试方法

1 超参数优化 调参即超参数优化&#xff0c;是指从超参数空间中选择一组合适的超参数&#xff0c;以权衡好模型的偏差(bias)和方差(variance)&#xff0c;从而提高模型效果及性能。常用的调参方法有&#xff1a; 人工手动调参 网格/随机搜索(Grid / Random Search) 贝叶斯优…

01、MySQL-------性能优化

目录 一、影响性能的相关因素存储过程&#xff1a; 二、sql优化1>、Mysql系统架构2>、引擎区别&#xff1a; 3>、索引1、什么是索引&#xff1f;联合主键索引理解&#xff1a;索引长度理解&#xff1a;什么是慢查询&#xff1f; 1&#xff09;、索引理解2&#xff09;…

SpringBoot结合Druid实现SQL监控

1、前言 SpringBoot不用我多介绍了吧&#xff0c;目前后端最流行的框架。后端开发人员最基本的要求。 Druid数据库连接池&#xff0c;出自国内 ”java圣地" 阿里巴巴。 Druid是一个用于大数据实时查询和分析的高容错、高性能开源分布式系统&#xff0c;旨在快速处理大规模…

新年学新语言Go之五

一、前言 Go虽然不算是面向对象语言&#xff0c;但它支持面向对象一些特性&#xff0c;面向接口编程是Go一个很重要的特性&#xff0c;而Go的接口与Java的接口区别很大&#xff0c;Go的接口比较复杂&#xff0c;这里仅用一个最简单例子做介绍&#xff0c;复杂的我也还没学。 …

filebeat(8.9.0)采集日志到logstash,由logstash发送的es

filebeat采集日志到logstash&#xff0c;由logstash发送的es 下载并配置filebeat下载配置logback.xml logstash配置 下载并配置filebeat 下载 参考 配置 filebeat.inputs: - type: filestreamenabled: truepaths:# 日志文件目录- D:\modellog\elkdemo\*\*.logparsers:# 多…

LeetCode2409——统计共同度过的日子数

博主的解法过于冗长&#xff0c;是一直对着不同的案例debug修改出来的&#xff0c;不建议学习。虽然提交成功了&#xff0c;但是自己最后都不知道写的是啥了哈哈哈。 package keepcoding.leetcode.leetcode2409; /*Alice 和 Bob 计划分别去罗马开会。给你四个字符串 arriveA…