上节回顾
在上一篇文章中我们介绍了编写客户端的四个步骤,分别是:
- 读取配置文件,寻找日志路径
- 初始化服务
- 根据日志路径l来收集日志
- 将收集到的日志发送Kafka中
关于上述的内容博主画了一个思维导图(有点丑,大家勉强看看,以前没画过):
对了,为了画这个思维导图昨天博主找了好久思维导图的软件,最后发现了Vscode上面有一个非常不错的插件:drawio
,样子大概是这样的:
大家如果没有合适的思维导图绘制根据,可以试试这个。好了,话不多说,开始今天的内容。
读取配置信息,获取日志信息
前言
这里读取日志信息我们选择的是go-ini
这一第三方包,具体的使用方法在我前面的博文这种有所介绍,大家不了解的话可以参考:
go语言并发实战——日志收集系统(五) 基于go-ini包读取日志收集服务的配置文件
需求分析
这里配置文件中我们主要要知道两个消息,一个Kafka的配置信息,一个是日志文件的路径,配置文件应该是这样的:
[kafka]
address=127.0.0.1:9092
topic=web.log
chan_size=100000[collect]
logfile_path:G:\goproject\-goroutine-\log-agent\log\log1
而为了方便我们利用反射来读取配置文件,我们来创建几个结构体来存储我们读到的配置信息:
- Kafka结构体
type Kafkaddress struct {Addr []string `ini:"address"`Topic string `ini:"topic"`MessageSize int64 `ini:"chan_size"`
}
- tail结构体
type LogFilePath struct {Path string `ini:"logfile_path"`
}
- 总的结构体
type Config struct {Kafakaddress Kafkaddress `ini:"kafka"`LogFilePath LogFilePath `ini:"collect"`
}
然后读取配置信息放入结构体中:
//读取配置文件,获取配置信息filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini"ConfigObj := new(Config)err := ini.MapTo(ConfigObj, filename)if err != nil {logrus.Error("%s Load failed,err:", filename, err)}
这样我们就获得我们所需要的配置消息了
初始化服务
前言
这里我们初始服务主要是初始化Kafka以及tail包,利用它们读取日志信息并将其发送Kafka中,具体介绍可以参考前面的几篇文章:
go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费
go语言并发实战——日志收集系统(四) 利用tail包实现对日志文件的实时监控
Kafka的初始化
//初始化Kafkaerr = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize)if err != nil {logrus.Error("InitKafka failed, err:%v", err)return}logrus.Infof("InitKafka success")
tail的初始化
func InitTail(filename string) (err error) {config := tail.Config{Follow: true,ReOpen: true,MustExist: true,Poll: true,Location: &tail.SeekInfo{Offset: 0, Whence: 2},}TailObj, err = tail.TailFile(filename, config)if err != nil {logrus.Error("tail create tailObj for path:%s,err:%v", filename, err)return}return
}
根据路径来读取日志
需求分析
一般我们常见的想法会是我们先将日志消息读取出来然后发送给Kafka但是这样的串行操作无疑会大大增加程序的运行时间,所以这里我们选择将读到的日志信息打包发送到管道中,然后再看起一个协程来发送数据,这样实现了读取与发送的一步操作,可以有效降低程序的运行时间,而上面出现的MessageSiz
也就是我们设置的管道大小
func run(config *Config) (err error) {for {line, ok := <-tailFile.TailObj.Linesif !ok {logrus.Error("read from tail failed,err:", err)time.Sleep(2 * time.Second)continue}msg := &sarama.ProducerMessage{}msg.Topic = config.Kafakaddress.Topicmsg.Value = sarama.StringEncoder(line.Text)Kafka.MsgChan <- msg}
}
发送消息到KafKa
func SendMsg() {for {select {case msg := <-MsgChan:pid, offset, err := client.SendMessage(msg)if err != nil {logrus.Error("send msg to kafka failed,err:%v", err)return}logrus.Info("send msg to kafka success,pid:%d,offset:%d", pid, offset)}}
}
完整代码
- main.go
package mainimport ("github.com/Shopify/sarama""github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus""github.com/go-ini/ini""log-agent/Kafka""log-agent/tailFile""time"
)type Config struct {Kafakaddress Kafkaddress `ini:"kafka"`LogFilePath LogFilePath `ini:"collect"`
}type Kafkaddress struct {Addr []string `ini:"address"`Topic string `ini:"topic"`MessageSize int64 `ini:"chan_size"`
}type LogFilePath struct {Path string `ini:"logfile_path"`
}func run(config *Config) (err error) {for {line, ok := <-tailFile.TailObj.Linesif !ok {logrus.Error("read from tail failed,err:", err)time.Sleep(2 * time.Second)continue}msg := &sarama.ProducerMessage{}msg.Topic = config.Kafakaddress.Topicmsg.Value = sarama.StringEncoder(line.Text)Kafka.MsgChan <- msg}
}func main() {//读取配置文件,获取配置信息filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini"ConfigObj := new(Config)err := ini.MapTo(ConfigObj, filename)if err != nil {logrus.Error("%s Load failed,err:", filename, err)}//初始化Kafkaerr = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize)if err != nil {logrus.Error("InitKafka failed, err:%v", err)return}logrus.Infof("InitKafka success")//初始化tailerr = tailFile.InitTail(ConfigObj.LogFilePath.Path)if err != nil {logrus.Error("InitTail failed, err:%v", err)return}logrus.Infof("InitTail success")//利用sarama报发送消息到Kafka中err = run(ConfigObj)
}
- Kafka.go
package Kafkaimport ("github.com/Shopify/sarama""github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus"
)var (client sarama.SyncProducerMsgChan chan *sarama.ProducerMessage
)func InitKafka(address []string, Chan_size int64) (err error) {//初始化MsgChanMsgChan = make(chan *sarama.ProducerMessage, Chan_size)//初始化configconfig := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAllconfig.Producer.Partitioner = sarama.NewRandomPartitionerconfig.Producer.Return.Successes = true//连接Kafkaclient, err = sarama.NewSyncProducer(address, config)if err != nil {logrus.Error("kafka connect error,err:%v", err)return}go SendMsg()return
}func SendMsg() {for {select {case msg := <-MsgChan:pid, offset, err := client.SendMessage(msg)if err != nil {logrus.Error("send msg to kafka failed,err:%v", err)return}logrus.Info("send msg to kafka success,pid:%d,offset:%d", pid, offset)}}
}
- tailFile.go
package tailFileimport ("github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus""github.com/hpcloud/tail"
)var TailObj *tail.Tailfunc InitTail(filename string) (err error) {config := tail.Config{Follow: true,ReOpen: true,MustExist: true,Poll: true,Location: &tail.SeekInfo{Offset: 0, Whence: 2},}TailObj, err = tail.TailFile(filename, config)if err != nil {logrus.Error("tail create tailObj for path:%s,err:%v", filename, err)return}return
}
运行结果
在运行前打开ZooKeeper与Kafka,然后对日志文件进行操作,会出现:
出现
2024/04/22 20:26:34 Seeked G:\goproject\-goroutine-\log-agent\log\log1 - &{Offset:0 Whence:2}
INFO[0013] send msg to kafka success,pid:%d,offset:%d0 3
INFO[0013] send msg to kafka success,pid:%d,offset:%d0 4
就代表运行成功了。
结语
今天的有关内容就到此为止啦,有问题的话欢迎在评论区评论,大家可以集思广益,如果你觉得博主的内容对你有帮助,欢迎三连一下和订阅专栏
如果博主文章里面有什么错误页欢迎斧正(毕竟博主页只是个小蒟蒻鸡),下篇文章我们要进入etcd的有关学习了,好了,大家下篇文章见!