Table of Contents
At least once
At most once
Exactly once
Idempotence
Scope of idempotence
How to enable
Code
Transactions
Scope of transactions
How to enable
Code
We know that Kafka's message delivery guarantees come in three levels: at most once, at least once, and exactly once.
At least once
When does the Producer send duplicate data?
Suppose the Producer sends a record and the broker commits it, but for some reason the broker's acknowledgment never reaches the Producer. If retries are enabled (retries, set via props.put(ProducerConfig.RETRIES_CONFIG, 5);), the Producer sends the record again, and the data is duplicated.
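To make that failure mode concrete, here is a minimal sketch of an at-least-once producer configuration (the broker address is a placeholder): acks waits for the write to be fully acknowledged, and retries triggers the resend that can duplicate data.

Properties props = new Properties();
// placeholder broker address
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// wait for the full set of in-sync replicas to acknowledge the write
props.put(ProducerConfig.ACKS_CONFIG, "all");
// resend when the acknowledgment is lost -- this is what can duplicate data
props.put(ProducerConfig.RETRIES_CONFIG, 5);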
At most once
The opposite of at least once: disable retries and you get at-most-once delivery. With retries disabled, a send that fails is never repeated, so data can be lost.
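A matching sketch for at-most-once (values are illustrative): with retries off, a failed send is simply dropped.

// never resend: no duplicates, but a lost acknowledgment means lost data
props.put(ProducerConfig.RETRIES_CONFIG, 0);
// acks=0 does not even wait for an acknowledgment, weakening the guarantee further
props.put(ProducerConfig.ACKS_CONFIG, "0");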
Exactly once
How do we achieve exactly once?
The Producer offers two mechanisms: idempotence and transactions.
Idempotence
Scope of idempotence
Idempotence only guarantees that a single Producer does not write duplicate data; it cannot prevent duplicates across a Producer restart or between multiple Producers. The broker deduplicates using the producer id (PID) and a per-partition sequence number, and a restarted Producer is assigned a new PID, which is why the guarantee does not survive restarts.
How to enable
Set the following property:
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
Code
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import wiki.hadoop.kafka.config.Constant;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Idempotent producer.
 *
 * It only guarantees idempotence per partition: an idempotent Producer ensures
 * that no duplicate messages appear within a single partition of a topic; it
 * cannot provide idempotence across partitions.
 * It only guarantees idempotence per session, where a session is one run of
 * the Producer process. Once the Producer process restarts, the guarantee is lost.
 *
 * @author jast
 * @date 2020/4/19 22:38
 */
public class IdempotenceProducer {

    private static Producer<String, String> producer;

    public IdempotenceProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.KAFKA_BROKERS);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024 * 1024);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Enable Producer idempotence; nothing else needs to change
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        producer = new KafkaProducer<String, String>(props);
    }

    public Producer<String, String> getProducer() {
        return producer;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        IdempotenceProducer idempotenceProducer = new IdempotenceProducer();
        Producer<String, String> producer = idempotenceProducer.getProducer();
        producer.send(new ProducerRecord<String, String>("test", "1234")).get();
    }
}
Transactions
Scope of transactions
Everything idempotence cannot cover: a transaction writes to multiple partitions atomically, and because the transactional.id is fixed in configuration rather than assigned per session, the guarantee also survives Producer restarts.
How to enable
Producer settings:

// Enable Producer idempotence; nothing else needs to change
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Enable transactions (idempotence must also be enabled) and choose a custom id
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "jast-acid");

Consumer settings:

// Read only data from successfully committed transactions
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase());
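With read_committed, poll() only returns records up to the last stable offset, so messages from aborted or still-open transactions are never delivered to the application; the default, read_uncommitted, returns everything regardless of transaction outcome.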
Code
Producer
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import wiki.hadoop.kafka.config.Constant;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Kafka transactional producer: exactly-once on the produce side.
 * Either every record in the transaction succeeds, or none does.
 *
 * @author jast
 * @date 2020/4/21 22:38
 */
public class TransactionProducer {

    private static Producer<String, String> producer;

    public TransactionProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.KAFKA_BROKERS);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024 * 1024);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Enable Producer idempotence; nothing else needs to change
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // Enable transactions (idempotence must also be enabled) and choose a custom id
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "jast-acid");
        producer = new KafkaProducer<String, String>(props);
    }

    public Producer<String, String> getProducer() {
        return producer;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        TransactionProducer transactionProducer = new TransactionProducer();
        Producer<String, String> producer = transactionProducer.getProducer();
        // Initialize transaction support
        producer.initTransactions();
        boolean flag = true;
        // Loop four times; only the last iteration commits the transaction.
        // Expected result: the first three transactions are aborted, so
        //   the transactional consumer cannot see records 1, 2 from those runs,
        //   but does see 1, 2, 3, 4 from the fourth run;
        //   the plain consumer sees 1, 2 from the first three runs as well as
        //   1, 2, 3, 4 from the fourth.
        // Observe by running TransactionConsumer.
        /*
         * Actual output -- the last transaction committed successfully:
         * plain consumer -> 1 partition:2 offset:3080713
         * transactional consumer -> 3 partition:2 offset:3080717
         * plain consumer -> 2 partition:1 offset:3081410
         * plain consumer -> 1 partition:3 offset:3081465
         * plain consumer -> 1 partition:2 offset:3080715
         * plain consumer -> 3 partition:2 offset:3080717
         * transactional consumer -> 4 partition:1 offset:3081414
         * transactional consumer -> 2 partition:0 offset:3081470
         * transactional consumer -> 1 partition:3 offset:3081467
         * plain consumer -> 2 partition:1 offset:3081412
         * plain consumer -> 4 partition:1 offset:3081414
         * plain consumer -> 2 partition:0 offset:3081468
         * plain consumer -> 2 partition:0 offset:3081470
         * plain consumer -> 1 partition:3 offset:3081467
         */
        for (int i = 0; i <= 3; i++) {
            if (i == 3)
                flag = false;
            try {
                // Begin the transaction
                producer.beginTransaction();
                producer.send(new ProducerRecord<String, String>("test", "1")).get();
                producer.send(new ProducerRecord<String, String>("test", "2")).get();
                // Manufacture an exception on the first three iterations
                if (flag)
                    throw new RuntimeException("simulated failure");
                producer.send(new ProducerRecord<String, String>("test", "3")).get();
                producer.send(new ProducerRecord<String, String>("test", "4")).get();
                // Commit the transaction
                producer.commitTransaction();
            } catch (Exception e) {
                // Abort the transaction
                producer.abortTransaction();
                e.printStackTrace();
            }
        }
    }
}
Consumer
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.serialization.StringDeserializer;
import wiki.hadoop.kafka.config.Constant;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * Consuming from Kafka with transactional guarantees.
 *
 * @author jast
 * @date 2020/4/21 22:54
 */
public class TransactionConsumer {

    /**
     * Transactional Kafka consumer.
     *
     * @param topic            topic to subscribe to
     * @param group            consumer group id
     * @param max_poll_records cap on records returned per poll
     * @param isLatest         start from latest offsets when no committed offset exists
     * @return KafkaConsumer<String, String>
     */
    public KafkaConsumer<String, String> transactionConsumer(String topic, String group, int max_poll_records, boolean isLatest) {
        Properties props = new Properties();
        // -----------------------------------------------------------------
        // Read only data from successfully committed transactions
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase());
        // -----------------------------------------------------------------
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.KAFKA_BROKERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, max_poll_records); // cap records per poll
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // auto commit offsets
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 3000);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, isLatest ? "latest" : "earliest");
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 5 * 1024 * 1024);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topic));
        return consumer;
    }

    public KafkaConsumer<String, String> consumer(String topic, String group, int max_poll_records, boolean isLatest) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.KAFKA_BROKERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, max_poll_records); // cap records per poll
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // auto commit offsets
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 3000);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, isLatest ? "latest" : "earliest");
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 5 * 1024 * 1024);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topic));
        return consumer;
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        TransactionConsumer transactionConsumer = new TransactionConsumer();
        TransactionConsumer transactionConsumer2 = new TransactionConsumer();
        KafkaConsumer<String, String> consumer = transactionConsumer.consumer("test", "test", 10, false);
        KafkaConsumer<String, String> consumer2 = transactionConsumer2.transactionConsumer("test", "test2", 10, false);
        // Plain consumer: sees records from committed and aborted transactions alike
        CompletableFuture.runAsync(() -> {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("plain consumer -> " + record.value()
                            + " partition:" + record.partition() + " offset:" + record.offset());
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        // Transactional consumer: sees only records from committed transactions
        CompletableFuture.runAsync(() -> {
            while (true) {
                ConsumerRecords<String, String> records2 = consumer2.poll(1000);
                for (ConsumerRecord<String, String> record : records2) {
                    System.out.println("transactional consumer -> " + record.value()
                            + " partition:" + record.partition() + " offset:" + record.offset());
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).get();
    }
}