Flume 与 Kafka 整合实战

目录

一、Kafka 作为 Source【数据进入到kafka中,抽取出来】

(一)环境准备与配置文件创建

(二)创建主题

(三)测试步骤

二、Kafka 作为 Sink数据从别的地方抽取到kafka里面】

(一)编写配置脚本

(二)创建 topic

(三)测试过程

三、应用场景示例 

四、总结


        在大数据处理的生态系统中,Flume 和 Kafka 都是非常重要的组件。Flume 擅长收集、聚合和传输大量的日志数据等,而 Kafka 则是一个高性能的分布式消息队列,能够处理海量的实时数据。将 Flume 和 Kafka 进行整合,可以构建强大的数据处理管道,实现数据的高效采集、传输和处理。本文将详细介绍 Flume 和 Kafka 整合的两种常见方式:Kafka 作为 Source 和 Kafka 作为 Sink。

一、Kafka 作为 Source【数据进入到kafka中,抽取出来】

 

(一)环境准备与配置文件创建

        在 Flume 的 conf 文件夹下,创建一个名为 kafka - memory - logger.conf 的脚本文件。这里需要注意,在实际操作中可能会遇到错误,例如 kafka 的每一批次的读取数量大于了 channel 的容量。这种情况下的解决方案是要么降低 kafka 的每一批次读取的容量,要么提高 channel 的容量。

https://flume.liyifeng.org/#kafka-source

kafka-memory-logger.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 100
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
a1.sources.r1.kafka.topics = five
a1.sources.r1.kafka.consumer.group.id = qiaodaohu# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.sinks.k1.type = logger
a1.sinks.k1.maxBytesToLog = 128

 

(二)创建主题

        接着创建一个 topic,名字可以叫做 kafka - flume,当然也可以直接使用以前创建好的主题。

kafka-topics.sh --create --topic kafka-flume --bootstrap-server bigdata01:9092 --partitions 3 --replication-factor 1

 

(三)测试步骤

首先启动一个消息生产者,向 topic 中发送消息。

kafka-console-producer.sh --topic kafka-flume --bootstrap-server bigdata01:9092

然后启动 Flume,接收消息并查看 log 日志,这样就可以验证数据是否能够从 Kafka 成功抽取到 Flume 中并进行后续处理。

在flume的flumeconf 文件夹下

flume-ng agent -n a1 -c ../conf -f ./kafka-memory-logger.conf -Dflume.root.logger=INFO,console

二、Kafka 作为 Sink数据从别的地方抽取到kafka里面】

 

 

(一)编写配置脚本

编写一个名为 flume - kafka - sink.conf 的脚本,内容如下:

##a1就是flume agent的名称
## source r1
## channel c1
## sink k1
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = bigdata01
a1.sources.r1.port = 44444# 修改sink为kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = bigdata01:9092
a1.sinks.k1.kafka.topic = five
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

这里的流程是 netcat(模拟数据源)→ memory(内存通道)→ kafka。

 

(二)创建 topic

使用以下命令创建 topic(flume - kafka):

kafka-topics.sh --create --topic flume-kafka --bootstrap-server bigdata01:9092 --partitions 3 --replication-factor 1

 

(三)测试过程

启动 Flume:

flume-ng agent -n a1 -c conf -f $FLUME_HOME/job/flume-kafka-sink.conf -Dflume.root.logger=INFO,console

使用 telnet 命令,向端口发送消息:

yum -y install telnettelnet bigdata01 44444

在窗口不断地发送文本数据,数据就会被抽取到 Kafka 中。


使用消费者获取 Kafka 数据:

kafka-console-consumer.sh --topic flume-kafka --bootstrap-server bigdata01:9092 --from-beginning

 

三、应用场景示例 

        假定有这样一个场景:Flume 可以抽取不断产生的日志,抽取到的日志数据,发送给 Kafka,Kafka 经过处理,可以展示在页面上,或者进行汇总统计。这样就实现了一定的实时效果,在实际的大数据处理流程中,这种整合方式能够有效地处理海量的实时数据,提高数据处理的效率和可靠性。

四、总结

        通过 Flume 和 Kafka 的整合,我们能够构建更加灵活、高效的数据处理架构,满足不同场景下的大数据处理需求,为后续的数据挖掘、分析等提供坚实的数据基础。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/62664.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SRS搭建直播推流服务

学习链接 5分钟教你搭建SRS流媒体服务器 - B站视频 SRS Stack 入门B站合集视频 - SRS官方教程 SRS官网 SRS官网文档 ossrs/srs github SRS for window - 可以安装windows版本的srs,SRS 5.0.89正式支持Windows,每个5.0的版本都会提供安装包 文章目录…

css—轮播图实现

一、背景 最近和朋友在一起讨论的时候,我们提出了这样的一个提问,难道轮播图的效果只能通过js来实现吗?经过我们的一系列的争论,发现了这是可以通过纯css来实现这一效果的,CSS轮播图也是一种常见的网页展示方式&#x…

nacos安装部署

nacos安装部署 1.安装nacos 1.安装nacos nacos的安装很简单下载后解压启动即可,但是在启动前请确保jdk环境正常; 1.首先我们要下载nacos安装包:可以到官网下载,注意我这里使用的是2.1.0版本; 2.下载完成后&#xff0…

tomcat 8.5.35安装及配置

安装包地址: 1.Index of /dist/tomcat/tomcat-8/v8.5.35/binhttps://archive.apache.org/dist/tomcat/tomcat-8/v8.5.35/bin/ 2.通过网盘分享的文件:tomcat 链接: https://pan.baidu.com/s/1z9bD4rIuIRvzQ4okm3iRzw?pwdp24p 提取码: p24p 3.通过官网…

YOLO系列论文综述(从YOLOv1到YOLOv11)【第12篇:YOLOv9——可编程梯度信息(PGI)+广义高效层聚合网络(GELAN)】

YOLOv9 1 摘要2 改进点3 网络架构 YOLO系列博文: 【第1篇:概述物体检测算法发展史、YOLO应用领域、评价指标和NMS】【第2篇:YOLO系列论文、代码和主要优缺点汇总】【第3篇:YOLOv1——YOLO的开山之作】【第4篇:YOLOv2—…

机器学习提高电子病历主要诊断编码正确率的路径分析

摘要 本研究探讨机器学习在强化病历书写质量和提高主要诊断编码正确率方面的应用。介绍了基于机器学习的病历质量分析方法、AI病历质控应用、智能预问诊系统和诊室听译机器人等在病历书写质量提升中的作用,以及基于机器学习的ICD智能诊断编码方法和重症病人ICD自动…

鸿蒙征文|鸿蒙技术分享:使用到的开发框架和技术概览

目录 每日一句正能量前言正文1. 开发环境搭建关键技术:2. 用户界面开发关键技术:3. 应用逻辑开发关键技术:4. 应用测试关键技术:5. 应用签名和打包关键技术:6. 上架流程关键技术:7. 后续维护和更新关键技术…

C++类中多线程的编码方式

问题 在C++代码中,一般的代码是需要封装在类里面,比如对象,方法等。否则就不能很好的利用C++面向对象的能力了。 但是这个方式在处理线程时会碰到一个问题。 考虑下面一个简单的场景: class demoC { public:std::thread t;int x;void threadFunc(){std::cout<<x&…

Android开发仿qq详情下拉头像变大

Android开发仿qq详情下拉头像变大 个人详情界面&#xff0c;很多都有下拉头像变大的效果&#xff0c;其实我觉得这效果还不如点击头像看大图呢 一、思路&#xff1a; 自定义ScrollView 二、效果图&#xff1a; 看视频更直观点&#xff1a; Android开发教程案例分享-仿qq详情…

深入解析下oracle date底层存储方式

之前我们介绍了varchar2和char的数据库底层存储格式&#xff0c;今天我们介绍下date类型的数据存储格式&#xff0c;并通过测试程序快速获取一个日期。 一、环境搭建 1.1&#xff0c;创建表 我们还是创建一个测试表t_code&#xff0c;并插入数据&#xff1a; 1.2&#xff0c;…

golang版本管理工具:scoop使用

安装 Scoophttps://scoop.sh/根据官方文档安装。 第一步&#xff1a;打开PowerShell。(注意不要使用管理员方式打开&#xff0c;否则在执行安装Scoop的过程中&#xff0c;会报错。) 第二步&#xff1a;切到C盘根目录下。 第三步&#xff1a; Set-ExecutionPolicy -Executi…

时装购物系统

私信我获取源码和万字论文&#xff0c;制作不易&#xff0c;感谢点赞支持。 摘 要 随着科学技术的飞速发展&#xff0c;社会的方方面面、各行各业都在努力与现代的先进技术接轨&#xff0c;通过科技手段来提高自身的优势&#xff0c;时装购物系统当然也不能排除在外。时装购物…

架构04-透明多级分流系统

零、文章目录 架构04-透明多级分流系统 1、透明多级分流系统 &#xff08;1&#xff09;概述 **定义&#xff1a;**透明多级分流系统是指在用户请求从客户端发出到最终查询或修改数据库信息的过程中&#xff0c;通过多个技术部件对流量进行合理分配&#xff0c;以提高系统的…

【AI】数据,算力,算法和应用(3)

三、算法 算法这个词&#xff0c;我们都不陌生。 从接触计算机&#xff0c;就知道有“算法”这样一个神秘的名词存在。象征着专业、权威、神秘、高难等等。 算法是一组有序的解决问题的规则和指令&#xff0c;用于解决特定问题的一系列步骤。算法可以被看作是解决问题的方法…

YOLO系列论文综述(从YOLOv1到YOLOv11)【第14篇:YOLOv11——在速度和准确性方面具有无与伦比的性能】

YOLOv11 1 摘要2 改进点3 模型性能4 模型架构 YOLO系列博文&#xff1a; 【第1篇&#xff1a;概述物体检测算法发展史、YOLO应用领域、评价指标和NMS】【第2篇&#xff1a;YOLO系列论文、代码和主要优缺点汇总】【第3篇&#xff1a;YOLOv1——YOLO的开山之作】【第4篇&#xff…

最短路径(Floyd-Warshall、Dijkstra、Bellman-Ford)

图的遍历&#xff0c;通过算法优雅实现。 上次使用遍历的方法求得最短路径&#xff08;图的遍历。-CSDN博客&#xff09;&#xff0c;这样虽然可以解决问题&#xff0c;但还是不够优雅&#xff0c;有一些弊端&#xff0c;时间复杂度和空间复杂度都比较高。本博客主要描述三种求…

一些优秀的布隆过滤器介绍

&#x1f680; 博主介绍&#xff1a;大家好&#xff0c;我是无休居士&#xff01;一枚任职于一线Top3互联网大厂的Java开发工程师&#xff01; &#x1f680; &#x1f31f; 在这里&#xff0c;你将找到通往Java技术大门的钥匙。作为一个爱敲代码技术人&#xff0c;我不仅热衷…

YOLOv8模型pytorch格式转为onnx格式

一、YOLOv8的Pytorch网络结构 model DetectionModel((model): Sequential((0): Conv((conv): Conv2d(3, 64, kernel_size(3, 3), stride(2, 2), padding(1, 1))(act): SiLU(inplaceTrue))(1): Conv((conv): Conv2d(64, 128, kernel_size(3, 3), stride(2, 2), padding(1, 1))(a…

论文解读:Reward criteria impact on the performance ofreinforcement learning...

Reward criteria impact on the performance ofreinforcement learning agent for autonomous navigation 译文&#xff1a; 奖励准则对自主导航强化学习agent性能的影响 摘要&#xff1a; 在强化学习中&#xff0c;主体在环境中的每个时间步采取行动&#xff08;遵循策略&…

glog在vs2022 hello world中使用

准备工作 设置dns为阿里云dns 223.5.5.5&#xff0c;下载cmake&#xff0c;vs2022&#xff0c;git git clone https://github.com/google/glog.git cd glog mkdir build cd build cmake .. 拷贝文件 新建hello world并设置 设置预处理器增加GLOG_USE_GLOG_EXPORT;GLOG_NO_AB…