Flume集成Kafka

之前提到Flume可以直接采集数据存储到HDFS中,那为什么还要引入Kafka这个中间件呢,这个是因为在实际应用场景中,我们既需要实时计算也需要离线计算。

image-20240313120335406

Kfka to HDFS配置

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.channels = channel1
a1.sources.r1.batchSize = 10
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092
a1.sources.r1.kafka.topics = test_r2p5
a1.sources.r1.kafka.consumer.group.id = flume-group1# Bind the source and sink to the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://192.168.52.100:9000/kafkaout/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = access
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

File to Kafka配置

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/log/test.log# Bind the source and sink to the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = test_r2p5
a1.sinks.k1.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092
a1.sinks.k1.kafka.flumeBatchSize = 10
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

创建Topic

[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --partitions 5 --replication-factor 2 --topic test_r2p5

启动flume

[root@hadoop04 conf-kafka-hdfs]# bin/flume-ng agent --name a1 --conf conf-kafka-hdfs --conf-file conf-kafka-hdfs/kafka-to-hdfs.conf -Dflume.root.logger=INFO,console
[root@hadoop04 apache-flume-1.11.0-bin]# bin/flume-ng agent --name a1 --conf conf-file-kafka --conf-file conf-file-kafka/file-to-kafka.conf -Dflume.root.logger=INFO,console

创建test.log文件

[root@hadoop04 log]# echo hello world >> /home/log/test.log

验证

[root@hadoop01 kafka_2.12-2.4.0]# hdfs dfs -cat /kafkaout/2024-03-13/access.1710307375351.tmp
hello world

p01 kafka_2.12-2.4.0]# hdfs dfs -cat /kafkaout/2024-03-13/access.1710307375351.tmp
hello world

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/741515.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

动态规划7,等差数列划分,湍流子数组,唯一的子字符串,最长递增子序列

等差数列划分 思路: 经验题目要求 dp[i]表示:以 i 位置为结尾的所有子数组中有多少个等差数列 状态转移方程 对 dp[i] 位置,数列至少有三个元素,如果相邻三个为等差数列,dp[i] dp[i-1] 1; 如果相邻三个不为等差数…

windows批处理脚本(cmd指令)

一、简介 最早期的电脑系统是DOS系统,DOS系统只有一个黑漆漆的窗口,需要自己输入命令,所以学习命令是很有必要的,那么CMD命令大全是什么?直到今天的Windows系统,还是离不开DOS命令的操作。如今懂得使用windows批处理脚…

【AI绘画教程】AI绘画图生图怎么用?

AI绘画技术已经越来越成熟,越来越多的人开始尝试利用AI进行创作。而AI绘画图生图作为一款优秀的AI绘画工具,正是帮助许多人创作的好帮手。 AI绘画图生图功能可以通过多种软件实现,具体的操作步骤可能因软件而异,但大体流程相似。以…

基于springboot+vue的会议室预约系统(源码+论文)

目录 前言 一、功能设计 二、功能实现 三、库表设计 四、论文 前言 随着互联网技术的发展,各行各业乃至人们的衣食住行都离不开网络。就拿最普普通通的衣食住行来说吧,穿衣服、买衣服我们现在基本都是在网络上进行购买,线下商场基本不去。…

1688商品详情数据采集(商品属性,规格,价格,详情图等)

京东商品详情数据采集是一个复杂但重要的过程,它涉及获取商品的详细信息,包括商品属性、规格、价格以及详情图等。以下是关于如何进行京东商品详情数据采集的基本步骤: 确定采集目标:首先,你需要明确需要采集的商品信…

ARM 汇编指令:(五)CMP指令

目录 1.CMP比较指令 2.指令条件码 cond 1.CMP比较指令 CMP指令是计算机指令集中的一种比较指令,用于比较两个操作数的大小关系或相等性,并根据比较结果设置或更新条件码寄存器(或程序状态字)的标志位。 指令格式:C…

VUE内盘期货配资软件源码国际外盘二合一

开发一个Vue内盘期货配资软件源码,同时兼容国际外盘二合一的功能,是一个复杂且专业的任务,涉及前端Vue.js框架的使用、后端服务器处理、数据库管理、实时交易接口对接等多个方面。下面是一些关于开发此类软件的基本指导和考虑因素&#xff1a…

什么是同城上门预约按摩系统,上门预约平台有哪些功能?

随着互联网技术的发展,人们的生活方式发生了很大的变化。在日常生活中,大家都习惯使用手机来订餐、购物、家政服务等,这也为我们的生活带来了很大的便利。而同城按摩小程序作为一种新兴的按摩预约方式,受到了越来越多人的欢迎。下…

机器视觉检测设备的组成要素

机器视觉检测设备是一种先进的自动化检测技术工具,它利用光学、图像处理和计算机硬件及软件技术模拟并扩展人类的视觉功能,以实现对产品或目标物体进行自动化的尺寸测量、缺陷检测、表面质量评估、颜色识别、形状匹配以及位置判断等功能。这种设备通常包…

c/c++| 常规 |sizeof 、strlen

总结来说 ,sizeof 查看内存给对象分配的空间大小,不仅仅是普通的内置变量,还包括用户自定义变量、结构体、类对象 然后strlen 是查看字符串的实际长度大小,注意它不会计算那个结束符’\0’

重生奇迹MU攻击防御技能石哪里掉

在《重生奇迹MU》中,攻击和防御技能石可以从以下途径获得: 1.怪物掉落:你可以通过击败怪物获得攻击和防御技能石,不同的怪物掉落不同的石头。你可以在各个地图的怪物掉落表中查看特定怪物掉落的技能石。 2.商店购买:…

【教程】APP加固的那些小事

摘要 APP加固是保护APP代码逻辑的重要手段,通过隐藏、混淆、加密等操作提高软件的逆向成本,降低被破解的几率,保障开发者和用户利益。本文将介绍APP加固常见失败原因及解决方法,以及处理安装出现问题的情况和资源文件加固策略选择…

怎么查看电脑是不是固态硬盘?简单几个步骤判断

随着科技的发展,固态硬盘(Solid State Drive,简称SSD)已成为现代电脑的标配。相较于传统的机械硬盘,固态硬盘在读写速度、稳定性和耐用性等方面都有显著优势。但是,对于不熟悉电脑硬件的用户来说&#xff0…

3D地图在BI大屏中的应用实践

前言 随着商业智能的不断发展,数据可视化已成为一项重要工具,有助于用户更好地理解数据和分析结果。其中,3D地图作为一种可视化工具,已经在BI大屏中得到了广泛地应用。 3D地图通过将地理信息与数据相结合,以更加直观…

【Linux】Shell编程【一】

shell是一个用 C 语言编写的程序,它是用户使用 Linux 的桥梁。Shell 既是一种命令语言,又是一种程序设计语言。 Shell 是指一种应用程序,这个应用程序提供了一个界面,用户通过这个界面访问操作系统内核的服务。 Shell属于内置的…

【C++ 学习】程序内存分布

文章目录 1. C 内存分布的引入 1. C 内存分布的引入 ① 栈又叫堆栈:非静态局部变量/函数参数/返回值等等,栈是向下增长的。 ② 内存映射段:是高效的I/O映射方式,用于装载一个共享的动态内存库。用户可使用系统接口创建共享共享内存…

【知识库系统】使用SpringSecurity进行身份认证

一、理论知识部分 SpringSecurity 的官网文档地址:SpringSecurity 这里以24年3月份的 6.2.2 版本为例,记录一下学习过程。 1. SpringSecurity 是基于 Servlet Filters 的,而 Servlet Filters 中的流程如下:首先由客户端 Client…

时间复杂度中的log(n)底数是多少?

问题: 最近有好几学生问我,无论是计算机算法概论、还是数据结构书中, 关于算法的时间复杂度很多都用包含O(logN)这样的描述,但是却没有明确说logN的底数究竟是多少。 解答: 算法中log级别的时间复杂度都是由于使用了分…

关于stm32(CubeMX+HAL库)的掉电检测以及flash读写

1.掉电检测 CubeMX配置 只需使能PVD中断即可 但是使能了PVD中断后还需要自行配置一些PWR寄存器中的参数,我也通过HAL库进行编写 void PVD_config(void) {//配置PWRPWR_PVDTypeDef sConfigPVD; sConfigPVD.PVDLevel PWR_PVDLEVEL_7; …

Python学习笔记1:Pycharm首次安装环境搭建汉化

Pycharm首次安装环境搭建汉化笔记 1.下载网址 首先下载专业版的pycharm,这里建议下载专业版是因为功能更全面,社区版的往往没有远程调控等实践功能。 网址贴在下方: https://www.jetbrains.com/pycharm/download/?sectionwindows •Profe…