如何使用Logstash搜集日志传输到es集群并使用kibana检测

引言:上一期我们进行了对Elasticsearch和kibana的部署,今天我们来解决如何使用Logstash搜集日志传输到es集群并使用kibana检测

目录

Logstash部署

1.安装配置Logstash

(1)安装

(2)测试文件

(3)配置

grok

1、手动输入日志数据

数据链路

2、手动输入数据,并存储到 es

数据链路

3、自定义日志1

数据链路

5、nginx access 日志

数据链路

6、nginx error日志

数据链路

7、filebate 传输给 logstash

filebeat 日志模板

Logstash部署

  • 服务器

安装软件主机名IP地址系统版本配置
LogstashElk10.12.153.71centos7.5.18042核4G
  • 软件版本:logstash-7.13.2.tar.gz

1.安装配置Logstash

Logstash运行同样依赖jdk,本次为节省资源,故将Logstash安装在了10.12.153.71节点。

(1)安装
tar zxf /usr/local/package/logstash-7.13.2.tar.gz -C /usr/local/
(2)测试文件

标准输入=>标准输出

1、启动logstash

2、logstash启动后,直接进行数据输入

3、logstash处理后,直接进行返回

input {stdin {}
}
output {stdout {codec => rubydebug}
}

标准输入=>标准输出及es集群

1、启动logstash

2、启动后直接在终端输入数据

3、数据会由logstash处理后返回并存储到es集群中

input {stdin {}
}
output {stdout {codec => rubydebug}elasticsearch {hosts => ["10.12.153.71","10.12.153.72","10.12.153.133"]index => 'logstash-debug-%{+YYYY-MM-dd}'}
}

端口输入=>字段匹配=>标准输出及es集群

1、由tcp 的8888端口将日志发送到logstash

2、数据被grok进行正则匹配处理

3、处理后,数据将被打印到终端并存储到es

input {tcp {port => 8888}
}
filter {grok {match => {"message" => "%{DATA:key} %{NUMBER:value:int}"} }
}
output {stdout {codec => rubydebug}elasticsearch {hosts => ["10.12.153.71","10.12.153.72","10.12.153.133"]index => 'logstash-debug-%{+YYYY-MM-dd}'}
}
# yum install -y nc
# free -m |awk 'NF==2{print $1,$3}' |nc logstash_ip 8888

文件输入=>字段匹配及修改时间格式修改=>es集群

1、直接将本地的日志数据拉去到logstash当中

2、将日志进行处理后存储到es

input {file {type => "nginx-log"path => "/var/log/nginx/error.log"start_position => "beginning" # 此参数表示在第一次读取日志时从头读取# sincedb_path => "自定义位置"  # 此参数记录了读取日志的位置,默认在 data/plugins/inputs/file/.sincedb*}
}
filter {grok {match => { "message" => '%{DATESTAMP:date} [%{WORD:level}] %{DATA:msg} client: %{IPV4:cip},%{DATA}"%{DATA:url}"%{DATA}"%{IPV4:host}"'}    }    date {match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]    }    
}
​
output {if [type] == "nginx-log" {elasticsearch {hosts => ["10.12.153.71","10.12.153.72","10.12.153.133"]index => 'logstash-audit_log-%{+YYYY-MM-dd}'}}}filebeat => 字段匹配 => 标准输出及esinput {beats {port => 5000}
}
filter {grok {match => {"message" => "%{IPV4:cip}"}   }
}
output {elasticsearch {hosts => ["192.168.249.139:9200","192.168.249.149:9200","192.168.249.159:9200"]index => 'test-%{+YYYY-MM-dd}'}stdout { codec => rubydebug }
}
(3)配置

创建目录,我们将所有input、filter、output配置文件全部放到该目录中。

mkdir -p /usr/local/logstash-7.13.2/etc/conf.d
vim /usr/local/logstash-7.13.2/etc/conf.d/input.conf
input { 
kafka {type => "audit_log"codec => "json"topics => "nginx"decorate_events => truebootstrap_servers => "10.12.153.71","10.12.153.72","10.12.153.133"}
}
​
vim /usr/local/logstash-7.13.2/etc/conf.d/filter.conf
filter {json { # 如果日志原格式是json的,需要用json插件处理source => "message"target => "nginx" # 组名}
}
​
vim /usr/local/logstash-7.13.2/etc/conf.d/output.conf
output {if [type] == "audit_log" {elasticsearch {hosts => ["10.12.153.71","10.12.153.72","10.12.153.133"]index => 'logstash-audit_log-%{+YYYY-MM-dd}'}}}

(3)启动

cd /usr/local/logstash-7.13.2
nohup bin/logstash -f etc/conf.d/  --config.reload.automatic &

grok

1、手动输入日志数据

一般为debug 方式,检测 ELK 集群是否健康,这种方法在 logstash 启动后可以直接手动数据数据,并将格式化后的数据打印出来。

数据链路

1、启动logstash

2、logstash启动后,直接进行数据输入

3、logstash处理后,直接进行返回

input {stdin {}
}
output {stdout {codec => rubydebug}
}

2、手动输入数据,并存储到 es

数据链路

1、启动logstash

2、启动后直接在终端输入数据

3、数据会由logstash处理后返回并存储到es集群中

input {stdin {}
}
output {stdout {codec => rubydebug}elasticsearch {hosts => ["10.12.153.71","10.12.153.72","10.12.153.133"]index => 'logstash-debug-%{+YYYY-MM-dd}'}
}

3、自定义日志1

数据链路

1、由tcp 的8888端口将日志发送到logstash

2、数据被grok进行正则匹配处理

3、处理后,数据将被打印到终端并存储到es

input {tcp {port => 8888}
}
filter {grok {match => {"message" => "%{DATA:key} %{NUMBER:value:int}"}   }
}
output {stdout {codec => rubydebug}elasticsearch {hosts => [""10.12.153.71","10.12.153.72","10.12.153.133""]index => 'logstash-debug-%{+YYYY-MM-dd}'}
}
# yum install -y nc
# free -m |awk 'NF==2{print $1,$3}' |nc logstash_ip 8888
​
4、自定义日志2
数据链路
1、由tcp 的8888端口将日志发送到logstash2、数据被grok进行正则匹配处理3、处理后,数据将被打印到终端input {tcp {port => 8888}
}
filter {grok {match => {"message" => "%{WORD:username}\:%{WORD:passwd}\:%{INT:uid}\:%{INT:gid}\:%{DATA:describe}\:%{DATA:home}\:%{GREEDYDATA:shell}"}}
}
output {stdout {codec => rubydebug}
}
​
# cat /etc/passwd | nc logstash_ip 8888

5、nginx access 日志

数据链路

1、在filebeat配置文件中,指定kafka集群ip [output.kafka] 的指定topic当中

2、在logstash配置文件中,input区域内指定kafka接口,并指定集群ip和相应topic

3、logstash 配置filter 对数据进行清洗

4、将数据通过 output 存储到es指定index当中

5、kibana 添加es 索引,展示数据

input {kafka {type => "audit_log"codec => "json"topics => "haha"#decorate_events => true#enable_auto_commit => trueauto_offset_reset => "earliest"bootstrap_servers => ["192.168.52.129:9092,192.168.52.130:9092,192.168.52.131:9092"]}
}
​
filter {grok {match => { "message" => "%{COMBINEDAPACHELOG} %{QS:x_forwarded_for}"}    }    date {match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]    }    geoip {source => "lan_ip"    }
}
​
output {if [type] == "audit_log" {stdout {codec => rubydebug}elasticsearch {hosts => ["192.168.52.129","192.168.52.130","192.168.52.131"]index => 'tt-%{+YYYY-MM-dd}'}}}
​#filebeat 配置filebeat.prospectors:
- input_type: logpaths:-  /opt/logs/server/nginx.logjson.keys_under_root: truejson.add_error_key: truejson.message_key: log
​
output.kafka:   hosts: [""10.12.153.71","10.12.153.72","10.12.153.133""]topic: 'nginx'
​# nginx 配置log_format main        '{"user_ip":"$http_x_real_ip","lan_ip":"$remote_addr","log_time":"$time_iso8601","user_req":"$request","http_code":"$status","body_bytes_sents":"$body_bytes_sent","req_time":"$request_time","user_ua":"$http_user_agent"}';access_log  /var/log/nginx/access.log  main;
​

6、nginx error日志

数据链路

1、直接将本地的日志数据拉去到logstash当中

2、将日志进行处理后存储到es

input {file {type => "nginx-log"path => "/var/log/nginx/error.log"start_position => "beginning"}
}
filter {grok {match => { "message" => '%{DATESTAMP:date} [%{WORD:level}] %{DATA:msg} client: %{IPV4:cip},%{DATA}"%{DATA:url}"%{DATA}"%{IPV4:host}"'}    }    date {match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]    }    
}
​
output {if [type] == "nginx-log" {elasticsearch {hosts => [""10.12.153.71:9200","10.12.153.72:9200","10.12.153.133:9200""]index => 'logstash-audit_log-%{+YYYY-MM-dd}'}}}

7、filebate 传输给 logstash

input {beats {port => 5000}
}
filter {grok {match => {"message" => "%{IPV4:cip}"}   }
}
output {elasticsearch {hosts => ["192.168.249.139:9200","192.168.249.149:9200","192.168.249.159:9200"]index => 'test-%{+YYYY-MM-dd}'}stdout { codec => rubydebug }
}
​
filebeat.inputs:
- type: logenabled: truepaths:- /var/log/nginx/access.log
output.logstash:hosts: ["192.168.52.134:5000"]

filebeat 日志模板

filebeat.inputs:
- type: logenabled: truepaths:- /var/log/nginx/access.log
output.kafka:hosts: ["192.168.52.129:9092","192.168.52.130:9092","192.168.52.131:9092"]topic: hahapartition.round_robin:reachable_only: truerequired_acks: 1

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/706816.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

集群分发脚本xsync

集群分发脚本xsync 一、简介二、环境准备三、添加到机器的 hosts 文件四、ping 命令测试五、SSH 配置5.1.本地先生成公钥和私钥5.2.将公钥拷贝到其他机器 六、xsync 脚本编写6.1.安装 rsync6.2.新建 xsync.sh6.3.xsync.sh脚本6.4.赋予脚本执行权限6.5.测试 endl 一、简介 配置…

完全分布式运行模式

完全分布式运行模式 分析:之前已经配置完成 ​ 1)准备3台客户机(关闭防火墙、静态ip、主机名称) ​ 2)安装JDK ​ 3)配置环境变量 ​ 4)安装Hadoop ​ 5)配置环境变量 ​ 6&am…

163邮箱SMTP端口号及服务器地址详细设置?

163邮箱SMTP端口号是什么?163邮件SMTP设置教程? 除了基本的邮箱账号和密码外,还需要了解SMTP服务器地址和端口号,以及相应的设置。这些设置对于确保邮件能够顺利发送至关重要。下面,蜂邮EDM将详细介绍163邮箱SMTP端口…

Ubuntu常用状态命令

目录 一、温度 1,查看CPU温度 2,查看硬盘温度 二、CPU状态 1,显示CPU的详细信息,包括型号、频率、缓存等 2,显示CPU架构、CPU核心数、线程数、频率等信息 三、登录状态 1,查看成功登录的用户 2&am…

2024年腾讯云4核8G12M配置的轻量服务器同时支持多大访问量?

腾讯云4核8G服务器支持多少人在线访问?支持25人同时访问。实际上程序效率不同支持人数在线人数不同,公网带宽也是影响4核8G服务器并发数的一大因素,假设公网带宽太小,流量直接卡在入口,4核8G配置的CPU内存也会造成计算…

第12届生物发酵产品与技术装备展火热登场-通用环境控制技术

参展企业介绍 合肥通用环境控制技术有限责任公司隶属于中国机械工业集团有限公司(世界500强排名279),是中央直接管理的国有重要骨干上市央企(国机通用 600444),是国家级高新技术企业、国家火炬计划重点高新…

关于大语言模型LLM相关的数据集、预训练模型、提示词、微调的文心一言问答

文章目录 关于大语言模型LLM相关的数据集、预训练模型、提示词、微调的文心一言问答先总结一下Q:LLM模型预训练前与提示词关系,LLM模型预训练后与提示词关系Q:预训练用的数据集与提示词有什么异同Q:为什么我看到的数据集结构和提示…

区块链智能合约开发

一.区块链的回顾 1.区块链 区块链实质上是一个去中心化、分布式的可进行交易的数据库或账本 特征: 去中心化:简单来说,在网络上一个或多个服务器瘫痪的情况下,应用或服务仍然能够持续地运行,这就是去中心化。服务和应用部署在…

HarmonyOS | 状态管理(五) | @Observed装饰器和@ObjectLink装饰器

系列文章目录 1.HarmonyOS | 状态管理(一) | State装饰器 2.HarmonyOS | 状态管理(二) | Prop装饰器 3.HarmonyOS | 状态管理(三) | Link装饰器 4.HarmonyOS | 状态管理(四) | Provide和Consume装饰器 文章目录 系列文章目录前言一、ObjectLink和Observed类装饰器用于哪些场景…

Mendix 10.7 发布- Go Mac It!

在我们上个月发布了硕果累累的 Mendix 10.6 MTS 之后,您是否还没有抚平激动的情绪?好吧,不管您是否已经准备好,本月将带来另一个您想知道的大亮点——Mac版Studio Pro!但这还不是全部。本月,我们还将推出Re…

Kafka安全模式之身份认证

一、简介 Kafka作为一个分布式的发布-订阅消息系统,在日常项目中被频繁使用,通常情况下无论是生产者还是消费者只要订阅Topic后,即可进行消息的发送和接收。而kafka在0.9.0.0版本后添加了身份认证和权限控制两种安全服务,本文主要…

Android和Linux的开发差异

最近开始投入Android的怀抱。说来惭愧,08年就听说这东西,当时也有同事投入去看,因为恶心Java,始终对这玩意无感,没想到现在不会这个嵌入式都快要没法搞了。为了不中年失业,所以只能回过头又来学。 首先还是…

dcat admin 自定义页面

自定义用户详情页 整体分为两部分:用户信息、tab框 用户信息采用自定义页面加载,controller代码如下: protected function detail($id) {return Show::make($id, GameUser::with(finance), function (Show $show) {// 这段就是加载自定义页面…

git 中使用git clean删除未跟踪Untracked的文件

git clean -nf 是 Git 中的一个命令。让我们分解一下这个命令的意思: git clean: 这是一个命令,其功能是用来删除未被跟踪的文件。 -n:这是一个选项,也可以写作 --dry-run。添加这个选项后,命令将显示哪些文件会被删除…

go 的使用总结

go的内存逃逸? go语言在编辑阶段通过逃逸分析把分配在栈上变量 分配到堆上去。 栈内存: 一段连续的内存,便于高效运行指令过程中的临时变量存储。 堆内存: 主要由垃圾回收器 回收没有被引用的指针。 逃逸分析:栈内…

.NET Core Web API注册和发现实例

在.NET Core Web API中,服务注册和发现是实现微服务架构的重要组成部分。通过注册服务实例,客户端能够动态地找到可用的服务端点,从而实现服务的透明调用。在.NET Core中,有多种方式可以实现服务注册和发现,例如使用Co…

frp 内网穿透 linux部署版

frp 内网穿透 linux部署版 前提安装 frp阿里云服务器配置测试服务器配置访问公网 前提 使用 frp,您可以安全、便捷地将内网服务暴露到公网,通过访问公网 IP 直接可以访问到内网的测试环境。准备如下: 公网 IP已部署好的测试服务 IP:端口号阿…

【可实战】被测系统业务架构、系统架构、技术架构、数据流、业务逻辑分析

一、为什么要学习 更深的理解业务逻辑(公司是做什么的?它最重要的商务决策是什么?它里面的数据流是怎么做的?有哪些业务场景?考验你对这家公司、对所负责业务的熟悉程度。公司背后服务器用什么软件搭建的?…

小程序框架(概念、工作原理、发展及应用)

引言 移动应用的普及使得用户对于轻量级、即时可用的应用程序需求越来越迫切。在这个背景下,小程序应运而生,成为一种无需下载安装、即点即用的应用形式,为用户提供了更便捷的体验。小程序的快速发展离不开强大的开发支持,而小程…

Cypher语句查询neo4j数据库教程

文章目录 Cypher介绍执行Cypher语句查询总结 Cypher介绍 NodeMatcher和RelationshipMatcher能够表达的匹配条件相对简单,更加复杂的查询还是需要用Cypher语句来表达。 Py2neo本身支持执行Cypher语句的执行,可以将复杂的查询写成Cypher语句,…