Spark实时(六):Output Sinks案例演示

文章目录

Output Sinks案例演示

一、​​​​​​​File sink

二、​​​​​​​​​​​​​​Memory Sink

三、​​​​​​​​​​​​​​Foreach Sink

1、​​​​​​​foreachBatch

2、​​​​​​​​​​​​​​foreach


Output Sinks案例演示

当我们对流式数据处理完成之后,可以将数据写出到Flie、Kafka、console控制台、memory内存,或者直接使用foreach做个性化处理。关于将数据结果写出到Kafka在StructuredStreaming与Kafka整合部分再详细描述。

对于一些可以保证端到端容错的sink输出,需要指定checkpoint目录来写入数据信息,指定的checkpoint目录可以是HDFS中的某个路径,设置checkpoint可以通过SparkSession设置也可以通过DataStreamWriter设置,设置方式如下:

//通过SparkSession设置checkpoint
spark.conf.set("spark.sql.streaming.checkpointLocation","hdfs://mycluster/checkpintdir")或者//通过DataStreamWriter设置checkpoint
df.writeStream.format("xxx").option("checkpointLocation","./checkpointdir").start()

checkpoint目录中会有以下目录及数据:

  • offsets:记录偏移量目录,记录了每个批次的偏移量。
  • commits:记录已经完成的批次,方便重启任务检查完成的批次与offset批次做对比,继续offset消费数据,运行批次。
  • metadata:metadata元数据保存jobid信息。
  • sources:数据源各个批次读取详情。
  • sinks:数据sink写出批次情况。
  • state:记录状态值,例如:聚合、去重等场景会记录相应状态,会周期性的生成snapshot文件记录状态。

下面对File、memoery、foreach output Sink进行演示。

一、​​​​​​​​​​​​​​File sink

Flie Sink就是数据结果实时写入到执行目录下的文件中,每次写出都会形成一个新的文件,文件格式可以是parquet、orc、json、csv格式。

Scala代码如下:

package com.lanson.structuredStreaming.sinkimport org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SparkSession}/***  读取Socket数据,将数据写入到csv文件*/
object FileSink {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().master("local").appName("File Sink").config("spark.sql.shuffle.partitions", 1).getOrCreate()val result: DataFrame = spark.readStream.format("socket").option("host", "node3").option("port", 9999).load()val query: StreamingQuery = result.writeStream.format("csv").option("path", "./dataresult/csvdir").option("checkpointLocation","./checkpint/dir3").start()query.awaitTermination()}
}

 ​​​​​​​

在socket中输入数据之后,每批次数据写入到一个csv文件中。 

二、​​​​​​​​​​​​​​Memory Sink

memory Sink是将结果作为内存表存储在内存中,支持Append和Complete输出模式,这种结果写出到内存表方式多用于测试,如果数据量大要慎用。另外查询结果表中数据时需要写一个循环每隔一段时间读取内存中的数据。

Scala代码如下:

package com.lanson.structuredStreaming.sinkimport org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery/***  读取scoket 数据写入memory 内存,再读取*/
object MemorySink {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().master("local").appName("Memory Sink").config("spark.sql.shuffle.partitions", 1).getOrCreate()spark.sparkContext.setLogLevel("Error")val result: DataFrame = spark.readStream.format("socket").option("host", "node3").option("port", 9999).load()val query: StreamingQuery = result.writeStream.format("memory").queryName("mytable").start()//查询内存中表数据while(true){Thread.sleep(2000)spark.sql("""|select * from mytable""".stripMargin).show()}query.awaitTermination()}}

三、​​​​​​​​​​​​​​Foreach Sink

foreach 可以对输出的结果数据进行自定义处理逻辑,针对结果数据自定义处理逻辑数据除了有foreach之外还有foreachbatch,两者区别是foreach是针对一条条的数据进行自定义处理,foreachbatch是针对当前小批次数据进行自定义处理。

1、​​​​​​​foreachBatch

foreachBatch可以针对每个批次数据进行自定义处理,该方法需要传入一个函数,函数有2个参数,分别为当前批次数据对应的DataFrame和当前batchId。

案例:实时读取socket数据,将结果批量写入到mysql中。

Scala代码如下:

package com.lanson.structuredStreaming.sinkimport org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}/***  读取Socket 数据,将数据写出到mysql中*/
object ForeachBatchTest {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().appName("ForeachBatch Sink").master("local").config("spark.sql.shuffle.partitions", 1).getOrCreate()import spark.implicits._val df: DataFrame = spark.readStream.format("socket").option("host", "node2").option("port", 9999).load()val personDF: DataFrame = df.as[String].map(line => {val arr: Array[String] = line.split(",")(arr(0).toInt, arr(1), arr(2).toInt)}).toDF("id", "name", "age")val query: StreamingQuery = personDF.writeStream.foreachBatch((batchDF: DataFrame, batchId: Long) => {println("batchID : " + batchId)batchDF.write.mode(SaveMode.Append).format("jdbc").option("url","jdbc:mysql://node3:3306/testdb?useSSL=false").option("user","root").option("password","123456").option("dbtable","person").save()}).start()query.awaitTermination();}}

运行结果: 

Java代码如下:

 

package com.lanson.structuredStreaming.sink;import java.util.concurrent.TimeoutException;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;
import scala.Tuple3;public class ForeachBatchTest01 {public static void main(String[] args) throws TimeoutException, StreamingQueryException {SparkSession spark = SparkSession.builder().master("local").appName("ForeachBatchTest01").config("spark.sql.shuffle.partitions", 1).getOrCreate();spark.sparkContext().setLogLevel("Error");Dataset<Row> result = spark.readStream().format("socket").option("host", "node2").option("port", 9999).load().as(Encoders.STRING()).map(new MapFunction<String, Tuple3<Integer, String, Integer>>() {@Overridepublic Tuple3<Integer, String, Integer> call(String line) throws Exception {String[] arr = line.split(",");return new Tuple3<>(Integer.valueOf(arr[0]), arr[1], Integer.valueOf(arr[2]));}}, Encoders.tuple(Encoders.INT(), Encoders.STRING(), Encoders.INT())).toDF("id", "name", "age");result.writeStream().foreachBatch(new VoidFunction2<Dataset<Row>, Long>() {@Overridepublic void call(Dataset<Row> df, Long batchId) throws Exception {System.out.println("batchID : "+batchId);//将df 保存到mysqldf.write().format("jdbc").mode(SaveMode.Append).option("url","jdbc:mysql://node3:3306/testdb?useSSL=false" ).option("user","root" ).option("password","123456" ).option("dbtable","person" ).save();}}).start().awaitTermination();}
}

运行结果:

 

在mysql中创建testdb库,并创建person表,这里也可以不创建表:

create database testdb;
create table person(id int(10),name varchar(255),age int(2));
1,zs,18
2,ls,19
3,ww,20
4,ml,21
5,tq,22
6,ll,29

mysql结果如下:

2、​​​​​​​​​​​​​​foreach

foreach可以针对数据结果每条数据进行处理。

案例:实时读取socket数据,将结果一条条写入到mysql中。

Scala代码如下:

package com.lanson.structuredStreaming.sinkimport java.sql.{Connection, DriverManager, PreparedStatement}import org.apache.spark.sql.execution.streaming.sources.ForeachWrite
import org.apache.spark.sql.{DataFrame, ForeachWriter, Row, SparkSession}object ForeachSinkTest {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().appName("ForeachBatch Sink").master("local").config("spark.sql.shuffle.partitions", 1).getOrCreate()spark.sparkContext.setLogLevel("Error")import spark.implicits._val df: DataFrame = spark.readStream.format("socket").option("host", "node2").option("port", 9999).load()val personDF: DataFrame = df.as[String].map(line => {val arr: Array[String] = line.split(",")(arr(0).toInt, arr(1), arr(2).toInt)}).toDF("id", "name", "age")personDF.writeStream.foreach(new ForeachWriter[Row]() {var  conn: Connection  = _var pst: PreparedStatement = _//打开资源override def open(partitionId: Long, epochId: Long): Boolean = {conn = DriverManager.getConnection("jdbc:mysql://node3:3306/testdb?useSSL=false","root","123456")pst = conn.prepareStatement("insert into person values (?,?,?)")true}//一条条处理数据override def process(row: Row): Unit = {val id: Int = row.getInt(0)val name: String = row.getString(1)val age: Int = row.getInt(2)pst.setInt(1,id)pst.setString(2,name)pst.setInt(3,age)pst.executeUpdate()}//关闭释放资源override def close(errorOrNull: Throwable): Unit = {pst.close()conn.close()}}).start().awaitTermination()}}

运行结果:

Java代码如下:

package com.lanson.structuredStreaming.sink;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.concurrent.TimeoutException;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;
import scala.Tuple3;public class ForeachSinkTest01 {public static void main(String[] args) throws TimeoutException, StreamingQueryException {SparkSession spark = SparkSession.builder().master("local").appName("SSReadSocketData").config("spark.sql.shuffle.partitions", 1).getOrCreate();spark.sparkContext().setLogLevel("Error");Dataset<Row> result = spark.readStream().format("socket").option("host", "node2").option("port", 9999).load().as(Encoders.STRING()).map(new MapFunction<String, Tuple3<Integer, String, Integer>>() {@Overridepublic Tuple3<Integer, String, Integer> call(String line) throws Exception {String[] arr = line.split(",");return new Tuple3<>(Integer.valueOf(arr[0]), arr[1], Integer.valueOf(arr[2]));}}, Encoders.tuple(Encoders.INT(), Encoders.STRING(), Encoders.INT())).toDF("id", "name", "age");result.writeStream().foreach(new ForeachWriter<Row>() {Connection conn;PreparedStatement pst ;@Overridepublic boolean open(long partitionId, long epochId) {try {conn = DriverManager.getConnection("jdbc:mysql://node3:3306/testdb?useSSL=false", "root", "123456");pst = conn.prepareStatement("insert into person values (?,?,?)");} catch (SQLException e) {e.printStackTrace();}return true;}@Overridepublic void process(Row row) {int id = row.getInt(0);String name = row.getString(1);int age = row.getInt(2);try {pst.setInt(1,id );pst.setString(2,name );pst.setInt(3,age );pst.executeUpdate();} catch (SQLException e) {e.printStackTrace();}}@Overridepublic void close(Throwable errorOrNull) {try {pst.close();conn.close();} catch (SQLException e) {e.printStackTrace();}}}).start().awaitTermination();}
}

运行

以上代码编写完成后,清空mysql person表数据,然后输入以下数据:

1,zs,18
2,ls,19
3,ww,20
4,ml,21
5,tq,22
6,ll,29
1,zs,18
2,ls,19
3,ww,20
4,ml,21
5,tq,22
6,ll,29

mysql结果如下:


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/51009.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

kotlin协程-- 基础概念 ①|创建和使用

引言 首先先说一些相关概念 1.并发与并行 在操作系统中我们曾经学到过并发与并行 并发: 是同一个时刻只有一条指令在执行,其他指令没有再执行,但是由于CPU的时间片特别短,导致多个指令来回切换的时间间隔特别短,就好像是同一时间多条指令在执行。单核CPU与多核CPU都可以进…

【python】Python常见的面试题解析:深入探索与实践,助你少走弯路

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; &#x1f3c6; 作者简介&#xff1a;景天科技苑 &#x1f3c6;《头衔》&#xff1a;大厂架构师&#xff0c;华为云开发者社区专家博主&#xff0c;…

若依ruoyi+AI项目二次开发(智能售货机运营管理系统)

(一) 帝可得 - 产品原型 - 腾讯 CoDesign (qq.com)

一些电脑的操作技巧,你知道吗?

我整理了几个电脑使用的实用技巧&#xff0c;能够帮你提升办公效率&#xff0c;一起来看看吧&#xff01; 技巧一&#xff1a;反方向移动单元格 一般来讲&#xff0c;我们按下【Tab】键、【Enter】键的时候&#xff0c;会切换到右边或者下边的单元格&#xff0c;想要反向移动…

第2章 编译SDK

安装编译依赖 sudo apt-get update sudo apt-get install clang-format astyle libncurses5-dev build-essential python-configparser sconssudo apt-get install repo git ssh make gcc libssl-dev liblz4-tool \ expect g patchelf chrpath gawk texinfo chrpath diffstat …

董宇辉离职,我一点都不意外!只不过感觉来的太快

下面这张图&#xff0c;是我在半年多前写的一段随笔&#xff0c;没想到来的这么快&#xff01; 碰巧的是今天中午&#xff0c;在开发者群里有两位老铁自曝&#xff0c;本以为能公司干到老&#xff0c;但公司却不给机会&#xff0c;已经不在是公司员工了。 最近&#xff0c;晓衡…

粗解React 和 Vue 的异同

相同点&#xff1a; 1、都使用虚拟 DOM【Virtural DOM】 Vue与React都使用了 Virtual DOM Diff算法&#xff0c; 不管是Vue的Template模板options api 写法&#xff0c; 还是React的Class或者Function写法,最后都是生成render函数&#xff0c;而render函数执行返回VNode(虚拟…

iOS collectionView 滑动出现空白

iOS collectionView 滑动出现空白 一个很常见的 banner 轮播&#xff0c;滑动的时候&#xff0c;有时候会出现空白&#xff0c;检查了下&#xff0c;发现代码没什么问题&#xff0c;上网查了也没啥结果&#xff0c;最后的解决方法是自定义layout解决 interface TMLoopViewLayo…

创新概念:柯尔莫哥洛夫-阿诺德网络

文章目录 一、说明二、基础概念三、kolmogorov-Arnold 网络性质3.1 KAN 的潜在优势3.2 挑战和注意事项 四、基本 KAN 超参数五、COLAB 代码六、注意点 一、说明 kolmogorov-Arnold 网络 (KAN) 是深度学习领域的一项“创新”&#xff0c;它提供了一种受现有 Kolmogorov-Arnold …

python基础---1.变量、运算符和表达式、基本数据结构

&#x1f388;个人主页&#xff1a;靓仔很忙i &#x1f4bb;B 站主页&#xff1a;&#x1f449;B站&#x1f448; &#x1f389;欢迎 &#x1f44d;点赞✍评论⭐收藏 &#x1f917;收录专栏&#xff1a;python &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&…

使用Docker搭建MySql的主从同步+ShardingSphere搭建Mysql的读写分离

参考课程 尚硅谷ShardingSphere5实战教程&#xff08;快速入门掌握核心&#xff09;_哔哩哔哩_bilibili 主服务器 创建容器 docker run -d \ -p 3306:3306 \ -v /kira/mysql/master/conf:/etc/mysql/conf.d \ -v /kira/mysql/master/data:/var/lib/mysql \ -e MYSQL_ROOT…

(day26)leecode热题——找到字符串中所有字母异位词

描述 给定两个字符串 s 和 p&#xff0c;找到 s 中所有 p 的 异位词 的子串&#xff0c;返回这些子串的起始索引。不考虑答案输出的顺序。 异位词 指由相同字母重排列形成的字符串&#xff08;包括相同的字符串&#xff09;。 示例 1: 输入: s "cbaebabacd", p …

【Git-驯化】一文搞懂git中代码冲突的解决方案大全

【Git-驯化】一文搞懂git中代码冲突的解决方案大全 本次修炼方法请往下查看 &#x1f308; 欢迎莅临我的个人主页 &#x1f448;这里是我工作、学习、实践 IT领域、真诚分享 踩坑集合&#xff0c;智慧小天地&#xff01; &#x1f387; 免费获取相关内容文档关注&#xff1a…

[C++实战]日期类的实现

&#x1f496;&#x1f496;&#x1f496;欢迎来到我的博客&#xff0c;我是anmory&#x1f496;&#x1f496;&#x1f496; 又和大家见面了 欢迎来到C探索系列 作为一个程序员你不能不掌握的知识 先来自我推荐一波 个人网站欢迎访问以及捐款 推荐阅读 如何低成本搭建个人网站…

ELK安装(Elasticsearch+Logstash+Kibana+Filebeat)

一、简介 1.1、软件简介 ELK其实是Elasticsearch&#xff0c;Logstash 和 Kibana三个产品的首字母缩写&#xff0c;这三款都是开源产品。 1.1.1、Elasticsearch简介 Elasticsearch 是一个分布式、高扩展、高实时的搜索与数据分析引擎。它能很方便的使大量数据具有搜索、分析…

springboot微信老人健康与饮食管理系统-计算机毕业设计源码82939

基于微信老人健康与饮食管理系统的小程序 摘 要 基于Spring Boot的微信老人健康与饮食管理系统的小程序致力于为老年人提供便捷的健康管理和饮食指导服务。该小程序整合了健康资讯浏览、食谱推荐、健康评估等功能模块&#xff0c;通过系统的设计与实现&#xff0c;旨在帮助老年…

古丝绸之路传闻二:十年败壳精灵显,一介穷神富贵来

古丝绸之路传闻二&#xff1a;十年败壳精灵显&#xff0c;一介穷神富贵来 &#xff08;接上节&#xff1a;古丝绸之路传闻&#xff1a;分内功名匣里财&#xff0c;不关聪慧不关呆&#xff09; 先别说闲话。且说众人带着经纪主人到船上发货&#xff0c;文若虚把之前的事情说了一…

AccessLog| 一款开源的日志分析系统

前言 ClkLog作为分析系列产品中的前端数据分析系统&#xff0c;通过采集前端应用数据进行用户行为分析。其社区版从23年9月发布至今已有近一年&#xff0c;商业版也上线快半年&#xff0c;感谢大家一直以来的关注和支持&#xff0c;ClkLog会继续做好产品升级与服务&#xff0c;…

Linux冯诺依曼体系、操作系统、进程概念、进程状态、进程切换

个人主页&#xff1a;仍有未知等待探索-CSDN博客 专题分栏&#xff1a;Linux 目录 一、冯诺依曼体系结构 二、操作系统 1、概念 2、为什么要有操作系统&#xff1f; 3、理解操作系统 1.管理的本质 2.管理的概念 3.操作系统结构图 4.为什么要有操作系统&#xff1f; 三…

python-NLP:2词性标注与命名实体识别

文章目录 词性标注命名实体识别时间命名实体&#xff08;规则方法&#xff09;CRF 命名实体识别方法 词性标注 词性是词汇基本的语法属性&#xff0c;通常也称为词类。词性标注是在给定句子中判定每个词的语法范畴&#xff0c;确定其词性并加以标注的过程。例如&#xff0c;表示…