go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费

环境的搭建

Kafka以及相关组件的下载

我们要实现今天的内容,不可避免的要进行对开发环境的配置,Kafka环境的配置比较繁琐,需要配置JDK,Scala,ZoopKeeper和Kafka,这里我们不做赘述,如果大家不知道如何配置环境,这里我们个大家找了一篇博文供大家参考:
Windows下安装Kafka(图文记录详细步骤)

sarama包的安装

今天我们所时机的内容需要用到go语言的第三方包sarama,由于1.19版本后添加了ztcd压缩算法,需要用到cgo,这里我们为了方便考虑选择下载sarama v1.19.0,所以这里我们不能直接使用go get'命令来安装第三方包,我们要使用/go mod文件来实现,下面是主要步骤:

  • 在项目中创建文件夹(博主的是Kafkademo)
  • 打开终端,输入go mod init,进行go.mod文件的初始化:
    在这里插入图片描述
    在这里插入图片描述
  • 我们在.mod文件内指定第三方包及其版本:
module Kafkademorequire (github.com/Shopify/sarama v1.19
)go 1.21.6

其实这是已经可以使用命令go mod tidy了,但是博主在做的时候发现,这样会直接清除掉.mod文件里面的内容,所以建议先创建一个producer文件,在文件里面写:

package mainimport ("fmt""github.com/Shopify/sarama"
)func main() {config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAll                                
}

这时候再打开终端输入go mod tidy
在这里插入图片描述
等待命令运行完毕,打开.mod文件,看到如下内容就OK了:
在这里插入图片描述

利用sarama向Kafka发送消息(消息的生产)

代码

package mainimport ("fmt""github.com/Shopify/sarama"
)func main() {config := sarama.NewConfig()                              //创建config实例config.Producer.RequiredAcks = sarama.WaitForAll          //发送完数据需要leader和follow都确认config.Producer.Partitioner = sarama.NewRandomPartitioner //创建随机分区config.Producer.Return.Successes = true                   //成功交付的消息将在success channel返回//创建信息msg := &sarama.ProducerMessage{}msg.Topic = "web.log"msg.Value = sarama.StringEncoder("this is a test log")//连接KafKaclient, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)if err != nil {fmt.Println("producer closed, err:", err)return}defer client.Close()//发送消息pid, offset, err := client.SendMessage(msg)if err != nil {fmt.Println("send msg failed,err:", err)return}fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

运行过程

  • 首先我们打开终端开起ZooKepper服务
    zkServer
    
    在这里插入图片描述
  • 然后再Kafka所在文件夹下输入命令运行Kafka:
.\bin\windows\kafka-server-start.bat .\config\server.properties

在这里插入图片描述
最后运行程序即可,输出结果为:
在这里插入图片描述

补充:消息的消费

代码

package mainimport ("fmt""github.com/Shopify/sarama""time"
)func main() {customer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)if err != nil {fmt.Println("failed init customer,err:", err)return}partitionlist, err := customer.Partitions("web.log-0") //获取topic的所有分区if err != nil {fmt.Println("failed get partition list,err:", err)return}fmt.Println("partitions:", partitionlist)for partition := range partitionlist { // 遍历所有分区//根据消费者对象创建一个分区对象pc, err := customer.ConsumePartition("web.log", int32(partition), sarama.OffsetNewest)if err != nil {fmt.Println("failed get partition consumer,err:", err)return}defer pc.Close() // 移动到这里go func(consumer sarama.PartitionConsumer) {defer pc.AsyncClose() // 移除这行,因为已经在循环结束时关闭了for msg := range pc.Messages() {fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)}}(pc)time.Sleep(time.Second * 10)}
}

不过这个不能实现直接的消费,后续我们会对这个进行补充,这里仅作介绍。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/824360.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

元宇宙-虚拟世界的安全风险如何应对

元宇宙(Metaverse)是一个虚拟时空间的集合,由一系列的增强现实(AR)、虚拟现实(VR)和互联网(Internet)所组成。这个虚拟时空间是一个持续存在的、由众多虚拟世界互相连接而…

MySQL常见问题汇总

1、"Host is not allowed to connect to this MySQL server" 方法1:GRANT ALL PRIVILEGES ON *.* TO root% WITH GRANT OPTION //赋予任何主机访问数据的权限 方法2:update user set host % where user root; 重启mysql 服务 2、mysqldum…

redis与etcd的对比

1.redis是一种高级的key:value存储系统,其中value支持五种数据类型: 1.1 字符串(strings) 1.2 字符串列表(lists) 1.3 字符串集合(sets) 1.4 有序字符串集合(…

STM32 HAL库F103系列之ADC实验(1)

ADC工作原理: 1、输入通道: 2、转换序列: A/D转换被组织为两组:规则组(常规转换组)和注入组(注入转换组) 规则组最多可以有16个转换,注入组最多有4个转换 规则组和注入…

Hudi-IDEA编程

项目 一、HudiSparkKafka(Scala) 配置详见【1.Scala配置】 依赖详见【1.HudiSparkKafka依赖】 1-1 构建SparkSession对象 def main(args: Array[String]): Unit {//1.构建SparkSession对象val spark: SparkSession SparkUtils.createSparkSession(…

中科亿海微-CL1656功能验证开发板

I. 引言 A. 研究背景与意义 CL1656是一款精度高、功耗低、成本低的5V单片低功耗运放,由核心互联公司研发制造,CL1656 是一个 16-bit、快速、低功耗逐次逼近型 ADC,吞吐速率高达 250 kSPS,并且内置低噪声、宽 带宽采样保持放大器。…

基于双向长短期神经网络bilstm的径流量预测,基于gru神经网络的径流量预测

目录 背影 摘要 LSTM的基本定义 LSTM实现的步骤 BILSTM神经网络 基于双向长短期神经网络bilstm的径流量预测,基于gru神经网络的径流量预测 完整代码:基于双向长短期神经网络bilstm的径流量预测,基于gru神经网络的径流量预测(代码完整,数据齐全)资源-CSDN文库 https://dow…

HarmonyOS开发实例:【分布式新闻客户端】

介绍 本篇Codelab基于栅格布局、设备管理和多端协同,实现一次开发,多端部署的分布式新闻客户端页面。主要包含以下功能: 展示新闻列表以及左右滑动切换新闻Tab。点击新闻展示新闻详情页。点击新闻详情页底部的分享按钮,发现周边…

Elasticsearch:如何将 MongoDB 数据引入 Elastic Cloud

作者:Hemendra Singh Lodhi Elastic Cloud 是由 Elastic 提供的基于云的托管服务。Elastic Cloud 允许客户在亚马逊网络服务 (AWS)、谷歌云平台 (GCP) 和微软 Azure 上部署、管理和扩展他们的 Elasticsearch 集群。 MongoDB 是一种流行的 NoSQL 文档导向数据库&am…

web安全学习笔记(10)

记一下第十四节课的内容。 一、MySQL学习 数据库基本结构:库——表——列——值 在本地打开navicat,连接数据库,新建一个liuyan库、liuyan库下新建一个member表: 在表里随意添加一些数据: 下面我们学习MySQL查询。新…

36-5 Python 编写poc基础

一、相关概念介绍 在漏洞研究和网络安全领域,常常会遇到一些特定术语和概念,例如PoC、Exploit和Payload。下面是它们的概念介绍: PoC(Proof of Concept): PoC是“Proof of Concept”的缩写,意为“概念验证”或“概念证明”。在网络安全领域,PoC通常指的是一种演示性质…

【微服务】Gateway的基本配置详解

目录 什么是gateway 基本配置详解 1. 路由配置 2. 过滤器配置 3. 路由断言 4. 过滤器工厂 什么是gateway Spring Cloud Gateway 是 Spring Cloud 生态系统中的一个全新的微服务网关,它基于 Spring 5、Project Reactor 和 Spring Boot 2 技术栈,提供…

【Web】NewStarCTF 2022 题解(全)

目录 Week1 HTTP Head?Header! 我真的会谢 NotPHP Word-For-You Week2 Word-For-You(2 Gen) IncludeOne UnserializeOne ezAPI Week3 BabySSTI_One multiSQL IncludeTwo Maybe You Have To think More Week4 So Baby RCE BabySSTI_Two UnserializeT…

iOS知识点 --- Runtime

Objective-C (OC) 中的 Runtime 原理: Objective-C Runtime 是一套用于支持 Objective-C 动态特性的底层 C 语言 API。它为 Objective-C 提供了以下核心功能: 动态类型:在运行时确定对象的确切类型,允许在程序执行过程中进行类型…

C++修炼之路之STL_stack,queue和容器适配器

目录 前言 一:SLT中stack和queue的基本使用 1.在官网中对stack和queue的简单介绍 2.数据结构中栈和队列的基本知识和操作 3. STL中stack的接口函数及使用 4.STL中queue的接口函数及使用 二:容器适配器Container 三:使用容器适配器…

springboot Logback 不同环境,配置不同的日志输出路径

1.背景: mac 笔记本开发,日志文件写到/data/logs/下,控制台报出:Failed to create parent directories for [/data/logs/........... 再去手动在命令窗口创建文件夹data,报Read-only file system 2.修改logback-spri…

milvus querynode启动源码分析

querynode启动源码分析 结构体 // QueryNode implements QueryNode grpc server // cmd\components\query_node.go type QueryNode struct {ctx context.Contextsvr *grpcquerynode.Server }// Server is the grpc server of QueryNode. type Server struct {querynode typ…

Android笔记: mkdirs不生效失败

Manifest已经配置权限,代码中也动态获取权限,mkdirs一直返回false File.mkdirs()方法创建文件夹失败 1、动态申请读写权限 <!--SDCard写权限--> <uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE" /> <!--SDCard读权…

Linux安装和使用Android Debug Bridge(ADB)

目录 1、开发环境和工具 2、ADB是什么&#xff1f; 3、安装ADB 3.1、使用包管理器安装 ADB 3.2、手动安装 ADB 4、使用ADB 4.1、连接设备 4.2、执行shell命令 4.3、安装应用程序 4.4、截取屏幕截图 4.5、模拟按键和手势 4.6、上传文件到Android设备 4.7、从Android设备下载文件…

常见的并发编程问题,如死锁、竞态条件、线程不安全、内存可见性问题等,如何在Java中避免这些问题?

死锁&#xff1a;发生在两个或更多线程互相等待对方持有的资源&#xff0c;导致所有的线程都无法进行下去。避免死锁的一个常见方法是遵循资源顺序访问&#xff0c;将系统中的资源排序&#xff0c;并约定每个线程都按序请求资源。 竞态条件&#xff1a;两个或更多线程同时访问…