Spark实时(四):Strctured Streaming简单应用

文章目录

Strctured Streaming简单应用

一、Output Modes输出模式

二、Streaming Table API

三、​​​​​​​​​​​​​​Triggers

1、​​​​​​​unspecified(默认模式)

2、​​​​​​​​​​​​​​Fixed interval micro-batches(固定间隔批次)

3、 ​​​​​​​​​​​​​​One-time micro-batch (仅一次触发)

4、​​​​​​​​​​​​​​Continuous with fixed checkpoint interval(连续处理)


Strctured Streaming简单应用

一、Output Modes输出模式

Structured Streaming中结果输出时outputMode可以设置三种模式,三种默认区别如下:

  • Append Mode(默认模式):追加模式,只有自上次触发后追加到结果表中的新行才会被输出。只有select、where、map、flatmap、filter、join查询支持追加模式。
  • Complete Mode(完整模式):将整个更新的结果输出。仅可用于代码中有聚合查询情况,代码中没有聚合查询不能使用。
  • Update Mode(更新模式):自Spark2.1.1版本后可用,只有自上次触发后更新的行才会被输出。这种模式仅仅输出自上次触发以来发生更改的行。如果结果数据没有聚合操作那么相当于Append Mode。

二、​​​​​​​​​​​​​​Streaming Table API

在Spark3.1版本之后,我们可以通过DataStreamReader.table()方式实时读取流式表中的数据,使用DataStreamWriter.toTable()向表中实时写数据。

案例:读取Socket数据实时写入到Spark流表中,然后读取流表数据展示数据。

代码示例如下:

package com.lanson.structuredStreamingimport org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SparkSession}object StreamTableAPI {def main(args: Array[String]): Unit = {//1.创建对象val spark: SparkSession = SparkSession.builder().master("local").appName("StreamTableAPI").config("spark.sql.shuffle.partitions", 1).config("spark.sql.warehouse.dir", "./my-spark-warehouse").getOrCreate()spark.sparkContext.setLogLevel("Error");import spark.implicits._//2.读取socket数据,注册流表val df: DataFrame = spark.readStream.format("socket").option("host", "node3").option("port", 9999).load()//3.对df进行转换val personinfo: DataFrame = df.as[String].map(line => {val arr: Array[String] = line.split(",")(arr(0).toInt, arr(1), arr(2).toInt)}).toDF("id", "name", "age")//4.将以上personinfo 写入到流表中personinfo.writeStream.option("checkpointLocation","./checkpoint/dir1").toTable("mytbl")import org.apache.spark.sql.functions._//5.读取mytbl 流表中的数据val query: StreamingQuery = spark.readStream.table("mytbl").withColumn("new_age", col("age").plus(6)).select("id", "name", "age", "new_age").writeStream.format("console").start()query.awaitTermination()}
}

以上代码编写完成后启动,向控制台输入以下数据:

1,zs,18
2,ls,19
3,ww,20
4,ml,21
5,tq,22

结果输入如下:

注意:以上代码执行时Spark中写出的表由Spark 参数”spark.sql.warehouse.dir”指定的路径临时维护数据,每次执行时,需要将该路径下的表数据清空。

三、​​​​​​​​​​​​​​Triggers

Structured Streaming Triggers 决定了流式数据被处理时是微批处理还是连续实时处理,以下是支持的Triggers:

实时处理,以下是支持的Triggers:

Trigger Type

描述

Unspecified(默认)

  • 代码使用:Trigger.ProcessingTime(0L)。
  • 代码中没有明确指定触发类型则查询默认以微批模式执行,表示尽可能快的执行查询。

Fixed interval micro-batches(固定间隔批次)

  • 代码使用:Trigger.ProcessingTime(long interval,TimeUnit timeUnit)
  • 查询将以微批模式处理,批次间隔根据用户指定的时间间隔决定
  1. 如果前一个微批处理时间在时间间隔内完成,则会等待间隔时间完成后再开始下一个微批处理
  2. 如果前一个微批处理时间超过了时间间隔,那么下一个微批处理将在前一个微批处理完成后立即开始。
  3. 如果没有新数据可用,则不会启动微批处理。

One-time micro-batch(仅一次性触发)

  • 代码使用:Trigger.Once()
  • 只执行一个微批次查询所有可用数据,然后自动停止,适用于一次性作业。

Continuous with fixed checkpoint interval(以固定checkpoint interval连续处理(实验阶段))

  • 代码使用:Trigger.Continuous(long interval,TimeUnit timeUnit)
  • 以固定的Checkpoint间隔(interval)连续处理。在这种模式下,连续处理引擎将每隔一定的间隔(interval)做一次checkpoint,可获得低至1ms的延迟。

下面以读取Socket数据为例,Scala代码演示各个模式

1、​​​​​​​unspecified(默认模式)

代码如下:

//3.默认微批模式执行查询,尽快将结果写出到控制台
val query: StreamingQuery = frame.writeStream.format("console").start()query.awaitTermination()

2、​​​​​​​​​​​​​​Fixed interval micro-batches(固定间隔批次)

代码如下:

//3.用户指定固定间隔批次触发查询val query: StreamingQuery = frame.writeStream.format("console").trigger(Trigger.ProcessingTime("5 seconds"))
//      .trigger(Trigger.ProcessingTime(5,TimeUnit.SECONDS).start()query.awaitTermination()

注意:这种固定间隔批次指的是第一批次处理完成,等待间隔时间,然后处理第二批次数据,依次类推。

3、 ​​​​​​​​​​​​​​One-time micro-batch (仅一次触发)

代码如下:

//4.仅一次触发执行
val query: StreamingQuery = frame.writeStream.format("console").trigger(Trigger.Once()).start()
query.awaitTermination()

4、​​​​​​​​​​​​​​Continuous with fixed checkpoint interval(连续处理)

Continuous不再是周期性启动task的批量执行数,而是启动长期运行的task,而是不断一个一个数据进行处理,周期性的通过指定checkpoint来记录状态(如果不指定checkpoint目录,会将状态记录在Temp目录下),保证exactly-once语义,这样就可以实现低延迟。详细内容可以参照后续“Continuous处理”章节。

代码如下:

//3.Continuous 连续触发执行
val query: StreamingQuery = frame.writeStream.format("console")//每10ms 记录一次状态,而不是执行一次.trigger(Trigger.Continuous(10,TimeUnit.MILLISECONDS)).option("checkpointLocation","./checkpint/dir4").start()
query.awaitTermination()

  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/49683.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C语言刷题小记2

前言 本篇博客还是为大家分享一些C语言的OJ题目,如果你感兴趣,希望大佬一键三连。多多支持。下面进入正文部分。 题目1竞选社长 分析:本题要求我们输入一串字符,并且统计个数的多少,那么我们可以通过getchar函数来获…

软件开发者消除edge浏览器下载时“此应用不安全”的拦截方法

当Microsoft Edge浏览器显示“此应用不安全”或者“已阻止此不安全的下载”这类警告时,通常是因为Windows Defender SmartScreen或者其他安全功能认为下载的文件可能存在安全风险。对于软件开发者来说,大概率是由于软件没有进行数字签名,导致…

【React】useState:状态更新规则详解

文章目录 一、基本用法二、直接修改状态 vs 使用 setState 更新状态三、对象状态的更新四、深层次对象的更新五、函数式更新六、优化性能的建议 在 React 中,useState 是一个非常重要的 Hook,用于在函数组件中添加状态管理功能。正确理解和使用 useState…

未来的智能交通系统:智能合约在交通管理中的应用前景

随着城市化进程的加快和交通问题日益突出,智能交通系统成为了解决城市交通拥堵和安全问题的重要手段。本文将探讨智能合约在未来智能交通系统中的应用前景,分析其在交通管理中的潜力和优势。 什么是智能交通系统? 智能交通系统利用先进的信息…

MySQL之索引及简单运用

索引: 什么是索引 索引是数据库中一种非常重要的数据结构,用于帮助快速查询数据库表中的数据。它就像一本书的目录,能够让你快速定位到书中的某个具体章节或内容,而不需要一页一页地翻阅整本书。 在数据库管理系统中,…

设计模式 之 —— 单例模式

目录 什么是单例模式? 定义 单例模式的主要特点 单例模式的几种设计模式 1.懒汉式:线程不安全 2.懒汉式:线程安全 3.饿汉式 4.双重校验锁 单例模式的优缺点 优点: 缺点: 适用场景: 什么是单例模…

VBA实例-从Excel整理数据到Word

实现目录 功能需求数据结构复制数据到新sheet并分类数据添加序号、日期、时间三列数据添加序号列添加时间列 将名称和类别复制到word文件中将参数5和参数9中的一个复制到word文件中 实例 功能需求 1、将原始数据中不要的数据剔除 2、原始数据中增加序号、日期和时间三列数据&a…

图片上传成功却无法显示:静态资源路径配置问题解析

1、故事的背景 最近,有个学弟做了一个简单的后台管理页面。于是他开始巴拉巴拉撘框架,写代码,一顿操作猛如虎,终于将一个简单的壳子搭建完毕。但是在实现功能:点击头像弹出上传图片进行头像替换的时候,卡壳…

三星Unpacked发布会即将举行:有新款折叠屏手机,还有智能戒指

随着7月的脚步渐近,科技界的目光再次聚焦于三星,它即将在法国巴黎举办今年的第二场Unpacked发布会。这不仅是一场新品的展示,更是三星对创新科技的一次深刻诠释。 从Galaxy Z Fold 6的全新设计,到Galaxy Z Flip 6的显著升级&…

CSS实现表格无限轮播

<div className{styles.tableTh}><div className{styles.thItem} style{{ width: 40% }}>报警名称</div><div className{styles.thItem} style{{ width: 35% }}>开始时间</div><div className{styles.thItem} style{{ width: 25% }}>状态&…

《后端程序猿 · @Value 注释说明》

&#x1f4e2; 大家好&#xff0c;我是 【战神刘玉栋】&#xff0c;有10多年的研发经验&#xff0c;致力于前后端技术栈的知识沉淀和传播。 &#x1f497; &#x1f33b; CSDN入驻不久&#xff0c;希望大家多多支持&#xff0c;后续会继续提升文章质量&#xff0c;绝不滥竽充数…

【Unity PC端打包exe封装一个并添加安装引导】

Unity PC端打包exe封装一个并添加安装引导 比特虫在线制作ico图标ico图标转换工具 选中打包出来的所有文件和ico图标 右键 使用RAR软件 添加到压缩文件 两个名称要相同 设置完点击确认等待压缩完成 然后就可以使用 Smart Install Maker制作引导安装程序了

解决Pycharm找不到conda可执行文件

解决&#xff1a; 在 ‘Conda 可执行文件’ 的输入框里面&#xff0c;找到并选中 anaconda\library\bin 路径下的“ conda.bat ” ,再点击‘ 加载环境 ’&#xff0c;即可出现 ‘ 使用现有环境 ’ 的输入框&#xff0c;如图所示。

【实现100个unity特效之8】使用ShaderGraph实现2d贴图中指定部分局部发光效果

最终效果 寒冰法师 火焰法师 文章目录 最终效果寒冰法师火焰法师 素材一、功能分析实现方法基本思路Unity的Bloom后处理为什么关键部位白色&#xff1f;最终结果 二、 新建URP项目三、合并图片四、使用PS制作黑白图片方法一 手动涂鸦方法二 魔棒工具1. 拖入图片进PS&#xff0…

Unity3d打包到Android

本文参考&#xff1a; Unity3D新手教程&#xff1a;如何打包发布到Android_哔哩哔哩_bilibili 一、Unity 打包Android的环境搭建 1、工具安装 Unity Hub已经集成了Android的环境搭建。 选择Add modules 然后安装Android Build Support下的所有工具。 如果各个工具都安装成功…

给Windows系统中注入服务,即windwos守护进程

最近总是在windwos环境下测试nginx&#xff0c;总是需要频繁重启nginx服务。于是考虑有没有可能把nginx加入到系统服务的操作。在网上找了一大堆资料&#xff0c;现在来总结一下&#xff01; 方法1&#xff1a;利用nssm工具实现 这是一个守护进程的软件&#xff0c;可以在win…

Optima: 一个用于 Tapestri 平台的单细胞多组学数据分析的开源 R 包

分子条形码技术的最新进展使得在单细胞水平进行下一代转录组测序成为可能&#xff0c;例如10 Genomics Chromium和DropSeq。此外&#xff0c;CITE-seq 的出现使得可以在对单个细胞进行转录组分析的基础上同时对表面蛋白进行分析。同时&#xff0c;为了表征 DNA 和蛋白质谱&…

ubuntu那些ppa源在哪

Ubuntu中的 PPA 终极指南 - UBUNTU粉丝之家 什么是PPA PPA 代表个人包存档。 PPA 允许应用程序开发人员和 Linux 用户创建自己的存储库来分发软件。 使用 PPA&#xff0c;您可以轻松获取较新的软件版本或官方 Ubuntu 存储库无法提供的软件。 为什么使用PPA&#xff1f; 正如…

添加动态云层

<template> <div class"topbox"> xx卫星管理 </div> <div class"selectbox"> <div class"title"> 卫星列表 </div> <el-table :data"tableData" style"width: 100%;height:230px;" …

亚信安慧AntDB-M负载均衡

负载均衡是分布式系统中常用的技术&#xff0c;主要是将工作任务均衡分布到系统的各个资源点上&#xff0c;可以充分利用系统资源。 AntDB-M分布式内存数据库节点角色可以分为管理节点(MN)、计算节点(CN)和数据节点(DN)三种。管理节点收到客户端连接请求后&#xff0c;会经由负…