kafka入门(二): 位移提交

位移提交:

Kafka的每条消息都有唯一的 offset, 用来表示消息在分区中对应的位置。有的也称之为 “偏移量”。

消费者每次在 poll() 拉取消息,它要返回的是还没有消费过的消息集,

因此,需要记录上一次消费时的消费位移,并且持久化。

消费者在消费完消息之后,需要执行消费位移的提交。

自动位移提交:

Kafka默认的消费位移的提交方式是 自动提交。

自动提交,由消费者客户端参数 enable.auto.commit 配置,默认值是 true。

默认的自动提交,是定期提交,提交的周期由 auto.commit.interval.ms 配置,默认是 5s。

自动位移提交,有可能会重复消费和消息丢失。

假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那又得从上一次位移提交的地方重新开始消费,这样就会重复消费。

手动位移提交:

手动位移提交,由消费者客户端参数 enable.auto.commit 配置, 设置为 false 就是手动位移提交。

手动位移提交,可以分为 同步提交、异步提交。

commitSync() 同步提交

同步提交,会阻塞消费者线程直到位移提交完成。

示例代码:

public class OffsetCommitSync {public static final String BROKER_LIST = "localhost:9092";public static final String TOPIC = "myTopic1";public static final String GROUP_ID = "group.demo";public static void main(String[] args) {Properties props = initConfig();KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//消费者订阅主题consumer.subscribe(Collections.singletonList(TOPIC));while (true) {ConsumerRecords<String, String> records = consumer.poll(1000);for (ConsumerRecord<String, String> record : records) {//do something}//手动提交位移consumer.commitSync();System.out.println("手动提交位移成功.");}}public static Properties initConfig() {Properties props = new Properties();props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//不自动提交,采用手动提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);return props;}}

commitAsync() 异步提交 :

异步提交,在执行的时候消费者线程不会被阻塞,可能在提交消费位移的结果还未返回之前就开始了新一次的拉取操作。异步提交,可以使消费者的性能得到一定的增强。

异步提交,将 consumer.commitSync(); 换成 commitAsync。

如果还需要回调,就用 OffsetCommitCallback对象作为参数。

示例如下:

public class OffsetCommitAsyncCallback {public static final String BROKER_LIST = "localhost:9092";public static final String TOPIC = "myTopic1";public static final String GROUP_ID = "group.demo";public static void main(String[] args) {Properties props = initConfig();KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList(TOPIC));while (true) {ConsumerRecords<String, String> records = consumer.poll(1000);for (ConsumerRecord<String, String> record : records) {//do something}//异步回调,如果不需要回调,就采用无参的方法consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,Exception exception) {if (exception == null) {System.out.println(offsets);} else {log.error("fail to commit offsets {}", offsets, exception);}}});}}public static Properties initConfig() {Properties props = new Properties();props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);return props;}}

参考资料:

《深入理解kafka:核心设计与实践原理》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/171026.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[计算机网络]运输层概述

虽然我自己也不知道写在前面和前言有什么区别..... 这个系列其实是针对<深入浅出计算机网络>的简单总结,加入了一点个人的理解和浅薄见识,如果您有一些更好的意见和见解,欢迎随时协助我改正,感激不尽啦. 最近心态平和了不少, 和过去也完全做了个割舍吧,既然痛苦和压力的…

记录华为云服务器(Linux 可视化 宝塔面板)-- 安全组篇

文章目录 前言安全组说明安全组的特性安全组的应用场景 进入安全组添加基本规则添加自定义规则如有启发&#xff0c;可点赞收藏哟~ 前言 和windows防火墙类似&#xff0c;安全组是一种虚拟防火墙&#xff0c;具备状态检测和数据包过滤功能&#xff0c;可以对进出云服务器的流量…

typeof,instanceof

1.typeof typeof运算符返回的结果是以小写的字符串表示的变量的类型 2.instanceof instanceof运算符用于判断右边构造函数的原型对象是否在左边对象的原型链上 let arr[]let obj{}let datenew Dateconsole.log(arr instanceof Array)console.log(arr instanceof Object)conso…

Maven 简单配置阿里云镜像

配置步骤&#xff1a; 1、找到 maven 的安装目录&#xff0c;修改settings.xml 2、在文件中找到<mirrors>标签&#xff0c;然后再标签中添加阿里云配置即可 <mirror><id>aliyunmaven</id><mirrorOf>*</mirrorOf><name>阿里云公共…

CocosCreator 面试题(十五)Cocos Creator如何内置protobuf JS版本?

一、说说protobuf 是什么&#xff1f; Protocol Buffers&#xff08;简称为ProtoBuf&#xff09;是一种由Google开发的数据序列化格式。它是一种轻量级、高效且通用的数据交换格式&#xff0c;可用于各种编程语言和平台。 ProtoBuf使用结构化的消息定义语言&#xff08;IDL&a…

巧妙之中见真章:深入解析常用的创建型设计模式

设计模式之创建型设计模式详解 一、设计模式是什么&#xff1f;二、模板方法2.1、代码结构2.2、符合的设计原则2.3、如何扩展代码2.4、小结 三、观察者模式3.1、代码结构3.2、符合的设计原则3.3、如何扩展代码3.4、小结 四、策略模式4.1、代码结构4.2、符合的设计原则4.3、如何…

Pinctrl子系统和GPIO子系统

Pinctrl子系统&#xff1a; 借助Princtr子系统来设置一个Pin的复用和电气属性&#xff1b; pinctrl子系统主要做的工作是&#xff1a;1. 获取设备树中的PIN信息&#xff1b;2.根据获取到的pin信息来设置的Pin的复用功能&#xff1b;3.根据获取到的pin信息去设置pin的电气特性…

【机器学习 | 聚类】关于聚类最全评价方法大全,确定不收藏?

&#x1f935;‍♂️ 个人主页: AI_magician &#x1f4e1;主页地址&#xff1a; 作者简介&#xff1a;CSDN内容合伙人&#xff0c;全栈领域优质创作者。 &#x1f468;‍&#x1f4bb;景愿&#xff1a;旨在于能和更多的热爱计算机的伙伴一起成长&#xff01;&#xff01;&…

访谈 破风之人毛京波,选择难而正确的路

“无论是在燃油时代还是电动时代&#xff0c;我们所做的一切&#xff0c;只为回归纯粹的驾驶乐趣。”履新路特斯中国总裁整整一年的毛京波&#xff0c;从不放过任何一个展示路特斯品牌驾驭精神的机会。 11月17日&#xff0c;广州车展开幕首日&#xff0c;位于5.2馆的路特斯“冠…

【理解ARM架构】 散列文件 | 重定位

&#x1f431;作者&#xff1a;一只大喵咪1201 &#x1f431;专栏&#xff1a;《理解ARM架构》 &#x1f525;格言&#xff1a;你只管努力&#xff0c;剩下的交给时间&#xff01; 目录 &#x1f3d3;引出重定位&#x1f3d3;散列文件&#x1f3d3;可读可写数据段重定位&#…

C++ 基于Boost.Asio实现端口映射器

Boost.Asio 是一个功能强大的 C 库&#xff0c;用于异步编程和网络编程&#xff0c;它提供了跨平台的异步 I/O 操作。在这篇文章中&#xff0c;我们将深入分析一个使用 Boost.Asio 实现的简单端口映射服务器&#xff0c;该服务器能够将本地端口的数据包转发到指定的远程服务器上…

OSG编程指南<十一>:OSG几何体操作及三维地形创建

1、简化几何体 在 OSG 中&#xff0c;场景都是由基本的绘图基元构成的&#xff0c;基本的绘图基元构成简单的几何体&#xff0c;简单的几何体构成复杂的几何体&#xff0c;复杂的几何体最终构造成复杂的场景。当多个几何体组合时&#xff0c;可能 存在多种降低场景渲染效率的原…

220. 存在重复元素 III(滑动窗口+有序集合)

Problem: 220. 存在重复元素 III 文章目录 题目思路Code 题目 给你一个整数数组 nums 和两个整数 indexDiff 和 valueDiff 。 找出满足下述条件的下标对 (i, j)&#xff1a; i ! j,abs(i - j) < indexDiffabs(nums[i] - nums[j]) < valueDiff 如果存在&#xff0c;返回…

Vue事件处理

Vue的事件处理与BOMDOMjs下的事件处理没有明显的差异&#xff0c;差别主要体现在规范化上。在模板或者DOM结构上绑定事件的指令是v-on或者符号。事件的处理函数除了直接指定处理函数之外&#xff0c;还增加了对事件简易处理函数的内联处理方式。在事件上&#xff0c;Vue增加了事…

JAVA小游戏“简易版王者荣耀”

第一步是创建项目 项目名自拟 第二部创建个包名 来规范class 然后是创建类 GameFrame 运行类 package com.sxt;import java.awt.Graphics; import java.awt.Image; import java.awt.Toolkit; import java.awt.event.ActionEvent; import java.awt.event.ActionListener; im…

06_正则与异常处理

正则表达式 基础演示 public class Test {public static void main(String[] args) {System.out.println(check("7029622989")); // true}public static boolean check(String userId) {return userId ! null && userId.matches("[1-9]\\d{5,19}&quo…

DPDK系列之三十七IO处理

一、介绍 如果一条通信链路要想达到最优的效果&#xff0c;一定是整体上每个环节都要有最佳的节奏协调控制而不一定是每个环节都是最优。这个在计算机的数据处理上就更是明显。一般来说&#xff0c;IO的速度是最低的&#xff0c;至少在可见的时光里要想超越CPU和内存还是很难的…

Scope 模块

Scope 模块可以连接任何类型的实数信号线 (不支持复数)。 波形显示界面主要包括两个部分: Scope 独有的工具栏、波形显示区域。 波形显示界面默认是黑色背景, 当有单个信号输入时, 信号线是黄色的。 Scope 模块也有菜单栏, 只不过默认将其句柄和显示都隐藏起来, 可以通过下面…

技术类知识汇总(二)

在自己日常学习javaweb的过程中&#xff0c;做的一些笔记和总结&#xff0c;汇总如下&#xff1a; Springboot项目的静态资源(html&#xff0c;css&#xff0c;js等前端资源)默认存放目录为&#xff1a;classpath:/static classpath:/public classpath:/resources"三层架…

java-hprof 文件是什么

一、是什么 hprof 文件是 Java进程所使用的内存情况在某一时间的一次快照&#xff08;Heap Profile 的缩写&#xff09;&#xff0c;格式为java_pidxxxxx*.hprof 二、文件里面有什么 1、所有的对象信息 对象的类信息、字段信息、原生值(int, long等)及引用值 2、所有的类信…