Elasticsearch 集成---Spark Streaming 框架集成

一.Spark Streaming 框架介绍

Spark Streaming Spark core API 的扩展,支持实时数据流的处理,并且具有可扩展,
高吞吐量,容错的特点。
数据可以从许多来源获取,如 Kafka Flume Kinesis TCP sockets
并且可以使用复杂的算法进行处理,这些算法使用诸如 map reduce join window 等高
级函数表示。 最后,处理后的数据可以推送到文件系统,数据库等。 实际上,您可以将
Spark 的机器学习和图形处理算法应用于数据流。

二.框架集成

1. 创建 Maven 项目

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.atguigu.es</groupId><artifactId>es-sparkstreaming</artifactId><version>1.0</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch</artifactId><version>7.8.0</version></dependency><!-- elasticsearch的客户端 --><dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>7.8.0</version></dependency><!-- elasticsearch依赖2.x的log4j --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>2.8.2</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.8.2</version></dependency><!--        <dependency>--><!--            <groupId>com.fasterxml.jackson.core</groupId>--><!--            <artifactId>jackson-databind</artifactId>--><!--            <version>2.11.1</version>--><!--        </dependency>--><!--        &lt;!&ndash; junit单元测试 &ndash;&gt;--><!--        <dependency>--><!--            <groupId>junit</groupId>--><!--            <artifactId>junit</artifactId>--><!--            <version>4.12</version>--><!--        </dependency>--></dependencies>
</project>

2.功能实现

package com.atguigu.esimport org.apache.http.HttpHost
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.action.index.{IndexRequest, IndexResponse}
import org.elasticsearch.client.{RequestOptions, RestClient, RestHighLevelClient}
import org.elasticsearch.common.xcontent.XContentTypeobject SparkStreamingESTest {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("ESTest")val ssc = new StreamingContext(sparkConf, Seconds(3))val ds: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)ds.foreachRDD(rdd => {rdd.foreach(data => {val client = new RestHighLevelClient(RestClient.builder(new HttpHost("localhost",9200, "http")))val ss = data.split(" ")val request = new IndexRequest()request.index("product").id(ss(0))val json =s"""| {  "data" : "${ss(1)}" }|""".stripMarginrequest.source(json, XContentType.JSON)val response: IndexResponse = client.index(request, RequestOptions.DEFAULT)println(response.getResult)client.close()})})ssc.start()ssc.awaitTermination()}
}

3.界面截图

三.安装NetCat

1.下载网址:netcat 1.11 for Win32/Win64

2.解压压缩包

右键zip文件-->解压到当前文件夹

3.配置环境变量

右键此电脑-->属性-->高级系统设置-->环境变量

四.测试

Window + R  重新启动cmd命令窗口

4.1测试:输入 nc -l -p 9999

4.2 启动测试

4.3 cmd输入 1001 jianzi

 4.4 postman 查看

get    http://127.0.0.1:9200/product/_doc/1001

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/58181.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

全景图像生成算法

摘要 全景图像生成是计算机视觉领域的一个重要研究方向。本文对五种经典的全景图像生成算法进行综述&#xff0c;包括基于相机运动估计的算法、基于特征匹配的算法、基于图像切割的算法、基于多项式拟合的算法和基于深度学习的算法。通过对这些算法的原理、优缺点、适用场景等…

【附源码】Axure RP Pro8.0安装教程|HTML|网页设计

软件下载 软件&#xff1a;Axure版本&#xff1a;8.0语言&#xff1a;简体中文大小&#xff1a;82.53M安装环境&#xff1a;Win11/Win10/Win8/Win7硬件要求&#xff1a;CPU2.0GHz 内存4G(或更高&#xff09;下载通道①百度网盘丨下载链接&#xff1a;https://pan.baidu.com/s/…

部署 ssm 项目到云服务器上(购买云服务器 + 操作远程云服务器 + 服务器中的环境搭建 + 部署项目到服务器)

部署 Web 项目 1、获取 Linux 环境1.1、如何去买一个云服务器1.2、远程操作云服务器1.3、在 Linux 系统中搭建 Java Web 的运行环境。1&#xff09;安装 JDK&#xff08;使用包管理器 yum 来安装&#xff09;2&#xff09; 安装Tomcat3&#xff09;安装 MySQL。 1.4、在云服务器…

【力扣每日一题】2023.8.28 插入区间

目录 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 代码&#xff1a; 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 和昨天的题大差不差&#xff0c;我们仍然是有一堆区间&#xff0c;题目给我们一个新的区间&#xff0c;要我们把新区间插入到原本的区间数…

腾讯云便宜购买指南(腾讯云怎样购买划算)

腾讯云是国内知名的云计算服务商&#xff0c;拥有广泛的应用和用户群体。对于有需要的用户来说&#xff0c;怎样便宜购买腾讯云产品是一个值得关注的问题&#xff0c;下面给大家分享腾讯云便宜购买指南。 腾讯云便宜购买指南&#xff1a;1、新用户专属礼包&#xff1b;2、老用户…

Postman —— postman实现参数化

什么时候会用到参数化 比如&#xff1a;一个模块要用多组不同数据进行测试 验证业务的正确性 Login模块&#xff1a;正确的用户名&#xff0c;密码 成功&#xff1b;错误的用户名&#xff0c;正确的密码 失败 postman实现参数化 在实际的接口测试中&#xff0c;部分参数每…

远程连接虚拟机中ubuntu报错:Network error:Connection refused

ping检测一下虚拟机 可以ping通&#xff0c;说明主机是没问题 #检查ssh是否安装&#xff1a; ps -e |grep ssh发现ssh没有安装 #安装openssh-server sudo apt-get install openssh-server#启动ssh service ssh startps -e |grep ssh检查一下防火墙 #防火墙状态查看 sudo ufw…

使用 Transformer 和 Amazon OpenSearch Service 构建基于列的语义搜索引擎

在数据湖中&#xff0c;对于数据清理和注释、架构匹配、数据发现和跨多个数据来源进行分析等许多操作&#xff0c;查找相似的列有着重要的应用。如果不能从多个不同的来源准确查找和分析数据&#xff0c;就会严重拉低效率&#xff0c;不论是数据科学家、医学研究人员、学者&…

智慧化工地SaaS平台源码,PC端+APP端+智慧数据可视化大屏端,源码完全开源不封装,自主研发,支持二开,项目使用,微服务+Java++vue+mysql

智慧工地管理平台充分运用数字化技术&#xff0c;聚焦施工现场岗位一线&#xff0c;依托物联网、互联网、AI等技术&#xff0c;围绕施工现场管理的人、机、料、法、环五大维度&#xff0c;以及施工过程管理的进度、质量、安全三大体系为基础应用&#xff0c;实现全面高效的工程…

es和数据库同步方案

5.5 课程信息索引同步 5.5.1 技术方案 通过向索引中添加课程信息最终实现了课程的搜索&#xff0c;我们发现课程信息是先保存在关系数据库中&#xff0c;而后再写入索引&#xff0c;这个过程是将关系数据中的数据同步到elasticsearch索引中的过程&#xff0c;可以简单成为索引…

数据结构之哈希

哈希 1. 哈希概念2. 哈希冲突3. 哈希冲突解决3.1 哈希表的闭散列3.2 哈希表的开散列 4. 哈希的应用4.1 位图4.2 布隆过滤器 哈希&#xff08;Hash&#xff09;是一种将任意长度的二进制明文映射为较短的二进制串的算法。它是一种重要的存储方式&#xff0c;也是一种常见的检索方…

Oracle数据库快速入门

前言&#xff1a; 我想现在很多人的入门数据库都是mysql&#xff0c;但是由于工作中会接触到Oracle数据库&#xff0c;如果你有MySQL的基础的话&#xff0c;这篇文章能让你很快掌握Oracle。 目录 1.体系结构 2.创建用户和表空间 2.1.创建表空间 2.2.创建用户 3.数据类型…

无涯教程-分类算法 - 简介

分类可以定义为根据观测值或给定数据点预测类别的过程。分类的输出可以采用"黑色"或"白色"或"垃圾邮件"或"非垃圾邮件"的形式。 在数学上&#xff0c;分类是从输入变量(X)到输出变量(Y)近似映射函数(f)的任务&#xff0c;它属于有监督…

MATLAB算法实战应用案例精讲-【自然语言处理】语义分割模型-DeepLabV3

目录 1、DeepLab系列简介 1.1.DeepLabV1 1.1.1创新点&#xff1a; 1.1.2. 动机&#xff1a; 1.1.3. 应对策略&#xff1a; 1.2.DeepLabV2 1.2.1.创新点&#xff1a; 1.2.2.动机 1.2.3. 应对策略&#xff1a; 1.3.DeepLabV3 1.3.1创新点&#xff1a; 1.3.2. 动机&am…

5G NR:RACH流程-- Msg1之生成PRACH Preamble

随机接入流程中的Msg1&#xff0c;即在PRACH信道上发送random access preamble。涉及到两个问题&#xff1a; 一个是如何产生preamble&#xff1f;一个是如何选择正确的PRACH时频资源发送所选的preamble? 一、PRACH Preamble是什么 PRACH Preamble从数学上来讲是一个长度为…

马斯克遭冷遇,Twitter更名近一个月,许多品牌仍未删除蓝鸟标志

根据报道&#xff0c;Twitter更名为X已经近一个月了&#xff0c;但许多主要品牌仍然没有完全删除其营销中的蓝鸟标志。只有宝洁这一家美国广告支出最高的公司在其网站的社交媒体联系信息中将蓝鸟换成了新的X标志。 另外&#xff0c;Expedia和IBM这两家公司在其网站上甚至没有显…

[C++ 网络协议] 套接字的多种可选项

目录 1. 套接字的可选项 2. 获取/设置套接字可选项 2.1 getsockopt函数&#xff08;获取套接字可选项&#xff09; 2.2 setsockopt函数&#xff08;设置套接字可选项&#xff09; 3. 常用套接字可选项 3.1 SOL_SOCKET协议层的SO_TYPE可选项 3.2 SOL_SOCKET协议层的SO_SN…

Matlab(变量与文本读取)

目录 1.变量&#xff08;数据&#xff09;类型转换 1.1 字符 1.2 字符串 1.3 逻辑操作与赋值 2.Struct结构体数组 2.1函数的详细介绍&#xff1a; 2.1.1 cell2struct 2.1.1.1 垂直维度转换 2.1.1.2 水平维度转换 2.1.1.3 部分进行转换 2.1.2 rmfield 2.1.3 fieldnames(查…

【真题解析】系统集成项目管理工程师 2022 年上半年真题卷(案例分析)

本文为系统集成项目管理工程师考试(软考) 2022 年上半年真题&#xff08;全国卷&#xff09;&#xff0c;包含答案与详细解析。考试共分为两科&#xff0c;成绩均 ≥45 即可通过考试&#xff1a; 综合知识&#xff08;选择题 75 道&#xff0c;75分&#xff09;案例分析&#x…

使用MATLAB解算炼油厂的选址

背景 记得有一年的数据建模大赛&#xff0c;试题是炼油厂的选址&#xff0c;最后我们采用MATLAB编写&#xff08;复制&#xff09;蒙特卡洛算法&#xff0c;还到了省级一等奖&#xff0c;这里把仅有一些记忆和材料&#xff0c;放到这里来&#xff0c;用来纪念消失的青春。 本…