sparksql加载mysql表中的数据

sparksql加载mysql表中的数据

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version>
</dependency>
import java.util.Propertiesimport org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}//todo:利用sparksql加载mysql表中的数据
object DataFromMysql {def main(args: Array[String]): Unit = {//1、创建SparkConf对象val sparkConf: SparkConf = new SparkConf().setAppName("DataFromMysql").setMaster("local[2]")//2、创建SparkSession对象val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()//3、读取mysql表的数据//3.1 指定mysql连接地址val url="jdbc:mysql://node1:3306/spark"//3.2 指定要加载的表名val tableName="iplocation"// 3.3 配置连接数据库的相关属性val properties = new Properties()//用户名properties.setProperty("user","root")//密码properties.setProperty("password","123456")val mysqlDF: DataFrame = spark.read.jdbc(url,tableName,properties)//打印schema信息mysqlDF.printSchema()//展示数据mysqlDF.show()//把dataFrame注册成表mysqlDF.createTempView("iplocation")spark.sql("select * from iplocation where total_count >1500").show()spark.stop()}
}

sparksql保存结果数据到mysql表中

import java.util.Propertiesimport org.apache.spark.sql.{DataFrame, SparkSession}//todo:通过sparksql把结果数据写入到mysql表中
object Data2Mysql {def main(args: Array[String]): Unit = {//1、创建SparkSessionval spark: SparkSession = SparkSession.builder().appName("Data2Mysql") .getOrCreate()//2、读取mysql表中数据//2.1 定义url连接val url="jdbc:mysql://node1:3306/spark"//2.2 定义表名val table="iplocation"//2.3 定义属性val properties=new Properties()properties.setProperty("user","root")properties.setProperty("password","123456")val mysqlDF: DataFrame = spark.read.jdbc(url,table,properties)//把dataFrame注册成一张表mysqlDF.createTempView("iplocation")//通过sparkSession调用sql方法//需要统计经度和维度出现的人口总数大于1000的记录 保存到mysql表中val result: DataFrame = spark.sql("select * from iplocation where total_count >1000")//保存结果数据到mysql表中//mode:指定数据的插入模式//overwrite: 表示覆盖,如果表不存在,事先帮我们创建//append   :表示追加, 如果表不存在,事先帮我们创建//ignore   :表示忽略,如果表事先存在,就不进行任何操作//error    :如果表事先存在就报错(默认选项)result.write.mode(args(0)).jdbc(url,args(1),properties)//关闭spark.stop()}
}

提交任务脚本
spark-submit
–master spark://node1:7077
–class com.sql.DataMysql
–executor-memory 1g
–total-executor-cores 4
–driver-class-path /opt/hive/lib/mysql-connector-java-5.1.38.jar
–jars /export/servers/hive/lib/mysql-connector-java-5.1.35.jar
spark_class01-1.0-SNAPSHOT.jar
append user

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/508782.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

sparksql保存数据常见操作

sparksql保存数据操作 import org.apache.spark.SparkConf import org.apache.spark.sql.{DataFrame, SparkSession}//todo:sparksql可以把结果数据保存到不同的外部存储介质中 object SaveResult {def main(args: Array[String]): Unit {//1、创建SparkConf对象val sparkCon…

sparksql自定义函数

sparksql中自定义函数 import org.apache.spark.sql.api.java.UDF1 import org.apache.spark.sql.types.StringType import org.apache.spark.sql.{DataFrame, SparkSession}//TODO:自定义sparksql的UDF函数 一对一的关系 object SparkSQLFunction {def main(args: Array[S…

sparksql整合hive

sparksql整合hive 步骤 1、需要把hive安装目录下的配置文件hive-site.xml拷贝到每一个spark安装目录下对应的conf文件夹中2、需要一个连接mysql驱动的jar包拷贝到spark安装目录下对应的jars文件夹中3、可以使用spark-sql脚本 后期执行sql相关的任务 启动脚本 spark-sql \ --…

hive的一些常见内置函数

hive行转列 selectt1.base,concat_ws(|, collect_set(t1.name)) namefrom(selectname,concat(constellation, "," , blood_type) basefromperson_info) t1group byt1.base;hive列转行 select movie, category_name from movie_info lateral view explode(category)…

hive的一些调优参数

hive的一些调优参数 set hive.exec.dynamic.partition.modenonstrict; 使用动态分区 set hive.exec.max.dynamic.partitions100000;自动分区数最大值 set hive.exec.max.dynamic.partitions.pernode100000; set hive.hadoop.supports.splittable.combineinputformattrue;支持切…

hive的SerDe序列化

hive使用Serde进行行对象的序列与反序列化。最后实现把文件内容映射到 hive 表中的字段数据类型。 HDFS files –> InputFileFormat –> <key, value> –> Deserializer –> Row objectRow object –> Serializer –> <key, value> –> Outp…

窗口函数和hive优化简记

窗口函数&#xff1a; &#xff08;1&#xff09; OVER()&#xff1a;指定分析函数工作的数据窗口大小&#xff0c;这个数据窗口大小可能会随着行的变而变化。常用partition by 分区order by排序。 &#xff08;2&#xff09;CURRENT ROW&#xff1a;当前行 &#xff08;3&…

Kafka一些参数配置

Producer消息发送 producer.send(msg); // 用类似这样的方式去发送消息&#xff0c;就会把消息给你均匀的分布到各个分区上去 producer.send(key, msg); // 订单id&#xff0c;或者是用户id&#xff0c;他会根据这个key的hash值去分发到某个分区上去&#xff0c;他可以保证相同…

hive避免MR的情况

什么情况下Hive可以避免进行MapReduce hive 为了执行效率考虑&#xff0c;简单的查询&#xff0c;就是只是select&#xff0c;不带count,sum,group by这样的&#xff0c;都不走map/reduce&#xff0c;直接读取hdfs目录中的文件进行filter过滤。 sql select * from employee; …

flink常见算子的一些操作

常见Transformation操作 map和filter /*** 数据源&#xff1a;1 2 3 4 5.....源源不断过来* 通过map打印一下接受到数据* 通过filter过滤一下数据&#xff0c;我们只需要偶数*/ public class MapDemo {public static void main(String[] args) throws Exception {StreamExecut…

flink的watermark参考配置

需求描述&#xff1a;每隔5秒&#xff0c;计算最近10秒单词出现的次数。 TimeWindow实现 /*** 每隔5秒计算最近10秒单词出现的次数*/ public class TimeWindowWordCount {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env StreamExe…

hbase常见处理方式

相关依赖 <dependencies><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>1.2.1</version></dependency><dependency><groupId>org.apache.hbase</gro…

flink连接kafka整合hbase,scala

解析kafka当中的json格式的数据&#xff0c;入hbase import java.util.Propertiesimport com.alibaba.fastjson.{JSON, JSONObject} import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.contrib.streaming.state.RocksDBStateBack…

sparkStreaming连接kafka整合hbase和redis

sparkStreaming消费kafka数据&#xff0c;并将数据保存到redis和hbase当中去&#xff0c;实现实时 import org.apache.hadoop.hbase.client.{Admin, Connection} import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName} import org.apache.kafka.c…

sparksql一些指标

统计指标 select substr(tb.begin_address_code , 1 ,4) as begin_address_code , count(distinct vehicle_license) as dayVehicleCount from (select begin_address_code , vehicle_license from order where date_format(create_time , yyyy-MM-dd) 2020-02-15 ) tb grou…

sparkConf常见参数设置

def getSparkConf():SparkConf {val sparkConf: SparkConf new SparkConf().set("spark.driver.cores","4") //设置driver的CPU核数.set("spark.driver.maxResultSize","2g") //设置driver端结果存放的最大容量&#xff0c;这里设置…

sparkSession常见参数设置

def getSparkSession(sparkConf:SparkConf):SparkSession {val sparkSession: SparkSession SparkSession.builder().config(sparkConf)//调度模式.config("spark.scheduler.mode", "FAIR").config("spark.executor.memoryOverhead", "51…

关于kafka中acks是否可以为all

kafka源码中有这样一段代码&#xff1a; org.apache.kafka.clients.producer.KafkaProducer private static int parseAcks(String acksString) {try {return acksString.trim().equalsIgnoreCase("all") ? -1 : Integer.parseInt(acksString.trim());} catch (Numb…

关于统计时间切片标签的一些sql

------当天付费明细表 DROP TABLE IF EXISTS rpt.tmp_mm_rb_daily_ffmx; create table rpt.tmp_mm_rb_daily_ffmx as select a.* FROM (select c.feemsisdn, c.destmsisdn, c.day, c.price/1000 fee, c.contentid, dc.content_name, c.ordernumber, c.cdrtime, c.createtime, c…

hadoop 二次开发DatanodeWriteTimeout设置

int getDatanodeWriteTimeout(int numNodes) {return this.dfsClientConf.confTime > 0 ? this.dfsClientConf.confTime 5000 * numNodes : 0;}int getDatanodeReadTimeout(int numNodes) {return this.dfsClientConf.socketTimeout > 0 ? 5000 * numNodes this.dfsC…