初识Spark2.0之Spark SQL

内存计算平台Spark在今年6月份的时候正式发布了spark2.0,相比上一版本的spark1.6版本,在内存优化,数据组织,流计算等方面都做出了较大的改变,同时更加注重基于DataFrame数据组织的MLlib,更加注重机器学习整个过程的管道化。

当然,作为使用者,特别是需要运用到线上的系统,大部分厂家还是会继续选择已经稳定的spark1.6版本,并且在spark2.0逐渐成熟之后才会开始考虑系统组件的升级。作为开发者,还是有必要先行一步,去了解spark2.0的一些特性和使用,及思考/借鉴一些spark2.0做出某些改进的思路。

首先,为了调用spark API 来完成我们的计算,需要先创建一个sparkContext:

 String warehouseLocation = System.getProperty("user.dir") + "spark-warehouse";//用户的当前工作目录
SparkConf conf = new SparkConf().setAppName("spark sql test")  .set("spark.sql.warehouse.dir", warehouseLocation)  .setMaster("local[3]");
  SparkSession spark = SparkSession  .builder()  .config(conf)  .getOrCreate();

上述代码主要有三点:

    • 使用spark sql时需要指定数据库的文件地址,这里使用了一个本地的目录
    • spark配置,指定spark app的名称和数据库地址,master url为local 3核
    • 使用SparkSession,取代了原本的SQLContext与HiveContext。对于DataFrame API的用户来说,Spark常见的混乱源头来自于使用哪个“context”。现在你可以使用SparkSession了,它作为单个入口可以兼容两者。注意原本的SQLContext与HiveContext仍然保留,以支持向下兼容。这是spark2.0的一个较大的改变,对用户更加友好。

下面开始体验spark sql:

 //===========================================1 spark SQL===================  //数据导入方式  Dataset<Row> df = spark.read().json("..\\sparkTestData\\people.json");  //查看表  
        df.show();  //查看表结构  
        df.printSchema();  //查看某一列 类似于MySQL: select name from people  df.select("name").show();  //查看多列并作计算 类似于MySQL: select name ,age+1 from people  df.select(col("name"), col("age").plus(1)).show();  //设置过滤条件 类似于MySQL:select * from people where age>21  df.filter(col("age").gt(21)).show();  //做聚合操作 类似于MySQL:select age,count(*) from people group by age  df.groupBy("age").count().show();  //上述多个条件进行组合 select ta.age,count(*) from (select name,age+1 as "age" from people) as ta where ta.age>21 group by ta.age  df.select(col("name"), col("age").plus(1).alias("age")).filter(col("age").gt(21)).groupBy("age").count().show();  //直接使用spark SQL进行查询  //先注册为临时表  df.createOrReplaceTempView("people");  Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");  sqlDF.show();

主要关注以下几点:

  • 数据来源:spark可以直接导入json格式的文件数据,people.json是我从spark安装包下拷贝的测试数据。
  • spark sql:sparkSql语法和用法和mysql有一定的相似性,可以查看表、表结构、查询、聚合等操作。用户可以使用sparkSql的API接口做聚合查询等操作或者用类SQL语句实现(但是必须将DataSet注册为临时表)
  • DataSet:DataSet是spark2.0i引入的一个新的特性(在spark1.6中属于alpha版本)。DataSet结合了RDD和DataFrame的优点, 并带来的一个新的概念Encoder当序列化数据时,,Encoder产生字节码与off-heap进行交互,,能够达到按需访问数据的效果,而不用反序列化整个对象。
我们可以为自定义的对象创建DataSet,首先创建一个JavaBeans:
/** * 一个描述人属性的JavaBeans * A JavaBean is a Java object that satisfies certain programming conventions: The JavaBean class must implement either Serializable or Externalizable The JavaBean class must have a no-arg constructor All JavaBean properties must have public setter and getter methods All JavaBean instance variables should be private */  public static class Person implements Serializable {  private String name;  private int age;  public String getName() {  return name;  }  public void setName(String name) {  this.name = name;  }  public int getAge() {  return age;  }  public void setAge(int age) {  this.age = age;  }  }

接下来,就可以为该类的对象创建DataSet了,并像操作表一样操作自定义对象的DataSet了:

   //为自定义的对象创建Dataset  List<Person> personpList = new ArrayList<Person>();  Person person1 = new Person();  person1.setName("Andy");  person1.setAge(32);  Person person2 = new Person();  person2.setName("Justin");  person2.setAge(19);  personpList.add(person1);  personpList.add(person2);  Encoder<Person> personEncoder = Encoders.bean(Person.class);  Dataset<Person> javaBeanDS = spark.createDataset(  personpList,  personEncoder  );  javaBeanDS.show();

同时,可以利用Java反射的特性,来从其他数据集中创建DataSet对象:

 //spark支持使用java 反射机制推断表结构  //1 首先创建一个存储person对象的RDD  JavaRDD<Person> peopleRDD = spark.read()  .textFile("..\\sparkTestData\\people.txt")  .javaRDD()  .map(new Function<String, Person>() {  public Person call(String line) throws Exception {  String[] parts = line.split(",");  Person person = new Person();  person.setName(parts[0]);  person.setAge(Integer.parseInt(parts[1].trim()));  return person;  }  });  //2 表结构推断  Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);  peopleDF.createOrReplaceTempView("people");  //3 定义map 这里对每个元素做序列化操作  Encoder<String> stringEncoder = Encoders.STRING();  Dataset<String> peopleSerDF = peopleDF.map(new MapFunction<Row, String>() {  public String call(Row row) throws Exception {  return "Name: " + row.getString(1) + " and age is " + String.valueOf(row.getInt(0));  }  }, stringEncoder);  peopleSerDF.show();  //==============================================3 从RDD创建Dataset StructType对象的使用  JavaRDD<String> peopleRDD2 = spark.sparkContext()  .textFile("..\\sparkTestData\\people.txt", 1)  .toJavaRDD();  // 创建一个描述表结构的schema  String schemaString = "name age";  List<StructField> fields = new ArrayList<StructField>();  for (String fieldName : schemaString.split(" ")) {  StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);  fields.add(field);  }  StructType schema = DataTypes.createStructType(fields);  // Convert records of the RDD (people) to Rows  JavaRDD<Row> rowRDD = peopleRDD2.map(new Function<String, Row>() {  //@Override  public Row call(String record) throws Exception {  String[] attributes = record.split(",");  return RowFactory.create(attributes[0], attributes[1].trim());  }  });  // Apply the schema to the RDD  Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);  // Creates a temporary view using the DataFrame  peopleDataFrame.createOrReplaceTempView("people");  peopleDataFrame.show();

主要关注以下几点:

  • RDD:从普通文本文件中解析数据,并创建结构化数据结构的RDD。
  • 表结构推断的方式创建DataSet:利用Java类反射特性将RDD转换为DataSet。
  • 指定表结构的方式创建DataSet:我们可以使用StructType来明确定义我们的表结构,完成DataSet的创建
如何将自己的数据/文本导入spark并创建spark的数据对象,对新手来说显得尤为关键,对自己的数据表达好了之后,才有机会去尝试spark的其他API ,完成我们的目标。一般数据源在经过我们其他程序的前处理之后,存储成行形式的文本/json格式或者本身存储的hive/mysql数据库中,spark对这些数据源的调用都是比较方便的。
 
介绍完了spark-sql的数据导入及数据表达后,我们来完成一个比较简单的数据统计任务。一般在工作生活中对某些数据按一定的周期进行统计分析是一个比较常见的任务了。下面,我们就以股票统计的例子为例。我们使用spark的窗口统计功能,来对某一公司的股票在2016年6月份的各个星期的均值做统计。
 //在Spark 2.0中,window API内置也支持time windows!Spark SQL中的time windows和Spark Streaming中的time windows非常类似。  Dataset<Row> stocksDF = spark.read().option("header","true").  option("inferSchema","true").  csv("..\\sparkTestData\\stocks.csv");  //stocksDF.show();  
  Dataset<Row> stocks201606 = stocksDF.filter("year(Date)==2016").  filter("month(Date)==6");  stocks201606.show(100,false);

首先读入了csv格式的数据文件,同时将2016年6月份的数据过滤出来,并以不截断的方式输出前面100条记录,运行的结果为:

调用window接口做窗口统计:

  //window一般在group by语句中使用。window方法的第一个参数指定了时间所在的列;  //第二个参数指定了窗口的持续时间(duration),它的单位可以是seconds、minutes、hours、days或者weeks。  Dataset<Row> tumblingWindowDS = stocks201606.groupBy(window(stocks201606.col("Date"),"1 week")).  agg(avg("Close").as("weekly_average"));  tumblingWindowDS.show(100,false);  tumblingWindowDS.sort("window.start").  select("window.start","window.end","weekly_average").  show(false);

其运行结果为:

由于没有指定窗口的开始时间,因此统计的开始时间为2016-05-26,并且不是从0点开始的。通常情况下,这样统计就显得有点不对了,因此我们需要指定其开始的日期和时间,但是遗憾的是spark并没有接口/参数让我们明确的指定统计窗口的开始时间。好在提供了另外一种方式,指定偏移时间,上述时间(2016-05-26 08:00:00)做一个时间偏移,也可以得到我们想要的开始时间(2016-06-01 00:00:00)。

 //在前面的示例中,我们使用的是tumbling window。为了能够指定开始时间,我们需要使用sliding window(滑动窗口)。  //到目前为止,没有相关API来创建带有开始时间的tumbling window,但是我们可以通过将窗口时间(window duration)  //和滑动时间(slide duration)设置成一样来创建带有开始时间的tumbling window。代码如下:  Dataset<Row>  windowWithStartTime = stocks201606.  groupBy(window(stocks201606.col("Date"),"1 week","1 week", "136 hour")).  agg(avg("Close").as("weekly_average"));  //6 days参数就是开始时间的偏移量;前两个参数分别代表窗口时间和滑动时间,我们打印出这个窗口的内容:  windowWithStartTime.sort("window.start").  select("window.start","window.end","weekly_average").  show(false);

运行结果为:

这就得到了我们需要的统计结果了。

关于spark2.0的sparkSql部分,基本就介绍这么多了。

 

 

 

 

 

转载于:https://www.cnblogs.com/itboys/p/6676858.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/284002.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ABP详细教程——模块类

概述模块化是ABP vNext的最大亮点&#xff0c;也是ABP vNext框架的核心&#xff0c;而模块类是ABP vNext框架模块化的核心要素。这一章节&#xff0c;我就从模块类的用法、运行机制、源代码等层面&#xff0c;带大家详细了解ABP vNext的模块类。用法在ABP的约定中&#xff0c;每…

[转]Eureka工作原理

目录 Eureka 工作原理 Eureka 核心概念 自我保护机制 Eureka 集群原理 Eurka 工作流程 总结 Eureka 工作原理 上节内容为大家介绍了&#xff0c;注册中心 Eureka 产品的使用&#xff0c;以及如何利用 Eureka 搭建单台和集群的注册中心。这节课我们来继续学习 Eureka&…

重谈联想5G编码投票事件

此前&#xff0c;司马南谈了联想好几个问题&#xff0c;其中最尖锐的要属国有资产流失&#xff0c;这是联想管理层无法回避的死穴。不过&#xff0c;司马南批判联想5G投票背刺H公司&#xff0c;这基本就是造谣了。当年&#xff0c;媒体把编码投票炒作的很厉害&#xff0c;抨击联…

JStorm2.1.1集群的安装和使用

为什么80%的码农都做不了架构师&#xff1f;>>> JStorm2.1.1集群的安装和使用 Storm是一个免费开源、分布式、高容错的实时计算系统&#xff0c;而JStorm是阿里巴巴开源的基于Storm采用Java重写的一套分布式实时流计算框架&#xff0c;在性能和支持的集群规模上做了…

Hystrix 原理

Hystrix是什么&#xff1f; Hystrix是Netflix开源库&#xff0c;这是一个针对分布式系统的延迟和容错库。 Hystrix 供分布式系统使用&#xff0c;提供延迟和容错功能&#xff0c;隔离远程系统、访问和第三方程序库的访问点&#xff0c;防止级联失败&#xff0c;保证复杂的分布…

「深度」无人机实名制政策特稿|市场看好、资本关注,“反黑飞”正在崛起

从政策和需求来看&#xff0c;“反黑飞”越来越重要&#xff0c;市场也正在不断崛起。 对于大多数人来说&#xff0c;今天是最适合明目张胆“装嫩”的六一儿童节。不过&#xff0c;在无人机厂商和无人机玩家的眼里&#xff0c;今天是无人机实名制政策正式实施的日子。 近年来&…

在navicat中新建数据库

前言&#xff1a; 在本地新建一个名为editor的数据库&#xff1b; 过程&#xff1a; 1.&#xff1b; 2.选择&#xff1a;utf8mb4 -- UTF-8 Unicode字符集&#xff0c;原因在于&#xff1a;utf8mb4兼容utf8&#xff0c;且比utf8能表示更多的字符。&#xff0c;而且它支持表情符号…

MASA Stack 第三期社区例会

MASA Blazor 0.5.0发版内容功能Autocomplete&#xff1a;支持通过设置AutoSelectFirst参数开启自动选择第一项的功能&#xff0c;支持CacheItems参数&#xff0c;增强使用上下键的用户体验。BottomNavigation&#xff1a;&#xff1a;一个替代侧边栏的新组件。它主要用于移动应…

[转]高并发架构设计之--「服务降级」、「服务限流」与「服务熔断」

目录 服务降级 1 、简介 2 、使用场景 3 、核心设计 3.1 分布式开关 3.2 自动降级分类 3.3 配置中心 3.4 处理策略 3.5 降级分类 3.6 服务降级要考虑的问题 4 、高级特性 4.1 分级降级 4.2 降级权值 5 、总结与展望 服务限流 一、为什么要做服务限流设计&…

SpringBoot获取ApplicationContext

2019独角兽企业重金招聘Python工程师标准>>> 有两种方法&#xff1a; 创建Component实现ApplicationContextAware接口&#xff0c;SpringBoot会自动调用这个类的setApplicationConext()方法。鼓励使用这种方式。SpringApplication.run(MyApplication.class, args)这…

SkiaSharp 之 WPF 自绘 投篮小游戏(案例版)

此案例主要是针对光线投影法碰撞检测功能的示例&#xff0c;顺便做成了一个小游戏&#xff0c;很简单&#xff0c;但是&#xff0c;效果却很不错。投篮小游戏规则&#xff0c;点击投篮目标点&#xff0c;就会有一个球沿着相关抛物线&#xff0c;然后&#xff0c;判断是否进入篮…

zuul集成ribbon完成服务通信和负载均衡

目录 Zuul2服务通信 超时相关 默认超时配置 自定义超时配置 负载均衡 Zuul2服务通信 描述&#xff1a;zuul2通过Ribbon完成客户端负载均衡以及与服务器群集进行通信。 zuul2的通信是集成Ribbon实现的&#xff0c;在Origin中集成Ribbon基本配置&#xff08;例如IClientCo…

时任上海来伊份互联网事业群总裁王戈钧 :传统企业(线上+线下)移动互联网改造...

2017年12月22日-23日&#xff0c;第13届信息化领袖峰会暨2017中国数字化贡献人物颁奖盛典在上海盛大开幕。本次峰会由上海市经济和信息化委员会指导&#xff0c;上海市国有资产信息中心、上海市计算机用户协会、上海市信息服务业行业协会、上海大数据联盟、上海市高等教育学会支…

【.NET6+Modbus】Modbus TCP协议解析、仿真环境以及基于.NET实现基础通信

接下来的内容&#xff0c;我会以从头开发一个简单的基于modbus tcp通信的案例&#xff0c;来实现一个基础的通信功能。有关环境&#xff1a;开发环境&#xff1a;VS 2022企业版运行环境&#xff1a;Win 10 专业版.NET 环境版本&#xff1a;.NET 6【备注】 源码在文末 1、新建一…

源码深度剖析Eureka与Ribbon服务发现原理

本文基于 spring cloud dalston&#xff0c;同时文章较长&#xff0c;请选择舒服姿势进行阅读。 Eureka 与 Ribbon 是什么&#xff1f;和服务发现什么关系&#xff1f; Eureka 与 Ribbon 都是 Netflix 提供的微服务组件&#xff0c;分别用于服务注册与发现、负载均衡。同时&a…

std的find和reverse_iterator联合使用

上代码&#xff1a; // test2013.cpp : 定义控制台应用程序的入口点。 //#include "stdafx.h" #include <stdlib.h> #include <stdio.h> #include<iostream> #include<vector> #include<map> #include<string> using namespace …

论如何提升学习的能力

为啥要学习如果有一件事情是能改变你自己的&#xff0c;我想这件事情必然就是学习&#xff0c;我的人生重要的转折点也是从学习这件事情始发的&#xff0c;那么&#xff0c;我们就从这里开始。学习不仅仅是为了找到答案&#xff0c;而是为了找到方法&#xff0c;找到一个可以找…

CSS布局解决方案(终结版)

前端布局非常重要的一环就是页面框架的搭建&#xff0c;也是最基础的一环。在页面框架的搭建之中&#xff0c;又有居中布局、多列布局以及全局布局&#xff0c;今天我们就来总结总结前端干货中的CSS布局。 居中布局 水平居中 1&#xff09;使用inline-blocktext-align&#xff…

基于ABP和Magicodes实现Excel导出操作

前端使用的vue-element-admin框架&#xff0c;后端使用ABP框架&#xff0c;Excel导出使用的Magicodes.IE.Excel.Abp库。Excel导入和导出操作几乎一样&#xff0c;不再介绍。文本主要介绍Excel导出操作和过程中遇到的坑&#xff0c;主要是Excel文件导出后无法打开的问题。一.Mag…

消息模式在实际开发应用中的优势

曾经.NET面试过程中经常问的一个问题是&#xff0c;如果程序集A&#xff0c;引用B &#xff0c;B 引用C&#xff0c;那么C怎么去访问A中的方法呢。 这个问题初学.net可能一时想不出该咋处理&#xff0c;这涉及到循环引用问题。但有点经验的可能就简单了&#xff0c;通过委托的方…