Spark实时(四):Strctured Streaming简单应用

文章目录

Strctured Streaming简单应用

一、Output Modes输出模式

二、Streaming Table API

三、​​​​​​​​​​​​​​Triggers

1、​​​​​​​unspecified(默认模式)

2、​​​​​​​​​​​​​​Fixed interval micro-batches(固定间隔批次)

3、 ​​​​​​​​​​​​​​One-time micro-batch (仅一次触发)

4、​​​​​​​​​​​​​​Continuous with fixed checkpoint interval(连续处理)


Strctured Streaming简单应用

一、Output Modes输出模式

Structured Streaming中结果输出时outputMode可以设置三种模式,三种默认区别如下:

  • Append Mode(默认模式):追加模式,只有自上次触发后追加到结果表中的新行才会被输出。只有select、where、map、flatmap、filter、join查询支持追加模式。
  • Complete Mode(完整模式):将整个更新的结果输出。仅可用于代码中有聚合查询情况,代码中没有聚合查询不能使用。
  • Update Mode(更新模式):自Spark2.1.1版本后可用,只有自上次触发后更新的行才会被输出。这种模式仅仅输出自上次触发以来发生更改的行。如果结果数据没有聚合操作那么相当于Append Mode。

二、​​​​​​​​​​​​​​Streaming Table API

在Spark3.1版本之后,我们可以通过DataStreamReader.table()方式实时读取流式表中的数据,使用DataStreamWriter.toTable()向表中实时写数据。

案例:读取Socket数据实时写入到Spark流表中,然后读取流表数据展示数据。

代码示例如下:

package com.lanson.structuredStreamingimport org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SparkSession}object StreamTableAPI {def main(args: Array[String]): Unit = {//1.创建对象val spark: SparkSession = SparkSession.builder().master("local").appName("StreamTableAPI").config("spark.sql.shuffle.partitions", 1).config("spark.sql.warehouse.dir", "./my-spark-warehouse").getOrCreate()spark.sparkContext.setLogLevel("Error");import spark.implicits._//2.读取socket数据,注册流表val df: DataFrame = spark.readStream.format("socket").option("host", "node3").option("port", 9999).load()//3.对df进行转换val personinfo: DataFrame = df.as[String].map(line => {val arr: Array[String] = line.split(",")(arr(0).toInt, arr(1), arr(2).toInt)}).toDF("id", "name", "age")//4.将以上personinfo 写入到流表中personinfo.writeStream.option("checkpointLocation","./checkpoint/dir1").toTable("mytbl")import org.apache.spark.sql.functions._//5.读取mytbl 流表中的数据val query: StreamingQuery = spark.readStream.table("mytbl").withColumn("new_age", col("age").plus(6)).select("id", "name", "age", "new_age").writeStream.format("console").start()query.awaitTermination()}
}

以上代码编写完成后启动,向控制台输入以下数据:

1,zs,18
2,ls,19
3,ww,20
4,ml,21
5,tq,22

结果输入如下:

注意:以上代码执行时Spark中写出的表由Spark 参数”spark.sql.warehouse.dir”指定的路径临时维护数据,每次执行时,需要将该路径下的表数据清空。

三、​​​​​​​​​​​​​​Triggers

Structured Streaming Triggers 决定了流式数据被处理时是微批处理还是连续实时处理,以下是支持的Triggers:

实时处理,以下是支持的Triggers:

Trigger Type

描述

Unspecified(默认)

  • 代码使用:Trigger.ProcessingTime(0L)。
  • 代码中没有明确指定触发类型则查询默认以微批模式执行,表示尽可能快的执行查询。

Fixed interval micro-batches(固定间隔批次)

  • 代码使用:Trigger.ProcessingTime(long interval,TimeUnit timeUnit)
  • 查询将以微批模式处理,批次间隔根据用户指定的时间间隔决定
  1. 如果前一个微批处理时间在时间间隔内完成,则会等待间隔时间完成后再开始下一个微批处理
  2. 如果前一个微批处理时间超过了时间间隔,那么下一个微批处理将在前一个微批处理完成后立即开始。
  3. 如果没有新数据可用,则不会启动微批处理。

One-time micro-batch(仅一次性触发)

  • 代码使用:Trigger.Once()
  • 只执行一个微批次查询所有可用数据,然后自动停止,适用于一次性作业。

Continuous with fixed checkpoint interval(以固定checkpoint interval连续处理(实验阶段))

  • 代码使用:Trigger.Continuous(long interval,TimeUnit timeUnit)
  • 以固定的Checkpoint间隔(interval)连续处理。在这种模式下,连续处理引擎将每隔一定的间隔(interval)做一次checkpoint,可获得低至1ms的延迟。

下面以读取Socket数据为例,Scala代码演示各个模式

1、​​​​​​​unspecified(默认模式)

代码如下:

//3.默认微批模式执行查询,尽快将结果写出到控制台
val query: StreamingQuery = frame.writeStream.format("console").start()query.awaitTermination()

2、​​​​​​​​​​​​​​Fixed interval micro-batches(固定间隔批次)

代码如下:

//3.用户指定固定间隔批次触发查询val query: StreamingQuery = frame.writeStream.format("console").trigger(Trigger.ProcessingTime("5 seconds"))
//      .trigger(Trigger.ProcessingTime(5,TimeUnit.SECONDS).start()query.awaitTermination()

注意:这种固定间隔批次指的是第一批次处理完成,等待间隔时间,然后处理第二批次数据,依次类推。

3、 ​​​​​​​​​​​​​​One-time micro-batch (仅一次触发)

代码如下:

//4.仅一次触发执行
val query: StreamingQuery = frame.writeStream.format("console").trigger(Trigger.Once()).start()
query.awaitTermination()

4、​​​​​​​​​​​​​​Continuous with fixed checkpoint interval(连续处理)

Continuous不再是周期性启动task的批量执行数,而是启动长期运行的task,而是不断一个一个数据进行处理,周期性的通过指定checkpoint来记录状态(如果不指定checkpoint目录,会将状态记录在Temp目录下),保证exactly-once语义,这样就可以实现低延迟。详细内容可以参照后续“Continuous处理”章节。

代码如下:

//3.Continuous 连续触发执行
val query: StreamingQuery = frame.writeStream.format("console")//每10ms 记录一次状态,而不是执行一次.trigger(Trigger.Continuous(10,TimeUnit.MILLISECONDS)).option("checkpointLocation","./checkpint/dir4").start()
query.awaitTermination()

  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/49683.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

利用Java调用人脸身份证比对接口

一、什么是人脸身份证比对接口? 人脸身份证比对接口是一种特定的 API 接口服务,主要用于将提供的人脸图片和对应的身份证照片/号码进行比对,以此验证其身份。 这种接口的功能基于复杂的人脸识别技术,一般通过使用人工智能和深度学…

C语言刷题小记2

前言 本篇博客还是为大家分享一些C语言的OJ题目,如果你感兴趣,希望大佬一键三连。多多支持。下面进入正文部分。 题目1竞选社长 分析:本题要求我们输入一串字符,并且统计个数的多少,那么我们可以通过getchar函数来获…

软件开发者消除edge浏览器下载时“此应用不安全”的拦截方法

当Microsoft Edge浏览器显示“此应用不安全”或者“已阻止此不安全的下载”这类警告时,通常是因为Windows Defender SmartScreen或者其他安全功能认为下载的文件可能存在安全风险。对于软件开发者来说,大概率是由于软件没有进行数字签名,导致…

【React】useState:状态更新规则详解

文章目录 一、基本用法二、直接修改状态 vs 使用 setState 更新状态三、对象状态的更新四、深层次对象的更新五、函数式更新六、优化性能的建议 在 React 中,useState 是一个非常重要的 Hook,用于在函数组件中添加状态管理功能。正确理解和使用 useState…

未来的智能交通系统:智能合约在交通管理中的应用前景

随着城市化进程的加快和交通问题日益突出,智能交通系统成为了解决城市交通拥堵和安全问题的重要手段。本文将探讨智能合约在未来智能交通系统中的应用前景,分析其在交通管理中的潜力和优势。 什么是智能交通系统? 智能交通系统利用先进的信息…

MySQL之索引及简单运用

索引: 什么是索引 索引是数据库中一种非常重要的数据结构,用于帮助快速查询数据库表中的数据。它就像一本书的目录,能够让你快速定位到书中的某个具体章节或内容,而不需要一页一页地翻阅整本书。 在数据库管理系统中,…

Linux下如何设置系统定时任务

在Linux系统中,用户可以使用cron工具来设置定时任务。cron是一个守护进程,用于在指定的时间间隔执行指定的命令或脚本。下面是在Linux系统中设置系统定时任务的步骤。 使用crontab命令编辑定时任务列表: crontab -e该命令会打开一个文本编辑…

设计模式 之 —— 单例模式

目录 什么是单例模式? 定义 单例模式的主要特点 单例模式的几种设计模式 1.懒汉式:线程不安全 2.懒汉式:线程安全 3.饿汉式 4.双重校验锁 单例模式的优缺点 优点: 缺点: 适用场景: 什么是单例模…

Highcharts 饼图:数据可视化的魅力

Highcharts 饼图:数据可视化的魅力 引言 在数据可视化的世界中,饼图作为一种经典且直观的图表类型,被广泛应用于各种领域。Highcharts,作为一个功能强大且易于使用的JavaScript图表库,为我们提供了创建精美饼图的便捷途径。本文将深入探讨Highcharts饼图的特点、应用场景…

VBA实例-从Excel整理数据到Word

实现目录 功能需求数据结构复制数据到新sheet并分类数据添加序号、日期、时间三列数据添加序号列添加时间列 将名称和类别复制到word文件中将参数5和参数9中的一个复制到word文件中 实例 功能需求 1、将原始数据中不要的数据剔除 2、原始数据中增加序号、日期和时间三列数据&a…

图片上传成功却无法显示:静态资源路径配置问题解析

1、故事的背景 最近,有个学弟做了一个简单的后台管理页面。于是他开始巴拉巴拉撘框架,写代码,一顿操作猛如虎,终于将一个简单的壳子搭建完毕。但是在实现功能:点击头像弹出上传图片进行头像替换的时候,卡壳…

三星Unpacked发布会即将举行:有新款折叠屏手机,还有智能戒指

随着7月的脚步渐近,科技界的目光再次聚焦于三星,它即将在法国巴黎举办今年的第二场Unpacked发布会。这不仅是一场新品的展示,更是三星对创新科技的一次深刻诠释。 从Galaxy Z Fold 6的全新设计,到Galaxy Z Flip 6的显著升级&…

CSS实现表格无限轮播

<div className{styles.tableTh}><div className{styles.thItem} style{{ width: 40% }}>报警名称</div><div className{styles.thItem} style{{ width: 35% }}>开始时间</div><div className{styles.thItem} style{{ width: 25% }}>状态&…

C++学习笔记-基类、派生类与虚函数关系

在C的面向对象编程中&#xff0c;基类&#xff08;Base Class&#xff09;、派生类&#xff08;Derived Class&#xff09;以及虚函数&#xff08;Virtual Functions&#xff09;构成了多态性的基石。这三者之间的关系错综复杂而又紧密相连&#xff0c;它们共同支撑起C中复杂而…

《后端程序猿 · @Value 注释说明》

&#x1f4e2; 大家好&#xff0c;我是 【战神刘玉栋】&#xff0c;有10多年的研发经验&#xff0c;致力于前后端技术栈的知识沉淀和传播。 &#x1f497; &#x1f33b; CSDN入驻不久&#xff0c;希望大家多多支持&#xff0c;后续会继续提升文章质量&#xff0c;绝不滥竽充数…

定制ESXi 8镜像教程

本文将详细说明从安装 PowerCLI 到定制 ESXi 8 镜像的整个过程。 安装 VMware PowerCLI 1. 打开 PowerShell 打开 PowerShell&#xff1a; 在 Windows 搜索栏中输入 PowerShell&#xff0c;然后右键点击“Windows PowerShell”&#xff0c;选择“以管理员身份运行”。你会看…

【Unity PC端打包exe封装一个并添加安装引导】

Unity PC端打包exe封装一个并添加安装引导 比特虫在线制作ico图标ico图标转换工具 选中打包出来的所有文件和ico图标 右键 使用RAR软件 添加到压缩文件 两个名称要相同 设置完点击确认等待压缩完成 然后就可以使用 Smart Install Maker制作引导安装程序了

过滤出List集合的元素是Person对象,过滤出每个元素非null的name字段得到String类型的集合

import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; public class Main { public static void main(String[] args) { List people Arrays.asList( new Person(“Alice”, 30), new Person(null, 25), new Person(“Charlie”, 35) ); //…

Thinkphp5实现前后端通过接口通讯基本操作方法

在ThinkPHP5框架中&#xff0c;实现前后端通过接口通讯是一个常见的需求&#xff0c;尤其是在开发RESTful API时。下面是一个基本的步骤指南&#xff0c;用于设置ThinkPHP5来创建API接口&#xff0c;并使前端能够通过HTTP请求与后端进行通讯。 1. 创建API模块 首先&#xff0…

解决Pycharm找不到conda可执行文件

解决&#xff1a; 在 ‘Conda 可执行文件’ 的输入框里面&#xff0c;找到并选中 anaconda\library\bin 路径下的“ conda.bat ” ,再点击‘ 加载环境 ’&#xff0c;即可出现 ‘ 使用现有环境 ’ 的输入框&#xff0c;如图所示。