【Kafka基础】消费者命令行完全指南:从基础到高级消费

Kafka消费者是消息系统的关键组成部分,掌握/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh工具的使用对于调试、测试和监控都至关重要。本文将全面介绍该工具的各种用法,帮助您高效地从Kafka消费消息。

1 基础消费模式

1.1 从最新位置消费

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic
参数解析
  • --bootstrap-server: 指定Kafka集群地址
  • --topic: 指定消费的主题名称
特点
  • 从该消费者组最后提交的offset开始消费
  • 如果没有提交记录,则从最新消息开始

1.2 从最早位置消费

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--from-beginning
关键参数
  • --from-beginning: 从主题最早的消息开始消费
应用场景
  • 数据回溯
  • 新消费者组初始化

2 消息元数据展示

2.1 显示消息Key

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--property print.key=true \--property key.separator=":"
参数说明
  • print.key=true: 显示消息key
  • key.separator: 指定key/value分隔符

2.2 显示完整元数据

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--property print.key=true \--property print.value=true \--property print.partition=true \--property print.offset=true \--property print.timestamp=true \--property key.separator=":" \--property line.separator="\n"
元数据参数
  • print.partition: 显示分区号
  • print.offset: 显示消息offset
  • print.timestamp: 显示时间戳

3 精准消费控制

3.1 指定分区消费

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--partition 1
参数说明
  • --partition: 指定消费的分区编号

3.2 指定Offset消费

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--partition 0 \--offset 1000
参数说明
  • --offset: 指定开始消费的offset位置

3.3 消费超时设置

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--timeout-ms 10000
参数说明
  • --timeout-ms: 设置无消息时的超时时间(毫秒)

4 消费者组管理

4.1 使用消费者组

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--group mygroup
参数说明
  • --group: 指定消费者组名称
特点
  • 支持offset自动提交
  • 支持消费者组rebalance

4.2 手动控制offset提交

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--group mygroup \--property enable.auto.commit=false
参数说明
  • enable.auto.commit=false: 关闭自动提交

5 高级消费配置

5.1 限制消费消息数

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--max-messages 100
参数说明
  • --max-messages: 最大消费消息数

5.2 过滤消费

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--property filter.key.regex="test.*"
参数说明
  • filter.key.regex: 按key正则过滤

5.3 消费速率控制

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--property fetch.min.bytes=1024 \--property fetch.max.wait.ms=500
参数说明
  • fetch.min.bytes: 最小拉取字节数
  • fetch.max.wait.ms: 最大等待时间

6 常用命令扩展

6.1 消费多个主题

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--whitelist "topic1|topic2"

6.2 显示消息头信息

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--property print.headers=true

6.3 消费特定时间后的消息

/export/home/kafka_zk/kafka_2.13-2.7.1/bin/kafka-console-consumer.sh \--bootstrap-server 192.168.10.33:9092 \--topic testtopic \--property offsets.storage=kafka \--property timestamp=1625097600000

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/77272.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CausalML 基于机器学习算法的因果推理方法

CausalML 是一个 Python 包,它使用基于最新研究的机器学习算法提供一套提升建模和因果推理方法。它提供了一个标准界面,允许用户从实验或观察数据中估计条件平均处理效应 (CATE),也称为个体治疗效应 (ITE&a…

解锁深度学习激活函数

在深度学习的广袤天地里,激活函数宛如隐匿于神经网络架构中的神奇密码,掌控着模型学习与表达的关键力量。今天,就让我们一同深入探究这些激活函数的奇妙世界,揭开它们神秘的面纱。 一、激活函数为何不可或缺? 想象一…

从零到有的游戏开发(visual studio 2022 + easyx.h)

引言 本文章适用于C语言初学者掌握基本的游戏开发, 我将用详细的步骤引领大家如何开发属于自己的游戏。 作者温馨提示:不要认为开发游戏很难,一些基本的游戏逻辑其实很简单, 关于游戏的开发环境也不用担心,我会详细…

大数据专业学习路线

大数据专业学习路线 目录 基础知识核心技术进阶技能实战项目职业发展学习资源学习计划常见问题 1. 基础知识 1.1 编程语言 Python:大数据分析的基础语言 基础语法和数据类型函数和模块面向对象编程文件操作和异常处理常用库:NumPy, Pandas, Matplot…

flink部署使用(flink-connector-jdbc)连接达梦数据库并写入读取数据

flink介绍 1)Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink 被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算。 2)在实时计算或离线任务中,往往需要…

用swift playground写个ios应用和大模型或者网站交互

import SwiftUIstruct ContentView: View {State private var textFieldText: String ""State private var outputText: String "输出将会显示在这里"private let tip:String "消息已发送,请等待"State private var history:[Stri…

springboot+vue2集成JWT token实现权限验证

前端项目搭建参考: Vue项目的搭建和启动_vue项目启动 csdn-CSDN博客 Vue ElementUI 登录页面_vue用户登录页面-CSDN博客 跨域问题前端解决-CSDN博客 实现思路: 1. 实现的目的:为了保护网站安全信息,使用jwt进行权限验证&#xf…

Cursor编程-从入门到精通__0409

早期的Github Copilot 最近更新了,支持Agent编程,字节跳动Trae使用(免费),但成熟程度不如Cursor,Cursor前50次免费 Copilot VS Cursor*** 1,Cursor VSCode 二次开发,IDE级别 2&…

MyBatis 详解及代码示例

MyBatis 是一个 半自动 ORM 框架,主要用于 Java 与数据库之间的持久化操作,它本质是对 JDBC 的封装 全名:MyBatis(前身 iBATIS)核心作用:自动将 SQL 执行结果映射为 Java 对象;也可以将 Java 对…

1.6-抓包技术(Burp Suite\Yakit抓包\Web、APP、小程序)

1.6-抓包技术(Burp Suite\Yakit抓包\Web、APP、小程序) 如果要使用抓包软件,基本上第一步都是要安装证书的。原因如下: 客户端(浏览器或应用)会检测到证书不受信任,并弹出 证书错误&#xff0…

Java 大视界 -- 基于 Java 的大数据隐私保护在金融客户信息管理中的实践与挑战(178)

💖亲爱的朋友们,热烈欢迎来到 青云交的博客!能与诸位在此相逢,我倍感荣幸。在这飞速更迭的时代,我们都渴望一方心灵净土,而 我的博客 正是这样温暖的所在。这里为你呈上趣味与实用兼具的知识,也…

第十届 蓝桥杯 嵌入式 省赛

一、分析 这届的真题,有点像第七届的液位检测。 这届的题目开始,貌似比赛描述的功能,逻辑上变得更好梳理了。一开始就把大致的功能给你说明一遍,不像之前都是一块一块的说明。 1. 基本功能 1)测量竞赛板上电位器 R…

实现usb的MTP功能

前言:最终结果根据用户自主选择可实现host和device功能的切换。 效果展示: 当插入usb时设备会弹窗 当用户选择设备模式时pc端就会出现mtp设备盘符 实现mtp设备 ubuntu架构根文件系统通过uMTP-Responder实现usb的MTP功能 添加服务 /home/flynn/firfly_rootfs/lib/system…

React-05React中props属性(传递数据),propTypes校验,类式与函数式组件props的使用

1.类式组件props基本数据读取与解构运算符传递 <script type"text/babel">// 创建组件class PersonalInfo extends React.Component {render() {// 读取props属性 并读取值console.log(props,this.props);return(<ul><li>姓名&#xff1a;{this.p…

PCI认证 密钥注入 ECC算法工具 NID_secp521r1 国密算法 openssl 全套证书生成,从证书提取公私钥数组 x,y等

步骤 1.全套证书已经生成。OK 2.找国芯要ECC加密解密签名验签代码。给的逻辑说明没有示例代码很难的上。 3.集成到工具 与SP联调。 1.用openssl全套证书生成及验证 注意&#xff1a;这里CA 签发 KLD 证书用的是SHA256。因为芯片只支持SHA256算法,不支持SHA512。改成统一。…

蓝桥杯每日刷题c++

目录 P9240 [蓝桥杯 2023 省 B] 冶炼金属 - 洛谷 (luogu.com.cn) P8748 [蓝桥杯 2021 省 B] 时间显示 - 洛谷 (luogu.com.cn) P10900 [蓝桥杯 2024 省 C] 数字诗意 - 洛谷 (luogu.com.cn) P10424 [蓝桥杯 2024 省 B] 好数 - 洛谷 (luogu.com.cn) P8754 [蓝桥杯 2021 省 AB2…

oracle 数据库字段类型为NUMBER(5,2)时,并且数据库值为0.1,为什么Java执行SQL查出来时为“.1“?

在 Oracle 数据库中&#xff0c;当字段类型为 NUMBER(5,2) 且存储的值为 0.1 时&#xff0c;Java 程序查询结果可能显示为 ".1"&#xff08;省略前导零&#xff09;&#xff0c;这是由 Oracle JDBC 驱动默认的数字格式化行为 导致的。以下是原因分析和解决方案&#…

3月AI论文精选十篇

1. Feature-Level Insights into Artificial Text Detection with Sparse Autoencoders[1] 核心贡献&#xff1a;通过稀疏自编码器揭示AI生成文本的检测特征&#xff0c;提出基于特征分布的鉴别方法。研究发现&#xff0c;AI文本在稀疏编码空间中呈现独特的"高频低幅"…

STM32在裸机(无RTOS)环境下,需要手动实现队列机制来替代FreeRTOS的CAN发送接收函数

xQueueSendToBackFromISR(ecuCanRxQueue, hcan->pRxMsg, &xHigherPriorityTaskWoken)&#xff0c;xQueueReceive(mscCanRxQueue,&mscRxMsg,0)和xQueueSendToBack(mscCanTxQueue, &TxMessageTemp, 0 )这3个函数&#xff0c;在裸机下实现&#xff1a; 在裸机&…

使用PX4,gazebo,mavros为旋翼添加下视的相机(仿真采集openrealm数据集-第一步)

目录 一.方法一&#xff08;没成功&#xff09; 1.运行PX4 2.运行mavros通讯 3.启动仿真世界和无人机 &#xff08;1&#xff09;单独测试相机 &#xff08;2&#xff09;make px4_sitl gazebo启动四旋翼iris无人机 二.方法二&#xff08;成功&#xff09; 1.通过 rosl…