Spark---资源、任务调度

一、Spark资源调度源码

1、Spark资源调度源码过程

Spark资源调度源码是在Driver启动之后注册Application完成后开始的。Spark资源调度主要就是Spark集群如何给当前提交的Spark application在Worker资源节点上划分资源。Spark资源调度源码在Master.scala类中的schedule()中进行的。

2、Spark资源调度源码结论

  1. Executor在集群中分散启动,有利于task计算的数据本地化。
  2. 默认情况下(提交任务的时候没有设置--executor-cores选项),每一个Worker为当前的Application启动一个Executor,这个Executor会使用这个Worker的所有的cores和1G内存。
  3. 如果想在Worker上启动多个Executor,提交Application的时候要加--executor-cores这个选项。
  4. 默认情况下没有设置--total-executor-cores,一个Application会使用Spark集群中所有的cores。
  5. 启动Executor不仅和core有关还和内存有关。

3、资源调度源码结论验证

使用Spark-submit提交任务演示。也可以使用spark-shell来验证。

1、默认情况每个worker为当前的Application启动一个Executor,这个Executor使用集群中所有的cores和1G内存。

./spark-submit 
--master spark://node1:7077--class org.apache.spark.examples.SparkPi../lib/spark-examples-1.6.0-hadoop2.6.0.jar 
10000

2、在workr上启动多个Executor,设置--executor-cores参数指定每个executor使用的core数量。

./spark-submit--master  spark://node1:7077--executor-cores 1 --class org.apache.spark.examples.SparkPi 
../lib/spark-examples-1.6.0-hadoop2.6.0.jar 
10000

3、内存不足的情况下启动core的情况。Spark启动是不仅看core配置参数,也要看配置的core的内存是否够用。

./spark-submit 
--master  spark://node1:7077 
--executor-cores 1  
--executor-memory 3g 
--class org.apache.spark.examples.SparkPi../lib/spark-examples-1.6.0-hadoop2.6.0.jar 
10000

--total-executor-cores集群中共使用多少cores

注意:一个进程不能让集群多个节点共同启动。

./spark-submit 
--master  spark://node1:7077 
--executor-cores 1  
--executor-memory 2g 
--total-executor-cores 3
--class org.apache.spark.examples.SparkPi../lib/spark-examples-1.6.0-hadoop2.6.0.jar 
10000

二、Spark任务调度源码

Spark任务调度源码是从Spark Application的一个Action算子开始的。action算子开始执行,会调用RDD的一系列触发job的逻辑。其中也有stage的划分过程:

三、Spark二次排序和分组取topN

1、二次排序

大数据中很多排序场景是需要先根据一列进行排序,如果当前列数据相同,再对其他某列进行排序的场景,这就是二次排序场景。例如:要找出网站活跃的前10名用户,活跃用户的评测标准就是用户在当前季度中登录网站的天数最多,如果某些用户在当前季度登录网站的天数相同,那么再比较这些用户的当前登录网站的时长进行排序,找出活跃用户。这就是一个典型的二次排序场景。

解决二次排序问题可以采用封装对象的方式,对象中实现对应的比较方法。

1.SparkConf sparkConf = new SparkConf()
2..setMaster("local")
3..setAppName("SecondarySortTest");
4.final JavaSparkContext sc = new JavaSparkContext(sparkConf);
5.
6.JavaRDD<String> secondRDD = sc.textFile("secondSort.txt");
7.
8.JavaPairRDD<SecondSortKey, String> pairSecondRDD = secondRDD.mapToPair(new PairFunction<String, SecondSortKey, String>() {
9.
10.  /**
11.   * 
12.  */
13.  private static final long serialVersionUID = 1L;
14.
15.  @Override
16.  public Tuple2<SecondSortKey, String> call(String line) throws Exception {
17.    String[] splited = line.split(" ");
18.    int first = Integer.valueOf(splited[0]);
19.    int second = Integer.valueOf(splited[1]);
20.    SecondSortKey secondSortKey = new SecondSortKey(first,second);
21.    return new Tuple2<SecondSortKey, String>(secondSortKey,line);
22.  }
23.});
24.
25.pairSecondRDD.sortByKey(false).foreach(new 
26.VoidFunction<Tuple2<SecondSortKey,String>>() {
27.
28.  /**
29.   * 
30.   */
31.  private static final long serialVersionUID = 1L;
32.
33.    @Override
34.    public void call(Tuple2<SecondSortKey, String> tuple) throws Exception {
35.      System.out.println(tuple._2);
36.  }
37.});
38.
39.
40.
41.public class SecondSortKey implements Serializable,Comparable<SecondSortKey>{
42.  /**
43.   * 
44.   */
45.  private static final long serialVersionUID = 1L;
46.  private int first;
47.  private int second;
48.  public int getFirst() {
49.    return first;
50.  }
51.  public void setFirst(int first) {
52.    this.first = first;
53.  }
54.  public int getSecond() {
55.    return second;
56.  }
57.  public void setSecond(int second) {
58.    this.second = second;
59.  }
60.  public SecondSortKey(int first, int second) {
61.    super();
62.    this.first = first;
63.    this.second = second;
64.  }
65.  @Override
66.  public int compareTo(SecondSortKey o1) {
67.    if(getFirst() - o1.getFirst() ==0 ){
68.      return getSecond() - o1.getSecond();
69.    }else{
70.      return getFirst() - o1.getFirst();
71.    }
72.  }
73.}

2、分组取topN

大数据中按照某个Key进行分组,找出每个组内数据的topN时,这种情况就是分组取topN问题。

解决分组取TopN问题有两种方式,第一种就是直接分组,对分组内的数据进行排序处理。第二种方式就是直接使用定长数组的方式解决分组取topN问题。

1.SparkConf conf = new SparkConf()
2..setMaster("local")
3..setAppName("TopOps");
4.JavaSparkContext sc = new JavaSparkContext(conf);
5.JavaRDD<String> linesRDD = sc.textFile("scores.txt");
6.
7.JavaPairRDD<String, Integer> pairRDD = linesRDD.mapToPair(new PairFunction<String, String, Integer>() {
8.
9.  /**
10.   * 
11.  */
12.  private static final long serialVersionUID = 1L;
13.
14.  @Override
15.  public Tuple2<String, Integer> call(String str) throws Exception {
16.    String[] splited = str.split("\t");
17.    String clazzName = splited[0];
18.    Integer score = Integer.valueOf(splited[1]);
19.    return new Tuple2<String, Integer> (clazzName,score);
20.  }
21.});
22.
23.pairRDD.groupByKey().foreach(new 
24.VoidFunction<Tuple2<String,Iterable<Integer>>>() {
25.
26.  /**
27.   * 
28.   */
29.  private static final long serialVersionUID = 1L;
30.
31.  @Override
32.  public void call(Tuple2<String, Iterable<Integer>> tuple) throws Exception {
33.    String clazzName = tuple._1;
34.    Iterator<Integer> iterator = tuple._2.iterator();
35.
36.    Integer[] top3 = new Integer[3];
37.
38.    while (iterator.hasNext()) {
39.      Integer score = iterator.next();
40.
41.      for (int i = 0; i < top3.length; i++) {
42.        if(top3[i] == null){
43.          top3[i] = score;
44.          break;
45.        }else if(score > top3[i]){
46.        for (int j = 2; j > i; j--) {
47.          top3[j] = top3[j-1];
48.        }
49.        top3[i] = score;
50.        break;
51.      }
52.    }
53.  }
54.  System.out.println("class Name:"+clazzName);
55.  for(Integer sscore : top3){
56.    System.out.println(sscore);
57.  }
58.}
59.});

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/179654.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

界面控件DevExpress WPF流程图组件,完美复制Visio UI!(二)

DevExpress WPF Diagram&#xff08;流程图&#xff09;控件帮助用户完美复制Microsoft Visio UI&#xff0c;并将信息丰富且组织良好的图表、流程图和组织图轻松合并到您的下一个WPF项目中。 在上文中&#xff08;点击这里回顾>>&#xff09;&#xff0c;我们为大家介绍…

Java后端开发——MVC商品管理程序

Java后端开发——MVC商品管理程序 本篇文章内容主要有下面几个部分&#xff1a; MVC架构介绍项目环境搭建商品管理模块Servlet代码重构BaseServlet文件上传 MVC 是模型-视图-控制器&#xff08;Model-View-Controller&#xff09;&#xff0c;它是一种设计模式&#xff0c;也…

java设计模式学习之【原型模式】

文章目录 引言原型模式简介定义与用途实现方式UML 使用场景优势与劣势原型模式在spring中的应用员工记录示例代码地址 引言 原型模式是一种创建型设计模式&#xff0c;它允许对象能够复制自身&#xff0c;以此来创建一个新的对象。这种模式在需要重复地创建相似对象时非常有用…

【代码】基于卷积神经网络(CNN)-支持向量机(SVM)的分类预测算法

程序名称&#xff1a;基于卷积神经网络&#xff08;CNN&#xff09;-支持向量机&#xff08;SVM&#xff09;的分类预测算法 实现平台&#xff1a;matlab 代码简介&#xff1a;CNN-SVM是一种常用的图像分类方法&#xff0c;结合了卷积神经网络&#xff08;CNN&#xff09;和支…

移动应用开发介绍及iOS方向学习路线(HUT移动组版)

移动应用开发介绍及iOS方向学习路线&#xff08;HUT移动组版&#xff09; 前言 ​ 作为一个HUT移动组待了一坤年&#xff08;两年半&#xff09;多的老人&#xff0c;在这里为还在考虑进哪个组的萌新们以及将来进组的新朋友提供一份关于移动应用开发介绍以及学习路线的白话文…

DC电源模块有哪些常见故障?怎么解决这些问题?

DC-DC电源模块的作用是将输入电压转换为所需的输出电压&#xff0c;广泛应用于电子产品、汽车电子、医疗设备、通信系统等领域。但是在使用过程中DC电源模块会出现一些故障和问题&#xff0c;影响电源模块和其它电路器件的性能。因此&#xff0c;纳米软件将为大家介绍常见的DC-…

大坝安全监测的内容及作用

大坝安全监测是指对大坝水雨情沉降、倾斜、渗压以及大坝形状特征有效地进行监测&#xff0c;及时发现潜在的安全隐患和异常情况&#xff0c;以便大坝管理人员能够做出科学决策&#xff0c;以确保大坝安全稳定运行。 大坝安全监测的主要内容 1.表面位移监测&#xff1a;监测大坝…

分子骨架跃迁工具-DiffHopp 评测

一、文章背景介绍 DiffHopp模型发表在ICML 2023 Workshop on Computational Biology&#xff08;简称&#xff1a;2023 ICML-WCB&#xff09;上的文章。第一作者是剑桥计算机系的Jos Torge。 DiffHopp是一个专门针对骨架跃迁任务而训练的E3等变条件扩散模型。此外&#xff0c;…

LeetCode Hot100 84.柱状图中最大的矩形

题目&#xff1a; 给定 n 个非负整数&#xff0c;用来表示柱状图中各个柱子的高度。每个柱子彼此相邻&#xff0c;且宽度为 1 。 求在该柱状图中&#xff0c;能够勾勒出来的矩形的最大面积。 方法&#xff1a; 代码&#xff1a; class Solution {public int largestRectang…

MySOL常见四种连接查询

1、内联接 &#xff08;典型的联接运算&#xff0c;使用像 或 <> 之类的比较运算符&#xff09;。包括相等联接和自然联接。 内联接使用比较运算符根据每个表共有的列的值匹配两个表中的行。例如&#xff0c;检索 students和courses表中学生标识号相同的所有行。 2、…

机器学习之危险品车辆目标检测

危险品的运输涉及从离开仓库到由车辆运输到目的地的风险。监控事故、车辆运动动态以及车辆通过特定区域的频率对于监督车辆运输危险品的过程至关重要。 在线工具推荐&#xff1a; 三维数字孪生场景工具 - GLTF/GLB在线编辑器 - Three.js AI自动纹理化开发 - YOLO 虚幻合成数…

使用STM32微控制器实现光电传感器的接口和数据处理

光电传感器在许多领域中被广泛应用&#xff0c;例如工业自动化、智能家居等。本文将介绍如何使用STM32微控制器实现光电传感器的接口和数据处理的方案&#xff0c;包括硬件设计、引脚配置、数据采集、滤波和阈值判断等关键步骤&#xff0c;并给出相应的代码示例。 一、引言 光…

MySQL使用函数和存储过程实现:向数据表快速插入大量测试数据

实现过程 1.创建表 CREATE TABLE user_info (id INT(11) NOT NULL AUTO_INCREMENT,name VARCHAR(20) DEFAULT NULL,age INT(3) DEFAULT NULL,pwd VARCHAR(20) DEFAULT NULL,phone_number VARCHAR(11) DEFAULT NULL,email VARCHAR(255) DEFAULT NULL,address VARCHAR(255) DEF…

DHCP协议及实验omnipeek抓包工具分析 IPv4协议

一 抓包命令 adb shell tcpdump -i wlan0 -w /data/tcpdump.pcap 抓包后截图如下 二 DHCP是什么 2.1 DHCP定义 DHCP( Dynamic Host Configuration Protocol, 动态主机配置协议)定义: 存在于应用层(OSI) 前身是BOOTP(Bootstrap Protocol)协议 是一个使用UDP(User …

如何编写自己的python包,并在本地进行使用

如何编写自己的python包,并在本地进行使用 一、直接引用 1.创建Python项目pythonProject。 2.并且在此项目下创建pg_message包。 3.pg_message包下默认生成_init_.py文件。 Python中_init_.py是package的标志。init.py 文件的一个主要作用是将文件夹变为一个Python模块,Pyt…

使用Jmeter进行http接口测试

前言&#xff1a; 本文主要针对http接口进行测试&#xff0c;使用Jmeter工具实现。 Jmter工具设计之初是用于做性能测试的&#xff0c;它在实现对各种接口的调用方面已经做的比较成熟&#xff0c;因此&#xff0c;本次直接使用Jmeter工具来完成对Http接口的测试。 一、开发接口…

杂记 | 使用Docker安装并配置MongoDB以支持事务(单副本,并解决了证书文件错误的问题)

文章目录 00 安装前的准备01 创建Docker Compose文件02 设置证书文件03 启动MongoDB04 初始化副本集和创建用户05 验证安装 00 安装前的准备 在开始之前&#xff0c;确保已经安装了Docker&#xff0c;本文基于Docker Compose进行示范&#xff0c;没有装Docker Compose也可将其…

人大金仓亮相2023信息技术应用创新论坛

11月25日&#xff0c;2023信息技术应用创新论坛在常州开幕。人大金仓受邀分享信息技术应用创新行业应用典型成果&#xff0c;在论坛展览部分集中展示了最具代表性的新产品、应用及解决方案。 江苏省工业和信息化厅副厅长池宇、中国电子工业标准化技术协会理事长胡燕、常州市常务…

量子力学技术前沿:探索、挑战与未来

量子力学技术前沿:探索、挑战与未来 一、引言 量子力学,这门揭示微观世界规律的学科,自诞生以来就在科技领域发挥着举足轻重的作用。随着科技的飞速发展,量子力学的应用也在不断拓展和深化。今天,我将带领大家一起领略量子力学技术的魅力,探讨其发展趋势和挑战。 二、量…

windows系统mobaxterm远程执行linux上ssh命令

命令如下 start "" "%~dp0\MobaXterm_Personal_23.4.exe" -newtab "sshpass -p root ssh root192.168.11.92 mkdir 33" -p 是密码 左边是用户名&#xff0c;右边是服务器ip 后面跟的是服务器上执行的命令 第一次执行的时候要设置mobaxt…