Elasticsearch 集成---Spark Streaming 框架集成

一.Spark Streaming 框架介绍

Spark Streaming Spark core API 的扩展,支持实时数据流的处理,并且具有可扩展,
高吞吐量,容错的特点。
数据可以从许多来源获取,如 Kafka Flume Kinesis TCP sockets
并且可以使用复杂的算法进行处理,这些算法使用诸如 map reduce join window 等高
级函数表示。 最后,处理后的数据可以推送到文件系统,数据库等。 实际上,您可以将
Spark 的机器学习和图形处理算法应用于数据流。

二.框架集成

1. 创建 Maven 项目

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.atguigu.es</groupId><artifactId>es-sparkstreaming</artifactId><version>1.0</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch</artifactId><version>7.8.0</version></dependency><!-- elasticsearch的客户端 --><dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>7.8.0</version></dependency><!-- elasticsearch依赖2.x的log4j --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>2.8.2</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.8.2</version></dependency><!--        <dependency>--><!--            <groupId>com.fasterxml.jackson.core</groupId>--><!--            <artifactId>jackson-databind</artifactId>--><!--            <version>2.11.1</version>--><!--        </dependency>--><!--        &lt;!&ndash; junit单元测试 &ndash;&gt;--><!--        <dependency>--><!--            <groupId>junit</groupId>--><!--            <artifactId>junit</artifactId>--><!--            <version>4.12</version>--><!--        </dependency>--></dependencies>
</project>

2.功能实现

package com.atguigu.esimport org.apache.http.HttpHost
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.action.index.{IndexRequest, IndexResponse}
import org.elasticsearch.client.{RequestOptions, RestClient, RestHighLevelClient}
import org.elasticsearch.common.xcontent.XContentTypeobject SparkStreamingESTest {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("ESTest")val ssc = new StreamingContext(sparkConf, Seconds(3))val ds: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)ds.foreachRDD(rdd => {rdd.foreach(data => {val client = new RestHighLevelClient(RestClient.builder(new HttpHost("localhost",9200, "http")))val ss = data.split(" ")val request = new IndexRequest()request.index("product").id(ss(0))val json =s"""| {  "data" : "${ss(1)}" }|""".stripMarginrequest.source(json, XContentType.JSON)val response: IndexResponse = client.index(request, RequestOptions.DEFAULT)println(response.getResult)client.close()})})ssc.start()ssc.awaitTermination()}
}

3.界面截图

三.安装NetCat

1.下载网址:netcat 1.11 for Win32/Win64

2.解压压缩包

右键zip文件-->解压到当前文件夹

3.配置环境变量

右键此电脑-->属性-->高级系统设置-->环境变量

四.测试

Window + R  重新启动cmd命令窗口

4.1测试:输入 nc -l -p 9999

4.2 启动测试

4.3 cmd输入 1001 jianzi

 4.4 postman 查看

get    http://127.0.0.1:9200/product/_doc/1001

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/58181.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

全景图像生成算法

摘要 全景图像生成是计算机视觉领域的一个重要研究方向。本文对五种经典的全景图像生成算法进行综述&#xff0c;包括基于相机运动估计的算法、基于特征匹配的算法、基于图像切割的算法、基于多项式拟合的算法和基于深度学习的算法。通过对这些算法的原理、优缺点、适用场景等…

【附源码】Axure RP Pro8.0安装教程|HTML|网页设计

软件下载 软件&#xff1a;Axure版本&#xff1a;8.0语言&#xff1a;简体中文大小&#xff1a;82.53M安装环境&#xff1a;Win11/Win10/Win8/Win7硬件要求&#xff1a;CPU2.0GHz 内存4G(或更高&#xff09;下载通道①百度网盘丨下载链接&#xff1a;https://pan.baidu.com/s/…

Springboot四种实现鉴权方式各自的优缺点

拦截器&#xff1a;拦截器是基于SpringMVC的一种机制&#xff0c;它可以在请求到达控制器之前或之后进行拦截和处理&#xff0c;比如验证用户身份&#xff0c;记录日志&#xff0c;添加响应头等。 优点&#xff1a;拦截器可以获取到请求的上下文信息&#xff0c;如请求路径&…

部署 ssm 项目到云服务器上(购买云服务器 + 操作远程云服务器 + 服务器中的环境搭建 + 部署项目到服务器)

部署 Web 项目 1、获取 Linux 环境1.1、如何去买一个云服务器1.2、远程操作云服务器1.3、在 Linux 系统中搭建 Java Web 的运行环境。1&#xff09;安装 JDK&#xff08;使用包管理器 yum 来安装&#xff09;2&#xff09; 安装Tomcat3&#xff09;安装 MySQL。 1.4、在云服务器…

【力扣每日一题】2023.8.28 插入区间

目录 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 代码&#xff1a; 题目&#xff1a; 示例&#xff1a; 分析&#xff1a; 和昨天的题大差不差&#xff0c;我们仍然是有一堆区间&#xff0c;题目给我们一个新的区间&#xff0c;要我们把新区间插入到原本的区间数…

腾讯云便宜购买指南(腾讯云怎样购买划算)

腾讯云是国内知名的云计算服务商&#xff0c;拥有广泛的应用和用户群体。对于有需要的用户来说&#xff0c;怎样便宜购买腾讯云产品是一个值得关注的问题&#xff0c;下面给大家分享腾讯云便宜购买指南。 腾讯云便宜购买指南&#xff1a;1、新用户专属礼包&#xff1b;2、老用户…

Postman —— postman实现参数化

什么时候会用到参数化 比如&#xff1a;一个模块要用多组不同数据进行测试 验证业务的正确性 Login模块&#xff1a;正确的用户名&#xff0c;密码 成功&#xff1b;错误的用户名&#xff0c;正确的密码 失败 postman实现参数化 在实际的接口测试中&#xff0c;部分参数每…

远程连接虚拟机中ubuntu报错:Network error:Connection refused

ping检测一下虚拟机 可以ping通&#xff0c;说明主机是没问题 #检查ssh是否安装&#xff1a; ps -e |grep ssh发现ssh没有安装 #安装openssh-server sudo apt-get install openssh-server#启动ssh service ssh startps -e |grep ssh检查一下防火墙 #防火墙状态查看 sudo ufw…

使用 Transformer 和 Amazon OpenSearch Service 构建基于列的语义搜索引擎

在数据湖中&#xff0c;对于数据清理和注释、架构匹配、数据发现和跨多个数据来源进行分析等许多操作&#xff0c;查找相似的列有着重要的应用。如果不能从多个不同的来源准确查找和分析数据&#xff0c;就会严重拉低效率&#xff0c;不论是数据科学家、医学研究人员、学者&…

智慧化工地SaaS平台源码,PC端+APP端+智慧数据可视化大屏端,源码完全开源不封装,自主研发,支持二开,项目使用,微服务+Java++vue+mysql

智慧工地管理平台充分运用数字化技术&#xff0c;聚焦施工现场岗位一线&#xff0c;依托物联网、互联网、AI等技术&#xff0c;围绕施工现场管理的人、机、料、法、环五大维度&#xff0c;以及施工过程管理的进度、质量、安全三大体系为基础应用&#xff0c;实现全面高效的工程…

es和数据库同步方案

5.5 课程信息索引同步 5.5.1 技术方案 通过向索引中添加课程信息最终实现了课程的搜索&#xff0c;我们发现课程信息是先保存在关系数据库中&#xff0c;而后再写入索引&#xff0c;这个过程是将关系数据中的数据同步到elasticsearch索引中的过程&#xff0c;可以简单成为索引…

C++类相关知识

鸽了好久&#xff0c;回来更新下吧 C类 类用于指定对象的形式&#xff0c;是一种用户自定义的数据类型&#xff0c;它是一种封装了数据和函数的组合。类中的数据称为成员变量&#xff0c;函数称为成员函数。类可以被看作是一种模板&#xff0c;可以用来创建具有相同属性和行为…

ip_vs 原理解析 (四)hook 后的开始 NF_INET_LOCAL_IN

文章目录 ip_vs hook 后NF_INET_LOCAL_IN 本章重点&#xff1a; k8s 如何利用 ip_vs 实现源 IP 会话亲和性。 ip_vs hook 后 NF_INET_LOCAL_IN 根据优先级依次是 ip_vs_reply4&#xff0c;ip_vs_remote_request4 ip_vs_reply4| -- ip_vs_out| -- skb_to_full_sk(skb&#xf…

免费API集合分享,赶紧收藏起来~

天气预警&#xff1a;支持输入经纬度或者区域编码&#xff0c;获取指定城市当前生效中的各类天气预警&#xff0c;如寒潮蓝色预警信号&#xff0c;或一次性拉取全国所有生效中的天气预警。 通知短信&#xff1a;当您需要快速通知用户时&#xff0c;通知短信是最快捷有效的方式…

Redis之发布订阅

一、Redis的发布订阅 Redis的发布与订阅功能由PUBLISH、SUBSCRIBE、PSUBSCRIBE等命令组成。通过执行SUBSCRIBE命令&#xff0c;客户端可以订阅一个或多个频道&#xff0c;从而成为这些频道的订阅者&#xff08;subscriber&#xff09;&#xff1a;每当有其他客户端向被订阅的频…

数据结构之哈希

哈希 1. 哈希概念2. 哈希冲突3. 哈希冲突解决3.1 哈希表的闭散列3.2 哈希表的开散列 4. 哈希的应用4.1 位图4.2 布隆过滤器 哈希&#xff08;Hash&#xff09;是一种将任意长度的二进制明文映射为较短的二进制串的算法。它是一种重要的存储方式&#xff0c;也是一种常见的检索方…

Oracle数据库快速入门

前言&#xff1a; 我想现在很多人的入门数据库都是mysql&#xff0c;但是由于工作中会接触到Oracle数据库&#xff0c;如果你有MySQL的基础的话&#xff0c;这篇文章能让你很快掌握Oracle。 目录 1.体系结构 2.创建用户和表空间 2.1.创建表空间 2.2.创建用户 3.数据类型…

无涯教程-分类算法 - 简介

分类可以定义为根据观测值或给定数据点预测类别的过程。分类的输出可以采用"黑色"或"白色"或"垃圾邮件"或"非垃圾邮件"的形式。 在数学上&#xff0c;分类是从输入变量(X)到输出变量(Y)近似映射函数(f)的任务&#xff0c;它属于有监督…

MATLAB算法实战应用案例精讲-【自然语言处理】语义分割模型-DeepLabV3

目录 1、DeepLab系列简介 1.1.DeepLabV1 1.1.1创新点&#xff1a; 1.1.2. 动机&#xff1a; 1.1.3. 应对策略&#xff1a; 1.2.DeepLabV2 1.2.1.创新点&#xff1a; 1.2.2.动机 1.2.3. 应对策略&#xff1a; 1.3.DeepLabV3 1.3.1创新点&#xff1a; 1.3.2. 动机&am…

5G NR:RACH流程-- Msg1之生成PRACH Preamble

随机接入流程中的Msg1&#xff0c;即在PRACH信道上发送random access preamble。涉及到两个问题&#xff1a; 一个是如何产生preamble&#xff1f;一个是如何选择正确的PRACH时频资源发送所选的preamble? 一、PRACH Preamble是什么 PRACH Preamble从数学上来讲是一个长度为…