1. Hadoop configuration files: core-site.xml, hdfs-site.xml, yarn-site.xml, mapred-site.xml, capacity-scheduler.xml, workers
hdfs-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property><name>dfs.nameservices</name><value>mycluster</value></property>
  <property><name>dfs.ha.namenodes.mycluster</name><value>nn1,nn2</value></property>
  <property><name>dfs.namenode.rpc-address.mycluster.nn1</name><value>xiemeng-01:9870</value></property>
  <property><name>dfs.namenode.rpc-address.mycluster.nn2</name><value>xiemeng-02:9870</value></property>
  <property><name>dfs.namenode.http-address.mycluster.nn1</name><value>xiemeng-01:50070</value></property>
  <property><name>dfs.namenode.http-address.mycluster.nn2</name><value>xiemeng-02:50070</value></property>
  <property><name>dfs.namenode.shared.edits.dir</name><value>qjournal://xiemeng-01:8485;xiemeng-02:8485;xiemeng-03:8485/mycluster</value></property>
  <!-- JournalNode working directory -->
  <property><name>dfs.journalnode.edits.dir</name><value>/home/xiemeng/software/hadoop-3.2.0/journalnode/data</value></property>
  <property><name>dfs.client.failover.proxy.provider.mycluster</name><value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value></property>
  <property><name>dfs.ha.fencing.methods</name><value>sshfence
shell(/bin/true)</value></property>
  <property><name>dfs.ha.fencing.ssh.private-key-files</name><value>/home/root/.ssh/id_rsa</value></property>
  <property><name>dfs.ha.fencing.ssh.connect-timeout</name><value>30000</value></property>
  <property><name>dfs.ha.automatic-failover.enabled</name><value>true</value></property>
  <property><name>dfs.webhdfs.enabled</name><value>true</value></property>
  <property><name>dfs.name.dir</name><value>/home/xiemeng/software/hadoop-3.2.0/name</value></property>
  <property><name>dfs.data.dir</name><value>/home/xiemeng/software/hadoop-3.2.0/data</value></property>
  <property><name>dfs.replication</name><value>1</value></property>
</configuration>
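With HA enabled, clients address the cluster through the logical nameservice instead of a single NameNode. A minimal sketch of how a Java client could resolve hdfs://mycluster, reusing the values from the config above (normally these settings come from core-site.xml/hdfs-site.xml on the classpath rather than being set in code):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HaClientSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Mirror the hdfs-site.xml HA settings shown above.
        conf.set("fs.defaultFS", "hdfs://mycluster");
        conf.set("dfs.nameservices", "mycluster");
        conf.set("dfs.ha.namenodes.mycluster", "nn1,nn2");
        conf.set("dfs.namenode.rpc-address.mycluster.nn1", "xiemeng-01:9870");
        conf.set("dfs.namenode.rpc-address.mycluster.nn2", "xiemeng-02:9870");
        conf.set("dfs.client.failover.proxy.provider.mycluster",
                "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
        // The client fails over between nn1 and nn2 transparently.
        try (FileSystem fs = FileSystem.get(URI.create("hdfs://mycluster"), conf)) {
            System.out.println(fs.exists(new Path("/")));
        }
    }
}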
core-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property><name>fs.defaultFS</name><value>hdfs://mycluster</value></property>
  <property><name>dfs.nameservices</name><value>mycluster</value></property>
  <property><name>ha.zookeeper.quorum</name><value>192.168.64.128:2181,192.168.64.130:2181,192.168.64.131:2181</value></property>
  <property><name>hadoop.tmp.dir</name><value>/home/xiemeng/software/hadoop-3.2.0/var</value></property>
  <property><name>io.file.buffer.size</name><value>131072</value></property>
  <property><name>ipc.client.connect.max.retries</name><value>100</value><description>Indicates the number of retries a client will make to establish a server connection.</description></property>
  <property><name>ipc.client.connect.retry.interval</name><value>10000</value><description>Indicates the number of milliseconds a client will wait for before retrying to establish a server connection.</description></property>
  <property><name>hadoop.proxyuser.xiemeng.hosts</name><value>*</value></property>
  <property><name>hadoop.proxyuser.xiemeng.groups</name><value>*</value></property>
  <property><name>hadoop.native.lib</name><value>true</value><description>Should native hadoop libraries, if present, be used.</description></property>
  <property><name>fs.trash.interval</name><value>1</value></property>
  <property><name>fs.trash.checkpoint.interval</name><value>1</value></property>
</configuration>
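fs.trash.interval and fs.trash.checkpoint.interval are in minutes, so with the values above deleted files only survive briefly in .Trash. A hedged sketch of moving a file to trash from Java instead of deleting it outright (the path is only an example):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Trash;

public class TrashSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();   // picks up core-site.xml from the classpath
        FileSystem fs = FileSystem.get(conf);
        // Moves the file into the user's .Trash instead of deleting it immediately;
        // with fs.trash.interval=1 the trash checkpoint is purged after about a minute.
        boolean moved = Trash.moveToAppropriateTrash(fs, new Path("/wordcount/input/old.txt"), conf);
        System.out.println("moved to trash: " + moved);
        fs.close();
    }
}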
yarn-site.xml
<configuration>
  <property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property>
  <property><name>yarn.resourcemanager.ha.enabled</name><value>true</value></property>
  <property><name>yarn.resourcemanager.cluster-id</name><value>mycluster</value></property>
  <property><name>yarn.resourcemanager.ha.rm-ids</name><value>rm1,rm2</value></property>
  <property><name>yarn.resourcemanager.hostname.rm1</name><value>xiemeng-01</value></property>
  <property><name>yarn.resourcemanager.hostname.rm2</name><value>xiemeng-02</value></property>
  <property><name>yarn.resourcemanager.webapp.address.rm1</name><value>xiemeng-01:8088</value></property>
  <property><name>yarn.resourcemanager.webapp.address.rm2</name><value>xiemeng-02:8088</value></property>
  <property><name>yarn.resourcemanager.zk-address</name><value>192.168.64.128:2181,192.168.64.130:2181,192.168.64.131:2181</value></property>
  <property><name>yarn.nodemanager.pmem-check-enabled</name><value>false</value></property>
  <!-- Whether to run a thread that checks each task's virtual memory usage and kills tasks that exceed it; default is true -->
  <property><name>yarn.nodemanager.vmem-check-enabled</name><value>false</value></property>
  <property><name>yarn.resourcemanager.scheduler.class</name><value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value></property>
  <!-- Enable node labels -->
  <property><name>yarn.node-labels.enabled</name><value>true</value></property>
  <!-- Where node labels are stored -->
  <property><name>yarn.node-labels.fs-store.root-dir</name><value>hdfs://mycluster/yn/node-labels/</value></property>
  <!-- Enable preemption monitoring -->
  <property><name>yarn.resourcemanager.scheduler.monitor.enable</name><value>true</value></property>
  <!-- Fraction of resources that can be preempted in a single round; default 0.1 -->
  <property><name>yarn.resourcemanager.monitor.capacity.preemption.total_preemption_per_round</name><value>0.3</value></property>
</configuration>
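With ResourceManager HA enabled, a client reaches the active RM through the rm-ids above. A minimal, hedged YarnClient sketch reusing the same rm1/rm2 hosts (normally these settings come from yarn-site.xml on the classpath):

import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class YarnHaClientSketch {
    public static void main(String[] args) throws Exception {
        YarnConfiguration conf = new YarnConfiguration();
        // Mirror the yarn-site.xml HA settings shown above.
        conf.setBoolean("yarn.resourcemanager.ha.enabled", true);
        conf.set("yarn.resourcemanager.cluster-id", "mycluster");
        conf.set("yarn.resourcemanager.ha.rm-ids", "rm1,rm2");
        conf.set("yarn.resourcemanager.hostname.rm1", "xiemeng-01");
        conf.set("yarn.resourcemanager.hostname.rm2", "xiemeng-02");

        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(conf);
        yarnClient.start();
        // List the applications known to the active ResourceManager.
        for (ApplicationReport report : yarnClient.getApplications()) {
            System.out.println(report.getApplicationId() + " " + report.getYarnApplicationState());
        }
        yarnClient.stop();
    }
}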
workers
xiemeng-01
xiemeng-02
xiemeng-03
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property><name>mapreduce.framework.name</name><value>yarn</value></property>
  <property><name>mapreduce.application.classpath</name><value>/home/xiemeng/software/hadoop-3.2.0/etc/hadoop,/home/xiemeng/software/hadoop-3.2.0/share/hadoop/common/*,/home/xiemeng/software/hadoop-3.2.0/share/hadoop/common/lib/*,/home/xiemeng/software/hadoop-3.2.0/share/hadoop/hdfs/*,/home/xiemeng/software/hadoop-3.2.0/share/hadoop/hdfs/lib/*,/home/xiemeng/software/hadoop-3.2.0/share/hadoop/mapreduce/*,/home/xiemeng/software/hadoop-3.2.0/share/hadoop/mapreduce/lib/*,/home/xiemeng/software/hadoop-3.2.0/share/hadoop/yarn/*,/home/xiemeng/software/hadoop-3.2.0/share/hadoop/yarn/lib/*</value></property>
</configuration>
capacity-scheduler.xml
<configuration>
  <property><name>yarn.scheduler.capacity.maximum-applications</name><value>10000</value></property>
  <property><name>yarn.scheduler.capacity.maximum-am-resource-percent</name><value>0.1</value></property>
  <property><name>yarn.scheduler.capacity.resource-calculator</name><value>org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator</value></property>
  <property><name>yarn.scheduler.capacity.root.leaf-queue-template.ordering-policy</name><value>fair</value></property>
  <property><name>yarn.scheduler.capacity.root.queues</name><value>default,low</value></property>
  <property><name>yarn.scheduler.capacity.root.default.capacity</name><value>40</value></property>
  <property><name>yarn.scheduler.capacity.root.low.capacity</name><value>60</value></property>
  <property><name>yarn.scheduler.capacity.root.default.user-limit-factor</name><value>1</value></property>
  <property><name>yarn.scheduler.capacity.root.low.user-limit-factor</name><value>1</value></property>
  <property><name>yarn.scheduler.capacity.root.default.maximum-capacity</name><value>60</value></property>
  <property><name>yarn.scheduler.capacity.root.low.maximum-capacity</name><value>80</value></property>
  <property><name>yarn.scheduler.capacity.root.default.default-application-priority</name><value>100</value></property>
  <property><name>yarn.scheduler.capacity.root.low.default-application-priority</name><value>100</value></property>
  <property><name>yarn.scheduler.capacity.root.low.acl_administer_queue</name><value>xiemeng,root</value></property>
  <property><name>yarn.scheduler.capacity.root.low.acl_submit_applications</name><value>xiemeng,root</value></property>
  <property><name>yarn.scheduler.capacity.root.default.acl_administer_queue</name><value>xiemeng,root</value></property>
  <property><name>yarn.scheduler.capacity.root.default.acl_submit_applications</name><value>xiemeng,root</value></property>
</configuration>
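The scheduler above defines two leaf queues, default and low. A hedged sketch of pointing a MapReduce job at the low queue (the property name is the standard one; the job name and the omitted wiring are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class QueueSubmitSketch {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        Job job = Job.getInstance(config, "wordcount-low-queue");
        // Send the job to the capacity-scheduler queue named "low" defined above;
        // without this it goes to root.default.
        job.getConfiguration().set("mapreduce.job.queuename", "low");
        // ... mapper/reducer/input/output wiring as in the WordCountDriver further below ...
    }
}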
Starting the Hadoop cluster:
Step 1: On every JournalNode host, start the journalnode service: sbin/hadoop-daemon.sh start journalnode
Step 2: On [nn1], format the NameNode and start it: bin/hdfs namenode -format && sbin/hadoop-daemon.sh start namenode
Step 3: On [nn2], sync the metadata from nn1: hdfs namenode -bootstrapStandby
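The notes stop at Step 3; for an HA cluster with automatic failover the remaining startup typically looks like this (standard commands, not taken from the notes):
Step 4: On [nn2], start the second NameNode: sbin/hadoop-daemon.sh start namenode
Step 5: On [nn1], initialize the failover state in ZooKeeper: bin/hdfs zkfc -formatZK
Step 6: On [nn1], start HDFS and YARN: sbin/start-dfs.sh && sbin/start-yarn.sh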
View the logs of a submitted application:
yarn logs -applicationId application_1607776903207_0002
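The same application can also be inspected programmatically; a hedged sketch that looks up the report for the id used in the command above:

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class AppReportSketch {
    public static void main(String[] args) throws Exception {
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(new YarnConfiguration());
        yarnClient.start();
        // Same application that `yarn logs -applicationId ...` targets.
        ApplicationId appId = ApplicationId.fromString("application_1607776903207_0002");
        ApplicationReport report = yarnClient.getApplicationReport(appId);
        System.out.println(report.getName() + " " + report.getFinalApplicationStatus()
                + " tracking: " + report.getTrackingUrl());
        yarnClient.stop();
    }
}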
2. Basic architecture: JobManager, ResourceManager, TaskManager, and the main workflows
3. Hadoop command-line operations
hdfs dfs -put [-f] [-p] <localsrc> ... <dst>
hdfs dfs -get [-p] [-ignoreCrc] [-crc] <src> ... <localdst>
hdfs dfs -put [local path] [HDFS path]
hadoop fs -mkdir -p <hdfs dir>
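The same shell commands can be driven from Java by reusing FsShell through ToolRunner; a hedged sketch (the paths are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.util.ToolRunner;

public class FsShellSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();   // reads core-site.xml from the classpath
        // Equivalent of: hadoop fs -mkdir -p /wordcount/input
        int rc1 = ToolRunner.run(conf, new FsShell(), new String[]{"-mkdir", "-p", "/wordcount/input"});
        // Equivalent of: hdfs dfs -put -f local.txt /wordcount/input
        int rc2 = ToolRunner.run(conf, new FsShell(), new String[]{"-put", "-f", "local.txt", "/wordcount/input"});
        System.exit(rc1 == 0 && rc2 == 0 ? 0 : 1);
    }
}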
4. Hadoop Java API operations
Mapper, Reducer, InputFormat, OutputFormat, Comparator, Partitioner, compression
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    /**
     * Called once before any input is processed; use it for initialization.
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
    }

    /**
     * User business logic: split each line into words and emit (word, 1).
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String str = value.toString();
        String[] words = StringUtils.split(str);
        for (String word : words) {
            context.write(new Text(word), new LongWritable(1));
        }
    }

    /**
     * Called once after all input has been processed; use it to release resources.
     */
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        super.cleanup(context);
    }
}
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        // Emit the total number of occurrences for this word.
        context.write(key, new LongWritable(count));
    }
}
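The driver below registers a WordCountCombiner that the original notes do not show. A minimal sketch, assuming the combiner simply pre-aggregates counts on the map side (it must keep the mapper's output types and be safe to apply zero or more times):

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();   // partial sum, merged again by the real reducer
        }
        context.write(key, new LongWritable(count));
    }
}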
public class WordCountDriver {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        System.setProperty("HADOOP_USER_NAME", "xiemeng");
        config.set("fs.defaultFS", "hdfs://192.168.64.128:9870");
        config.set("mapreduce.framework.name", "yarn");
        config.set("yarn.resourcemanager.hostname", "192.168.64.128");
        // Required when submitting from a Windows IDE to a Linux cluster.
        config.set("mapreduce.app-submission.cross-platform", "true");
        config.set("mapreduce.job.jar", "file:/D:/code/hadoop-start-demo/target/hadoop-start-demo-1.0-SNAPSHOT.jar");
        try {
            Job job = Job.getInstance(config);
            job.setJarByClass(WordCountDriver.class);
            job.setMapperClass(WordCountMapper.class);
            job.setCombinerClass(WordCountCombiner.class);
            job.setReducerClass(WordCountReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            FileInputFormat.setInputPaths(job, new Path("/wordcount/input"));
            FileOutputFormat.setOutputPath(job, new Path("/wordcount2/output"));
            // A grouping comparator only applies when the map output key is an OrderBean,
            // so this line belongs to the order example further below, not to word count:
            // job.setGroupingComparatorClass(OrderGroupintComparator.class);
            FileOutputFormat.setCompressOutput(job, true);
            FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
            boolean complete = job.waitForCompletion(true);
            System.exit(complete ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
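The API list above also mentions a Partitioner, but none appears in these notes. A hedged sketch of a custom partitioner for the word-count key type (the class name and the two-partition rule are made up for illustration); it would be wired up with job.setPartitionerClass(FirstLetterPartitioner.class) and job.setNumReduceTasks(2):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical example: words starting with "a" through "m" go to partition 0, the rest to partition 1.
public class FirstLetterPartitioner extends Partitioner<Text, LongWritable> {
    @Override
    public int getPartition(Text key, LongWritable value, int numPartitions) {
        if (numPartitions < 2 || key.toString().isEmpty()) {
            return 0;
        }
        char first = Character.toLowerCase(key.toString().charAt(0));
        return first <= 'm' ? 0 : 1;
    }
}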
public class OrderGroupintComparator extends WritableComparator {

    public OrderGroupintComparator() {
        // Register the key type and let WritableComparator create instances for deserialization.
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean orderBean = (OrderBean) a;
        OrderBean orderBean2 = (OrderBean) b;
        // Group records by order id only, ignoring the other fields of the key.
        if (orderBean.getOrderId() > orderBean2.getOrderId()) {
            return 1;
        } else if (orderBean.getOrderId() < orderBean2.getOrderId()) {
            return -1;
        } else {
            return 0;
        }
    }
}
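OrderBean itself is not included in the notes; a hedged sketch of what the comparator above assumes (only getOrderId() is required by it, the price field and the sort order are illustrative):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// Hypothetical key type for the grouping-comparator example.
public class OrderBean implements WritableComparable<OrderBean> {
    private long orderId;
    private double price;

    public long getOrderId() { return orderId; }
    public void setOrderId(long orderId) { this.orderId = orderId; }
    public double getPrice() { return price; }
    public void setPrice(double price) { this.price = price; }

    @Override
    public int compareTo(OrderBean o) {
        // Sort by order id first, then by descending price within the same order.
        int byId = Long.compare(this.orderId, o.orderId);
        return byId != 0 ? byId : -Double.compare(this.price, o.price);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(orderId);
        out.writeDouble(price);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        orderId = in.readLong();
        price = in.readDouble();
    }
}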
public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable> {

    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        return new CustomWriter(taskAttemptContext);
    }

    protected static class CustomWriter extends RecordWriter<Text, NullWritable> {

        private final FSDataOutputStream fos137;
        private final FSDataOutputStream fos138;

        public CustomWriter(TaskAttemptContext context) throws IOException {
            // Open one stream per target file once, instead of re-creating (and
            // truncating) the file on every write() call.
            FileSystem fs = FileSystem.get(context.getConfiguration());
            fos137 = fs.create(new Path("file:/D:/hadoop/output/format/out/137/"), true);
            fos138 = fs.create(new Path("file:/D:/hadoop/output/format/out/138/"), true);
        }

        @Override
        public void write(Text text, NullWritable nullWritable) throws IOException, InterruptedException {
            String key = text.toString();
            // Route records starting with "137" to one file and everything else to the other.
            FSDataOutputStream fos = StringUtils.startsWith(key, "137") ? fos137 : fos138;
            fos.write((key + "\n").getBytes());
        }

        @Override
        public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            IOUtils.closeQuietly(fos137);
            IOUtils.closeQuietly(fos138);
        }
    }
}
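A hedged sketch of how this output format could be plugged into a map-only job (the driver class, mapper, and paths are placeholders added for illustration):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FilterOutputDriverSketch {

    // Hypothetical pass-through mapper: emits each input line as the key.
    public static class LineMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "filter-output");
        job.setJarByClass(FilterOutputDriverSketch.class);
        job.setMapperClass(LineMapper.class);
        job.setNumReduceTasks(0);                              // map-only job
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setOutputFormatClass(FilterOutputFormat.class);    // route records by prefix
        FileInputFormat.setInputPaths(job, new Path("file:/D:/hadoop/input/phones"));
        // FileOutputFormat still needs an output directory for its _SUCCESS marker,
        // even though CustomWriter writes the data to its own 137/138 paths.
        FileOutputFormat.setOutputPath(job, new Path("file:/D:/hadoop/output/format/meta"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}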
public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // Each file is read as a single record, so never split it.
        return false;
    }

    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        WholeRecordReader reader = new WholeRecordReader();
        reader.initialize(inputSplit, taskAttemptContext);
        return reader;
    }
}
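This input format is typically used to pack many small files into one SequenceFile. A hedged sketch of the job wiring (paths and job name are placeholders; the built-in identity Mapper and Reducer pass the records through unchanged):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SmallFilesToSequenceFileSketch {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "small-files-to-seqfile");
        job.setJarByClass(SmallFilesToSequenceFileSketch.class);
        // Read each whole file as one (file path, file bytes) record.
        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        // Write the records into a single compact SequenceFile.
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path("/smallfiles/input"));
        FileOutputFormat.setOutputPath(job, new Path("/smallfiles/output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}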
@Data
public class FlowBeanObj implements WritableComparable<FlowBeanObj> {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    /**
     * Serialization: the field order here must match readFields().
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    /**
     * Order keys by total flow (smaller sumFlow first).
     */
    @Override
    public int compareTo(FlowBeanObj o) {
        if (o.getSumFlow() > this.getSumFlow()) {
            return -1;
        } else if (o.getSumFlow() < this.getSumFlow()) {
            return 1;
        } else {
            return 0;
        }
    }
}
public class WholeRecordReader extends RecordReader<Text, BytesWritable> {

    private Configuration config;
    private FileSplit fileSplit;
    private boolean isProgress = true;
    private BytesWritable value = new BytesWritable();
    private Text k = new Text();
    private FileSystem fs;
    private FSDataInputStream fis;

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
        fileSplit = (FileSplit) inputSplit;
        this.config = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        try {
            if (isProgress) {
                // Read the whole file into memory as a single record.
                byte[] contents = new byte[(int) fileSplit.getLength()];
                Path path = fileSplit.getPath();
                fs = path.getFileSystem(config);
                fis = fs.open(path);
                IOUtils.readFully(fis, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
                k.set(fileSplit.getPath().toString());
                isProgress = false;
                return true;
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            IOUtils.closeQuietly(fis);
        }
        return false;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return k;
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    @Override
    public void close() throws IOException {
        // The stream is already closed in nextKeyValue(); closeQuietly tolerates a null
        // or already-closed stream. The shared FileSystem instance is not closed here.
        IOUtils.closeQuietly(fis);
    }
}
public class HdfsClient {

    public static void main(String[] args) throws URISyntaxException, IOException, InterruptedException {
        Configuration config = new Configuration();
        config.set("fs.defaultFS", "hdfs://localhost:9000");
        config.set("dfs.replication", "2");
        FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"), config, "xieme");

        // Directory and file operations
        fs.mkdirs(new Path("/hive3"));
        fs.copyFromLocalFile(new Path("file:/d:/elasticsearch.txt"), new Path("/hive3"));
        fs.copyToLocalFile(false, new Path("/hive3/elasticsearch.txt"), new Path("file:/d:/hive3/elasticsearch2.txt"));
        fs.rename(new Path("/hive3/elasticsearch.txt"), new Path("/hive3/elasticsearch2.txt"));

        // Recursively list files together with their block locations
        RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs.listFiles(new Path("/"), true);
        while (locatedFileStatusRemoteIterator.hasNext()) {
            LocatedFileStatus next = locatedFileStatusRemoteIterator.next();
            System.out.print(next.getPath().getName() + "\t");
            System.out.print(next.getLen() + "\t");
            System.out.print(next.getGroup() + "\t");
            System.out.print(next.getOwner() + "\t");
            System.out.print(next.getPermission() + "\t");
            System.out.print(next.getPath() + "\t");
            BlockLocation[] blockLocations = next.getBlockLocations();
            for (BlockLocation queue : blockLocations) {
                for (String host : queue.getHosts()) {
                    System.out.print(host + "\t");
                }
            }
            System.out.println("");
        }

        //fs.delete(new Path("/hive3"), true);

        /*
        FileStatus[] fileStatuses = fs.listStatus(new Path("/"));
        for (FileStatus fileStatus : fileStatuses) {
            if (fileStatus.isDirectory()) {
                System.out.println(fileStatus.getPath().getName());
            }
        }
        */

        // Copy a local file to HDFS via raw streams
        FileInputStream fis = new FileInputStream("d:/elasticsearch.txt");
        FSDataOutputStream fos = fs.create(new Path("/hive/elasticsearch.txt"));
        IOUtils.copyBytes(fis, fos, config);
        IOUtils.closeStream(fis);
        IOUtils.closeStream(fos);

        // Read an HDFS file starting from a given offset and write it to a local file
        FSDataInputStream fis2 = fs.open(new Path("/hive/elasticsearch.txt"));
        FileOutputStream fos2 = new FileOutputStream("d:/elasticsearch.tar.gz.part1");
        fis2.seek(1);
        IOUtils.copyBytes(fis2, fos2, config);
        /*
        byte[] buf = new byte[1024];
        for (int i = 0; i < 128; i++) {
            while (fis2.read(buf) != -1) {
                fos2.write(buf);
            }
        }
        */
        IOUtils.closeStream(fis2);
        IOUtils.closeStream(fos2);
        fs.close();
    }
}
5. Hadoop optimization