spark整合MySQL

spark整合MySQL

    <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency>

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Data2MysqlForeach {
def main(args: Array[String]): Unit = {
//1、构建SparkConf
val sparkConf: SparkConf = new SparkConf().setAppName(“Data2MysqlForeach”).setMaster(“local[2]”)

//2、构建SparkContext
val sc = new SparkContext(sparkConf)
sc.setLogLevel("warn")//3、读取数据文件
val data: RDD[String] = sc.textFile("E:\\data\\person.txt")//4、切分每一行    // id  name  age
val personRDD: RDD[(String, String, Int)] = data.map(x => x.split(",")).map(x => (x(0), x(1), x(2).toInt))//5、把数据保存到mysql表中personRDD.foreach(line =>{//每条数据与mysql建立连接//把数据插入到mysql表操作//1、获取连接val connection: Connection = DriverManager.getConnection("jdbc:mysql://node1:3306/spark","root","123456")//2、定义插入数据的sql语句val sql="insert into person(id,name,age) values(?,?,?)"//3、获取PreParedStatementtry {val ps: PreparedStatement = connection.prepareStatement(sql)//4、获取数据,给?号 赋值ps.setString(1, line._1)ps.setString(2, line._2)ps.setInt(3, line._3)ps.execute()} catch {case e:Exception => e.printStackTrace()} finally {if(connection !=null){connection.close()}}})}

使用 foreachPartition 算子
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Data2MysqlForeachPartitions {
def main(args: Array[String]): Unit = {
//1、构建SparkConf
val sparkConf: SparkConf = new SparkConf().setAppName(“Data2MysqlForeachPartitions”).setMaster(“local[2]”)

//2、构建SparkContext
val sc = new SparkContext(sparkConf)
sc.setLogLevel("warn")//3、读取数据文件
val data: RDD[String] = sc.textFile("E:\\data\\person.txt")//4、切分每一行    // id  name  age
val personRDD: RDD[(String, String, Int)] = data.map(x => x.split(",")).map(x => (x(0), x(1), x(2).toInt))//5、把数据保存到mysql表中
//使用foreachPartition每个分区建立一次链接,减少与mysql链接次数
personRDD.foreachPartition( iter =>{//把数据插入到mysql表操作//1、获取连接val connection: Connection = DriverManager.getConnection("jdbc:mysql://node1:3306/spark","root","123456")//2、定义插入数据的sql语句val sql="insert into person(id,name,age) values(?,?,?)"//3、获取PreParedStatementtry {val ps: PreparedStatement = connection.prepareStatement(sql)//4、获取数据,给?号 赋值iter.foreach(line =>{ps.setString(1, line._1)ps.setString(2, line._2)ps.setInt(3, line._3)ps.execute()})} catch {case e:Exception => e.printStackTrace()} finally {if(connection !=null){connection.close()}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/508785.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

DataFrame不同风格比较

DataFrame不同风格比较 一&#xff0c;DSL风格语法 //加载数据 val rdd1sc.textFile("/person.txt").map(x>x.split(" ")) //定义一个样例类 case class Person(id:String,name:String,age:Int) //把rdd与样例类进行关联 val personRDDrdd1.map(x>…

sparkSQL操作hiveSQL

sparkSQL操作hiveSQL <dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>2.3.3</version></dependency>import org.apache.spark.sql.SparkSession//todo:利用sparksql操作h…

sparksql加载mysql表中的数据

sparksql加载mysql表中的数据 <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version> </dependency>import java.util.Propertiesimport org.apache.spark.SparkCon…

sparksql保存数据常见操作

sparksql保存数据操作 import org.apache.spark.SparkConf import org.apache.spark.sql.{DataFrame, SparkSession}//todo:sparksql可以把结果数据保存到不同的外部存储介质中 object SaveResult {def main(args: Array[String]): Unit {//1、创建SparkConf对象val sparkCon…

sparksql自定义函数

sparksql中自定义函数 import org.apache.spark.sql.api.java.UDF1 import org.apache.spark.sql.types.StringType import org.apache.spark.sql.{DataFrame, SparkSession}//TODO:自定义sparksql的UDF函数 一对一的关系 object SparkSQLFunction {def main(args: Array[S…

sparksql整合hive

sparksql整合hive 步骤 1、需要把hive安装目录下的配置文件hive-site.xml拷贝到每一个spark安装目录下对应的conf文件夹中2、需要一个连接mysql驱动的jar包拷贝到spark安装目录下对应的jars文件夹中3、可以使用spark-sql脚本 后期执行sql相关的任务 启动脚本 spark-sql \ --…

hive的一些常见内置函数

hive行转列 selectt1.base,concat_ws(|, collect_set(t1.name)) namefrom(selectname,concat(constellation, "," , blood_type) basefromperson_info) t1group byt1.base;hive列转行 select movie, category_name from movie_info lateral view explode(category)…

hive的一些调优参数

hive的一些调优参数 set hive.exec.dynamic.partition.modenonstrict; 使用动态分区 set hive.exec.max.dynamic.partitions100000;自动分区数最大值 set hive.exec.max.dynamic.partitions.pernode100000; set hive.hadoop.supports.splittable.combineinputformattrue;支持切…

hive的SerDe序列化

hive使用Serde进行行对象的序列与反序列化。最后实现把文件内容映射到 hive 表中的字段数据类型。 HDFS files –> InputFileFormat –> <key, value> –> Deserializer –> Row objectRow object –> Serializer –> <key, value> –> Outp…

窗口函数和hive优化简记

窗口函数&#xff1a; &#xff08;1&#xff09; OVER()&#xff1a;指定分析函数工作的数据窗口大小&#xff0c;这个数据窗口大小可能会随着行的变而变化。常用partition by 分区order by排序。 &#xff08;2&#xff09;CURRENT ROW&#xff1a;当前行 &#xff08;3&…

Kafka一些参数配置

Producer消息发送 producer.send(msg); // 用类似这样的方式去发送消息&#xff0c;就会把消息给你均匀的分布到各个分区上去 producer.send(key, msg); // 订单id&#xff0c;或者是用户id&#xff0c;他会根据这个key的hash值去分发到某个分区上去&#xff0c;他可以保证相同…

hive避免MR的情况

什么情况下Hive可以避免进行MapReduce hive 为了执行效率考虑&#xff0c;简单的查询&#xff0c;就是只是select&#xff0c;不带count,sum,group by这样的&#xff0c;都不走map/reduce&#xff0c;直接读取hdfs目录中的文件进行filter过滤。 sql select * from employee; …

flink常见算子的一些操作

常见Transformation操作 map和filter /*** 数据源&#xff1a;1 2 3 4 5.....源源不断过来* 通过map打印一下接受到数据* 通过filter过滤一下数据&#xff0c;我们只需要偶数*/ public class MapDemo {public static void main(String[] args) throws Exception {StreamExecut…

flink的watermark参考配置

需求描述&#xff1a;每隔5秒&#xff0c;计算最近10秒单词出现的次数。 TimeWindow实现 /*** 每隔5秒计算最近10秒单词出现的次数*/ public class TimeWindowWordCount {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env StreamExe…

hbase常见处理方式

相关依赖 <dependencies><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>1.2.1</version></dependency><dependency><groupId>org.apache.hbase</gro…

flink连接kafka整合hbase,scala

解析kafka当中的json格式的数据&#xff0c;入hbase import java.util.Propertiesimport com.alibaba.fastjson.{JSON, JSONObject} import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.contrib.streaming.state.RocksDBStateBack…

sparkStreaming连接kafka整合hbase和redis

sparkStreaming消费kafka数据&#xff0c;并将数据保存到redis和hbase当中去&#xff0c;实现实时 import org.apache.hadoop.hbase.client.{Admin, Connection} import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName} import org.apache.kafka.c…

sparksql一些指标

统计指标 select substr(tb.begin_address_code , 1 ,4) as begin_address_code , count(distinct vehicle_license) as dayVehicleCount from (select begin_address_code , vehicle_license from order where date_format(create_time , yyyy-MM-dd) 2020-02-15 ) tb grou…

sparkConf常见参数设置

def getSparkConf():SparkConf {val sparkConf: SparkConf new SparkConf().set("spark.driver.cores","4") //设置driver的CPU核数.set("spark.driver.maxResultSize","2g") //设置driver端结果存放的最大容量&#xff0c;这里设置…

sparkSession常见参数设置

def getSparkSession(sparkConf:SparkConf):SparkSession {val sparkSession: SparkSession SparkSession.builder().config(sparkConf)//调度模式.config("spark.scheduler.mode", "FAIR").config("spark.executor.memoryOverhead", "51…