写在前面
ELK三剑客(ElasticSearch,Logstash,Kibana)基本上可以满足日志采集、信息处理、统计分析、可视化报表等一些日志分析的工作,但是对我们来说……太重了,并且技术栈不是一路的。我们的场景是需要采集各个业务部门服务器上面的各个业务系统,所以尽量不要影响到服务器的性能,以侵入性最低的方式进行采集,不做其他多余操作。因而,在前端日志采集这块,对比其他Logstash、Flume等采集工具之后,决定采用轻量的Filebeat作为日志采集工具,Filebeat采用go开发,运行不需要额外部署环境,相比Flume依赖jdk轻量了不少,且占用内存低。
采集链路如下所示:Filebeat日志采集、处理转换之后,推送到kafka,采用Clickhouse的kafka引擎进行消费存储。因而,我暂且称之为KFC????组合。
Filebeat部署
采集目标环境:
系统:Window Server 2008 R2 Enterprise
日志类别:IIS日志、业务系统日志
日志路径:D:/IIS/www.A.com/logs/*.txt 、
D:/IIS/www.B.com/logs/*.txt、
D:/IIS/www.C.com/logs/*.txt
Filebeat:7.12.1
因为采集的是windows操作系统,建议下载Filebeat压缩包,并以windows服务的方式运行,使用安装包msi安装不方便调试,需要频繁的卸载、安装操作,下载之后解压出来进行对配置文件 filebeat.yml 进行配置。
业务系统日志格式示例:
2021-04-06 11:21:17,940 [39680] DEBUG Zc - time:0ms update XXX set ModifyTime=GETDATE(), [State] = 190, [FuZeRen] = '张三' where [ID] = '90aa9a69-7a33-420e-808c-624693c65aef' and [CompanyID] = '9e52867e-2035-4148-b09e-55a90b3020d5'
2021-04-06 11:21:21,612 [22128] DEBUG Service ModelBase - time:0ms (/api/XXX/XXX/XXX?InfoID=6d43b831-6169-46d2-9518-f7c9ed6fe39c&ValidateStatus=1)更新材料状态
2021-04-06 11:21:21,612 [22128] DEBUG Zc - time:0ms select ID from XXX where InfoRelationID='6d43b831-6169-46d2-9518-f7c9ed6fe39c'
2021-04-06 11:21:21,612 [22128] DEBUG Zc - time:0ms insert into XXXX(ValidateDate ,[ID],[ValidateState],[ValidateUser],[ValidateUserID],[ValidateUnit],[ValidateUnitID],[ValidateUnitType],[InfoRelationID]) values( GETDATE(),'c77cf4ab-71b5-46c7-b91b-2829d73aa700',1,'XXXX','0387f889-e1d4-48aa-b275-2241da1d2c9e','XXXXX有限公司','2f2a94c8-c23c-4e8a-98b3-c32a9b0487f7',0,'6d43b831-6119-46d2-9518-f7c9ed6fe39c')
2021-04-06 03:25:22,237 [46840] ERROR ASP.global_asax - time:0ms 客户端信息:Ip:116.238.55.21, 173.131.245.61 浏览器:Chrome 版本:68 操作系统:WinNT服务端错误信息:
页面:http://www.A.com:803/dbapp_53475dbapp_e524534.php
错误源:System.Web.Mvc
堆栈跟踪:at System.Web.Mvc.DefaultControllerFactory.GetControllerInstance(RequestContext requestContext, Type controllerType)
at System.Web.Mvc.DefaultControllerFactory.CreateController(RequestContext requestContext, String controllerName)
at System.Web.Mvc.MvcHandler.ProcessRequestInit(HttpContextBase httpContext, IController& controller, IControllerFactory& factory)
at System.Web.Mvc.MvcHandler.BeginProcessRequest(HttpContextBase httpContext, AsyncCallback callback, Object state)
at System.Web.HttpApplication.CallHandlerExecutionStep.System.Web.HttpApplication.IExecutionStep.Execute()
at System.Web.HttpApplication.ExecuteStep(IExecutionStep step, Boolean& completedSynchronously)
FileBeat配置:
max_procs: 2
queue:mem:events: 2048flush.min_events: 2048
# ============================== Filebeat inputs ===============================filebeat.inputs:# 管理系统
- type: logenabled: trueencoding: GB2312paths:- D:/IIS/www.A.com/logs/*.txtmultiline.pattern: '^\d{4}-\d{1,2}-\d{1,2}'multiline.negate: truemultiline.match: afterfields:topic: 'dlbZcZGBSyslogs'fields_under_root: true# 单位系统
- type: logenabled: trueencoding: GB2312paths:- D:/IIS/www.B.com/logs/*.txt### Multiline optionsmultiline.pattern: '^\d{4}-\d{1,2}-\d{1,2}'multiline.negate: truemultiline.match: afterfields:topic: 'dlbZcDWSyslogs'fields_under_root: true# 个人系统
- type: logenabled: trueencoding: GB2312paths:- D:/IIS/www.C.com/logs/*.txt### Multiline optionsmultiline.pattern: '^\d{4}-\d{1,2}-\d{1,2}'multiline.negate: truemultiline.match: afterfields:topic: 'dlbZcMySyslogs'fields_under_root: true# 调试输出
#output.console:
# pretty: true
#output.file:
# path: "D:/bigData"
# filename: filebeat.log# -------------------------------- Kafka Output --------------------------------
output.kafka:# Boolean flag to enable or disable the output module.enabled: truehosts: ["192.168.1.10:9092"]# The Kafka topic used for produced events. The setting can be a format string# using any event field. To set the topic from document type use `%{[type]}`.topic: '%{[topic]}'# Authentication details. Password is required if username is set.#username: ''#password: ''# The number of concurrent load-balanced Kafka output workers.worker: 2max_message_bytes: 10000000# ================================= Processors =================================
processors:- add_host_metadata:when.not.contains.tags: forwarded- add_cloud_metadata: ~- add_docker_metadata: ~- add_kubernetes_metadata: ~- script:lang: javascriptid: my_filtertag: enablesource: >function process(event) {var str = event.Get("message");var sp = str.split(" "); var log_datetime = sp.slice(0,2).join(" ");var regEx = /^\d{4}-\d{2}-\d{2}$/;var prefix_date = log_datetime.substring(0, 10);if(prefix_date.match(regEx) != null){event.Put("server","221");log_datetime = log_datetime.replace(",",".");log_datetime = log_datetime.replace("'","");regEx = /^\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3}$/;if(log_datetime.match(regEx) != null){event.Put("log_datetime",log_datetime);event.Put("log_index",sp.slice(2,3).join(" ").replace("[","").replace("]",""));event.Put("log_level",sp.slice(3,4).join(" "));if(str.match(/(?<=time:)\S*(?=ms)/)!=null){var spTime= str.split("time:");var spPre = spTime[0].split(" ");var spNext = spTime[1].split(" "); event.Put("log_class",spPre.slice(4).join(" ")); var log_execTime= spNext.slice(0,1).join(" ").replace("ms","");regEx = /^(\-|\+)?\d+(\.\d+)?$/;if(regEx.test(log_execTime)){event.Put("log_execTime",log_execTime);}else{event.Put("log_execTime","-1");}event.Put("log_message",spNext.slice(1).join(" "));}else{event.Put("log_class",sp.slice(4,5).join(" "));event.Put("log_execTime","-1");event.Put("log_message",sp.slice(6).join(" "));}return;}}event.Cancel();}- drop_fields:fields: ["@timestamp", "message", "host", "ecs", "agent", "@metadata", "log", "input"]
以上的配置说明:
max_procs:设置可以同时执行的最大CPU数;
queue :内部队列信息;
Filebeat inputs:日志数据源采集的入口;
其他字段如下说明:
#日志类型
- type: log
#开启enabled: true
#编码格式,有中文必须设置encoding: GB2312
#路径paths:- D:/IIS/www.A.com/logs/*.txt
#多行匹配前缀multiline.pattern: '^\d{4}-\d{1,2}-\d{1,2}'
#开启多行匹配multiline.negate: true
#开启多行之后,匹配是合并到上一条信息multiline.match: after
#增加一个字段,用于kafka的topic的识别fields:topic: 'dlbZcZGBSyslogs'
# 字段增加在输出json的根目录下fields_under_root: true//https://www.cnblogs.com/EminemJK/p/15165961.html
Kafka Output:kafka的配置信息,主要是 topic: '%{[topic]}' 的设置,因为这里采集多个数据源,对于不同的topic,在数据源输入的时候,已经设置好字段如 topic: 'dlbZcZGBSyslogs' ,所以此处使用占位符灵活设置;
Processors:配置处理器,即对采集的日志信息进行处理,处理是按行处理,当字符串处理即可,可以使用js语法进行对字符串进行处理;Filebeat的处理器可以多种多样,具体可以看文档。
另外,在调试的时候,可以采用文件输出或Console输出来观察处理后输出的数据格式,再进行微调:
output.file:path: "D:/bigData"filename: filebeat.log
IIS的日志也差不多,只是微调处理逻辑就可以了,一通百通。
其他配置可以参考官网文档:
https://www.elastic.co/guide/en/beats/filebeat/current/index.html
Kafka配置
Kafka没有特别的处理,在这里只是进行消息的接收,新建好主题就可以。
//个人系统
bin/kafka-topics.sh --create --zookeeper 192.168.1.10:2181 --replication-factor 1 --partitions 3 --topic dlbZcMySyslogs
//单位系统
bin/kafka-topics.sh --create --zookeeper 192.168.1.10:2181 --replication-factor 1 --partitions 3 --topic dlbZcDWSyslogs
//管理系统
bin/kafka-topics.sh --create --zookeeper 192.168.1.10:2181 --replication-factor 1 --partitions 3 --topic dlbZcZGBSyslogs
partitions 分区数的大小,取决设置了多少个消费者,这里我们有三台服务器做了Clickhouse的集群作为消费者,所以分区数设置为3,一般情况,消费者总数不应该大于分区数,每个分区只能分配一个消费者。
Clickhouse配置
Clickhouse三个分片的集群,如果你是单机的,只需要把语法相应的修改一下即可。
在每台服务器上创建kafka引擎表:
CREATE TABLE kafka_dlb_ZC_My_syslogs (log_datetime DateTime64,log_index String,log_level String,log_class String,log_message String,log_execTime Float32,server String) ENGINE = KafkaSETTINGS kafka_broker_list = '192.168.1.10:9092',kafka_topic_list = 'dlbZcMySyslogs',kafka_group_name = 'dlbZcMySyslogs_sys',kafka_format = 'JSONEachRow',kafka_num_consumers = 1;
创建实体表:
CREATE TABLE dlb_ZC_My_syslogs on cluster cluster_3s_1r(log_datetime DateTime64,log_index String,log_level String,log_class String,log_message String,log_execTime Float32,server String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/dlb_ZC_My_syslogs', '{replica}')ORDER BY toDate(log_datetime)PARTITION BY toYYYYMM(log_datetime);//https://www.cnblogs.com/EminemJK/p/15165961.html
实体表是使用集群来创建的,如果是单机请删除
on cluster cluster_3s_1r ,并修改表引擎即可。如果已经开启了zookeeper且开启复制表,在任一一台服务器运行运行一次即可。
在每台服务器上创建物化视图:
CREATE MATERIALIZED VIEW viem_dlb_ZC_My_syslogs_consumer TO dlb_ZC_My_syslogsAS SELECT *FROM kafka_dlb_ZC_My_syslogs;
创建分布式视图(可选,单机请忽略):
CREATE TABLE Dis_dlb_ZC_My_syslogs ON CLUSTER cluster_3s_1rAS LogsDataBase.dlb_ZC_My_syslogsENGINE = Distributed(cluster_3s_1r, 'LogsDataBase', 'dlb_ZC_My_syslogs',rand());
分布式表将聚合集群中每个分片的表信息,进行执行一次。
运行
顺便提供一个快速运行Filebeat和卸载的bat脚本:
运行服务:
//windows server2008以上版本的服务器
cd %~dp0
.\install-service-filebeat.ps1
pause//windows server 2008 和以下的服务器cd %~dp0PowerShell.exe -ExecutionPolicy RemoteSigned -File .\install-service-filebeat.ps1pause
卸载服务:
//windows server2008以上版本的服务器
cd %~dp0
.\uninstall-service-filebeat.ps1
pause//windows server2008和以下版本的服务器
cd %~dp0
PowerShell.exe -ExecutionPolicy RemoteSigned -File .\uninstall-service-filebeat.ps1
pause
运行之后,在任务管理器中,将服务设置为运行即可。
查看分布式数据:
数据采集完毕,完美。
既然数据已经有了,数据可视化方面可以采用多种方式了,以Grafana为例:
最后
下班,周末愉快。