【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】

【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】

把DStream写入到MySQL数据库中

  • Spark 3.4.1
  • MySQL 8.0.30
  • sbt 1.9.2

文章目录

  • 【IDEA+Spark Streaming 3.4.1+Dstream监控套接字流统计WordCount保存至MySQL8】
  • 前言
  • 一、背景说明
  • 二、使用步骤
    • 1.引入库
    • 2.开发代码
    • 运行测试
  • 总结


前言

需要基于Spark Streaming 将实时监控的套接字流统计WordCount结果保存至MySQL


提示:本项目通过sbt控制依赖

一、背景说明

在Spark应用中,外部系统经常需要使用到Spark DStream处理后的数据,因此,需要采用输出操作把DStream的数据输出到数据库或者文件系统中

Spark Streaming是一个基于Spark的实时计算框架,它可以从多种数据源消费数据,并对数据进行高效、可扩展、容错的处理。Spark Streaming的工作原理有以下几个步骤:

  • 数据接收:Spark Streaming可以从各种输入源接收数据,如Kafka、Flume、Twitter、Kinesis等,然后将数据分发到Spark集群中的不同节点上。每个节点上有一个接收器(Receiver)负责接收数据,并将数据存储在内存或磁盘中。
  • 数据划分:Spark Streaming将连续的数据流划分为一系列小批量(Batch)的数据,每个批次包含一定时间间隔内的数据。这个时间间隔称为批处理间隔(Batch Interval),可以根据应用的需求进行设置。每个批次的数据都被封装成一个RDD,RDD是Spark的核心数据结构,表示一个不可变的分布式数据集。
  • 数据处理:Spark Streaming对每个批次的RDD进行转换和输出操作,实现对流数据的处理和分析。转换操作可以使用Spark Core提供的各种函数,如map、reduce、join等,也可以使用Spark Streaming提供的一些特殊函数,如window、updateStateByKey等。输出操作可以将处理结果保存到外部系统中,如HDFS、数据库等。
  • 数据输出:Spark Streaming将处理结果以DStream的形式输出,DStream是一系列连续的RDD组成的序列,表示一个离散化的数据流。DStream可以被进一步转换或输出到其他系统中。

DStream有状态转换操作是指在Spark Streaming中,对DStream进行一些基于历史数据或中间结果的转换,从而得到一个新的DStream。
在这里插入图片描述

二、使用步骤

1.引入库

ThisBuild / version := "0.1.0-SNAPSHOT"ThisBuild / scalaVersion := "2.13.11"lazy val root = (project in file(".")).settings(name := "SparkLearning",idePackagePrefix := Some("cn.lh.spark"),libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.4.1",libraryDependencies += "org.apache.spark" %% "spark-core" % "3.4.1",libraryDependencies += "org.apache.hadoop" % "hadoop-auth" % "3.3.6",libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.4.1",libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.4.1",libraryDependencies += "org.apache.spark" %% "spark-mllib" % "3.4.1" % "provided",libraryDependencies += "mysql" % "mysql-connector-java" % "8.0.30"
)

2.开发代码

为了实现通过spark Streaming 监控控制台输入,需要开发两个代码:

  • NetworkWordCountStatefultoMysql.scala
  • StreamingSaveMySQL8.scala

NetworkWordCountStatefultoMysql.scala

package cn.lh.spark  import org.apache.spark.SparkConf  
import org.apache.spark.streaming.{Seconds, StreamingContext}  
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}  object NetworkWordCountStatefultoMysql {  def main(args: Array[String]): Unit = {  //    定义状态更新函数  val updateFunc = (values: Seq[Int], state: Option[Int]) => {  val currentCount = values.foldLeft(0)(_ + _)  val previousCount = state.getOrElse(0)  Some(currentCount + previousCount)  }  //    设置log4j日志级别  StreamingExamples.setStreamingLogLevels()  val conf: SparkConf = new SparkConf().setAppName("NetworkCountStateful").setMaster("local[2]")  val scc: StreamingContext = new StreamingContext(conf, Seconds(5))  //    设置检查点,具有容错机制  scc.checkpoint("F:\\niit\\2023\\2023_2\\Spark\\codes\\checkpoint")  val lines: ReceiverInputDStream[String] = scc.socketTextStream("192.168.137.110", 9999)  val words: DStream[String] = lines.flatMap(_.split(" "))  val wordDstream: DStream[(String, Int)] = words.map(x => (x, 1))  val stateDstream: DStream[(String, Int)] = wordDstream.updateStateByKey[Int](updateFunc)  // 打印出状态  stateDstream.print()  // 将统计结果保存到MySQL中  stateDstream.foreachRDD(rdd =>{  val repartitionedRDD = rdd.repartition(3)  repartitionedRDD.foreachPartition(StreamingSaveMySQL8.writeToMySQL)  })  scc.start()  scc.awaitTermination()  scc.stop()  }  }

StreamingSaveMySQL8.scala

package cn.lh.spark  import java.sql.DriverManager  object StreamingSaveMySQL8 {  // 定义写入 MySQL 的函数  def writeToMySQL(iter: Iterator[(String,Int)]): Unit = {  // 保存到MySQL  val ip = "192.168.137.110"  val port = "3306"  val db = "sparklearning"  val username = "lh"  val pwd = "Lh123456!"  val jdbcurl = s"jdbc:mysql://$ip:$port/$db"  val conn = DriverManager.getConnection(jdbcurl, username, pwd)  val statement = conn.prepareStatement("INSERT INTO wordcount (word,count) VALUES (?,?)")  try {  // 写入数据  iter.foreach { wc =>  statement.setString(1, wc._1.trim)  statement.setInt(2, wc._2.toInt)  statement.executeUpdate()  }  } catch {  case e:Exception => e.printStackTrace()  } finally {  if(statement != null){  statement.close()  }  if(conn!=null){  conn.close()  }  }  }  }

运行测试

准备工作:

  1. 提前在mysql中新建数据表保存Spark Streaming写入的数据
    在这里插入图片描述

  2. 启动nc -lk 9999
    在这里插入图片描述

  3. 启动 NetworkWordCountStatefultoMysql.scala
    ![[Pasted image 20230804214904.png]]在这里插入图片描述

  4. 在nc端口输入字符,再分别到idea控制台和MySQL检查结果

在这里插入图片描述


总结

本次实验通过IDEA基于Spark Streaming 3.4.1开发程序监控套接字流,并统计字符串,实现实时统计单词出现的数量。试验成功,相对简单。
后期改善点如下:

  • 通过配置文件读取mysql数据库相应的配置信息,不要写死在代码里
  • 写入数据时,sql语句【插入的表信息】,可以在调用方法时,当作参数输入
  • iter: Iterator[(String,Int)] 应用泛型
  • 插入表时,自动保存插入时间

欢迎各位开发者一同改进代码,有问题有疑问提出来交流。谢谢!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/22154.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【数字IC基础】低功耗设计

低功耗技术 功耗构成静态功耗(漏电功耗)动态功耗翻转功耗(Switch Power)短路功耗(Internal Power) 不同类型的标准单元的功耗 低功耗设计方法降低芯片工作电压多阈值工艺方法电源门控(Power Gating)多电压域(Multi-Voltage Domain)体偏置门控时钟一个简单…

微服务使用步骤

Maven的依赖冲突解决方案: 路径最短原则配置优先原则破坏规则则使用排除 SpringBoot场景启动器starter的开发流程 c3p0-spring-boot-starter自定义场景启动器test-c3p0调用自定义场景启动器SpringBoot自动装配SpringBoot应用启动原理nacos服务治理 安装 启动bin/s…

算法leetcode|66. 加一(rust重拳出击)

文章目录 66. 加一:样例 1:样例 2:样例 3:提示: 分析:题解:rust:go:c:python:java: 66. 加一: 给定一个由 整数 组成的 非…

IDEA SpringBoot项目引入外部jar并打包

1、首先,我们再pom.xml中导入依赖包时,打包可以正常进行。 但如果我们引入了第三方的外部jar包(这里需要先把jar包添加到该项目依赖库中,这里不做演示),如图 2、导致打包时报错,程序包不存在或…

1005 继续(3n+1)猜想

描述 卡拉兹(Callatz)猜想已经在1001中给出了描述。在这个题目里,情况稍微有些复杂。 当我们验证卡拉兹猜想的时候,为了避免重复计算,可以记录下递推过程中遇到的每一个数。例如对 n3 进行验证的时候,我们需要计算 3、5、8、4、…

SpringBoot 配置文件

一、配置文件作用 整个项目中所有重要的数据都是在配置文件中配置的,比如: 数据库的连接信息(包含用户名和密码的设置); 项目的启动端口; 第三方系统的调用秘钥等信息; 用于发现和定位问题的…

MBG中update语句的区别

int updateByPrimaryKey(User record) thorws SQLException 按主键更新 int updateByPrimaryKeySelective(User record) thorws SQLException 按主键更新值不为null的字段 使用以上的方式更新数据时必须提供主键,MyBatis根据主键进行数据记录的更新。 int updateBy…

android app控制ros机器人四(调整界面布局)

半吊子改安卓,记录页面布局调整: 在ros-mobile基础上顶端增加一行,用于显示app名称和logo图像;修改标签页。 添加文字简单,但是替换图标长知识了,开始只是简单的把mipmap各个文件夹下的图片进行替换&…

MFC第二十六天 CRgn类简介与开发、封装CMemoryDC类并应用开发

文章目录 CRgn类简介与开发CRgn类简介CRgn类区域管理开发CRgn类区域管理与不规则形状的选取 封装CMemoryDC类并应用开发CMemoryDC.h封装CMemoryDC开发游戏透明动画CFlashDlg.hCFlashDlg.cpp 封装CMemoryDC开发游戏动画 附录四大窗口CDC派生类 CRgn类简介与开发 CRgn类简介 CR…

Qt 6. 其他类调用Ui中的控件

1. 把主类指针this传给其他类,tcpClientSocket new TcpClient(this); //ex2.cpp #include "ex2.h" #include "ui_ex2.h"Ex2::Ex2(QWidget *parent): QDialog(parent), ui(new Ui::Ex2) {ui->setupUi(this);tcpClientSocket new TcpClient…

next_permutation与prev_permutation(全排列函数)

1&#xff0c;next_permutation(a,an);&#xff08;找下一个&#xff09; a代表数组头地址&#xff0c;b代表数组尾地址&#xff0c;如果下一个排列存在则返回真&#xff0c;否者返回假 #include<iostream> #include<algorithm> using namespace std; int main()…

mysql重置和修改密码 Ubuntu系统

忘记密码要重置密码 cat /etc/mysql/debian.cnf/etc/mysql/debian.cnf这个只有Debian或者Ubuntu服务器才有&#xff0c;里面有mysql安装后自带的用户&#xff0c;作用就是重启及运行mysql服务。我们用这个用户登录来达到重置密码的操作 使用上面的那个文件中的用户名和密码登…

【腾讯云Cloud Studio实战训练营】React 快速构建点餐页面

前言&#xff1a; Cloud Studio是一个在线的云集成开发环境&#xff08;IDE&#xff09;&#xff0c;可以让开发人员在浏览器中轻松地开发、测试、调试和部署应用程序。它提供了基于云的计算资源和工具&#xff0c;例如代码编辑器、编译器、调试器、版本控制系统和项目管理工具…

210. 课程表 II Python

文章目录 一、题目描述示例 1示例 2示例 3 二、代码三、解题思路 一、题目描述 现在你总共有 numCourses 门课需要选&#xff0c;记为 0 到 numCourses - 1。给你一个数组 prerequisites &#xff0c;其中 prerequisites[i] [ai, bi] &#xff0c;表示在选修课程 ai 前 必须 …

IDEA超强XSD文件编辑插件-XSD / WSDL Visualizer

前言 XSD / WSDL Visualizer可以简化XML架构定义(XSD)和WSDL文件编辑过程; 通过使用与IntelliJ无缝集成的可视化编辑器&#xff0c;转换处理XSD和WSDL文件的方式。告别导航复杂和难以阅读的代码的挫败感&#xff0c;迎接流线型和直观的体验。 插件安装 在线安装 IntelliJ IDE…

第三方控价服务流程

控价是一项需要技术和经验双重协作才能完成好的工作&#xff0c;技术指的是监控价格&#xff0c;经验指的是治理乱价&#xff0c;二者缺一不可&#xff0c;很多品牌会选择自己去完成&#xff0c;自己开发系统&#xff0c;组建治理团队&#xff0c;成本相对更高&#xff0c;也有…

Qt/VS LNK2019/LNK2001:无法解析的外部符号

LNK2019 序言LNK2019&#xff1a;无法解析的外部符号"__declspecLNK2019&#xff1a;无法解析的外部符号" public private函数名1、有函数声明忘写定义2、有种可能性是处于不同目录下去调用那个对应的文件接口3、有种可能性&#xff0c;是因为跨类调用了inline函数4、…

爬楼梯 LeetCode热题100

题目 假设你正在爬楼梯。需要 n 阶你才能到达楼顶。 每次你可以爬 1 或 2 个台阶。你有多少种不同的方法可以爬到楼顶呢&#xff1f; 思路 最后一步有两种情况&#xff0c;从n-1跳到n&#xff0c;从n-2跳到n。 推出&#xff1a;f(n)f(n-1)f(n-1) 斐波那契数列。 代码 cla…

Go指针取址问题:循环后每次都拿到相同内容

例子&#xff1a; func main() {yourList : [...]int{1, 2, 3}yourMap1 : make(map[int]*int)yourMap2 : make(map[int]*int)for key, value : range yourList {// 修改前yourMap1[key] &value// 修改后tmp : valueyourMap2[key] &tmpfmt.Println(value, &value…

LabVIEW开发高压航空航天动力系统爬电距离的测试

LabVIEW开发高压航空航天动力系统爬电距离的测试 更多电动飞机MEA技术将发电&#xff0c;配电和用电集成到一个统一的系统中&#xff0c;提高了飞机的可靠性和可维护性。更多的电动飞机使用更多的电能来用电动替代品取代液压和气动系统。对车载电力的需求不断增加&#xff0c;…