基于持久化的wordcount程序 foreachRDD

基于持久化的wordCount程序!中途遇到了一个坑!
自己手动封装一个静态线程池,使用RDD的foreachPartition操作,并且在该操作内部,从静态连接池中,通过静态方法,获取一个连接,使用之后再换回来,这样的话,可以在对个RDD的partition之间,也可以复用连接了,而且可以让连接池采取懒创建的策略,并且空闲一段时间后,将其释放掉。
代码:
package com.bynear.spark_Streaming;

import com.bynear.tool.ConnectionPool;
import com.google.common.base.Optional;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;import java.sql.Connection;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;/* 2018/5/16* 11:30* 基于持久化的wordcount程序*/
public class PersisWordCount {public static void main(String[] args) {final SparkConf conf = new SparkConf().setAppName("persiswordcount").setMaster("local[2]");JavaSparkContext jsc = new JavaSparkContext(conf);JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(5));jssc.checkpoint("hdfs://Spark01:9000/zjs/chepoint");JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {@Overridepublic Iterable<String> call(String line) throws Exception {return Arrays.asList(line.split(" "));}});JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(String word) throws Exception {return new Tuple2<String, Integer>(word, 1);}});final JavaPairDStream<String, Integer> wordcount = pairs.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {@Overridepublic Optional<Integer> call(List<Integer> values, Optional<Integer> state) throws Exception {Integer newValue = 0;if (state.isPresent()) {newValue = state.get();}for (Integer value : values) {newValue += value;}return Optional.of(newValue);}});wordcount.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {@Overridepublic Void call(JavaPairRDD<String, Integer> wordCountsRDD) throws Exception {wordCountsRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Integer>>>() {@Overridepublic void call(Iterator<Tuple2<String, Integer>> wordcounts) throws Exception {Connection conn = ConnectionPool.getConection();Tuple2<String, Integer> wordcount = null;while (wordcounts.hasNext()) {wordcount = wordcounts.next();String sql = "insert into word (word,count) values ('" + wordcount._1 + "'," + wordcount._2 + ")";System.out.println(sql+conn+"YES");Statement stmt = conn.createStatement();stmt.executeUpdate(sql);}ConnectionPool.returnConnection(conn);}});return null;}});jssc.start();jssc.awaitTermination();jssc.stop();}
}

手动搭建的线程池

package com.bynear.tool;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.LinkedList;
/*** 2018/5/16* 12:24*/
public class ConnectionPool {//    静态的Connection队列public static LinkedList<Connection> connectionQueue;//      加载驱动static {try {Class.forName("com.mysql.jdbc.Driver");} catch (ClassNotFoundException e) {e.printStackTrace();}}//    获取连接,多线程访问并发控制public synchronized static Connection getConection() {connectionQueue = new LinkedList<Connection>();try {if (connectionQueue.isEmpty()) {for (int i = 0; i < 2; i++) {Connection conn = DriverManager.getConnection("jdbc:mysql://192.168.2.10:3306/testdb","root", "123456");connectionQueue.push(conn);}}} catch (Exception e) {e.printStackTrace();}return connectionQueue.poll();}public static void returnConnection(Connection conn) {connectionQueue.push(conn);}
}

最开始自己搭建的线程池中,用的方法为
if (connectionQueue==null) {
for (int i = 0; i < 2; i++) {
Connection conn = DriverManager.getConnection(“jdbc:mysql://192.168.2.10:3306/testdb”,
“root”, “123456”);
connectionQueue.push(conn);
}
}
将代码提交到集群上时,一直抱空指指针。
后来 System.out.println(sql+conn+”YES”);输出一下conn
conn = ConnectionPool.getConection();
insert into wordcount (word,count) values (‘heool,word’,1)nullYES 为null

跑成功代码:
if (connectionQueue.isEmpty()) {
for (int i = 0; i < 2; i++) {
Connection conn = DriverManager.getConnection(“jdbc:mysql://192.168.2.10:3306/testdb”,
“root”, “123456”);
connectionQueue.push(conn);
}
}
输出结果:在SQL中查询:
mysql> select * from word;
+—-+———————+————+——-+
| id | updated_time | word | count |
+—-+———————+————+——-+
| 1 | 2018-05-16 01:11:10 | ???,?? | 1 |
| 2 | 2018-05-16 01:11:15 | ???,?? | 1 |
| 3 | 2018-05-16 01:13:00 | hello,word | 1 |
| 4 | 2018-05-16 01:16:00 | hello | 1 |
| 5 | 2018-05-16 01:16:00 | word | 1 |
| 6 | 2018-05-16 01:16:05 | hello | 1 |
| 7 | 2018-05-16 01:16:05 | word | 1 |
+—-+———————+————+——-+
7 rows in set (0.00 sec)
完美成功!!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/325988.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HTML5知识点总结

 HTML5基础&#xff1a; 一、HTML&#xff1a; Hyper Text MarkUp Language(超文本标记语言)。 二、W3C&#xff1a; 1.world wide web consortium(万维网联盟)&#xff0c;成立于1994年&#xff0c;WEB技术领域最权威和最具影响力的国际中立性技术标准机构。 2.w3c标准包括&…

漫画:什么是鸡尾酒排序

转载自 漫画&#xff1a;什么是鸡尾酒排序 那么&#xff0c;鸡尾酒排序又是何方神圣呢&#xff1f;我们这一期将会详细讲述。 让我们首先来回顾一下冒泡排序的思想&#xff1a; 冒泡排序的每一个元素都可以像小气泡一样&#xff0c;根据自身大小&#xff0c;一点一点向着数…

2018蓝桥杯省赛---java---A---10(付账问题)

题目描述 思路分析 用贪心算法&#xff0c;要使标准差最小&#xff0c;则需要将每个人需付的钱接近于平均值。如果有人的钱低于当前平均值a1&#xff0c;则需要将这人的钱全部支付&#xff0c;此人不够的钱需让其他人付&#xff0c;然后可以计算剩余人所需付钱的平均值a2&…

java基础的知识点(一)

 初始Java&#xff1a; 一、程序&#xff1a; 1.通常指完成某些事物的一种既定方式和过程。 2.计算机程序&#xff1a;为了让计算机执行某些操作或解决某个问题而编写的一系列有序指定的集合。 二、java可以做什么&#xff1a; 1.开发桌面应用程序 2.开发网页应用程序 三、技…

想要学习设计模式,你得先会看类图,一张图读懂UML

虚线箭头指向依赖&#xff1b; 实线箭头指向关联&#xff1b; 虚线三角指向接口&#xff1b; 实线三角指向父类&#xff1b; 空心菱形能分离而独立存在&#xff0c;是聚合&#xff1b; 实心菱形精密关联不可分&#xff0c;是组合&#xff1b;

数据库的这些性能优化,你做了吗

转载自 数据库的这些性能优化&#xff0c;你做了吗 在互联网项目中&#xff0c;当业务规模越来越大&#xff0c;数据也越来越多&#xff0c;随之而来的就是数据库压力会越来越大。 我们可能会采取各种方式去优化&#xff0c;比如之前文章提到的缓存方案&#xff0c;SQL优化…

java知识点总结(二)

 复杂选择结构: 一、Switch的结构语法&#xff1a; switch&#xff08;变量&#xff09;{ case 1: //代码 break; case 2: //代码 break; default : //代码 break; } 2.switch&#xff1a;相当于一个开关&#xff0c;后面的括号里面可以是int short byte char 枚举类型 Strin…

flume的配置和开启

配置文件&#xff1a; 编辑 flume-conf.properties agent1表示代理名称 agent1.sourcessource1 agent1.sinkssink1 agent1.channelschannel1 配置source1 agent1.sources.source1.typespooldir agent1.sources.source1.spoolDir/usr/local/flume_logs agent1.sources…

青客宝团队Consul内部分享ppt

Consul 是一个支持多数据中心&#xff0c;分布式&#xff0c;高可用的服务发现和配置共享系统。由 HashiCorp 公司使用 Go 语言开发&#xff0c;基于Raft协议。部署起来非常容易&#xff0c;只需要极少的可执行程序和配置文件&#xff0c;具有绿色、轻量级的特点。 Consul是支持…

2018蓝桥杯省赛---java---C---9(小朋友崇拜圈)

题目描述 思路分析 数组来储存小朋友们的崇拜对象&#xff0c;然后下标1就是对应的小朋友座号&#xff0c;写一个方法找出每一个小朋友的崇拜圈大小&#xff0c;然后找出最大的崇拜圈即可 代码实现 package lanqiao;import java.util.*;public class Main {public static vo…

漫画:什么是时间复杂度

转载自 漫画&#xff1a;什么是时间复杂度 时间复杂度的意义 究竟什么是时间复杂度呢&#xff1f;让我们来想象一个场景&#xff1a; 某一天&#xff0c;小灰和大黄同时加入了一个公司...... 一天过后&#xff0c;小灰和大黄各自交付了代码&#xff0c;两端代码实现的功能…

Office基础和计算机操作基础的知识点(一)

 运指如飞&#xff1a; 一、鼠标的基本操作 1.单击 2.双击 3.移动 4.拖拽 二、键盘的分布 1.功能键区 2.打字键盘区 3.数字键盘区 4.指示灯 5.编辑键区 三、输入法 1.切换输入法&#xff1a;ctrlshift 2.选中的方法&#xff1a; 1)鼠标选中 2)按shift键移动左右键 四、记事…

Flume整合SparkStreaming出现UnsupportedClassVersionError: org/apache/spark/streaming/flume/sink/SparkFlum

Flume整合SparkStreaming使用Poll方式拉取数据出现 UnsupportedClassVersionError: org/apache/spark/streaming/flume/sink/SparkFlumeProtocol : Unsupported major.minor version 52.0 问题&#xff1a; INFO node.Application: Starting Source source1 18/06/12 03:34…

Visual Studio 和 Team Foundation Server 产品维护及周期策略

1 适用于 Visual Studio 和 Team Foundation Server 2012 - 2017 这些产品遵循 Microsoft 10 年支持生命周期策略&#xff08;5 年主流支持和 5 年扩展支持&#xff09;&#xff0c;起始日期为主要产品版本向全球 (RTW) 发布的日期。 例如&#xff0c;Visual Studio 2017 于 20…

28. 实现 strStr()---LeetCode---JAVA(今天又是一行超人)

class Solution {public int strStr(String haystack, String needle) {return haystack.indexOf(needle);} }

DevOps通用及版本控制面试题

转载自 DevOps通用及版本控制面试题 通用DevOps面试问题 此类别将包含与任何特定DevOps阶段无关的问题。这里的问题旨在测试您对DevOps的理解&#xff0c;而不是关注特定工具或阶段。 问题一&#xff1a; DevOps和Agile之间的根本区别是什么&#xff1f; 两者之间的差异…

Office基础知识点总结(二)

 PowerPoint幻灯片制作&#xff1a; 一、PPT2010的界面&#xff1a; 1.功能选项卡 2.大纲区 3.标题栏 4.备注页 5.状态栏 6.工作区 二、创建PPT演示文稿 1.开始–》PPT2010 2.在PPT里面新建–》文件–》新建–》样本模板–》选择模板。 三、PPT2010中的视图&#xff1a; 1.普…

zookeeper出现Error contacting service. It is probably not running.

在两个节点 的zookeeper搭建启动的时候出现如下报错 JMX enabled by default Using config: /root/app/zookeeper/bin/../conf/zoo.cfg Error contacting service. It is probably not running. 查看zookeeper.out 日志报错如下 2018-06-13 03:13:18,573 [myid:0] - INFO …

微信和支付宝支付模式详解及实现(.Net标准库)

支付基本上是很多产品都必须的一个模块&#xff0c;大家最熟悉的应该就是微信和支付宝支付了&#xff0c;不过更多的可能还是停留在直接sdk的调用上&#xff0c;甚至和业务系统高度耦合&#xff0c;网上也存在各种解决方案&#xff0c;但大多形式各异&#xff0c;东拼西凑而成。…

DFS

//深度优先算法//i第一次就是0public void dfs(boolean[]isVisited,int i){//首先我们访问该节点System.out.println(getValueByIndex(i));isVisited[i]true;int wgetFirstNeighbor(i);while (w!-1){if(!isVisited[w]){dfs(isVisited,w);}//如果w节点已经被访问过wgetNextNeigh…