springboot手动提交kafka offset

转载自 springboot手动提交kafka offset

enable.auto.commit参数设置成了false

但是测试发现enable.auto.commit参数设置成了false,kafka的offset依然提交了(也没有进行人工提交offset)。

查看源码

如果我们enable.auto.commit设置为false,那么就会走标红的if语句。而且下面有个stopInvokerAndCommitManualAcks()方法,看名字就知道是人工提交的意思。那么我们进去stopInvokerAndCommitManualAcks()方法瞅瞅。 

如上图所示有个processCommits()方法,那么继续追进去: 

单单看标红的方法是不是就知道这方法里面是更新offset和提交offset的方法。那么我们继续追进去:


结论:如果我们把enable.auto.commit参数设置成true。那么offset交给kafka来管理,offset进行默认的提交模式。 
enable.auto.commit参数设置成false。那么就是Spring来替为我们做人工提交,从而简化了人工提交的方式。 
所以kafka和springboot结合中的enable.auto.commit为false为spring的人工提交模式。enable.auto.commit为true是采用kafka的默认提交模式。 

手动提交

spring.kafka.consumer.enable-auto-commit设置为false,设置AckMode的值

 /*** The offset commit behavior enumeration.*/public enum AckMode {/*** Commit after each record is processed by the listener.*/RECORD,/*** Commit whatever has already been processed before the next poll.*/BATCH,/*** Commit pending updates after* {@link ContainerProperties#setAckTime(long) ackTime} has elapsed.*/TIME,/*** Commit pending updates after* {@link ContainerProperties#setAckCount(int) ackCount} has been* exceeded.*/COUNT,/*** Commit pending updates after* {@link ContainerProperties#setAckCount(int) ackCount} has been* exceeded or after {@link ContainerProperties#setAckTime(long)* ackTime} has elapsed.*/COUNT_TIME,/*** User takes responsibility for acks using an* {@link AcknowledgingMessageListener}.*/MANUAL,/*** User takes responsibility for acks using an* {@link AcknowledgingMessageListener}. The consumer is woken to
  • RECORD
    每处理一条commit一次
  • BATCH(默认)
    每次poll的时候批量提交一次,频率取决于每次poll的调用频率
  • TIME 
    每次间隔ackTime的时间去commit
  • COUNT 
    累积达到ackCount次的ack去commit
  • COUNT_TIME
    ackTime或ackCount哪个条件先满足,就commit
  • MANUAL
    listener负责ack,但是背后也是批量上去
  • MANUAL_IMMEDIATE
    listner负责ack,每调用一次,就立即commit

manual commit

@KafkaListener(topics = "k010")public void listen(ConsumerRecord<?, ?> cr,Acknowledgment ack) throws Exception {LOGGER.info(cr.toString());ack.acknowledge();}

方法参数里头传递Acknowledgment,然后手工ack

如果只添加上面语句会报错:

the listener container must have a MANUAL Ackmode to populate the Acknowledgment

我们要配置AckMode为MANUAL Ackmode

factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/322774.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

P2114-[NOI2014]起床困难综合症【位运算】

正题 题目大意 若干个位操作&#xff0c;求一个值xxx使得进行了以后的最大值。 解题思路 每位分开运算计算出每一位为0和为1时的结果&#xff0c;然后贪心选择。 codecodecode // luogu-judger-enable-o2 #include<cstdio> using namespace std; int n,m,ans,t; char …

可观测性与原生云监控

在近日发表的一篇文章中&#xff0c;Cindy Sridharan概括介绍了可观测性及其与原生云应用程序监控的关系。可观测性是一种理念&#xff0c;包括监控、日志聚合、指标和分布式跟踪&#xff0c;可以实时更深入地观察系统。 Sridharan的文章基于她就同一个主题所做的Velocity演讲。…

项目总览地址

写这篇的目的是整理一下最近一段时间写的小demo&#xff0c;我这里稍微自夸一下暂且先叫项目吧&#xff0c;如果服务器不垮台的话&#xff0c;以后还是一直可以看到预览效果的&#xff0c;反正域名和服务器我是不会扔掉的&#xff0c;就怕后面项目多了&#xff0c;服务器承受不…

mybatis3中@SelectProvider的使用技巧

转载自 mybatis3中SelectProvider的使用技巧 mybatis3中增加了使用注解来配置Mapper的新特性&#xff0c;本篇文章主要介绍其中几个Provider的使用方式&#xff0c;他们是&#xff1a;SelectProvider、UpdateProvider、InsertProvider和DeleteProvider。MyBatis 3 User Guide中…

P2766-最长不下降子序列问题【网络流,dp】

正题 题目大意 求最长不下降子序列和可以取出多少以及允许多次使用一些数时可以取出多少个。 解题思路 第一问dp求&#xff0c;且保存以xxx开头的最长长度fxf_xfx​。 第二问考虑网络流。 我们对于每个数字以fif_ifi​的不同分层&#xff0c;然后每次从下往上一层连接&#x…

使用Microsoft.AspNetCore.TestHost进行完整的功能测试

简介 Microsoft.AspNetCore.TestHost是可以用于Asp.net Core 的功能测试工具。很多时候我们一个接口写好了&#xff0c;单元测试什么的也都ok了&#xff0c;需要完整调试一下&#xff0c;检查下单元测试未覆盖到的代码是否有bug。步骤为如下&#xff1a;程序打个断点->F5运行…

MockJs案例

有时候前端写好模板后&#xff0c;后端还完工&#xff0c;那么总不能一直让项目停滞吧&#xff0c;这里就用Mockjs来模拟后端接口的数据&#xff0c;让我们先人一步完成项目。 首先创建一个html&#xff0c;导入axios和mockjs 再用mock去拦截请求&#xff0c;如果后端接口写好了…

JS document.execCommand实现复制功能

转载自 JS document.execCommand实现复制功能 最近项目中需要实现功能&#xff1a;点击button&#xff0c;复制input框的值&#xff1b; 我使用的是 document.execCommand(copy)的方法&#xff1b; 但是很郁闷的是&#xff0c;始终实现不了功能&#xff1b;代码如下 HTML代…

Entity Framework Core 使用HiLo生成主键

HiLo是在NHibernate中生成主键的一种方式&#xff0c;不过现在我们可以在Entity Framework Core中使用。所以在这篇内容中&#xff0c;我将向您在介绍如何在Entity Framework Core中使用HiLo生成主键。 什么是Hilo&#xff1f; HiLo是High Low的简写&#xff0c;翻译成中文叫高…

P1991-无线通讯网【最小生成树,瓶颈生成树】

正题 题目大意 有nnn个点&#xff0c;连边长度不超过DDD的情况下分为SSS个联通块。 求最小的DDD 解题思路 直接KruskalKruskalKruskal连边连到只剩下SSS个联通块就好了。 codecodecode #include<cstdio> #include<cmath> #include<algorithm> using names…

Echarts报错:Component series.lines not exists. Load it first.

前几天用的echarts标签是bootcdn的 <script src"https://cdn.bootcdn.net/ajax/libs/echarts/4.7.0/echarts-en.common.js"></script>用着官方给的案例还可以&#xff0c;但是一用gallery社区里面的例子就报错 后来经过不断调试终于知道是需要换个cdn&…

Mongodb常见问题

转载自 Mongodb常见问题 一.数据库级锁 MongoDB的锁机制和一般关系数据库如 MySQL&#xff08;InnoDB&#xff09;, Oracle 有很大的差异&#xff0c;InnoDB 和 Oracle 能提供行级粒度锁&#xff0c;而 MongoDB 2.x 只能提供 库级粒度锁&#xff0c;这意味着当 MongoDB 一个…

认识微软Visual Studio Tools for AI

微软已经发布了其 Visual Studio Tools for AI 的测试版本&#xff0c;这是微软 Visual Studio 2017 IDE 的扩展&#xff0c;可以让开发人员和数据科学家将深度学习模型嵌入到应用程序中。Visual Studio Tools for AI 工具同时支持 Microsoft 的 Cognitive Toolkit 和 Google 的…

P1297-[国家集训队]单选错位【期望概率】

正题 题目大意 nnn道题&#xff0c;第iii道aia_iai​个选项&#xff0c;选择每个选项的概率第相等的。但是每个选择都会填到后一道题。求对的期望题数。 解题思路 考虑若前面一道题有xxx个选项&#xff0c;后一道有yyy个选项&#xff0c;那么其实就是求一个在1∼x1\sim x1∼x随…

mybatis源码阅读(一):SqlSession和SqlSessionFactory

转载自 mybatis源码阅读(一)&#xff1a;SqlSession和SqlSessionFactory 一、接口定义 听名字就知道这里使用了工厂方法模式&#xff0c;SqlSessionFactory负责创建SqlSession对象。其中开发人员最常用的就是DefaultSqlSession &#xff08;1&#xff09;SqlSession接口定义…

开源纯C#工控网关+组态软件(六)图元组件

一、 图元概述 图元是构成人机界面的基本单元。如一个个的电机、设备、数据显示、仪表盘&#xff0c;都是图元。构建人机界面的过程就是铺排、挪移、定位图元的过程。 图元设计是绘图和编码的结合。因为图元不仅有显示和动画&#xff0c;还有背后操纵动画的控制逻辑。 一个好…

P2590-[ZJOI2008]树的统计【树链剖分,线段树】

正题 题目大意 一棵带权树&#xff0c;要求单点修改&#xff0c;路径求和和路径求最大值。 解题思路 先来一个树链剖分&#xff0c;然后线段树维护。 codecodecode #include<cstdio> #include<algorithm> using namespace std; const int N31000; int tot,cnt,n…

git合并分支的策略(赞)

假设当前有两个分支 master和test&#xff0c;两个分支一模一样&#xff0c;都有这三个文件 现在test添加一个test4.txt&#xff0c;然后提交到本地&#xff08;git add . git commit&#xff09; 切换到master分支上&#xff0c;git checkout master git merge test 这样mase…

mybatis源码阅读(二):mybatis初始化上

转载自 mybatis源码阅读(二)&#xff1a;mybatis初始化上 1.初始化入口 //Mybatis 通过SqlSessionFactory获取SqlSession, 然后才能通过SqlSession与数据库进行交互 private static SqlSessionFactory getSessionFactory() {SqlSessionFactory sessionFactory null;String …

P1983-车站分级【图论,记忆化dfs,构图】

正题 题目链接:https://www.luogu.org/problemnew/show/P1983 题目大意 一个辆车会一个一个值xxx&#xff0c;如果等级大于等于xxx的车站都会停靠(包括起点和终点)。给每辆车的停靠点&#xff0c;求至少要将车站分多少级。 解题思路 对于一辆车&#xff0c;若一个点他经过了…