golang kafka client 消费者代码

启动kafka

[root@localhost kafka_2.12-2.5.1] # [kube:] cat start_zk.sh 
./bin/zookeeper-server-start.sh config/zookeeper.properties > /dev/null &
[root@localhost kafka_2.12-2.5.1] # [kube:] cat start_kafka.sh 
./bin/kafka-server-start.sh config/server.properties > /dev/null &[root@localhost kafka_2.12-2.5.1] # [kube:] netstat -lntup| grep java
tcp6       0      0 :::9092                 :::*                    LISTEN      8458/java           
tcp6       0      0 :::54948                :::*                    LISTEN      8104/java           
tcp6       0      0 :::2181                 :::*                    LISTEN      8104/java           
tcp6       0      0 :::60903                :::*                    LISTEN      8458/java

创建 topic

创建主题
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 3 --topic test查看主题信息
bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic test生产者生产消息
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
package mainimport ("context""fmt""github.com/Shopify/sarama"
)func main() {config := sarama.NewConfig()config.Version = sarama.V2_0_0_0 // kafka消费者组最低版本 V0_10_2_0config.Consumer.Offsets.Initial = sarama.OffsetNewestconfig.Consumer.Offsets.AutoCommit.Enable = false // 关闭自动提交config.Consumer.Return.Errors = true// config.Consumer.Group.Rebalance.Strategy = &balanceStrategy{}  支持自定义消费者重平衡策略group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "g1", config)if err != nil {panic(err)}defer func() { _ = group.Close() }()// Track errorsgo func() {for err := range group.Errors() {fmt.Println("ERROR", err)}}()// Iterate over consumer sessions.ctx := context.Background()for {// `Consume` should be called inside an infinite loop, when a// server-side rebalance happens, the consumer session will need to be// recreated to get the new claimserr := group.Consume(ctx, []string{"test"}, &exampleConsumerGroupHandler{})if err != nil {panic(err)}}
}type exampleConsumerGroupHandler struct{}func (exampleConsumerGroupHandler) Setup(se sarama.ConsumerGroupSession) error {fmt.Printf("Setup %q %+v\n", se.MemberID(), se.Claims())return nil
}
func (exampleConsumerGroupHandler) Cleanup(se sarama.ConsumerGroupSession) error {fmt.Printf("Cleanup %q %+v\n", se.MemberID(), se.Claims())return nil
}
func (h exampleConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {for msg := range claim.Messages() {fmt.Printf("ConsumeClaim Message topic:%q partition:%d offset:%d ts:%s val:%s\n",msg.Topic, msg.Partition, msg.Offset, msg.Timestamp, msg.Value)sess.MarkMessage(msg, "")sess.Commit()}return nil
}

查看消费者组的消费情况
CURRENT-OFFSET:当前消费位移
LOG-END-OFFSET:最新可消费位移
LAG:消息堆积数量
LAG越大,堆积数越多

[root@localhost kafka_2.12-2.5.1] # [kube:] bin/kafka-consumer-groups.sh --describe --bootstrap-server 127.0.0.1:9092 --group g1 GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                 HOST             CLIENT-ID
g1              test            0          10              10              0               sarama-888ca2a0-05e9-43dd-a99a-6a02c8b10c00 /0:0:0:0:0:0:0:1 sarama
g1              test            1          15              15              0               sarama-888ca2a0-05e9-43dd-a99a-6a02c8b10c00 /0:0:0:0:0:0:0:1 sarama
g1              test            2          18              18              0               sarama-28558d3b-0c75-4b0d-a473-ced5e1b51fb3 /0:0:0:0:0:0:0:1 sarama
[root@localhost kafka_2.12-2.5.1] # [kube:] 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/763032.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

go执行go mod tidy时报错连接失败(go换依赖源)

报错: go: cloud.google.com/gov0.41.0: Get "https://proxy.golang.org/cloud.google.com/go/v/v0.41.0.mod": dial tcp 142.251.43.17:443: connectex: A connection attempt failed because the connected party did not properly respond after a pe…

ITK零碎笔记

1、ITK提取等高线&#xff08;区域轮廓&#xff09; itk::ContourExtractor2DImageFilter typedef itk::ContourExtractor2DImageFilter<FSliceType> ContourExtractorType; ContourExtractorType::Pointer contourFilter ContourExtractorType::New(); contourFilter-…

leetcode 2617. 网格图中最少访问的格子数【单调栈优化dp+二分】

原题链接&#xff1a;2617. 网格图中最少访问的格子数 题目描述&#xff1a; 给你一个下标从 0 开始的 m x n 整数矩阵 grid 。你一开始的位置在 左上角 格子 (0, 0) 。 当你在格子 (i, j) 的时候&#xff0c;你可以移动到以下格子之一&#xff1a; 满足 j < k < gri…

蓝桥杯算法练习系统—金属采集(树形dp)

问题描述 人类在火星上发现了一种新的金属&#xff01;这些金属分布在一些奇怪的地方&#xff0c;不妨叫它节点好了。一些节点之间有道路相连&#xff0c;所有的节点和道路形成了一棵树。一共有 n 个节点&#xff0c;这些节点被编号为 1~n 。人类将 k 个机器人送上了火星&…

谷歌地球三维模型

收费工具&#xff0c;白嫖党勿扰 收费金额2000元 0 概述 我也不知道为什么&#xff0c;之前发的谷歌地球三维模型相关的博客&#xff0c;被CSDN屏蔽&#xff0c;我问了客服&#xff0c;客服回答&#xff0c;他也不知道什么原因… 1 折中方案 同学们可以看这篇博客&#xff0…

【网络】:IP协议

IP协议 一.IP报头二.网段划分三.IP地址数量限制四.私有IP地址和公有IP地址五.路由 IP协议就是让数据有能力进行跨网络传输。 一.IP报头 4位版本号(version): 指定IP协议的版本, 对于IPv4来说, 就是4.4位头部长度(header length): IP头部的长度是多少个32bit, 也就是 length * …

【CMake】所见所闻所学

Note: 本贴仅记录遇到的CMake的问题&#xff0c;以问题为驱动。 - cmake_minimum_required - project - add_executable - target_include_directories - ExternalProject_Add ExternalProject_Add 是 CMake 中用于管理和构建外部项目的模块。通过 ExternalProject_Add&…

海外客户获取难?海外云手机助力电商引流!

海外电商面临的市场竞争激烈&#xff0c;如何在海外市场获客成为了摆在许多卖家面前的难题。而在这个问题的解决方案中&#xff0c;海外云手机崭露头角&#xff0c;成为助力电商引流的新利器。 在当前市场中&#xff0c;云手机主要用于游戏挂机&#xff0c;但其潜力在海外电商领…

WebGIS航线编辑器(无人机航线规划)

无人机航点、航线规划&#xff0c;实现全自动航点飞行作业及飞行航拍。禁飞区、作业区功能保障飞行安全。 GIS引擎加载 const viewer new Cesium.Viewer("cesiumContainer", { imageryProvider: new Cesium.IonImageryProvider({ assetId: 3872 }), }); const im…

License授权的基本思路

前言 对于收费软件&#xff0c;一般是我们需要去购买一个许可&#xff0c;然后输入这个许可到软件里就能够使用软件。 于是有的小伙伴就开始好奇这个许可是怎么实现的&#xff0c;特别是在离线情况下它是怎么给软件授权&#xff0c;同时又能避免被破解的。 License 内容 一个…

碳素光线疗法——动,植物 光育实验

碳素光线疗法——动&#xff0c;植物 光育实验 碳素光线疗法&#xff1a; 中西医、民间疗法融为一体&#xff0c;提高机体自身治愈力&#xff0c;免疫力&#xff0c;改善体质和保持健康&#xff0c;有助于疾病的预防和治疗的疗法。不吃药、不打针、不手术也能得健康&#xff0c…

【笔记】以论文发表形式通俗理解 TCP/IP模型

【笔记】以论文发表形式通俗理解 TCP/IP模型 前言TCP/IP模型理论通俗理解 前言 在网络基础学习过程中&#xff0c;以前只对TCP/IP理解个字面&#xff0c;网上查一下能知道个字面意思&#xff0c;但是连起来到底是什么意思&#xff0c;还是一知半解的&#xff0c;停留在表面&am…

springboot mail 发送163邮件基础配置操作

# 发送邮件配置 spring.mail.protocolsmtps # 配置 smtp 服务器地址 spring.mail.hostsmtp.163.com #服务器的端口 spring.mail.port465 # 配置邮箱用户名 spring.mail.usernamexxx163.com # 配置申请到的授权码(刚让复制的授权码) spring.mail.passwordxxx # 配置邮件编码 spr…

算法-图的广度优先搜索,图的深度优先搜索

1.图的广度优先搜索 (1). 定义 图的广度优先搜索&#xff08;Breadth-First Search, BFS&#xff09;是一种用于遍历或搜索树或图的算法。这个算法从图的某一节点&#xff08;源节点&#xff09;开始&#xff0c;探索最靠近源节点的节点&#xff0c;然后是一层一层地向外扩展&a…

蓝桥杯B组C++省赛 全球变暖【bfs】

题目描述&#xff1a; 你有一张某海域NxN像素的照片&#xff0c;"."表示海洋、"#"表示陆地&#xff0c;如下所示&#xff1a; ....... .##.... .##.... ....##. ..####. ...###. ....... 其中"上下左右"四个方向上连在一起的一片陆地组成一座…

SurfaceFlinger的面试题目

题目&#xff1a;简述SurfaceFlinger在Android系统中的作用是什么&#xff1f; **答案&#xff1a;**SurfaceFlinger是Android图形系统的核心组件之一&#xff0c;它负责合成各个应用程序产生的图形缓冲区内容&#xff0c;并将最终的图像呈现到屏幕上。SurfaceFlinger管理多个S…

211本科搞前端好找工作吗?AI程序员对程序员的影响有多大?

211本科去搞前端好找工作吗? 最近看到网友有问到这个问题,211本科搞前端好找工作吗?那么我根据自己的工作经历来深度聊聊网上的这个问题。 本人计算机硕士,在前端领域干了7年。我毕业后先后在上市国企,上市云计算公司,现在在一家正在IPO的公司,做新能源无人驾驶赛道,…

EDIUS11新版中国首发!新增多个AI功能比Sora更强!

2024年2月27日&#xff0c;中国苏州 - 著名专业视频剪辑工具EDIUS正式推出中国地区的最新版本EDIUS11。这次发布距上一版已有3年时间&#xff0c;EDIUS11带来了一系列创新&#xff0c;开发商Grass Valley团队引入了多项人工智能功能&#xff0c;涵盖特效、调色、降噪等多个方面…

网络安全必修课:20个核心知识点大揭秘

1、什么是SQL注入攻击 概述 攻击者在 HTTP 请求中注入恶意的 SQL 代码&#xff0c;服务器使用参数构建数据库 SQL 命令时&#xff0c;恶意SQL 被一起构造&#xff0c;并在数据库中执行。 注入方法 用户登录&#xff0c;输入用户名 lianggzone&#xff0c;密码 ‘ or ‘1’’…

XSS漏洞及其工具Beef使用

XSS(Cross Site Scripting,跨站脚本漏洞)漏洞&#xff0c;又叫 CSS 漏洞&#xff0c;是最常见的 Web 应用程序漏洞。其主要原理是当动态页面中插入的内容含有特殊字符(如<)时&#xff0c;用户浏览器会将其误认为是插入了HTML 标签&#xff0c;当这些HTML标签引入了一段 Java…