文章目录
InputSource数据源案例演示
一、File Source
1、读取text文件
2、读取csv文件
3、读取json文件
二、Socket Source
三、Rate Source
InputSource数据源案例演示
在Spark2.0版本之后,DataFrame和Dataset可以表示静态有边界的数据,也可以表示无边界的流式数据。在Structured Streaming中我们可以使用SparkSession针对流式数据源创建对应的Dataset或者DataFrame,并可以像处理批数据一样使用各种Operators操作处理流式数据。
Structured Streaming的数据源目前支持File Source 、Socket Source 、Rate Source、Kafka Source ,与Kafka的整合在后续整理,这里对其他三种数据源分别演示。
一、File Source
Sturctured Streaming可以读取写入目录的文件作为数据流,文件将按照文件修改时间的顺序进行处理,文件必须原子性的存入到监控目录中,支持的格式有text、csv、json、orc、parquet。
1、读取text文件
Scala代码如下:
package com.lanson.structuredStreaming.sourceimport org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}/*** Structured Streaming监控目录 text格式数据*/
object SSReadTextData {def main(args: Array[String]): Unit = {//1.创建对象val spark: SparkSession = SparkSession.builder().master("local").appName("SSReadTextData").config("spark.sql.shuffle.partitions", 1).getOrCreate()import spark.implicits._spark.sparkContext.setLogLevel("Error")//2.监控目录val ds: Dataset[String] = spark.readStream.textFile("./data/")val result: DataFrame = ds.map(line => {val arr: Array[String] = line.split("-")(arr(0).toInt, arr(1), arr(2).toInt)}).toDF("id", "name", "age")val query: StreamingQuery = result.writeStream.format("console").start()query.awaitTermination()}}
结果:
Java代码如下:
package com.lanson.structuredStreaming.source;import java.util.concurrent.TimeoutException;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;
import scala.Tuple3;public class SSReadTextData01 {public static void main(String[] args) throws TimeoutException, StreamingQueryException {//1.创建对象SparkSession spark = SparkSession.builder().master("local").appName("SSReadSocketData01").config("spark.sql.shuffle.partitions", 1).getOrCreate();spark.sparkContext().setLogLevel("Error");Dataset<String> ds = spark.readStream().textFile("./data/");Dataset<Tuple3<Integer, String, Integer>> ds2 = ds.map(new MapFunction<String, Tuple3<Integer, String, Integer>>() {@Overridepublic Tuple3<Integer, String, Integer> call(String line) throws Exception {String[] arr = line.split("-");return new Tuple3<>(Integer.valueOf(arr[0]), arr[1],Integer.valueOf(arr[2]) );}}, Encoders.tuple(Encoders.INT(), Encoders.STRING(), Encoders.INT()));Dataset<Row> result = ds2.toDF("id", "name", "age");result.writeStream().format("console").start().awaitTermination();}
}
结果:
以上代码编写完成之后,向监控的目录“./data”中不断写入含有以下内容的文件,可以看到控制台有对应的流数据输出,这里一定是原子性的将文件复制到对应目录下。文件内容如下:
1-zhangsan-18
2-lisi-19
3-ww-20
2、读取csv文件
Scala代码如下:
package com.lanson.structuredStreaming.sourceimport org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.types.StructType/*** Structured Streaming 读取CSV数据*/
object SSReadCsvData {def main(args: Array[String]): Unit = {//1.创建对象val spark: SparkSession = SparkSession.builder().master("local").appName("SSReadCsvData").config("spark.sql.shuffle.partitions", 1).getOrCreate()import spark.implicits._spark.sparkContext.setLogLevel("Error")//2.创建CSV数据schemaval userSchema: StructType = new StructType().add("id", "integer").add("name", "string").add("gender", "string").add("age", "integer")val result: DataFrame = spark.readStream.option("sep", ",").schema(userSchema).csv("./data/")val query: StreamingQuery = result.writeStream.format("console").start()query.awaitTermination()}}
结果:
Java代码如下
package com.lanson.structuredStreaming.source;import java.util.concurrent.TimeoutException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.StructType;/*** Structured Streaming 读取CSV数据*/public class SSReadCsvData01 {public static void main(String[] args) throws TimeoutException, StreamingQueryException {//1.创建对象SparkSession spark = SparkSession.builder().master("local").appName("SSReadCsvData").config("spark.sql.shuffle.partitions", 1).getOrCreate();spark.sparkContext().setLogLevel("Error");StructType userSchema = new StructType().add("id", "integer").add("name", "string").add("gender", "string").add("age", "integer");Dataset<Row> result = spark.readStream().option("sep", ",").schema(userSchema).csv("./data/");result.writeStream().format("console").start().awaitTermination();}
}
结果:
以上代码运行之后向对应监控的目录下原子性写入含有数据的csv文件,在控制台可以看到实时监控内容。文件内容如下:
1,zhangsan,一班,100
2,lisi,二班,200
3,wangwu,一班,300
4,maliu,二班,100
5,tianqi,三班,100
6,gaoba,三班,50
7,zs2,四班,50
3、读取json文件
Scala代码如下:
package com.lanson.structuredStreaming.sourceimport org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.types.StructType/*** Structured Streaming 监控Json格式数据*/
object SSReadJsonData {def main(args: Array[String]): Unit = {//1.创建对象val spark: SparkSession = SparkSession.builder().master("local").appName("SSReadCsvData").config("spark.sql.shuffle.partitions", 1).getOrCreate()import spark.implicits._spark.sparkContext.setLogLevel("Error")//2.创建 json 数据schemaval userSchema: StructType = new StructType().add("id", "integer").add("name", "string").add("age", "integer")val result: DataFrame = spark.readStream.schema(userSchema).json("./data/")val query: StreamingQuery = result.writeStream.format("console").start()query.awaitTermination()}}
结果:
Java代码如下
package com.lanson.structuredStreaming.source;import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.StructType;
import java.util.concurrent.TimeoutException;/*** Structured Streaming实时监控目录中json文件作为数据流*/
public class SSReadJsonData01 {public static void main(String[] args) throws TimeoutException, StreamingQueryException {//1.创建对象SparkSession spark = SparkSession.builder().appName("File Source test").master("local").getOrCreate();//2.设置日志spark.sparkContext().setLogLevel("Error");//3.设置SchemaStructType userSchema = new StructType().add("id", "integer").add("name", "string").add("age", "integer");//4.指定监控目录读取数据json数据Dataset<Row> ds = spark.readStream().option("sep", ",").schema(userSchema).json("./data/");//5.打印数据到控制台StreamingQuery query =ds.writeStream().format("console").start();query.awaitTermination();}
}
结果:
以上代码启动之后,向监控的目录“./data”下原子写入含有以下内容的json文件,在控制台可以看到实时监控内容。json文件内容如下:
{"id":1,"name":"zs","age":18}
{"id":2,"name":"ls","age":19}
{"id":3,"name":"ww","age":20}
{"id":4,"name":"ml","age":21}
注意:实时监控json格式数据时,创建的Schema 中的字段需要与Json中的属性保持一致,否则在映射成表时,Schema中含有但在Json中没有的属性的字段对应的数据会为null。
二、Socket Source
读取Socket方式需要指定对应的host和port,读取Socket数据源多用于测试场景,这里不再演示。
可以参考案例:
Spark实时(三):Structured Streaming入门案例-CSDN博客
三、Rate Source
Rate Source是以每秒指定的行数生成数据,每个输出行包含一个timestamp和value,其中timestamp是一个Timestamp含有信息分配的时间类型,value是从0开始的Long类型的数据,Rate Source式多用于测试。
scala代码如下:
package com.lanson.structuredStreaming.sourceimport org.apache.spark.sql.{DataFrame, SparkSession}/*** SSRateSource*/
object SSRateSource {def main(args: Array[String]): Unit = {//1.创建对象val spark: SparkSession = SparkSession.builder().master("local").appName("rate test")
// .config("spark.sql.shuffle.partitions", 1).getOrCreate()val result: DataFrame = spark.readStream.format("rate")// 配置每秒生成多少行数据,默认1行.option("rowsPerSecond", "10").option("numPartitions", 5).load()result.writeStream.format("console").option("numRows","100").option("truncate","false").start().awaitTermination()}}
结果:
Java代码如下:
package com.lanson.structuredStreaming.source;import java.util.concurrent.TimeoutException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;public class ssratesource01 {public static void main(String[] args) throws TimeoutException, StreamingQueryException {//1.创建对象SparkSession spark = SparkSession.builder().master("local").appName("rate test").getOrCreate();spark.sparkContext().setLogLevel("Error");Dataset<Row> result = spark.readStream().format("rate")// 配置每秒生成多少行数据,默认1行.option("rowsPerSecond", "10").option("numPartitions", 5).load();result.writeStream().format("console").option("numRows","100").option("truncate","false").start().awaitTermination();}
}
结果:
- 📢博客主页:https://lansonli.blog.csdn.net
- 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
- 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
- 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨