将数据上传至hdfs的两种方式:java代码上传、将数据放入kafka中,通过flume抽取

 

目录

1、 生成一条,使用 java 代码将数据放入hdfs上传。

 2、 生成一条,编写kafka生产者,将数据放入kafka。kafka source-->flume -->hdfs sink  


场景题:

使用 java 代码随机生成学生信息,学生的学号从 0001 开始,学生姓名可以使用一个集合随机选出学生的姓名,年龄的话随机生成 15~25 之间,每生成一条就休息 200ms, 将该数据存入hdfs平台。

1、 生成一条,使用 java 代码将数据放入hdfs上传。 

package com.bigdata.Day03;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;import java.util.Random;public class randomStudent_02 {public static void main(String[] args) throws Exception {//创建连接Configuration conf=new Configuration();//连接端口conf.set("fs.defaultFS", "hdfs://bigdata01:9820");//获取连接对象FileSystem fs= FileSystem.get(conf);String[] student = {"zhangsan","lisi","wangwu","zhaoliu"};Random random = new Random();//生成随机学生id、姓名、年龄String studentId = String.format("%04d",random.nextInt(10)+1);String studentName = student[random.nextInt(student.length)];int age = random.nextInt(11)+15;String infor = studentId + ',' + studentName + "," + age;//数据将要写入hdfs的地址Path hdfs=new Path("/flume/11-12-02/zuoye.txt");// 如果文件不存在则创建if (!fs.exists(hdfs)) {fs.createNewFile(hdfs);}// 获取输出流FSDataOutputStream outputStream = fs.append(hdfs);// 写入数据outputStream.write(infor.getBytes());// 刷新并确保数据写入磁盘outputStream.hsync();//关闭资源fs.close();}
}

2、 生成一条,编写kafka生产者,将数据放入kafka。kafka source-->flume -->hdfs sink  

第一步:生成随机数据,将数据放入kafka中。 

package com.bigdata.Day03;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
import java.util.Random;public class randomStudent_01 {public static void main(String[] args) {Properties properties = new Properties();// 连接kafkaproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");// 字段反序列化   key 和  valueproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 创建一个kafka生产者的对象KafkaProducer<String,String> kafkaProducer = new KafkaProducer<>(properties);String[] student = {"zhangsan","lisi","wangwu","zhaoliu"};Random random = new Random();//生成随机学生id、姓名、年龄String studentId = String.format("%04d",random.nextInt(20)+1);String studentName = student[random.nextInt(student.length)];int age = random.nextInt(11)+15;String infor = studentId + ',' + studentName + "," + age;ProducerRecord<String,String> record = new ProducerRecord<>("homework1112",infor);//  调用 send 方法,发送消息kafkaProducer.send(record);kafkaProducer.close();}
}

第二步:创建数据存入kafka的topic

 kafka-topics.sh --bootstrap-server bigdata01:9092 --create --topic homework1112

第三步:编写conf文件:

vi test.conf 
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 100
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
a1.sources.r1.kafka.topics = homework1112
a1.sources.r1.kafka.consumer.group.id = zuoye01# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/11-12

编辑完之后记得保存,然后执行以下命令: 

//先进入kafka的bin目录下(以下路径仅供参考)
cd /opt/installs/kafka3/bin
//执行conf文件,向hdfs中抽取数据
flume-ng agent -n a1 -c ../conf -f ./homework1112.conf -Dflume.root.logger=INFO,console

 第四步:如果没有启动hdfs记得启动一下:

//启动命令
start-dfs.sh
//通过端口,进入hdfs界面(ip地址:9870)
bigdata:9870

 抽取成功效果展示:

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/60299.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【vue】echarts地图添加蒙版图片,多图层地图实现天气信息展示

实现原理&#xff1a;多层图层叠加实现复杂的信息展示。 <template><div class"wrapper"><el-drawertitle"天气信息":modal"iszz":visible.sync"weatherinfo":direction"direction"><drawer:labelnam…

100+SCI科研绘图系列教程(R和python)

科研绘图系列&#xff1a;箱线图加百分比点图展示组间差异-CSDN博客科研绘图系列&#xff1a;箱线图加蜜蜂图展示组间数据分布-CSDN博客科研绘图系列&#xff1a;小提琴图和双侧小提琴图展示组间差异-CSDN博客科研绘图系列&#xff1a;组间差异的STAMP图的ggplot2实现-CSDN博客…

QT鼠标事件

QT鼠标事件 1.概述 这篇文章介绍如何使用事件和获取事件的信号 2.创建项目 创建一个widget类型项目&#xff0c;在widget.ui文件中添加一个label控件 然后在项目名称上右键选择Add new... 添加文件&#xff0c;选择 C Class 自定义类名Mylabel&#xff0c;选择基类Base …

“双十一”电商狂欢进行时,在AI的加持下看网易云信IM、RTC如何助力商家!

作为一年一度的消费盛会&#xff0c;2024年“双十一”购物狂欢节早已拉开帷幕。蹲守直播间、在主播热情介绍中点开链接并加购&#xff0c;也已成为大多数人打开“双11”的重要方式。然而&#xff0c;在这火热的购物氛围背后&#xff0c;主播频频“翻车”、优质主播稀缺、客服响…

深入浅出rust内存对齐

在 Rust 中&#xff0c;内存对齐是一个重要的概念&#xff0c;它涉及到数据在内存中的存储方式&#xff0c;以及如何优化内存访问的效率。往往一门语言的内存布局以及对齐方式决定了一门语言的性能&#xff0c;因此学会并深入理解rust中内存布局会让我们写出高性能的rust代码&a…

题目练习之二叉树那些事儿(续集)

♥♥♥~~~~~~欢迎光临知星小度博客空间~~~~~~♥♥♥ ♥♥♥零星地变得优秀~也能拼凑出星河~♥♥♥ ♥♥♥我们一起努力成为更好的自己~♥♥♥ ♥♥♥如果这一篇博客对你有帮助~别忘了点赞分享哦~♥♥♥ ♥♥♥如果有什么问题可以评论区留言或者私信我哦~♥♥♥ ✨✨✨✨✨✨个人…

【STL栈和队列】:高效数据结构的应用秘籍

前言&#xff1a; C 标准模板库&#xff08;STL&#xff09;为我们提供了多种容器&#xff0c;其中 stack&#xff08;栈&#xff09;和 queue&#xff08;队列&#xff09;是非常常用的两种容器。 根据之前C语言实现的栈和队列&#xff0c;&#xff08;如有遗忘&#xff0c;…

Zabbix 7 最新版本安装 Rocky Linux 8

前言 本实验主要在Rocky Linux 中安装Zabbix&#xff0c;其他centos8、Debian、Ubuntu、Alma Linux都可以安装&#xff0c;就是在中间件有点不同。Nginx就要配置一下&#xff0c;官网给的教程也算是很规范的&#xff0c;就是在MySQL上要自己安装&#xff0c;他没有告诉我们&am…

git新手使用教程

git新手使用教程 一、安装和初始化配置2、新建仓库3.工作区域和文件状态4.添加和提交文件5 git reset回退版本6 使用git diff查看差异7 使用git rm删除文件8 .gitignore忽略文件9 注册GitHub账号10 SSH配置和克隆仓库11 关联本地仓库和远程仓库12 Gitee的使用 由B站视频教程整理…

【GPTs】Email Responder Pro:高效生成专业回复邮件

博客主页&#xff1a; [小ᶻZ࿆] 本文专栏: AIGC | GPTs应用实例 文章目录 &#x1f4af;GPTs指令&#x1f4af;前言&#x1f4af;Email Responder Pro主要功能适用场景优点缺点 &#x1f4af;小结 &#x1f4af;GPTs指令 中文翻译&#xff1a; Email Craft 是一款专门用于…

2024下半年软考系统架构师案例分析题试题与答案--ROS机器人操作系统

一、知识点回顾 ROS(Robot Operating System)是一个用于编写机器人软件的框架。它提供了一系列的工具和库,帮助开发者创建复杂的、可以在多种操作系统上运行的机器人应用程序。 ROS的主要特点包括: 分布式计算能力:ROS提供了一种方式让多个计算机或设备协同工作,通过…

探索Copier:Python项目模板的革命者

文章目录 **探索Copier&#xff1a;Python项目模板的革命者**1. 背景介绍&#xff1a;为何Copier成为新宠&#xff1f;2. Copier是什么&#xff1f;3. 如何安装Copier&#xff1f;4. 简单库函数使用方法4.1 创建模板4.2 从Git URL创建项目4.3 使用快捷方式4.4 动态替换文本4.5 …

密码学知识点整理二:常见的加密算法

常用的加密算法包括对称加密算法、非对称加密算法和散列算法。 对称加密算法 AES&#xff1a;高级加密标准&#xff0c;是目前使用最广泛的对称加密算法之一&#xff0c;支持多种密钥长度&#xff08;128位、192位、256位&#xff09;&#xff0c;安全性高&#xff0c;加密效率…

大模型就业收入高吗?大模型入门到精通,收藏这篇就够了

目前&#xff0c;已经可以说人工智能&#xff08;AI&#xff09;是推动社会进步和产业升级的重要力量。 其中&#xff0c;AI大模型作为人工智能领域的核心技术之一&#xff0c;正引领着新一轮的技术革命。 2024年&#xff0c;AI大模型开发工程师无疑成为了IT行业中最炙手可热…

Kubebot:一款Google云平台下的Slackbot安全测试工具

Kubebot 今天给大家介绍的是一款名叫Kubebot的安全测试Slackbot&#xff0c;该工具基于Google 云平台搭建&#xff0c;并且提供了Kubernetes后端。 项目架构 数据流 1.API请求由Slackbot发起&#xff0c;发送至API服务器&#xff0c;API服务器以Kubernetes(K8s)集群中的Docke…

openai Realtime API (实时语音)

https://openai.com/index/introducing-the-realtime-api/ 官方demo https://github.com/openai/openai-realtime-console 官方demo使用到的插件 https://github.com/openai/openai-realtime-api-beta?tabreadme-ov-file 装包配置 修改yarn.lock 这个包是从github下载的 &q…

【IC】DTCO

DTCO本质上是DSE。。。 文章A Novel Framework for DTCO: Fast and Automatic Routability Assessment with Machine Learning for Sub-3nm Technology Options中提到&#xff1a; std cell尺寸缩小不一定会在block模块级获得面积收益。。。得综合考虑&#xff0c;综合了设计…

SpringBoot配置Rabbit中的MessageConverter对象

SpringAMQP默认使用SimpleMessageConverter组件对消息内容进行转换 SimpleMessageConverter&#xff1a; only supports String, byte[] and Serializable payloads仅仅支持String、Byte[]和Serializable对象Jackson2JsonMessageConverter&#xff1a;was expecting (JSON Str…

Python毕业设计选题:基于django+vue的医院挂号系统设计与实现

开发语言&#xff1a;Python框架&#xff1a;djangoPython版本&#xff1a;python3.7.7数据库&#xff1a;mysql 5.7数据库工具&#xff1a;Navicat11开发软件&#xff1a;PyCharm 系统展示 病人管理 科室类型管理 医生管理 公告咨询管理 挂号预约管理 科室信息管理 摘要 医…

arm 汇编技巧

汇编标号&#xff1a;f表示forward&#xff0c; b表示backward&#xff1a; Here is an example: 1: branch 1f 2: branch 1b 1: branch 2f 2: branch 1b Which is the equivalent of: label_1: branch label_3 label_2: branch label_1 label_3: branch label_4 label_4: bra…