大数据-玩转数据-Flume

一、Flume简介

    1. Flume提供一个分布式的,可靠的,对大数据量的日志进行高效收集、聚集、移动的服务,Flume只能在Unix环境下运行。
    1. Flume基于流式架构,容错性强,也很灵活简单。
    1. Flume、Kafka用来实时进行数据收集,Spark、Flink用来实时处理数据,impala用来实时查询。

二、Flume角色

2.1、Source

用于采集数据,Source是产生数据流的地方,同时Source会将产生的数据流传输到Channel,这个有点类似于Java IO部分的Channel。

2.2、Channel

用于桥接Sources和Sinks,类似于一个队列。

2.3、Sink

从Channel收集数据,将数据写到目标源(可以是下一个Source,也可以是HDFS或者HBase)。

2.4、Event

传输单元,Flume数据传输的基本单元,以事件的形式将数据从源头送至目的地。

三、Flume传输过程

source监控某个文件或数据流,数据源产生新的数据,拿到该数据后,将数据封装在一个Event中,并put到channel后commit提交,channel队列先进先出,sink去channel队列中拉取数据,然后写入到HDFS中。

四、Flume部署及使用

4.1 采集架构

在这里插入图片描述

4.2 Flume安装

4.2.1 下载

apache-flume-1.6.0-bin.tar.gz
链接:https://pan.baidu.com/s/1ySmEEObFtKtyT7GsEldnfA
提取码:436t

4.2.2 安装

Flume的安装非常简单,只需要解压即可
tar -zxvf apache-flume-1.6.0-bin.tar.gz
然后进入flume的目录,修改conf下的flume-env.sh,在里面配置JAVA_HOME

在这里,我们使用集群模式,因此,需要把在master节点部署的flume分发到slave节点上:
]# scp -rp apache-flume-1.7.0-bin slave1:KaTeX parse error: Expected 'EOF', got '#' at position 6: PWD ]#̲ scp -rp apache…PWD

4.2.3 测试

采集配置:

vi netcat-logger.conf
# 定义这个agent中各组件的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 描述和配置source组件:r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# 描述和配置sink组件:k1
a1.sinks.k1.type = logger
# 描述和配置channel组件,此处使用是内存缓存的方式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 描述和配置source  channel   sink之间的连接关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动agent去采集数据
启动命令:

bin/flume-ng agent -c conf -f conf/netcat-logger.conf -n a1 -Dflume.root.logger=INFO,console
-c conf   指定flume自身的配置文件所在目录
-f conf/netcat-logger.con  指定我们所描述的采集方案
-n a1  指定我们这个agent的名字

在这里插入图片描述
先要往agent采集监听的端口上发送数据,让agent有数据可采
发送命令:

安装telnet:

]# yum install telnet
]# telnet anget-hostname port (telnet localhost 44444)

测试输入输出如下图:
在这里插入图片描述
在这里插入图片描述

4.3 Flume配置

1)Flume 配置分析
在这里插入图片描述
Flume 直接读 log 日志的数据,log 日志的格式是 app-yyyy-mm-dd.log。
2)Flume 的具体配置如下:
(1)在/opt/module/flume/conf 目录下创建 file-flume-kafka.conf 文件

vim file-flume-kafka.conf
a1.sources=r1
a1.channels=c1 c2
#configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /usr/local/src/apache-flume-1.7.0-bin/test/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/log/2020-11-03/app.*.log
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2
#interceptor
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.zgjy.flume.interceptor.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.zgjy.flume.interceptor.LogTypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_resource = c1
a1.sources.r1.selector.mapping.topic_action = c2
# configure channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.channels.c1.kafka.topic = topic_resource
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer
# configure channe2
a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.channels.c2.kafka.topic = topic_action
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer

测试日志:
在这里插入图片描述
配置说明如下:
在这里插入图片描述

4.4 Flume 的 ETL 和分类型拦截器

本项目中自定义了两个拦截器,分别是:ETL 拦截器、日志类型区分拦截器。
ETL 拦截器主要作用:过滤时间戳不合法和 Json 数据不完整的日志
日志类型区分拦截器主要作用:将启动日志和事件日志区分开来,方便发往 Kafka 的不 同 Topic。

1)创建 Maven 工程 flume-interceptor
2)创建包名:com.zgjy.flume.interceptor
3)在 pom.xml 文件中添加如下配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.zgjy</groupId><artifactId>flume-interceptor</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.1.41</version></dependency><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.7.0</version></dependency></dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>2.5.3</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration></plugin></plugins></build></project>

4)在 com.zgjy.flume.interceptor 包下创建 LogETLInterceptor 类名
Flume ETL 拦截器 LogETLInterceptor实现代码如下:

package 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/141109.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C/S架构学习之基于UDP的本地通信(客户机)

基于UDP的本地通信&#xff08;客户机&#xff09;&#xff1a;创建流程&#xff1a;一、创建数据报式套接字&#xff08;socket函数&#xff09;&#xff1a; int sock_fd socket(AF_UNIX,SOCK_DGRAM,0);if(-1 sock_fd){perror("socket error");exit(-1);}二、创建…

开放领域对话系统架构

开放领域对话系统是指针对非特定领域或行业的对话系统&#xff0c;它可以与用户进行自由的对话&#xff0c;不受特定领域或行业的知识和规则的限制。开放领域对话系统需要具备更广泛的语言理解和生成能力&#xff0c;以便与用户进行自然、流畅的对话。 与垂直领域对话系统相比…

记chrome打不开网址,无法搜索问题

打开网址解决办法 2023关于chrome谷歌浏览器无法正常上网问题&#xff0c;解决办法&#xff0c;亲测有效 下载插件&#xff0c;解压后拖入chrome的扩展程序中&#xff0c;可以打开国内网址 google引擎搜索解决办法 打开无痕浏览 &#xff08;不知道什么原理&#xff0c;可…

MybatisPlus —注解汇总

本文将介绍 MybatisPlus 注解包相关类详解&#xff08;更多详细描述可点击查看源码注释&#xff09; 注解类包源码&#xff1a;&#x1f449; mybatis-plus-annotation(opens new window) 一、#TableName(opens new window) 描述&#xff1a;表名注解&#xff0c;标识实体类对…

AIX5.3安装weblogic10.3

目录 1安装IBM JDK 1.6 2图形化准备 3安装weblogic 准备 4图形化界面安装 1安装IBM JDK 1.6 1.1检查操作系统 # oslevel 5.3.0.0 # bootinfo -y (显示AIX机器硬件是64位) 64 # bootinfo -K (显示AIX系统内核是64位) 64 因此&#xff0c;系统需要安装64位的jdk&#xff0c;…

JavaWeb Day09 Mybatis-基础操作01-增删改查

目录 环境准备 ①Emp.sql ②Emp.java 一、删除 ①Mapper层 ②测试类 ③预编译SQL&#xff08;查看mybatis日志&#xff09; 1.性能 2.安全 ④总结 二、新增 ①Mapper层 ②测试类 ③结果 ④新增&#xff08;主键返回&#xff09; 1.Mapper层 2.测试类 ⑤总结​…

上机4KNN实验4

目录 编程实现 kNN 算法。一、步骤二、实现代码三、总结知识1、切片2、iloc方法3、归一化4、MinMaxScale&#xff08;&#xff09;5、划分测试集、训练集6、KNN算法 .py 编程实现 kNN 算法。 1、读取excel表格存放的Iris数据集。该数据集有5列&#xff0c;其中前4列是条件属性…

jQuery基本知识

1.jQuery 如何获取元素 jQuery 元素选择器和属性选择器允许通过标签名、属性名或内容对 HTML 元素进行选择并可以进行操作。 $(“a”) 选取 a 元素。 $(“a.info”) 选取所有 class“info” 的 a 元素。 $(“a#demo”) 选取所有 id“demo” 的 a 元素。 $(“[href]”) 选取所…

Carla之语义分割及BoundingBox验证模型

参考&#xff1a; Carla系列——4.Cara模拟器添加语义分割相机&#xff08;Semantic segmentation camera&#xff09; Carla自动驾驶仿真五&#xff1a;opencv绘制运动车辆的boudingbox&#xff08;代码详解&#xff09; Carla官网Bounding Boxes Carla官网创建自定义语义标签…

【QT】飞机大战

0 项目简介 飞机大战是我们大家所熟知的一款小游戏&#xff0c;本教程就是教大家如何制作一款自己的飞机大战 首先我们看一下效果图 玩家控制一架小飞机&#xff0c;然后自动发射子弹&#xff0c;如果子弹打到了飞下来的敌机&#xff0c;则射杀敌机&#xff0c;并且有爆炸的特…

隧道施工工艺流程vr线上虚拟展示成为产品3D说明书

行业内都知道&#xff0c;汽车生产的大部分都需要冲压加工来完成&#xff0c;因此汽车冲压工艺是汽车制造过程中的重要环节&#xff0c;传统的展示方式往往局限于二维图纸和实地操作&#xff0c;难以充分展现工艺的细节和流程。然而&#xff0c;随着技术的进步&#xff0c;汽车…

不同优化器的应用

简单用用&#xff0c;优化器具体参考 深度学习中的优化器原理(SGD,SGDMomentum,Adagrad,RMSProp,Adam)_哔哩哔哩_bilibili 收藏版&#xff5c;史上最全机器学习优化器Optimizer汇总 - 知乎 (zhihu.com) import numpy as np import matplotlib.pyplot as plt import torch # …

优秀智慧园区案例 - 中建科技产业园(中建·光谷之星),万字长文解析先进智慧园区建设方案经验

一、项目背景 中建科技产业园&#xff08;中建光谷之星&#xff09;&#xff0c;位于武汉光谷中心城、中国&#xff08;湖北&#xff09;自贸试验区武汉片区双核心区&#xff0c;光谷发展主轴高新大道北侧&#xff0c;建筑面积108万平米&#xff0c;是中建三局“中建之星”和“…

设计模式—结构型模式之代理模式

设计模式—结构型模式之代理模式 代理模式(Proxy Pattern) ,给某一个对象提供一个代理&#xff0c;并由代理对象控制对原对象的引用,对象结构型模式。 静态代理 比如我们有一个直播平台&#xff0c;提供了直播功能&#xff0c;但是如果不进行美颜&#xff0c;可能就比较冷清…

Adobe Photoshop 2020给证件照换底

1.导入图片 2.用魔法棒点击图片 3.点选择&#xff0c;反选 4.选择&#xff0c;选择并遮住 5.用画笔修饰证件照边缘 6. 7.更换要换的底的颜色 8.新建图层 9.使用快捷键altdelete键填充颜色。 10.移动图层&#xff0c;完成换底。

【ArcGIS Pro微课1000例】0030:ArcGIS Pro中自带晕渲地貌工具的妙用

在ArcGIS中,制作地貌晕渲效果通常的做法是先制作山体阴影效果,然后叠加在DEM的下面,再改变DEM的透明度来实现。而在ArcGIS Pro中自带了效果显著的晕渲地貌工具。 文章目录 一、晕渲地貌工具1. 符号系统2. 栅格函数二、山体阴影效果1. 工具箱2. 栅格函数打开ArcGIS Pro3.0,加…

【C++】类和对象(2)--构造函数

目录 一 概念 二 构造函数特性 三 默认构造函数 一 概念 对于以下Date类&#xff1a; class Date { public:void Init(int year, int month, int day){_year year;_month month;_day day;}void Print(){cout << _year << "-" << _month <…

C语言 每日一题 牛客网 11.13 Day17

找零 Z国的货币系统包含面值1元、4元、16元、64元共计4种硬币&#xff0c;以及面值1024元的纸币。 现在小Y使用1024元的纸币购买了一件价值为N(0 < N≤1024)的商品&#xff0c;请问最少他会收到多少硬币&#xff1f; 思路 运用if语句进行判断分类 代码实现 int main() {…

【KVM-6】KVM/QEMU软件栈

前言 大家好&#xff0c;我是秋意零。 &#x1f47f; 简介 &#x1f3e0; 个人主页&#xff1a; 秋意零&#x1f525; 账号&#xff1a;全平台同名&#xff0c; 秋意零 账号创作者、 云社区 创建者&#x1f9d1; 个人介绍&#xff1a;在校期间参与众多云计算相关比赛&#x…

【Git】第三篇:基本操作(配置本地仓库)

初次使用git需要设置你的用户名以及邮箱&#xff0c;这将作为当前机器git的标识&#xff0c;如果你用它来下载远程仓库一些需要登录权限的仓库会要求登录&#xff0c;git默认使用配置邮箱以及用户名登入&#xff0c;但会要求你手动输入密码。 配置本地仓库&#xff1a;git con…