Spark实时(三):Structured Streaming入门案例

文章目录

Structured Streaming入门案例

一、Scala代码如下

二、Java 代码如下

三、以上代码注意点如下


Structured Streaming入门案例

我们使用Structured Streaming来监控socket数据统计WordCount。这里我们使用Spark版本为3.4.3版本,首先在Maven pom文件中导入以下依赖:

 <!-- 配置以下可以解决 在jdk1.8环境下打包时报错 “-source 1.5 中不支持 lambda 表达式” --><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><spark.version>3.4.3</spark.version></properties><dependencies><!-- Spark-core --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>${spark.version}</version></dependency><!-- SparkSQL --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>${spark.version}</version></dependency><!-- SparkSQL  ON  Hive--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.12</artifactId><version>${spark.version}</version></dependency><!--mysql依赖的jar包--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.47</version></dependency><!--SparkStreaming--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>${spark.version}</version></dependency><!-- Kafka 0.10+ Source For Structured Streaming--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql-kafka-0-10_2.12</artifactId><version>${spark.version}</version></dependency><!-- 向kafka 生产数据需要包 --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.8.0</version></dependency><!-- Scala 包--><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.12.15</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-compiler</artifactId><version>2.12.15</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-reflect</artifactId><version>2.12.15</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.12</version></dependency><dependency><groupId>com.google.collections</groupId><artifactId>google-collections</artifactId><version>1.0</version></dependency></dependencies>

一、Scala代码如下

package com.lanson.structuredStreaming/***  Structured Streaming 实时读取Socket数据*/import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}/*** Structured Streaming 读取Socket数据*/
object SSReadSocketData {def main(args: Array[String]): Unit = {//1.创建SparkSession对象val spark: SparkSession = SparkSession.builder().master("local").appName("StructuredSocketWordCount")//默认200个并行度,由于源头数据量少,可以设置少一些并行度.config("spark.sql.shuffle.partitions",1).getOrCreate()import spark.implicits._spark.sparkContext.setLogLevel("Error")//2.读取Socket中的每行数据,生成DataFrame默认列名为"value"val lines: DataFrame = spark.readStream.format("socket").option("host", "node3").option("port", 9999).load()//3.将每行数据切分成单词,首先通过as[String]转换成Dataset操作val words: Dataset[String] = lines.as[String].flatMap(line=>{line.split(" ")})//4.按照单词分组,统计个数,自动多一个列countval wordCounts: DataFrame = words.groupBy("value").count()//5.启动流并向控制台打印结果val query: StreamingQuery = wordCounts.writeStream//更新模式设置为complete.outputMode("complete").format("console").start()query.awaitTermination()}}

 

二、Java 代码如下

package com.lanson.structuredStreaming;import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.TimeoutException;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;public class SSReadSocketData01 {public static void main(String[] args) throws StreamingQueryException, TimeoutException {SparkSession spark = SparkSession.builder().master("local").appName("SSReadSocketData01").config("spark.sql.shuffle.partitions", 1).getOrCreate();spark.sparkContext().setLogLevel("Error");Dataset<Row> lines = spark.readStream().format("socket").option("host", "node3").option("port", 9999).load();Dataset<String> words = lines.as(Encoders.STRING()).flatMap(new FlatMapFunction<String, String>() {@Overridepublic Iterator<String> call(String line) throws Exception {return Arrays.asList(line.split(" ")).iterator();}}, Encoders.STRING());Dataset<Row> wordCounts = words.groupBy("value").count();StreamingQuery query = wordCounts.writeStream().outputMode("complete").format("console").start();query.awaitTermination();}
}

 

以上代码编写完成之后,在node3节点执行“nc -lk 9999”启动socket服务器,然后启动代码,向socket中输入以下数据:

第一次输入:a b c
第二次输入:d a c
第三次输入:a b c

可以看到控制台打印如下结果:

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    c|    1|
|    b|    1|
|    a|    1|
+-----+-----+-------------------------------------------
Batch: 2
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    d|    1|
|    c|    2|
|    b|    1|
|    a|    2|
+-----+-----+-------------------------------------------
Batch: 3
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    d|    1|
|    c|    3|
|    b|    2|
|    a|    3|
+-----+-----+

三、以上代码注意点如下

  • SparkSQL 默认并行度为200,这里由于数据量少,可以将并行度通过参数“spark.sql.shuffle.partitions”设置少一些。
  • StructuredStreaming读取过来数据默认是DataFrame,默认有“value”名称的列
  • 对获取的DataFrame需要通过as[String]转换成Dataset进行操作
  • 结果输出时的OutputMode有三种输出模式:Complete Mode、Append Mode、Update Mode。

  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/50740.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

在 Spring Boot 中使用 Filters 实现请求过滤和预处理

​ 博客主页: 南来_北往 系列专栏&#xff1a;Spring Boot实战 什么是过滤器 过滤器&#xff08;Filter&#xff09;是一种在Web应用中用于拦截和处理HTTP请求和响应的对象。 在Java Web开发中&#xff0c;过滤器是实现特定功能&#xff0c;如认证、日志记录和字符编码处…

X-AnyLabeling标注软件使用方法

第一步 下载 官方X-AnyLabeling下载地址 github&#xff1a;X-AnyLabeling 第二步 配置环境 使用conda创建新的虚拟环境 conda create -n xanylabel python3.8进入环境 conda activate xanylabel进入X-AnyLabeling文件夹内&#xff0c;运行下面内容 依赖文件系统环境运行环…

MyBatisPlus复习

目录 自定义sql swagger工具 IService批量新增 代码生成器 DB静态工具&#xff0c;hutool工具 逻辑删除 枚举处理器 Json处理器 分页 自定义sql swagger工具 IService批量新增 代码生成器 DB静态工具&#xff0c;hutool工具 逻辑删除 枚举处理器 Json处理器 分页

ks滑块验证码逆向分析与python识别

文章目录 1. 写在前面3. 接口分析3. 算法实现 【&#x1f3e0;作者主页】&#xff1a;吴秋霖 【&#x1f4bc;作者介绍】&#xff1a;擅长爬虫与JS加密逆向分析&#xff01;Python领域优质创作者、CSDN博客专家、阿里云博客专家、华为云享专家。一路走来长期坚守并致力于Python…

大模型技术:发展历程、经典模型、微调与应用[更新中...]

文章目录 一、预训练语言模型发展历程二、经典的Pre-trained任务2.1 Masked Language Modeling2.2 Next Sentence Prediction 三、Task-specific Fine-tuning 任务3.1 Single-text Classification (单句分类)3.2 Sentence-pair Classification (句子匹配/成对分类)3.3 Span Tex…

谷粒商城实战笔记-71-商品服务-API-属性分组-前端组件抽取父子组件交互

文章目录 一&#xff0c;一次性创建所有的菜单二&#xff0c;开发属性分组界面1&#xff0c;左侧三级分类树形组件2&#xff0c;右侧分组列表3&#xff0c;左右两部分通信3.1 子组件发送数据3.2&#xff0c;父组件接收数据 Vue的父子组件通信父组件向子组件传递数据子组件向父组…

vector的相关内容介绍及模拟实现

一.内容介绍 1.vector是一个模板&#xff0c;不支持流插入和流提取&#xff0c;因为它支持多种方式的输出&#xff0c;不需要局限于流提取的方式 2.关于vector所涉及的函数接口与string类的用法类似&#xff0c;有兴趣可参考小编的另一篇博客 3.vector的迭代器失效问题 1&g…

后端面试题日常练-day08 【Java基础】

题目 希望这些选择题能够帮助您进行后端面试的准备&#xff0c;答案在文末 Java中的静态变量和实例变量有何区别&#xff1f; a) 静态变量属于类&#xff0c;实例变量属于对象 b) 静态变量只能在静态方法中访问&#xff0c;实例变量只能在实例方法中访问 c) 静态变量在类加载时…

【BUG】已解决:The above exception was the direct cause of the following exception:

The above exception was the direct cause of the following exception: 目录 The above exception was the direct cause of the following exception: 【常见模块错误】 【解决方案】 欢迎来到英杰社区https://bbs.csdn.net/topics/617804998 欢迎来到我的主页&#xff0c…

【杰理蓝牙开发】AC695x 音频部分

本文主要记录 杰理蓝牙audio接口的使用&#xff0c;包括ADC和DAC原理的介绍和API接口的使用。 【杰理蓝牙开发】AC695x 音频部分 0. 个人简介 && 授权须知1. ADC【音频数据采集】硬件部分1.1 单片机引脚1.2 硬件电路设计1.3 MIC 输入通路解释 2. 【DAC】音频信号编解码…

JVM 高级面试题及答案整理,最新面试题

JVM中的垃圾收集器有哪些,它们的工作原理是什么? JVM中的垃圾收集器主要包括以下几种: 1、 Serial收集器:它是一个单线程收集器,工作时会暂停所有其他工作线程("Stop-The-World"),它的优点是简单高效(与其他收集器的单线程比),适用于单核处理器的环境。 2…

stm32F1xx外设GPIO的用法总结

目录 前言一、概念二、主要文件三、库函数和寄存器四、几个数据结构五、使用方式 前言 本文笔记总结stm32F1xx的GPIO的寄存器说明和标准库中的函数说明、用法&#xff0c;使用案例; 一、概念 GPIO为通用输入输出端口的简称&#xff0c;作为主控芯片的一个外设在芯片中是一个…

Super 4PCS配准算法

Nicolas Mellado&#xff0c;CNRS&#xff08;Centre national de la recherche scientifique&#xff0c;法国国家科学研究中心&#xff09;的研究员&#xff0c;在IRIT&#xff08;Institut de Recherche en Informatique de Toulouse&#xff0c;图卢兹计算机科学研究所&…

SAPUI5基础知识20 - 对话框和碎片(Dialogs and Fragments)

1. 背景 在 SAPUI5 中&#xff0c;Fragments 是一种轻量级的 UI 组件&#xff0c;类似于视图&#xff08;Views&#xff09;&#xff0c;但它们没有自己的控制器&#xff08;Controller&#xff09;。Fragments 通常用于定义可以在多个视图中重用的 UI 片段&#xff0c;从而提…

linux系统安装pytorch_中文地址命名实体识别案例

命名实体有关文章参考这篇文章 中文地址命名实体识别训练和预测 win10系统安装cuda环境参考这篇文章 搭建Pytorch的GPU环境超详细 1、下载python https://www.python.org/downloads/release/python-368/ 2、下载python包 https://pypi.org/search/?q=transformers 1、搜…

如何录制电脑内部声音?全方位介绍电脑录音软件:8款在线录音!(2024重新整理)

如何录制电脑内部声音&#xff1f;不管是娱乐圈还是现实生活&#xff0c;【录音】这个功能的重要性不言而喻。而电脑录音已在影视配音、音视频剪辑、会议记录、在线教育等多个领域发光发热&#xff01; 本文将为您推荐8款电脑录音软件&#xff0c;并详细介绍电脑录音的多种方式…

Git 从入门到精通:全面掌握版本控制(IntelliJ IDEA 中 Git 的使用指南)

引言 Git 是目前世界上最流行的版本控制系统&#xff0c;由 Linux 内核的创始人 Linus Torvalds 开发。它不仅拥有强大的分支管理功能&#xff0c;还具备了优秀的合并能力。本文将从 Git 的基本概念开始&#xff0c;逐步深入到 Git 的使用和一些高级技巧。 Git 简介 Git 是一…

Python番外篇:变量是盒子还是标签

引言 前面通过几十篇文章&#xff0c;大概把Python的一些比较实用的基础做了一些介绍&#xff0c;学会这些&#xff0c;基本能应付日常的小的需求开发了&#xff0c;写一些小工具&#xff0c;提高工作的处理效率。 接下来&#xff0c;准备开始进入一个新的篇章&#xff0c;也…

del 语句

使用 del 语句可以删除任何对象&#xff0c;包括字典对象。删除之后&#xff0c;之前的引用将失效&#xff0c;尝试使用该对象会导致 NameError 错误。因此&#xff0c;删除字典对象的命令是 del myDict。 元组 (Tuple) 元组是不可变的&#xff0c;因此你不能修改元组的内容&a…

C#如何引用dll动态链接库文件的注释

1、dll动态库文件项目生成属性中要勾选“XML文档文件” 注意&#xff1a;XML文件的名字切勿修改。 2、添加引用时XML文件要与DLL文件在同一个目录下。 3、如果要是添加引用的时候XML不在相同目录下&#xff0c;之后又将XML文件复制到相同的目录下&#xff0c;需要删除引用&am…