Flume的安装部署及常见问题解决

在这里插入图片描述

1.安装地址

(1) Flume官网地址:http://flume.apache.org/
(2)文档查看地址:http://flume.apache.org/FlumeUserGuide.html
(3)下载地址:http://archive.apache.org/dist/flume/

2.安装部署

注意:前提是配置好java环境

(1)将apache-flume-1.10.1-bin.tar.gz上传到linux的/opt/package/目录下
在这里插入图片描述
(2)解压apache-flume-1.10.1-bin.tar.gz到/opt/software/目录下

[zhangflink@9wmwtivvjuibcd2e package]$ tar -zxvf apache-flume-1.10.1-bin.tar.gz -C /opt/software/

(3)修改apache-flume-1.10.1-bin的名称为flume

[zhangflink@9wmwtivvjuibcd2e software]$ mv apache-flume-1.10.1-bin/ flume

(4)修改conf目录下的log4j2.xml配置文件,配置日志文件路径

修改日志路径

<Property name="LOG_DIR">/opt/module/flume/log</Property>

在这里插入图片描述

 <AppenderRef ref="Console" />

在这里插入图片描述

编写配置文件

官网翻译成中文的网站,可以参考这个网站进行编写配置文件:https://flume.liyifeng.org/

在这里插入图片描述

(1).Agent 是一个 JVM 进程,它以事件的形式将数据从源头送至目的。
Agent 主要有三个组成部分,Source、Channel、Sink。
(2).第一步:配置各个组件,根据你采集数据的需求进行选择对应的source,channels,sinks组件(直接去参考官网对应的组件功能选择即可)。
(3).第二步:连接各个组件,把采集端(Flume Sources),中间缓存(Flume Channels)和写入端(Flume Sinks)连接到一起。
(4).第三步:启动Agent。
bin目录下的flume-ng是Flume的启动脚本,启动时需要指定Agent的名字、配置文件的目录和配置文件的名称。

bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template

-n后面就是agent的主节点,-f 后面就是配置文件的位置,其它不变。

常用案例

监听端口配置:

# example.conf: 一个单节点的 Flume 实例配置# 配置Agent a1各个组件的名称#Agent a1 的source有一个,叫做r1
a1.sources = r1    
#Agent a1 的sink也有一个,叫做k1
a1.sinks = k1      
#Agent a1 的channel有一个,叫做c1
a1.channels = c1   # 配置Agent a1的source r1的属性
#使用的是NetCat TCP Source,这里配的是别名,Flume内置的一些组件都是有别名的,没有别名填全限定类名
a1.sources.r1.type = netcat       
#NetCat TCP Source监听的hostname,这个是本机
a1.sources.r1.bind = localhost    
#监听的端口
a1.sources.r1.port = 44444        # 配置Agent a1的sink k1的属性# sink使用的是Logger Sink,这个配的也是别名
a1.sinks.k1.type = logger         # 配置Agent a1的channel c1的属性,channel是用来缓冲Event数据的#channel的类型是内存channel,顾名思义这个channel是使用内存来缓冲数据
a1.channels.c1.type = memory                
#内存channel的容量大小是1000,注意这个容量不是越大越好,配置越大一旦Flume挂掉丢失的event也就越多
a1.channels.c1.capacity = 1000              
#source和sink从内存channel每次事务传输的event数量
a1.channels.c1.transactionCapacity = 100    # 把source和sink绑定到channel上#与source r1绑定的channel有一个,叫做c1
a1.sources.r1.channels = c1       
#与sink k1绑定的channel有一个,叫做c1
a1.sinks.k1.channel = c1         

启动agent

 bin/flume-ng agent -n a1 -c conf -f conf/example.conf

在这里插入图片描述

监听文件写入HDFS里面

# file_chanel_hdfs.conf: 一个监听文件数据写入hdfs的实例配置# 配置Agent a1各个组件的名称#Agent a1 的source有一个,叫做r1
a1.sources = r1    
#Agent a1 的sink也有一个,叫做k1
a1.sinks = k1      
#Agent a1 的channel有一个,叫做c1
a1.channels = c1   #监听文件的source,这个source支持断点续传可靠性更高
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/software/flume/text_log/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/software/flume/text_log/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /opt/software/flume/text_log/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
a1.sources.ri.maxBatchCount = 1000# 配置Agent a1的sink k1的属性#写入HDFS的sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://10.0.3.141:8020/flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.timeZone = Asia/Shanghai# 配置Agent a1的channel c1的属性,channel是用来缓冲Event数据的#channel的类型是内存channel,顾名思义这个channel是使用内存来缓冲数据
a1.channels.c1.type = memory                
#内存channel的容量大小是1000,注意这个容量不是越大越好,配置越大一旦Flume挂掉丢失的event也就越多
a1.channels.c1.capacity = 1000              
#source和sink从内存channel每次事务传输的event数量
a1.channels.c1.transactionCapacity = 100    # 把source和sink绑定到channel上#与source r1绑定的channel有一个,叫做c1
a1.sources.r1.channels = c1       
#与sink k1绑定的channel有一个,叫做c1
a1.sinks.k1.channel = c1        

启动后可能遇到的问题及解决方法

在这里插入图片描述

原因是普通用户没有创建文件的权限,使用root权限启动即可

sudo bin/flume-ng agent -c conf -n a1 -f conf/file_chanel_hdfs.conf

在这里插入图片描述

原因是因为写入到hfds时使用到了时间戳来区分目录结构,flume的消息组件event在接受到之后在header中没有发现时间戳参数,导致该错误发生,有三种方法可以解决这个错误;
1、agent1.sources.source1.interceptors = t1
agent1.sources.source1.interceptors.t1.type = timestamp
为source添加拦截,每条event头中加入时间戳;(效率会慢一些)
2、agent1.sinks.sink1.hdfs.useLocalTimeStamp = true 为sink指定该参数为true
(如果客户端和flume集群时间不一致数据时间会不准确)
3、在向source发送event时,将时间戳参数添加到event的header中即可,header是一个map,添加时mapkey为timestamp(推荐使用)

我使用了第二种方法(如果实时链路中,一般数据中都会带有时间戳,要使用第一种方法,保证时间语义的准确性)。

在这里插入图片描述
在这里插入图片描述

遇到这个错误是sink配置语句中创建hdfs的路径报错

要和hadoop里面的core-site.xml 文件保持一致

<!-- 指定NameNode的地址 --><property><name>fs.defaultFS</name><value>hdfs://flinkv1:8020</value>
</property>

在这里插入图片描述
此问题是由于操作hdfs的文件权限不足,修改hdfs文件权限即可。

[zhangflink@9wmwtivvjuibcd2e flume]$ hdfs dfs -ls /
Found 1 items
drwxr-xr-x   - zhangflink supergroup          0 2023-11-19 11:04 /flume
[zhangflink@9wmwtivvjuibcd2e flume]$ hdfs dfs -chmod 777 /flume
[zhangflink@9wmwtivvjuibcd2e flume]$ hdfs dfs -ls /
Found 1 items
drwxrwxrwx   - zhangflink supergroup          0 2023-11-19 11:04 /flume

启动成功数据写入

在这里插入图片描述
在这里插入图片描述

监听文件写入kafka里面

首先创建kafka的topic

[zhangflink@9wmwtivvjuibcd2e kafka]$ bin/kafka-topics.sh --bootstrap-server flinkv1:9092 --create --partitions 1 --replication-factor 3 --topic flumeData

编写配置文件:

# file_memory_kafka.conf: 一个监听文件数据写入hdfs的实例配置# 配置Agent a1各个组件的名称#Agent a1 的source有一个,叫做r1
a1.sources = r1    
#Agent a1 的sink也有一个,叫做k1
a1.sinks = k1      
#Agent a1 的channel有一个,叫做c1
a1.channels = c1   #监听文件的source,这个source支持断点续传可靠性更高
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/software/flume/text_log/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/software/flume/text_log/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /opt/software/flume/text_log/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
a1.sources.ri.maxBatchCount = 1000# 配置Agent a1的sink k1的属性#写入kafka的sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = flumeData
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1# 配置Agent a1的channel c1的属性,channel是用来缓冲Event数据的#channel的类型是内存channel,顾名思义这个channel是使用内存来缓冲数据
a1.channels.c1.type = memory                
#内存channel的容量大小是1000,注意这个容量不是越大越好,配置越大一旦Flume挂掉丢失的event也就越多
a1.channels.c1.capacity = 1000              
#source和sink从内存channel每次事务传输的event数量
a1.channels.c1.transactionCapacity = 100    # 把source和sink绑定到channel上#与source r1绑定的channel有一个,叫做c1
a1.sources.r1.channels = c1       
#与sink k1绑定的channel有一个,叫做c1
a1.sinks.k1.channel = c1     

消费对应topic测试数据是否写入

[zhangflink@9wmwtivvjuibcd2e kafka]$ bin/kafka-console-consumer.sh --bootstrap-server flinkv1:9092 --from-beginning --topic flumeData

监听成功
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/150922.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于Qt QList和QMap容器类示例

## QList<T> QList<T>容器是一个数组列表,特点如下: 1.大多数情况下可以用QList。像prepend()、append()和insert()这种操作,通常QList比QVector快的多。这是因为QList是基于index标签存储它的元素项在内存中(虽然内存不连续,这点与STL的list 是一样的),比…

【机器学习基础】K-Means聚类算法

&#x1f680;个人主页&#xff1a;为梦而生~ 关注我一起学习吧&#xff01; &#x1f4a1;专栏&#xff1a;机器学习 欢迎订阅&#xff01;相对完整的机器学习基础教学&#xff01; ⭐特别提醒&#xff1a;针对机器学习&#xff0c;特别开始专栏&#xff1a;机器学习python实战…

linux镜像的下载,系统下载(个人使用)

文章目录 一、系统之家二、国内镜像源三、Centos官网四、安装成功截图五、镜像类型的区别参考文档 一、系统之家 系统之家官网 二、国内镜像源 下载镜像地址&#xff1a; 1、官网地址&#xff1a;https://www.centos.org/ 2、阿里镜像站&#xff1a;https://mirrors.aliyu…

一文读懂:testcafe框架和页面元素交互

一、互动要求 使用 TestCafe 与元素进行交互操作&#xff0c;元素需满足以下条件&#xff1a;☟ 元素在 body 页面窗口或 iframe 窗口的元素内。如果某个元素在视口之外&#xff0c;则 TestCafe 通过滚动可以滚动到元素可见。 元素是可见的&#xff0c;具有以下属性&#…

实力认证|易知微上榜中国信息通信研究院数字孪生城市产业图谱!

近期&#xff0c;中国通信院就数字孪生技术在城市层面的广泛应用&#xff0c;根据数字孪生产业框架&#xff0c;结合产业发展动态和企业综合实力评估&#xff0c;选取了核心产业、关联产业和辐射产业等各领域业务代表性较强的企业&#xff08;机构&#xff09;&#xff0c;形成…

Flink(六)【DataFrame 转换算子(下)】

前言 今天学习剩下的转换算子&#xff1a;分区、分流、合流。 每天出来自学是一件孤独又充实的事情&#xff0c;希望多年以后回望自己的大学生活&#xff0c;不会因为自己的懒惰与懈怠而悔恨。 回答之所以起到了作用&#xff0c;原因是他们自己很努力。 …

FPGA系列:1、FPGA/verilog源代码保护:基于Quartus13.1平台保护verilog源码发给第三方但不泄露源码

catlog 需求具体步骤工程描述去掉相关调试文件切换顶层模块并导出相应模块为网表文件切换回原顶层模块并添加相应保护模块的qxp文件再次编译工程 参考&#xff1a; 需求 有时需要将源码交付给第三方&#xff0c;但是源码中部分模块涉及到的核心代码无法暴漏给第三方。因此&…

2023年【高处安装、维护、拆除】模拟考试题及高处安装、维护、拆除模拟考试题库

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 2023年【高处安装、维护、拆除】模拟考试题及高处安装、维护、拆除模拟考试题库&#xff0c;包含高处安装、维护、拆除模拟考试题答案和解析及高处安装、维护、拆除模拟考试题库练习。安全生产模拟考试一点通结合国家…

C语言之qsort()函数的模拟实现

C语言之qsort()函数的模拟实现 文章目录 C语言之qsort()函数的模拟实现1. 简介2. 冒泡排序3. 对冒泡排序进行改造4. 改造部分4.1 保留部分的冒泡排序4.2 比较部分4.3 交换部分 5. bubble_sort2完整代码6. 使用bubble_sort2来排序整型数组7. 使用bubble_sort2来排序结构体数组7.…

golang学习笔记——接口interfaces

文章目录 Go 语言接口例子空接口空接口的定义空接口的应用空接口作为函数的参数空接口作为map的值 类型断言接口值 类型断言例子001类型断言例子002类型断言例子003巩固练习 Go 语言接口 接口&#xff08;interface&#xff09;定义了一个对象的行为规范&#xff0c;只定义规范…

基于java web个人财务管理系统

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;采用JSP技术开发 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#x…

stable-diffusion-webui之webui.py

主要就是webui的启动这块&#xff0c;需要初始化的地方&#xff0c;东西还是挺多的。

8年资深测试,自动化测试常见问题总结,惊险避坑...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 1、自动化测试简介…

2023.11.18 Hadoop之 YARN

1.简介 Apache Hadoop YARN &#xff08;Yet Another Resource Negotiator&#xff0c;另一种资源协调者&#xff09;是一种新的 Hadoop 资源管理器&#xff0c;它是一个通用资源管理系统和调度平台&#xff0c;可为上层应用提供统一的资源管理和调度。支持多个数据处理框架&…

《轻购优品》新零售玩法:消费积分认购+众筹新玩法

《轻购优品》新零售玩法&#xff1a;消费积分认购众筹新玩法 引言&#xff1a;2023年开年已来&#xff0c;政府的工作报告提出“把恢复和扩大消费摆在优先位置”&#xff0c;并且把2023年定位为“消费提振年”&#xff0c;以“全年乐享全年盛惠”为主题多地政府共同发力&#x…

Altium Designer 相同模块的布局布线复用-AD

1、利用交互式布线&#xff0c;将两个相同模块的元器件在PCB上分块显示。 在原理图中&#xff0c;框选某一模块电路、按快捷键 TS 切换到PCB编辑界面、工具>器件摆放>在矩形区域内排列&#xff08;可将模块中的器件都集中放置到矩形框内&#xff09;。2、为模块电路添加 …

YOLOv8改进 | EIoU、SIoU、WIoU、DIoU、FocusIoU等二十余种损失函数

一、本文介绍 这篇文章介绍了YOLOv8的重大改进&#xff0c;特别是在损失函数方面的创新。它不仅包括了多种IoU损失函数的改进和变体&#xff0c;如SIoU、WIoU、GIoU、DIoU、EIOU、CIoU&#xff0c;还融合了“Focus”思想&#xff0c;创造了一系列新的损失函数。这些组合形式的…

Java方法中不使用的对象应该手动赋值为NULL吗?

在java方法中&#xff0c;不使用的对象是否应该手动赋值为null&#xff1f;我们先来通过一个示例看一下。 垃圾回收示例一 public class GuoGuoTest {public static void main(String[] args) {byte[] placeholder new byte[64 * 1024 * 1024];System.gc();} } 上面代码向内…

vue3 tsx 项目中使用 Antv/G2 实现多线折线图

Antv/G2 文档 Antv/G2 双折线图 安装 antV-G2 通过 npm 安装 项目中安装 antv/g2 依赖库&#xff1a; npm install antv/g2 --save安装成功&#xff1a; 浏览器引入 可以将脚本下载到本地&#xff0c;也可以直接引入在线资源。 引入在线资源 <!-- 引入在线资源&…

java springboot在测试类中启动一个web环境

我们在开发过程中 可以对数据层 业务层做测试 那我们的表现层能做测试吗&#xff1f; 答案自然是可以的 但是 前提 我们要有一个web环境 我们现在 测试类运行 明显是个很普通的java程序 还是这个 SpringBootTest 它有一个 webEnvironment 我们可以先这样写 package com.examp…