文章目录
- 模拟测试
- 测试
- 返回结果
- 实际应用
- 创建Pipeline
- 查看创建Pipeline
- 新增数据测试
- 查看新增数据
- 创建索引时直接设置Pipeline
模拟测试
测试
POST _ingest/pipeline/_simulate
{"pipeline": {"processors": [{"set": {"field": "timestamp","value": "{{_ingest.timestamp}}"}},{"script": {"lang": "painless","source": """ ZonedDateTime zdt = ZonedDateTime.parse(ctx.timestamp); DateTimeFormatter dtf = DateTimeFormatter.ofPattern( "yyyy-MM-dd HH:mm:ss"); String datetime = zdt.format(dtf); ctx.newdate = datetime; ctx.newdate2 = System.currentTimeMillis()/1000; ctx.newdate3 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());"""}}]},"docs": [{"_source": {"message": "测试"}}]
}
注意:这里可能有时区问题,慢8个小时,可临时使用 ctx.date_zsh = new Date(System.currentTimeMillis()+1000l*60*60*8); 来处理
我们使用_ingest.timestamp 与painless 多种方式设置了数据最新更新时间
返回结果
newdate2 为数据更新时间秒,newdate为格式转换后的数据,timestamp 为 _ingest.timestamp 获取到的时间
{"docs" : [{"doc" : {"_index" : "_index","_type" : "_doc","_id" : "_id","_source" : {"newdate2" : 1624848304,"message" : "测试","newdate" : "2021-06-28 02:45:04","timestamp" : "2021-06-28T02:45:04.759053131Z"},"_ingest" : {"timestamp" : "2021-06-28T02:45:04.759053131Z"}}}]
}
实际应用
创建Pipeline
PUT _ingest/pipeline/add_timestamp
{"processors": [{"set": {"field": "timestamp","value": "{{_ingest.timestamp}}"}},{"script": {"lang": "painless","source": """ ZonedDateTime zdt = ZonedDateTime.parse(ctx.timestamp); DateTimeFormatter dtf = DateTimeFormatter.ofPattern( "yyyy-MM-dd HH:mm:ss"); String datetime = zdt.format(dtf); ctx.newdate = datetime; ctx.newdate2 = System.currentTimeMillis()/1000; """}}]}
查看创建Pipeline
GET _ingest/pipeline/add_timestamp{"add_timestamp" : {"processors" : [{"set" : {"field" : "timestamp","value" : "{{_ingest.timestamp}}"}},{"script" : {"lang" : "painless","source" : """ ZonedDateTime zdt = ZonedDateTime.parse(ctx.timestamp); DateTimeFormatter dtf = DateTimeFormatter.ofPattern( "yyyy-MM-dd HH:mm:ss"); String datetime = zdt.format(dtf); ctx.newdate = datetime; ctx.newdate2 = System.currentTimeMillis()/1000; """}}]}
}
新增数据测试
PUT test_index_20210628/_doc/1?pipeline=add_timestamp
{"test":"测试数据"
}
查看新增数据
GET test_index_20210628/_doc/1 {"_index" : "test_index_20210628","_type" : "_doc","_id" : "1","_version" : 1,"_seq_no" : 0,"_primary_term" : 1,"found" : true,"_source" : {"newdate2" : 1624849340,"test" : "测试数据","newdate" : "2021-06-28 03:02:20","timestamp" : "2021-06-28T03:02:20.252887295Z"}
}
创建索引时直接设置Pipeline
我们也可以在创建索引时设置Pipeline,这时就不需要每次添加数据时指定Pipeline
# 创建索引指定pipeline
PUT test_index_20210628_02
{"settings": {"default_pipeline": "add_timestamp"}
}# 添加测试数据
PUT test_index_20210628_02/_doc/1
{"test":"测试数据"
}# 获取数据
GET test_index_20210628_02/_doc/1# 返回结果
{"_index" : "test_index_20210628_02","_type" : "_doc","_id" : "1","_version" : 1,"_seq_no" : 0,"_primary_term" : 1,"found" : true,"_source" : {"newdate2" : 1624849478,"test" : "测试数据","newdate" : "2021-06-28 03:04:38","timestamp" : "2021-06-28T03:04:38.940542643Z"}
}
个人公众号(大数据学习交流): hadoopwiki