Kafka-go语言一命速通

记录

命令(终端操作kafka)

# 验证kafka是否启动
ps -ef | grep kafka # ps -ef 命令用于显示所有正在运行的进程的详细信息
lsof -i :9092# 启动kafka
brew services start zookeeper
brew services start kafka# 创建topic
kafka-topics --create --topic test --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
解释:kafka-topics:用于管理主题。–create:创建一个新的主题。–topic test:主题的名称为 test–partitions 1:有 1 个分区(partition)。–replication-factor 1:主题的副本因子为 1。表示没有冗余,数据仅存储在一个节点上。–bootstrap-server localhost:9092:localhost:9092 表示 Kafka 服务器运行在本地主机的 9092 端口。# 查看主题
kafka-topics --list --bootstrap-server localhost:9092#订阅(消费者) 新建一个终端,输入
kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning#发布(生产者) 新建一个终端,输入
kafka-console-producer --bootstrap-server localhost:9092 --topic test# 删除Topic
kafka-topics --delete --topic test --bootstrap-server localhost:9092

代码操作Kafka

简单版本

生产者:

packagemainimport("github.com/IBM/sarama""log"
)funcmain() {config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAllconfig.Producer.Retry.Max = 5config.Producer.Return.Successes = trueproducer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)iferr != nil {log.Fatalf("Failed to start producer: %s", err)}deferproducer.Close()msg := &sarama.ProducerMessage{Topic: "test_topic",Value: sarama.StringEncoder("Hello, Kafka!"),}partition, offset, err := producer.SendMessage(msg)iferr != nil {log.Fatalf("Failed to send message: %s", err)}log.Printf("Message is stored in topic(%s)/partition(%d)/offset(%d)\n", "test_topic", partition, offset)
}

消费者

package mainimport ("fmt""github.com/IBM/sarama""log"
)func main() {config := sarama.NewConfig()config.Consumer.Return.Errors = true// 创建消费者consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)if err != nil {log.Fatalf("Failed to start consumer: %s", err)}defer consumer.Close()// 订阅 Kafka 主题partitionConsumer, err := consumer.ConsumePartition("test_topic", 0, sarama.OffsetNewest)if err != nil {log.Fatalf("Failed to start partition consumer: %s", err)}defer partitionConsumer.Close()// 消费消息for msg := range partitionConsumer.Messages() {log.Printf("Consumed message: %s, from partition(%d), offset(%d)\n", string(msg.Value), msg.Partition, msg.Offset)}
}

多点配置版本

生产者

packagemainimport("fmt""github.com/IBM/sarama""log""time"
)funcmain() {// 配置生产者config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAll // 等待所有副本确认config.Producer.Retry.Max = 5                    // 最大重试次数config.Producer.Return.Successes = true          // 返回成功的消息config.Producer.Return.Errors = true             // 返回失败的消息config.Producer.Timeout = 10 * time.Second       // 设置生产者的超时时间config.Net.MaxOpenRequests = 5                   // 控制最大请求数config.Version = sarama.V2_8_0_0                 // 配置 Kafka 版本(可根据实际 Kafka 版本调整)// 创建生产者实例producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)iferr != nil {log.Fatalf("Failed to start producer: %s", err)}deferproducer.Close()// 循环发送消息fori := 1; ; i++ {// 构造消息msg := &sarama.ProducerMessage{Topic: "test_topic",                                                                       // 目标主题Value: sarama.StringEncoder(fmt.Sprintf("Message #%d: Hello, Kafka! www.zpf0000.com", i)), // 动态生成消息内容}// 发送消息partition, offset, err := producer.SendMessage(msg)iferr != nil {// 错误处理:打印错误并继续发送下一条消息log.Printf("Failed to send message: %s", err)continue}// 成功发送消息后记录日志log.Printf("Message #%d is stored in topic(%s)/partition(%d)/offset(%d)", i, "test_topic", partition, offset)// 模拟消息生产间隔(例如每秒发送一条消息)time.Sleep(1 * time.Second)}
}

消费者

packagemainimport("github.com/IBM/sarama""log""os""os/signal""syscall""time"
)funcmain() {// 配置消费者config := sarama.NewConfig()config.Consumer.Return.Errors = true                  // 启用错误返回config.Consumer.Offsets.Initial = sarama.OffsetNewest // 从最新消息开始消费config.Version = sarama.V2_8_0_0                      // 配置 Kafka 版本(可根据实际 Kafka 版本调整)// 创建 Kafka 消费者实例consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)iferr != nil {log.Fatalf("Failed to start consumer: %s", err)}deferconsumer.Close()// 订阅主题的分区partitionConsumer, err := consumer.ConsumePartition("test_topic", 0, sarama.OffsetNewest)iferr != nil {log.Fatalf("Failed to start partition consumer: %s", err)}deferpartitionConsumer.Close()// 用于捕获系统信号(例如 Ctrl+C),在接收到信号时优雅地退出sigChan := make(chanos.Signal, 1)signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)// 用来跟踪消费者的状态,确保及时处理错误gofunc() {forerr := rangepartitionConsumer.Errors() {log.Printf("Error: %s", err.Error())}}()// 监听消息并处理log.Println("Consumer is ready, waiting for messages...")for{select{casemsg := <-partitionConsumer.Messages():// 打印收到的消息log.Printf("Received message: %s, from partition(%d), offset(%d)\n", string(msg.Value), msg.Partition, msg.Offset)// 处理消息(可以根据需求扩展处理逻辑)// 模拟消息处理时间time.Sleep(500 * time.Millisecond) // 例如处理消息需要 500 毫秒// 在这里,可以对消息进行确认或其他操作,例如处理完消息后将其存入数据库等case<-sigChan:// 捕获到退出信号,优雅退出log.Println("Received shutdown signal, exiting...")return}}
}

链接

https://cloud.tencent.com/developer/article/1547380 # 优质博客一篇

https://kafka1x.apachecn.org/documentation.html#producerapi # 官方中文文档

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/66521.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

sys.dm_exec_connections:查询与 SQL Server 实例建立的连接有关的信息以及每个连接的详细信息(客户端ip)

文章目录 引言I 基于dm_exec_connections查询客户端ip权限物理联接时间范围dm_exec_connections表see also: 监视SQL Server 内存使用量资源信号灯 DMV sys.dm_exec_query_resource_semaphores( 确定查询执行内存的等待)引言 查询历史数据库客户端ip应用场景: 安全分析缺乏…

阿里云发现后门webshell,怎么处理,怎么解决?

当收到如下阿里云通知邮件时&#xff0c;大部分管理员都会心里一惊吧&#xff01;出现Webshell&#xff0c;大概是网站被入侵了。 尊敬的 xxxaliyun.com&#xff1a; 云盾云安全中心检测到您的服务器&#xff1a;47.108.x.xx&#xff08;xx机&#xff09;出现了紧急安全事件…

【大模型】百度千帆大模型对接LangChain使用详解

目录 一、前言 二、LangChain架构与核心组件 2.1 LangChain 核心架构 2.2 LangChain 核心组件 三、环境准备 3.1 前置准备 3.1.1 创建应用并获取apikey 3.1.2 开通付费功能 3.2 获取LangChain文档 3.3 安装LangChain依赖包 四、百度千帆大模型对接 LangChain 4.1 LL…

maven如何从外部导包

1.找到你项目的文件位置&#xff0c;将外部要导入的包复制粘贴进你当前要导入的项目下。 2.从你的项目目录下选中要导入的包的pom文件即可导包成功 注意一定是选中对应的pom文件 导入成功之后对应的pom.xml文件就会被点亮

graylog配置日志关键字邮件Email告警

文章目录 前言一、配置邮箱报警二、邮件告警配置1.设置邮箱告警通知2.设置告警事件 三、告警结果示例 前言 在TO/B、TO/C项目的告警链路中&#xff0c;端口告警、服务器基础资源告警、接口告警等不同类型的告警都扮演着重要角色。这些告警能够帮助团队及时发现潜在的系统问题&…

GraphQL:强大的API查询语言

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

集合卡尔曼滤波ENKF的学习笔记

集合卡尔曼滤波ENKF的学习笔记 什么是数据同化&#xff1f;天气模型预测是25C&#xff08;这叫"背景场"&#xff09;&#xff0c;实际观测是23C&#xff08;这叫"观测值"&#xff09;&#xff0c;数据同化就是把这两个信息合理结合。 # 最简单的组合方式&…

DeepSeek-V3 通俗详解:从诞生到优势,以及与 GPT-4o 的对比

1. DeepSeek 的前世今生 1.1 什么是 DeepSeek&#xff1f; DeepSeek 是一家专注于人工智能技术研发的公司&#xff0c;致力于打造高性能、低成本的 AI 模型。它的目标是让 AI 技术更加普惠&#xff0c;让更多人能够用上强大的 AI 工具。 1.2 DeepSeek-V3 的诞生 DeepSeek-V…

UI自动化测试保姆级教程--pytest详解(精简易懂)

欢迎来到啊妮莫的学习小屋 别让过去的悲伤&#xff0c;毁掉当下的快乐一《借东西的小人阿莉埃蒂》 简介 pytest是一个用于Python的测试框架, 支持简单的单元测试和复杂的功能测试. 和Python自带的UnitTest框架类似, 但是相比于UnitTest更加简洁, 效率更高. 特点 非常容易上手…

Javascript算法——贪心算法(一)

贪心算法详解&#xff08;JavaScript&#xff09;&#xff08;局部最优->全局最优&#xff09; 贪心算法&#xff08;Greedy Algorithm&#xff09;是一种在每一步选择中都采取当前状态下的最优选择&#xff08;局部最优&#xff09;的算法设计方法。通过局部最优解的累积&…

CK18——肝损伤无创诊断标志物

肝脏作为人体至关重要的代谢与解毒器官&#xff0c;极易遭受病毒、药物、酒精及不良饮食等多种因素的损害&#xff0c;进而引发一系列如非酒精性脂肪肝&#xff08;NAFLD&#xff09;、肝纤维化、肝硬化、肝细胞癌以及各类肝炎等病症。因此&#xff0c;确定一种高可靠性、非侵入…

.NET Core + Kafka 开发指南

什么是Kafka Apache Kafka是一个分布式流处理平台,由LinkedIn开发并开源,后来成为Apache软件基金会的顶级项目。Kafka主要用于构建实时数据管道和流式应用程序。 Kafka 架构 从下面3张架构图中可以看出Kafka Server 实际扮演的是Broker的角色, 一个Kafka Cluster由多个Bro…

[离线数仓] 总结二、Hive数仓分层开发

接 [离线数仓] 总结一、数据采集 5.8 数仓开发之ODS层 ODS层的设计要点如下: (1)ODS层的表结构设计依托于从业务系统同步过来的数据结构。 (2)ODS层要保存全部历史数据,故其压缩格式应选择压缩比率,较高的,此处选择gzip。 CompressedStorage - Apache Hive - Apac…

Unity3D仿星露谷物语开发19之库存栏丢弃及交互道具

1、目标 从库存栏中把道具拖到游戏场景中&#xff0c;库存栏中道具数相应做减法或者删除道具。同时在库存栏中可以交换两个道具的位置。 2、UIInventorySlot设置Raycast属性 在UIInventorySlot中&#xff0c;我们只希望最外层的UIInventorySlot响应Raycast&#xff0c;他下面…

阿里云代理商热销产品推荐

在数字化浪潮的推动下&#xff0c;企业对于云计算的依赖日益加深。阿里云&#xff0c;作为中国领先的云计算服务提供商&#xff0c;为企业提供了丰富多样的云产品和服务。本文将聚焦于阿里云代理商热销产品推荐&#xff0c;探讨其如何帮助企业高效利用云资源&#xff0c;加速数…

江科大STM32入门——IIC通信笔记总结

wx&#xff1a;嵌入式工程师成长日记 &#xff08;一&#xff09;简介 STM32内部集成了硬件I2C收发电路&#xff0c;可以由硬件自动执行时钟生成、起始终止条件生成、应答位收发、数据收发等功能&#xff0c;减轻CPU的负担 支持多主机 支持7位/10位地址模式 支持不同的通讯速…

MySQL安装,配置教程

一、Linux在线yum仓库安装 打开MySQL官方首页&#xff0c;链接为&#xff1a;https://www.mysql.com/ 界面如下&#xff1a; 在该页面中找到【DOWNOADS】选项卡&#xff0c;点击进入下载页面。 在下载界面中&#xff0c;可以看到不同版本的下载链接&#xff0c;这里选择【My…

四、VSCODE 使用GIT插件

VSCODE 使用GIT插件 一下载git插件与git Graph插件二、git插件使用三、文件提交到远程仓库四、git Graph插件 一下载git插件与git Graph插件 二、git插件使用 git插件一般VSCode自带了git&#xff0c;就是左边栏目的图标 在下载git软件后vscode的git插件会自动识别当前项目 …

消息队列MQ(二)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 MQ学习笔记 前言一、发送者的可靠性1. 生产者重试机制2. 生产者确认机制3. 实现生产者确认 二、MQ的可靠性1. 数据持久化2. LazyQueue 前言 在用MQ实现异步调用时&#xff0…

数据分析思维(九):分析方法——AARRR模型分析方法

数据分析并非只是简单的数据分析工具三板斧——Excel、SQL、Python&#xff0c;更重要的是数据分析思维。没有数据分析思维和业务知识&#xff0c;就算拿到一堆数据&#xff0c;也不知道如何下手。 推荐书本《数据分析思维——分析方法和业务知识》&#xff0c;本文内容就是提取…