【1】Add the pom.xml dependency

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
    <version>1.10.0</version>
</dependency>
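The job below also relies on Flink's Scala streaming API. A minimal sketch of that companion dependency, assuming the project does not already declare it (artifact and version chosen to match the connector above):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.10.0</version>
</dependency>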
【2】ES6 Scala code
Note: the IDE's auto-generated scala import must be changed to the scala._ wildcard form, otherwise the job will not compile.
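Concretely, the wildcard import below is what the job needs; without it the Scala compiler typically fails with a missing implicit TypeInformation error (e.g. "could not find implicit value for evidence parameter of type TypeInformation[...]"):

// Brings in the implicit TypeInformation required by map/addSink on DataStream
import org.apache.flink.streaming.api.scala._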
package com.zzx.flink

import java.util

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

// Assumed to be defined elsewhere in the project:
// case class SensorReading(id: String, timestamp: Long, temperature: Double)

object EsSinkTest {
  def main(args: Array[String]): Unit = {
    // Create a streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Read data from a file
    val inputStreamFromFile: DataStream[String] = env.readTextFile("E:\\Project\\flink\\src\\main\\resources\\wordcount.txt")

    // Transform each CSV line into a SensorReading
    val dataStream: DataStream[SensorReading] = inputStreamFromFile.map(data => {
      val dataArray = data.split(",")
      SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
    })

    // Define the HttpHosts of the ES cluster
    val httpHost = new util.ArrayList[HttpHost]()
    // The default port is 9200 (mine was changed to 9201)
    httpHost.add(new HttpHost("192.168.1.12", 9200, "http"))
    httpHost.add(new HttpHost("127.0.0.1", 9200, "http"))

    // Define an ElasticsearchSinkFunction that writes each record to ES
    val esSinkFunc = new ElasticsearchSinkFunction[SensorReading] {
      // element is one record; the RequestIndexer sends the index request
      override def process(element: SensorReading, runtimeContext: RuntimeContext, index: RequestIndexer): Unit = {
        // Wrap the data to write to ES
        val dataSource = new util.HashMap[String, String]()
        dataSource.put("sensor_id", element.id)
        dataSource.put("temp", element.temperature.toString)
        dataSource.put("ts", element.timestamp.toString)

        // Build the index request
        val indexRequest = Requests.indexRequest()
          .index("sensor_temp")
          .`type`("readingdata")
          .source(dataSource)
        index.add(indexRequest)
        println("saved successfully " + element.toString)
      }
    }

    // Sink to ES
    dataStream.addSink(new ElasticsearchSink.Builder[SensorReading](httpHost, esSinkFunc).build())

    env.execute("es")
  }
}
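For reference, the map transformation above expects wordcount.txt to contain comma-separated sensor records (id, timestamp, temperature); a hypothetical sample could look like:

sensor_1,1547718199,35.8
sensor_2,1547718201,15.4
sensor_3,1547718202,6.7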
【3】ES6 output
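One way to verify the result (a sketch, assuming ES is reachable on the host and port configured above) is to query the sensor_temp index directly:

curl "http://192.168.1.12:9200/sensor_temp/_search?pretty"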