Spark实时(五):InputSource数据源案例演示

文章目录

InputSource数据源案例演示

一、​​​​​​​File Source

1、读取text文件

2、读取csv文件

3、读取json文件

二、Socket Source 

三、Rate Source


InputSource数据源案例演示

在Spark2.0版本之后,DataFrame和Dataset可以表示静态有边界的数据,也可以表示无边界的流式数据。在Structured Streaming中我们可以使用SparkSession针对流式数据源创建对应的Dataset或者DataFrame,并可以像处理批数据一样使用各种Operators操作处理流式数据。

Structured Streaming的数据源目前支持File Source 、Socket Source 、Rate Source、Kafka Source ,与Kafka的整合在后续整理,这里对其他三种数据源分别演示。

一、​​​​​​​​​​​​​​File Source

Sturctured Streaming可以读取写入目录的文件作为数据流,文件将按照文件修改时间的顺序进行处理,文件必须原子性的存入到监控目录中,支持的格式有text、csv、json、orc、parquet。

1、读取text文件

Scala代码如下:

package com.lanson.structuredStreaming.sourceimport org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}/***  Structured Streaming监控目录 text格式数据*/
object SSReadTextData {def main(args: Array[String]): Unit = {//1.创建对象val spark: SparkSession = SparkSession.builder().master("local").appName("SSReadTextData").config("spark.sql.shuffle.partitions", 1).getOrCreate()import  spark.implicits._spark.sparkContext.setLogLevel("Error")//2.监控目录val ds: Dataset[String] = spark.readStream.textFile("./data/")val result: DataFrame = ds.map(line => {val arr: Array[String] = line.split("-")(arr(0).toInt, arr(1), arr(2).toInt)}).toDF("id", "name", "age")val query: StreamingQuery = result.writeStream.format("console").start()query.awaitTermination()}}

 结果:

Java代码如下:

package com.lanson.structuredStreaming.source;import java.util.concurrent.TimeoutException;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;
import scala.Tuple3;public class SSReadTextData01 {public static void main(String[] args) throws TimeoutException, StreamingQueryException {//1.创建对象SparkSession spark = SparkSession.builder().master("local").appName("SSReadSocketData01").config("spark.sql.shuffle.partitions", 1).getOrCreate();spark.sparkContext().setLogLevel("Error");Dataset<String> ds = spark.readStream().textFile("./data/");Dataset<Tuple3<Integer, String, Integer>> ds2 = ds.map(new MapFunction<String, Tuple3<Integer, String, Integer>>() {@Overridepublic Tuple3<Integer, String, Integer> call(String line) throws Exception {String[] arr = line.split("-");return new Tuple3<>(Integer.valueOf(arr[0]), arr[1],Integer.valueOf(arr[2]) );}}, Encoders.tuple(Encoders.INT(), Encoders.STRING(), Encoders.INT()));Dataset<Row> result = ds2.toDF("id", "name", "age");result.writeStream().format("console").start().awaitTermination();}
}

 结果:

以上代码编写完成之后,向监控的目录“./data”中不断写入含有以下内容的文件,可以看到控制台有对应的流数据输出,这里一定是原子性的将文件复制到对应目录下。文件内容如下:

1-zhangsan-18
2-lisi-19
3-ww-20

2、读取csv文件

Scala代码如下:

package com.lanson.structuredStreaming.sourceimport org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.types.StructType/*** Structured Streaming 读取CSV数据*/
object SSReadCsvData {def main(args: Array[String]): Unit = {//1.创建对象val spark: SparkSession = SparkSession.builder().master("local").appName("SSReadCsvData").config("spark.sql.shuffle.partitions", 1).getOrCreate()import  spark.implicits._spark.sparkContext.setLogLevel("Error")//2.创建CSV数据schemaval userSchema: StructType = new StructType().add("id", "integer").add("name", "string").add("gender", "string").add("age", "integer")val result: DataFrame = spark.readStream.option("sep", ",").schema(userSchema).csv("./data/")val query: StreamingQuery = result.writeStream.format("console").start()query.awaitTermination()}}

结果:

Java代码如下

package com.lanson.structuredStreaming.source;import java.util.concurrent.TimeoutException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.StructType;/*** Structured Streaming 读取CSV数据*/public class SSReadCsvData01 {public static void main(String[] args) throws TimeoutException, StreamingQueryException {//1.创建对象SparkSession spark = SparkSession.builder().master("local").appName("SSReadCsvData").config("spark.sql.shuffle.partitions", 1).getOrCreate();spark.sparkContext().setLogLevel("Error");StructType userSchema = new StructType().add("id", "integer").add("name", "string").add("gender", "string").add("age", "integer");Dataset<Row> result = spark.readStream().option("sep", ",").schema(userSchema).csv("./data/");result.writeStream().format("console").start().awaitTermination();}
}

 结果:

以上代码运行之后向对应监控的目录下原子性写入含有数据的csv文件,在控制台可以看到实时监控内容。文件内容如下:

1,zhangsan,一班,100
2,lisi,二班,200
3,wangwu,一班,300
4,maliu,二班,100
5,tianqi,三班,100
6,gaoba,三班,50
7,zs2,四班,50

3、读取json文件

Scala代码如下:

package com.lanson.structuredStreaming.sourceimport org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.types.StructType/***  Structured Streaming 监控Json格式数据*/
object SSReadJsonData {def main(args: Array[String]): Unit = {//1.创建对象val spark: SparkSession = SparkSession.builder().master("local").appName("SSReadCsvData").config("spark.sql.shuffle.partitions", 1).getOrCreate()import  spark.implicits._spark.sparkContext.setLogLevel("Error")//2.创建 json 数据schemaval userSchema: StructType = new StructType().add("id", "integer").add("name", "string").add("age", "integer")val result: DataFrame = spark.readStream.schema(userSchema).json("./data/")val query: StreamingQuery = result.writeStream.format("console").start()query.awaitTermination()}}

结果:

Java代码如下

package com.lanson.structuredStreaming.source;import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.StructType;
import java.util.concurrent.TimeoutException;/*** Structured Streaming实时监控目录中json文件作为数据流*/
public class SSReadJsonData01 {public static void main(String[] args) throws TimeoutException, StreamingQueryException {//1.创建对象SparkSession spark = SparkSession.builder().appName("File Source test").master("local").getOrCreate();//2.设置日志spark.sparkContext().setLogLevel("Error");//3.设置SchemaStructType userSchema = new StructType().add("id", "integer").add("name", "string").add("age", "integer");//4.指定监控目录读取数据json数据Dataset<Row> ds = spark.readStream().option("sep", ",").schema(userSchema).json("./data/");//5.打印数据到控制台StreamingQuery query =ds.writeStream().format("console").start();query.awaitTermination();}
}

结果:

以上代码启动之后,向监控的目录“./data”下原子写入含有以下内容的json文件,在控制台可以看到实时监控内容。json文件内容如下:

{"id":1,"name":"zs","age":18}
{"id":2,"name":"ls","age":19}
{"id":3,"name":"ww","age":20}
{"id":4,"name":"ml","age":21}

注意:实时监控json格式数据时,创建的Schema 中的字段需要与Json中的属性保持一致,否则在映射成表时,Schema中含有但在Json中没有的属性的字段对应的数据会为null。

二、Socket Source 

读取Socket方式需要指定对应的host和port,读取Socket数据源多用于测试场景,这里不再演示。

可以参考案例:

Spark实时(三):Structured Streaming入门案例-CSDN博客

三、Rate Source

Rate Source是以每秒指定的行数生成数据,每个输出行包含一个timestamp和value,其中timestamp是一个Timestamp含有信息分配的时间类型,value是从0开始的Long类型的数据,Rate Source式多用于测试。

scala代码如下:

package com.lanson.structuredStreaming.sourceimport org.apache.spark.sql.{DataFrame, SparkSession}/*** SSRateSource*/
object SSRateSource {def main(args: Array[String]): Unit = {//1.创建对象val spark: SparkSession = SparkSession.builder().master("local").appName("rate test")
//      .config("spark.sql.shuffle.partitions", 1).getOrCreate()val result: DataFrame = spark.readStream.format("rate")// 配置每秒生成多少行数据,默认1行.option("rowsPerSecond", "10").option("numPartitions", 5).load()result.writeStream.format("console").option("numRows","100").option("truncate","false").start().awaitTermination()}}

结果:

Java代码如下:

package com.lanson.structuredStreaming.source;import java.util.concurrent.TimeoutException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;public class ssratesource01 {public static void main(String[] args) throws TimeoutException, StreamingQueryException {//1.创建对象SparkSession spark = SparkSession.builder().master("local").appName("rate test").getOrCreate();spark.sparkContext().setLogLevel("Error");Dataset<Row> result = spark.readStream().format("rate")// 配置每秒生成多少行数据,默认1行.option("rowsPerSecond", "10").option("numPartitions", 5).load();result.writeStream().format("console").option("numRows","100").option("truncate","false").start().awaitTermination();}
}

结果: 

 


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/50233.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

移动式气象站:便携科技的天气守望者

在科技日新月异的今天&#xff0c;我们身边的许多设备都在向着更加智能化、便携化的方向发展。而在气象观测领域&#xff0c;移动式气象站的出现&#xff0c;不仅改变了传统气象观测的固有模式&#xff0c;更以其灵活性和实时性&#xff0c;在气象监测、灾害预警等领域发挥着越…

MySQL练习05

题目 步骤 触发器 use mydb16_trigger; #使用数据库create table goods( gid char(8) primary key, name varchar(10), price decimal(8,2), num int);create table orders( oid int primary key auto_increment, gid char(10) not null, name varchar(10), price decima…

基于python的BP神经网络红酒品质分类预测模型

1 导入必要的库 import pandas as pd import numpy as np import matplotlib.pyplot as plt from sklearn.model_selection import train_test_split from sklearn.preprocessing import LabelEncoder from tensorflow.keras.models import Sequential from tenso…

NET8部署Kestrel服务HTTPS深入解读TLS协议之Certificate证书

Certificate证书 Certificate称为数字证书。数字证书是一种证明身份的电子凭证&#xff0c;它包含一个公钥和一些身份信息&#xff0c;用于验证数字签名和加密通信。数字证书在网络通信、电子签名、认证授权等场景中都有广泛应用。其特征如下&#xff1a; 由权威机构颁发&…

跟李沐学AI:池化层

目录 二维最大池化 填充、步幅和多个通道 平均池化层 池化层总结 二维最大池化 返回滑动窗口中的最大值。 图为池化窗口形状为 22 的最大池化层。着色部分是第一个输出元素&#xff0c;以及用于计算这个输出的输入元素: max(0,1,3,4)4。池化层与卷积层类似&#xff0c;不断…

单元测试的最佳实践

整体架构 合适的架构可以提升可测试性。比如菱形对称架构的模块化和解耦特性使得系统各个部分可以独立进行单元测试。这不仅提高了测试的效率&#xff0c;还能够减少测试的依赖性&#xff0c;提高测试准确性。 代码设计 代码设计和可测试性有密切关联。强烈建议一个方法的代码行…

Android 15 适配整理——实践版

背景 谷歌发布Android 15后&#xff0c;国内的手机厂商迅速行动&#xff0c;开始了新系统的适配工作。小米、OPPO、vivo和联想等金标联盟成员联合发布了适配公告&#xff0c;督促APP开发者在2024年8月31日前完成适配工作&#xff0c;否则将面临搜索标签提示、应用降级、分机型…

JavaWeb笔记_JSTL标签库JavaEE三层架构案例

一.JSTL标签库 1.1 JSTL概述 JSTL(jsp standard tag library):JSP标准标签库,它是针对EL表达式一个扩展,通过JSTL标签库与EL表达式结合可以完成更强大的功能 JSTL它是一种标签语言,JSTL不是JSP内置标签 JSTL标签库主要包含: ****核心标签 格式化标签 …

Android平台轻量级RTSP服务模块二次封装版调用说明

技术背景 在前面的blog&#xff0c;我们发布了Android平台轻量级RTSP服务模块的技术对接说明&#xff0c;好多开发者希望&#xff0c;更黑盒的对接轻量级RTSP服务这块&#xff0c;专注于自身业务逻辑。为此&#xff0c;我们针对Android平台轻量级RTSP服务模块&#xff0c;做了…

解析capl文件生成XML Test Module对应的xml工具

之前一直用的CAPL Test Module来写代码&#xff0c;所有的控制都是在MainTest()函数来实现的&#xff0c;但是有一次&#xff0c;代码都写完了&#xff0c;突然需要用xml的这种方式来实现&#xff0c;很突然&#xff0c;之前也没研究过&#xff0c;整理这个xml整的一身汗&#…

vue3前端开发-小兔鲜项目-登录组件的开发表单验证

vue3前端开发-小兔鲜项目-登录组件的开发表单验证&#xff01;现在开始写登录页面的内容。首先这一次完成基础的首页按钮点击跳转&#xff0c;以及初始化一些简单的表单的输入验证。后期还会继续完善内容。 1&#xff1a;首先还是准备好login页面的组件代码内容。 <script …

企业风险管理的智能监控:Kompas.ai如何预警潜在风险

在企业运营中&#xff0c;风险管理是确保业务连续性和稳健发展的关键环节。随着技术的进步&#xff0c;智能风险管理成为预防损失和识别潜在风险的重要手段。Kompas.ai&#xff0c;作为一款先进的智能监控工具&#xff0c;正通过数据分析和模式识别技术&#xff0c;帮助企业实现…

《华为数据之道》读书笔记六---面向自助消费的数据服务建设

七、从结果管理到过程管理&#xff0c; 从能“看”到能“管” 1、数据赋能业务运营 数字化运营旨在利用数字化技术获取、管理和分析数据&#xff0c;从而为企业的战略决策与业务运营提供可量化的、科学的支撑。 数字化运营归根结底是运营&#xff0c;旨在推动运营效率与能力的…

基站光伏直流叠光能效管理方案

安科瑞 华楠 基站现状和趋势 5G基站是专门提供5G网络服务的公用移动通信基站。5G基站主要用于提供5G空口协议功能&#xff0c;支持与用户设备、核心网之间的通信。按照逻辑功能划分&#xff0c;5G基站可分为5G基带单元与5G射频单元&#xff0c;二者之间可通过CPRI或eCPRI接口…

AFSim仿真系统-架构概览

引言 本文档从最终用户的角度描述了AFSIM架构&#xff0c;旨在帮助最终用户深入了解AFSIM的操作概念。 核心架构 AFSIM基于面向对象的C架构&#xff0c;提供了一种可扩展和模块化的架构&#xff0c;使得许多附加功能能够轻松集成。AFSIM允许新的组件模型&#xff08;如传感器、…

vue3.0学习笔记(二)——生命周期与响应式数据(ref,reactive,toRef,toRefs函数)

1. 组合API-setup函数 使用细节&#xff1a; setup 是一个新的组件选项&#xff0c;作为组件中使用组合API的起点。从组件生命周期来看&#xff0c;它的执行在组件实例创建之前vue2.x的beforeCreate执行。这就意味着在setup函数中 this 还不是组件实例&#xff0c;this 此时是…

SpringBoot 实现图形验证码

一、最终结果展示 二、前端代码 2.1 index.html <!DOCTYPE html> <html lang"en"><head><meta charset"utf-8"><title>验证码</title><style>#inputCaptcha {height: 30px;vertical-align: middle;}#verifica…

交易积累-BR

BR指标&#xff0c;全称是“买卖意愿比率”&#xff08;Bull/Bear Ratio&#xff09;&#xff0c;是一个衡量市场买卖力量对比的技术分析工具。BR指标是由中国的技术分析师发展起来的&#xff0c;它通过比较股票或市场在一定时间内的上涨能量与下跌能量来评估市场情绪和潜在的趋…

自动驾驶(八十八)---------通讯之SOMEIP

1. 什么是SOME/IP 服务导向架构&#xff08;SOA&#xff0c;Service-Oriented Architecture&#xff09;是一种设计软件系统的方法&#xff0c;强调通过可重用的服务来实现系统的松散耦合。每个服务是独立的功能单元&#xff0c;可以被不同的应用程序使用。这些服务通过标准化的…

【教程】Node.js+Apache 部署网页全过程(非常详细!)

文章目录 背景0. 前置假设1. 更新系统和安装必要软件2. 打包并上传项目到服务器2.1 识别需要上传的文件2.2 文件归档和压缩2.3 压缩文件上传到服务器2.4 解压文件 3. 配置Node.js应用3.1 启动 PM23.2 确认 PM2 进程 4. 配置Apache反向代理5. 启用必要的Apache模块6. 检查 Apach…