Flink 输出至 Elasticsearch

【1】引入pom.xml依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch6_2.12</artifactId><version>1.10.0</version>
</dependency>

【2】ES6 Scala代码,自动导入的scala包需要修改为scala._ 否则会出现错误。

package com.zzx.flinkimport java.utilimport org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requestsobject EsSinkTest {def main(args: Array[String]): Unit = {// 创建一个流处理执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//从文件中读取数据并转换为 类val inputStreamFromFile: DataStream[String] = env.readTextFile("E:\\Project\\flink\\src\\main\\resources\\wordcount.txt")//转换val dataStream: DataStream[SensorReading] = inputStreamFromFile.map( data => {var dataArray = data.split(",")SensorReading(dataArray(0),dataArray(1).toLong,dataArray(2).toDouble)})//定义一个 HttpHostsval httpHost = new util.ArrayList[HttpHost]()//默认 9200 我的修改为了 9201httpHost.add(new HttpHost("192.168.1.12",9200,"http"))httpHost.add(new HttpHost("127.0.0.1",9200,"http"))//定义一个 ElasticSearchFuntion 操作 es的functionval esSinkFunc = new ElasticsearchSinkFunction[SensorReading] {//element 每一条数据 通过 index 发送override def process(element: SensorReading, runtimeContext: RuntimeContext, index: RequestIndexer): Unit = {//包装写入 es 的数据val dataSource = new util.HashMap[String,String]()dataSource.put("sensor_id",element.id)dataSource.put("temp",element.temperature.toString)dataSource.put("ts",element.timestamp.toString)//indexval indexRequest = Requests.indexRequest().index("sensor_temp").`type`("readingdata").source(dataSource)index.add(indexRequest)println("saved successfully " + element.toString)}}//输出值 esdataStream.addSink(new ElasticsearchSink.Builder[SensorReading](httpHost,esSinkFunc).build())env.execute("es")}
}

【3】ES6输出展示

​ [点击并拖拽以移动] ​​

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/584692.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

私有部署ELK,搭建自己的日志中心(五)-- 使用filebeat采集K8S pod的jvm日志

一、背景 前文采集的日志是来源于虚拟机&#xff0c;在云原生下的容器化部署时&#xff0c;需要采集Pod里的jvm日志。 二、术语 1、临时存储emptyDir emptyDir在Pod被分配到Node时创建的&#xff0c;它的初始内容为空&#xff0c;并且无须指定宿主机上对应的目录文件&#…

2. 云原生实战之kubesphere搭建

文章目录 机器介绍centos基本配置安装 VMware Tools设置静态ip关闭防火墙关闭SELinux开启时间同步配置host和hostname 安装kubesphere依赖项安装配置文件准备执行安装命令 机器介绍 在ESXI中准备虚拟机&#xff0c;部署参考官网&#xff1a;https://kubesphere.io/zh/ CentOs…

分布式系统架构设计之分布式系统架构演进和版本管理

在分布式系统的生命周期中&#xff0c;架构演进和版本管理是很重要的两个环节。本部分会介绍分布式系统架构演进的原则、策略以及版本管理的最佳实践&#xff0c;以帮助研发团队更好地应对需求变化、技术发展和系统升级。 架构演进 演进原则 渐进式演进 采用渐进式演进的原…

VsCode(Visual Studio Code) 安装插件教程

VsCode&#xff08;Visual Studio Code&#xff09; 安装插件教程 1、打开VsCode 桌面上、最近安装程序上找找吧 在桌面上双击&#xff0c;打开 2、打开的效果图 3、点击左侧最后的插件图标 4、打开后如下图所示 5、在输入框中输入Go 6、同样在安装插件页面也可以卸载、安装…

WPF 显示气泡提示框

气泡提示框应用举例 有时候在我们开发的软件经常会遇到需要提示用户的地方&#xff0c;为了让用户更直观&#xff0c;快速了解提示信息&#xff0c;使用简洁、好看又方便的气泡提示框显得更加方便&#xff0c;更具人性化。如下面例子&#xff1a;(当用户未输入账号时&#xff0…

抬头举手阅读YOLOV8NANO

首先用YOLOV8NANO得到PT模型&#xff0c;转换成ONNX,OPENCV调用&#xff0c;PYTHON,C,ANDROID都可以举手写字阅读YOLOV8NANO

android studio 将含有jni c++ 的library项目封装成jar并调用

请参考博客&#xff1a;android studio 4.1.1 将library项目封装成aar 并调用_android studio 4.1 aar release-CSDN博客 一 . 简单叙述 android studio 中可以创建Module 的两种属性&#xff0c;可以在build.gradle 中查看&#xff1a; 1. application属性&#xff1a;可以独…

DM、Oracle、GaussDB、Kingbase8(人大金仓数据库)和HIVE给列增加注释

DM数据库给列增加注释 1、创建表 CREATE TABLE test222 ( id int NOT NULL PRIMARY KEY, name varchar(1000) DEFAULT NULL, email varchar(1000) DEFAULT NULL, phone varchar(1000) DEFAULT NULL ) 2、给列添加注释 comment on column TEST222.NAME is 这是一个列注释; 例如…

[Angular] 笔记 17:提交表单 - ngSubmit

Submitting Forms (ngSubmit) 表单的一般完整写法&#xff1a; 如果表单验证失败&#xff0c;必须 disable 提交按钮&#xff0c;阻止用户提交不合法的数据。 提交表单后&#xff0c;与表单对应的 json 数据 post 到后端&#xff1a; {"id":1,"name":…

【Week-P3】CNN天气识别

文章目录 一、环境配置二、准备数据三、搭建网络结构四、开始训练五、查看训练结果六、总结6.1 不改变学习率的前提下&#xff0c;将训练epoch分别增加到50、60、70、80、90&#xff08;1&#xff09;epoch 50 的训练情况如下&#xff1a;&#xff08;2&#xff09;epoch 60 …

数据结构(栈和列队模拟实现)

栈和列队模拟实现 一.栈1.1栈的概念及其结构1.2栈的实现1.2.1 stack.h1.2.2 stack.c 二.列队2.1队列的概念及结构 2.2队列的实现2.2.1 Queue.h2.2.2 Queue.cpp 一.栈 1.1栈的概念及其结构 栈&#xff1a;一种特殊的线性表&#xff0c;其只允许在固定的一端进行插入和删除元素操…

JAVA B/S架构智慧工地源码,PC后台管理端、APP移动端

智慧工地系统充分利用计算机技术、互联网、物联网、云计算、大数据等新一代信息技术&#xff0c;以PC端&#xff0c;移动端&#xff0c;设备端三位一体的管控方式为企业现场工程管理提供了先进的技术手段。让劳务、设备、物料、安全、环境、能源、资料、计划、质量、视频监控等…

前后端分离nodejs+vue+ElementUi网上订餐系统69b9

课题主要分为两大模块&#xff1a;即管理员模块和用户模块&#xff0c;主要功能包括个人中心、用户管理、菜品类型管理、菜品信息管理、留言反馈、在线交流、系统管理、订单管理等&#xff1b; 运行软件:vscode 前端nodejsvueElementUi 语言 node.js 框架&#xff1a;Express/k…

【go语言】Chromeless简介及Chromedp库实现模拟登录截屏

一、什么是Chromeless chromeless 是一个基于 Node.js 的库&#xff0c;用于通过无头浏览器&#xff08;Headless Chrome&#xff09;进行自动化测试和网页截图。它允许开发者使用 JavaScript 脚本来控制和操作浏览器&#xff0c;而无需实际打开浏览器窗口。 以下是一些 chro…

GET和POST请求

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、GET请求二、POST请求三.幂等性是什么总结 前言 GET和POST是HTTP协议中的两种常见的请求方法&#xff0c;它们定义了客户端与服务器之间进行通信时的不同方…

STM32CubeMX教程10 RTC 实时时钟 - 周期唤醒、闹钟A/B事件和备份寄存器

目录 1、准备材料 2、实验目标 3、实验流程 3.0、前提知识 3.1、CubeMX相关配置 3.1.1 、时钟树配置 3.1.2、外设参数配置 3.1.3 、外设中断配置 3.2、生成代码 3.2.1、外设初始化函数调用流程 3.2.2、外设中断函数调用流程 3.2.3、添加其他必要代码 4、常用函数 …

Linux性能优化全景指南

Part1 Linux性能优化 1、性能优化性能指标 高并发和响应快对应着性能优化的两个核心指标&#xff1a;吞吐和延时 应用负载角度&#xff1a;直接影响了产品终端的用户体验系统资源角度&#xff1a;资源使用率、饱和度等 性能问题的本质就是系统资源已经到达瓶颈&#xff0c;但…

深度学习在自然语言处理中的应用

深度学习在自然语言处理中的应用 一、引言 随着人工智能技术的飞速发展&#xff0c;自然语言处理&#xff08;NLP&#xff09;作为其重要分支&#xff0c;已经在诸多领域取得了令人瞩目的成果。深度学习作为当前最炙手可热的技术&#xff0c;为NLP带来了革命性的变革。本文将…

python+django网上银行业务综合管理系统vue_bvj8b

本课题主要研究如何用信息化技术改善传统网上银行综合管理行业的经营和管理模式&#xff0c;简化网上银行综合管理的难度&#xff0c;根据管理实际业务需求&#xff0c;调研、分析和编写系统需求文档&#xff0c;设计编写符合银行需要的系统说明书&#xff0c;绘制数据库结构模…

一周中的第几天

一周中的第几天 import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Calendar; import java.util.Date;class Solution {public String dayOfTheWeek(int day, int month, int year) {String[] weekDays {&qu…