java 中 main 方法使用 KafkaConsumer 拉取 kafka 消息如何禁止输出 debug 日志

pom 依赖:

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.5.14.RELEASE</version>
</dependency>

 或者

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.5.1</version>
</dependency>

 ps:前面的 spring-kafka 依赖中已经包含了后面的 kafka-clients

KafkaConsumerDemo.java:


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.LoggerFactory;import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.LoggerContext;import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.*;
import java.util.logging.Logger;public class KafkaConsumerDemo {static Map<String,Object> properties = new HashMap<String,Object>();private static KafkaConsumer kafkaConsumer = null;/**** windows 环境需要将下面 8 行添加到 "C:\Windows\System32\drivers\etc\hosts" 文件中:*      xxx.xxx.xxx.xxx1 xxx-data01*      xxx.xxx.xxx.xxx2 xxx-data02*      xxx.xxx.xxx.xxx3 xxx-data03*      xxx.xxx.xxx.xxx4 xxx-data04*      xxx.xxx.xxx.xxx5 xxx-data05*      xxx.xxx.xxx.xxx6 xxx-data06*      xxx.xxx.xxx.xxx7 xxx-data07*      xxx.xxx.xxx.xxx8 xxx-data08* @param args*/public static void main(String[] args) {// 禁止控制台输出一些 org.apache.kafka.xxx 相关的日志LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();loggerContext.getLogger("org.apache.kafka.clients.consumer.internals.ConsumerCoordinator").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.FetchSessionHandler").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.consumer.internals.Fetcher").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.consumer.internals.AbstractCoordinator").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.NetworkClient").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.common.network.Selector").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.Metadata").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.consumer.KafkaConsumer").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.common.utils.AppInfoParser").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.consumer.KafkaConsumer").setLevel(Level.OFF);loggerContext.getLogger("org.apache.kafka.clients.consumer.ConsumerConfig").setLevel(Level.OFF);properties.put("bootstrap.servers","127.0.0.1:9192,127.0.0.1:9192,127.0.0.1:9192");  // 指定 Brokerproperties.put("group.id", "11111111111111111111111");              // 指定消费组群 ID,为防止自己启动拉取消息导致其他生产环境的消费者无法消费该消息,请设置一个绝对不重复的值,以起到隔离的作用properties.put("max.poll.records", "1000");// todo 设置可批量拉取???properties.put("enable.auto.commit", "false");properties.put("key.deserializer", StringDeserializer.class); // 将 key 的字节数组转成 Java 对象properties.put("value.deserializer", StringDeserializer.class);  // 将 value 的字节数组转成 Java 对象kafkaConsumer = new KafkaConsumer<String, String>(properties);// List<String> topics = queryAllTopics( consumer );kafkaConsumer.subscribe( Collections.singletonList( "ods_carbon_rfid_device_record" ) );  // 订阅主题 order-eventsnew Thread(new Runnable() {@Overridepublic void run() {receiveMessage();}}).start();}/*** 查询全部的主题(topic)列表* @param kafkaConsumer* @return*/private static List<String> queryAllTopics(KafkaConsumer kafkaConsumer) {if( kafkaConsumer == null ){return null;}Map<String, List<PartitionInfo>> map = kafkaConsumer.listTopics();if( map == null ){return null;}return new ArrayList<String>( map.keySet() );}public static void receiveMessage() {try {while ( true ){synchronized (KafkaConsumerDemo.class) {// ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));// 30L 表示超时时间为 30秒,有消息立即返回,没消息最多等 30 秒后返回SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofSeconds(30L));String date = sdf.format(new Date());if( records == null ){System.out.println( date + " 本次未拉取到任何消息" );}else {System.out.println( date + " 本次拉取到 " + records.count() + " 条消息" );int i = 1;for (ConsumerRecord<String,String> record: records) {String info = String.format("[Topic: %s][Partition:%d][Offset:%d][Key:%s][Message:%s]", record.topic(), record.partition(), record.offset(), record.key(), record.value());System.out.println( "第" + i + "条消息:" + info );i++;}kafkaConsumer.commitSync();}/*** 当你用 KafkaConsumer从Kafka里读取消息并且处理完后,commitSync 方法会帮你把这些消息的处理进度(也就是偏移量 offset )同步地告诉 Kafka 服务器。* 这样,Kafka 就知道你已经处理到哪儿了。如果消费者(也就是读取消息的程序)突然崩溃或者重启,Kafka 就能根据最后一次提交的偏移量,让你从上一次处理* 完的地方继续开始,而不会漏掉或者重复处理消息。* 简单来说,commitSync 方 法就是用来“保存进度”的,确保消息处理的可靠性和顺序性。*/// Thread.sleep( 5000L );}}} catch (Exception e){e.printStackTrace();} finally {kafkaConsumer.close();}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/65823.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

车联网安全--TLS握手过程详解

目录 1. TLS协议概述 2. 为什么要握手 2.1 Hello 2.2 协商 2.3 同意 3.总共握了几次手&#xff1f; 1. TLS协议概述 车内各ECU间基于CAN的安全通讯--SecOC&#xff0c;想必现目前多数通信工程师们都已经搞的差不多了&#xff08;不要再问FvM了&#xff09;&#xff1b;…

RuoYi Cloud项目解读【四、项目配置与启动】

四、项目配置与启动 当上面环境全部准备好之后&#xff0c;接下来就是项目配置。需要将项目相关配置修改成当前相关环境。 1 后端配置 1.1 数据库 创建数据库ry-cloud并导入数据脚本ry_2024xxxx.sql&#xff08;必须&#xff09;&#xff0c;quartz.sql&#xff08;可选&…

C#对象池

一、资源管理的困境与破局 在软件开发的征程中&#xff0c;我们时常陷入资源管理的泥沼。以一个繁忙餐厅为例&#xff0c;每个顾客都急需一个盘子盛美食&#xff0c;可盘子数量有限&#xff0c;如果每次顾客用完盘子后&#xff0c;都不假思索地去清洗一个全新的盘子来供下一位…

Vue.js组件开发-如何使用moment.js

在Vue.js组件开发中&#xff0c;需要处理日期和时间&#xff0c;moment.js 是一个非常有用的库。moment.js 提供了丰富的API来解析、验证、操作和显示日期和时间。 步骤&#xff1a; 1. 安装moment.js 首先&#xff0c;需要通过npm或yarn安装moment.js。在项目根目录下运行以…

微信小程序mp3音频播放组件,仅需传入url即可

// index.js // packageChat/components/audio-player/index.js Component({/*** 组件的属性列表*/properties: {/*** MP3 文件的 URL*/src: {type: String,value: ,observer(newVal, oldVal) {if (newVal ! oldVal && newVal) {// 如果 InnerAudioContext 已存在&…

要避免除数绝对值远远小于被除数绝对值的除法

要避免除数绝对值远远小于被除数绝对值的除法 用绝对值小的数作除数&#xff0c;舍人误差会增大&#xff0c;如计算 x y \frac xy yx​,若 0 < ∣ y ∣ < ∣ x ∣ 0<|y|<|x| 0<∣y∣<∣x∣&#xff0c;则可能对计算结果带来严重影响&#xff0c;应尽量避免…

深入了解OpenStack中的隧道网络

在OpenStack环境中&#xff0c;隧道网络是一项关键技术&#xff0c;它确保了虚拟机之间以及虚拟机与外部网络之间的安全通信。通过隧道机制&#xff0c;我们可以有效地隔离不同租户的流量&#xff0c;并支持多租户环境下的复杂网络需求。之前我们介绍了隧道网络&#xff0c;下面…

4. scala高阶之隐式转换与泛型

背景 上一节&#xff0c;我介绍了scala中的面向对象相关概念&#xff0c;还有一个特色功能&#xff1a;模式匹配。本文&#xff0c;我会介绍另外一个特别强大的功能隐式转换&#xff0c;并在最后介绍scala中泛型的使用 1. 隐式转换 Scala提供的隐式转换和隐式参数功能&#…

pandas与sql对应关系【帮助sql使用者快速上手pandas】

本页旨在提供一些如何使用pandas执行各种SQL操作的示例&#xff0c;来帮助SQL使用者快速上手使用pandas。 目录 SQL语法一、选择SELECT1、选择2、添加计算列 二、连接JOIN ON1、内连接2、左外连接3、右外连接4、全外连接 三、过滤WHERE1、AND2、OR3、IS NULL4、IS NOT NULL5、B…

第432场周赛:跳过交替单元格的之字形遍历、机器人可以获得的最大金币数、图的最大边权的最小值、统计 K 次操作以内得到非递减子数组的数目

Q1、跳过交替单元格的之字形遍历 1、题目描述 给你一个 m x n 的二维数组 grid&#xff0c;数组由 正整数 组成。 你的任务是以 之字形 遍历 grid&#xff0c;同时跳过每个 交替 的单元格。 之字形遍历的定义如下&#xff1a; 从左上角的单元格 (0, 0) 开始。在当前行中向…

《探索鸿蒙Next上开发人工智能游戏应用的技术难点》

在科技飞速发展的当下&#xff0c;鸿蒙Next系统为应用开发带来了新的机遇与挑战&#xff0c;开发一款运行在鸿蒙Next上的人工智能游戏应用更是备受关注。以下是在开发过程中可能会遇到的一些技术难点&#xff1a; 鸿蒙Next系统适配性 多设备协同&#xff1a;鸿蒙Next的一大特色…

Harry技术添加存储(minio、aliyun oss)、短信sms(aliyun、模拟)、邮件发送等功能

Harry技术添加存储&#xff08;minio、aliyun oss&#xff09;、短信sms&#xff08;aliyun、模拟&#xff09;、邮件发送等功能 基于SpringBoot3Vue3前后端分离的Java快速开发框架 项目简介&#xff1a;基于 JDK 17、Spring Boot 3、Spring Security 6、JWT、Redis、Mybatis-P…

Vue2: el-table为每一行添加超链接,并实现光标移至文字上时改变形状

为表格中的某一列添加超链接 一个表格通常有许多列,网上许多教程都可以实现为某一列添加超链接,如下,实现了当光标悬浮在“姓名”上时,改变为手形,点击可实现跳转。 <el-table :data="tableData"><el-table-column label="姓名" prop=&quo…

R数据分析:多分类问题预测模型的ROC做法及解释

有同学做了个多分类的预测模型,结局有三个类别,做的模型包括多分类逻辑回归、随机森林和决策树,多分类逻辑回归是用ROC曲线并报告AUC作为模型评估的,后面两种模型报告了混淆矩阵,审稿人就提出要统一模型评估指标。那么肯定是统一成ROC了,刚好借这个机会给大家讲讲ROC在多…

A3. Springboot3.x集成LLama3.2实战

本文将介绍集成ollama官网提供的API在Springboot工程中进行整合。由于没找到java-llama相关合适的sdk可以使用,因此只好对接官方给出的API开发一套RESTFull API服务。下面将从Ollama以下几个API展开介绍,逐渐的了解其特性以及可以干些什么。具体llama API说明可参数我前面写的…

面试:类模版中函数声明在.h,定义在.cpp中,其他cpp引用引入这个头文件,会有什么错误?

1、概述 类模版中函数声明在.h&#xff0c;定义在.cpp中&#xff0c;其他cpp引用引入这个头文件&#xff0c;会有什么错误?报编译错误&#xff1a;error C2512: Demo<int>: no appropriate default constructor available 举例如下代码&#xff1a;demo.h 声明模版类 …

记一次学习skynet中的C/Lua接口编程解析protobuf过程

1.引言 最近在学习skynet过程中发现在网络收发数据的过程中数据都是裸奔&#xff0c;就想加入一种数据序列化方式&#xff0c;json、xml简单好用&#xff0c;但我就是不想用&#xff0c;于是就想到了protobuf&#xff0c;对于protobuf C/C的使用个人感觉有点重&#xff0c;正好…

SQLAlchemy

https://docs.sqlalchemy.org.cn/en/20/orm/quickstart.htmlhttps://docs.sqlalchemy.org.cn/en/20/orm/quickstart.html 声明模型 在这里&#xff0c;我们定义模块级构造&#xff0c;这些构造将构成我们从数据库中查询的结构。这种结构被称为 声明式映射&#xff0c;它同时定…

Trimble自动化激光监测支持历史遗产实现可持续发展【沪敖3D】

故事桥&#xff08;Story Bridge&#xff09;位于澳大利亚布里斯班&#xff0c;建造于1940年&#xff0c;全长777米&#xff0c;横跨布里斯班河&#xff0c;可载汽车、自行车和行人往返于布里斯班的北部和南部郊区。故事桥是澳大利亚最长的悬臂桥&#xff0c;是全世界两座手工建…

CentOS 和 Ubantu你该用哪个

文章目录 **一、CentOS 和 Ubuntu 的详细介绍****1. CentOS****1.1 基本信息****1.2 特点****1.3 缺点** **2. Ubuntu****2.1 基本信息****2.2 特点****2.3 缺点** **二、CentOS 和 Ubuntu 的异同****1. 相同点****2. 不同点****3. 使用体验对比** **三、总结和选择建议** Cent…