flink输出至es,连接es7时使用的sink构建类是Elasticsearch7SinkBuilder,与es6的连接方式不同,官网地址:flink连接es官方地址。
flink连接es7官方示例
依赖jar:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7</artifactId>
    <version>3.0.1-1.16</version>
</dependency>
官方连接es7代码
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.HashMap;
import java.util.Map;

DataStream<String> input = ...;

input.sinkTo(
    new Elasticsearch7SinkBuilder<String>()
        // Instructs the sink to emit after every element,
        // otherwise they would be buffered.
        .setBulkFlushMaxActions(1)
        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
        .setEmitter((element, context, indexer) ->
            indexer.add(createIndexRequest(element)))
        .build());

private static IndexRequest createIndexRequest(String element) {
    Map<String, Object> json = new HashMap<>();
    json.put("data", element);
    return Requests.indexRequest()
        .index("my-index")
        .id(element)
        .source(json);
}
本地运行报错
java.lang.IllegalStateException: The elasticsearch emitter must be serializable.
错误提示很明确:emitter没有序列化。点击查看ElasticsearchEmitter的源码,其实它最终是继承了序列化接口的,那就猜测是lambda方式无法通过序列化检查,于是自己写一个类实现ElasticsearchEmitter接口,并显式地实现一次Serializable。
修改后代码
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
import org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter;
import org.apache.flink.connector.elasticsearch.sink.RequestIndexer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

DataStream<String> input = ...;

input.sinkTo(
    new Elasticsearch7SinkBuilder<String>()
        // Instructs the sink to emit after every element,
        // otherwise they would be buffered.
        .setBulkFlushMaxActions(1)
        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
        // Use the named, explicitly Serializable emitter class here instead of the
        // lambda — passing the lambda is what triggers the IllegalStateException.
        .setEmitter(new MyElasticsearchEmitter())
        .build());

private static IndexRequest createIndexRequest(String element) {
    Map<String, Object> json = new HashMap<>();
    json.put("data", element);
    return Requests.indexRequest()
        .index("my-index")
        .id(element)
        .source(json);
}

// New class implementing ElasticsearchEmitter; it also implements Serializable
// explicitly, because a lambda emitter fails the sink's serializability check.
static class MyElasticsearchEmitter implements ElasticsearchEmitter<String>, Serializable {
    @Override
    public void emit(String element, SinkWriter.Context context, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}
启动后验证成功。
flink连接es7抽出的完整工具类
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.elasticsearch.sink.*;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;import java.io.Serializable;@Slf4j
public class Es7SinkFactory {public static Sink createSink(String host, Integer port, String schema, String user, String pwd, String index) {try {ElasticsearchSink<String> elasticsearchSink = new Elasticsearch7SinkBuilder<String>().setHosts(new HttpHost(host, port, schema)).setConnectionUsername(user).setConnectionPassword(pwd).setEmitter(new MyElasticsearchEmitter(index)).setBulkFlushMaxSizeMb(5)//最大缓冲大小.setBulkFlushInterval(5000)//刷新间隔.setBulkFlushMaxActions(1000)//批量插入最大条数.setBulkFlushBackoffStrategy(FlushBackoffType.CONSTANT, 5, 100).build();return elasticsearchSink;} catch (Exception e) {log.error("Es7SinkFactory.createSink error e=", e);}return null;}private static IndexRequest createIndexRequest(String element, String index) {return Requests.indexRequest().index(index).source(JSON.parseObject(element));}//匿名内部类/lamda表达式未集成序列化接口,需要实现序列化接口static class MyElasticsearchEmitter implements ElasticsearchEmitter<String>, Serializable {private String index;MyElasticsearchEmitter(String index) {this.index = index;}@Overridepublic void emit(String element, SinkWriter.Context context, RequestIndexer indexer) {indexer.add(createIndexRequest(element, index));}}}