【天衍系列 04】深入理解Flink的ElasticsearchSink组件:实时数据流如何无缝地流向Elasticsearch

文章目录

  • 01 Elasticsearch Sink 基础概念
  • 02 Elasticsearch Sink 工作原理
  • 03 Elasticsearch Sink 核心组件
  • 04 Elasticsearch Sink 配置参数
  • 05 Elasticsearch Sink 依赖管理
  • 06 Elasticsearch Sink 初阶实战
  • 07 Elasticsearch Sink 进阶实战
    • 7.1 包结构 & 项目配置
      • 项目配置application.properties
      • 日志配置log4j2.properties
      • 项目pom.xml文件
    • 7.2 实体类ElasticsearchEntity
    • 7.3 客户端工厂类CustomRestClientFactory
    • 7.4 回调函数类CustomRequestConfigCallback
    • 7.5 客户端配置类CustomHttpClientConfigCallback
    • 7.6 Es操作类CustomElasticsearchSinkFunction
    • 7.7 异常处理类CustomActionRequestFailureHandler
    • 7.8 作业主类ElasticsearchSinkStreamJobAdvancedDemo

01 Elasticsearch Sink 基础概念

Flink的Elasticsearch Sink是用于将Flink数据流(DataStream)中的数据发送到Elasticsearch的组件。它是Flink的一个连接器(Connector),用于实现将实时处理的结果或数据持续地写入Elasticsearch集群中的索引中。

下面是一些关于Flink的Elasticsearch Sink的基础概念:

  1. 数据源(Source):Flink数据流的源头,可以是各种数据源,例如Kafka、文件系统、Socket等。Elasticsearch Sink通常是连接到Flink数据流的末端,用于将最终处理结果或数据写入Elasticsearch。
  2. Elasticsearch集群:一个或多个Elasticsearch节点的集合,用于存储和处理数据。Elasticsearch提供了分布式的数据存储和搜索功能。
  3. 索引(Index):在Elasticsearch中,索引是存储相关数据的地方,类似于关系数据库中的表。每个索引可以包含多个文档(Document),每个文档包含一个或多个字段(Field)。
  4. 文档(Document):在Elasticsearch中,文档是最小的数据单元。它们以JSON格式表示,并存储在索引中。
  5. Elasticsearch Sink:是Flink的一个数据接收器,用于将数据流中的数据发送到Elasticsearch集群中的特定索引。Sink负责将Flink数据流中的事件转换为Elasticsearch要求的格式,并将其发送到指定的索引。
  6. 序列化与映射:在将数据写入Elasticsearch之前,通常需要对数据进行序列化和映射。序列化是将数据从Flink的内部表示转换为Elasticsearch要求的JSON格式。映射则是定义如何将Flink数据流中的字段映射到Elasticsearch文档中的字段。
  7. 并行度控制:Elasticsearch Sink支持并行度控制,可以根据需要调整并发写入Elasticsearch的任务数量。这有助于优化性能并避免对Elasticsearch集群造成过大的负载。

总的来说,Flink的Elasticsearch Sink是一个关键的组件,用于将实时处理的结果或数据可靠地写入Elasticsearch中,从而支持各种实时数据分析和搜索应用。

02 Elasticsearch Sink 工作原理

Elasticsearch Sink 是 Apache Flink 提供的一个连接器,用于将 Flink 数据流中的数据发送到 Elasticsearch 集群中。以下是 Elasticsearch Sink 的工作原理:

  1. 数据流入 Flink 程序: 数据首先从外部数据源(如 Kafka、RabbitMQ、文件系统等)进入到 Flink 程序中。Flink 以流式处理的方式处理数据,这意味着数据会一条一条地进入 Flink 的数据流中。
  2. 数据转换与处理: 一旦数据进入 Flink,您可以对数据进行各种转换和处理。这可能包括数据清洗、转换、聚合、窗口操作等。在您的 Flink 程序中,您可以通过各种 Flink 的算子来实现这些转换和处理。
  3. Elasticsearch Sink 的配置: 当需要将数据写入 Elasticsearch 时,您需要配置 Elasticsearch Sink。这通常包括指定 Elasticsearch 集群的地址、端口、索引名称等信息。您还可以配置其他参数,例如批量写入的大小、超时时间等。
  4. 数据发送到 Elasticsearch: 一旦配置完成,Elasticsearch Sink 会将 Flink 数据流中的数据转换为 JSON 格式,并通过 Elasticsearch 的 REST API 将数据发送到指定的索引中。通常,Elasticsearch Sink 会将数据批量发送到 Elasticsearch,以提高写入的效率和性能。
  5. 序列化与映射: 在发送数据之前,通常需要将 Flink 数据流中的数据序列化为 JSON 格式,并根据 Elasticsearch 索引的映射规则进行字段映射。这确保了发送到 Elasticsearch 的数据与索引的结构一致。
  6. 容错与错误处理: Flink 提供了容错机制来确保数据的可靠性和一致性。如果在数据发送过程中发生错误,例如网络故障或 Elasticsearch 集群不可用,Flink 会自动进行故障恢复,并重新发送丢失的数据,以确保数据不会丢失。
  7. 性能优化: 为了提高性能,Elasticsearch Sink 可以通过调整批量写入的大小、并发度等参数来优化性能。这可以减少与 Elasticsearch 的通信开销,并提高写入的效率。

总的来说,Elasticsearch Sink 通过将 Flink 数据流中的数据转换为 JSON 格式,并利用 Elasticsearch 的 REST API 将数据发送到指定的索引中,实现了将实时流数据写入 Elasticsearch 的功能。

03 Elasticsearch Sink 核心组件

Elasticsearch Sink 在 Apache Flink 中是一个核心组件,它负责将 Flink 数据流中的数据发送到 Elasticsearch。下面是 Elasticsearch Sink 的核心组件:

  1. SinkFunction: SinkFunction 是 Flink 中的一个接口,用于定义将数据发送到外部系统的逻辑。在 Elasticsearch Sink 中,您需要实现 SinkFunction 接口,以将 Flink 数据流中的数据发送到 Elasticsearch。通常,您需要在 SinkFunction 中实现将数据转换为 JSON 格式,并通过 Elasticsearch 的 REST API 将数据发送到指定的索引中。
  2. BulkProcessor: BulkProcessor 是 Elasticsearch Java 客户端提供的一个功能,用于批量写入数据到 Elasticsearch。在 Elasticsearch Sink 中,BulkProcessor 负责将 Flink 数据流中的数据批量发送到 Elasticsearch。您可以通过 BulkProcessor 来配置批量写入的大小、并发度等参数,以优化写入性能。
  3. TransportClient 或 RestHighLevelClient: 在 Elasticsearch Sink 中,您可以使用 Elasticsearch Java 客户端的 TransportClient 或 RestHighLevelClient 来与 Elasticsearch 集群进行通信。这些客户端提供了与 Elasticsearch 集群交互的接口,使您可以发送数据到 Elasticsearch、执行查询、索引管理等操作。
  4. 序列化器(Serializer): 在将数据发送到 Elasticsearch 之前,通常需要将 Flink 数据流中的数据序列化为 JSON 格式。序列化器负责将 Flink 数据流中的数据转换为 Elasticsearch 所需的 JSON 格式。您可以根据具体的数据类型和业务需求来实现自定义的序列化器。
  5. Elasticsearch 连接配置: 在 Elasticsearch Sink 中,您需要配置与 Elasticsearch 集群的连接信息,包括 Elasticsearch 集群的地址、端口、索引名称等。这些配置信息通常在初始化 Elasticsearch Sink 时进行设置,并在发送数据时使用。
  6. 容错与错误处理机制: Elasticsearch Sink 需要具备容错和错误处理机制,以确保数据的可靠性和一致性。如果在数据发送过程中发生错误,例如网络故障或 Elasticsearch 集群不可用,Sink 需要能够进行故障恢复,并重新发送丢失的数据,以确保数据不会丢失。

这些组件共同作用,构成了 Elasticsearch Sink 在 Flink 中的核心功能,使得 Flink 用户可以轻松地将实时流数据发送到 Elasticsearch,并实现各种实时数据分析和搜索应用。

04 Elasticsearch Sink 配置参数

nodes :Elasticsearch 集群的节点地址列表

port :Elasticsearch 集群的端口

Elasticsearch 集群的节点地址列表

scheme : Elasticsearch 集群的通信协议,http或https

type :Elasticsearch 集群的文档类型,es7以后是_doc

index :Elasticsearch 集群的索引名称

bulkFlushMaxActions :内部批量处理器,刷新前最大缓存的操作数

bulkFlushMaxSizeMb :刷新前最大缓存的数据量(以兆字节为单位)

bulkFlushInterval :刷新的时间间隔(不论缓存操作的数量或大小如何)

bulkFlushBackoff :是否启用批量写入的退避策略,当Elasticsearch 写入失败时,可以启用退避策略,以避免频繁的重试。此时,setBulkFlushBackoffDelay 和 setBulkFlushBackoffRetries 参数生效。

bulkFlushBackoffDelay :设置批量写入的退避延迟时间,在发生写入失败后,等待指定的延迟时间后再进行重试

bulkFlushBackoffRetries :设置批量写入的最大重试次数,设置在写入失败后的最大重试次数。超过这个次数后,将不再重试

connectTimeout :设置与 Elasticsearch 集群建立连接的超时时间,单位为毫秒。在指定的时间内无法建立连接将会抛出连接超时异常

socketTimeout :设置与 Elasticsearch 连接的套接字超时时间,单位为毫秒。该参数定义了在建立连接后从服务器读取数据的超时时间。

connectionRequestTimeout :设置连接请求超时时间,单位为毫秒。该参数表示从连接池获取连接的超时时间。如果在指定的时间内无法获得连接,将会抛出连接请求超时异常。

redirectsEnabled :设置是否允许重定向。如果设置为true,则当遇到重定向响应时,客户端将跟随重定向并继续请求;如果设置为false,重定向响应将被视为错误。

maxRedirects :客户端允许的最大重定向次数

authenticationEnabled :启用身份验证功能。通过设置该参数为true,可以提供用户名和密码进行身份验证,以连接到 Elasticsearch 集群。

circularRedirectsAllowed :设置是否允许循环重定向。如果设置为true,则允许在重定向过程中发生循环重定向;如果设置为false,则在检测到循环重定向时,将会抛出异常。

contentCompressionEnabled :设置是否启用内容压缩。如果设置为true,则允许客户端和 Elasticsearch 之间进行内容压缩,以减少数据传输量。

expectContinueEnabled :设置是否启用 “Expect: continue” 机制。当设置为true时,在发送请求之前,客户端会发送一个请求头部,询问服务器是否接受请求的主体部分。如果服务器响应允许继续发送请求主体,则客户端会继续发送请求;如果服务器响应拒绝继续发送请求主体,则客户端会放弃该请求。

normalizeUri :设置是否标准化 URI。如果设置为true,则客户端会尝试标准化请求 URI,以便消除多余和重复的斜杠等

05 Elasticsearch Sink 依赖管理

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.14.4</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_1.12</artifactId><version>1.14.4</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_1.12</artifactId><version>1.14.4</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch7_1.12</artifactId><version>1.14.4</version>
</dependency>

06 Elasticsearch Sink 初阶实战

package com.aurora.demo;import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;/*** 描述:Flink集成Elasticsearch Connector连接器快速入门运行demo* 实现实时数据流如何无缝地流向Elasticsearch** @author 浅夏的猫* @version 1.0.0* @date 2024-02-13 22:25:58*/
public class ElasticsearchSinkStreamJobQuickDemo {private static final Logger logger = LoggerFactory.getLogger(ElasticsearchSinkStreamJobQuickDemo.class);public static void main(String[] args) throws Exception {// 创建elasticsearch集群的httpHost连接HttpHost httpHost = new HttpHost("localhost", 9200, "http");List<HttpHost> httpHosts = new ArrayList<>();httpHosts.add(httpHost);// 创建elasticsearchSinkFunction函数对象,专门用于处理数据写入elasticsearchSink算子队列,会自动创建索引ElasticsearchSinkFunction<JSONObject> elasticsearchSinkFunction = new ElasticsearchSinkFunction<JSONObject>() {@Overridepublic void process(JSONObject element, RuntimeContext runtimeContext, RequestIndexer indexer) {String transId = element.getString("transId");String tradeTime = element.getString("tradeTime");String index = "flink_" + tradeTime;logger.info("交易流水={},数据写入索引{}成功", transId, index);IndexRequest indexRequest = Requests.indexRequest().index(index).type("_doc").id(transId).source(element, XContentType.JSON);indexer.add(indexRequest);}};// 构建elasticsearchSink算子BuilderElasticsearchSink.Builder<JSONObject> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, elasticsearchSinkFunction);// 每个请求最多发送的文档数量esSinkBuilder.setBulkFlushMaxActions(1);// 每次发送请求的时间间隔esSinkBuilder.setBulkFlushInterval(1000);//构建elasticsearchSink算子ElasticsearchSink<JSONObject> sink = esSinkBuilder.build();// 自定义数据源,模拟生产环境交易接入,每秒下发一个json格式数据SourceFunction<JSONObject> dataSource = new SourceFunction<JSONObject>() {@Overridepublic void run(SourceContext sourceContext) throws Exception {while (true) {//交易流水号String tradeId = UUID.randomUUID().toString();//交易发生时间戳long timeStamp = System.currentTimeMillis();//交易发生金额long tradeAmount = new Random().nextInt(1000);//交易名称String tradeName = "支付宝转账";JSONObject dataObj = new JSONObject();dataObj.put("transId", tradeId);dataObj.put("timeStamp", timeStamp);dataObj.put("tradeTime", dateUtil(timeStamp));dataObj.put("tradeAmount", tradeAmount);dataObj.put("tradeName", tradeName);//模拟生产,每隔1秒生成一笔交易Thread.sleep(1000);logger.info("源交易流水={},原始报文={}", tradeId, dataObj.toJSONString());sourceContext.collect(dataObj);}}@Overridepublic void cancel() {}};// 创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 构建数据源DataStreamSource<JSONObject> dataStreamSource = env.addSource(dataSource);// 数据源写入数据算子,进行输出到elasticsearchdataStreamSource.addSink(sink);// 执行任务env.execute();}/*** 描述:时间格式化工具类** @param timestamp 时间戳* @return {@code String }*/private static String dateUtil(long timestamp) {//时间戳加工timestamp = timestamp / 1000;// 将时间戳转换为 LocalDateTime 对象LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochSecond(timestamp), ZoneId.systemDefault());// 定义日期时间格式DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd");// 格式化日期时间对象为指定格式的字符串String dateTimeFormat = formatter.format(dateTime);return dateTimeFormat;}
}

启动上述作业后,根据对应的交易流水号查询es,或者查询es的索引数据,但是索引数据一般是一段时间才更新

验证1:检查索引数据变化
http://127.0.0.1:9200/_cat/indices?v

在这里插入图片描述

验证2:根据id查询es的文档记录

在这里插入图片描述
在这里插入图片描述

07 Elasticsearch Sink 进阶实战

进阶实战主要是包括ElasticsearchSink的各种参数配置,以及性能调优

7.1 包结构 & 项目配置

在这里插入图片描述

项目配置application.properties

es.cluster.hosts=localhost
es.cluster.port=9200
es.cluster.scheme=http
es.cluster.type=_doc
es.cluster.indexPrefix=flink_#内部批量处理器,刷新前最大缓存的操作数
es.cluster.bulkFlushMaxActions=1
#刷新前最大缓存的数据量(以兆字节为单位)
es.cluster.bulkFlushMaxSizeMb=10
#刷新的时间间隔(不论缓存操作的数量或大小如何)
es.cluster.bulkFlushInterval=10000#是否启用批量写入的退避策略,当Elasticsearch 写入失败时,可以启用退避策略,以避免频繁的重试。此时,setBulkFlushBackoffDelay 和 setBulkFlushBackoffRetries 参数生效。
es.cluster.bulkFlushBackoff=false
#设置批量写入的退避延迟时间,在发生写入失败后,等待指定的延迟时间后再进行重试
es.cluster.bulkFlushBackoffDelay=10000
#设置批量写入的最大重试次数,设置在写入失败后的最大重试次数。超过这个次数后,将不再重试
es.cluster.bulkFlushBackoffRetries=3#设置与 Elasticsearch 集群建立连接的超时时间,单位为毫秒。在指定的时间内无法建立连接将会抛出连接超时异常
es.cluster.connectTimeout=10000
#设置与 Elasticsearch 连接的套接字超时时间,单位为毫秒。该参数定义了在建立连接后从服务器读取数据的超时时间。
es.cluster.socketTimeout=10000
#设置连接请求超时时间,单位为毫秒。该参数表示从连接池获取连接的超时时间。如果在指定的时间内无法获得连接,将会抛出连接请求超时异常。
es.cluster.connectionRequestTimeout=10000
设置是否允许重定向。如果设置为true,则当遇到重定向响应时,客户端将跟随重定向并继续请求;如果设置为false,重定向响应将被视为错误。
es.cluster.redirectsEnabled=false
#客户端允许的最大重定向次数
es.cluster.maxRedirects=3#启用身份验证功能。通过设置该参数为true,可以提供用户名和密码进行身份验证,以连接到 Elasticsearch 集群。
es.cluster.authenticationEnabled=false
#设置是否允许循环重定向。如果设置为true,则允许在重定向过程中发生循环重定向;如果设置为false,则在检测到循环重定向时,将会抛出异常。
es.cluster.circularRedirectsAllowed=false
#设置是否启用内容压缩。如果设置为true,则允许客户端和 Elasticsearch 之间进行内容压缩,以减少数据传输量。
es.cluster.contentCompressionEnabled=false
#设置是否启用 "Expect: continue" 机制。当设置为true时,在发送请求之前,客户端会发送一个请求头部,询问服务器是否接受请求的主体部分。如果服务器响应允许继续发送请求主体,则客户端会继续发送请求;如果服务器响应拒绝继续发送请求主体,则客户端会放弃该请求。
es.cluster.expectContinueEnabled=false
#设置是否标准化 URI。如果设置为true,则客户端会尝试标准化请求 URI,以便消除多余和重复的斜杠等。
es.cluster.normalizeUri=false

日志配置log4j2.properties

rootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmprootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmp

项目pom.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.aurora</groupId><artifactId>aurora_elasticsearch_connector</artifactId><version>1.0-SNAPSHOT</version><!--属性设置--><properties><!--java_JDK版本--><java.version>1.8</java.version><!--maven打包插件--><maven.plugin.version>3.8.1</maven.plugin.version><!--编译编码UTF-8--><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><!--输出报告编码UTF-8--><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><!--json数据格式处理工具--><fastjson.version>1.2.75</fastjson.version><!--log4j版本--><log4j.version>2.17.1</log4j.version><!--flink版本--><flink.version>1.14.4</flink.version><!--scala版本--><scala.binary.version>2.12</scala.binary.version></properties><!--依赖管理--><dependencies><!-- fastJson工具类依赖 start --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><!-- fastJson工具类依赖 end --><!-- log4j日志框架依赖 start --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version></dependency><!-- log4j日志框架依赖 end --><!-- Flink基础依赖 start --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- Flink基础依赖 end --><!-- Flink Elasticsearch 连接器依赖 start --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- Flink Elasticsearch 连接器依赖 end --></dependencies><!--编译打包--><build><finalName>${project.name}</finalName><!--资源文件打包--><resources><resource><directory>src/main/resources</directory></resource><resource><directory>src/main/java</directory><includes><include>**/*.xml</include></includes></resource></resources><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>org.apache.flink:force-shading</exclude><exclude>org.google.code.flindbugs:jar305</exclude><exclude>org.slf4j:*</exclude><excluder>org.apache.logging.log4j:*</excluder></excludes></artifactSet><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.aurora.demo,ElasticsearchSinkStreamingJobDemo</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins><!--插件统一管理--><pluginManagement><plugins><!--maven打包插件--><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>${spring.boot.version}</version><configuration><fork>true</fork><finalName>${project.build.finalName}</finalName></configuration><executions><execution><goals><goal>repackage</goal></goals></execution></executions></plugin><!--编译打包插件--><plugin><artifactId>maven-compiler-plugin</artifactId><version>${maven.plugin.version}</version><configuration><source>${java.version}</source><target>${java.version}</target><encoding>UTF-8</encoding><compilerArgs><arg>-parameters</arg></compilerArgs></configuration></plugin></plugins></pluginManagement></build></project>

7.2 实体类ElasticsearchEntity

package com.aurora.advanced;import java.io.Serializable;/*** 描述:elasticsearch实体类** @author 浅夏的猫* @version 1.0.0* @date 2024-02-10 20:08:20*/
public class ElasticsearchEntity implements Serializable {private static final long serialVersionUID = 1L;/*** 集群地址* */private String hosts;/*** 集群端口* */private Integer port;/***执行计划* */private String scheme;/*** 文档类型,es7一般都是_doc* */private String type;/*** 索引前缀* */private String indexPrefix;/*** 内部批量处理器,刷新前最大缓存的操作数* */private Integer bulkFlushMaxActions=1;/*** 刷新前最大缓存的数据量(以兆字节为单位)* */private Integer bulkFlushMaxSizeMb=10;/*** 刷新的时间间隔(不论缓存操作的数量或大小如何)* */private Integer bulkFlushInterval=10000;/*** 是否启用批量写入的退避策略,当Elasticsearch 写入失败时,可以启用退避策略,以避免频繁的重试。* 此时,setBulkFlushBackoffDelay 和 setBulkFlushBackoffRetries 参数生效。* */private Boolean bulkFlushBackoff=false;/*** 设置批量写入的退避延迟时间,在发生写入失败后,等待指定的延迟时间后再进行重试* */private Integer bulkFlushBackoffDelay=10000;/*** 设置批量写入的最大重试次数,设置在写入失败后的最大重试次数。超过这个次数后,将不再重试* */private Integer bulkFlushBackoffRetries=3;/*** 设置与 Elasticsearch 集群建立连接的超时时间,单位为毫秒。在指定的时间内无法建立连接将会抛出连接超时异常* */private Integer connectTimeout=10000;/*** 设置与 Elasticsearch 连接的套接字超时时间,单位为毫秒。该参数定义了在建立连接后从服务器读取数据的超时时间。* */private Integer socketTimeout=10000;/*** 设置连接请求超时时间,单位为毫秒。该参数表示从连接池获取连接的超时时间。如果在指定的时间内无法获得连接,将会抛出连接请求超时异常。* */private Integer connectionRequestTimeout=10000;/*** 设置是否允许重定向。如果设置为true,则当遇到重定向响应时,客户端将跟随重定向并继续请求;如果设置为false,重定向响应将被视为错误。* */private Boolean redirectsEnabled=false;/*** 客户端允许的最大重定向次数* */private Integer maxRedirects=3;/*** 启用身份验证功能。通过设置该参数为true,可以提供用户名和密码进行身份验证,以连接到 Elasticsearch 集群。* */private Boolean authenticationEnabled=true;/*** 设置是否允许循环重定向。如果设置为true,则允许在重定向过程中发生循环重定向;如果设置为false,则在检测到循环重定向时,将会抛出异常。* */private Boolean circularRedirectsAllowed=false;/*** 设置是否启用内容压缩。如果设置为true,则允许客户端和 Elasticsearch 之间进行内容压缩,以减少数据传输量。* */private Boolean contentCompressionEnabled=false;/*** 设置是否启用 "Expect: continue" 机制。当设置为true时,在发送请求之前,客户端会发送一个请求头部,询问服务器是否接受请求的主体部分。* 如果服务器响应允许继续发送请求主体,则客户端会继续发送请求;如果服务器响应拒绝继续发送请求主体,则客户端会放弃该请求。* */private Boolean expectContinueEnabled=false;/*** 设置是否标准化 URI。如果设置为true,则客户端会尝试标准化请求 URI,以便消除多余和重复的斜杠等。* */private Boolean normalizeUri=false;/*** 用于设置 HTTP 请求的路径前缀。* 这个配置选项通常用于设置反向代理或者负载均衡器等中间件与 Elasticsearch 集群之间的连接* */private String pathPrefix;public String getHosts() {return hosts;}public void setHosts(String hosts) {this.hosts = hosts;}public Integer getPort() {return port;}public void setPort(Integer port) {this.port = port;}public String getScheme() {return scheme;}public void setScheme(String scheme) {this.scheme = scheme;}public String getType() {return type;}public void setType(String type) {this.type = type;}public String getIndexPrefix() {return indexPrefix;}public void setIndexPrefix(String indexPrefix) {this.indexPrefix = indexPrefix;}public Integer getBulkFlushMaxActions() {return bulkFlushMaxActions;}public void setBulkFlushMaxActions(Integer bulkFlushMaxActions) {this.bulkFlushMaxActions = bulkFlushMaxActions;}public Integer getBulkFlushMaxSizeMb() {return bulkFlushMaxSizeMb;}public void setBulkFlushMaxSizeMb(Integer bulkFlushMaxSizeMb) {this.bulkFlushMaxSizeMb = bulkFlushMaxSizeMb;}public Integer getBulkFlushInterval() {return bulkFlushInterval;}public void setBulkFlushInterval(Integer bulkFlushInterval) {this.bulkFlushInterval = bulkFlushInterval;}public Boolean getBulkFlushBackoff() {return bulkFlushBackoff;}public void setBulkFlushBackoff(Boolean bulkFlushBackoff) {this.bulkFlushBackoff = bulkFlushBackoff;}public Integer getBulkFlushBackoffDelay() {return bulkFlushBackoffDelay;}public void setBulkFlushBackoffDelay(Integer bulkFlushBackoffDelay) {this.bulkFlushBackoffDelay = bulkFlushBackoffDelay;}public Integer getBulkFlushBackoffRetries() {return bulkFlushBackoffRetries;}public void setBulkFlushBackoffRetries(Integer bulkFlushBackoffRetries) {this.bulkFlushBackoffRetries = bulkFlushBackoffRetries;}public Integer getConnectTimeout() {return connectTimeout;}public void setConnectTimeout(Integer connectTimeout) {this.connectTimeout = connectTimeout;}public Integer getSocketTimeout() {return socketTimeout;}public void setSocketTimeout(Integer socketTimeout) {this.socketTimeout = socketTimeout;}public Integer getConnectionRequestTimeout() {return connectionRequestTimeout;}public void setConnectionRequestTimeout(Integer connectionRequestTimeout) {this.connectionRequestTimeout = connectionRequestTimeout;}public Boolean getRedirectsEnabled() {return redirectsEnabled;}public void setRedirectsEnabled(Boolean redirectsEnabled) {this.redirectsEnabled = redirectsEnabled;}public Integer getMaxRedirects() {return maxRedirects;}public void setMaxRedirects(Integer maxRedirects) {this.maxRedirects = maxRedirects;}public Boolean getAuthenticationEnabled() {return authenticationEnabled;}public void setAuthenticationEnabled(Boolean authenticationEnabled) {this.authenticationEnabled = authenticationEnabled;}public Boolean getCircularRedirectsAllowed() {return circularRedirectsAllowed;}public void setCircularRedirectsAllowed(Boolean circularRedirectsAllowed) {this.circularRedirectsAllowed = circularRedirectsAllowed;}public Boolean getContentCompressionEnabled() {return contentCompressionEnabled;}public void setContentCompressionEnabled(Boolean contentCompressionEnabled) {this.contentCompressionEnabled = contentCompressionEnabled;}public Boolean getExpectContinueEnabled() {return expectContinueEnabled;}public void setExpectContinueEnabled(Boolean expectContinueEnabled) {this.expectContinueEnabled = expectContinueEnabled;}public Boolean getNormalizeUri() {return normalizeUri;}public void setNormalizeUri(Boolean normalizeUri) {this.normalizeUri = normalizeUri;}public String getPathPrefix() {return pathPrefix;}public void setPathPrefix(String pathPrefix) {this.pathPrefix = pathPrefix;}
}

7.3 客户端工厂类CustomRestClientFactory

作用:设置用于创建 Elasticsearch REST 客户端的工厂,可以自定义创建 Elasticsearch REST 客户端的逻辑,实现 ElasticsearchSinkBase.RestClientFactory 接口

package com.aurora.advanced;import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
import org.apache.http.Header;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.NodeSelector;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;/*** 描述:设置用于创建 Elasticsearch REST 客户端的工厂* 解释:可以自定义创建 Elasticsearch REST 客户端的逻辑,实现 ElasticsearchSinkBase.RestClientFactory 接口** @author 浅夏的猫* @version 1.0.0* @date 2024-02-13 00:12:15*/
public class CustomRestClientFactory implements RestClientFactory {private ElasticsearchEntity elasticsearchEntity;public CustomRestClientFactory(ElasticsearchEntity elasticsearchEntity) {this.elasticsearchEntity = elasticsearchEntity;}@Overridepublic void configureRestClientBuilder(RestClientBuilder restClientBuilder) {//设置默认的 HTTP 头部信息,这些信息将在每个请求中包含Header contentType = new BasicHeader("Content-Type", "application/json");Header authorization = new BasicHeader("Authorization", "Bearer your_access_token");Header[] headers = {contentType, authorization};restClientBuilder.setDefaultHeaders(headers);//设置用于监听节点故障的监听器。当节点发生故障时,可以执行特定的操作restClientBuilder.setFailureListener(new RestClient.FailureListener());//配置用于选择与之通信的节点的策略。这涉及到 Elasticsearch 集群中多个节点的选择。restClientBuilder.setNodeSelector(NodeSelector.ANY);//为每个请求设置路径前缀。这可以用于将请求定向到特定的子路径。if(StringUtils.isNoneBlank(elasticsearchEntity.getPathPrefix())){restClientBuilder.setPathPrefix(elasticsearchEntity.getPathPrefix());}//允许在创建每个请求的时候进行额外的请求配置。restClientBuilder.setRequestConfigCallback(new CustomRequestConfigCallback(elasticsearchEntity));//允许在创建 CloseableHttpClient 实例时进行额外的 HTTP 客户端配置。restClientBuilder.setHttpClientConfigCallback(new CustomHttpClientConfigCallback(elasticsearchEntity));//设置是否启用严格的废弃模式,用于警告有关已弃用功能的使用。restClientBuilder.setStrictDeprecationMode(false);}
}

7.4 回调函数类CustomRequestConfigCallback

作用:允许在创建每个请求的时候进行额外的请求配置

package com.aurora.advanced;import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.cookie.DefaultCookieSpec;
import org.elasticsearch.client.RestClientBuilder;/*** 描述:* 允许在创建每个请求的时候进行额外的请求配置* @author 浅夏的猫* @version 1.0.0* @date 2024-02-13 23:24:42*/
public class CustomRequestConfigCallback implements RestClientBuilder.RequestConfigCallback {private ElasticsearchEntity elasticsearchEntity;public CustomRequestConfigCallback(ElasticsearchEntity elasticsearchEntity) {this.elasticsearchEntity = elasticsearchEntity;}@Overridepublic RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder custom) {// 设置与 Elasticsearch 集群建立连接的超时时间,单位为毫秒。在指定的时间内无法建立连接将会抛出连接超时异常custom.setConnectTimeout(elasticsearchEntity.getConnectTimeout());// 设置与 Elasticsearch 连接的套接字超时时间,单位为毫秒。该参数定义了在建立连接后从服务器读取数据的超时时间。如果在指定的时间内没有读取到数据,将会抛出超时异常。custom.setSocketTimeout(elasticsearchEntity.getSocketTimeout());// 设置连接请求超时时间,单位为毫秒。该参数表示从连接池获取连接的超时时间。如果在指定的时间内无法获得连接,将会抛出连接请求超时异常。custom.setConnectionRequestTimeout(elasticsearchEntity.getConnectionRequestTimeout());// 设置是否允许重定向。如果设置为true,则当遇到重定向响应时,客户端将跟随重定向并继续请求;如果设置为false,重定向响应将被视为错误。custom.setRedirectsEnabled(elasticsearchEntity.getRedirectsEnabled());// 设置最大重定向次数。当允许重定向时,该参数指定在遇到重定向响应时,最多可以重定向的次数。custom.setMaxRedirects(elasticsearchEntity.getMaxRedirects());// 设置是否允许循环重定向。如果设置为true,则允许在重定向过程中发生循环重定向;如果设置为false,则在检测到循环重定向时,将会抛出异常。custom.setCircularRedirectsAllowed(elasticsearchEntity.getCircularRedirectsAllowed());// 设置是否启用内容压缩。如果设置为true,则允许客户端和 Elasticsearch 之间进行内容压缩,以减少数据传输量。custom.setContentCompressionEnabled(elasticsearchEntity.getContentCompressionEnabled());// 设置是否启用 "Expect: continue" 机制。当设置为true时,在发送请求之前,客户端会发送一个请求头部,询问服务器是否接受请求的主体部分。//  如果服务器响应允许继续发送请求主体,则客户端会继续发送请求;如果服务器响应拒绝继续发送请求主体,则客户端会放弃该请求。custom.setExpectContinueEnabled(elasticsearchEntity.getExpectContinueEnabled());// 设置是否标准化 URI。如果设置为true,则客户端会尝试标准化请求 URI,以便消除多余和重复的斜杠等。custom.setNormalizeUri(elasticsearchEntity.getNormalizeUri());// 设置使用的 Cookie 规范。可以指定客户端在处理与 Elasticsearch 服务器之间的 Cookie 交互时使用的 Cookie 规范custom.setCookieSpec(new DefaultCookieSpec().toString());return custom;}
}

7.5 客户端配置类CustomHttpClientConfigCallback

作用:允许在创建 CloseableHttpClient 实例时进行额外的 HTTP 客户端配置

package com.aurora.advanced;import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.cookie.DefaultCookieSpec;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClientBuilder;/*** 描述:客户端配置* 允许在创建 CloseableHttpClient 实例时进行额外的 HTTP 客户端配置** @author 浅夏的猫* @version 1.0.0* @date 2024-02-13 23:28:15*/
public class CustomHttpClientConfigCallback implements RestClientBuilder.HttpClientConfigCallback {private ElasticsearchEntity elasticsearchEntity;CustomHttpClientConfigCallback(ElasticsearchEntity elasticsearchEntity) {this.elasticsearchEntity = elasticsearchEntity;}@Overridepublic HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {RequestConfig.Builder custom = RequestConfig.custom();// 设置与 Elasticsearch 集群建立连接的超时时间,单位为毫秒。在指定的时间内无法建立连接将会抛出连接超时异常custom.setConnectTimeout(elasticsearchEntity.getConnectTimeout());// 设置与 Elasticsearch 连接的套接字超时时间,单位为毫秒。该参数定义了在建立连接后从服务器读取数据的超时时间。如果在指定的时间内没有读取到数据,将会抛出超时异常。custom.setSocketTimeout(elasticsearchEntity.getSocketTimeout());// 设置连接请求超时时间,单位为毫秒。该参数表示从连接池获取连接的超时时间。如果在指定的时间内无法获得连接,将会抛出连接请求超时异常。custom.setConnectionRequestTimeout(elasticsearchEntity.getConnectionRequestTimeout());// 设置是否允许重定向。如果设置为true,则当遇到重定向响应时,客户端将跟随重定向并继续请求;如果设置为false,重定向响应将被视为错误。custom.setRedirectsEnabled(elasticsearchEntity.getRedirectsEnabled());// 设置最大重定向次数。当允许重定向时,该参数指定在遇到重定向响应时,最多可以重定向的次数。custom.setMaxRedirects(elasticsearchEntity.getMaxRedirects());// 启用身份验证功能。通过设置该参数为true,可以提供用户名和密码进行身份验证,以连接到 Elasticsearch 集群。custom.setAuthenticationEnabled(elasticsearchEntity.getAuthenticationEnabled());// 设置是否允许循环重定向。如果设置为true,则允许在重定向过程中发生循环重定向;如果设置为false,则在检测到循环重定向时,将会抛出异常。custom.setCircularRedirectsAllowed(elasticsearchEntity.getCircularRedirectsAllowed());// 设置是否启用内容压缩。如果设置为true,则允许客户端和 Elasticsearch 之间进行内容压缩,以减少数据传输量。custom.setContentCompressionEnabled(elasticsearchEntity.getContentCompressionEnabled());// 设置是否启用 "Expect: continue" 机制。当设置为true时,在发送请求之前,客户端会发送一个请求头部,询问服务器是否接受请求的主体部分。//  如果服务器响应允许继续发送请求主体,则客户端会继续发送请求;如果服务器响应拒绝继续发送请求主体,则客户端会放弃该请求。custom.setExpectContinueEnabled(elasticsearchEntity.getExpectContinueEnabled());// 设置是否标准化 URI。如果设置为true,则客户端会尝试标准化请求 URI,以便消除多余和重复的斜杠等。custom.setNormalizeUri(elasticsearchEntity.getNormalizeUri());// 设置使用的 Cookie 规范。可以指定客户端在处理与 Elasticsearch 服务器之间的 Cookie 交互时使用的 Cookie 规范custom.setCookieSpec(new DefaultCookieSpec().toString());return httpAsyncClientBuilder.setDefaultRequestConfig(custom.build());}
}

7.6 Es操作类CustomElasticsearchSinkFunction

作用:实时把数据写入到队列中,再通过批量提交到Elasticsearch中,实现数据写入

package com.aurora.advanced;import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** 描述:自定义elasticsearch sink 算子函数* ElasticsearchSinkFunction 是用于将数据流写入 Elasticsearch 的接口。* 它允许您自定义如何将 Flink 流式处理的数据写入 Elasticsearch 索引** @author 浅夏的猫* @version 1.0.0* @date 2024-02-12 23:49:22*/
public class CustomElasticsearchSinkFunction implements ElasticsearchSinkFunction<JSONObject> {private static final Logger logger = LoggerFactory.getLogger(CustomElasticsearchSinkFunction.class);private ElasticsearchEntity elasticsearchEntity;public CustomElasticsearchSinkFunction(ElasticsearchEntity elasticsearchEntity) {this.elasticsearchEntity = elasticsearchEntity;}@Overridepublic void process(JSONObject element, RuntimeContext runtimeContext, RequestIndexer indexer) {String transId = element.getString("transId");String tradeTime = element.getString("tradeTime");String index = elasticsearchEntity.getIndexPrefix() + tradeTime;logger.info("交易流水={},数据写入索引{}成功", tradeTime, index);IndexRequest indexRequest = Requests.indexRequest().index(index).type(elasticsearchEntity.getType()).id(transId).source(element, XContentType.JSON);indexer.add(indexRequest);}
}

7.7 异常处理类CustomActionRequestFailureHandler

作用:当sink写Elasticsearch出现异常时,可以自定义操作策略

package com.aurora.advanced;import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** 描述:es写入异常处理** @author 浅夏的猫* @version 1.0.0* @date 2024-02-13 00:04:24*/
public class CustomActionRequestFailureHandler implements ActionRequestFailureHandler {private static final Logger logger = LoggerFactory.getLogger(CustomActionRequestFailureHandler.class);@Overridepublic void onFailure(ActionRequest action, Throwable throwable, int restStatusCode, RequestIndexer requestIndexer) throws Throwable {// 处理不同类型的异常if (throwable instanceof EsRejectedExecutionException) {// 如果是由于线程池饱和导致的拒绝执行异常,可以采取相应的处理措施logger.warn("Elasticsearch action execution was rejected due to thread pool saturation.");// 这里你可以选择执行重试或者其他处理逻辑,例如将数据写入到一个备用存储// 例如: indexer.add(createAnotherRequest(action));} else {// 对于其他类型的异常,默认返回放弃策略logger.error("Unhandled failure, abandoning request: {}", action.toString());}}
}

7.8 作业主类ElasticsearchSinkStreamJobAdvancedDemo

package com.aurora.advanced;import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;/*** 描述:Flink集成Elasticsearch Connector连接器进阶Demo* 实现实时数据流如何无缝地流向Elasticsearch** @author 浅夏的猫* @version 1.0.0* @date 2024-02-11 22:06:45*/
public class ElasticsearchSinkStreamJobAdvancedDemo {private static final Logger logger = LoggerFactory.getLogger(ElasticsearchSinkStreamJobAdvancedDemo.class);public static void main(String[] args) {try {// 读取配置参数ElasticsearchEntity elasticsearchEntity = paramsInit();// 设置elasticsearch节点List<HttpHost> httpHosts = esClusterHttpHostHandler(elasticsearchEntity);// 创建esSinkFunction函数ElasticsearchSinkFunction<JSONObject> esSinkFunction = new CustomElasticsearchSinkFunction(elasticsearchEntity);// 构建ElasticsearchSink算子builderElasticsearchSink.Builder<JSONObject> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, esSinkFunction);// es参数配置esBuilderHandler(esSinkBuilder, elasticsearchEntity);// 构建sink算子ElasticsearchSink<JSONObject> esSink = esSinkBuilder.build();// 自定义数据源,模拟生产环境交易接入,json格式数据SourceFunction<JSONObject> dataSource = new SourceFunction<JSONObject>() {@Overridepublic void run(SourceContext sourceContext) throws Exception {while (true) {//交易流水号String tradeId = UUID.randomUUID().toString();//交易发生时间戳long timeStamp = System.currentTimeMillis();//交易发生金额long tradeAmount = new Random().nextInt(100);//交易名称String tradeName = "支付宝转账";JSONObject dataObj = new JSONObject();dataObj.put("transId", tradeId);dataObj.put("timeStamp", timeStamp);dataObj.put("tradeTime", dateUtil(timeStamp));dataObj.put("tradeAmount", tradeAmount);dataObj.put("tradeName", tradeName);//模拟生产,每隔1秒生成一笔交易Thread.sleep(1000);logger.info("交易接入,原始报文={}", dataObj.toJSONString());sourceContext.collect(dataObj);}}@Overridepublic void cancel() {}};// 创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 构建数据源DataStreamSource<JSONObject> dataStreamSource = env.addSource(dataSource);// 构建sink算子dataStreamSource.addSink(esSink);// 运行作业env.execute();} catch (Exception e) {e.printStackTrace();}}/*** 描述:Flink参数配置读取** @return {@code ElasticsearchEntity }* @throws IOException*/private static ElasticsearchEntity paramsInit() throws IOException {// 通过flink内置工具类获取命令行参数String propertiesFilePath = "E:\\project\\aurora_dev\\aurora_flink_connector_elasticsearch\\src\\main\\resources\\application.properties";ParameterTool paramsMap = ParameterTool.fromPropertiesFile(propertiesFilePath);ElasticsearchEntity elasticsearchEntity = new ElasticsearchEntity();String hosts = paramsMap.get("es.cluster.hosts");int port = paramsMap.getInt("es.cluster.port");String scheme = paramsMap.get("es.cluster.scheme");String type = paramsMap.get("es.cluster.type");String indexPrefix = paramsMap.get("es.cluster.indexPrefix");int bulkFlushMaxActions = paramsMap.getInt("es.cluster.bulkFlushMaxActions");int bulkFlushMaxSizeMb = paramsMap.getInt("es.cluster.bulkFlushMaxSizeMb");int bulkFlushInterval = paramsMap.getInt("es.cluster.bulkFlushInterval");boolean bulkFlushBackoff = paramsMap.getBoolean("es.cluster.bulkFlushBackoff");int bulkFlushBackoffDelay = paramsMap.getInt("es.cluster.bulkFlushBackoffDelay");int bulkFlushBackoffRetries = paramsMap.getInt("es.cluster.bulkFlushBackoffRetries");int connectTimeout = paramsMap.getInt("es.cluster.connectTimeout");int socketTimeout = paramsMap.getInt("es.cluster.socketTimeout");int connectionRequestTimeout = paramsMap.getInt("es.cluster.connectionRequestTimeout");boolean redirectsEnabled = paramsMap.getBoolean("es.cluster.redirectsEnabled");int maxRedirects = paramsMap.getInt("es.cluster.maxRedirects");boolean authenticationEnabled = paramsMap.getBoolean("es.cluster.authenticationEnabled");boolean circularRedirectsAllowed = paramsMap.getBoolean("es.cluster.circularRedirectsAllowed");boolean contentCompressionEnabled = paramsMap.getBoolean("es.cluster.contentCompressionEnabled");boolean expectContinueEnabled = paramsMap.getBoolean("es.cluster.expectContinueEnabled");boolean normalizeUri = paramsMap.getBoolean("es.cluster.normalizeUri");elasticsearchEntity.setHosts(hosts);elasticsearchEntity.setPort(port);elasticsearchEntity.setScheme(scheme);elasticsearchEntity.setType(type);elasticsearchEntity.setIndexPrefix(indexPrefix);elasticsearchEntity.setBulkFlushMaxActions(bulkFlushMaxActions);elasticsearchEntity.setBulkFlushMaxSizeMb(bulkFlushMaxSizeMb);elasticsearchEntity.setBulkFlushInterval(bulkFlushInterval);elasticsearchEntity.setBulkFlushBackoff(bulkFlushBackoff);elasticsearchEntity.setBulkFlushBackoffDelay(bulkFlushBackoffDelay);elasticsearchEntity.setBulkFlushBackoffRetries(bulkFlushBackoffRetries);elasticsearchEntity.setConnectTimeout(connectTimeout);elasticsearchEntity.setSocketTimeout(socketTimeout);elasticsearchEntity.setConnectionRequestTimeout(connectionRequestTimeout);elasticsearchEntity.setRedirectsEnabled(redirectsEnabled);elasticsearchEntity.setMaxRedirects(maxRedirects);elasticsearchEntity.setAuthenticationEnabled(authenticationEnabled);elasticsearchEntity.setCircularRedirectsAllowed(circularRedirectsAllowed);elasticsearchEntity.setExpectContinueEnabled(expectContinueEnabled);elasticsearchEntity.setContentCompressionEnabled(contentCompressionEnabled);elasticsearchEntity.setNormalizeUri(normalizeUri);return elasticsearchEntity;}/*** 描述:时间格式化工具类** @param timestamp 时间戳* @return {@code String }*/private static String dateUtil(long timestamp) {timestamp = timestamp / 1000;// 将时间戳转换为 LocalDateTime 对象LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochSecond(timestamp), ZoneId.systemDefault());// 定义日期时间格式DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd");// 格式化日期时间对象为指定格式的字符串String dateTimeFormat = formatter.format(dateTime);return dateTimeFormat;}/*** 描述:es参数配置** @param esSinkBuilder       esSinkBuilder建造器* @param elasticsearchEntity es实体类*/private static void esBuilderHandler(ElasticsearchSink.Builder<JSONObject> esSinkBuilder, ElasticsearchEntity elasticsearchEntity) {// 设置触发批量写入的最大动作数,// 解释:当达到指定的最大动作数时,将触发批量写入到 Elasticsearch。如果你希望在每次写入到 Elasticsearch 时都进行批量写入,可以将该值设置为 1esSinkBuilder.setBulkFlushMaxActions(elasticsearchEntity.getBulkFlushMaxActions());// 设置触发批量写入的最大数据量// 解释:当写入的数据量达到指定的最大值时,将触发批量写入到 Elasticsearch。单位为 MBesSinkBuilder.setBulkFlushMaxSizeMb(elasticsearchEntity.getBulkFlushMaxSizeMb());// 设置批量写入的时间间隔// 解释:每隔指定的时间间隔,无论是否达到最大动作数或最大数据量,都会触发批量写入esSinkBuilder.setBulkFlushInterval(elasticsearchEntity.getBulkFlushInterval());// 启用批量写入的退避策略// 解释:当 Elasticsearch 写入失败时,可以启用退避策略,以避免频繁的重试。此时,setBulkFlushBackoffDelay 和 setBulkFlushBackoffRetries 参数生效。esSinkBuilder.setBulkFlushBackoff(elasticsearchEntity.getBulkFlushBackoff());// 设置批量写入的退避延迟时间// 解释:在发生写入失败后,等待指定的延迟时间后再进行重试esSinkBuilder.setBulkFlushBackoffDelay(elasticsearchEntity.getBulkFlushBackoffDelay());// 设置批量写入的最大重试次数// 解释:设置在写入失败后的最大重试次数。超过这个次数后,将不再重试esSinkBuilder.setBulkFlushBackoffRetries(elasticsearchEntity.getBulkFlushBackoffRetries());// 设置写入失败时的处理策略// 解释:可以自定义处理失败的策略,实现 ElasticsearchSinkFunction.FailureHandler 接口esSinkBuilder.setFailureHandler(new CustomActionRequestFailureHandler());// 设置用于创建 Elasticsearch REST 客户端的工厂// 解释:可以自定义创建 Elasticsearch REST 客户端的逻辑,实现 ElasticsearchSinkBase.RestClientFactory 接口esSinkBuilder.setRestClientFactory(new CustomRestClientFactory(elasticsearchEntity));}/*** 描述:* elasticsearch 节点配置** @param elasticsearchEntity es实体类* @return {@code List<HttpHost> }*/private static List<HttpHost> esClusterHttpHostHandler(ElasticsearchEntity elasticsearchEntity) {List<HttpHost> httpHosts = new ArrayList<>();String[] clusterArray = elasticsearchEntity.getHosts().split(",");for (String node : clusterArray) {httpHosts.add(new HttpHost(node, elasticsearchEntity.getPort(), elasticsearchEntity.getScheme()));}return httpHosts;}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/686101.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

专业140+总410+合工大合肥工业大学833信号分析与处理综合考研经验电子信息与通信工程,真题,大纲,参考书。

经过一年努力奋战&#xff0c;今年初试总分410&#xff0c;其中专业课833信号分析与处理综合&#xff08;ss和dsp&#xff09;140&#xff08;感谢信息通信Jenny老师去年的悉心指导&#xff09;&#xff0c;数一130&#xff0c;顺利上岸&#xff0c;被合工大录取&#xff0c;看…

必考板子题【Py/Java/C++三种语言详解】LeetCode每日一题240214【二叉树BFS】LeetCode102、二叉树的层序遍历

有LeetCode交流群/华为OD考试扣扣交流群可加&#xff1a;948025485 可上全网独家的 欧弟OJ系统 练习华子OD、大厂真题 绿色聊天软件戳 od1336了解算法冲刺训练 文章目录 题目链接题目描述解题思路DFS和BFS异同用队列维护的BFS 代码PythonPythonJavaC时空复杂度 相关习题华为OD算…

初始Git及Linux Centos下安装Git

文章目录 前言版本控制器注意Git安装 前言 不知道你⼯作或学习时&#xff0c;有没有遇到这样的情况&#xff1a;我们在编写各种⽂档时&#xff0c;为了防⽌⽂档丢失&#xff0c;更改失误&#xff0c;失误后能恢复到原来的版本&#xff0c;不得不复制出⼀个副本&#xff0c;⽐如…

一个小白的转行Python的经历!

1. 寻找一个导师 导师可以降低你加入一个新行业的成本&#xff0c;帮助你熟悉环境和行业规则&#xff0c;也会鼓励你完成心理方面的转变。 2. 建立新的社交网络 过去的人脉关系会阻碍你的转行&#xff0c;因为他们是以过去对你的认知来评价你。新领域的人脉&#xff0c;会给你提…

互联网时代的文学复兴:中文诗词大数据分析 | 开源日报 No.170

chinese-poetry/chinese-poetry Stars: 45.4k License: MIT 最全的中文诗歌古典文集数据库&#xff0c;包含 5.5 万首唐诗、26 万首宋诗、2.1 万首宋词和其他古典文集。数据来源于互联网。该开源项目旨在通过 JSON 格式分发&#xff0c;方便用户开始自己的项目&#xff0c;并借…

文件夹删不掉,显示在另一个文件中打开怎么办

问题&#xff1a; 一、想要删掉这个文件夹&#xff0c;却因为文件夹中的文件打开了删不掉&#xff0c;这里我因为做的测试&#xff0c;所以是知道打开了什么 二、一般情况下文件比较多时&#xff0c;是不知道打开了什么的&#xff0c;长这个样子 解决&#xff1a; 一、打开任…

嵌入式培训机构四个月实训课程笔记(完整版)-Linux ARM驱动编程第五天-ARM Linux编程之字符设备驱动(物联技术666)

链接&#xff1a;https://pan.baidu.com/s/1V0E9IHSoLbpiWJsncmFgdA?pwd1688 提取码&#xff1a;1688 教学内容&#xff1a; 1、内核模块的简单框架&#xff1a; __init __exit执行完后就释放空间 简单框架&#xff1a;包含三个部分 1&#xff09;模块初始化和模块退出函数…

Python算法题集_将有序数组转换为二叉搜索树

Python算法题集_将有序数组转换为二叉搜索树 题108&#xff1a;将有序数组转换为二叉搜索树1. 示例说明2. 题目解析- 题意分解- 优化思路- 测量工具 3. 代码展开1) 标准求解【极简代码递归】2) 改进版一【多行代码递归】3) 改进版二【极简代码递归传递下标】 4. 最优算法 本文为…

备战蓝桥杯---图论之最小生成树

首先&#xff0c;什么是最小生成树&#xff1f; 他就是无向图G中的所有生成树中树枝权值总和最小的。 如何求&#xff1f; 我们不妨采用以下的贪心策略&#xff1a; Prim算法&#xff08;复杂度&#xff1a;&#xff08;nm)logm)&#xff1a; 我们对于把上述的点看成两个集…

NX二次开发树列表双击快速进入编辑状态

先将这几个树列表回调注释给解开 int TreeColumn0;//定义一个全局边量记录点击的那一列NXOpen::BlockStyler::Tree::BeginLabelEditState OnBeginLabelEditCallback(NXOpen::BlockStyler::Tree *tree,NXOpen::BlockStyler::Node *node,int columID) {if(columnIDTreeColumnID)…

无人机基本知识,无人机遥控器功能详解与调试方法

无人机作为一种新兴的飞行器&#xff0c;近年来在各个领域得到了广泛的应用。而无人机遥控器则是控制无人机飞行的重要工具。 无人机遥控器是一种无线设备&#xff0c;通过它来远程控制无人机的飞行。遥控器通常包括一个或多个摇杆&#xff0c;用于控制无人机的各种动作&#x…

QGIS004:【10栅格地形分析工具箱】-坡度、坡向、山体阴影

摘要&#xff1a;QGIS栅格地形分析工具箱常用工具有坡度、坡向、山体阴影等选项&#xff0c;本文介绍各选项的基本操作。 实验数据&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/1gYZ_om4AlSdal0bts2mt-A?pwd4rrn 提取码&#xff1a;4rrn 一、坡度 工具功能&…

B端系统从0到1:有几步,其中需求分析要做啥?

一款B系统从无到有都经历了啥&#xff0c;而其中的需求分析又要做什么&#xff1f;贝格前端工场给老铁们做一下分析&#xff0c;文章写作不易&#xff0c;如果咱们有界面设计和前端开发需求&#xff0c;别忘了私信我呦&#xff0c;开始了。 一、B端系统从0到1都有哪些要走的步骤…

Vue练习3:组件开发3(页面切换)

预览 ——————————————————————————————————————————— 组件文档 Pager组件 属性 属性名含义类型必填默认值current当前页码&#xff08;总数据量/单页容量&#xff09;Number否1total总数据量Number否0limit单页容量Number否10vis…

Day-02-01

内容管理模块项目开发 Swagger的使用 1. 导入依赖 <!-- Spring Boot 集成 swagger --> <dependency><groupId>com.spring4all</groupId><artifactId>swagger-spring-boot-starter</artifactId> </dependency> 2. 配置信息 # 在app…

为何重复造轮子

重复造轮子&#xff0c;意思是说&#xff0c;一个项目本身存在开源组件&#xff0c;但开发团队还是选择重新手写一套组件库或框架的情况&#xff0c;这在软件业界比比皆是。 下面说下游戏项目里重复造轮子的几点原因。 一&#xff0c;精简化 一般开源项目为了适应多场景多业…

【制作100个unity游戏之25】3D背包、库存、制作、快捷栏、存储系统、砍伐树木获取资源、随机战利品宝箱2(附带项目源码)

效果演示 文章目录 效果演示系列目录前言拖放、交换物品绘制拖拽物品插槽UI修改Inventory&#xff0c;控制拖放功能 源码完结 系列目录 前言 欢迎来到【制作100个Unity游戏】系列&#xff01;本系列将引导您一步步学习如何使用Unity开发各种类型的游戏。在这第25篇中&#xf…

什么原因导致百度百科建立一直审核不通过?

百科词条对网络营销实在是太重要了&#xff0c;不管是个人还是企业想在网上开展业务&#xff0c;都必要建立百科词条。自己动手编辑百科词条&#xff0c;搞个几十次也审核不过的情况比比皆是。 为什么百度百科总是审核不通过&#xff1f;百度官方发表过声明表示百度百科词条是人…

【JS逆向+Python模拟API请求】逆向某一个略微中等的混淆网站,并模拟调用api请求 仅供学习。注:不是源代码混淆,而是一个做代码混淆业务的网站,

逆向日期&#xff1a;2024.02.16 使用工具&#xff1a;Node.js 加密方法&#xff1a;RSA标准库 文章全程已做去敏处理&#xff01;&#xff01;&#xff01; 【需要做的可联系我】 AES解密处理&#xff08;直接解密即可&#xff09;&#xff08;crypto-js.js 标准算法&#xf…

ubuntu22.04安装jenkins并配置

准备 更新系统 sudo apt update sudo apt upgrade环境准备 jdk 安装 sudo apt install openjdk-11-jdk验证 java -versiongit ubuntu配置git maven ubuntu配置maven 部署 添加 Jenkins 存储库 导入Jenkins存储库的GPG密钥 wget -q -O - https://pkg.jenkins.io/de…