Flink 输出至 Elasticsearch

【1】引入pom.xml依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch6_2.12</artifactId><version>1.10.0</version>
</dependency>

【2】ES6 Scala代码,自动导入的scala包需要修改为scala._ 否则会出现错误。

package com.zzx.flinkimport java.utilimport org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requestsobject EsSinkTest {def main(args: Array[String]): Unit = {// 创建一个流处理执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//从文件中读取数据并转换为 类val inputStreamFromFile: DataStream[String] = env.readTextFile("E:\\Project\\flink\\src\\main\\resources\\wordcount.txt")//转换val dataStream: DataStream[SensorReading] = inputStreamFromFile.map( data => {var dataArray = data.split(",")SensorReading(dataArray(0),dataArray(1).toLong,dataArray(2).toDouble)})//定义一个 HttpHostsval httpHost = new util.ArrayList[HttpHost]()//默认 9200 我的修改为了 9201httpHost.add(new HttpHost("192.168.1.12",9200,"http"))httpHost.add(new HttpHost("127.0.0.1",9200,"http"))//定义一个 ElasticSearchFuntion 操作 es的functionval esSinkFunc = new ElasticsearchSinkFunction[SensorReading] {//element 每一条数据 通过 index 发送override def process(element: SensorReading, runtimeContext: RuntimeContext, index: RequestIndexer): Unit = {//包装写入 es 的数据val dataSource = new util.HashMap[String,String]()dataSource.put("sensor_id",element.id)dataSource.put("temp",element.temperature.toString)dataSource.put("ts",element.timestamp.toString)//indexval indexRequest = Requests.indexRequest().index("sensor_temp").`type`("readingdata").source(dataSource)index.add(indexRequest)println("saved successfully " + element.toString)}}//输出值 esdataStream.addSink(new ElasticsearchSink.Builder[SensorReading](httpHost,esSinkFunc).build())env.execute("es")}
}

【3】ES6输出展示

​ [点击并拖拽以移动] ​​

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/584692.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

私有部署ELK,搭建自己的日志中心(五)-- 使用filebeat采集K8S pod的jvm日志

一、背景 前文采集的日志是来源于虚拟机&#xff0c;在云原生下的容器化部署时&#xff0c;需要采集Pod里的jvm日志。 二、术语 1、临时存储emptyDir emptyDir在Pod被分配到Node时创建的&#xff0c;它的初始内容为空&#xff0c;并且无须指定宿主机上对应的目录文件&#…

分布式系统架构设计之分布式系统架构演进和版本管理

在分布式系统的生命周期中&#xff0c;架构演进和版本管理是很重要的两个环节。本部分会介绍分布式系统架构演进的原则、策略以及版本管理的最佳实践&#xff0c;以帮助研发团队更好地应对需求变化、技术发展和系统升级。 架构演进 演进原则 渐进式演进 采用渐进式演进的原…

VsCode(Visual Studio Code) 安装插件教程

VsCode&#xff08;Visual Studio Code&#xff09; 安装插件教程 1、打开VsCode 桌面上、最近安装程序上找找吧 在桌面上双击&#xff0c;打开 2、打开的效果图 3、点击左侧最后的插件图标 4、打开后如下图所示 5、在输入框中输入Go 6、同样在安装插件页面也可以卸载、安装…

WPF 显示气泡提示框

气泡提示框应用举例 有时候在我们开发的软件经常会遇到需要提示用户的地方&#xff0c;为了让用户更直观&#xff0c;快速了解提示信息&#xff0c;使用简洁、好看又方便的气泡提示框显得更加方便&#xff0c;更具人性化。如下面例子&#xff1a;(当用户未输入账号时&#xff0…

抬头举手阅读YOLOV8NANO

首先用YOLOV8NANO得到PT模型&#xff0c;转换成ONNX,OPENCV调用&#xff0c;PYTHON,C,ANDROID都可以举手写字阅读YOLOV8NANO

android studio 将含有jni c++ 的library项目封装成jar并调用

请参考博客&#xff1a;android studio 4.1.1 将library项目封装成aar 并调用_android studio 4.1 aar release-CSDN博客 一 . 简单叙述 android studio 中可以创建Module 的两种属性&#xff0c;可以在build.gradle 中查看&#xff1a; 1. application属性&#xff1a;可以独…

DM、Oracle、GaussDB、Kingbase8(人大金仓数据库)和HIVE给列增加注释

DM数据库给列增加注释 1、创建表 CREATE TABLE test222 ( id int NOT NULL PRIMARY KEY, name varchar(1000) DEFAULT NULL, email varchar(1000) DEFAULT NULL, phone varchar(1000) DEFAULT NULL ) 2、给列添加注释 comment on column TEST222.NAME is 这是一个列注释; 例如…

[Angular] 笔记 17:提交表单 - ngSubmit

Submitting Forms (ngSubmit) 表单的一般完整写法&#xff1a; 如果表单验证失败&#xff0c;必须 disable 提交按钮&#xff0c;阻止用户提交不合法的数据。 提交表单后&#xff0c;与表单对应的 json 数据 post 到后端&#xff1a; {"id":1,"name":…

【Week-P3】CNN天气识别

文章目录 一、环境配置二、准备数据三、搭建网络结构四、开始训练五、查看训练结果六、总结6.1 不改变学习率的前提下&#xff0c;将训练epoch分别增加到50、60、70、80、90&#xff08;1&#xff09;epoch 50 的训练情况如下&#xff1a;&#xff08;2&#xff09;epoch 60 …

JAVA B/S架构智慧工地源码,PC后台管理端、APP移动端

智慧工地系统充分利用计算机技术、互联网、物联网、云计算、大数据等新一代信息技术&#xff0c;以PC端&#xff0c;移动端&#xff0c;设备端三位一体的管控方式为企业现场工程管理提供了先进的技术手段。让劳务、设备、物料、安全、环境、能源、资料、计划、质量、视频监控等…

前后端分离nodejs+vue+ElementUi网上订餐系统69b9

课题主要分为两大模块&#xff1a;即管理员模块和用户模块&#xff0c;主要功能包括个人中心、用户管理、菜品类型管理、菜品信息管理、留言反馈、在线交流、系统管理、订单管理等&#xff1b; 运行软件:vscode 前端nodejsvueElementUi 语言 node.js 框架&#xff1a;Express/k…

STM32CubeMX教程10 RTC 实时时钟 - 周期唤醒、闹钟A/B事件和备份寄存器

目录 1、准备材料 2、实验目标 3、实验流程 3.0、前提知识 3.1、CubeMX相关配置 3.1.1 、时钟树配置 3.1.2、外设参数配置 3.1.3 、外设中断配置 3.2、生成代码 3.2.1、外设初始化函数调用流程 3.2.2、外设中断函数调用流程 3.2.3、添加其他必要代码 4、常用函数 …

Linux性能优化全景指南

Part1 Linux性能优化 1、性能优化性能指标 高并发和响应快对应着性能优化的两个核心指标&#xff1a;吞吐和延时 应用负载角度&#xff1a;直接影响了产品终端的用户体验系统资源角度&#xff1a;资源使用率、饱和度等 性能问题的本质就是系统资源已经到达瓶颈&#xff0c;但…

深度学习在自然语言处理中的应用

深度学习在自然语言处理中的应用 一、引言 随着人工智能技术的飞速发展&#xff0c;自然语言处理&#xff08;NLP&#xff09;作为其重要分支&#xff0c;已经在诸多领域取得了令人瞩目的成果。深度学习作为当前最炙手可热的技术&#xff0c;为NLP带来了革命性的变革。本文将…

python+django网上银行业务综合管理系统vue_bvj8b

本课题主要研究如何用信息化技术改善传统网上银行综合管理行业的经营和管理模式&#xff0c;简化网上银行综合管理的难度&#xff0c;根据管理实际业务需求&#xff0c;调研、分析和编写系统需求文档&#xff0c;设计编写符合银行需要的系统说明书&#xff0c;绘制数据库结构模…

php获取访客IP、UA、操作系统、浏览器等信息

最近有个需求就是获取下本地的ip地址、网上搜索了相关的教程&#xff0c;总结一下分享给大家、有需要的小伙伴可以参考一下 一、简单的获取 User Agent 信息代码: echo $_SERVER[HTTP_USER_AGENT]; 二、获取访客操作系统信息: /** * 获取客户端操作系统信息,包括win10 * pa…

SAP缓存 表缓存( Table Buffering)

本文主要介绍SAP中的表缓存在查询数据&#xff0c;更新数据时的工作情况以及对应概念。 SAP表缓存的工作 查询数据 更新数据 删除数据 表缓存的概念 表缓存技术设置属性 不允许缓冲&#xff1a; 允许缓冲&#xff0c;但已关闭&#xff1a; 缓冲已激活&#xff1a; 已…

搜索引擎推广的实践技巧提升你的品牌影响力-华媒舍

搜索引擎推广是一种有效提升品牌影响力的推广策略。通过关键词优化、广告创意设计、定向投放和数据分析与优化等实践技巧&#xff0c;可以提高品牌的知名度、点击率和转化率。在实施引擎霸屏推广之前&#xff0c;还需对实践效果进行评估&#xff0c;以确保推广策略的有效性和适…

鸿蒙Harmony(七)ArkUI--循环foreachList组件自定义组件

循环foreach import Prompt from system.promptclass Item {icon: Resourcename: stringprice: numberconstructor(icon: Resource, name: string, price: number) {this.icon iconthis.name namethis.price price} }Entry Component struct Index {State message: string …

百度地图添加坐标点

​​​​​​html <!DOCTYPE html><html xmlns"http://www.w3.org/1999/xhtml"> <head runat"server"><meta http-equiv"Content-Type" content"text/html; charsetutf-8" /><title>查看签到信息-地图…