Elasticsearch 集成--Flink 框架集成

一、Flink 框架介绍

       Apache Spark 是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。
Apache Spark 掀开了内存计算的先河,以内存作为赌注,赢得了内存计算的飞速发展。
但是在其火热的同时,开发人员发现,在 Spark 中,计算框架普遍存在的缺点和不足依然没
有完全解决,而这些问题随着 5G 时代的来临以及决策者对实时数据分析结果的迫切需要而
凸显的更加明显:
  •  数据精准一次性处理(Exactly-Once
  • 乱序数据,迟到数据
  •  低延迟,高吞吐,准确性
  •  容错性
        Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。在
Spark 火热的同时,也默默地发展自己,并尝试着解决其他计算框架的问题。
慢慢地,随着这些问题的解决, Flink 慢慢被绝大数程序员所熟知并进行大力推广,阿里公
司在 2015 年改进 Flink ,并创建了内部分支 Blink ,目前服务于阿里集团内部搜索、推荐、
广告和蚂蚁等大量核心实时业务。

二、框架集成

2.1创建 Maven 项目

依赖

<?xml version="1.0" encoding="UTF-8"?>
<projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.lun.es</groupId><artifactId>flink-elasticsearch</artifactId><version>1.0</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.12</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch7_2.11</artifactId><version>1.12.0</version></dependency><!-- jackson --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>2.11.1</version></dependency></dependencies>
</project>

功能实现

package com.xmx.es;import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;public class FlinkElasticsearchSinkTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.socketTextStream("localhost", 9999);List<HttpHost> httpHosts = new ArrayList<>();httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));//httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));// use a ElasticsearchSink.Builder to create an ElasticsearchSinkElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts,new ElasticsearchSinkFunction<String>() {public IndexRequest createIndexRequest(String element) {Map<String, String> json = new HashMap<>();json.put("data", element);return Requests.indexRequest().index("my-index")//.type("my-type").source(json);}@Overridepublic void process(String element, RuntimeContext ctx, RequestIndexer indexer) {indexer.add(createIndexRequest(element));}});// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be bufferedesSinkBuilder.setBulkFlushMaxActions(1);// provide a RestClientFactory for custom configuration on the internally createdREST client// esSinkBuilder.setRestClientFactory(// restClientBuilder -> {// restClientBuilder.setDefaultHeaders(...)// restClientBuilder.setMaxRetryTimeoutMillis(...)// restClientBuilder.setPathPrefix(...)// restClientBuilder.setHttpClientConfigCallback(...)// }// );source.addSink(esSinkBuilder.build());env.execute("flink-es");}
}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/57687.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

EasyExcel自定义字段对象转换器支持转换实体和集合实体

文章目录 1. 实现ObjectConverter2. 使用3. 测试3.1 导出excel3.2 导入excel 1. 实现ObjectConverter package com.tophant.cloud.common.excel.converters;import cn.hutool.json.JSONUtil; import com.alibaba.excel.converters.Converter; import com.alibaba.excel.enums.…

Redis之Sentinel(哨兵)机制

一、Sentinel是什么&#xff1f; Sentinel&#xff08;哨岗、哨兵&#xff09;是Redis的高可用性&#xff08;high availability&#xff09;解决方案&#xff1a;由一个或多个Sentinel实例&#xff08;instance&#xff09;组成的Sentinel系统&#xff08;system&#xff09;…

设计模式—原型模式(Prototype)

目录 一、什么是原型模式&#xff1f; 二、原型模式具有什么优缺点吗&#xff1f; 三、有什么缺点&#xff1f; 四、什么时候用原型模式&#xff1f; 五、代码展示 ①、简历代码初步实现 ②、原型模式 ③、简历的原型实现 ④、深复制 ⑤、浅复制 一、什么是原型模式&…

c++之指针

总结性质 我们如何在一个函数中获取数组的长度&#xff1a; 我们都知道&#xff0c;在main函数中我们获得数组的长度只需要使用sizeof&#xff08;a&#xff09;/sizeof&#xff08;a【0】&#xff09;即可获得&#xff0c;但当我们把一个数组传入到方法时&#xff0c;c默认把…

以楼宇自控系统为基础的系统集成方式

系统集成主要通过建筑与建筑群综合布线系统和计算机网络技术&#xff0c;使构成智能 建筑的各个主要子系统具有开放式结构、协议和接口都标准化和规范化。 系统集成的方式主要包括&#xff1a; &#xff08; 1&#xff09;智能建筑的系统集成可采用以 BAS 为中心&#xff0c;…

重生c++系列之类与对象(中篇)

好的继上期&#xff0c;我们今天带来c类与对象系列的继续学习。 类的6个默认成员函数 如果一个类中什么成员都没有&#xff0c;简称为空类。 空类中真的什么都没有吗&#xff1f;并不是&#xff0c;任何类在什么都不写时&#xff0c;编译器会自动生成以下6个默认成员 函数。 …

pyqt5-快捷键QShortcut

import sys from PyQt5.QtWidgets import * from PyQt5.QtCore import * from PyQt5.QtGui import *""" 下面示例揭示了&#xff0c;当关键字绑定的控件出现的时候&#xff0c;快捷键才管用&#xff0c; 绑定的控件没有出现的时候快捷键无效 """…

vscode使用anaconda自带的python环境在终端运行时报错

目录 具体报错内容官方翻译报错讲人话解决方法 具体报错内容 CommandNotFoundError: Your shell has not been properly configured to use conda activate. If your shell is Bash or a Bourne variant, enable conda for the current user with$ echo ". E:\Anaconda/e…

无涯教程-Android - Activity

Activity代表具有用户界面的单个屏幕&#xff0c;就像Java的窗口或框架一样。Android Activity 是ContextThemeWrapper类的子类。 如果您使用过C&#xff0c;C或Java编程语言&#xff0c;那么您一定已经看到您的程序从 main()函数开始。与之非常相似&#xff0c;Android系统以 …

MySQL日期格式及日期函数实践

目录 日期格式 日期函数 CURDATE()和CURRENT_DATE()CURTIME()和CURRENT_TIME()NOW()和CURRENT_TIMESTAMP()DATE_FORMAT()DATE_ADD()和DATE_SUB()DATEDIFF()DATE()DAYNAME()和MONTHNAME() 1. 日期格式 在MySQL中&#xff0c;日期可以使用多种格式进行存储和表示。常见的日期格式…

m3u8 blob视频免费下载

F12点开找到这个视频url最后是.m3u8结尾 http://blog.luckly-mjw.cn/tool-show/m3u8-downloader/index.html 在上边的网址转Mp4下载即可

后端面试话术集锦第二篇:spring boot面试话术

🚗后端面试集锦目录 💖后端面试话术集锦第 1 篇:spring面试话术💖 💖后端面试话术集锦第 2 篇:spring boot面试话术💖 💖后端面试话术集锦第 3 篇:spring cloud面试话术💖 💖后端面试话术集锦第 4 篇:ElasticSearch面试话术💖 💖后端面试话术集锦第 5 …

Flutter可执行屏幕动画的AnimateView

1.让动画使用起来就像使用widget。 2.可自定义动画。 3.内置平移动画。 演示&#xff1a; 代码: import dart:math; import package:flutter/cupertino.dart;class AnimateView extends StatefulWidget {///子Widgetfinal Widget child;///动画自定义final IAnimate? anim…

HTML-常见标签、HTML5新特性

HTML 软件架构 1.C/S架构 (1) C/S架构即Client/Server&#xff08;客户机/服务器&#xff09;结构。 (2) C/S 架构特点 ​ C/S结构在技术上很成熟&#xff0c;它的主要特点是交互性强、具有安全的存取模式、网络通信量低、响应速度快、利于处理大量数据。但是该结构的程序是…

ssm会议管理系统源码和论文

ssm会议管理系统源码和论文087 开发工具&#xff1a;idea 数据库mysql5.7 数据库链接工具&#xff1a;navcat,小海豚等 技术&#xff1a;ssm 摘 要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&…

玩转 PI 系列-看起来像服务器的 ARM 开发板矩阵-Firefly Cluster Server

前言 基于我个人的工作内容和兴趣&#xff0c;想要在家里搞一套服务器集群&#xff0c;用于容器/K8s 等方案的测试验证。 考虑过使用二手服务器&#xff0c;比如 Dell R730, 还搞了一套配置清单&#xff0c;如下&#xff1a; Dell R7303.5 尺寸规格硬盘CPU: 2686v4*2 内存&a…

Oracle-day4:分组查询(带条件)、DDL建表、约束、主从表

一、内容回顾 /*------------------内容回顾------------------------上周内容回顾--1、单表的基础查询--A、select * from emp;--B、列的运算 --数字类型运算 - * /--函数运算 mod ceil floor round upper lower--C、取别名--列、表达书取别名--*表示所有的列和列同时存在时…

深入浅出SSD:固态存储核心技术、原理与实战(文末赠书)

名字&#xff1a;阿玥的小东东 学习&#xff1a;Python、C/C 主页链接&#xff1a;阿玥的小东东的博客_CSDN博客-python&&c高级知识,过年必备,C/C知识讲解领域博主 目录 内容简介 作者简介 使用Python做一个计算器 本期赠书 近年来国家大力支持半导体行业&#xff0…

MySQL----索引

一、索引的概念 索引是一个排序的列表&#xff0c;在这个列表中存储着索引的值和包含这个值的数据所在行的物理地址&#xff08;类似于c语言的链表通过指针指向数据记录的内存地址&#xff09;。使用索引后可以不用扫描全表来定位某行的数据&#xff0c;而是先通过索引表找到该…

Mysql--技术文档--MVCC(Multi-Version Concurrency Control | 多版本并发控制)

MVCC到底是什么 MVCC&#xff08;Multi-Version Concurrency Control&#xff09;是一种并发控制机制&#xff0c;用于解决并发访问数据库时的数据一致性和隔离性问题。MVCC允许多个事务同时读取数据库的同一数据&#xff0c;而不会相互干扰或导致冲突。 在传统的并发控制机制中…