大数据基础设施搭建 - Flume

文章目录

  • 一、上传压缩包
  • 二、解压压缩包
  • 三、监控本地文件(file to kafka)
    • 3.1 编写配置文件
    • 3.2 自定义拦截器
      • 3.2.1 开发拦截器jar包
        • (1)创建maven项目
        • (2)开发拦截器类
        • (3)开发pom文件
        • (4)打成jar包上传到Flume
      • 3.2.3 修改配置文件
    • 3.3 创建Kafka Topic
    • 3.4 启动Flume
    • 3.5 停止Flume
  • 四、监控Kafka(kafka to hdfs)
    • 3.0 将lib文件夹下的guava-11.0.2.jar删除以兼容Hadoop 3.1.3
    • 3.1 自定义拦截器
    • 3.2 编写配置文件
    • 3.3 启动Flume
    • 3.4 停止Flume
  • 五、监控 ip+port(TODO)

一、上传压缩包

官网:https://flume.apache.org/

二、解压压缩包

[mall@mall software]$ tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/

三、监控本地文件(file to kafka)

Flume是用java写的,所以需要确保JDK环境可用
需求描述:监控目录下多个文件写入Kafka
TAILDIR SOURCE:本质是tail -F [file]命令,只能监控文件的新增和修改,不能处理历史文件。

3.1 编写配置文件

[mall@mall ~]$ cd /opt/module/apache-flume-1.9.0-bin/
[mall@mall apache-flume-1.9.0-bin]$ mkdir job
[mall@mall apache-flume-1.9.0-bin]$ cd job/
[mall@mall job]$ vim file_to_kafka.conf

内容:

# 0、配置agent:给source channel sink组件命名
a1.sources = r1
a1.channels = c1# 1、配置source组件
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/app.*
# 断点续传标记信息存储位置
a1.sources.r1.positionFile = /opt/module/apache-flume-1.9.0-bin/taildir_position.json# 2、配置channel组件:event临时缓冲区
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c1.kafka.topic = topic_mall_applog# 按照字符串类型传到kafka去
a1.channels.c1.parseAsFlumeEvent = false# 3、配置source、channel、sink之间的连接关系
a1.sources.r1.channels = c1

3.2 自定义拦截器

作用:拦截events,经拦截器处理,输出处理后的events。
开发:创建maven项目,打成jar包形式上传到flume所在机器

3.2.1 开发拦截器jar包

(1)创建maven项目
(2)开发拦截器类
package com.songshuang.flume.interceptor;import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;/*** @date 2023/11/21 20:40* 功能:剔除掉非json格式数据** 1、实现接口* 2、实现抽象方法* 3、建造者模式:静态内部类*/
public class ETLInterceptor implements Interceptor {public void initialize() {}// 将log中event为非json格式数据置为nullpublic Event intercept(Event event) {byte[] body = event.getBody();// byte数组转为字符串String log = new String(body, StandardCharsets.UTF_8);boolean flag = false;// 判断log是否是json格式try {JSONObject jsonObject = JSONObject.parseObject(log);flag = true;} catch (JSONException e) {}return flag ? event : null;}// 将log中event为null的删掉public List<Event> intercept(List<Event> events) {// 遍历eventsIterator<Event> iterator = events.iterator();while (iterator.hasNext()) {Event event = iterator.next();if (intercept(event) == null) {iterator.remove();}}return events;}public void close() {}// 建造者模式public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new ETLInterceptor();}@Overridepublic void configure(Context context) {}}
}
(3)开发pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.songshuang</groupId><artifactId>flume_interceptor</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency></dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>2.3.2</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
</project>
(4)打成jar包上传到Flume

上传到 /opt/module/apache-flume-1.9.0-bin/lib 目录下

3.2.3 修改配置文件

[mall@mall job]$ vim file_to_kafka.conf

新增内容:

# 自定义拦截器
a1.sources.r1.interceptors = i1
# 指定自定义拦截器的建造者类名(入口)
a1.sources.r1.interceptors.i1.type = com.songshuang.flume.interceptor.ETLInterceptor$Builder

3.3 创建Kafka Topic

为什么要手动创建topic:flume自动创建的topic默认1个分区,每个分区1个副本。手动创建可以指定分区和副本数,可以有效利用Kafka集群资源。
–bootstrap-server参数作用:连接Kafka集群

[hadoop@hadoop102 kafka_2.11-2.4.1]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --create --replication-factor 2 --partitions 3 --topic topic_mall_applog

3.4 启动Flume

注意:放开Kafka集群所在机器9092端口,对Flume所在机器放开。
原因:Flume需要向Kafka集群写入数据,所以需要具有访问Kafka集群端口的权限。
– conf参数:配置文件存储所在目录
– name参数:agent名称,每个Flume配置文件就是一个agent。
– conf-file参数:flume本次启动读取的配置文件
nohup配合&:后台运行
&>/dev/null:将标准输出重定向到 /dev/null ,即丢弃所有输出
2>/dev/null:将标准错误输出重定向到 /dev/null ,即丢弃所有错误输出

[mall@mall ~]$ cd /opt/module/apache-flume-1.9.0-bin/
[mall@mall apache-flume-1.9.0-bin]$ nohup bin/flume-ng agent --conf conf/ --name a1 --conf-file job/file_to_kafka.conf &>/dev/null 2>/dev/null &

3.5 停止Flume

[mall@mall apache-flume-1.9.0-bin]$ ps -ef | grep file_to_kafka.conf
[mall@mall apache-flume-1.9.0-bin]$ kill 11001

四、监控Kafka(kafka to hdfs)

需求描述:监控Kafka,将数据写入HDFS
如果想要从头消费需要设置kafka.consumer.auto.offset.reset = earliest,默认从最新offset开始
注意:需要在HDFS所在机器部署FLume,需要调用HADOOP相关jar包。

3.0 将lib文件夹下的guava-11.0.2.jar删除以兼容Hadoop 3.1.3

否则Flume向HDFS写数据时会失败!

[hadoop@hadoop104 ~]$ rm /opt/module/apache-flume-1.9.0-bin/lib/guava-11.0.2.jar

3.1 自定义拦截器

作用:按照kafka消息中的时间字段,决定消息存储到hdfs的哪个文件中。

代码:

package com.songshuang.flume.interceptor;import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;/*** @date 2023/11/22 16:52* 作用:获取kafka中时间戳字段,放入event头中,flume写入hdfs时,从头部获取时间,作为该event放入hdfs的文件夹名称*/
public class TimestampInterceptor implements Interceptor {@Overridepublic void initialize() {}// 获取kafka时间戳字段,放入event的header@Overridepublic Event intercept(Event event) {byte[] body = event.getBody();String log = new String(body, StandardCharsets.UTF_8);JSONObject jsonObject = JSONObject.parseObject(log);String ts = jsonObject.getString("ts");Map<String, String> headers = event.getHeaders();headers.put("timestamp",ts); // event是引用变量类型,存储的是地址,header变了,自然event所对应地址上的值就变了return event;}@Overridepublic List<Event> intercept(List<Event> events) {for (Event event : events) {intercept(event);}return events;}@Overridepublic void close() {}// 建造者模式public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new TimestampInterceptor();}@Overridepublic void configure(Context context) {}}
}

3.2 编写配置文件

[hadoop@hadoop104 job]$ vim kafka_to_hdfs.conf

内容:

a1.sources.r1.kafka.consumer.group.id:消费者组名。
a1.channels.c1.type:file类型channel,缓冲数据放在磁盘中,而不是内存中。
a1.channels.c1.dataDirs:file channel缓冲内容落盘地址。
a1.channels.c1.checkpointDir:检查点存放位置,用于断点续传。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# 配置source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics = topic_mall_applog
a1.sources.r1.kafka.consumer.group.id = consumer_group_flume
# 指定consumer从哪个offset开始消费,默认latest
# a1.sources.r1.kafka.consumer.auto.offset.reset = earliest
# 自定义拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.songshuang.flume.interceptor.TimestampInterceptor$Builder# 配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /warehouse/applog/%Y-%m-%d
a1.sinks.k1.hdfs.codeC = gzip# 配置channel
a1.channels.c1.type = file
a1.channels.c1.dataDirs = /opt/module/apache-flume-1.9.0-bin/data/kafka_to_hdfs
a1.channels.c1.checkpointDir = /opt/module/apache-flume-1.9.0-bin/checkpoint/kafka_to_hdfs# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3.3 启动Flume

注意1:需要放开kafka端口,即9092端口,Flume要读Kafka。

[hadoop@hadoop104 job]$ cd /opt/module/apache-flume-1.9.0-bin/
[hadoop@hadoop104 apache-flume-1.9.0-bin]$ nohup bin/flume-ng agent --conf conf/ --name a1 --conf-file job/kafka_to_hdfs.conf &>/dev/null 2>/dev/null &

3.4 停止Flume

[mall@mall apache-flume-1.9.0-bin]$ ps -ef | grep kafka_to_hdfs.conf
[mall@mall apache-flume-1.9.0-bin]$ kill 11001

五、监控 ip+port(TODO)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/162814.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【数字化转型方法论读书笔记】-数据中台角色解读

一千个读者&#xff0c;就有一千个哈姆雷特。同样&#xff0c;数据中台对于企业内部不同角色的价值也不同&#xff0c;下面分别从董事长、CEO、 CTO/CIO、IT 架构师、数据分析师这 5 个角色的视角详细解读数据中台。 1、董事长视角下的数据中台 在数字经济时代&#xff0c;企业…

RTT打印在分区跳转后无法打印问题

场景&#xff1a; RTT打印仅占用JLINK的带宽&#xff0c;比串口传输更快更简洁&#xff0c;同时RTT可以使用jscope对代码里面的变量实时绘图显示波形&#xff0c;而采用串口打印波形无法实时打印。同时可以保存原始数据到本地进行分析&#xff0c;RTT在各方面完胜串口。 问题描…

PTA-城市间紧急救援

作为一个城市的应急救援队伍的负责人&#xff0c;你有一张特殊的全国地图。在地图上显示有多个分散的城市和一些连接城市的快速道路。每个城市的救援队数量和每一条连接两个城市的快速道路长度都标在地图上。当其他城市有紧急求助电话给你的时候&#xff0c;你的任务是带领你的…

采样概率 假设检验推导数组最大值的方法与可行性

当需要寻找大量数据中的最大值的时候&#xff0c;比如从 2G 个 float16 中寻找其中的最大值&#xff0c;是一件耗时的操作。 现计划通过小样本来发掘数据的规律&#xff0c;对最大值进行预测。 方案&#xff1a; step1&#xff0c;从2G个float16 中截取64段float16&#xff…

【Vue入门篇】基础篇—Vue指令,Vue生命周期

&#x1f38a;专栏【JavaSE】 &#x1f354;喜欢的诗句&#xff1a;更喜岷山千里雪 三军过后尽开颜。 &#x1f386;音乐分享【如愿】 &#x1f384;欢迎并且感谢大家指出小吉的问题&#x1f970; 文章目录 &#x1f354;Vue概述&#x1f384;快速入门&#x1f33a;Vue指令⭐v-…

AI绘画工具汇总:免费、简单易上手

欢迎来到魔法宝库&#xff0c;传递AIGC的前沿知识&#xff0c;做有格调的分享❗ 喜欢的话记得点个关注吧&#xff01; 提到AI绘画&#xff0c;许多人通常会想到Midjourney和Stable Diffusion等工具&#xff0c;然而&#xff0c;这些工具对于新手而言门槛较高&#xff0c;不太友…

【C++】——标准模板库STL作业(其一)

&#x1f383;个人专栏&#xff1a; &#x1f42c; 算法设计与分析&#xff1a;算法设计与分析_IT闫的博客-CSDN博客 &#x1f433;Java基础&#xff1a;Java基础_IT闫的博客-CSDN博客 &#x1f40b;c语言&#xff1a;c语言_IT闫的博客-CSDN博客 &#x1f41f;MySQL&#xff1a…

opencv使用pyinstaller打包错误:‘can‘t find starting number (in the name of file)

使用Python语言和opencv模块在pycharm中编辑的代码运行没问题&#xff0c;但是在使用pyinstaller打包后出现错误can‘t find starting number (in the name of file) [ERROR:0] global C:\Users\runneradmin\AppData\Local\Temp\pip-req-build-q3d_8t8e\opencv\modules\videoi…

安卓毕业设计基于安卓android微信小程序的家校通系统

运行环境 开发语言&#xff1a;Java 框架&#xff1a;ssm JDK版本&#xff1a;JDK1.8 服务器&#xff1a;tomcat7 小程序框架&#xff1a;uniapp 小程序开发软件&#xff1a;HBuilder X 小程序运行软件&#xff1a;微信开发者 项目介绍 基于微信小程序的家校通系统的设计基…

【实用】PPT没几页内存很大怎么解决

PPT页数很少但导出内存很大解决方法 1.打开ppt点击左上角 “文件”—“选项” 2.对话框选择 “常规与保存” &#xff08;1&#xff09;如果想要文件特别小时可 取消勾选 “将字体嵌入文件” &#xff08;2&#xff09;文件大小适中 可选择第一个选项 “仅最入文档中所用的字…

每日一题 1410. HTML 实体解析器(中等,模拟)

模拟&#xff0c;没什么好说的 class Solution:def entityParser(self, text: str) -> str:entityMap {&quot;: ",&apos;: "",>: >,<: <,&frasl;: /,&amp;: &,}i 0n len(text)res []while i < n:isEntity Falseif …

Oracle-客户端连接报错ORA-12545问题

问题背景: 用户在客户端服务器通过sqlplus通过scan ip登陆访问数据库时&#xff0c;偶尔会出现连接报错ORA-12545: Connect failed because target host or object does not exist的情况。 问题分析&#xff1a; 首先&#xff0c;登陆到连接有问题的客户端数据库上&#xff0c;…

高品质MP3音频解码语音芯片WT2003Hx的特征优势与应用场景

在现代化科技快速发展的时代&#xff0c;高品质音频语音芯片在各个领域的应用越来越广泛。唯创知音推出的高品质MP3音频语音芯片WT2003Hx&#xff0c;凭借其出色的特性与优势&#xff0c;赢得了市场的广泛认可。本文将详细介绍WT2003Hx的特征优势以及其在各个领域的应用场景。 …

单片机调试技巧--修改bin文件实现断点

fromelf --text -a -c --outputall.dis F103_Moduel\F103_Moduel.axffromelf --bin --outputtest.bin F103_Moduel\F103_Moduel.axf 在启动文件中&#xff0c;修改UsageFault_Handler UsageFault_Handler\PROC; get current contextTST lr, #0x04 ; if(!EXC_RETURN[2])ITE…

2014年08月25日 Go生态洞察:深入理解Go中的常量

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…

高通OTA升级非常规分区方法

高通OTA升级非常规分区方法 1. 高通LE OTA背景2. 高通LE OTA升级方案2.1 SDX12 OTA方案2.2 OTA升级TZ/RPM/Aboot OTA是一个通用述语&#xff0c;常见的解释为over the air。通过这一解释&#xff0c;OTA最开始的概念&#xff0c;是空中升级。后来&#xff0c;又衍生出了FOTA&am…

中国智能汽车这一年,主打一个“卷”

文丨刘俊宏 “这才刚过去半年多&#xff0c;汽车行业又更新了一轮。”一位车评人在广州车展感叹道。 作为每年最后一个A级车展&#xff0c;广州车展向来被视为中国车市的“风向标”。相比上海车展“拥抱汽车行业新时代”、成都车展“驭见未来”的主题&#xff0c;广州车展“新…

数据结构(超详细讲解!!)第二十四节 二叉树(上)

1.定义 二叉树&#xff08;Binary Tree&#xff09;是另一种树型结构。 二叉树的特点&#xff1a; 1&#xff09;每个结点至多只有两棵子树&#xff08;即二叉树中不存在度大于2的结点&#xff09;&#xff1b; 2&#xff09;二叉树的子树有左右之分&#xff0c;其次序…

python爬虫教程:selenium常用API用法和浏览器控制

文章目录 selenium apiwebdriver常用APIwebelement常用API 控制浏览器 selenium api selenium新版本(4.8.2)很多函数&#xff0c;包括元素定位、很多API方法均发生变化&#xff0c;本文记录以selenium4.8.2为准。 webdriver常用API 方法描述get(String url)访问目标url地址&…

分布式锁之传统锁回顾(一)

1. 传统锁回顾 1.1. 从减库存聊起 多线程并发安全问题最典型的代表就是超卖现象 库存在并发量较大情况下很容易发生超卖现象&#xff0c;一旦发生超卖现象&#xff0c;就会出现多成交了订单而发不了货的情况。 场景&#xff1a; 商品S库存余量为5时&#xff0c;用户A和B同…