Spark Structured Streaming使用教程

文章目录

    • 1、输入数据源
    • 2、输出模式
    • 3、sink输出结果
    • 4、时间窗口
      • 4.1、时间窗口
      • 4.2、时间水印(Watermarking)
    • 5、使用例子

Structured Streaming是一个基于Spark SQL引擎的可扩展和容错流处理引擎,Spark SQL引擎将负责增量和连续地运行它,并在流数据继续到达时更新最终结果。
Structured Streaming把持续不断的流式数据当做一个不断追加的表,这使得新的流处理模型与批处理模型非常相似。您将把流计算表示为在静态表上的标准批处理查询,Spark将其作为无界输入表上的增量查询运行。

1、输入数据源

  • File source - 以数据流的形式读取写入目录中的文件。文件将按照文件修改时间的先后顺序进行处理。如果设置了latestFirst,则顺序将相反。支持的文件格式为text, CSV, JSON, ORC, Parquet。请参阅DataStreamReader接口的文档,了解最新的列表,以及每种文件格式支持的选项。注意,监视目录的文件改变,只能是原子性的改变,比如把文件放入该目录,而不是持续写入该目录中的某个文件。
  • Kafka source - 从Kafka读取数据。它兼容Kafka代理版本0.10.0或更高版本。查看Kafka集成指南了解更多细节。
  • Socket source (用于测试) - 从套接字连接读取UTF8文本数据。监听服务器套接字位于驱动程序。请注意,这应该仅用于测试,因为它不提供端到端的容错保证。
  • Rate source (用于测试) - 以每秒指定的行数生成数据,每个输出行包含一个时间戳和值。其中timestamp为包含消息发送时间的timestamp类型,value为包含消息计数的Long类型,从0开始作为第一行。该源代码用于测试和基准测试。

2、输出模式

  • 我们可以定义每次结果表中的数据更新时,以何种方式,将哪些数据写入外部存储。有3种模式:
  • complete mode:所有数据都会被写入外部存储。具体如何写入,是根据不同的外部存储自身来决定的。
  • append mode:只有新的数据,以追加的方式写入外部存储。只有当我们确定,result table中已有的数据是肯定不会被改变时,才应该使用append mode。
  • update mode:只有被更新的数据,包括增加的和修改的,会被写入外部存储中。
aggDF.writeStream().outputMode("complete").format("console").start();

3、sink输出结果

  • File sink - 将输出存储到一个目录。
    输出模式支持Append
writeStream.format("parquet")        // can be "orc", "json", "csv", etc..option("path", "path/to/destination/dir").start()
  • Kafka sink - 将输出存储到Kafka中的一个或多个主题。
    输出模式支持Append, Update, Complete
writeStream.format("kafka").option("kafka.bootstrap.servers", "host1:port1,host2:port2").option("topic", "updates").start()
  • Console sink (用于测试) - 每次有触发器时,将输出打印到控制台/标准输出。支持两种输出模式:Append和Complete。这应该用于在低数据量上进行调试,因为在每次触发后都会收集整个输出并将其存储在驱动程序的内存中。
    输出模式支持Append, Update, Complete
writeStream.format("console").start()
  • Memory sink (用于测试) - 输出以内存表的形式存储在内存中。支持两种输出模式:Append和Complete。这应该用于低数据量的调试目的,因为整个输出被收集并存储在驱动程序的内存中。因此,请谨慎使用。
    输出模式支持Append, Complete
输出以内存表的形式存储在内存中。支持两种输出模式:Append和Complete。这应该用于低数据量的调试目的,因为整个输出被收集并存储在驱动程序的内存中。因此,请谨慎使用。
  • 自定义输出Foreach和ForeachBatch - Foreach:针对每条数据的输出;ForeachBatch:针对每批次的数据输出。
    输出模式支持Append, Update, Complete
// Foreach
streamingDatasetOfString.writeStream().foreach(new ForeachWriter<String>() {@Override public boolean open(long partitionId, long version) {// Open connection}@Override public void process(String record) {// Write string to connection}@Override public void close(Throwable errorOrNull) {// Close the connection}}
).start();// ForeachBatch
streamingDatasetOfString.writeStream().foreachBatch(new VoidFunction2<Dataset<String>, Long>() {public void call(Dataset<String> dataset, Long batchId) {// Transform and write batchDF}    }
).start();

4、时间窗口

4.1、时间窗口

在业务场景中,经常会遇到按时间段进行聚合操作,Spark提供了基于滑动窗口的事件时间集合操作,每个时间段作为一个分组,并对每个组内的每行数据进行聚合操作。
在这里插入图片描述

可以使用groupBy()和window()操作来表示窗口聚合。

Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words.groupBy(functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),words.col("word")
).count();

4.2、时间水印(Watermarking)

WaterMarking的作用主要是为了解决:延迟到达的数据是否丢弃,系统可以删除过期的数据。
在这里插入图片描述

Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words.withWatermark("timestamp", "10 minutes") // 延迟10分钟后到达的数据将会被丢弃.groupBy(window(col("timestamp"), "10 minutes", "5 minutes"),col("word")).count();

5、使用例子

package com.penngo.spark;import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.roaringbitmap.art.Art;import java.io.Serializable;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.window;public class SparkStructStream {private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");public static class DataTxt implements Serializable {private String text;private Timestamp time;public DataTxt(String text, LocalDateTime time) {this.text = text;this.time = Timestamp.valueOf(time);}public String getText() {return text;}public void setText(String text) {this.text = text;}public Timestamp getTime() {return time;}public void setTime(Timestamp time) {this.time = time;}}public static void socket(SparkSession spark) throws Exception{// 运行:nc -lk 9999Dataset<Row> lines = spark.readStream().format("socket").option("host", "localhost").option("port", 9999).load();Dataset<DataTxt> words = lines.as(Encoders.STRING()).map((MapFunction<String, DataTxt>) x -> {String[] strs = x.split(",");LocalDateTime date = LocalDateTime.parse(strs[1],formatter);Arrays.asList(x.split(",")).iterator();DataTxt data = new DataTxt(strs[0], date);return data;}, Encoders.bean(DataTxt.class));Dataset<Row> wordCounts = words.toDF().withWatermark("time", "10 minutes") // 延迟10分钟后到达的数据将会被丢弃.groupBy(window(col("time"), "10 minutes", "5 minutes"),col("text")).count();wordCounts.writeStream().outputMode("append").foreach(new ForeachWriter<Row>() {@Override public boolean open(long partitionId, long version) {
//                        System.out.println("open==========partitionId:" + partitionId + ",version:" + version);return true;}@Override public void process(Row record) {// Write string to connectionSystem.out.println("recordxxxxxxxxxxxxxxxxxx:======" + record);}@Override public void close(Throwable errorOrNull) {// Close the connection
//                        System.out.println("close==========errorOrNull:" + errorOrNull);}})
//                .format("console").start().awaitTermination();}public static void kafka(SparkSession spark) throws Exception{// Subscribe to 1 topicDataset<Row> df = spark.readStream().format("kafka").option("kafka.bootstrap.servers", "192.168.245.1:9092").option("subscribe", "topic-news").option("startingOffsets","latest").option("maxOffsetsPerTrigger",1000).load();df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");df.printSchema();df.writeStream().outputMode("append").format("console").start().awaitTermination();}public static void main(String[] args) throws Exception{Logger.getLogger("org.apache.spark").setLevel(Level.WARN);Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF);Logger.getLogger("org.apache.kafka").setLevel(Level.WARN);System.setProperty("hadoop.home.dir", "/usr/local/hadoop-3.3.6");System.setProperty("HADOOP_USER_NAME", "root");SparkSession spark = SparkSession.builder().appName("SparkStructStream").master("local[*]").getOrCreate();//        socket(spark);kafka(spark);}
}

参考自官方文档:https://spark.apache.org/docs/3.1.2/structured-streaming-programming-guide.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/205498.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring Security 自定义异常失效?从源码分析到解决方案

&#x1f680; 作者主页&#xff1a; 有来技术 &#x1f525; 开源项目&#xff1a; youlai-mall &#x1f343; vue3-element-admin &#x1f343; youlai-boot &#x1f33a; 仓库主页&#xff1a; Gitee &#x1f4ab; Github &#x1f4ab; GitCode &#x1f496; 欢迎点赞…

使用阿里巴巴同步工具DataX实现Mysql与ElasticSearch(ES)数据同步

一、Linux环境要求 二、准备工作 2.1 Linux安装jdk 2.2 linux安装python 2.3 下载DataX&#xff1a; 三、DataX压缩包导入&#xff0c;解压缩 四、编写同步Job 五、执行Job 六、定时更新 6.1 创建定时任务 6.2 提交定时任务 6.3 查看定时任务 七、增量更新思路 一、Linux环境要…

微信小程序js数组对象根据某个字段排序

一、排序栗子 注: 属性字段需要进行转换,如String类型或者Number类型 //升序排序 首元素(element1)在前 降序则(element1)元素在后 data data.sort((element1, element2) >element1.属性 - element2.属性 ); 二、代码 Page({/*** 页面的初始数据*/data: {user:…

SpringSecurity安全授权

目录 前言 正文 1.基本流程 2.基本用法 3.配置项 4.HttpSecurity 方式和内存认证方式 5.认证流程 6.基于数据库查询的登录验证 7.多种角色权限认证 8.自定义权限认证 总结 前言 安全对于任何系统来说都是非常重要的&#xff0c;权限的分配和管理一直都是开发者需…

C语言——输出菱形

法一&#xff1a; #include<stdio.h> #define N 7 //假设输出7层菱形 int main(){int i;//i控制第几行 int j;//j控制每一行空格的循环个数 int k;//k控制每一行*的循环次数 for(i1;i<4;i){//将图形分为两部分,前四行(第一部分) for(j1;j<4-i;j){//输出第i行的…

echarts双折线图

引用 //反应时长 durationCharts categoryCommonChart(studyBehavior.durationCharts, durationCharts) function categoryCommonChart(odata, dom){var myChart echarts.init(document.getElementById(dom));let oarr []oarr odata.series.map(function(item){let color…

随笔-这都是命吗

我与鹏哥、小付有个小群&#xff0c;前几天&#xff0c;鹏哥在群里发了一个图&#xff0c;是他那个城市准备扶持的高新产业&#xff0c;有元宇宙、量子信息、生物制药、人工智能什么的。 先前的时候鹏哥给我说过&#xff0c;当地准备了六百多亩地&#xff0c;准备发展高新产业…

Linux-进程之间的通信

目录 ​编辑 一.什么是进程之间的通信 二.进程之间的通信所访问的数据 三.进程之间的通信是如何做到的 四.基于内存文件级别的通信方式——管道 1.什么是管道 2.管道的建立过程——匿名管道 a.什么是匿名管道 b.匿名管道特点&#xff1a; c.使用匿名管道的…

风格迁移网络修改流程(自用版)

一. AdaAttN-Revisit Attention Mechanism in Arbitrary Neural Style Transfer&#xff08;ICCV2021&#xff09; 下载vgg_normalised.pth打开visdom python -m visdom.server在 train_adaattn.sh 中配置 content_path、style_path 和 image_encoder_path&#xff0c;分别表…

固态硬盘速度测试:硬盘实际性能是否符合标准?

在进行固态硬盘速度测试之前我们先来了解一下固态硬盘的读写速度是什么。固态硬盘的读写速度主要分为顺序读写和随机读写&#xff08;4K&#xff09;。 ​顺序读写&#xff1a;指的是硬盘在读写连贯、集中大文件时候的速度。比如在读取、拷贝单个视频文件时&#xff0c;就是硬盘…

【项目问题解决】IDEA2020.3 使用 lombok 插件 java: 找不到符号 符号: 方法 builder()

目录 lombok找不到符号问题修改 1.问题描述2.问题原因3.解决思路4.解决方案5.总结6.参考 文章所属专区 项目问题解决 1.问题描述 IDEA2020.3 使用 lombok 插件 java: 找不到符号 符号: 方法 builder()&#xff0c;无法使用lombok下应有的注解&#xff0c;一度怀疑是版本问题 …

使用Inno Setup 打包程序文件 怎么把其中一个文件安装时复制到指定系统文件夹

环境: Inno Setup 6.6 Win10 专业版 问题描述: 使用Inno Setup 打包程序文件 怎么把其中一个文件安装时复制到指定系统文件夹 将文件api-ms-win-shcore-scaling-l1-1-1.dll复制到system32里面 解决方案: 1.由于安全和权限的限制,直接在Inno Setup脚本中复制文件到C:\…

C++新经典模板与泛型编程:用成员函数重载实现std::is_class

用成员函数重载实现is_class std::is_class功能&#xff0c;是一个C11标准中用于判断某个类型是否为一个类类型&#xff08;但不是联合类型&#xff09;的类模板。当时在讲解的时候并没有涉及std::is_class的实现代码&#xff0c;在这里实现一下。简单地书写一个IsClass类模板…

python pydoc生成API文档

pydoc是python内置的一个文档生成模块。 pydoc 模块会根据 Python 模块来自动生成文档。 生成的文档可在控制台中显示为文本页面&#xff0c;提供给 Web 浏览器访问或者保存为 HTML 文件。 对于模块、类、函数和方法&#xff0c;显示的文档内容取自文档字符串&#xff08;即 _…

泰凌微(Telink)8258配置串口收发自定义数据

在官网下载SDK后&#xff08;以Mesh SDK为例&#xff09;使用Eclipse打开&#xff0c;对应MCU的配置文件在app_config_8258.h&#xff0c;默认的HCI接口是HCI_USE_NONE&#xff0c;如果改成HCI_USE_UART后可以通过串口收发数据&#xff0c;此时默认接收函数处理的是以Telink的协…

音视频学习(二十)——rtsp收流(udp方式)

前言 本文主要介绍通过udp方式实现rtsp拉流。 流程图 流程说明&#xff1a; 相较于tcp方式“信令数据”复用同一连接拉流&#xff0c;udp方式拉流“信令数据”采用不同的连接&#xff0c;信令传输采用tcp&#xff0c;流数据传输采用udp&#xff1b;客户端向服务端&#xff0…

数据库增删改查(CRUD)进阶版

目录 数据库约束 约束类型 表的设计 1.一对一 2.一对多 3.多对多 增删查改进阶操作 1. 插入查询结果 2.查询 聚合查询 聚合函数 group by having 联合查询 内连接 外连接 自连接 子查询 合并查询 数据库约束 创建表的时候制定的一些规则&#xff0c;在后续…

智能优化算法应用:基于北方苍鹰算法无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于北方苍鹰算法无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于北方苍鹰算法无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.北方苍鹰算法4.实验参数设定5.算法结果6.参考…

生成式人工智能笔记-AIGC笔记

生成式人工智能笔记-AIGC笔记 十多年前&#xff0c;人工智能还只是一个不被人看好的小众领域&#xff0c;但是现在&#xff0c;它却已经成了街头巷尾的热点谈资&#xff0c;几乎任何事情都可以和人工智能联系在一起。 人工智能包括基础层、技术层和应用层。 基础层是人工智能…

收藏!当今最流行的10 种人工智能算法

人工智能的概念始于1956年的达特茅斯会议&#xff0c;由于受到数据、计算力、智能算法等多方面因素的影响&#xff0c;人工智能技术和应用发展经历了多次高潮和低谷。 2022年以来&#xff0c;以ChatGPT为代表的大模型一夜爆火&#xff0c;它能够基于在预训练阶段所见的模式和统…