inputdstream mysql_【sparkStreaming】将DStream保存在MySQL

package SparkDemo

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.SparkConf

import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamToMySQL {

//定义更新函数

def updateFunc(newValues : Seq[Int],state :Option[Int]):Option[Int] = {

val currentCount = newValues.foldLeft(0)(_+_)

val previousCount = state.getOrElse(0)

Some(currentCount+previousCount)

}

def main(args : Array[String]): Unit ={

//建立SparkStream

val conf = new SparkConf().setAppName("DStreamToMySQL")

val ssc = new StreamingContext(conf,Seconds(1))

//设置日志等级

StreamingLoggingExample.setStreamingLogLevels()

val lines = ssc.textFileStream("/tmp/yuhang.zhang/data")

val words = lines.flatMap(_.split(" "))

val pairWord = words.map((_,1))

//累计更新

val stateWordCount = pairWord.updateStateByKey[Int](updateFunc)

//将stateWordCount存入数据库

//stateWordCount中包含一堆的Rdd

//我们需要对每个Rdd中的每条数据进行处理储存

stateWordCount.foreachRDD(rdd => {

//每个rdd中包含的数据类型为(String,Int)

//我们把所有数据records定义为Iterator类型,方便我们遍历

def func(records:Iterator[(String,Int)]): Unit ={

//注意,conn和stmt定义为var不能是val

var conn: Connection = null

var stmt : PreparedStatement = null

try{

//连接数据库

val url = "jdbc:mysql://localhost:3306/spark" //地址+数据库

val user = "root"

val password = ""

conn = DriverManager.getConnection(url,user,password)

//

records.foreach(p =>{

//wordcount为表名,word和count为要插入数据的属性

//插入数据

val sql = "insert into wordcount(word,count) values(?,?)"

stmt = conn.prepareStatement(sql)

stmt.setString(1,p._1.trim)

stmt.setInt(2,p._2.toInt)

stmt.executeUpdate()

})

}catch {

case e : Exception => e.printStackTrace()

}finally {

if(stmt != null)

stmt.close()

if(conn != null)

conn.close()

}

}

val repairtitionedRDD = rdd.repartition(3)//将每个rdd重新分区

repairtitionedRDD.foreachPartition(func)//对重新分区后的rdd执行func函数

})

ssc.start()//启动

ssc.awaitTermination()//等待终止命令

}

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/340912.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

无线设置 用户_无线WiFi远距离无线蹭网怎么中继桥接组网【详细介绍】

自从小编每天分享各种各样的无线无线WiFi蹭网、桥接、远距离组网等实例,吸引了很多朋友的兴趣,有很多网友通过关心我后,给我私信,求助我一些相关的问题,还有一些朋友建议我出一些教程。首先小编觉得有些东西只能意会不…

Hibernate和UUID标识符

介绍 在我以前的文章中,我讨论了UUID代理密钥以及用例 , 这些用例比更常见的自动递增标识符更合适。 UUID数据库类型 有几种表示128位UUID的方法,每当有疑问时,我都希望向Stack Exchange寻求专家建议。 由于通常对表标识符进行索…

应用宝苹果版_点赞应用ios版下载-点赞应用苹果版下载v1.1

《点赞应用》app是一款实用的视频生成器工具,用户可以利用它为自己的视频添加各种各样的点赞效果。应用内含有多种类型的模板,都是免费使用的,想要将你的视频变得更有趣吗?快来下载体验一下这款应用吧!软件特色1、这个…

mysql+默认值+default_十六、MySQL 中数据类型的默认值 - default 约束-搜云库

MySQL 中,所有的数据类型,都可以显式或隐式的拥有默认值。我们可以使用 DEFAULT 约束显式的为列指定一个默认值。比如CREATE TABLE t1 (i INT DEFAULT -1,c VARCHAR(10) DEFAULT ,price DOUBLE(16,2) DEFAULT 0.00);在上面这条语句中,我们为 …

SQL即服务

自2007年以来,我一直在考虑这一点,大约在Amazon 推出 S3时。 我什至尝试实现了几次,但是在设计阶段之后就失败了。 我听说过一家初创公司,也曾尝试这样做,但也失败了 。 我仍然不确定是否可以这样做,但是它…

c++ vector 一部分_《JACS》:在富电子C-H键位点上实现光控活性聚合

可逆加成-断裂链转移(RAFT)自由基活性聚合是一种调控聚合物结构组成、分子量和分布的重要聚合方法,其中,光诱导电子/能量转移(PET)的RAFT聚合反应是一种更精确的调控手段,因而经常被用于设计具有复杂3D分子结构的聚合物。然而常规的PET-RAFT法…

phpmyadmin忘记mysql密码_忘记phpmyadmin密码怎么重置

忘记phpmyadmin密码怎么重置,新密码,教程,相关文章,重新启动,跳过忘记phpmyadmin密码怎么重置易采站长站,站长之家为您整理了忘记phpmyadmin密码怎么重置的相关内容。1、停止mysql服务:/etc/init.d/mysql stop2、跳过验证启动MySQL/usr/local/mysql/bin/…

java中避免空指针_在Java中避免空检查

java中避免空指针对于Java开发人员(从初级到专家)最糟糕的噩梦之一是空对象引用检查。 我很确定您已经看过几次这样的代码: public void addAddressToCustomer(Customer customer, Address newAddress){if ( cutomer null || newAddress n…

纵横免root框架打不开应用怎么办_很好用的软件多开神奇安卓欧皇十框架!!!...

欧皇十框架这是一款兼容安卓10的应用框架,轻松实现应用多开,可以完美免ROOT运行GG修改器,专为和平精英游戏设计,软件体积小,运行稳定。修改说明:1.支持更多应用游戏的多开、双开,使用更稳定、快…

用java和mysql开发网站怎么实现_如何用java开发一个网站?

java语言和类库:java语言是支持整个java技术的底层基础,java类库是随java语言Java 运行系统:主要指java虚拟机,负责将java与平台无关的中间代码翻译成本机的Java applet :Java applet 是用java语言编写的小应用程序,通…

Elasticsearch SQL

Elasticsearch引擎 Elasticsearch是当今许多生产部署中使用最广泛的搜索引擎之一。 它基于Lucene搜索库,它提供的主要功能之一是在Lucene之上的基于JSON的查询DSL,它提供了一种易于使用的机制来与搜索引擎进行交互。 但是,查询DSL非常特定于E…

电脑无internet访问_电脑中的代理服务器怎么设置 代理服务器设置方法 - 操作系统...

如何设置电脑中的代理服务器?对于代理服务器,可能大家对其并不是非常了解,其实代理服务器作为一种特殊的网络服务,可以代理网络用户去获取网络信息,提高浏览速度与效率,而且还可以突破自身IP的访问限制,访…

mysql先进后出_栈、队列中“先进先出”,“后进先出”的含义

展开全部先进先出(62616964757a686964616fe58685e5aeb931333433653339FIFO,first-in,first-out)为处理从队列或堆栈发出的程序工作要求的一种方法,它使最早的要求被最先处理。后进先出,从栈中取出数据项的顺序与将它们插入栈的顺序…

平台框架_从框架到平台

平台框架当我在十年前以Java开发人员的身份开始职业生涯时,该行业正经历着革命性的变化。 2003年发布的Spring框架Swift流行,并成为庞大的J2EE平台的严重挑战者。 经过过渡时间后,我很快发现自己赞成使用Spring框架而不是J2EE平台&#xff0c…

敲代码时如何快速移动光标_如何用 Linux 技巧大大提高工作效率?

前言Linux中的一些小技巧可以大大提高你的工作效率,本文就细数那些提高效率或者简单却有效的Linux技巧。命令编辑及光标移动这里有很多快捷键可以帮我们修正自己的命令。接下来使用光标二字代替光标的位置。删除从开头到光标处的命令文本ctrl u,例如&am…

Java 13:文本块

Java 13已交付了期待已久的多行字符串或Text Blocks 。 您不再需要连接跨越多行的字符串或转义特殊字符,这确实提高了代码的可读性。 文本块是一种预览语言功能 ,这意味着必须使用--enable-preview标志在Java编译器和运行时中明确启用它们。 这是一个文…

java 异常练习题_java入门异常处理练习题问题

tppe大概方式:1、判断用户输入的类型是否正确,不正确捕获异常,把他包装成我自己定义的异常2、判断用户输入的数是多少2.1、如果是1,则打印“输入图书名称”,用户输入,定义一个Book类型的数组,然…

windows副本不是正版怎么办_盗版系统总是崩溃?别着急,让我来告诉你正版系统怎么下载...

电脑系统崩溃了怎么办?相信很多小伙伴都会选择重装系统,奈何自己又不会,只能搬到修电脑的地方,最后发现安装的还是盗版系统,不能登录微软账号不说,还会被捆绑安装一堆流氓软件,那么,…

java线程有几种状态_java线程的几种状态

java线程的几种状态导语:线程,有时被称为轻量级进程(Lightweight Process,LWP),是程序执行流的最小单元。下面是Java线程的介绍,欢迎参考!新建:new一个Thread对象或者其子类对象就是创建一个线程&#xff0…

sudo spctl --master-disable_量大从优批发--阳离子聚丙烯酰胺--用于生活污水、

量大从优批发--阳离子聚丙烯酰胺--用于生活污水、wkkk量大从优批发--阳离子聚丙烯酰胺--用于生活污水、怎么来辨别聚丙烯酰胺到底是什么型号的呢?下面来介绍型号辨别的消防法。聚丙烯酰胺我们都知道聚丙烯酰基是昂贵的阳离子,其次是非离子聚丙烯酰胺&…