很多初次接触到StructuredStreaming 应该会写一个这样的案例
- py脚本不断产生数据写入linux本地, 通过hdfs dfs 建目录文件来实时存储到HDFS中
1. 指定数据schema: 实时json数据
2. 数据源地址:HDFS
3. 结果落地位置: HDFS
这个小案例重点在于数据传输
- item源码:
// 1. 创建sparksessionval spark: SparkSession = SparkSession.builder().appName("HDFS_source").master("local[4]").getOrCreate()// 1. 指定data源schema---jsonval schema = new StructType().add("name", dataType = "string").add("age", dataType = "integer")// 2.指定源址hdfssourceval source = spark.readStream.schema(schema).json("hdfs://hadoop102:8020/dataset/dataset")// 3.结果val outputPath = "hdfs://hadoop102:8020/filetmp" // 结果存储路径hdfssource.writeStream.outputMode(OutputMode.Append()).format("json").option("checkpointLocation", "hdfs://hadoop102:8020/checkpoint") // hdfs检查点的位置.start(outputPath).awaitTermination()
报错信息:java.lang.IllegalArgumentException: 'path' is not specified
就是没有指定流处理的sink path在start()中传入sink path 即可;
指定checkpointLocation 地址做容错(也就是检查点)
format落地格式 (parquet , json ...)具体场景具体分析
如果只是对数据进行处理然后打印到console 不用指定sink path