Background: I wanted to use Logstash to load data from files ending in .txt into ES. A new TXT file is generated every day, and the incremental files need to be loaded into ES. I had not used Logstash for this before, so this is a record of the process.
The index in ES (mappings and settings) looks like this:
{"user_name" : {"aliases" : { },"mappings" : {"properties" : {"@timestamp" : {"type" : "date"},"@version" : {"type" : "text","fields" : {"keyword" : {"type" : "keyword","ignore_above" : 256}}},"age" : {"type" : "keyword"},"height" : {"type" : "text","index" : false},"hobby" : {"type" : "text","index" : false},"host" : {"type" : "text","fields" : {"keyword" : {"type" : "keyword","ignore_above" : 256}}},"message" : {"type" : "text","fields" : {"keyword" : {"type" : "keyword","ignore_above" : 256}}},"name" : {"type" : "text"},"path" : {"type" : "text","fields" : {"keyword" : {"type" : "keyword","ignore_above" : 256}}},"tags" : {"type" : "text","fields" : {"keyword" : {"type" : "keyword","ignore_above" : 256}}},"weight" : {"type" : "text","index" : false}}},"settings" : {"index" : {"routing" : {"allocation" : {"include" : {"_tier_preference" : "data_content"}}},"number_of_shards" : "1","provided_name" : "user_name","creation_date" : "1706495322364","number_of_replicas" : "1","uuid" : "kUQ4w30sSN-clEGZg4YaQg","version" : {"created" : "7100299"}}}}
}
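For reference, the output above is what ES returns from a plain GET on the index, e.g. (credentials match the Logstash config used below):

curl -u elastic:'es123!@#' 'http://localhost:9200/user_name?pretty'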
The source file looks like this:
[root@hcss-ecs-04be data_to_es]# cat data.txt
{"name":"wzx","age":"37","weight":"70kg","height":"175cm","hobby":"basketball"}
{"name":"lhc","age":"50","weight":"80kg","height":"180cm","hobby":"dugujiujian"}
{"name":"rwx","age":"60","weight":"85kg","height":"165cm","hobby":"xixingdafa"}
The logstash.conf configuration file:
input {
  file {
    path => "/usr/local/soft/data_to_es/data.txt"
    start_position => "beginning"
    sincedb_path => "/dev/null"
    codec => json {
      charset => "UTF-8"
    }
  }
}
filter {
  json {
    source => "message"
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    user => "elastic"
    password => "es123!@#"
    index => "user_name"
  }
}
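Logstash is then started against this config, e.g. (the install directory here is an assumption; adjust to your environment):

cd /usr/local/soft/logstash
bin/logstash -f /usr/local/soft/data_to_es/logstash.conf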
Running Logstash and querying the index shows a problem: the data was not loaded into the fields defined in the ES mapping. Instead, each raw line landed in the message field, and the event was tagged _jsonparsefailure:
"hits" : [{"_index" : "user_name","_type" : "_doc","_id" : "t71VU40BV6_zhd5-n9uY","_score" : 1.0,"_source" : {"message" : """{"name":"wzx","age":37","weight":"70kg","height":"175cm","hobby":"basketball"}""","@timestamp" : "2024-01-29T03:48:56.645Z","tags" : ["_jsonparsefailure"],"path" : "/usr/local/soft/data_to_es/data.txt","@version" : "1","host" : "hcss-ecs-04be"}},
Modify logstash.conf as follows. The main change is adding type => "json" to the file input:
input {
  file {
    path => "/usr/local/soft/data_to_es/data.txt"
    start_position => "beginning"
    sincedb_path => "/dev/null"
    type => "json"
    codec => json {
      charset => "UTF-8"
    }
  }
}
filter {
  json {
    source => "message"
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    user => "elastic"
    password => "es123!@#"
    index => "user_name"
  }
}
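One practical note before re-running: because sincedb_path is /dev/null, a restarted Logstash re-reads the file from the beginning, and the badly-parsed documents from the first run are still sitting in the index. They can be cleared without dropping the mapping via _delete_by_query:

curl -X POST -u elastic:'es123!@#' -H 'Content-Type: application/json' \
  'http://localhost:9200/user_name/_delete_by_query?pretty' \
  -d '{"query":{"match_all":{}}}'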
Looking at the data in ES again, the documents now come in with the expected fields:
{"_index" : "user_name","_type" : "_doc","_id" : "u71ZU40BV6_zhd5-GdvK","_score" : 1.0,"_source" : {"hobby" : "xixingdafa","@version" : "1","@timestamp" : "2024-01-29T03:52:44.571Z","path" : "/usr/local/soft/data_to_es/data.txt","weight" : "85kg","height" : "165cm","name" : "rwx","host" : "hcss-ecs-04be","age" : "60"}},
The data looks right now, but some of the fields are of no use to us, such as @version, path, and host. How do we drop them? Add a mutate to the filter block in the conf:
filter {
  json {
    source => "message"
  }
  mutate {
    # drop fields we do not need
    remove_field => ["@version", "message", "host", "path"]
  }
}
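With this filter, a document's _source should be reduced to the business fields plus @timestamp, roughly like this (illustrative shape, not captured output):

{
  "name" : "wzx",
  "age" : "37",
  "weight" : "70kg",
  "height" : "175cm",
  "hobby" : "basketball",
  "@timestamp" : "..."
}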
The data is in and the fields are cleaned up. The remaining question: how do we watch for incremental files in real time?
Configure the input as below and new data is loaded into the ES cluster automatically. The wildcard data*.txt in path picks up each day's new file, and two optional settings control the polling: discover_interval, how often Logstash scans path for new files (default 15s), and stat_interval, how often it checks watched files for changes (default 1s).
input {
  file {
    path => "/usr/local/soft/data_to_es/data*.txt"
    start_position => "beginning"
    sincedb_path => "/dev/null"
    type => "json"
    codec => json {
      charset => "UTF-8"
    }
    # optional: how often Logstash scans path for new files, default 15s
    discover_interval => 30
    # optional: how often Logstash checks watched files for changes, default 1s
    stat_interval => 5
  }
}
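One caveat about this setup: sincedb_path => "/dev/null" makes Logstash forget read offsets, so every restart re-ingests all matching files from the beginning and duplicates documents in ES. For incremental loading that survives restarts, point sincedb at a real file. A sketch, where the .sincedb location is an assumption:

input {
  file {
    path => "/usr/local/soft/data_to_es/data*.txt"
    start_position => "beginning"
    # persist read offsets so a restart only picks up new data
    sincedb_path => "/usr/local/soft/data_to_es/.sincedb"
    type => "json"
    codec => json {
      charset => "UTF-8"
    }
    discover_interval => 30
    stat_interval => 5
  }
}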