Kafka开发指南之 如何Kafka 事务型生产者,保证生产者exactly once

目录

至少一次(at least once)

最多一次(at most once)

精确一次(exactly once)

幂等性

幂等性作用范围

实现方法

代码

事务

事务作用范围

实现方法

代码


我们知道Kafka的消息交付可靠性保障分为 最多一次(at most once),至少一次(at least once),精确一次(exactly once)

 

至少一次(at least once)

什么时候Producer数据会重复发送 呢?

比如当Producer发送一条数据,当数据发送过去了,由于某种原因Broker没有反馈给Producer已经提交成功,Producer此时设置了重试机制,retries (设置方法:props.put(ProducerConfig.RETRIES_CONFIG, 5); ),则会再次发送数据,此时会导致数据重复发送

最多一次(at most once)

与at least once 相反,我们把retries 禁止,则就是最多一次,如果禁止重试,会导致数据丢失

 

精确一次(exactly once)

如何实现精确一次呢

Producer 有两种方法 幂等性与事务型

幂等性

幂等性作用范围

只能保证单个Producer不会产生重复数据,如果Producer重启或者多个Producer无法保证数据不重复

实现方法

设置一下配置即可

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)

代码


import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import wiki.hadoop.kafka.config.Constant;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** 幂等性生产者**      它只能保证单分区上的幂等性,即一个幂等性 Producer 能够保证某个主题的一个 分区上不出现重复消息,它无法实现多个分区的幂等性*      它只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,你可以理 解为 Producer 进程的一次运行。当你重启了 Producer 进程之后,这种幂等性保 证就丧失了* @author jast* @date 2020/4/19 22:38*/
public class IdempotenceProducer {private  static Producer<String, String> producer ;public IdempotenceProducer() {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.KAFKA_BROKERS);props.put(ProducerConfig.ACKS_CONFIG, "all");props.put(ProducerConfig.RETRIES_CONFIG, 5);props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);props.put(ProducerConfig.LINGER_MS_CONFIG, 1);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024 * 1024);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//设置Producer幂等性,其他不用变化props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);producer = new KafkaProducer<String, String>(props);}public Producer<String,String> getProducer(){return producer;}public static void main(String[] args) throws ExecutionException, InterruptedException {IdempotenceProducer idempotenceProducer = new IdempotenceProducer();Producer<String, String> producer = idempotenceProducer.getProducer();producer.send(new ProducerRecord<String,String>("test","1234")).get();}}

 

 

事务

事务作用范围

全部

实现方法

Producer设置//设置Producer幂等性,其他不用变化
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
//设置事务,同时也要指定幂等性,自定义id名称
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"jast-acid");-------------------------------------------------------------------Consumer设置//设置只读事务提交成功后的数据props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase());

代码

Producer

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import wiki.hadoop.kafka.config.Constant;import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;/*** Kafka事务提交,保证exactly once producer* 要么全部成功,要么全部失败* @author jast* @date 2020/4/21 22:38*/
public class TransactionProducer {private  static Producer<String, String> producer ;public TransactionProducer() {Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.KAFKA_BROKERS);props.put(ProducerConfig.ACKS_CONFIG, "all");props.put(ProducerConfig.RETRIES_CONFIG, 5);props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);props.put(ProducerConfig.LINGER_MS_CONFIG, 1);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024 * 1024);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//设置Producer幂等性,其他不用变化props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);//设置事务,同时也要指定幂等性,自定义id名称props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"jast-acid");producer = new KafkaProducer<String, String>(props);}public Producer<String,String> getProducer(){return producer;}public static void main(String[] args) throws ExecutionException, InterruptedException {TransactionProducer transactionProducer = new TransactionProducer();Producer<String, String> producer = transactionProducer.getProducer();//初始化事务producer.initTransactions();boolean flag = true;//循环四次,最后一次我们把事务成功提交//理想结果:前三次事务提交失败//  事务消费者消费不到数据1,2,第四次可以消费到1,2,3,4;//  普通消费者可以消费到前三次的1,2 ,也可以消费到第四次1,2,3,4// 运行方法 TransactionConsumer/*** 结果如下,事务提交成功* 普通消费者消费数据->1 partition:2 offset:3080713* 事务消费者消费数据->3 partition:2 offset:3080717* 普通消费者消费数据->2 partition:1 offset:3081410* 普通消费者消费数据->1 partition:3 offset:3081465* 普通消费者消费数据->1 partition:2 offset:3080715* 普通消费者消费数据->3 partition:2 offset:3080717* 事务消费者消费数据->4 partition:1 offset:3081414* 事务消费者消费数据->2 partition:0 offset:3081470* 事务消费者消费数据->1 partition:3 offset:3081467* 普通消费者消费数据->2 partition:1 offset:3081412* 普通消费者消费数据->4 partition:1 offset:3081414* 普通消费者消费数据->2 partition:0 offset:3081468* 普通消费者消费数据->2 partition:0 offset:3081470* 普通消费者消费数据->1 partition:3 offset:3081467*/for(int i=0;i<=3;i++) {if(i==3)flag = false;try {//事务开始producer.beginTransaction();producer.send(new ProducerRecord<String, String>("test", "1")).get();producer.send(new ProducerRecord<String, String>("test", "2")).get();//手动制造异常if (flag)throw new RuntimeException("程序异常");producer.send(new ProducerRecord<String, String>("test", "3")).get();producer.send(new ProducerRecord<String, String>("test", "4")).get();//事务提交producer.commitTransaction();} catch (Exception e) {//中止事务producer.abortTransaction();e.printStackTrace();}}}
}

Consumer


import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.serialization.StringDeserializer;
import wiki.hadoop.kafka.config.Constant;
import wiki.hadoop.kafka.util.LogInit;import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;/*** 消费Kafka,保证事务性* @author jast* @date 2020/4/21 22:54*/
public class TransactionConsumer {/*** 事务性kafka消费* @return KafkaConsumer<String,String>* @param topic* @param max_poll_records* @param group* @return*/public KafkaConsumer<String, String> transactionConsumer(String topic, String group , int max_poll_records , boolean isLatest) {Properties props = new Properties();//-----------------------------------------------------------------------------------//设置只读事务提交成功后的数据props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase());//-----------------------------------------------------------------------------------props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.KAFKA_BROKERS);props.put(ConsumerConfig.GROUP_ID_CONFIG, group);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, max_poll_records);//控制每次poll的数量props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);//自动提交 falseprops.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 3000);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, isLatest==true ? "latest" : "earliest");props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 5 * 1024 * 1024);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList(topic));return consumer;}public KafkaConsumer<String, String> consumer(String topic, String group , int max_poll_records , boolean isLatest) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.KAFKA_BROKERS);props.put(ConsumerConfig.GROUP_ID_CONFIG, group);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, max_poll_records);//控制每次poll的数量props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);//自动提交 falseprops.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 3000);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, isLatest==true ? "latest" : "earliest");props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 5 * 1024 * 1024);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList(topic));return consumer;}public static void main(String[] args) throws InterruptedException, ExecutionException {TransactionConsumer transactionConsumer = new TransactionConsumer();TransactionConsumer transactionConsumer2 = new TransactionConsumer();KafkaConsumer<String, String> consumer = transactionConsumer.consumer("test", "test", 10, false);KafkaConsumer<String, String> consumer2 = transactionConsumer2.transactionConsumer("test", "test2", 10, false);CompletableFuture.runAsync(()->{while(true) {ConsumerRecords<String, String> records = consumer.poll(1000);for (ConsumerRecord<String, String> record : records) {System.out.println("普通消费者消费数据->" + record.value() + " partition:"+record.partition()+ " offset:"+record.offset());}
//                System.out.println("普通消费者休眠1秒");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}});CompletableFuture.runAsync(()->{while(true) {ConsumerRecords<String, String> records2 = consumer2.poll(1000);for (ConsumerRecord<String, String> record : records2) {System.out.println("事务消费者消费数据->" + record.value() + " partition:"+record.partition()+ " offset:"+record.offset());}
//                System.out.println("事务消费者休眠1秒");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}}).get();}
}

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/509734.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

UML序列图

UML学习&#xff08;三&#xff09;-----序列图 UML的模型中可分为两种&#xff0c;动态模型和静态模型。用例图、类图和对象图都是UML中的静态结构模型。而在UML系统动态模型的其中一种就是交互视图&#xff0c;它描述了执行系统功能的各个角色之间相互传递消息的顺序关系。序…

OpenTSDB 开发指南之 查询数据

前面博主写了一篇文章去介绍opentsdb的http接口的使用方法,但是某一些接口的使用还是比较复杂&#xff0c;这篇文章会通过example来详细讲述opentsdb的一些特性。 本文的举的例子有这些&#xff1a; 基本的写入和查询数据的注释和说明子查询查询中的filters使用查询数据的rat…

OpenTSDB 开发指南之 Api操作数据

/api/put 请求方式&#xff1a;post请求参数&#xff1a; 参数说明examplesummary返回主要摘要/api/put?summarydetails返回详细信息/api/put?detailssync是否同步&#xff0c;即是否等待数据都写入成功后才返回结果/api/put?syncsync_timeout返回结果之前的等待时间/api/p…

xshell下利用SFTP传输文件

SFTP是基于SSH的文件传输协议&#xff0c;与ZMODEM相比具有更加安全且更为快速的文件传输功能。 如何利用SFTP接收文件: 在本地提示以sftp命令登陆拟要接收文件的主机。 Xshell:> sftp hostname在sftp提示下以get命令接收需要的文件。 sftp:/home/user21>get filenam…

libcurl使用方法

原文地址&#xff1a;http://curl.haxx.se/libcurl/c/libcurl-tutorial.html 译者&#xff1a;JGood(http://blog.csdn.net/JGood ) 译者注&#xff1a;这是一篇介绍如何使用libcurl的入门教程。文档不是逐字逐句按原文翻译&#xff0c;而是根据笔者对libcurl的理解&#xff0c…

OpenTSDB 开发指南之 Grafana 展示OpenTSDB监控数据

目录 准备数据 在Grafana创建OpenTSDB连接 创建一个仪表盘 统计 准备数据 将数据插入OpenTSDB {"metric":"jast.data","value":1023,"timestamp":1588742563,"tags":{"type":"jast-graph-data"}}…

linux重启后地址不是之前设置的静态地址的解决方案

按照以下步骤进行安装 1. vi /etc/hosts 修给域名匹配 2. vi /etc/sysconfig/network 修改主机名称 3. vi /etc/sysconfig/network-scripts/ifcfg-eth0 修改主机IP 4. vi /etc/udev/rules.d/70-persistent-net.rules 把里面的eth0删掉&#xff0c;把eth1名称改为eth0

CDH 版本 Kafka 外网设置

登陆CDH页面,进入Kafka配置页面 搜索 advertised 修改advertised.host.name,这里我们有三台Broker,我们把每台的外网ip填写到对应的机器上 advertised.port不填写 我们kafka的端口设置的是9099 将外网端口9099开放,允许外网访问 (这里不做介绍

SQLite、MySQL和PostgreSQL 三种关系数据库比较

关系型数据库的使用已经有相当长的时间了。它们变得流行起来托了管理系统的福&#xff0c;关系模型被实现得相当的好&#xff0c;并且被证明是操作数据的好方法&#xff08;特别是事务性强的应用&#xff09;。 在这篇DigitalOcean文章中&#xff0c;我们将尝试理解一些最常用、…

Ecilpse常用快捷键

Alt &#xff1f; #代码提示ctrl l #去某一行ctrl D #删掉光标所在的一行ctrl shift r #访问某个文件ctrl shiif f #格式化ctrl shift o #包的引入alt 方向键向下 #光标所在行向下移alt 方向键向上 # 光标所在行向上移ctrl alt 方向键向下 #光标所在行向…

OpenTSDB 安装

下载目录 https://github.com/OpenTSDB/opentsdb/releases https://github.com/OpenTSDB/opentsdb/releases/download/v2.4.0/opentsdb-2.4.0.noarch.rpm 安装 GnuPlot yum install gnuplot -y 直接安装OpenTSDB会报错 [rootecs-t-001-0001 openTSDB]# rpm -ivh opentsdb-2.…

leveldb使用指南

这篇文章是levelDB官方文档的译文&#xff0c;原文地址:LevelDB library documentation 这篇文章主要讲leveldb接口使用和注意事项。 leveldb是一个持久型的key-value数据库。key,value可以是任意的字节数组&#xff0c;key之间是有序的。key的比较函数可以由用户指定。 1. 打…

java中同步锁的原理和实现

接口 Lock 实现提供了比使用 synchronized 方法和语句可获得的更广泛的锁定操作。此实现允许更灵活的结构&#xff0c;可以具有差别很大的属性&#xff0c;可以支持多个相关的 Condition 对象。 锁是控制多个线程对共享资源进行访问的工具。通常&#xff0c;锁提供了对共享资…

HugeGraph 配置参数

gremlin-server.yaml 参数名称参数说明默认值scriptEvaluationTimeout查询超时时间&#xff0c;单位毫秒30000rest-server.properties 参数名称参数说明默认值restserver.max_worker_threads服务器最大工作线程2*cpurestserver.min_free_memory当服务器内存小于该值时&#x…

本地yum仓库以及网络版yum的私有仓库详细的安装配置

本地仓的配置 第一步&#xff1a;开启CD/DVD 设备&#xff0c;并且把centos镜像链接添加至设备中 第二步&#xff1a;创建一个 文件夹用来挂在 光驱文件 mkdir /mnt/cdrom &#xff08;通常约定挂载在/mnt目录下 &#xff09; 第三步&#xff1a;mount -t iso9660 -o ro /de…

LevelDB简述

转自&#xff1a;http://www.cnblogs.com/melons/p/5791855.html 既然开始了&#xff0c;哪有停下的道理&#xff0c;先了解一下Levedb的关荣历史、完美现在和光辉的未来&#xff1a; Leveldb: 1.Leveldb是一个google实现的非常高效的kv数据库&#xff0c;目前能够支持billion级…

HBase原理 – snapshot 快照

目录 snapshot&#xff08;快照&#xff09;基础原理 snapshot能实现什么功能&#xff1f; hbase snapshot用法大全 hbase snapshot分布式架构&#xff0d;两阶段提交 snapshot核心实现 clone_snapshot如何实现呢&#xff1f; 其他需要注意的 参考文献 更多信息可参考《…

linux如何自动化部署脚本实现免密登录并访问资源

任务把weijie主机jdk文件安装到weijie1中。 首先再各台主机中安装必要的命令&#xff1a; expect、wget、httpd、ssh 执行命令 如&#xff1a;expect提示命令不存在&#xff0c;则分别安装命令 yum install expect yum install wget yum install httpd yum install ssh 开…

比赛2016年暑假集训盲打首秀赛结果

4 53 2015计算机科学与技术1班 15111205046 鞠明杭 149 Fishing For Jasmine 75.50KPM 99.34% 149 2016-06-25 19:23:04 不及格&#xff0c;加强训练&#xff01;

时序数据库技术体系 – InfluxDB TSM存储引擎之数据读取

任何一个数据库系统内核关注的重点无非&#xff1a;数据在内存中如何存储、在文件中如何存储、索引结构如何存储、数据写入流程以及数据读取流程。关于InfluxDB存储内核&#xff0c;笔者在之前的文章中已经比较全面的介绍了数据的文件存储格式、倒排索引存储实现以及数据写入流…