Spark实时(五):InputSource数据源案例演示

文章目录

InputSource数据源案例演示

一、​​​​​​​File Source

1、读取text文件

2、读取csv文件

3、读取json文件

二、Socket Source 

三、Rate Source


InputSource数据源案例演示

在Spark2.0版本之后,DataFrame和Dataset可以表示静态有边界的数据,也可以表示无边界的流式数据。在Structured Streaming中我们可以使用SparkSession针对流式数据源创建对应的Dataset或者DataFrame,并可以像处理批数据一样使用各种Operators操作处理流式数据。

Structured Streaming的数据源目前支持File Source 、Socket Source 、Rate Source、Kafka Source ,与Kafka的整合在后续整理,这里对其他三种数据源分别演示。

一、​​​​​​​​​​​​​​File Source

Sturctured Streaming可以读取写入目录的文件作为数据流,文件将按照文件修改时间的顺序进行处理,文件必须原子性的存入到监控目录中,支持的格式有text、csv、json、orc、parquet。

1、读取text文件

Scala代码如下:

package com.lanson.structuredStreaming.sourceimport org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}/***  Structured Streaming监控目录 text格式数据*/
object SSReadTextData {def main(args: Array[String]): Unit = {//1.创建对象val spark: SparkSession = SparkSession.builder().master("local").appName("SSReadTextData").config("spark.sql.shuffle.partitions", 1).getOrCreate()import  spark.implicits._spark.sparkContext.setLogLevel("Error")//2.监控目录val ds: Dataset[String] = spark.readStream.textFile("./data/")val result: DataFrame = ds.map(line => {val arr: Array[String] = line.split("-")(arr(0).toInt, arr(1), arr(2).toInt)}).toDF("id", "name", "age")val query: StreamingQuery = result.writeStream.format("console").start()query.awaitTermination()}}

 结果:

Java代码如下:

package com.lanson.structuredStreaming.source;import java.util.concurrent.TimeoutException;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;
import scala.Tuple3;public class SSReadTextData01 {public static void main(String[] args) throws TimeoutException, StreamingQueryException {//1.创建对象SparkSession spark = SparkSession.builder().master("local").appName("SSReadSocketData01").config("spark.sql.shuffle.partitions", 1).getOrCreate();spark.sparkContext().setLogLevel("Error");Dataset<String> ds = spark.readStream().textFile("./data/");Dataset<Tuple3<Integer, String, Integer>> ds2 = ds.map(new MapFunction<String, Tuple3<Integer, String, Integer>>() {@Overridepublic Tuple3<Integer, String, Integer> call(String line) throws Exception {String[] arr = line.split("-");return new Tuple3<>(Integer.valueOf(arr[0]), arr[1],Integer.valueOf(arr[2]) );}}, Encoders.tuple(Encoders.INT(), Encoders.STRING(), Encoders.INT()));Dataset<Row> result = ds2.toDF("id", "name", "age");result.writeStream().format("console").start().awaitTermination();}
}

 结果:

以上代码编写完成之后,向监控的目录“./data”中不断写入含有以下内容的文件,可以看到控制台有对应的流数据输出,这里一定是原子性的将文件复制到对应目录下。文件内容如下:

1-zhangsan-18
2-lisi-19
3-ww-20

2、读取csv文件

Scala代码如下:

package com.lanson.structuredStreaming.sourceimport org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.types.StructType/*** Structured Streaming 读取CSV数据*/
object SSReadCsvData {def main(args: Array[String]): Unit = {//1.创建对象val spark: SparkSession = SparkSession.builder().master("local").appName("SSReadCsvData").config("spark.sql.shuffle.partitions", 1).getOrCreate()import  spark.implicits._spark.sparkContext.setLogLevel("Error")//2.创建CSV数据schemaval userSchema: StructType = new StructType().add("id", "integer").add("name", "string").add("gender", "string").add("age", "integer")val result: DataFrame = spark.readStream.option("sep", ",").schema(userSchema).csv("./data/")val query: StreamingQuery = result.writeStream.format("console").start()query.awaitTermination()}}

结果:

Java代码如下

package com.lanson.structuredStreaming.source;import java.util.concurrent.TimeoutException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.StructType;/*** Structured Streaming 读取CSV数据*/public class SSReadCsvData01 {public static void main(String[] args) throws TimeoutException, StreamingQueryException {//1.创建对象SparkSession spark = SparkSession.builder().master("local").appName("SSReadCsvData").config("spark.sql.shuffle.partitions", 1).getOrCreate();spark.sparkContext().setLogLevel("Error");StructType userSchema = new StructType().add("id", "integer").add("name", "string").add("gender", "string").add("age", "integer");Dataset<Row> result = spark.readStream().option("sep", ",").schema(userSchema).csv("./data/");result.writeStream().format("console").start().awaitTermination();}
}

 结果:

以上代码运行之后向对应监控的目录下原子性写入含有数据的csv文件,在控制台可以看到实时监控内容。文件内容如下:

1,zhangsan,一班,100
2,lisi,二班,200
3,wangwu,一班,300
4,maliu,二班,100
5,tianqi,三班,100
6,gaoba,三班,50
7,zs2,四班,50

3、读取json文件

Scala代码如下:

package com.lanson.structuredStreaming.sourceimport org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.types.StructType/***  Structured Streaming 监控Json格式数据*/
object SSReadJsonData {def main(args: Array[String]): Unit = {//1.创建对象val spark: SparkSession = SparkSession.builder().master("local").appName("SSReadCsvData").config("spark.sql.shuffle.partitions", 1).getOrCreate()import  spark.implicits._spark.sparkContext.setLogLevel("Error")//2.创建 json 数据schemaval userSchema: StructType = new StructType().add("id", "integer").add("name", "string").add("age", "integer")val result: DataFrame = spark.readStream.schema(userSchema).json("./data/")val query: StreamingQuery = result.writeStream.format("console").start()query.awaitTermination()}}

结果:

Java代码如下

package com.lanson.structuredStreaming.source;import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.StructType;
import java.util.concurrent.TimeoutException;/*** Structured Streaming实时监控目录中json文件作为数据流*/
public class SSReadJsonData01 {public static void main(String[] args) throws TimeoutException, StreamingQueryException {//1.创建对象SparkSession spark = SparkSession.builder().appName("File Source test").master("local").getOrCreate();//2.设置日志spark.sparkContext().setLogLevel("Error");//3.设置SchemaStructType userSchema = new StructType().add("id", "integer").add("name", "string").add("age", "integer");//4.指定监控目录读取数据json数据Dataset<Row> ds = spark.readStream().option("sep", ",").schema(userSchema).json("./data/");//5.打印数据到控制台StreamingQuery query =ds.writeStream().format("console").start();query.awaitTermination();}
}

结果:

以上代码启动之后,向监控的目录“./data”下原子写入含有以下内容的json文件,在控制台可以看到实时监控内容。json文件内容如下:

{"id":1,"name":"zs","age":18}
{"id":2,"name":"ls","age":19}
{"id":3,"name":"ww","age":20}
{"id":4,"name":"ml","age":21}

注意:实时监控json格式数据时,创建的Schema 中的字段需要与Json中的属性保持一致,否则在映射成表时,Schema中含有但在Json中没有的属性的字段对应的数据会为null。

二、Socket Source 

读取Socket方式需要指定对应的host和port,读取Socket数据源多用于测试场景,这里不再演示。

可以参考案例:

Spark实时(三):Structured Streaming入门案例-CSDN博客

三、Rate Source

Rate Source是以每秒指定的行数生成数据,每个输出行包含一个timestamp和value,其中timestamp是一个Timestamp含有信息分配的时间类型,value是从0开始的Long类型的数据,Rate Source式多用于测试。

scala代码如下:

package com.lanson.structuredStreaming.sourceimport org.apache.spark.sql.{DataFrame, SparkSession}/*** SSRateSource*/
object SSRateSource {def main(args: Array[String]): Unit = {//1.创建对象val spark: SparkSession = SparkSession.builder().master("local").appName("rate test")
//      .config("spark.sql.shuffle.partitions", 1).getOrCreate()val result: DataFrame = spark.readStream.format("rate")// 配置每秒生成多少行数据,默认1行.option("rowsPerSecond", "10").option("numPartitions", 5).load()result.writeStream.format("console").option("numRows","100").option("truncate","false").start().awaitTermination()}}

结果:

Java代码如下:

package com.lanson.structuredStreaming.source;import java.util.concurrent.TimeoutException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;public class ssratesource01 {public static void main(String[] args) throws TimeoutException, StreamingQueryException {//1.创建对象SparkSession spark = SparkSession.builder().master("local").appName("rate test").getOrCreate();spark.sparkContext().setLogLevel("Error");Dataset<Row> result = spark.readStream().format("rate")// 配置每秒生成多少行数据,默认1行.option("rowsPerSecond", "10").option("numPartitions", 5).load();result.writeStream().format("console").option("numRows","100").option("truncate","false").start().awaitTermination();}
}

结果: 

 


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/50233.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

移动式气象站:便携科技的天气守望者

在科技日新月异的今天&#xff0c;我们身边的许多设备都在向着更加智能化、便携化的方向发展。而在气象观测领域&#xff0c;移动式气象站的出现&#xff0c;不仅改变了传统气象观测的固有模式&#xff0c;更以其灵活性和实时性&#xff0c;在气象监测、灾害预警等领域发挥着越…

MySQL练习05

题目 步骤 触发器 use mydb16_trigger; #使用数据库create table goods( gid char(8) primary key, name varchar(10), price decimal(8,2), num int);create table orders( oid int primary key auto_increment, gid char(10) not null, name varchar(10), price decima…

基于python的BP神经网络红酒品质分类预测模型

1 导入必要的库 import pandas as pd import numpy as np import matplotlib.pyplot as plt from sklearn.model_selection import train_test_split from sklearn.preprocessing import LabelEncoder from tensorflow.keras.models import Sequential from tenso…

NET8部署Kestrel服务HTTPS深入解读TLS协议之Certificate证书

Certificate证书 Certificate称为数字证书。数字证书是一种证明身份的电子凭证&#xff0c;它包含一个公钥和一些身份信息&#xff0c;用于验证数字签名和加密通信。数字证书在网络通信、电子签名、认证授权等场景中都有广泛应用。其特征如下&#xff1a; 由权威机构颁发&…

跟李沐学AI:池化层

目录 二维最大池化 填充、步幅和多个通道 平均池化层 池化层总结 二维最大池化 返回滑动窗口中的最大值。 图为池化窗口形状为 22 的最大池化层。着色部分是第一个输出元素&#xff0c;以及用于计算这个输出的输入元素: max(0,1,3,4)4。池化层与卷积层类似&#xff0c;不断…

Spark核心知识要点(八)Shuffle配置调优

1、Shuffle优化配置 -spark.shuffle.file.buffer 默认值&#xff1a;32k参数说明&#xff1a;该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前&#xff0c;会先写入buffer缓冲中&#xff0c;待缓冲写满之后&#xff0c;才会…

Linux中,MySQL的用户管理

MySQL库中的表及其作用 user表 User表是MySQL中最重要的一个权限表&#xff0c;记录允许连接到服务器的帐号信息&#xff0c;里面的权限是全局级的。 db表和host表 db表和host表是MySQL数据中非常重要的权限表。db表中存储了用户对某个数据库的操作权限&#xff0c;决定用户…

单元测试的最佳实践

整体架构 合适的架构可以提升可测试性。比如菱形对称架构的模块化和解耦特性使得系统各个部分可以独立进行单元测试。这不仅提高了测试的效率&#xff0c;还能够减少测试的依赖性&#xff0c;提高测试准确性。 代码设计 代码设计和可测试性有密切关联。强烈建议一个方法的代码行…

Android 15 适配整理——实践版

背景 谷歌发布Android 15后&#xff0c;国内的手机厂商迅速行动&#xff0c;开始了新系统的适配工作。小米、OPPO、vivo和联想等金标联盟成员联合发布了适配公告&#xff0c;督促APP开发者在2024年8月31日前完成适配工作&#xff0c;否则将面临搜索标签提示、应用降级、分机型…

JavaWeb笔记_JSTL标签库JavaEE三层架构案例

一.JSTL标签库 1.1 JSTL概述 JSTL(jsp standard tag library):JSP标准标签库,它是针对EL表达式一个扩展,通过JSTL标签库与EL表达式结合可以完成更强大的功能 JSTL它是一种标签语言,JSTL不是JSP内置标签 JSTL标签库主要包含: ****核心标签 格式化标签 …

【算法】插入区间

难度&#xff1a;中等 题目&#xff1a; 给你一个 无重叠的 &#xff0c;按照区间起始端点排序的区间列表 intervals&#xff0c;其中 intervals[i] [starti, endi] 表示第 i 个区间的开始和结束&#xff0c;并且 intervals 按照 starti 升序排列。同样给定一个区间 newInte…

leetcode面试题17.最大子矩阵

sooooooo long没刷题了&#xff0c;汗颜 题目链接&#xff1a;leetcode面试题17 1.题目 给定一个正整数、负整数和 0 组成的 N M 矩阵&#xff0c;编写代码找出元素总和最大的子矩阵。 返回一个数组 [r1, c1, r2, c2]&#xff0c;其中 r1, c1 分别代表子矩阵左上角的行号和…

Android平台轻量级RTSP服务模块二次封装版调用说明

技术背景 在前面的blog&#xff0c;我们发布了Android平台轻量级RTSP服务模块的技术对接说明&#xff0c;好多开发者希望&#xff0c;更黑盒的对接轻量级RTSP服务这块&#xff0c;专注于自身业务逻辑。为此&#xff0c;我们针对Android平台轻量级RTSP服务模块&#xff0c;做了…

解析capl文件生成XML Test Module对应的xml工具

之前一直用的CAPL Test Module来写代码&#xff0c;所有的控制都是在MainTest()函数来实现的&#xff0c;但是有一次&#xff0c;代码都写完了&#xff0c;突然需要用xml的这种方式来实现&#xff0c;很突然&#xff0c;之前也没研究过&#xff0c;整理这个xml整的一身汗&#…

【Nacos】微服务注册在不同的group上,支持跨group服务发现,二次开发

Nacos具备微服务注册和发现&#xff0c;以及配置管理的功能&#xff0c;本文详情描述【注册和发现】的原理&#xff0c;以及基本使用&#xff0c;包括如何进行跨group注册和发现。 一、Nacos配置spring:cloud:nacos:discovery:server-addr: nacosIP地址:8849namespace: devgro…

无人机飞行姿态俯仰、横滚、偏航、油门详解

无人机飞行姿态涉及其在空中的空间位置和方向。飞行姿态控制的精确性和稳定性是无人机实现自主飞行和完成任务的关键。无人机的飞行姿态主要通过控制其横滚、俯仰和偏航来实现。以下是对无人机飞行姿态的详细解释&#xff1a; 1. 横滚&#xff08;Roll&#xff09; 定义&…

vue中的nexttrick

Vue.js 是一个用于构建用户界面的渐进式框架&#xff0c;它允许开发者通过声明式的数据绑定来构建网页应用。在 Vue 中&#xff0c;nextTick 是一个非常重要的 API&#xff0c;它用于延迟回调的执行&#xff0c;直到下次 DOM 更新循环之后。 为什么使用 nextTick&#xff1f; …

自定义维度映射:Kylin Cube设计的高级玩法

自定义维度映射&#xff1a;Kylin Cube设计的高级玩法 在数据仓库领域&#xff0c;Apache Kylin以其高性能的分析能力而闻名。Kylin通过构建多维数据立方体&#xff08;Cube&#xff09;来实现对大数据集的快速查询。Cube设计中的维度映射是优化查询性能的关键环节。本文将探讨…

vue3前端开发-小兔鲜项目-登录组件的开发表单验证

vue3前端开发-小兔鲜项目-登录组件的开发表单验证&#xff01;现在开始写登录页面的内容。首先这一次完成基础的首页按钮点击跳转&#xff0c;以及初始化一些简单的表单的输入验证。后期还会继续完善内容。 1&#xff1a;首先还是准备好login页面的组件代码内容。 <script …

企业风险管理的智能监控:Kompas.ai如何预警潜在风险

在企业运营中&#xff0c;风险管理是确保业务连续性和稳健发展的关键环节。随着技术的进步&#xff0c;智能风险管理成为预防损失和识别潜在风险的重要手段。Kompas.ai&#xff0c;作为一款先进的智能监控工具&#xff0c;正通过数据分析和模式识别技术&#xff0c;帮助企业实现…