streaming接mysql数据库_[Spark streaming举例]-- 实时统计并且存储到mysql数据库中

举例

package com.scala.my

import org.apache.spark.SparkConf

import org.apache.spark.streaming.Durations

import org.apache.spark.streaming.StreamingContext

/**

*

* @author root

* 测试步骤:

*    1\打开h15\h16\h17\h18,启动zookeeper,再启动hadoop集群:start-all.sh,再启动mysql

*    2\在h15上创建文件夹wordcount_checkpoint,用于docheckpoint

*       在h5上mysql的dg数据库中创建表t_word

*    3\启动eclipse的本程序,让他等待着

*    4\在h15的dos窗口下输入单词,以空格分隔的单词(需要在h15上开启端口9999:#nc -lk 9999)

*    5\查询h15上的mysql的dg数据库的t_word表是否有数据即可

*

* 注:建表语句

*     mysql> show create table wordcount;  //查看表语句

CREATE TABLE   t_word (

id  int(11) NOT NULL AUTO_INCREMENT,

updated_time  timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

word varchar(255) DEFAULT NULL,

count  int(11) DEFAULT NULL,

PRIMARY KEY (id)

);

*/

*

* 测试结果:通过,注意-----》第74行没有取得数据,原因在最后没有触发事件(封装事件),目前已经解决

*

* sh spark-submit --master spark://de2:7077 --class 全类名 --driver-class-path /mysql-connector-java-5.1.26.jar  sparkstreaming.jar

sh spark-submit --class com.day6.scala.my.PresistMysqlWordCount --master yarn-cluster --driver-class-path /home/spark-1.5.1-bin-hadoop2.4/lib/mysql-connector-

java-5.1.31-bin.jar /home/spark-1.5.1-bin-hadoop2.4/sparkstreaming.jar

$bin/hadoop dfsadmin -safemode leave

也就是关闭Hadoop的安全模式,这样问题就解决了。

*/

object PresistMysqlWordCount {

def main(args: Array[String]): Unit = {

//获取streamingContext,并且设置每5秒切割一次rdd

//    val sc = new StreamingContext(new SparkConf().setAppName("mysqlPresist").setMaster("local[2]"), Durations.seconds(8))

val sc = new StreamingContext(new SparkConf().setAppName("mysqlPresist").setMaster("local[2]"), Durations.seconds(8))

//设置checkpoit缓存策略

/**

* 利用 checkpoint 来保留上一个窗口的状态,

* 这样可以做到移动窗口的更新统计

*/

sc.checkpoint("hdfs://hh15:8020/wordcount_checkpoint")

//    sc.checkpoint("hdfs://h15:8020/wordcount_checkpoint")

//获取doc窗口或者hdfs上的words

//    val lines=sc.textFileStream("hdfs://h15:8020/文件夹名称")  //实时监控hdfs文件夹下新增的数据

val lines = sc.socketTextStream("hh15", 9999)

//    val lines = sc.socketTextStream("h15", 9999)

//压扁

val words = lines.flatMap { x => x.split(" ") }

//map

val paris = words.map { (_, 1) }

//定义一个函数,用于保持状态

val addFunc = (currValues: Seq[Int], prevValueState: Option[Int]) => {

var newValue = prevValueState.getOrElse(0)

for (value wd.foreachPartition(

data => {

val conn = ConnectPool.getConn("root", "1714004716", "hh15", "dg")

//        val conn = ConnectPool.getConn("root", "1714004716", "h15", "dg")

//插入数据

//        conn.prepareStatement("insert into t_word2(word,num) values('tom',23)").executeUpdate()

try {

for (row

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/530906.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mysql更新id最大_我们可以在单个MySQL查询中更新具有最高ID的行吗?

是的,我们可以做到。让我们首先创建一个表-mysql> create table DemoTable(ID int,GameScore int);使用插入命令在表中插入一些记录-mysql> insert into DemoTable values(15,848747);mysql> insert into DemoTable values(13,909049);mysql> insert in…

三张表有重复字段_什么?搞不定Kafka重复消费?

点戳蓝字“架构之美”关注我们哦!前言 今天我们聊一个话题,这个话题大家可能在面试过程中,或者是工作当中经常遇到 ?如何保证 Kafka 消息不重复消费?我们在做开发的时候为了程序的健壮性,在使用 Kafka 的时候一般都会…

如何利用扩展欧几里得算法求解不定方程_欧几里德算法、拓展欧几里德、中国剩余定理...

01.欧几里德算法(Euclidean algorithm)(辗转相除法)欧几里德算法又称辗转相除法,主要是用于计算两个整数a,b的最大公约数。简单点说一下算法原理:两个整数的最大公约数等于其中小的那个数跟大除以小余数的最…

mysql 先删后增 更新_MySQL 高级操作——新增数据、更新数据、删除数据、查询数据...

新增数据多数据插入只要写一次insert指令,但是可以插入多条记录语法:insert into 表名 [(字段列表)] values (值列表1),(值列表2),(值列表3);主键冲突主键冲突,在有的表中,使用的是业务主键(字段有业务含义),但是往往在…

nltk和python的关系_NLTK学习笔记(一):语言处理和Python

目录nltk资料下载import nltknltk.download()其中,download() 参数默认是all,可以在脚本里面加上nltk.download(需要的资料库) 来进行下载文本和词汇首先,通过from nltk.book import * 引入需要的内置9本书搜索文本上下文:Text.concordance(monstrous) &…

python七段数码管倒计时_python实现七段数码管和倒计时效果

8是典型的七段数码管的例子,因为刚好七段都有经过,这里我写的代码是从1开始右转。这是看Mooc视频写的一个关于用七段数码管显示当前时间# -*-coding:utf-8 -*-import turtle as timport timedef drawGap():t.penup()t.fd(5)def drawLine(draw):drawGap()…

rda分析怎么做_数量生态学笔记||冗余分析(RDA)

上一节数量生态学笔记||冗余分析(RDA)概述中,我们回顾了RDA的计算过程,不管这个过程我们有没有理解透彻,我希望你能知道的是:RDA是响应变量矩阵与解释变量之间多元多重线性回归的拟合值矩阵的PCA分析。本节我们就是具体来看一个RD…

mysql 服务器管理员_mysql 查看数据库管理员

mysql 查看数据库管理员云服务器(Elastic Compute Service,简称ECS)是阿里云提供的性能卓越、稳定可靠、弹性扩展的IaaS(Infrastructure as a Service)级别云计算服务。云服务器ECS免去了您采购IT硬件的前期准备,让您像使用水、电、天然气等公共资源一样…

python中有哪些重要的书写规则_一文读懂Python代码的书写规范

Python代码的书写规范1. 一致性的建议打破一条既定规则的两个好理由当应用这个规则将导致代码可读性下降,即使对于某人来说他已经习惯于按照这条规则来阅读代码了为了和周围的代码保持一致而打破规则(也许是历史原因)2. 代码的布局缩进4个空格代码行行最大长度 : 79字符推荐长度…

二进制文件mysql创表_MySQL_MYSQL中如何存取二进制文件,首先创建测试表testtable CREATE TA - phpStudy...

MYSQL中如何存取二进制文件首先创建测试表testtableCREATE TABLE testtable ( id INT(5) NOT NULL AUTO_INCREMENT PRIMARY KEY,filename CHAR(255),data LONGBLOB );将文件存入表中mysql_connect( "localhost", "root", "password"); //连接数据…

树莓派 php mysql 中文_使用树莓派(raspberry pi)搭建网站(nginx+php+mysql+ddclient)

标签: 树莓派 raspberrypi php 网站 mysql分类: Linux技术最近在研究学习PHP,有时候想随时就学习,所以就决定搭建一个网站,随时可以进行学习,因为要24小时在线,要低功耗和安静,所以选…

mysql从库应用负载_线上MySQL数据库高负载的解决思路--再次论程序应用索引的重要性...

前言:过去的笔记整理而得,未免丢失,发布个人博客。[2012年的资料笔记]场景:数据库的负载飙升,CPU高达99%。查看进程。通过猜测推理,定位了一些select语句363478427 | apps_read | 192.168.1.113:48945 …

python获取方法的装饰方法_python中的方法和装饰器

[TOC]装饰器python中的装饰器(decorator)是在pep 318中被首次引入,它的本质是一个函数这个函数是接受其它参数为参数,并且用一个新的,修改后的函数作为替换,最常见的装饰器就classmethod和staticmethoddef happy(f):return lambda…

一帮一python_[python]L1-030 一帮一 (15分)

L1-030 一帮一 (15分)“一帮一学习小组”是中小学中常见的学习组织方式,老师把学习成绩靠前的学生跟学习成绩靠后的学生排在一组。本题就请你编写程序帮助老师自动完成这个分配工作,即在得到全班学生的排名后,在当前尚未分组的学生中&#xf…

java书面_Java程序猿的书面采访String3

public class SameString {//思想二:每个字符都相应着自己的ASC码,第一个思想的算法复杂度为O(nlogn)。一般能够利用空间来减少时间复杂度//能够开辟一个大小为256的数组空间,而且将256个数组元素都置为0,然后遍历第一个字符串把字…

java fangfa_daicanfangfa java中的方法 刚入门的分不清带参方法的作用和用处 这个可以详细的讲解如何使用带参方法 - 下载 - 搜珍网...

第14章 带参数的方法/01 教学演示示例/示例1:带一个参数的方法/StudentsBiz.java第14章 带参数的方法/01 教学演示示例/示例1:带一个参数的方法/TestAdd.java第14章 带参数的方法/01 教学演示示例/示例2:带多个参数的方法/StudentsBiz.java第…

java sqlite 工具类_Java 工具类 - JDBC通用操作基类 BaseDao

封装了增删改查功能适用于MySQL、Oracle、SQLServer、DB2、Sybase、JTDS、PostgreSql、SQLite、Derby、H2、HSQLDB、ODBC 等等数据库,有需要的还可以自己增加。package com.tgb.hz.jdbc;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import javax.namin…

java 跨域 下载文件_文件下载重命名(可跨域)

一、正常情况下,我们都如此下载文件并修改文件名,在a标签上面添加download属性var link document.createElement(a);link.href file.url;link.download file.name;link.target"_blank";link.click();由于a.download跨域会失效,上…

java hibernate 插入数据_[Java教程]hibernate 返回新插入数据的Id

[Java教程]hibernate 返回新插入数据的Id0 2015-08-28 10:00:11例如 表明 studentInfoString sql"set set nocount on studentInfo(列名,列名) values(值,值);select identity as inserId";java代码:public int executeCount(String sql, Map paramMap) {…

java输入行数打印菱形_JAVA题,输入行数,输入列数,输出一个菱形

展开全部1,冒泡排序1. /**2. * JAVA排序算法实现代码-冒泡(Bubble Sort)排序。3. *4. *5. *6. */7. public class Test {8. public static void main(String[] args) {9. int[] a ;10.11. System.out.print("排序前: ");12.13. for (int i 0; i < a.length; i)1…