Kafka-Java四:Spring配置Kafka消费者提交Offset的策略

一、Kafka消费者提交Offset的策略

Kafka消费者提交Offset的策略有

  1. 自动提交Offset:
    1. 消费者将消息拉取下来以后未被消费者消费前,直接自动提交offset。
    2. 自动提交可能丢失数据,比如消息在被消费者消费前已经提交了offset,有可能消息拉取下来以后,消费者挂了
  2. 手动提交Offset
    1. 消费者在消费消息时/后,再提交offset,在消费者中实现
    2. 手动提交Offset分为:手动同步提交、手动异步提交
  3. 什么是Offset
    1. 参考文章:Linux:【Kafka三】组件介绍

二、自动提交策略

        Kafka消费者默认是自动提交Offset的策略

        可设置自动提交的时间间隔

package com.demo.lxb.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;/*** @Description: kafka消费者消费消息,自动提交offset* @Author: lvxiaobu* @Date: 2023-10-24 16:26**/
public class MyConsumerAutoSubmitOffset {private  final static String CONSUMER_GROUP_NAME = "GROUP1";private  final static String TOPIC_NAME = "topic0921";public static void main(String[] args) {Properties props = new Properties();// 一、设置参数// 配置kafka地址
//        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
//                "192.168.151.28:9092"); // 单机配置props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094"); // 集群配置// 配置消息 键值的序列化规则props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 配置消费者组props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);// 设置消费者offset的提交方式// 自动提交:默认配置props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");// 自动提交offset的时间间隔props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");// 二、创建消费者KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);// 三、消费者订阅主题consumer.subscribe(Arrays.asList(TOPIC_NAME));// 四、拉取消息,开始消费while (true){// 从kafka集群中拉取消息ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));// 消费消息,当前是自动提交模式,在消息上一行消息被拉取下来以后,offset就自动被提交了,下面的代码如果出错,或者此时// 消费者挂掉了,那么消费其实是没有进行消费的(也就是业务逻辑处理)for (ConsumerRecord<String, String> record : records) {System.out.println("接收到的消息: 分区: " + record.partition() + ", offset: " + record.offset()+ ", key值: " + record.key() + " , value值: "+record.value());}}}
}

上述代码中的如下代码是自动提交策略的相关设置 

        // 设置消费者offset的提交方式// 自动提交:默认配置props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");// 自动提交offset的时间间隔props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");

三、手动提交策略

3.1、手动同步提交策略

        手动同步提交,会在提交offset处阻塞。当消费者接收到 kafka集群返回的消费者提交offset成功的ack后,才开始执行消费者中后续的代码。

        因为使用异步提交容易丢失消息,固一般使用同步提交,在同步提交后不要再做其他逻辑处理。

package com.demo.lxb.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;/*** @Description: kafka消费者消费消息,手动同步提交offset* @Author: lvxiaobu* @Date: 2023-10-24 16:26**/
public class MyConsumerMauSubmitOffset {private  final static String CONSUMER_GROUP_NAME = "GROUP1";private  final static String TOPIC_NAME = "topic0921";public static void main(String[] args) {Properties props = new Properties();// 一、设置参数// 配置kafka地址
//        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
//                "192.168.151.28:9092"); // 单机配置props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094"); // 集群配置// 配置消息 键值的序列化规则props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 配置消费者组props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);// 设置消费者offset的提交方式// 手动提交offsetprops.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");// 自动提交offset的时间间隔:此时不再需要设置该值
//        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");// 二、创建消费者KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);// 三、消费者订阅主题consumer.subscribe(Arrays.asList(TOPIC_NAME));// 四、拉取消息,开始消费while (true){// 从kafka集群中拉取消息ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));//  业务逻辑处理for (ConsumerRecord<String, String> record : records) {System.out.println("接收到的消息: 分区: " + record.partition() + ", offset: " + record.offset()+ ", key值: " + record.key() + " , value值: "+record.value());}// 当for循环业务逻辑处理结束以后,再手动提交offset// 同步方式提交,此时会产生阻塞,当kafka集群返回了提交成功的ack以后,才会消除阻塞,进行后续的代码逻辑。// 一般使用同步提交,在同步提交后不再做其他逻辑处理consumer.commitAsync();// do anything}}
}

3.2、手动异步提交策略

        异步提交,不会在提交offset代码处阻塞,即消费者提交了offset后,不需要等待kafka集群返回的ack即可继续执行后续代码。但是在提交offset时需要提供一个回调方法,供kafka集群回调,来告诉消费者提交offset的结果。

package com.demo.lxb.kafka;import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;/*** @Description: kafka消费者消费消息,手动异步提交offset* @Author: lvxiaobu* @Date: 2023-10-24 16:26**/
public class MyConsumerMauSubmitOffset2 {private  final static String CONSUMER_GROUP_NAME = "GROUP1";private  final static String TOPIC_NAME = "topic0921";public static void main(String[] args) {Properties props = new Properties();// 一、设置参数// 配置kafka地址
//        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
//                "192.168.151.28:9092"); // 单机配置props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094"); // 集群配置// 配置消息 键值的序列化规则props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 配置消费者组props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);// 设置消费者offset的提交方式// 手动提交offsetprops.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");// 自动提交offset的时间间隔:此时不再需要设置该值
//        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");// 二、创建消费者KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);// 三、消费者订阅主题consumer.subscribe(Arrays.asList(TOPIC_NAME));// 四、拉取消息,开始消费while (true){// 从kafka集群中拉取消息ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println("接收到的消息: 分区: " + record.partition() + ", offset: " + record.offset()+ ", key值: " + record.key() + " , value值: "+record.value());}// 异步提交,不影响后续的内容。// new OffsetCommitCallback是kafka集群会回调的方法,告诉消费者提交offset的结果consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {if(e != null){// 可将提交失败的消息记录到日志System.out.println("记录提交offset失败的消息到日志");System.out.println("消费者提交offset抛出异常:" + Arrays.toString(e.getStackTrace()));System.out.println("消费者提交offset异常的消息信息:" + JSONObject.toJSONString(map));}}});// 后续逻辑处理,不需要等到kafka集群返回了提交成功的ack以后才开始处理。//do anything}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/119615.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Ubuntu22.04 交叉编译阿里oss c-sdk

一、交叉编译openssl Ubuntu20.04 交叉编译openssl 1.0.1f_编译前去除 makefile 中所有的"-m64"字段_qq76211822的博客-CSDN博客文章浏览阅读319次。Ubuntu20.04 交叉编译openssl_编译前去除 makefile 中所有的"-m64"字段https://blog.csdn.net/sz7621182…

力扣刷题 day56:10-26

1.解码异或后的数组 未知 整数数组 arr 由 n 个非负整数组成。 经编码后变为长度为 n - 1 的另一个整数数组 encoded &#xff0c;其中 encoded[i] arr[i] XOR arr[i 1] 。例如&#xff0c;arr [1,0,2,1] 经编码后得到 encoded [1,2,3] 。 给你编码后的数组 encoded 和原…

【Java集合类面试二十三】、List和Set有什么区别?

文章底部有个人公众号&#xff1a;热爱技术的小郑。主要分享开发知识、学习资料、毕业设计指导等。有兴趣的可以关注一下。为何分享&#xff1f; 踩过的坑没必要让别人在再踩&#xff0c;自己复盘也能加深记忆。利己利人、所谓双赢。 面试官&#xff1a;List和Set有什么区别&am…

python爬虫入门(六)BeautifulSoup使用

简单来说&#xff0c;BeautifulSoup 就是 Python 的一个 HTML 或 XML 的解析库&#xff0c;我们可以用它来方便地从网页中提取数据&#xff0c;官方的解释如下&#xff1a; BeautifulSoup 提供一些简单的、Python 式的函数用来处理导航、搜索、修改分析树等功能。它是一个工具…

Redis单线程还是多线程?

&#x1f603; 个人学习笔记&#xff0c;不喜勿喷&#xff0c;望大佬指正&#xff01; 目录 一、Redis为什么选择单线程1. 是什么&#xff1f;2. why&#xff0c;为什么之前选择单线程&#xff1f;3. 作者原话使用单线程原因&#xff0c;官网证据 二、 为什么逐渐加入多线程特性…

零基础Linux_23(多线程)线程安全+线程互斥(加锁)+死锁

目录 1. 线程安全 1.1 线程不安全前期 1.2 线程不安全原因 2. 线程互斥 2.1 加锁保护&#xff08;代码&#xff09; 2.2 锁的本质 3. 可重入对比线程安全 4. 死锁 4.1 死锁的必要条件 4.2 避免死锁 5. 笔试面试题 答案及解析 本篇完。 1. 线程安全 基于上一篇线程…

0基础学习PyFlink——用户自定义函数之UDTF

大纲 表值函数完整代码 在《0基础学习PyFlink——用户自定义函数之UDF》中&#xff0c;我们讲解了UDF。本节我们将讲解表值函数——UDTF 表值函数 我们对比下UDF和UDTF def udf(f: Union[Callable, ScalarFunction, Type] None,input_types: Union[List[DataType], DataTy…

【网安大模型专题10.19】论文6:Java漏洞自动修复+数据集 VJBench+大语言模型、APR技术+代码转换方法+LLM和DL-APR模型的挑战与机会

How Effective Are Neural Networks for Fixing Security Vulnerabilities 写在最前面摘要贡献发现 介绍背景&#xff1a;漏洞修复需求和Java漏洞修复方向动机方法贡献 数据集先前的数据集和Java漏洞Benchmark数据集扩展要求数据处理工作最终数据集 VJBenchVJBench 与 Vul4J 的…

NO.2 | 977.有序数组的平方 ,209.长度最小的子数组 ,59.螺旋矩阵II

NO.2 | 977.有序数组的平方 &#xff0c;209.长度最小的子数组 &#xff0c;59.螺旋矩阵II 题目链接&#xff1a;977.有序数组的平方 前置条件&#xff1a;数组是非递减的&#xff0c;如果是乱序的数组应该就只能暴力解法暴力解法&#xff1a;直接遍历&#xff0c;对每个元素都…

与AI对话,如何写好prompt?

玩转AIGC&#xff0c;优质的Prompt提示词实在是太重要了&#xff01;同样的问题&#xff0c;换一个问法&#xff0c;就会得到差别迥异的答案。你是怎样和AI进行对话交流的呢&#xff1f;我来分享几个&#xff1a; 请告诉我…我想知道…对于…你有什么看法&#xff1f;帮我解决…

腾讯云学生专享云服务器介绍及购买攻略

随着互联网技术的不断发展&#xff0c;越来越多的人开始关注云计算领域。作为国内领先的云计算服务商&#xff0c;腾讯云推出了“云校园”扶持计划&#xff0c;完成学生认证即可购买学生专享云服务器。 一、活动对象 仅限腾讯云官网通过个人认证的35岁以下学生用户参与&#x…

初识Node.js开发

一、Node.js是什么 1.node.js是什么 官方对Node.js的定义&#xff1a; Node.js是一个基于V8 JavaScript引擎的JavaScript运行时环境。 也就是说Node.js基于V8引擎来执行JavaScript的代码&#xff0c;但是不仅仅只有V8引擎&#xff1a; 前面我们知道V8可以嵌入到任何C 应用…

PULP Ubuntu18.04

1. 安装eda工具&#xff1a;questasim_10.7_linux64&#xff0c;网上有教程和方法&#xff0c;如有问题&#xff0c;可私信我 2. 代码下载&#xff1a; git clone https://github.com/pulp-platform/pulp 编译代码 cd pulp source setup/vsim.sh make checkout make scripts …

解决 Element-ui中 表格(Table)使用 v-if 条件控制列显隐时数据展示错乱的问题

本文 Element-ui 版本 2.x 问题 在 el-table-column 上需根据不同 v-if 条件来控制列显隐时&#xff0c;就会出现列数据展示错乱的情况&#xff08;要么 A 列的数据显示在 B 列上&#xff0c;或者后端返回有数据的但是显示的为空&#xff09;&#xff0c;如下所示。 <tem…

vue中使用xlsx插件导出多sheet excel实现方法

安装xlsx&#xff0c;一定要注意版本&#xff1a; npm i xlsx0.17.0 -S package.json&#xff1a; {"name": "hello-world","version": "0.1.0","private": true,"scripts": {"serve": "vue-c…

【Kotlin精简】第5章 简析DSL

1 DSL是什么&#xff1f; Kotlin 是一门对 DSL 友好的语言&#xff0c;它的许多语法特性有助于 DSL 的打造&#xff0c;提升特定场景下代码的可读性和安全性。本文将带你了解 Kotlin DSL 的一般实现步骤&#xff0c;以及如何通过 DslMarker &#xff0c; Context Receivers 等…

[UDS] --- DiagnosticSessionControl 0x10 service

1 会话 $10包含3个子功能&#xff0c;01 Default默认会话&#xff0c;02 Programming编程会话&#xff0c;03 Extended扩展会话&#xff0c;ECU上电时&#xff0c;进入的是默认会话&#xff08;Default&#xff09;。 为什么设计三个会话模式呢&#xff1f;因为权限问题。默认…

FreeRTOS学习2018.6.27

《FreeRTOS学习》 1.freeRTOS基本功能函数&#xff1a; 定义任务&#xff1a;ATaskFunction(); 创建任务&#xff1a;xTaskCreate(); 改优先级&#xff1a;vTaskPrioritySet(); 系统延时&#xff1a;vTaskDelay(); 精确延时&#xff1a;vTaskDelayUntil(); 空闲任务钩子函数&…

CSS高级的详细解析

CSS高级 目标&#xff1a;掌握定位的作用及特点&#xff1b;掌握 CSS 高级技巧 01-定位 作用&#xff1a;灵活的改变盒子在网页中的位置 实现&#xff1a; 1.定位模式&#xff1a;position 2.边偏移&#xff1a;设置盒子的位置 left right top bottom 相对定位 posit…

【计算机网络(1)】计算机网络体系结构1:计算机网络概述

文章目录 概念 & 功能 & 发展计算机网络的概念计算机网络的功能计算机网络的发展网络的本质 组成 & 分类计算机网络的组成计算机网络的分类 概念 & 功能 & 发展 计算机网络的概念 1. 网络 网一样的东西或网状系统。其中&#xff08;有线电视网络、电信网…