DataFrame不同风格比较

DataFrame不同风格比较

一,DSL风格语法

//加载数据
val rdd1=sc.textFile("/person.txt").map(x=>x.split(" "))
//定义一个样例类
case class Person(id:String,name:String,age:Int)
//把rdd与样例类进行关联
val personRDD=rdd1.map(x=>Person(x(0),x(1),x(2).toInt))
//把rdd转换成DataFrame
val personDF=personRDD.toDF//打印schema信息
personDF.printSchema//展示数据
personDF.show//查询指定的字段
personDF.select("name").show
personDF.select($"name").show
personDF.select(col("name").show//实现age+1personDF.select($"name",$"age",$"age"+1).show   //实现age大于30过滤personDF.filter($"age" > 30).show//按照age分组统计次数personDF.groupBy("age").count.show //按照age分组统计次数降序personDF.groupBy("age").count().sort($"count".desc)show   

二,SQL风格语法

//DataFrame注册成表
personDF.createTempView("person")//使用SparkSession调用sql方法统计查询
spark.sql("select * from person").show
spark.sql("select name from person").show
spark.sql("select name,age from person").show
spark.sql("select * from person where age >30").show
spark.sql("select count(*) from person where age >30").show
spark.sql("select age,count(*) from person group by age").show
spark.sql("select age,count(*) as count from person group by age").show
spark.sql("select * from person order by age desc").show

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/508784.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

sparkSQL操作hiveSQL

sparkSQL操作hiveSQL <dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>2.3.3</version></dependency>import org.apache.spark.sql.SparkSession//todo:利用sparksql操作h…

sparksql加载mysql表中的数据

sparksql加载mysql表中的数据 <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version> </dependency>import java.util.Propertiesimport org.apache.spark.SparkCon…

sparksql保存数据常见操作

sparksql保存数据操作 import org.apache.spark.SparkConf import org.apache.spark.sql.{DataFrame, SparkSession}//todo:sparksql可以把结果数据保存到不同的外部存储介质中 object SaveResult {def main(args: Array[String]): Unit {//1、创建SparkConf对象val sparkCon…

sparksql自定义函数

sparksql中自定义函数 import org.apache.spark.sql.api.java.UDF1 import org.apache.spark.sql.types.StringType import org.apache.spark.sql.{DataFrame, SparkSession}//TODO:自定义sparksql的UDF函数 一对一的关系 object SparkSQLFunction {def main(args: Array[S…

sparksql整合hive

sparksql整合hive 步骤 1、需要把hive安装目录下的配置文件hive-site.xml拷贝到每一个spark安装目录下对应的conf文件夹中2、需要一个连接mysql驱动的jar包拷贝到spark安装目录下对应的jars文件夹中3、可以使用spark-sql脚本 后期执行sql相关的任务 启动脚本 spark-sql \ --…

hive的一些常见内置函数

hive行转列 selectt1.base,concat_ws(|, collect_set(t1.name)) namefrom(selectname,concat(constellation, "," , blood_type) basefromperson_info) t1group byt1.base;hive列转行 select movie, category_name from movie_info lateral view explode(category)…

hive的一些调优参数

hive的一些调优参数 set hive.exec.dynamic.partition.modenonstrict; 使用动态分区 set hive.exec.max.dynamic.partitions100000;自动分区数最大值 set hive.exec.max.dynamic.partitions.pernode100000; set hive.hadoop.supports.splittable.combineinputformattrue;支持切…

hive的SerDe序列化

hive使用Serde进行行对象的序列与反序列化。最后实现把文件内容映射到 hive 表中的字段数据类型。 HDFS files –> InputFileFormat –> <key, value> –> Deserializer –> Row objectRow object –> Serializer –> <key, value> –> Outp…

窗口函数和hive优化简记

窗口函数&#xff1a; &#xff08;1&#xff09; OVER()&#xff1a;指定分析函数工作的数据窗口大小&#xff0c;这个数据窗口大小可能会随着行的变而变化。常用partition by 分区order by排序。 &#xff08;2&#xff09;CURRENT ROW&#xff1a;当前行 &#xff08;3&…

Kafka一些参数配置

Producer消息发送 producer.send(msg); // 用类似这样的方式去发送消息&#xff0c;就会把消息给你均匀的分布到各个分区上去 producer.send(key, msg); // 订单id&#xff0c;或者是用户id&#xff0c;他会根据这个key的hash值去分发到某个分区上去&#xff0c;他可以保证相同…

hive避免MR的情况

什么情况下Hive可以避免进行MapReduce hive 为了执行效率考虑&#xff0c;简单的查询&#xff0c;就是只是select&#xff0c;不带count,sum,group by这样的&#xff0c;都不走map/reduce&#xff0c;直接读取hdfs目录中的文件进行filter过滤。 sql select * from employee; …

flink常见算子的一些操作

常见Transformation操作 map和filter /*** 数据源&#xff1a;1 2 3 4 5.....源源不断过来* 通过map打印一下接受到数据* 通过filter过滤一下数据&#xff0c;我们只需要偶数*/ public class MapDemo {public static void main(String[] args) throws Exception {StreamExecut…

flink的watermark参考配置

需求描述&#xff1a;每隔5秒&#xff0c;计算最近10秒单词出现的次数。 TimeWindow实现 /*** 每隔5秒计算最近10秒单词出现的次数*/ public class TimeWindowWordCount {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env StreamExe…

hbase常见处理方式

相关依赖 <dependencies><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>1.2.1</version></dependency><dependency><groupId>org.apache.hbase</gro…

flink连接kafka整合hbase,scala

解析kafka当中的json格式的数据&#xff0c;入hbase import java.util.Propertiesimport com.alibaba.fastjson.{JSON, JSONObject} import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.contrib.streaming.state.RocksDBStateBack…

sparkStreaming连接kafka整合hbase和redis

sparkStreaming消费kafka数据&#xff0c;并将数据保存到redis和hbase当中去&#xff0c;实现实时 import org.apache.hadoop.hbase.client.{Admin, Connection} import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName} import org.apache.kafka.c…

sparksql一些指标

统计指标 select substr(tb.begin_address_code , 1 ,4) as begin_address_code , count(distinct vehicle_license) as dayVehicleCount from (select begin_address_code , vehicle_license from order where date_format(create_time , yyyy-MM-dd) 2020-02-15 ) tb grou…

sparkConf常见参数设置

def getSparkConf():SparkConf {val sparkConf: SparkConf new SparkConf().set("spark.driver.cores","4") //设置driver的CPU核数.set("spark.driver.maxResultSize","2g") //设置driver端结果存放的最大容量&#xff0c;这里设置…

sparkSession常见参数设置

def getSparkSession(sparkConf:SparkConf):SparkSession {val sparkSession: SparkSession SparkSession.builder().config(sparkConf)//调度模式.config("spark.scheduler.mode", "FAIR").config("spark.executor.memoryOverhead", "51…

关于kafka中acks是否可以为all

kafka源码中有这样一段代码&#xff1a; org.apache.kafka.clients.producer.KafkaProducer private static int parseAcks(String acksString) {try {return acksString.trim().equalsIgnoreCase("all") ? -1 : Integer.parseInt(acksString.trim());} catch (Numb…