go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费

环境的搭建

Kafka以及相关组件的下载

我们要实现今天的内容,不可避免的要进行对开发环境的配置,Kafka环境的配置比较繁琐,需要配置JDK,Scala,ZoopKeeper和Kafka,这里我们不做赘述,如果大家不知道如何配置环境,这里我们个大家找了一篇博文供大家参考:
Windows下安装Kafka(图文记录详细步骤)

sarama包的安装

今天我们所时机的内容需要用到go语言的第三方包sarama,由于1.19版本后添加了ztcd压缩算法,需要用到cgo,这里我们为了方便考虑选择下载sarama v1.19.0,所以这里我们不能直接使用go get'命令来安装第三方包,我们要使用/go mod文件来实现,下面是主要步骤:

  • 在项目中创建文件夹(博主的是Kafkademo)
  • 打开终端,输入go mod init,进行go.mod文件的初始化:
    在这里插入图片描述
    在这里插入图片描述
  • 我们在.mod文件内指定第三方包及其版本:
module Kafkademorequire (github.com/Shopify/sarama v1.19
)go 1.21.6

其实这是已经可以使用命令go mod tidy了,但是博主在做的时候发现,这样会直接清除掉.mod文件里面的内容,所以建议先创建一个producer文件,在文件里面写:

package mainimport ("fmt""github.com/Shopify/sarama"
)func main() {config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAll                                
}

这时候再打开终端输入go mod tidy
在这里插入图片描述
等待命令运行完毕,打开.mod文件,看到如下内容就OK了:
在这里插入图片描述

利用sarama向Kafka发送消息(消息的生产)

代码

package mainimport ("fmt""github.com/Shopify/sarama"
)func main() {config := sarama.NewConfig()                              //创建config实例config.Producer.RequiredAcks = sarama.WaitForAll          //发送完数据需要leader和follow都确认config.Producer.Partitioner = sarama.NewRandomPartitioner //创建随机分区config.Producer.Return.Successes = true                   //成功交付的消息将在success channel返回//创建信息msg := &sarama.ProducerMessage{}msg.Topic = "web.log"msg.Value = sarama.StringEncoder("this is a test log")//连接KafKaclient, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)if err != nil {fmt.Println("producer closed, err:", err)return}defer client.Close()//发送消息pid, offset, err := client.SendMessage(msg)if err != nil {fmt.Println("send msg failed,err:", err)return}fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

运行过程

  • 首先我们打开终端开起ZooKepper服务
    zkServer
    
    在这里插入图片描述
  • 然后再Kafka所在文件夹下输入命令运行Kafka:
.\bin\windows\kafka-server-start.bat .\config\server.properties

在这里插入图片描述
最后运行程序即可,输出结果为:
在这里插入图片描述

补充:消息的消费

代码

package mainimport ("fmt""github.com/Shopify/sarama""time"
)func main() {customer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)if err != nil {fmt.Println("failed init customer,err:", err)return}partitionlist, err := customer.Partitions("web.log-0") //获取topic的所有分区if err != nil {fmt.Println("failed get partition list,err:", err)return}fmt.Println("partitions:", partitionlist)for partition := range partitionlist { // 遍历所有分区//根据消费者对象创建一个分区对象pc, err := customer.ConsumePartition("web.log", int32(partition), sarama.OffsetNewest)if err != nil {fmt.Println("failed get partition consumer,err:", err)return}defer pc.Close() // 移动到这里go func(consumer sarama.PartitionConsumer) {defer pc.AsyncClose() // 移除这行,因为已经在循环结束时关闭了for msg := range pc.Messages() {fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)}}(pc)time.Sleep(time.Second * 10)}
}

不过这个不能实现直接的消费,后续我们会对这个进行补充,这里仅作介绍。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/824360.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

元宇宙-虚拟世界的安全风险如何应对

元宇宙(Metaverse)是一个虚拟时空间的集合,由一系列的增强现实(AR)、虚拟现实(VR)和互联网(Internet)所组成。这个虚拟时空间是一个持续存在的、由众多虚拟世界互相连接而…

redis与etcd的对比

1.redis是一种高级的key:value存储系统,其中value支持五种数据类型: 1.1 字符串(strings) 1.2 字符串列表(lists) 1.3 字符串集合(sets) 1.4 有序字符串集合(…

STM32 HAL库F103系列之ADC实验(1)

ADC工作原理: 1、输入通道: 2、转换序列: A/D转换被组织为两组:规则组(常规转换组)和注入组(注入转换组) 规则组最多可以有16个转换,注入组最多有4个转换 规则组和注入…

Hudi-IDEA编程

项目 一、HudiSparkKafka(Scala) 配置详见【1.Scala配置】 依赖详见【1.HudiSparkKafka依赖】 1-1 构建SparkSession对象 def main(args: Array[String]): Unit {//1.构建SparkSession对象val spark: SparkSession SparkUtils.createSparkSession(…

中科亿海微-CL1656功能验证开发板

I. 引言 A. 研究背景与意义 CL1656是一款精度高、功耗低、成本低的5V单片低功耗运放,由核心互联公司研发制造,CL1656 是一个 16-bit、快速、低功耗逐次逼近型 ADC,吞吐速率高达 250 kSPS,并且内置低噪声、宽 带宽采样保持放大器。…

HarmonyOS开发实例:【分布式新闻客户端】

介绍 本篇Codelab基于栅格布局、设备管理和多端协同,实现一次开发,多端部署的分布式新闻客户端页面。主要包含以下功能: 展示新闻列表以及左右滑动切换新闻Tab。点击新闻展示新闻详情页。点击新闻详情页底部的分享按钮,发现周边…

Elasticsearch:如何将 MongoDB 数据引入 Elastic Cloud

作者:Hemendra Singh Lodhi Elastic Cloud 是由 Elastic 提供的基于云的托管服务。Elastic Cloud 允许客户在亚马逊网络服务 (AWS)、谷歌云平台 (GCP) 和微软 Azure 上部署、管理和扩展他们的 Elasticsearch 集群。 MongoDB 是一种流行的 NoSQL 文档导向数据库&am…

web安全学习笔记(10)

记一下第十四节课的内容。 一、MySQL学习 数据库基本结构:库——表——列——值 在本地打开navicat,连接数据库,新建一个liuyan库、liuyan库下新建一个member表: 在表里随意添加一些数据: 下面我们学习MySQL查询。新…

【Web】NewStarCTF 2022 题解(全)

目录 Week1 HTTP Head?Header! 我真的会谢 NotPHP Word-For-You Week2 Word-For-You(2 Gen) IncludeOne UnserializeOne ezAPI Week3 BabySSTI_One multiSQL IncludeTwo Maybe You Have To think More Week4 So Baby RCE BabySSTI_Two UnserializeT…

C++修炼之路之STL_stack,queue和容器适配器

目录 前言 一:SLT中stack和queue的基本使用 1.在官网中对stack和queue的简单介绍 2.数据结构中栈和队列的基本知识和操作 3. STL中stack的接口函数及使用 4.STL中queue的接口函数及使用 二:容器适配器Container 三:使用容器适配器…

springboot Logback 不同环境,配置不同的日志输出路径

1.背景: mac 笔记本开发,日志文件写到/data/logs/下,控制台报出:Failed to create parent directories for [/data/logs/........... 再去手动在命令窗口创建文件夹data,报Read-only file system 2.修改logback-spri…

Linux安装和使用Android Debug Bridge(ADB)

目录 1、开发环境和工具 2、ADB是什么? 3、安装ADB 3.1、使用包管理器安装 ADB 3.2、手动安装 ADB 4、使用ADB 4.1、连接设备 4.2、执行shell命令 4.3、安装应用程序 4.4、截取屏幕截图 4.5、模拟按键和手势 4.6、上传文件到Android设备 4.7、从Android设备下载文件…

BGP边界网关路由实验(华为)

一,技术简介 BGP(边界网关路由协议)是一种自治系统(AS)间的协议,主要用于在不同的AS之间交换路由信息。AS是一个由一组网络设备和路由器组成的网络集合,这些设备可以在一个共同的管理域中协同工…

1 回归:锂电池温度预测top2 代码部分(一) Tabnet

2024 iFLYTEK A.I.开发者大赛-讯飞开放平台 TabNet: 模型也是我在这个比赛一个意外收获,这个模型在比赛之中可用。但是需要GPU资源,否则运行真的是太慢了。后面针对这个模型我会写出如何使用的方法策略。 比赛结束后有与其他两位选手聊天&am…

win2022服务器apache配置https(ssl)真实环境实验(避坑之作)不依赖宝塔小皮等集成环境

本次实验背景: 完全参考官方 https://cloud.tencent.com/document/product/400/4143 文档流程,没有搞定,于是写下避坑之作。 服务器:腾讯云轻量应用服务器 操作系统: Windows Server 2022 DataCenter 64bit CN apache…

李沐45_SSD实现——自学笔记

主体思路: 1.生成一堆锚框 2.根据真实标签为每个锚框打标(类别、偏移、mask) 3.模型为每个锚框做一个预测(类别、偏移) 4.计算上述二者的差异损失,以更新模型weights 先读取一张图像。 它的高度和宽度分别为561和728像素。 %matplotlib inline import …

Photoshop 2024 (ps) v25.6中文 强大的图像处理软件 mac/win

Photoshop 2024 for Mac是一款强大的图像处理软件,专为Mac用户设计。它继承了Adobe Photoshop一贯的优秀功能,并进一步提升了性能和稳定性。 Mac版Photoshop 2024 (ps)v25.6中文激活版下载 win版Photoshop 2024 (ps)v25.6直装版下载 无论是专业的设计师还…

EI Scopus双检索 | 2024年清洁能源与智能电网国际会议(CCESG 2024)

会议简介 Brief Introduction 2024年清洁能源与智能电网国际会议(CCESG 2024) 会议时间:2024年 11月27-29日 召开地点:澳大利亚悉尼 大会官网:CCESG 2024-2024 International Joint Conference on Clean Energy and Smart Grid 由CoreShare科…

m4p转换mp3格式怎么转?3个Mac端应用~

M4P文件格式的诞生伴随着苹果公司引入FairPlay版权管理系统,该系统旨在保护音频的内容。M4P因此而生,成为受到FairPlay系统保护的音频格式,常见于苹果设备的iTunes等平台。 MP3文件格式的多个优点 MP3格式的优点显而易见。首先,其…

k8s之etcd

1.特点: etcd 是云原生架构中重要的基础组件。有如下特点: 简单:安装配置简单,而且提供了 HTTP API 进行交互,使用也很简单键值对存储:将数据存储在分层组织的目录中,如同在标准文件系统中监…