Table of Contents
1. Generate one record and upload the data to HDFS with Java code.
2. Generate one record and write a Kafka producer to put the data into Kafka: Kafka source --> Flume --> HDFS sink
Scenario:
Use Java code to generate random student records. Student IDs start from 0001; the student's name is picked at random from a collection; the age is a random value between 15 and 25. Pause 200 ms after generating each record, and store the data on the HDFS platform.
1. Generate one record and upload the data to HDFS with Java code.
package com.bigdata.Day03;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.util.Random;

public class randomStudent_02 {
    public static void main(String[] args) throws Exception {
        // Create the connection configuration
        Configuration conf = new Configuration();
        // NameNode address and port
        conf.set("fs.defaultFS", "hdfs://bigdata01:9820");
        // Get the FileSystem handle
        FileSystem fs = FileSystem.get(conf);

        String[] student = {"zhangsan", "lisi", "wangwu", "zhaoliu"};
        Random random = new Random();
        // Generate a random student id, name and age
        String studentId = String.format("%04d", random.nextInt(10) + 1);
        String studentName = student[random.nextInt(student.length)];
        int age = random.nextInt(11) + 15;
        // Trailing newline so appended records land on separate lines
        String infor = studentId + "," + studentName + "," + age + "\n";

        // Target path on HDFS
        Path hdfs = new Path("/flume/11-12-02/zuoye.txt");
        // Create the file if it does not exist yet
        if (!fs.exists(hdfs)) {
            fs.createNewFile(hdfs);
        }
        // Open the file in append mode
        FSDataOutputStream outputStream = fs.append(hdfs);
        // Write the record
        outputStream.write(infor.getBytes());
        // Flush and make sure the data is persisted
        outputStream.hsync();
        // Release resources
        outputStream.close();
        fs.close();
    }
}
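The class above writes a single record per run, with a randomly chosen id. As a minimal sketch of the full scenario (sequential ids starting at 0001 and a 200 ms pause between records), the loop below could replace the single-record logic; the class name RandomStudentLoop and the record count of 100 are assumptions for illustration, and everything else reuses the connection and path from the code above.

package com.bigdata.Day03;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.util.Random;

public class RandomStudentLoop {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://bigdata01:9820");
        FileSystem fs = FileSystem.get(conf);

        Path hdfs = new Path("/flume/11-12-02/zuoye.txt");
        if (!fs.exists(hdfs)) {
            fs.createNewFile(hdfs);
        }
        // Open once and append every record to the same file
        FSDataOutputStream out = fs.append(hdfs);

        String[] student = {"zhangsan", "lisi", "wangwu", "zhaoliu"};
        Random random = new Random();

        // Sequential ids starting at 0001; 100 records is an arbitrary choice
        for (int i = 1; i <= 100; i++) {
            String studentId = String.format("%04d", i);
            String studentName = student[random.nextInt(student.length)];
            int age = random.nextInt(11) + 15; // random value in 15..25
            out.write((studentId + "," + studentName + "," + age + "\n").getBytes());
            out.hsync();       // persist the record
            Thread.sleep(200); // pause 200 ms per record, as the scenario requires
        }

        out.close();
        fs.close();
    }
}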
2. Generate one record and write a Kafka producer to put the data into Kafka: Kafka source --> Flume --> HDFS sink
Step 1: generate random data and send it to Kafka.
package com.bigdata.Day03;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.Random;

public class randomStudent_01 {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Connect to Kafka
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "bigdata01:9092");
        // Serializers for the key and value
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // Create a Kafka producer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        String[] student = {"zhangsan", "lisi", "wangwu", "zhaoliu"};
        Random random = new Random();
        // Generate a random student id, name and age
        String studentId = String.format("%04d", random.nextInt(20) + 1);
        String studentName = student[random.nextInt(student.length)];
        int age = random.nextInt(11) + 15;
        String infor = studentId + "," + studentName + "," + age;

        // Call send to publish the message to the homework1112 topic
        ProducerRecord<String, String> record = new ProducerRecord<>("homework1112", infor);
        kafkaProducer.send(record);
        kafkaProducer.close();
    }
}
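Note that KafkaProducer.send() is asynchronous: records are buffered and transmitted in the background, so the final close() call matters here because it flushes any buffered records before the process exits; without it the single message could be lost.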
Step 2: create the Kafka topic that the data will be written to.
kafka-topics.sh --bootstrap-server bigdata01:9092 --create --topic homework1112
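To check that the producer's messages actually arrive in the topic, you can tail it with the standard console consumer:

kafka-console-consumer.sh --bootstrap-server bigdata01:9092 --topic homework1112 --from-beginning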
Step 3: write the Flume conf file (the file name must match the one passed to flume-ng below):
vi homework1112.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

# Kafka source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 100
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
a1.sources.r1.kafka.topics = homework1112
a1.sources.r1.kafka.consumer.group.id = zuoye01

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# HDFS sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/11-12
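One caveat: with only type and path set, the HDFS sink writes SequenceFiles under generated names. If you want the events to land as plain readable text, the following optional sink settings (the roll interval value here is illustrative) can be added to the same file:

a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 30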
Save the file when you are done editing, then run the following commands:
# First change into Flume's bin directory (the path below is only an example; adjust it to your installation)
cd /opt/installs/flume/bin
# Run the agent against the conf file to pull data from Kafka into HDFS
flume-ng agent -n a1 -c ../conf -f ./homework1112.conf -Dflume.root.logger=INFO,console
Step 4: if HDFS is not running yet, remember to start it:
# Start command
start-dfs.sh
# Then open the HDFS web UI in a browser on port 9870 (namenode-host:9870)
bigdata01:9870
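You can also verify the result from the command line; the path matches the sink configuration above, and the file names are generated by Flume (prefixed with FlumeData by default). If you added the DataStream settings above, the contents will be plain text:

hdfs dfs -ls /flume/11-12
hdfs dfs -cat /flume/11-12/FlumeData.*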
Result of a successful extraction (screenshot):