前面的章节中我们聊到如何避免保证消息丢失,没有印象的同学可以再看看,本节我们将展开如何实现kafka的一次精确。
首先我们需要明白两个概念“幂等”和“事物”
幂等
“幂等”这个词原是数学领域中的概念,指的是某些操作或函数能够被执行多次,但每次得到的结果都是不变的。比如乘以 1 就是一个幂等操作,而数加 1 这个操作就不是幂等的,因为执行一次和执行多次的结果不同。
在计算机领域中
- 若一个子程序是幂等的,不管运行这个子程序多少次,与该子程序关联的那部分系统状态保持不变。
- 在函数式编程语言(比如 Scala 或 Haskell)中,很多纯函数天然就是幂等的。
事务
事务就是是一个操作序列,这些操作要么都执行,要么都不执行,它是一个不可分割的工作单位。Kafka 的事务概念类似于我们熟知的数据库提供的事务。在数据库领域,事务提供的安全性保障是经典的 ACID,即原子性(Atomicity)、一致性 (Consistency)、隔离性 (Isolation) 和持久性 (Durability)。
Idempotent Producer
Producer 默认不是幂等性的,但我们可以创建幂等性 Producer。在 0.11.0.0 版本引入了幂等生产者。
- enable.idempotence=true
光有幂等还不够,还需要事务的保证
Transactional Producer
producer.initTransactions();
try {producer.beginTransaction();producer.send(record1);producer.send(record2);producer.commitTransaction();
} catch (KafkaException e) {producer.abortTransaction();
}
消费者端保证
生产者解决了重复消息的问题,消费这端自然也需要避免重复消费
- enable.auto.commit=false
由默认的自动提交改为手动提交,关于自动提交的最佳实践可以参考上一节【kafka实践】11|消费位移提交
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// Process the recordprocessRecord(record);}// Commit the offsets after processing the batch of recordsconsumer.commitSync();
}
当然更建议配合业务层面由幂等处理,这样就能做到“万无一失”。
笔者准备了英文原版的kafka经典书籍:kafka in action
电子书版本,扫码回复:kafka 请多多支持!