Kafka3.0.0版本——消费者(手动提交offset)

目录

    • 一、消费者(手动提交 offset)的概述
      • 1.1、手动提交offset的两种方式
      • 1.2、手动提交offset两种方式的区别
      • 1.3、手动提交offset的图解
    • 二、消费者(手动提交 offset)的代码示例
      • 2.1、手动提交 offset(采用同步提交的方式)代码
      • 2.1、手动提交 offset(采用异步提交的方式)代码

一、消费者(手动提交 offset)的概述

1.1、手动提交offset的两种方式

  • commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
  • commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。

1.2、手动提交offset两种方式的区别

  • 相同点:都会将本次提交的一批数据最高的偏移量提交。
  • 不同点是:同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。

1.3、手动提交offset的图解

在这里插入图片描述

二、消费者(手动提交 offset)的代码示例

2.1、手动提交 offset(采用同步提交的方式)代码

  • 同步提交代码
    由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提交的效率比较低。

     // 是否自动提交 offset
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
    // 手动提交offset(同步提交)
    kafkaConsumer.commitSync();
    
  • 同步提交完整代码

    package com.xz.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;public class CustomConsumerByHandSync {public static void main(String[] args) {// 配置Properties properties = new Properties();// 连接 bootstrap.serversproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test3");// 手动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);// 1 创建一个消费者  "", "hello"KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);// 2 订阅主题 sevenTopicArrayList<String> topics = new ArrayList<>();topics.add("sevenTopic");kafkaConsumer.subscribe(topics);// 3 消费数据while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}// 手动提交offset(同步提交)kafkaConsumer.commitSync();}}
    }
    

2.1、手动提交 offset(采用异步提交的方式)代码

  • 异步提交代码
    虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset的方式。

     // 是否自动提交 offset
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
    // 手动提交offset(异步提交)
    kafkaConsumer.commitAsync();
    
  • 异步提交完整代码

    package com.xz.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;public class CustomConsumerByHandSync {public static void main(String[] args) {// 0 配置Properties properties = new Properties();// 连接 bootstrap.serversproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test3");// 手动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);// 1 创建一个消费者  "", "hello"KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);// 2 订阅主题 sevenTopicArrayList<String> topics = new ArrayList<>();topics.add("sevenTopic");kafkaConsumer.subscribe(topics);// 3 消费数据while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}// 手动提交offset(异步提交)kafkaConsumer.commitAsync();}}
    }
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/74842.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python爬虫 教程:IP池的使用

前言 嗨喽~大家好呀&#xff0c;这里是魔王呐 ❤ ~! python更多源码/资料/解答/教程等 点击此处跳转文末名片免费获取 一、简介 爬虫中为什么需要使用代理 一些网站会有相应的反爬虫措施&#xff0c;例如很多网站会检测某一段时间某个IP的访问次数&#xff0c;如果访问频率…

[SICTF 2023 #Round2] Crypto,PWN,Reverse

似乎很久没写了。 周五到周日&#xff0c;两天的这个比赛&#xff0c;有些东西还真是头回用&#xff0c;值得纪录一下。 Crypto 密码这块这届还是比较简单的&#xff0c;没有复杂的题&#xff0c;但量大分多。 【签到】古典大杂烩 给了一堆emoji的图 &#x1f429;&#x…

GCP Architect之VPN+Network

VPN 搜索结果共计:11 [单选]As part of implementing their disaster recovery plan, your company is trying to replicate their production MySQL database from their private data center to their GCP project using a Google Cloud VPN connection. They are experien…

OpenCV(三十四):轮廓外接最大、最小矩形和多边形拟合

目录 1.轮廓外接最大矩形boundingRect() 2.轮廓外接最小矩形minAreaRect() 3.轮廓外接多边形approxPolyDP() 1.轮廓外接最大矩形boundingRect() Rect cv::boundingRect ( InputArray array ) array:输入的灰度图像或者2D点集&#xff0c;数据类型为vector<Point>或者M…

Go语言的[GPM模型]

在go中,线程是运行Groutine的实体,调度器的功能是把可以运行的Groutine分配到工作线程上 GPM模型 M与P的数量没有绝对的数量关系,当一个M阻塞时,P就会创建一个或者切换到另一个M,所以即使设置了runtime.GOMAXPROCS(1) 也可能创建多个M出来; 当M发现给自己输送G协程的那个P队列为…

《AI一键生成抖音商品种草文案》让你秒变带货王!

在这个数字化的时代&#xff0c;我们的生活被各种应用所包围&#xff0c;其中&#xff0c;抖音作为一款短视频分享平台&#xff0c;已经成为了我们生活中不可或缺的一部分。然而&#xff0c;作为一名抖音创作者&#xff0c;你是否曾经遇到过这样的困扰&#xff1a;在创作商品种…

C#程序到底从哪里开始看,从Main函数开始,那么Main函数是什么?

视觉人机器视觉粉丝问我,拿到自己公司得架构,问我,C#程序到底从哪里看,从Main函数开始,那么Main函数是什么? Main()函数 Main()是C#应用程序的入口点,执行这个函数就是执行应用程序。也就是说,在执行过程开始时,会执行Main()函数,在Main()函数执行完毕时,执行过…

【JavaSpring】spring接口-beanfactory和applicationcontext与事件解耦

beanfactory 1.applicationcontext的父接口 2.是Spring的核心容器 功能 表面只有getBean&#xff0c;但实现类默默发挥了巨大作用 1.管理所有bean 2.控制反转 3.基本的依赖注入 applicationcontext 功能 1.继承了MessageSource&#xff0c;有了处理国际化资源的能力 …

[H5动画制作系列] Sprite及Text Demo

参考代码: sprite.js: var canvas, stage, container; canvas document.getElementById("mainView"); function init() {stage new createjs.Stage(canvas);createjs.Touch.enable(stage);var loader new createjs.LoadQueue(false);loader.addEventListener(&q…

云计算与虚拟化

一、概念 什么是云计算&#xff1f; 云计算&#xff08;cloud computing&#xff09;是分布式计算的一种&#xff0c;指的是通过网络“云”将巨大的数据计算处理程序分解成无数个小程序&#xff0c;然后&#xff0c;通过多部服务器组成的系统进行处理和分析这些小程序得到结果…

SLAM论文详解(5) — Bundle_Adjustment_LM(BALM)论文详解

目录 1 摘要 2 相关工作 3 BA公式和导数 A. 直接BA公式 B. 导数 C. 二阶近似 4 自适应体素化 5. 将BALM结合进LOAM 6. 实验 7. 算法应用场景解析 1 摘要 Bundle Adjustment是一种用于同时估计三维结构和传感器运动运动的优化算法。在视觉SLAM&#xff0c;三维重建等…

爬虫逆向实战(30)-某查查股东关联公司(HmacSHA512)

一、数据接口分析 主页地址&#xff1a;某查查 1、抓包 通过抓包可以发现数据接口是api/people/getRelatCompany 2、判断是否有加密参数 请求参数是否加密&#xff1f; 无 请求头是否加密&#xff1f; 通过查看“标头”可以发现&#xff0c;请求头中有一个key和value都是…

基于Sentinel的微服务保护

前言 Sentinel是Alibaba开源的一款微服务流控组件&#xff0c;用于解决分布式应用场景下服务的稳定性问题。Sentinel具有丰富的应用场景&#xff0c;它基于流量提供一系列的服务保护措施&#xff0c;例如多线程秒杀情况下的系统承载&#xff0c;并发访问下的流量控制&#xff…

9.8day59

503. 下一个更大元素 II - 力扣&#xff08;LeetCode&#xff09; 知识点&#xff1a;单调栈 42. 接雨水 - 力扣&#xff08;LeetCode&#xff09;

Qt的窗口系统

代码仓库以及参考文件见文章底部 坐标体系 要想学好GUI,界面的坐标系首先要搞清楚 在Qt编程中,以左上角为原点,X向右增加,Y向下增加。 对于所有嵌套的窗口,其坐标是相对于父窗口来说的。 QWidget 所有窗口以及窗口控件都是从QWidget直接或者间接派生出来的。 对象模…

基于Fomantic UI Web构建 个人导航站点网站源码 网站技术导航源码

BYR-Navi-master好看有个性的网站技术导航源码 该网站基于Fomantic UI Web框架构建&#xff0c;整个项目的设计和构建具有高度的配置和定制灵活性。 整体风格比较适合个人导航站点使用 搜索框输入关键词后&#xff0c;点击上方搜索引擎图标可跳转打开对应搜索引擎搜索结果&am…

std : : vector

一.简介 std::vector 的底层实现通常基于动态数组&#xff08;dynamic array&#xff09;&#xff0c;它是一种连续分配的内存块&#xff0c;允许元素的快速随机访问。下面是 std::vector 的一些关键特点和底层实现细节&#xff1a; 连续内存块&#xff1a;std::vector 内部使…

MAC M1芯片安装mounty读写移动硬盘中的文件

因为移动硬盘中的文件是微软公司NTFS格式&#xff0c;MAC只支持自己的APFS或者HFS&#xff0c;与微软的NTFS不兼容&#xff0c;所以需要第三方的软件来支持读写硬盘中的文件&#xff0c;经过一上午的折腾&#xff0c;最终选择安装mounty这个免费的第三方软件 工具网址连接&am…

祝贺!Databend Cloud 入驻 AWS 云市场

关于 Databend Cloud Databend Cloud 是基于开源云原生数仓项目 Databend 打造的一款易用、低成本、高性能的新一代大数据分析平台&#xff0c;提供一站式 SaaS 服务&#xff0c;免运维、开箱即用。 Databend Cloud 架构如下&#xff1a; 存储层完全面向对象存储而设计。 计算…

第25节-PhotoShop基础课程-文本工具组

文章目录 前言1.横排文字工具1.基本操作1.字体选择2.字体大小3.字体颜色4.左对齐5.右对齐6.居中对齐 2.字符 2.段落文字3.蒙版文字 前言 1.横排文字工具 1.基本操作 1.字体选择 2.字体大小 3.字体颜色 4.左对齐 5.右对齐 6.居中对齐 2.字符 2.段落文字 3.蒙版文字