Kafka java 配置

前言:
        大家好,大家在springboot项目中,经常采用 @KafkaListener 做为消费者。这个是spring为我们封装的。 但是某些情况 注解的方式并不能满足需求。这个时候就需要手动版本了。

介绍:

        我们已经集成spring-Kafka 就不需要再额外引入kafka-clients的依赖了。直接亮代码。

给大家解释配置含义。

1.Kafka配置代码

public KafkaConsumer<String, String> getCustomer() {// 1. 配置属性参数Properties properties = new Properties();// 设置Kafka集群的地址和端口,消费者将连接到这个地址和端口properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");// 设置键(Key)的反序列化器为StringDeserializer,用于将字节数据转换为String类型properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 设置值(Value)的反序列化器为StringDeserializer,用于将字节数据转换为String类型properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 设置消费者所属的消费者组,消费者组内的消费者将共同消费同一个Topic的消息properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");// 设置消费者与Kafka集群之间的会话超时时间(单位:毫秒)properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);// 设置消费者是否自动提交offset,true表示自动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);// 设置自动提交offset的时间间隔(单位:毫秒)properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);// 设置每次poll操作返回的最大记录数properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,1);// 根据配置属性创建Kafka消费者实例return new KafkaConsumer<>(properties);
}

2.Kafka消费者代码

@Test
void KafkaConsumerTest() {// 创建Kafka消费者实例,通过getCustomer()方法获取KafkaConsumer<String, String> consumer = kafkaCustomer.getCustomer();// 订阅要消费的主题,这里是 "test-topic"consumer.subscribe(Collections.singletonList("test-topic"));// 从Kafka服务器拉取消息,poll等待的最长时间设置为10秒(10000000毫秒)ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000000));for (ConsumerRecord<String, String> record : records) {// 处理消息的逻辑// 打印消息的offset、key和valueSystem.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());//以下代码是我的场景,本人需要在某些情况跳转,而编写单元测试做试验的。boolean flag = true;if (flag){// 如果flag为true,则不自动提交offset,可以在这里添加业务逻辑处理消息// 如果需要手动提交offset,可以取消注释下面的代码// consumer.commitAsync();// 由于flag为true,这里会跳出循环,不再处理后续的消息break;}}// 关闭消费者,释放资源consumer.close();// 打印结束消费的日志System.out.println("结束消费");
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/59046.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

leetCode 739.每日温度

题目 给定一个整数数组 temperatures &#xff0c;表示每天的温度&#xff0c;返回一个数组 answer &#xff0c;其中 answer[i] 是指对于第 i 天&#xff0c;下一个更高温度出现在几天后。如果气温在这之后都不会升高&#xff0c;请在该位置用 0 来代替。 示例 1: 输入: te…

el-select下拉菜单虚拟列表化(含搜索功能)

需求简介 vue2element-ui项目中&#xff0c;当el-select中数据量较大时&#xff0c;会导致页面加载和渲染卡顿。在现在的el-select的基础上使用分页或者虚拟列表的形式去处理大量的下拉菜单&#xff0c;保证页面的正常渲染及el-select的正常回显。 需求分析 主要涉及几个点&…

DPPE-N3中叠氮基团使得DPPE-N3能够与含有炔基的材料在铜离子的催化下发生点击化学反应,生成稳定的1,2,3-三唑环结构,2252461-33-7

一、基本信息 英文名称&#xff1a;DPPE-N3&#xff0c;DPPE-Azide 中文名称&#xff1a;DPPE-叠氮 CAS号&#xff1a;2252461-33-7 分子式&#xff1a;C43H83N4O9P 分子量&#xff1a;831.13 供应商&#xff1a;陕西新研博美生物科技 结构式&#xff1a; 二、结构特点…

算法学习第一弹——C++基础

早上好啊&#xff0c;大佬们。来看看咱们这回学点啥&#xff0c;在前不久刚出完C语言写的PTA中L1的题目&#xff0c;想必大家都不过瘾&#xff0c;感觉那些题都不过如此&#xff0c;所以&#xff0c;为了我们能更好的去处理更难的题目&#xff0c;小白兔决定奋发图强&#xff0…

[AcWing算法基础课]动态规划之01背包

题目链接&#xff1a;01背包 有 N 件物品和一个容量是 V 的背包。每件物品只能使用一次。第 i 件物品的体积是 vi&#xff0c;价值是 wi。求解将哪些物品装入背包&#xff0c;可使这些物品的总体积不超过背包容量&#xff0c;且总价值最大。输出最大价值。 首先&#xff0c;我们…

【机器学习】机器学习中用到的高等数学知识

机器学习是一个跨学科领域&#xff0c;涉及多种高等数学知识。以下是一些在机器学习中常用的数学概念和技术&#xff1a; 1. 线性代数 (Linear Algebra) 向量和矩阵&#xff1a;用于表示数据集和特征。矩阵运算&#xff1a;加法、乘法和逆矩阵等&#xff0c;用于计算模型参数…

FreeRTOS 24:事件组EventGroup等待、清零、获取操作

等待事件标志位xEventGroupWaitBits() 既然标记了事件的发生&#xff0c;那么我怎么知道他到底有没有发生&#xff0c;这也是需要一个函数来获 取 事 件 是 否 已 经 发 生 &#xff0c; FreeRTOS 提 供 了 一 个 等 待 指 定 事 件 的 函 数 — — xEventGroupWaitBits()&…

世界坐标和Local坐标的区分

TargetPos.position(-TargetPos.forward*-4)TargetPos.up*7 这是相对于TargetPos的位置进行偏移&#xff0c; 动态的与Target的本地坐标改变 new Vector3(TargetPos.position.x, TargetPos.position.y 7, TargetPos.position.z - 5) 这个是直接new了一个世界坐标的Vector3 &…

Ubuntu 22.04 的Python3.11.8 安装

背景 新版本的Python需要更高版本的OpenSSL 依赖。使用操作系统的SSL不然会提示缺少SSL的报错。 部署 ## Openssl部署 wget https://github.com/openssl/openssl/releases/download/openssl-3.4.0/openssl-3.4.0.tar.gz## ./config --prefix/usr/local/openssl make &&…

在Ubuntu下安装RabbitMQ、添加一个新的登录用户并设置密码

在Ubuntu下安装RabbitMQ、添加一个新的登录用户并设置密码 在Ubuntu下安装RabbitMQ可以按照以下步骤进行&#xff1a;步骤 1: 更新系统步骤 2: 安装Erlang步骤 3: 添加RabbitMQ仓库步骤 4: 更新APT索引并安装RabbitMQ步骤 5: 启动RabbitMQ服务步骤 6: 检查RabbitMQ状态步骤 7: …

nacos单机源码解析-服务发现和心跳检测

目录 1 服务发现 1.1 客户端 1.1.1 入口 1.1.2 定时拉取 1.1.3 总结 1.2 服务端 2 心跳检测 2.1 客户端 2.2 服务端 2.2.1 处理心跳请求 2.2.2 开启定时任务进行心跳检测 2.2.3 总结 1 服务发现 服务列表&#xff1a;Nacos 维护一个服务列表&#xff0c;记录所有已注…

在线绘制带community的蛋白质-蛋白质相互作用(PPI)网络图

导读&#xff1a;分子相互作用网络图揭示了细胞内部分子间的复杂相互作用。通过识别网络中密集连接的节点所形成的社区&#xff08;community&#xff09;&#xff0c;可以揭示它们之间以前未知的功能联系。这些社区可能代表了具有共同功能的功能模块&#xff0c;对于理解细胞生…

【商城系统搭建流程】

商城系统的搭建流程可以分为以下几个步骤&#xff1a; 1.需求分析&#xff1a;确定商城系统的功能和特性&#xff0c;例如商品展示、购物车、订单管理、支付等。 2.系统设计&#xff1a;根据需求分析结果设计商城系统的架构&#xff0c;包括前端页面设计和后端数据库设计。 …

qt QTableView详解

1、概述 QTableView 是 Qt 框架中的一个高级视图类&#xff0c;用于以表格形式展示二维数据。它基于 QAbstractItemView&#xff0c;并与模型&#xff08;通常是 QAbstractTableModel 或 QStandardItemModel&#xff09;结合使用&#xff0c;以实现数据的展示和交互。QTableVi…

Orleans集群及Placement设置

服务端界面使用相同的clusterid和serviceid&#xff0c;相同ip地址&#xff0c;不同网关端口号和服务端口号&#xff0c;启动两个silo服务&#xff0c;并使用MySql数据库做Silo间信息同步&#xff0c;实现集群。 silo服务启动代码如下&#xff08;从nuget下载Microsoft.Orleans…

OceanBase 4.3.3 功能解析:列存副本

OceanBase 从4.3.0 版本开始&#xff0c;引入了列式存储的支持。用户可以根据业务的具体需求&#xff0c;选择创建列存表、行存表或是行列混存表。无论选择哪种表类型&#xff0c;在不同的Zone内&#xff0c;租户使用的副本模式都是一致的。详见官网文档&#xff1a; https://w…

【Linux】 IPC 进程间通信(三)(消息队列 信号量)

&#x1f4c3;个人主页&#xff1a;island1314 &#x1f525;个人专栏&#xff1a;Linux—登神长阶 ⛺️ 欢迎关注&#xff1a;&#x1f44d;点赞 &#x1f442;&#x1f3fd;留言 &#x1f60d;收藏 &#x1f49e; &#x1f49e; &#x1f49e; 一、消息队列 &#x1f48c;…

如何管理PHP API版本

管理PHP API版本是确保API稳定性和兼容性的关键步骤。以下是一些有效的PHP API版本管理方法&#xff1a; 一、使用命名空间和类库 在PHP中&#xff0c;可以通过命名空间和类库来实现API版本的管理。通过为不同版本的API创建不同的命名空间&#xff0c;可以将它们隔离开来&…

Docker:镜像构建 DockerFile

Docker&#xff1a;镜像构建 DockerFile 镜像构建docker build DockerfileFROMCOPYENVWORKDIRADDRUNCMDENTRYPOINTUSERARGVOLUME 镜像构建 在Docker官方提供的镜像中&#xff0c;大部分都是基础镜像&#xff0c;他们只提供某个简单的功能&#xff0c;如果想要一个功能更加丰富…

遥控器数图控链路系统核心技术+算法详解

一、核心技术 无线通信技术 遥控器数图控链路系统主要基于无线通信技术进行数据传输。通过特定的调制、编码和信号处理技术&#xff0c;将遥控器的操作指令转化为无线电信号&#xff0c;并传输给被控制设备。被控制设备接收到信号后&#xff0c;再将其解码为可识别的指令&…