A persistence-based wordCount program, and a pitfall I hit along the way!
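I hand-rolled a static connection pool and used RDD's foreachPartition operation. Inside it, a connection is fetched from the static pool through a static method and handed back after use. This way connections can be reused even across multiple partitions of an RDD. The pool can also create connections lazily and release them after they have been idle for a while, although the simple pool below only implements the lazy-creation part.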
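The pattern just described (borrow one connection per partition, create connections lazily, release idle ones) can be sketched as a small generic pool. This is a hypothetical illustration, not the author's class. `LazyPool`, `borrow`, `giveBack`, and `evictIdle` are made-up names, and a `Supplier<T>` stands in for `DriverManager.getConnection` so the sketch runs without MySQL.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical generic pool: creates resources lazily and evicts idle ones.
// In the Spark job, T would be java.sql.Connection and the factory would open a JDBC connection.
class LazyPool<T> {
    // A pooled resource plus the time (ms) it was handed back.
    private static final class Entry<R> {
        final R resource;
        final long returnedAt;

        Entry(R resource, long returnedAt) {
            this.resource = resource;
            this.returnedAt = returnedAt;
        }
    }

    private final LinkedList<Entry<T>> idle = new LinkedList<>();
    private final Supplier<T> factory;
    private int created = 0;

    LazyPool(Supplier<T> factory) {
        this.factory = factory;
    }

    // Reuse the most recently returned resource; only create a new one when none is idle.
    synchronized T borrow() {
        Entry<T> e = idle.pollFirst();
        if (e != null) {
            return e.resource;
        }
        created++;
        return factory.get();
    }

    // Hand a resource back, recording the current time.
    void giveBack(T resource) {
        giveBack(resource, System.currentTimeMillis());
    }

    // Hand a resource back with an explicit timestamp (handy for testing).
    synchronized void giveBack(T resource, long now) {
        idle.addFirst(new Entry<>(resource, now));
    }

    // Remove resources idle longer than maxIdleMillis and return them so the caller can close them.
    synchronized List<T> evictIdle(long maxIdleMillis, long now) {
        List<T> evicted = new ArrayList<>();
        Iterator<Entry<T>> it = idle.iterator();
        while (it.hasNext()) {
            Entry<T> e = it.next();
            if (now - e.returnedAt > maxIdleMillis) {
                evicted.add(e.resource);
                it.remove();
            }
        }
        return evicted;
    }

    synchronized int createdCount() {
        return created;
    }

    synchronized int idleCount() {
        return idle.size();
    }
}

class LazyPoolDemo {
    public static void main(String[] args) {
        LazyPool<StringBuilder> pool = new LazyPool<>(StringBuilder::new);
        StringBuilder a = pool.borrow();  // created lazily on first use
        pool.giveBack(a, 0L);
        StringBuilder b = pool.borrow();  // reuses the same object instead of creating another
        System.out.println((a == b) + " " + pool.createdCount()); // prints "true 1"
    }
}
```

In a Spark job, a static instance of such a pool exists once per executor JVM, and whoever calls `evictIdle` would pass the returned objects to `Connection.close()`. Borrowing and returning at the same end of the list (LIFO) keeps recently used connections busy, so the rarely used ones are the ones that age out.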
Code:
package com.bynear.spark_Streaming;

import com.bynear.tool.ConnectionPool;
import com.google.common.base.Optional;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/*
 * 2018/5/16 11:30
 * Persistence-based wordcount program (Spark 1.x Java API)
 */
public class PersisWordCount {
    public static void main(String[] args) {
        // local[2] is for local testing; a master set on SparkConf overrides spark-submit's --master,
        // so drop setMaster(...) when submitting to a cluster
        final SparkConf conf = new SparkConf().setAppName("persiswordcount").setMaster("local[2]");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(5));
        // updateStateByKey needs a checkpoint directory
        jssc.checkpoint("hdfs://Spark01:9000/zjs/chepoint");

        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

        // Split each line into words
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });

        // Map each word to (word, 1)
        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        // Keep a running total per word across batches
        final JavaPairDStream<String, Integer> wordcount = pairs.updateStateByKey(
                new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
                    @Override
                    public Optional<Integer> call(List<Integer> values, Optional<Integer> state) throws Exception {
                        Integer newValue = 0;
                        if (state.isPresent()) {
                            newValue = state.get();
                        }
                        for (Integer value : values) {
                            newValue += value;
                        }
                        return Optional.of(newValue);
                    }
                });

        // Write each batch to MySQL, using one pooled connection per partition
        wordcount.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {
            @Override
            public Void call(JavaPairRDD<String, Integer> wordCountsRDD) throws Exception {
                wordCountsRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Integer>>>() {
                    @Override
                    public void call(Iterator<Tuple2<String, Integer>> wordcounts) throws Exception {
                        Connection conn = ConnectionPool.getConection();
                        String sql = "insert into word (word,count) values (?,?)";
                        System.out.println(sql + conn + "YES"); // debug: prints null if the pool gave no connection
                        PreparedStatement stmt = null;
                        try {
                            // A PreparedStatement keeps words containing quotes from breaking the SQL
                            stmt = conn.prepareStatement(sql);
                            while (wordcounts.hasNext()) {
                                Tuple2<String, Integer> pair = wordcounts.next();
                                stmt.setString(1, pair._1());
                                stmt.setInt(2, pair._2());
                                stmt.executeUpdate();
                            }
                        } finally {
                            if (stmt != null) {
                                stmt.close();
                            }
                            // Always hand the connection back, even if an insert fails
                            ConnectionPool.returnConnection(conn);
                        }
                    }
                });
                return null;
            }
        });

        jssc.start();
        jssc.awaitTermination();
        jssc.stop();
    }
}
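The state update passed to `updateStateByKey` adds the current batch's counts for a key to its previous total. As a minimal sketch, here is the same logic in plain Java, with `java.util.Optional` swapped in for Guava's `Optional` (which the Spark 1.x API uses) so it runs without Spark; `RunningCount` and `update` are hypothetical names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Plain-Java rendering of the updateStateByKey function in PersisWordCount.
// java.util.Optional stands in for com.google.common.base.Optional.
class RunningCount {
    // values: occurrences of one key in the current batch; state: total from earlier batches, if any.
    static Optional<Integer> update(List<Integer> values, Optional<Integer> state) {
        int newValue = state.orElse(0);
        for (Integer value : values) {
            newValue += value;
        }
        return Optional.of(newValue);
    }

    public static void main(String[] args) {
        // Batch 1: "hello" appears twice and there is no previous state.
        Optional<Integer> s1 = update(Arrays.asList(1, 1), Optional.empty());
        // Batch 2: "hello" does not appear, but the key keeps its state.
        Optional<Integer> s2 = update(Collections.emptyList(), s1);
        // Batch 3: "hello" appears once more.
        Optional<Integer> s3 = update(Arrays.asList(1), s2);
        System.out.println(s1.get() + " " + s2.get() + " " + s3.get()); // prints "2 2 3"
    }
}
```

According to the Spark Streaming programming guide, the update function runs for every existing key in every batch, even keys with no new data, so every batch RDD carries the full running totals. That would explain why `hello` and `word` are inserted again with count 1 at 01:16:05 in the query result further down.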
The hand-built connection pool:
package com.bynear.tool;

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.LinkedList;

/**
 * 2018/5/16 12:24
 */
public class ConnectionPool {
    // Static queue of Connections, created once per JVM.
    // Re-creating it inside getConection() would throw away every returned connection.
    private static final LinkedList<Connection> connectionQueue = new LinkedList<Connection>();

    // Load the JDBC driver
    static {
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    // Get a connection; synchronized to control concurrent access from multiple threads
    public synchronized static Connection getConection() {
        try {
            // Lazily open connections only when none are available
            if (connectionQueue.isEmpty()) {
                for (int i = 0; i < 2; i++) {
                    Connection conn = DriverManager.getConnection("jdbc:mysql://192.168.2.10:3306/testdb",
                            "root", "123456");
                    connectionQueue.push(conn);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return connectionQueue.poll();
    }

    // Return a connection; also synchronized, since the queue is shared
    public synchronized static void returnConnection(Connection conn) {
        connectionQueue.push(conn);
    }
}
At first, the pool I built used this check:
if (connectionQueue == null) {
    for (int i = 0; i < 2; i++) {
        Connection conn = DriverManager.getConnection("jdbc:mysql://192.168.2.10:3306/testdb",
                "root", "123456");
        connectionQueue.push(conn);
    }
}
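When I submitted the code to the cluster, it kept throwing a NullPointerException.
So, right after conn = ConnectionPool.getConection();, I printed conn with System.out.println(sql+conn+"YES"); and the output showed that conn was null:
insert into wordcount (word,count) values ('heool,word',1)nullYES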
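A likely cause, assuming the first version also re-created the queue with `connectionQueue = new LinkedList<Connection>();` at the top of `getConection()`: the queue is assigned just before the `== null` check, so the check is never true and no connection is ever opened. `poll()` on an empty `LinkedList` then returns `null`, and the first call on `conn` throws the NullPointerException. The hypothetical class below reproduces this with strings standing in for JDBC connections.

```java
import java.util.LinkedList;

// Hypothetical stand-in for the first ConnectionPool version; Strings replace JDBC Connections.
class NullCheckPool {
    static LinkedList<String> queue;

    synchronized static String get() {
        queue = new LinkedList<String>(); // assigned here, so it is never null below
        if (queue == null) {              // never true: nothing is ever created
            for (int i = 0; i < 2; i++) {
                queue.push("conn-" + i);
            }
        }
        return queue.poll();              // poll() on an empty LinkedList returns null
    }

    public static void main(String[] args) {
        String conn = NullCheckPool.get();
        System.out.println("insert ..." + conn + "YES"); // prints "insert ...nullYES"
    }
}
```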
The code that ran successfully checks isEmpty() instead:
if (connectionQueue.isEmpty()) {
    for (int i = 0; i < 2; i++) {
        Connection conn = DriverManager.getConnection("jdbc:mysql://192.168.2.10:3306/testdb",
                "root", "123456");
        connectionQueue.push(conn);
    }
}
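The `isEmpty()` check makes the job run. However, if `connectionQueue` is still re-assigned with `new LinkedList<Connection>()` at the top of `getConection()`, every call opens two fresh connections and whatever `returnConnection()` pushed back is discarded, so nothing is actually reused. The sketch below compares that with a queue created once. `IsEmptyPool`, `InitOncePool`, and `PoolComparison` are hypothetical names, and strings plus an `opened` counter stand in for real connections.

```java
import java.util.LinkedList;

// Hypothetical stand-in for the isEmpty() version that still re-creates the queue on every call.
class IsEmptyPool {
    static LinkedList<String> queue;
    static int opened = 0;

    synchronized static String get() {
        queue = new LinkedList<String>(); // fresh, empty queue on every call
        if (queue.isEmpty()) {            // therefore always true
            for (int i = 0; i < 2; i++) {
                queue.push("conn-" + opened++);
            }
        }
        return queue.poll();
    }

    synchronized static void giveBack(String conn) {
        queue.push(conn);                 // lands in a queue the next get() throws away
    }
}

// Variant whose queue is created once, so returned resources are reused.
class InitOncePool {
    static final LinkedList<String> queue = new LinkedList<String>();
    static int opened = 0;

    synchronized static String get() {
        if (queue.isEmpty()) {
            for (int i = 0; i < 2; i++) {
                queue.push("conn-" + opened++);
            }
        }
        return queue.poll();
    }

    synchronized static void giveBack(String conn) {
        queue.push(conn);
    }
}

class PoolComparison {
    public static void main(String[] args) {
        // Simulate three partitions, each borrowing and returning one connection.
        for (int p = 0; p < 3; p++) {
            IsEmptyPool.giveBack(IsEmptyPool.get());
            InitOncePool.giveBack(InitOncePool.get());
        }
        System.out.println(IsEmptyPool.opened + " " + InitOncePool.opened); // prints "6 2"
    }
}
```

With real JDBC connections, the first variant also leaks: the unused connection in each discarded queue is never closed.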
Result, queried in MySQL:
mysql> select * from word;
+----+---------------------+------------+-------+
| id | updated_time        | word       | count |
+----+---------------------+------------+-------+
|  1 | 2018-05-16 01:11:10 | ???,??     |     1 |
|  2 | 2018-05-16 01:11:15 | ???,??     |     1 |
|  3 | 2018-05-16 01:13:00 | hello,word |     1 |
|  4 | 2018-05-16 01:16:00 | hello      |     1 |
|  5 | 2018-05-16 01:16:00 | word       |     1 |
|  6 | 2018-05-16 01:16:05 | hello      |     1 |
|  7 | 2018-05-16 01:16:05 | word       |     1 |
+----+---------------------+------------+-------+
7 rows in set (0.00 sec)
It worked perfectly!!!!