go语言并发实战——日志收集系统(六) 编写日志收集系统客户端

上节回顾

在上一篇文章中我们介绍了编写客户端的四个步骤,分别是:

  • 读取配置文件,寻找日志路径
  • 初始化服务
  • 根据日志路径l来收集日志
  • 将收集到的日志发送Kafka中
    关于上述的内容博主画了一个思维导图(有点丑,大家勉强看看,以前没画过):
    在这里插入图片描述
    对了,为了画这个思维导图昨天博主找了好久思维导图的软件,最后发现了Vscode上面有一个非常不错的插件:drawio,样子大概是这样的:
    在这里插入图片描述
    大家如果没有合适的思维导图绘制根据,可以试试这个。好了,话不多说,开始今天的内容。

读取配置信息,获取日志信息

前言

这里读取日志信息我们选择的是go-ini这一第三方包,具体的使用方法在我前面的博文这种有所介绍,大家不了解的话可以参考:
go语言并发实战——日志收集系统(五) 基于go-ini包读取日志收集服务的配置文件

需求分析

这里配置文件中我们主要要知道两个消息,一个Kafka的配置信息,一个是日志文件的路径,配置文件应该是这样的:

[kafka]
address=127.0.0.1:9092
topic=web.log
chan_size=100000[collect]
logfile_path:G:\goproject\-goroutine-\log-agent\log\log1

而为了方便我们利用反射来读取配置文件,我们来创建几个结构体来存储我们读到的配置信息:

  • Kafka结构体
type Kafkaddress struct {Addr        []string `ini:"address"`Topic       string   `ini:"topic"`MessageSize int64    `ini:"chan_size"`
}
  • tail结构体
type LogFilePath struct {Path string `ini:"logfile_path"`
}
  • 总的结构体

type Config struct {Kafakaddress Kafkaddress `ini:"kafka"`LogFilePath  LogFilePath `ini:"collect"`
}

然后读取配置信息放入结构体中:

	//读取配置文件,获取配置信息filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini"ConfigObj := new(Config)err := ini.MapTo(ConfigObj, filename)if err != nil {logrus.Error("%s Load failed,err:", filename, err)}

这样我们就获得我们所需要的配置消息了

初始化服务

前言

这里我们初始服务主要是初始化Kafka以及tail包,利用它们读取日志信息并将其发送Kafka中,具体介绍可以参考前面的几篇文章:
go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费
go语言并发实战——日志收集系统(四) 利用tail包实现对日志文件的实时监控

Kafka的初始化

	//初始化Kafkaerr = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize)if err != nil {logrus.Error("InitKafka failed, err:%v", err)return}logrus.Infof("InitKafka success")

tail的初始化

func InitTail(filename string) (err error) {config := tail.Config{Follow:    true,ReOpen:    true,MustExist: true,Poll:      true,Location:  &tail.SeekInfo{Offset: 0, Whence: 2},}TailObj, err = tail.TailFile(filename, config)if err != nil {logrus.Error("tail create tailObj for path:%s,err:%v", filename, err)return}return
}

根据路径来读取日志

需求分析

一般我们常见的想法会是我们先将日志消息读取出来然后发送给Kafka但是这样的串行操作无疑会大大增加程序的运行时间,所以这里我们选择将读到的日志信息打包发送到管道中,然后再看起一个协程来发送数据,这样实现了读取与发送的一步操作,可以有效降低程序的运行时间,而上面出现的MessageSiz也就是我们设置的管道大小

func run(config *Config) (err error) {for {line, ok := <-tailFile.TailObj.Linesif !ok {logrus.Error("read from tail failed,err:", err)time.Sleep(2 * time.Second)continue}msg := &sarama.ProducerMessage{}msg.Topic = config.Kafakaddress.Topicmsg.Value = sarama.StringEncoder(line.Text)Kafka.MsgChan <- msg}
}

发送消息到KafKa

func SendMsg() {for {select {case msg := <-MsgChan:pid, offset, err := client.SendMessage(msg)if err != nil {logrus.Error("send msg to kafka failed,err:%v", err)return}logrus.Info("send msg to kafka success,pid:%d,offset:%d", pid, offset)}}
}

完整代码

  • main.go
package mainimport ("github.com/Shopify/sarama""github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus""github.com/go-ini/ini""log-agent/Kafka""log-agent/tailFile""time"
)type Config struct {Kafakaddress Kafkaddress `ini:"kafka"`LogFilePath  LogFilePath `ini:"collect"`
}type Kafkaddress struct {Addr        []string `ini:"address"`Topic       string   `ini:"topic"`MessageSize int64    `ini:"chan_size"`
}type LogFilePath struct {Path string `ini:"logfile_path"`
}func run(config *Config) (err error) {for {line, ok := <-tailFile.TailObj.Linesif !ok {logrus.Error("read from tail failed,err:", err)time.Sleep(2 * time.Second)continue}msg := &sarama.ProducerMessage{}msg.Topic = config.Kafakaddress.Topicmsg.Value = sarama.StringEncoder(line.Text)Kafka.MsgChan <- msg}
}func main() {//读取配置文件,获取配置信息filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini"ConfigObj := new(Config)err := ini.MapTo(ConfigObj, filename)if err != nil {logrus.Error("%s Load failed,err:", filename, err)}//初始化Kafkaerr = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize)if err != nil {logrus.Error("InitKafka failed, err:%v", err)return}logrus.Infof("InitKafka success")//初始化tailerr = tailFile.InitTail(ConfigObj.LogFilePath.Path)if err != nil {logrus.Error("InitTail failed, err:%v", err)return}logrus.Infof("InitTail success")//利用sarama报发送消息到Kafka中err = run(ConfigObj)
}
  • Kafka.go
package Kafkaimport ("github.com/Shopify/sarama""github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus"
)var (client  sarama.SyncProducerMsgChan chan *sarama.ProducerMessage
)func InitKafka(address []string, Chan_size int64) (err error) {//初始化MsgChanMsgChan = make(chan *sarama.ProducerMessage, Chan_size)//初始化configconfig := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAllconfig.Producer.Partitioner = sarama.NewRandomPartitionerconfig.Producer.Return.Successes = true//连接Kafkaclient, err = sarama.NewSyncProducer(address, config)if err != nil {logrus.Error("kafka connect error,err:%v", err)return}go SendMsg()return
}func SendMsg() {for {select {case msg := <-MsgChan:pid, offset, err := client.SendMessage(msg)if err != nil {logrus.Error("send msg to kafka failed,err:%v", err)return}logrus.Info("send msg to kafka success,pid:%d,offset:%d", pid, offset)}}
}
  • tailFile.go
package tailFileimport ("github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus""github.com/hpcloud/tail"
)var TailObj *tail.Tailfunc InitTail(filename string) (err error) {config := tail.Config{Follow:    true,ReOpen:    true,MustExist: true,Poll:      true,Location:  &tail.SeekInfo{Offset: 0, Whence: 2},}TailObj, err = tail.TailFile(filename, config)if err != nil {logrus.Error("tail create tailObj for path:%s,err:%v", filename, err)return}return
}

运行结果

在运行前打开ZooKeeper与Kafka,然后对日志文件进行操作,会出现:
在这里插入图片描述
出现

2024/04/22 20:26:34 Seeked G:\goproject\-goroutine-\log-agent\log\log1 - &{Offset:0 Whence:2}
INFO[0013] send msg to kafka success,pid:%d,offset:%d0 3 
INFO[0013] send msg to kafka success,pid:%d,offset:%d0 4

就代表运行成功了。

结语

今天的有关内容就到此为止啦,有问题的话欢迎在评论区评论,大家可以集思广益,如果你觉得博主的内容对你有帮助,欢迎三连一下和订阅专栏
如果博主文章里面有什么错误页欢迎斧正(毕竟博主页只是个小蒟蒻鸡),下篇文章我们要进入etcd的有关学习了,好了,大家下篇文章见!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/1976.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

燃冬之yum、vim和你

了解了很多指令和权限&#xff0c;搞点真枪实弹来瞅瞅 学Linux不是天天就在那掰扯指令玩&#xff0c;也不是就研究那个权限 准备好迎接Linux相关工具的使用了么码农桑~ yum 软件包 什么是软件包呢&#xff1f; 首先来举个生活中常见点的例子&#xff1a;比如我的手机是华为…

Tensor张量的属性dim、type、size、shape、numel的使用方法介绍

本文重点 Tensor张量有一些常用的属性&#xff0c;我们可以通过这些基本的属性判断张量的类型&#xff0c;维度&#xff0c;以及元素个数&#xff0c;本节课程我们将对tensor属性进行简单的介绍。 代码 import torch import numpy as np atorch.Tensor(2,3,4) print(a) prin…

【论文笔记 | 异步联邦】 FedBuff

1. 论文信息 Federated Learning with Buffered Asynchronous Aggregation&#xff0c;International Conference on Artificial Intelligence and Statistics&#xff0c;2022&#xff0c;ccfc 2. introduction 2.1.1. 背景&#xff1a; 同步 FL &#xff0c;随训练过程中…

AI视频分析技术的常用开源模型及TSINGSEE青犀AI视频识别分析能力介绍

AI视频分析技术是指利用人工智能技术来对视频数据进行分析和处理的技术。开源模型是指可以免费获取和使用的代码模型&#xff0c;可以帮助开发人员快速构建和部署AI视频分析应用程序。 以下是一些业内常用的用于AI视频分析技术的开源模型&#xff1a; OpenCV&#xff1a;Open…

给字符串添加加粗标签(AC自动机+Python)

可以暴力解决&#xff0c;但是为了锻炼一下ac自动机的编程&#xff0c;我们使用ac自动机。 ac自动机主要维护两个列表&#xff0c;一个列表ch&#xff0c;ch[f][idx]表示从父节点f向idx这个方向走&#xff0c;走到的节点。另一个列表nex&#xff0c;nex[i]表示节点i回跳边的节…

01 【哈工大_操作系统】x86_64 常用寄存器大全

在学习CPU底层技术的时候&#xff0c;难免会接触到各式各样的寄存器。尤其是在使用汇编语言编写操作系统时&#xff0c;寄存器更是必不可少的。因此&#xff0c;这篇文章将来详细聊聊 x86_64 架构中的所有寄存器&#xff0c;按照从 常用->不常用 的顺序来进行介绍。 首先&a…

Spark-机器学习(4)回归学习之逻辑回归

在之前的文章中&#xff0c;我们来学习我们回归中的线性回归&#xff0c;了解了它的算法&#xff0c;知道了它的用法&#xff0c;并带来了简单案例。想了解的朋友可以查看这篇文章。同时&#xff0c;希望我的文章能帮助到你&#xff0c;如果觉得我的文章写的不错&#xff0c;请…

Syncovery for Mac v10.14.3激活版:文件备份和同步工具

Syncovery for Mac是一款高效且灵活的文件备份与同步工具&#xff0c;专为Mac用户设计&#xff0c;旨在确保数据的安全性和完整性。该软件支持多种备份和同步方式&#xff0c;包括本地备份、网络备份以及云备份&#xff0c;用户可以根据实际需求选择最合适的方案。 Syncovery f…

信息系统项目管理师0062:需求分析(5信息系统工程—5.1软件工程—5.1.2需求分析)

点击查看专栏目录 文章目录 5.1.2需求分析1.需求的层次2.需求过程3.UML4.面向对象分析记忆要点总结5.1.2需求分析 软件需求是指用户对新系统在功能、行为、性能、设计约束等方面的期望。根据IEEE的软件工程标准词汇表,软件需求是指用户解决问题或达到目标所需的条件或能力,是…

【深度学习】烟雾和火焰数据集,野外数据集,超大量数据集,目标检测,YOLOv5

标注了2w张数据集&#xff0c;是目标检测yolo格式的&#xff0c;有火焰、烟雾两个目标。 训练方法看这里&#xff1a; https://qq742971636.blog.csdn.net/article/details/138097481 打包 依据不一样的需求&#xff0c; 详情请查看 https://docs.qq.com/sheet/DUEdqZ2l…

多元函数泰勒公式(含黑塞矩阵)

一元函数的泰勒公式&#xff1a; 接下来&#xff0c;由一元函数有关知识&#xff0c;我们有: 注意这里的dxn中&#xff0c;应把dx看作一个整体&#xff0c;即一个微小变量的n次方 我们接下来推导微分算子&#xff1a; 接下来&#xff0c;把一元泰勒公式转为微分形式: 对于二元…

React.js 3D开发快速入门

如果你对 3D 图形的可能性着迷&#xff0c;但发现从头开始创建 3D 模型的想法是不可能的 - 不用担心&#xff01; Three.js 是一个强大的 JavaScript 库&#xff0c;它可以帮助我们轻松地将现有的 3D 模型集成到 React 应用程序中。因此&#xff0c;在本文中&#xff0c;我将深…

ExcelVBA把当前工作表导出为PDF文档

我们先问问Kimi Excel导出为PDF的方法有多种&#xff0c;以下是一些常见的方法&#xff1a; 1 使用Excel软件的内置功能&#xff1a; 打开Excel文件&#xff0c;点击“文件”菜单。选择“另存为”&#xff0c;在“保存类型”中选择“PDF”。设置保存路径和文件名&#xff0c;点…

【机器学习】重塑汽车设计与制造:实例与代码探索

机器学习重塑汽车设计与制造 一、机器学习在汽车设计中的应用二、机器学习在智能制造与生产中的应用 在数字化浪潮的推动下&#xff0c;机器学习技术正逐步成为汽车行业的创新引擎。从概念设计到智能制造&#xff0c;机器学习正以其独特的优势助力汽车产业的革新与发展。本文将…

数据挖掘实验(Apriori,fpgrowth)

Apriori&#xff1a;这里做了个小优化&#xff0c;比如abcde和adcef自连接出的新项集abcdef&#xff0c;可以用abcde的位置和f的位置取交集&#xff0c;这样第n项集的计算可以用n-1项集的信息和数字本身的位置信息计算出来&#xff0c;只需要保存第n-1项集的位置信息就可以提速…

day06 51单片机-点阵led

1 点阵LED 1.1 需求描述 本案例介绍如何使用点阵LED显示一排由左上到右下的斜线。 1.2 硬件设计 1.2.1 硬件原理图 点阵内部的原理图: 点阵LED的原理也非常简单,就是LED点灯。例如,我们想要让13列(阳极端)9行(阴极端)的LED点亮,需要13为高电平,9为低电平。注意对于…

苍穹外卖day8(2)用户下单、微信支付

文章目录 前言一、用户下单1. 业务流程2. 接口设计3. 数据库设计3.1 订单表orders3.2 订单明细表 order_detail 4. 代码实现 二、订单支付 前言 用户下单 因为订单信息中包含了其他业务中的数据&#xff0c;在逻辑处理中涉及了多个其他业务&#xff0c;比如要判断地址簿、购物…

基于SSM+Vue的护工预约服务小程序和后台管理系统

1、系统演示视频&#xff08;演示视频&#xff09; 2、需要请联系

虚拟化+Docker基本管理

一、虚拟化简介 1、云端 华为云、谷歌云、腾讯云、阿里云、亚马逊、百度云、移动云、天翼云、西部数码云等 1.国内云 华为云、阿里云、腾讯云、天翼云(私有云) 2.国外云 谷歌云、亚马逊 2、云计算的服务模式是分层的 IaaS&#xff1a;Infrastructure&#xff08;基础设…

计算机网络【CN】Ch3 数据链路层

目录 数据链路层的功能 【※】VLAN 三种划分VLAN的方法&#xff1a; 【※】MAC帧格式 【※】三种可靠传输机制 ​编辑 【※】介质访问控制 信道划分介质访问控制 随机介质访问控制 CSMA CSMA/CD【有线】 CSMA/CA【无线】 信道利用率技巧 循环冗余校验CRC 以太网[802.3] 以太网…