二百三十三、Flume——Flume采集JSON文件到Kafka,再用Flume采集Kafka数据到HDFS中

一、目的

由于使用了新的Kafka协议,因为根据新的协议推送模拟数据到Kafka中,再Flume采集Kafka数据到HDFS中

二、技术选型

(一)Kettle工具

准备使用Kettle的JSON input控件和Kafka producer控件,但是搞了1天没搞定,博客上也找不到相关的资料,后面再研究研究

之前用kettle采集Kafka数据写入HDFS中(不建议使用这种方式采集Kafka数据到HDFS中

(二)Flume工具

三、实施步骤

(一)准备json文件

注意:Json数据用JSOB工具压缩一下,1个完整的JSON为1行

(二)创建Kafka主题

[root@hurys23 bin]# ./kafka-topics.sh  --create --bootstrap-server  192.168.0.23:9092 --topic topic_internal_data_queue  --partitions 1 --replication-factor 1

(三)创建文件目录

[root@gree128 userfriends]# cd  /opt/kb15tmp/

[root@gree128 kb15tmp]# mkdir -p  /opt/kb15tmp/checkpoint/queue

[root@gree128 kb15tmp]# mkdir -p  /opt/kb15tmp/checkpoint/data/queue

(四)配置Flume采集json任务文件

queue.sources=queueSource
queue.channels=queueChannel
queue.sinks=queueSink

queue.sources.queueSource.type=spooldir
queue.sources.queueSource.spoolDir=/opt/kb15tmp/flumelogfile/queue
queue.sources.queueSource.deserializer=LINE
queue.sources.queueSource.deserializer.maxLineLength=320000
queue.sources.queueSource.includePattern=queue_[0-9]{4}-[0-9]{2}-[0-9]{2}.json

queue.channels.queueChannel.type=file
queue.channels.queueChannel.checkpointDir=/opt/kb15tmp/checkpoint/queue
queue.channels.queueChannel.dataDirs=/opt/kb15tmp/checkpoint/data/queue

queue.sinks.queueSink.type=org.apache.flume.sink.kafka.KafkaSink
queue.sinks.queueSink.batchSize=640
queue.sinks.queueSink.brokerList=192.168.0.23:9092
queue.sinks.queueSink.topic=topic_internal_data_queue

queue.sources.queueSource.channels=queueChannel
queue.sinks.queueSink.channel=queueChannel

(五)修改Flume采集Kafka任务文件

## agent a1
a1.sources = s1
a1.channels = c1
a1.sinks = k1

## configure source s1
a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.s1.kafka.bootstrap.servers = 192.168.0.23:9092
a1.sources.s1.kafka.topics = topic_internal_data_queue
a1.sources.s1.kafka.consumer.group.id = queue_group
a1.sources.s1.kafka.consumer.auto.offset.reset = latest
a1.sources.s1.batchSize = 1000

## configure channel c1
## a1.channels.c1.type = memory
## a1.channels.c1.capacity = 10000
## a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/data/flumeData/checkpoint/queue
a1.channels.c1.dataDirs = /home/data/flumeData/flumedata/queue

## configure sink k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hurys23:8020/user/hive/warehouse/hurys_dc_ods.db/ods_queue/day=%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = queue
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second
a1.sinks.k1.hdfs.rollSize = 1200000000
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 60
a1.sinks.k1.hdfs.minBlockReplicas = 1

a1.sinks.k1.hdfs.fileType = SequenceFile
a1.sinks.k1.hdfs.codeC = gzip

## Bind the source and sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
​​​​

(六)打开监控,拷贝文件

[root@hurys23 kb15tmp]# cp queue.json /opt/kb15tmp/flumelogfile/queue/queue_2024-04-19.json

(七)执行Kafka消费窗口

[root@hurys23 bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.0.23:9092  --topic topic_internal_data_queue  --from-beginning

(八)​​​​​​​运行Flume采集Kafka到HDFS的任务​​​​​​​

[root@hurys23 flume190]# bin/flume-ng agent -n a1  -f /usr/local/hurys/dc_env/flume/flume190/conf/queue.properties

(九)​​​​​​​运行Flume采集json文件到Kafka的任务

[root@hurys23 flume190]# bin/flume-ng agent --name queue --conf ./conf/ --conf-file ./conf/KB15conf/queue.conf   -Dflume.root.logger=INFO,console

(十)查看HDFS文件

(十一)ODS层验证JSON格式是否正确,是否可以解析

--刷新表分区
msck repair table ods_queue;

里面的JSON字段都可以解析出来,搞定!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/1679.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

OSPF的LSA详解

一、什么是LSA?LSA作用? 在OSPF协议中,LSA全称链路状态通告,主要由LSA头部信息(LSA摘要)和链路状态组成。部分LSA只有LSA头部信息,无链路状态信息。使用LSA来传递路由信息和拓扑信息&#xff0c…

【STM32F4】STM32CUMX相关环境配置

一、环境配置 我们需要以下两个软件 (一)keil5 最正统,最经典的嵌入式MCU开发环境。 该环境的配置可以看看之前的文章 所需文件如下: 当时配置的是STC8H的环境,现在基于此,重新给STM32配置环境。能让STC…

运营商三要素核验接口-手机实名验证API

运营商三要素核验接口是一种API(Application Programming Interface,应用程序编程接口),主要用于通过互联网技术对接通信运营商的实名制数据库,以验证用户提供的手机号码、身份证号码、姓名这三项关键信息(…

Python | Leetcode Python题解之第37题解数独

题目: 题解: class Solution:def solveSudoku(self, board: List[List[str]]) -> None:def dfs(pos: int):nonlocal validif pos len(spaces):valid Truereturni, j spaces[pos]for digit in range(9):if line[i][digit] column[j][digit] bloc…

jmeter 指定QPS压测接口

文章目录 jmeter 指定QPS压测接口更换语言为中文创建测试任务新建线程组右键线程组,新建http request,填写要你要压测的接口地址、参数如果需要自定义请求头,添加一个Http头信息管理器要查看结果和QPS统计数据,给上门的http请求添…

算法库应用-有序单链表插入节点

学习源头: 模仿贺利坚老师单链表排序文章浏览阅读5.9k次。  本文针对数据结构基础系列网络课程(2):线性表中第11课时单链表应用举例。例:拆分单链表 (linklist.h是单链表“算法库”中的头文件,详情单击链接…)//本程…

VForm3的文件上传方式

更多ruoyi-nbcio功能请看演示系统 gitee源代码地址 前后端代码: https://gitee.com/nbacheng/ruoyi-nbcio 演示地址:RuoYi-Nbcio后台管理系统 http://122.227.135.243:9666/ 更多nbcio-boot功能请看演示系统 gitee源代码地址 后端代码&#xff1a…

第三届 SWCTF-Web 部分 WP

写在前面 题目主要涉及的是前端 php 内容知识,仅以本篇博客记录自己 Web 出题的奇思妙想。 Copyright © [2024] [Myon⁶]. All rights reserved. 目录 1、HTTP 2、再见了晚星 3、myon123_easy_php 4、baby_P0P 5、LOGIN!!! 1、HTTP 首页文件默认就是 ind…

《大话西游2》本人收集的十二个单机版游戏,有详细的视频架设教程,云盘下载

《大话西游2》是一款经典的大型多人在线角色扮演游戏,也是一款国风经典的游戏。 有能力的可以架设个外网,让大家一起玩。 《大话西游2》本人收集的十二个单机版游戏,有详细的视频架设教程,值得收藏 下载地址: 链接&…

半导体制造工艺之分类浅述

半导体制造工艺分为逻辑制程(也叫逻辑工艺)和特殊制程(也叫特色工艺)。 1、逻辑工艺概述 随着集成电路行业沿着摩尔定律不断发展,晶体管数量增加的同时,工艺节点不断缩小。先进逻辑工艺是相对的概念,2005年全球先进逻辑工艺的工艺节点在65/55纳米,现在则变为3纳米。中…

人人可拥有刘强东同款数字人分身!

每个人都可以拥有东哥同款数字人分身直播间进行直播带货,怎样克隆自己的数字人形象? 青否数字人克隆源码的克隆效果媲美真人: 仅需将真人录制的2-6分钟视频上传至克隆端后台,系统便会自动启动自动克隆。3-5小时后,即可…

学习微服务nacos遇到的问题

在学习微服务注册到nacos的时候&#xff0c;所有过程都正确了&#xff0c;注册也成功了&#xff0c;但是访问不了调用的地址报错出现问题。 一、引入依赖 在cloud-demo父工程的pom文件中的<dependencyManagement>中引入SpringCloudAlibaba的依赖 1、springboot <pa…

森林消防装备:高压消防接力水泵/恒峰智慧科技

在广袤无垠的森林中&#xff0c;每一份绿色都是大自然赋予我们的宝贵财富。然而&#xff0c;这些美丽的绿色也可能因为一场突如其来的火灾而被瞬间吞噬。为了保护这片生命的绿洲&#xff0c;我们需要一种高效、可靠的消防装备——高压消防接力水泵。 这款森林消防装备采用本田汽…

常规文件怎么做成二维码?扫描二维码就能在线看文件

现在可以将文件做成活码二维码之后&#xff0c;通过扫描二维码的方式来查看文件内容&#xff0c;部分二维码也可以扫码下载文件&#xff0c;从而实现文件在其他人之间的快速传播。 文件二维码的制作原理是将文件上传到云端之后&#xff0c;生成单独的链接转换成二维码&#xf…

皮带跑偏AI巡检系统 砂石、煤矿、物流场景下的皮带跑偏自动检测

在工业生产中&#xff0c;皮带传动系统被广泛应用于输送、运输和生产线等领域。然而&#xff0c;皮带跑偏是一个普遍存在且隐患严重的问题。一旦皮带跑偏&#xff0c;不仅会造成设备损坏、生产中断&#xff0c;还可能引发严重的安全事故&#xff0c;甚至导致人员伤亡。目前常见…

vue3项目使用<img :src=““ />动态加载图片

分享一下使用<img :src"" />动态加载图片时遇到的问题以及解决方法。 下面是部分页面代码&#xff0c;这里我使用了<img :src"itemc.headUrl" />来动态加载图片 这时遇到了问题&#xff0c;因为这里的itemc.headUrl是图片的相对路径&#xff…

P450焕新而来,室内外两用+路径规划+YOLO点击跟踪,算力高达100TOPS

Prometheus 450&#xff08;简称P450&#xff09;是一款室内室外两用的中型轴距&#xff08;410mm&#xff09;无人机&#xff0c;基于F450基础飞行平台&#xff0c;搭载感知传感器二维平面激光雷达&#xff0c;双目深度相机等&#xff0c;配合软件Prometheus自主无人机系统和S…

Java学习笔记29(泛型)

1.泛型 ArrayList<Dog> arrayList new ArrayList<Dog>(); //1.当我们ArrayList<Dog>表示存放到ArrayList集合中的元素是Dog类 //2.如果编译器发现添加的类型&#xff0c;不满足要求&#xff0c;就会报错 //3.在便利的时候&#xff0c;可以直接取出Dog类型而…

论文笔记:Does Writing with Language Models Reduce Content Diversity?

iclr 2024 reviewer评分 566 1 intro 大模型正在迅速改变人们创造内容的方式 虽然基于LLM的写作助手有可能提高写作质量并增加作者的生产力&#xff0c;但它们也引入了算法单一文化——>论文旨在评估与LLM一起写作是否无意中降低了内容的多样性论文设计了一个控制实验&…

空间数据索引的利器:R-Tree原理与实现深度解析

空间数据索引的利器&#xff1a;R-Tree原理与实现深度解析 R-Tree的原理插入操作分裂操作查询操作 R-Tree的伪代码R-Tree的C语言实现讨论结论 R-Tree是一种平衡树&#xff0c;用于空间数据索引&#xff0c;特别是在二维或更高维度的几何对象存储和检索中。它由Antony Guttman和…