Hadoop简单应用程序实例

Hadoop是一个分布式系统基础架构,主要用于大数据的存储和处理。它允许使用简单的编程模型跨集群处理和生成大数据集。Hadoop主要由HDFS(Hadoop Distributed FileSystem,分布式文件系统)和MapReduce编程模型两部分组成。 

 

准备工作

首先查看数据集(一小部分数据和示例)

 

配置pom文件和建包

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>stock_daily</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>3.1.2</version><scope>test</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.2</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.1.2</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-configuration2</artifactId><version>2.7</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.30</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.1.2</version></dependency></dependencies>
</project>

代码

创建一个类继承Configured实现Tool接口,configured类可以帮助hadoop命令行工具管理配置文件,如yarn-site.xmlmapred-site.xmlTool接口中的ToolRunner是程序的入口点,可以启动run方法并把命令行的参数传给run

重写run方法,创建job和配置mapreduce类。这个configurition就是用来管理hadoop的配置文件的类。args是命令行输入的参数,虚拟机会把它读进来。

Mapper类以及map方法

Mapper类会将文件按行切分,然后把每一行的字节偏移量作为建,每一行的数据作为值交给map方法。Map方法把里面的内容取出来求下行指数,下行指数=((收盘价-开盘价) / (收盘价 - 最低价+1)然后将股票代码作为键,每一行的下行指数作为值写入context中,作为后面reduce的输出。context.write用于写入输出数据。

Reduce类和reduce方法

Shuffile会把map输出文件下载过来,然后会自动根据键,聚合到一个容器里面,遍历求和并计算平均的下行指数即可。

完整代码

package com.zlh;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;/*** calculate and output the code and danger values of stock.* */
public class stock_daily extends Configured implements Tool {/*** The entrance of the program.* @param args is used as the parameter of run method.* */public static void main(String[] args) throws Exception {//run the stock_daily as a mapreduce job.int res = ToolRunner.run(new stock_daily(),args);//close the JVMSystem.exit(res);}//of main/*** construct job and execute the job.* @param args The given String args.* */@Overridepublic int run(String[] args) throws Exception {//set configure parameter information of hadoopConfiguration conf = new Configuration();//construct Job classSystem.out.println("创建和配置Job");Job job = Job.getInstance(conf,"stock_daily");//indicate the class of the Jobjob.setJarByClass(stock_daily.class);//indicate the class of the Map and Reducejob.setMapperClass(Map.class);job.setReducerClass(Reduce.class);job.setCombinerClass(Reduce.class);//indicate the format of the input:Text type filejob.setInputFormatClass(TextInputFormat.class);TextInputFormat.addInputPath(job, new Path(args[0]));//indicate the format of the output:key is text,value is double.job.setOutputFormatClass(TextOutputFormat.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(DoubleWritable.class);TextOutputFormat.setOutputPath(job,new Path(args[1]));//Execute the mapreduceboolean res = job.waitForCompletion(true);if(res){return 0;}//of ifelse{return -1;}//of else}//of run/*** The map class is used to dispose the data to many lines as the input of the method.* */public static class Map extends Mapper<LongWritable, Text, Text, DoubleWritable>{//define the map output key and value.private final static DoubleWritable downIndex = new DoubleWritable();private Text stock = new Text();/*** Use each line's stock_code as key,downIndex as the value* @param key The byte offset of every line.* @param value text values.* @param context The program context.* */@Overridepublic void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException{//split line to calculate the falling indexString[] fields = value.toString().split("\t");stock.set(fields[0]);double openPrice=Double.parseDouble(fields[2]);double closePrice=Double.parseDouble(fields[3]);double lowPrice=Double.parseDouble(fields[5]);downIndex.set((closePrice-openPrice)-(closePrice-lowPrice+1));context.write(stock,downIndex);}//of map}//of class Map/*** The Reduce is used to calculate the output result.* */public static class Reduce extends Reducer<Text, DoubleWritable, Text, DoubleWritable>{/*** Output the avg downIndex of every stock code.* @param key The output key of mapper* @param values output values of mapper* @param context The context of mapreduce.* */public void reduce (Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {double sum=0;int nums = 0;//traverse the Iterable values and sum of themfor (DoubleWritable value : values) {sum += value.get();nums++;}//of whilecontext.write(key,new DoubleWritable(sum/nums));}//of reduce}//of class Reduce
}//of class stock_daily

 上传集群并执行

将项目文件打包为jar包上传至hadoop集群中(打包方式参照Hadoop应用1)

windows的命令提示符里面使用pscp命令上传jar包(前提是已经安装了putty

文件夹也可以通过这个方式传,要在pscp后面加个-r

启动集群后使用hadoop jar 输入文件位置(要在hdfs里面,不是在linux里面) 输出文件目录,会报找不到类的错,要修改两个配置文件。

1、mapred-site.xml
增加两个配置:
<property><name>mapreduce.admin.user.env</name><value>HADOOP_MAPRED_HOME=$HADOOP_COMMON_HOME</value>
</property>
<property><name>yarn.app.mapreduce.am.env</name><value>HADOOP_MAPRED_HOME=$HADOOP_COMMON_HOME</value>
</property>2、yarn-site.xml
增加container本地日志查看配置
<property>  <name>yarn.nodemanager.log-dirs</name>  <value>hadoop安装目录/logs/userlogs</value>  
</property>
<property>  <name>yarn.nodemanager.log.retain-seconds</name>  <value>108000</value>  
</property>
<property><name>yarn.nodemanager.resource.memory-mb</name><value>2048</value>	<!--此项小于1536,mapreduce程序会报错-->
</property>
<property><name>yarn.scheduler.maximum-allocation-mb</name><value>2048</value>   <!--防止一级调度器请求资源量过大-->
</property>设置虚拟内存与内存的倍率,防止VM不足Container被kill
<property>  <name>yarn.nodemanager.vmem-pmem-ratio</name>  <value>3</value>  
</property>以上配置确认无误后,如果仍有报内存错误、AM错误、卡Job、卡0%等问题找不到原因,可以尝试按以下方式解决:
(相应属性的设置为HA模式设置)(1)mapred-site.xml
将mapreduce.framework.name改为:
------------------------------------
vix.mapreduce.framework.name
yarn
------------------------------------(2)yarn-site.xml
将yarn.resourcemanager.address改为:
------------------------------------
vix.yarn.resourcemanager.address
主节点地址:18040
------------------------------------将yarn.resourcemanager.scheduler.address改为:
------------------------------------
vix.yarn.resourcemanager.scheduler.address
主节点地址:18030
------------------------------------

文件位置以及路径如下图所示

修改之后把文件传到另外两个节点,然后重新启动集群

然后执行jar包(要先把数据上传到hadoop集群中,使用hdfs dfs -put命令)

 试验运行过程及结果

 ps:hadoop执行jar包出现问题可以在日志文件里面找报错。在logs里面的resourcemanager里面。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/36050.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

第十节 动态面板实现推动和拉动效果

在原型设计中我们经常会遇到元件使用显示更多或者收起效果,下面以面板元件推动与拉动效果做案件说明。 一、设置原有内容 我这里添加一个表格内容,添加“显示更多”文本超链接 二、设置在更多显示面板内容 添加一个动态面板,设置有内容、无内容两个状态 在有内容面板中添…

WDF驱动开发-WDF总线枚举

支持在总线驱动程序中进行 PnP 和电源管理 某些设备永久插入系统&#xff0c;而其他设备可以在系统运行时插入和拔出电源。 总线驱动 必须识别并报告连接到其总线的设备&#xff0c;并且他们必须发现并报告系统中设备的到达和离开情况。 总线驱动程序标识和报告的设备称为总线…

TEMU半托管模式引领跨境电商新风尚

TEMU半托管模式作为2024年的热门话题&#xff0c;正吸引着越来越多卖家的目光。继全托管模式取得巨大成功之后&#xff0c;半托管模式的推出无疑为跨境电商行业注入了新的活力。 在选品方向上&#xff0c;TEMU半托管模式强调商品的聚焦与精选。卖家在选择上架商品时&#xff0c…

python AI全栈工程师

python AI全栈工程师 前端&#xff1a;Streamlit Streamlit是一个开源的Python库&#xff0c;专为数据科学家和机器学习工程师设计&#xff0c;用于快速构建交互式用户界面。Streamlit功能强大、易于使用&#xff0c;特别适合数据科学家和机器学习工程师快速构建和部署交互式数…

在项目中使用 VitePress 作为文档常见问题:样式丢失,图标丢失,打包错误,中文配置修改等

本文总结和记录自己在使用 vitepress 作为 Vue 项目文档时遇到的问题&#xff0c;以及解决方法。直接进入正题&#xff1a; md 文档中引入组件部分样式丢失 默认你导入的 vue 文件的 style 标签里的样式会生效&#xff0c;但是样式之外的样式不会生效&#xff0c;需要手动引入…

nodejs国内源下载

nodejs的官网下载太慢了 可以尝试网盘下载快一点 夸克网盘分享夸克网盘是夸克推出的一款云服务产品&#xff0c;功能包括云存储、高清看剧、文件在线解压、PDF一键转换等。通过夸克网盘可随时随地管理和使用照片、文档、手机资料&#xff0c;目前支持Android、iOS、PC、iPad。…

AI产品经理如何快速接手一个新产品?

我们到一家新的公司&#xff0c;往往都有现成的产品需要你熟悉&#xff0c;这个对你来说就是一个新产品。 又或者说&#xff0c;公司要搭建一个新的项目&#xff0c;让你负责&#xff0c;需要你从0开始去接手&#xff0c;最终去上线&#xff0c;去推广&#xff0c;去盈利&…

【雷丰阳-谷粒商城 】【分布式高级篇-微服务架构篇】【14】缓存与分布式锁

持续学习&持续更新中… 守破离 【雷丰阳-谷粒商城 】【分布式高级篇-微服务架构篇】【14】缓存与分布式锁 缓存本地缓存分布式缓存-本地模式在分布式下的问题分布式缓存整合 redis 作为缓存JMeter测试出OutOfDirectMemoryError【堆外内存溢出】 高并发读下缓存失效问题缓存…

基于YOLOv5+PyQT5的吸烟行为检测(含pyqt页面、模型、数据集)

简介 吸烟不仅对个人健康有害,也可能在某些特定场合带来安全隐患。为了有效地监控公共场所和工作环境中的吸烟行为,我们开发了一种基于YOLOv5目标检测模型的吸烟检测系统。本报告将详细介绍该系统的实际应用与实现,包括系统架构、功能实现、使用说明、检测示例、数据集获取…

UDS - 10.2 DiagnosticSessionControl (10) service

10.3 诊断会话控制(10)服务 来自:ISO 14229-1-2020.pdf 10.2.1 服务说明 DiagnosticsSessionControl服务用于在服务器中启用不同的诊断会话。 诊断会话启用服务器中的一组特定诊断服务和/或功能。该服务提供了服务器可以报告对启用的诊断会话有效的数据链路层特定参数值(…

ZAP安全扫描工具

下载地址: 去官网下载&#xff1a;https://www.zaproxy.org/download/ 1.主动扫描 需要登录的网站建议使用主动扫描 也可以绕过登录进行手动扫描 再选择手动扫描后 获取到对应的token 2.自动扫描 3.查看报告 4.扫描策略的使用

.gitignore git添加忽略文件

在项目的根目录下创建一个名为 .gitignore 的文件。在这个文件中&#xff0c;列出您希望Git忽略的文件和文件夹的名称或模式。 下面是一些基本的步骤和规则&#xff1a; 创建 .gitignore 文件&#xff1a;在项目根目录下创建一个名为 .gitignore 的文件。如果没有这个文件&…

如何设计一门编程语言?

一、设计流程 步骤说明 确定语言目标和用途&#xff1a; 目标受众&#xff1a;确定是面向初学者、专业开发者还是特定领域专家。 主要用途&#xff1a;明确语言的主要用途&#xff0c;如系统编程、Web 开发、数据分析、科学计算等。 独特卖点&#xff1a;确定语言的独特优势…

如何使用 Python 交互式解释器?

1. 什么是Python交互式解释器&#xff1f; Python交互式解释器是一种REPL&#xff08;Read-Eval-Print Loop&#xff09;环境。它会读取用户输入的代码&#xff0c;执行代码&#xff0c;并输出结果&#xff0c;随后等待下一个用户输入。这种交互方式使得Python非常适合快速原型…

【Day03】0基础微信小程序入门-学习笔记

文章目录 视图与逻辑学习目标页面导航1. 声明式导航2. 编程式导航3. 导航传参 页面事件1. 下拉刷新2. 上拉触底3.扩展-自定义编译模式 生命周期1. 简介2. 生命周期函数3. 应用的生命周期函数4. 页面生命周期函数 WXS脚本1. 概述2. 基础语法3. WXS的特点4. 使用WXS处理手机号 总…

Multisim详细安装过程

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、Multisim是什么&#xff1f;二、下载安装步骤1.下载安装包2.安装 总结 前言 对于很多学习电路&#xff0c;数电&#xff0c;模电的朋友&#xff0c;我们在…

富格林:可信经验曝光有效出金

富格林认为&#xff0c;在现货黄金当中&#xff0c;曝光可信的交易技巧可以帮助投资者有效地盈利出金。现货黄金市场就像一把双刃剑&#xff0c;投资者利用得好就能成为赢家&#xff0c;利用得不好便是损失钱财。事实上&#xff0c;要想成为赢家还是要掌握必须的可信经验。以下…

volcengine 库装不上 #25

https://github.com/volcengine/volc-sdk-python/issues/25 解决了, 就是解决方案比较蠢 在 Docker python3.10-slim 中 volcengine 安装时报错, 其依赖 pycryptodome 显示 gcc 相关错误 调研发现 pycryptodome3.19.0 不会报错, volcengine 依赖的 pycryptodome3.9.9 会报错 …

php中strict_types使用详解

在PHP中&#xff0c;strict_types是一个声明性的指令&#xff0c;用于在文件级别控制类型声明的严格性。当你在脚本的最顶部使用declare语句启用strict_types时&#xff0c;PHP将在该文件中对类型声明执行严格的类型检查。这意味着函数参数、返回值等必须精确匹配指定的类型&am…

05-Mysql备份与恢复

物理备份&#xff1a;对数据库操作系统的物理文件&#xff08;如数据库文件&#xff0c;日志文件等&#xff09;的备份 物理备份方法&#xff1a; 冷备份&#xff08;防脱备份&#xff09;&#xff1a;是在关闭数据库的时候进行的 热备份&#xff08;联机备份&#xff09;&am…