kafka生产者2

1.数据可靠

• 0:生产者发送过来的数据,不需要等数据落盘应答。 

风险:leader挂了之后,follower还没有收到消息。。。。

• 1:生产者发送过来的数据,Leader收到数据后应答。

风险:leader应答完成之后,还没有开始同步副本。。。。

 

 生产者发送过来的数据,Leader和ISR队列里面 的所有节点收齐数据后应答。

 

可靠性总结: acks=0,生产者发送过来数据就不管了,可靠性差,效率高; acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等; acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低; 在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据, 对可靠性要求比较高的场景。

 

// 设置 acksproperties.put(ProducerConfig.ACKS_CONFIG, "all");// 重试次数 retries,默认是 int 最大值,2147483647properties.put(ProducerConfig.RETRIES_CONFIG, 3);

2.数据去重 

 

 

3.生产者事务

 说明:开启事务,必须开启幂等性。

 

package com.atguigu.kafka.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;public class CustomProducerTransaction {public static void main(String[] args) {// 1. 创建 kafka 生产者的配置对象Properties properties = new Properties();// 2. 给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");// key,value 序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"tranactional_id_01");// 3. 创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = newKafkaProducer<String, String>(properties);kafkaProducer.initTransactions();kafkaProducer.beginTransaction();// 4. 调用 send 方法,发送消息try{for (int i = 0; i < 5; i++) {kafkaProducer.send(newProducerRecord<>("first","atguigu " + i));int i1 = 1 / 0;}kafkaProducer.commitTransaction();}catch (Exception e){kafkaProducer.abortTransaction();}finally {// 5. 关闭资源kafkaProducer.close();}}
}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/699501.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【机器学习】数据清洗——基于Numpy库的方法删除重复点

&#x1f388;个人主页&#xff1a;豌豆射手^ &#x1f389;欢迎 &#x1f44d;点赞✍评论⭐收藏 &#x1f917;收录专栏&#xff1a;机器学习 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff0c;让我们共同学习、交流进…

解锁网络潜能:深入探索SCTP及其在现代通信中的革命性作用

第一部分&#xff1a;SCTP简介 流控制传输协议&#xff08;SCTP&#xff09;是一种传输层协议&#xff0c;设计用于支持Internet上的多路径消息传输。它结合了TCP的可靠性和UDP的简洁性&#xff0c;提供了多种高级特性&#xff0c;如多宿&#xff08;multi-homing&#xff09;…

解释Iterable和Iterator接口

在Java中&#xff0c;Iterable和Iterator是两个重要的接口&#xff0c;它们在集合框架中扮演着核心的角色&#xff0c;尤其是在遍历集合元素的时候。 Iterable接口 Iterable接口是Java Collections Framework的一部分&#xff0c;位于java.lang包中。任何实现了Iterable接口的…

PostgreSQL索引篇 | BTree

B-Tree索引 &#xff08;本文为《PostgreSQL数据库内核分析》一书的总结笔记&#xff0c;需要电子版的可私信我&#xff09; B树特点&#xff1a; 非叶子节点含一个或多个关键字值和子节点指针&#xff0c;不指向实际数据的存储位置所有关键字都是叶子节点&#xff0c;每个叶…

Python运算符你学会了吗?

1.算术运算符 &#xff08;加&#xff09;、-&#xff08;减&#xff09;、*&#xff08;乘&#xff09;、/&#xff08;除&#xff09;、%&#xff08;取余&#xff09;、//&#xff08;取整&#xff09;、**&#xff08;求幂&#xff09; a 12 b 3 c 7print(a b) # 15 …

关于Kinect 互动沙盘 深度图 Shader Graph 分层

把Kinect的深度图穿给Shader Graph using com.rfilkov.kinect; using UnityEngine; using UnityEngine.UI; public class GetDepthTex : MonoBehaviour { public Material Mat_SandTable; void Update() { Mat_SandTable.SetTexture("_MainTex"…

【openGL教程08】关于着色器(02)

LearnOpenGL - Shaders 一、说明 着色器是openGL渲染的重要内容&#xff0c;客户如果想自我实现渲染灵活性&#xff0c;可以用着色器进行编程&#xff0c;这种程序小脚本被传送到GPU的显卡内部&#xff0c;起到动态灵活的着色作用。 二、着色器简述 正如“Hello Triangle”一章…

【寸铁的刷题笔记】树、dfs、bfs、回溯、递归(二)

【寸铁的刷题笔记】树、dfs、bfs、回溯、递归(二) 大家好 我是寸铁&#x1f44a; 金三银四&#xff0c;树、dfs、bfs、回溯、递归是必考的知识点✨ 快跟着寸铁刷起来&#xff01;面试顺利上岸&#x1f44b; 喜欢的小伙伴可以点点关注 &#x1f49d; 上期回顾 感谢大家的支持&am…

简单的单层感知机(Perceptron)模型

[‘您需要实现一个简单的单层感知机&#xff08;Perceptron&#xff09;模型&#xff0c;并提供对其训练方法的实现。单层感知机是一种基础的线性二分类器&#xff0c;它对输入向量进行权值组合&#xff0c;然后通过激活函数&#xff08;通常是一个符号函数&#xff09;来决定该…

[附完整代码]群智能算法跑21种真实世界优化问题,并输出结果到excel||群智能算法跑CEC 2020真实世界优化问题,并输出结果到excel

1、简介 灰狼算法跑跑21种真实世界优化问题|足球训练队优化算法跑21种真实世界优化问题||牛顿拉夫逊算法跑21种真实世界优化问题||冠状豪猪CPO跑21种真实世界优化问题。 ‘FTTA’,‘BWO’, ‘CPO’, ‘FHO’, ‘GWO’, ‘HHO’, ‘NRBO’,‘SCA’,‘SGA’,WOA’跑21种真实世…

拓扑矩阵是四维协变量吗?

* 现在需要明确&#xff0c;拓扑矩阵的维度高于链表数据&#xff0c;那么用矩阵就可以控制链表数据&#xff0c;用矩阵表来 * 做循环&#xff0c;四维循环相当于三维向量的矢量运动嘛&#xff1f;我们在矩阵上面做计算也可以启动链表的运 * 动控制模型。。 * * 四维算法中应…

Linux:ACL权限,特殊位和隐藏属性

目录 一.什么是ACL 二.操作步骤 ① 添加测试目录、用户、组&#xff0c;并将用户添加到组 ② 修改目录的所有者和所属组 ③ 设定权限 ④ 为临时用户分配权限 ⑤ 验证acl权限 ⑥ 控制组的acl权限 三. 删除ACL权限 一.什么是ACL 访问控制列表 (Access Control List):ACL 通…

笔记本Win 10系统查看电池健康状况

博主最近换了个笔记本电池&#xff0c;之前的电池容量明显变小了很多&#xff0c;而且出现了轻微鼓包的情况。所以用gpt问了一下怎么用系统的方法查看电池情况。 在Windows 10系统中&#xff0c;您可以通过以下步骤来查看笔记本电脑电池的健康状况&#xff1a; 打开命令提示符&…

docker 带端口映射启动是报错

一、现象 docker端口映射或启动容器时报错 Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 二 、原因: docker服务启动时定义的自定义链DOCKER由于 centos7 firewall 被清掉 firewall的底层是使用iptables进行数据…

react + typescript 中 typeof的作用

在 TypeScript 中&#xff0c;使用 typeof 关键字可以获取一个变量或对象的类型。当你用 typeof 和一个组件&#xff08;无论是类组件还是函数组件&#xff09;一起使用时&#xff0c;你实际上是在获取这个组件的类型。 在 React 中&#xff0c;组件的类型不仅仅包括它的 prop…

CondaValueError: Malformed version string ‘~‘: invalid character(s)

使用conda 安装一些库时出现以下报错&#xff1a; CondaValueError: Malformed version string ~: invalid character(s)尝试进行更新conda conda upgrade -n base conda或者如果是环境方面的问题&#xff0c; conda upgrade -n base -c defaults --override-channels conda如…

记录 使用FFMPEG 笔记本摄像头推流

一、使用 FFMPEG 测试摄像头拉流显示 # 获取摄像头名称 ffmpeg -list_devices true -f dshow -i dummy# 我笔记本上的摄像头名称如下 device_pnp_\\?\usb#vid_0408&pid_1020&mi_00#6&199e90f7&0&0000#{65e8773d-8f56-11d0-a3b9-00a0c9223196}\global# 使…

重温MySQL

mysql 是什么 mysql 就是一个软件,专门用来管理文件的软件 关系型数据库:采用二维表结构组织和管理数据,并且规定了表和表间数据的关系. 表是由行和列构成,列包含一组命名的属性(也称字段),行包含一条记录.行和列的交集称为数据项 (也称字段值). 如何操作数据库 那就是用sq…

Vue2页面转化为Vue3

vue2element-ui转化为Vue3element plus 后台管理系统&#xff1a;增删查改 vue2页面&#xff1a; <template><div class"app-container"><div><el-form:model"queryParams"ref"queryForm"size"small":inline&qu…

golang tun设备创建并监听

golang tun设备创建并监听 linux tun设备文件地址为/dev/net/tun.直接打开即可(关闭文件描述符创建的tun虚拟接口自动注销) fd,err:syscall.Open("/dev/net/tun",syscall.O_RDWR,0640)//关闭 syscall.Close(fd)初始化 配置ip地址启动虚拟网卡 ip addr add xxx.xx…