Spark实时(三):Structured Streaming入门案例

文章目录

Structured Streaming入门案例

一、Scala代码如下

二、Java 代码如下

三、以上代码注意点如下


Structured Streaming入门案例

我们使用Structured Streaming来监控socket数据统计WordCount。这里我们使用Spark版本为3.4.3版本,首先在Maven pom文件中导入以下依赖:

 <!-- 配置以下可以解决 在jdk1.8环境下打包时报错 “-source 1.5 中不支持 lambda 表达式” --><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><spark.version>3.4.3</spark.version></properties><dependencies><!-- Spark-core --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>${spark.version}</version></dependency><!-- SparkSQL --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>${spark.version}</version></dependency><!-- SparkSQL  ON  Hive--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.12</artifactId><version>${spark.version}</version></dependency><!--mysql依赖的jar包--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.47</version></dependency><!--SparkStreaming--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>${spark.version}</version></dependency><!-- Kafka 0.10+ Source For Structured Streaming--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql-kafka-0-10_2.12</artifactId><version>${spark.version}</version></dependency><!-- 向kafka 生产数据需要包 --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.8.0</version></dependency><!-- Scala 包--><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.12.15</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-compiler</artifactId><version>2.12.15</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-reflect</artifactId><version>2.12.15</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.12</version></dependency><dependency><groupId>com.google.collections</groupId><artifactId>google-collections</artifactId><version>1.0</version></dependency></dependencies>

一、Scala代码如下

package com.lanson.structuredStreaming/***  Structured Streaming 实时读取Socket数据*/import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}/*** Structured Streaming 读取Socket数据*/
object SSReadSocketData {def main(args: Array[String]): Unit = {//1.创建SparkSession对象val spark: SparkSession = SparkSession.builder().master("local").appName("StructuredSocketWordCount")//默认200个并行度,由于源头数据量少,可以设置少一些并行度.config("spark.sql.shuffle.partitions",1).getOrCreate()import spark.implicits._spark.sparkContext.setLogLevel("Error")//2.读取Socket中的每行数据,生成DataFrame默认列名为"value"val lines: DataFrame = spark.readStream.format("socket").option("host", "node3").option("port", 9999).load()//3.将每行数据切分成单词,首先通过as[String]转换成Dataset操作val words: Dataset[String] = lines.as[String].flatMap(line=>{line.split(" ")})//4.按照单词分组,统计个数,自动多一个列countval wordCounts: DataFrame = words.groupBy("value").count()//5.启动流并向控制台打印结果val query: StreamingQuery = wordCounts.writeStream//更新模式设置为complete.outputMode("complete").format("console").start()query.awaitTermination()}}

 

二、Java 代码如下

package com.lanson.structuredStreaming;import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.TimeoutException;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;public class SSReadSocketData01 {public static void main(String[] args) throws StreamingQueryException, TimeoutException {SparkSession spark = SparkSession.builder().master("local").appName("SSReadSocketData01").config("spark.sql.shuffle.partitions", 1).getOrCreate();spark.sparkContext().setLogLevel("Error");Dataset<Row> lines = spark.readStream().format("socket").option("host", "node3").option("port", 9999).load();Dataset<String> words = lines.as(Encoders.STRING()).flatMap(new FlatMapFunction<String, String>() {@Overridepublic Iterator<String> call(String line) throws Exception {return Arrays.asList(line.split(" ")).iterator();}}, Encoders.STRING());Dataset<Row> wordCounts = words.groupBy("value").count();StreamingQuery query = wordCounts.writeStream().outputMode("complete").format("console").start();query.awaitTermination();}
}

 

以上代码编写完成之后,在node3节点执行“nc -lk 9999”启动socket服务器,然后启动代码,向socket中输入以下数据:

第一次输入:a b c
第二次输入:d a c
第三次输入:a b c

可以看到控制台打印如下结果:

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    c|    1|
|    b|    1|
|    a|    1|
+-----+-----+-------------------------------------------
Batch: 2
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    d|    1|
|    c|    2|
|    b|    1|
|    a|    2|
+-----+-----+-------------------------------------------
Batch: 3
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    d|    1|
|    c|    3|
|    b|    2|
|    a|    3|
+-----+-----+

三、以上代码注意点如下

  • SparkSQL 默认并行度为200,这里由于数据量少,可以将并行度通过参数“spark.sql.shuffle.partitions”设置少一些。
  • StructuredStreaming读取过来数据默认是DataFrame,默认有“value”名称的列
  • 对获取的DataFrame需要通过as[String]转换成Dataset进行操作
  • 结果输出时的OutputMode有三种输出模式:Complete Mode、Append Mode、Update Mode。

  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/50740.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

在 Spring Boot 中使用 Filters 实现请求过滤和预处理

​ 博客主页: 南来_北往 系列专栏&#xff1a;Spring Boot实战 什么是过滤器 过滤器&#xff08;Filter&#xff09;是一种在Web应用中用于拦截和处理HTTP请求和响应的对象。 在Java Web开发中&#xff0c;过滤器是实现特定功能&#xff0c;如认证、日志记录和字符编码处…

X-AnyLabeling标注软件使用方法

第一步 下载 官方X-AnyLabeling下载地址 github&#xff1a;X-AnyLabeling 第二步 配置环境 使用conda创建新的虚拟环境 conda create -n xanylabel python3.8进入环境 conda activate xanylabel进入X-AnyLabeling文件夹内&#xff0c;运行下面内容 依赖文件系统环境运行环…

MyBatisPlus复习

目录 自定义sql swagger工具 IService批量新增 代码生成器 DB静态工具&#xff0c;hutool工具 逻辑删除 枚举处理器 Json处理器 分页 自定义sql swagger工具 IService批量新增 代码生成器 DB静态工具&#xff0c;hutool工具 逻辑删除 枚举处理器 Json处理器 分页

ks滑块验证码逆向分析与python识别

文章目录 1. 写在前面3. 接口分析3. 算法实现 【&#x1f3e0;作者主页】&#xff1a;吴秋霖 【&#x1f4bc;作者介绍】&#xff1a;擅长爬虫与JS加密逆向分析&#xff01;Python领域优质创作者、CSDN博客专家、阿里云博客专家、华为云享专家。一路走来长期坚守并致力于Python…

大模型技术:发展历程、经典模型、微调与应用[更新中...]

文章目录 一、预训练语言模型发展历程二、经典的Pre-trained任务2.1 Masked Language Modeling2.2 Next Sentence Prediction 三、Task-specific Fine-tuning 任务3.1 Single-text Classification (单句分类)3.2 Sentence-pair Classification (句子匹配/成对分类)3.3 Span Tex…

谷粒商城实战笔记-71-商品服务-API-属性分组-前端组件抽取父子组件交互

文章目录 一&#xff0c;一次性创建所有的菜单二&#xff0c;开发属性分组界面1&#xff0c;左侧三级分类树形组件2&#xff0c;右侧分组列表3&#xff0c;左右两部分通信3.1 子组件发送数据3.2&#xff0c;父组件接收数据 Vue的父子组件通信父组件向子组件传递数据子组件向父组…

【BUG】已解决:The above exception was the direct cause of the following exception:

The above exception was the direct cause of the following exception: 目录 The above exception was the direct cause of the following exception: 【常见模块错误】 【解决方案】 欢迎来到英杰社区https://bbs.csdn.net/topics/617804998 欢迎来到我的主页&#xff0c…

【杰理蓝牙开发】AC695x 音频部分

本文主要记录 杰理蓝牙audio接口的使用&#xff0c;包括ADC和DAC原理的介绍和API接口的使用。 【杰理蓝牙开发】AC695x 音频部分 0. 个人简介 && 授权须知1. ADC【音频数据采集】硬件部分1.1 单片机引脚1.2 硬件电路设计1.3 MIC 输入通路解释 2. 【DAC】音频信号编解码…

Super 4PCS配准算法

Nicolas Mellado&#xff0c;CNRS&#xff08;Centre national de la recherche scientifique&#xff0c;法国国家科学研究中心&#xff09;的研究员&#xff0c;在IRIT&#xff08;Institut de Recherche en Informatique de Toulouse&#xff0c;图卢兹计算机科学研究所&…

SAPUI5基础知识20 - 对话框和碎片(Dialogs and Fragments)

1. 背景 在 SAPUI5 中&#xff0c;Fragments 是一种轻量级的 UI 组件&#xff0c;类似于视图&#xff08;Views&#xff09;&#xff0c;但它们没有自己的控制器&#xff08;Controller&#xff09;。Fragments 通常用于定义可以在多个视图中重用的 UI 片段&#xff0c;从而提…

linux系统安装pytorch_中文地址命名实体识别案例

命名实体有关文章参考这篇文章 中文地址命名实体识别训练和预测 win10系统安装cuda环境参考这篇文章 搭建Pytorch的GPU环境超详细 1、下载python https://www.python.org/downloads/release/python-368/ 2、下载python包 https://pypi.org/search/?q=transformers 1、搜…

如何录制电脑内部声音?全方位介绍电脑录音软件:8款在线录音!(2024重新整理)

如何录制电脑内部声音&#xff1f;不管是娱乐圈还是现实生活&#xff0c;【录音】这个功能的重要性不言而喻。而电脑录音已在影视配音、音视频剪辑、会议记录、在线教育等多个领域发光发热&#xff01; 本文将为您推荐8款电脑录音软件&#xff0c;并详细介绍电脑录音的多种方式…

Python番外篇:变量是盒子还是标签

引言 前面通过几十篇文章&#xff0c;大概把Python的一些比较实用的基础做了一些介绍&#xff0c;学会这些&#xff0c;基本能应付日常的小的需求开发了&#xff0c;写一些小工具&#xff0c;提高工作的处理效率。 接下来&#xff0c;准备开始进入一个新的篇章&#xff0c;也…

C#如何引用dll动态链接库文件的注释

1、dll动态库文件项目生成属性中要勾选“XML文档文件” 注意&#xff1a;XML文件的名字切勿修改。 2、添加引用时XML文件要与DLL文件在同一个目录下。 3、如果要是添加引用的时候XML不在相同目录下&#xff0c;之后又将XML文件复制到相同的目录下&#xff0c;需要删除引用&am…

当设计模式牵手LLM

模版方法模式 何为模版设计模式 想象一下 如果我们要泡一杯茶 我们要循序渐进地 煮水温杯注水浸茶茶水入杯加点配料 如此&#xff0c;泡茶的工序就完成了&#xff0c;那么模板方法模式&#xff0c;相信各位也有了一定的概念&#xff1a;定义了一个算法的骨架&#xff0c;而…

UDP的报文结构及其注意事项

1. 概述 UDP&#xff08;User Datagram Protocol&#xff09;是一种无连接的传输层协议&#xff0c;它提供了一种简单的数据传输服务&#xff0c;不保证数据的可靠传输。在网络通信中&#xff0c;UDP通常用于一些对实时性要求较高、数据量较小、传输延迟较低的应用&#xff0c…

【JVM基础07】——类加载器-什么是类加载器?类加载器有哪些?双亲委派了解吗?

目录 1- 引言&#xff1a;类加载器1-1 类加载器是什么&#xff1f;(What)1-2 为什么要用类加载器&#xff1f; 作用&#xff1a;类加载的过程&#xff1f;(Why) 2- ⭐核心&#xff1a;类加载器详解(How)2-1 类加载器分类2-2 什么是双亲委派模型&#xff1f;2-3 为什么采用双亲委…

Pytorch基础:Tensor的squeeze和unsqueeze方法

相关阅读 Pytorch基础https://blog.csdn.net/weixin_45791458/category_12457644.html?spm1001.2014.3001.5482 在Pytorch中&#xff0c;squeeze和unsqueeze是Tensor的一个重要方法&#xff0c;同时它们也是torch模块中的一个函数&#xff0c;它们的语法如下所示。 Tensor.…

【SpringBoot】1 Gitee

本项目 Gitee 地址&#xff1a;https://gitee.com/Lin_DH/system idea中可能装个gitee的插件&#xff0c;这样操作起来比较方便。 1&#xff09;登录 Gitee 官网&#xff08;https://gitee.com/&#xff09;&#xff0c;新建仓库。 2&#xff09;复制新建的 Gitee 仓库地址&am…

Unity3D之TextMeshPro使用

文章目录 1. TextMeshPro简介2. TextMeshPro创建3. TextMeshPro脚本中调用4. TextMeshPro字体设置及中文支持过程中出现的一些问题 1. TextMeshPro简介 【官网文档】https://docs.unity.cn/cn/2020.3/Manual/com.unity.textmeshpro.html TextMeshPro 是 Unity 的最终文本解决…