spark 快速入门 java API

1.1 transform

l  map(func):对调用map的RDD数据集中的每个element都使用func,然后返回一个新的RDD,这个返回的数据集是分布式的数据集

l  filter(func) : 对调用filter的RDD数据集中的每个元素都使用func,然后返回一个包含使func为true的元素构成的RDD

l  flatMap(func):和map差不多,但是flatMap生成的是多个结果

l  mapPartitions(func):和map很像,但是map是每个element,而mapPartitions是每个partition

l  mapPartitionsWithSplit(func):和mapPartitions很像,但是func作用的是其中一个split上,所以func中应该有index

l  sample(withReplacement,faction,seed):抽样

l  union(otherDataset):返回一个新的dataset,包含源dataset和给定dataset的元素的集合

l  distinct([numTasks]):返回一个新的dataset,这个dataset含有的是源dataset中的distinct的element

l  groupByKey(numTasks):返回(K,Seq[V]),也就是Hadoop中reduce函数接受的key-valuelist

l  reduceByKey(func,[numTasks]):就是用一个给定的reduce func再作用在groupByKey产生的(K,Seq[V]),比如求和,求平均数

l  sortByKey([ascending],[numTasks]):按照key来进行排序,是升序还是降序,ascending是boolean类型

1.2 action

l  reduce(func):说白了就是聚集,但是传入的函数是两个参数输入返回一个值,这个函数必须是满足交换律和结合律的

l  collect():一般在filter或者足够小的结果的时候,再用collect封装返回一个数组

l  count():返回的是dataset中的element的个数

l  first():返回的是dataset中的第一个元素

l  take(n):返回前n个elements

l  takeSample(withReplacement,num,seed):抽样返回一个dataset中的num个元素,随机种子seed

l  saveAsTextFile(path):把dataset写到一个text file中,或者hdfs,或者hdfs支持的文件系统中,spark把每条记录都转换为一行记录,然后写到file中

l  saveAsSequenceFile(path):只能用在key-value对上,然后生成SequenceFile写到本地或者hadoop文件系统

l  countByKey():返回的是key对应的个数的一个map,作用于一个RDD

l  foreach(func):对dataset中的每个元素都使用func

以下是案例:

package com.leoao;
import org.apache.spark.SparkConf;
/**
* Created by chengtao on 16/12/27.
*/
public class Test2 {
public static void main( String[] args ) {
SparkConf conf = new SparkConf().setAppName("App").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);
// JavaRDD<String> rdd = sc.textFile("/Users/chengtao/downloads/worldcount/ctTest.txt");

String C = "c 3";
String D = "d 4";
String E = "e 5";
ArrayList<String> listA = new ArrayList<String>();
listA.add("a 1");
listA.add("b 2");
listA.add(C);
listA.add(D);
listA.add(E);
JavaRDD<String> rdd = sc.parallelize(listA);
System.out.println("listA ----> " + listA); // listA ----> [a 1, b 2, c 3, d 4, e 5]
List list = rdd.collect();
for (int i = 0; i < list.size(); i++) {
System.out.println("rdd ----> " + list.get(i));
}
// rdd ----> a 1
// rdd ----> b 2
// rdd ----> c 3
// rdd ----> d 4
// rdd ----> e 5

ArrayList<String> listb = new ArrayList<String>();
listb.add("aa 11");
listb.add("bb 22");
listb.add(C);
listb.add(D);
listb.add(E);
JavaRDD<String> rdd2 = sc.parallelize(listb);

// -------transform
testSparkCoreApiMap(rdd);
testSparkCoreApiFilter(rdd);
testSparkCoreApiFlatMap(rdd);
testSparkCoreApiUnion(rdd,rdd2);
testSparkCoreApiDistinct(rdd,rdd2);
testSparkCoreApiMaptoPair(rdd);
testSparkCoreApiGroupByKey(rdd,rdd2);
testSparkCoreApiReduceByKey(rdd);
// -------action
testSparkCoreApiReduce(rdd);
}

//Map主要是对数据进行处理,不进行数据集的增减:本案例实现,打印所有数据,并在结束加上"test"
private static void testSparkCoreApiMap(JavaRDD<String> rdd){
JavaRDD<String> logData1=rdd.map(new Function<String,String>(){
public String call(String s){
return s + " test";
}
});
List list = logData1.collect();
for (int i = 0; i < list.size(); i++) {
System.out.println(list.get(i));
}
}
/*方法输出:
a 1 test
b 2 test
c 3 test
d 4 test
e 5 test
*/

//filter主要是过滤数据的功能,本案例实现:过滤含有a的那行数据
private static void testSparkCoreApiFilter(JavaRDD<String> rdd){
JavaRDD<String> logData1=rdd.filter(new Function<String,Boolean>(){
public Boolean call(String s){
if(!(s.contains("a"))){
return true;
}
//return (s.split(" "))[0].equals("a");
return false;
}
});
List list = logData1.collect();
for (int i = 0; i < list.size(); i++) {
System.out.println(list.get(i));
}
}
/*方法输出:
b 2
c 3
d 4
e 5
*/

//flatMap 用户行转列,本案例实现:打印所有的字符
private static void testSparkCoreApiFlatMap(JavaRDD<String> rdd){
JavaRDD<String> words=rdd.flatMap(
new FlatMapFunction<String, String>() {
public Iterable<String> call(String s) throws Exception {
return Arrays.asList(s.split(" "));
}
}
);
List list = words.collect();
for (int i = 0; i < list.size(); i++) {
System.out.println(list.get(i));
}
}
/*方法输出:
a
1
b
2
c
3
d
4
e
5
*/

//合并两个RDD
private static void testSparkCoreApiUnion(JavaRDD<String> rdd,JavaRDD<String> rdd2){
JavaRDD<String> unionRdd=rdd.union(rdd2);
unionRdd.foreach(new VoidFunction<String>(){
public void call(String lines){
System.out.println(lines);
}
});
}
/*方法输出:
a 1
b 2
c 3
d 4
e 5
aa 11
bb 22
c 3
d 4
e 5
*/


//对RDD去重
private static void testSparkCoreApiDistinct(JavaRDD<String> rdd,JavaRDD<String> rdd2){
JavaRDD<String> unionRdd=rdd.union(rdd2).distinct();
unionRdd.foreach(new VoidFunction<String>(){
public void call(String lines){
System.out.println(lines);
}
});
}
/*方法输出:
e 5
d 4
c 3
aa 11
a 1
bb 22
b 2
*/

//把RDD映射为键值对类型的数据
private static void testSparkCoreApiMaptoPair(JavaRDD<String> rdd){
JavaPairRDD<String, Integer> pairRdd=rdd.mapToPair(new PairFunction<String,String,Integer>(){
@Override
public Tuple2<String, Integer> call(String t) throws Exception {
String[] st=t.split(" ");
return new Tuple2(st[0], st[1]);
}

});

pairRdd.foreach(new VoidFunction<Tuple2<String, Integer>>(){
@Override
public void call(Tuple2<String, Integer> t) throws Exception {
System.out.println(t._2());
}
});
}
/*方法输出:
1
3
2
4
5
*/


// 对键值对类型的数据进行按键值合并
private static void testSparkCoreApiGroupByKey(JavaRDD<String> rdd,JavaRDD<String> rdd1){
JavaPairRDD<String, Integer> pairRdd=rdd.mapToPair(new PairFunction<String,String,Integer>(){
@Override
public Tuple2<String, Integer> call(String t) throws Exception {
String[] st=t.split(" ");
return new Tuple2(st[0], Integer.valueOf(st[1]));
}
});

JavaPairRDD<String, Integer> pairRdd1=rdd1.mapToPair(new PairFunction<String,String,Integer>(){
@Override
public Tuple2<String, Integer> call(String t) throws Exception {
String[] st=t.split(" ");
return new Tuple2(st[0], Integer.valueOf(st[1]));
}
});

JavaPairRDD<String, Iterable<Integer>> pairrdd2= pairRdd.union(pairRdd1).groupByKey();
pairrdd2.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>(){
@Override
public void call(Tuple2<String, Iterable<Integer>> t) throws Exception {
Iterable<Integer> iter = t._2();
for (Integer integer : iter) {
System.out.println(integer);
}
}
});
}

/*方法输出:
5
5
1
4
4
11
22
2
3
3
*/


//对键值对进行按键相同的对值进行操作
private static void testSparkCoreApiReduceByKey(JavaRDD<String> rdd){
JavaPairRDD<String, Integer> pairRdd=rdd.mapToPair(new PairFunction<String,String,Integer>(){
@Override
public Tuple2<String, Integer> call(String t) throws Exception {
String[] st=t.split(" ");
return new Tuple2(st[0], Integer.valueOf(st[1]));
}
});

JavaPairRDD<String, Integer> pairrdd2 =pairRdd.union(pairRdd).reduceByKey(
new Function2<Integer,Integer,Integer>(){
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1+v2;
}
}
).sortByKey() ;
pairrdd2.foreach(new VoidFunction<Tuple2<String, Integer>>(){
@Override
public void call(Tuple2<String, Integer> t) throws Exception {
System.out.println(t._2());
}
});
}
/*方法输出:
2
4
6
10
8
*/

// 对RDD进行递归调用
private static void testSparkCoreApiReduce(JavaRDD<String> rdd){
//由于原数据是String,需要转为Integer才能进行reduce递归
JavaRDD<Integer> rdd1=rdd.map(new Function<String,Integer>(){
@Override
public Integer call(String v1) throws Exception {
return Integer.valueOf(v1.split(" ")[1]);
}
});

Integer a= rdd1.reduce(new Function2<Integer,Integer,Integer>(){
@Override
public Integer call(Integer v1,Integer v2) throws Exception {
return v1+v2;
}
});
System.out.println("a ----> " + a);
}
/*方法输出:
a ----> 15
*/
}










 

转载于:https://www.cnblogs.com/ctaixw/p/6226187.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/371763.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

网页设计上机考试原题_Dreamweaver上机考试题目dreamweaver试题库网页制作试题.doc...

网页设计上机考试题集注意&#xff1a;所有题目中涉及的素材都在考试文件夹内&#xff0c;其中图片在下面的pic文件夹中&#xff0c;音乐、flash在media文件夹。1) 在1.html中的顶部添加一个锚点链接&#xff0c;点击之能立即到达页面最底端。2) 将1.html中的所有链接的默认样式…

35数据结构与算法分析之---最短路径

本系列是阅读《数据结构与算法应用实践教程》第2版 主编 李文书 北京大学出版社 的读书笔记&#xff0c;加上自己的理解&#xff0c;更多的是学习的记录与反思&#xff0c;如有不妥&#xff0c;欢迎指正&#xff0c;非常感谢。转载于:https://www.cnblogs.com/guochaoxxl/p/712…

Quartz 2 Scheduler示例

Quartz是一个开源作业调度框架。 它可用于管理和计划应用程序中的作业。 步骤1&#xff1a;建立已完成的专案 创建一个Maven项目&#xff0c;如下所示。 &#xff08;可以使用Maven或IDE插件来创建它&#xff09;。 步骤2&#xff1a;图书馆 Quartz依赖项已添加到Maven的po…

sql server 2008 com.microsoft.sqlserver.jdbc.SQLServerException: 通过端口 1433 连接到主机

原内容搬迁到了新网站&#xff0c;给你带来的不便&#xff0c;敬请谅解&#xff01; 》 http://www.suanliutudousi.com/2017/08/28/sql-server-2008-com-microsoft-sqlserver-jdbc-sqlserverexception-%E9%80%9A%E8%BF%87%E7%AB%AF%E5%8F%A3-1433-%E8%BF%9E%E6%8E%A5%E5%88%B0…

如何通过网线连接两台电脑快速传输数据?

介绍 我们经常需要拷贝文件会用到类似U盘等工具&#xff0c;但我们有时在传输大文件时又苦于没有&#xff0c;那么大内存的转存工具。这时候我们就可以通过一条小小的网线连接两台电脑&#xff0c;形成一个小的局域网传输数据&#xff0c;因为是通过网线传输&#xff0c;所以传…

30分钟内使用MongoDB

最近&#xff0c;我被NoSQL错误咬住了-或是我的同事Mark Atwell提出的“燃烧在哪里&#xff01;” 运动。 尽管我无意于在不久的将来或可预见的将来回避友好的“ SELECT ... WHERE”&#xff0c;但我确实设法弄懂了一些代码。 在本文中&#xff0c;我分享了我在NoSQL世界中首次…

【Django】--ModelForm组件

ModelForm a.class Meta:model,#对应Model的  fieldsNone,#字段  excludeNone,#排除字段  labelsNone,#提示信息  help_texts None,#帮助提示信息  widgets None,#自定义插件  error_messages None,#自定义错误信息(整体错误信息from django.core.exceptions im…

mysql实际综合案例_Mysql综合案例

Mysql综合案例考核要点&#xff1a;创建数据表、单表查询、多表查询已知&#xff0c;有一个学生表student和一个分数表score&#xff0c;请按要求对这两个表进行操作。student表和score分数表的表结构分别如表1-1和表1-2所示。表1-1student表结构字段名数据类型主键外键非空唯一…

2012年I / O之后

从注册到赠品&#xff0c;每年的I / O疯狂都在不断发展。 在今年20分钟内被出售&#xff0c;并没有阻止Google赠送更多的东西。 以这种速度并有望在明年发布Google Glass&#xff0c;明年注册很可能会变得更加混乱&#xff01; 因此&#xff0c;Google&#xff0c;请停止提供免…

h5启动原生APP总结

许久没有写博客了&#xff0c;最近有个H5启动APP原生页面的需求&#xff0c;中间遇上一些坑&#xff0c;看了些网上的实现方案&#xff0c;特意来总结下 一、需要判断客户端的平台以及是否在微信浏览器中访问 1、客户端判断 在启动APP时&#xff0c;Android和IOS系统处理的方式…

mysql导入创建表空间_oracle创建表空间 用户 数据库导入和导出(转)

已经安装orcale 9i 和pl/sql(6.0)OracleJobSchedulerORCL、OracleOraDb10g_home1iSQL*PlusOracleOraDb10g_home1TNSListenerOracleServiceORCL第一个是oem控制台服务进程第二个是定时器和isql*plus的服务进程第三个是监听器的服务进程最后是数据库服务进程1. pl/sql客户机安装后…

什么时候使用Apache Camel?

Apache Camel是JVM / Java环境中我最喜欢的开源框架之一。 它可以轻松集成使用多种协议和技术的不同应用程序。 本文介绍了何时使用Apache Camel以及何时使用其他替代方法。 问题&#xff1a;企业应用程序集成&#xff08;EAI&#xff09; 由于新产品和新应用&#xff0c;几乎…

念整数

念整数&#xff08;5分&#xff09;题目内容&#xff1a; 你的程序要读入一个整数&#xff0c;范围是[-100000,100000]。然后&#xff0c;用汉语拼音将这个整数的每一位输出出来。 如输入1234&#xff0c;则输出&#xff1a; yi er san si注意&#xff0c;每个字的拼音之间有一…

python 比较运算符放在列表中_在Python3中将运算符放在列表中

我想把操作符作为一个列表&#xff0c;然后从列表中调用一个元素作为操作符。在如果我没有在运算符周围加引号&#xff0c;那么列表中逗号的语法错误&#xff1a;File "p22.py", line 24cat [,-,*]^SyntaxError: invalid syntax如果我把引语放在周围&#xff0c;那么…

软工个人总结

目录 一、个人提升二、写下属于自己的人月神话三、对下一届、后来人、自己的建议四、我的团队——Clover五、关于代码质量六、学过软件工程&#xff1f;七、自我介绍八、个性发挥一、个人提升 1. 开学初的目标 希望通过团队合作领会团队合作的内在精神&#xff0c;希望在分工完…

Tomcat上下文JUnit @Rule

创建测试上下文的JUnit Rule的初稿。 这可以用Spring上下文规则可用于 这个帖子 创建集成测试一个完整的Spring上下文。 import org.apache.commons.dbcp.BasicDataSource; import org.apache.log4j.Logger; import org.junit.rules.TestRule; import org.junit.runner.Descrip…

排序算法之(7)——堆排序

【堆排序的思路】 堆排序主要是利用了堆的性质。对于大顶堆&#xff1a;堆中的每一个节点的值都不小于它的孩子节点的值&#xff0c;具体可參考我的还有一篇博客http://blog.csdn.net/adminabcd/article/details/46880591&#xff0c;那么大顶堆的堆顶元素就是当前堆中全部元素…

HTML基础:基本标签简介(3)

html中有很多标签&#xff0c;下面介绍最基本的几个标签。 1、meta 是head标签中的一个辅助性标签。 有2个重要属性&#xff1a; &#xff08;1&#xff09;name 可以优化页面被搜索到的可能性。name中可以指定属性&#xff0c;content是属性值。 <html><head><…

java 字符码_Java字符编码

编码原理介绍(中文编码杂谈)&#xff1a;int -> byte可以直接使用强制类型转换: byte b (byte) aInt;这个操作是直接截取int中最低一个字节&#xff0c;如果int大于255&#xff0c;则值就会变得面目全非了byte -> int这里有两种情况&#xff0c;一种是要求保持值不变&am…

重新登录:重新登录

嗨&#xff0c;我再次回到日志中来&#xff0c;这是任何应用程序设计和开发的固有部分。 我是坚强的基础知识的忠实拥护者&#xff0c;在我的拙见中&#xff0c;日志记录是任何企业级应用程序中经常被忽略但基本的关键要素之一。 我已经写在此之前这里 。 为了理解当前文章&…