flume使用实例

1、监听端口a1.sources.r1.type = netcat

配置文件nc-flume-console.conf

# Name the components on this agent a1 表示jvm进程名

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = node1

a1.sources.r1.port = 44444

# Describe the sink

a1.sinks.k1.type = logger

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000 #1000个event

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

flume-ng agent -n a1 -c conf/ -f /export/server/flume/job/nc-flume-console.conf

 参数-n 表示jvm进程名 -c表示本次启动读取的配置文件conf目录下的文件 -f 表示具体执行的文件

另开窗口输入内容后控制台会自动返回OK

2、实时监控单个追加文件

配置文件 flume-exec-logger.conf

#Agent_name

a1.sources = r1

a1.sinks = k1

a1.channels = c1

#Sources

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /export/server/hive/logs/hive.log

#Channel

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

#sinks

a1.sinks.k1.type = logger

#组合

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

启动flume监听

flume-ng agent -c conf/ -f job/flume-exec-logger.conf -n a1

手动追加数据到hive.log文件 并查看监控窗口

echo INFO [main] spark.HiveSparkClientFactory >> logs/hive.log

动态添加数据到hive.log

连接hive 观察flume监控变化

 beeline -u jdbc:hive2://node1:10000 -n ljr

 show databases;

  由此可见当我们操作hive的时候 hive.log 就更新,由于我们监控了hive.log文件所以当有新数据追加到hive.log的时候 就会监听到 并打印到控制台

3、实时监控单个追加文件,并将数据输出到hdfs

配置文件 flume-hivelogs-hdfs.con

# Name the components on this agent

a2.sources = r2

a2.sinks = k2

a2.channels = c2

# Describe/configure the source

a2.sources.r2.type = exec

a2.sources.r2.command = tail -F /export/server/hive/logs/hive.log

# Describe the sink

a2.sinks.k2.type = hdfs

a2.sinks.k2.hdfs.path = hdfs://node1:8020/flume/%Y%m%d/%H

#上传文件的前缀

a2.sinks.k2.hdfs.filePrefix = logs-

#是否按照时间滚动文件夹

a2.sinks.k2.hdfs.round = true

#多少时间单位创建一个新的文件夹

a2.sinks.k2.hdfs.roundValue = 1

#重新定义时间单位

a2.sinks.k2.hdfs.roundUnit = hour

#是否使用本地时间戳

a2.sinks.k2.hdfs.useLocalTimeStamp = true

#积攒多少个 Event 才 flush 到 HDFS 一次

a2.sinks.k2.hdfs.batchSize = 100

#设置文件类型,可支持压缩

a2.sinks.k2.hdfs.fileType = DataStream

#多久生成一个新的文件

a2.sinks.k2.hdfs.rollInterval = 60

#设置每个文件的滚动大小

a2.sinks.k2.hdfs.rollSize = 134217700

#文件的滚动与 Event 数量无关

a2.sinks.k2.hdfs.rollCount = 0

# Use a channel which buffers events in memory

a2.channels.c2.type = memory

a2.channels.c2.capacity = 1000

a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel

a2.sources.r2.channels = c2

a2.sinks.k2.channel = c2

启动flume监听,操作hive

flume-ng agent -n a2 -c conf/ -f flume-hivelogs-hdfs.con

查看hdfs,有新文件产生

使用 Flume 监听整个目录(a3.sources.r3.type = TAILDIR)

的实时追加文件,并上传至 HDFS

实现步骤:

【1】创建被监控目录

我这里监控data目录  此目录需要提前创建

mkdir data

cd data

touch file1.txt

touch file2.txt

touch log2.txt

toch log1.txt

【2】创建文件 flume-taildir-hdfs.conf

a3.sources = r3

a3.sinks = k3

a3.channels = c3

# Describe/configure the source

a3.sources.r3.type = TAILDIR

#记录最后监控文件的断点的文件,此文件位置可不改

a3.sources.r3.positionFile =  /export/server/flume/data /tail_dir.json

a3.sources.r3.filegroups = f1 f2

a3.sources.r3.filegroups.f1 = /export/server/flume/data/.*file.*

a3.sources.r3.filegroups.f2 =/export/server/flume/data/.*log.*

# Describe the sink

a3.sinks.k3.type = hdfs

# hdfs://node1:8020 可省略

a3.sinks.k3.hdfs.path = hdfs://node1:8020/flume/upload2/%Y%m%d/%H

#上传文件的前缀

a3.sinks.k3.hdfs.filePrefix = upload-

#是否按照时间滚动文件夹

a3.sinks.k3.hdfs.round = true

#多少时间单位创建一个新的文件夹

a3.sinks.k3.hdfs.roundValue = 1

#重新定义时间单位

a3.sinks.k3.hdfs.roundUnit = hour

#是否使用本地时间戳

a3.sinks.k3.hdfs.useLocalTimeStamp = true

#积攒多少个 Event 才 flush 到 HDFS 一次

a3.sinks.k3.hdfs.batchSize = 100

#设置文件类型,可支持压缩

a3.sinks.k3.hdfs.fileType = DataStream

#多久生成一个新的文件,单位是秒

a3.sinks.k3.hdfs.rollInterval = 3600

#设置每个文件的滚动大小大概是 128M,单位是byte

a3.sinks.k3.hdfs.rollSize = 134217700

#文件的滚动与 Event 数量无关

a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory

a3.channels.c3.type = memory

a3.channels.c3.capacity = 1000

a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel

a3.sources.r3.channels = c3

a3.sinks.k3.channel = c3

【3】启动flume监控

        bin/flume-ng agent -c conf -f datas/flume-taildir-hdfs.conf -n a3

【4】向文件中追加内容

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/14665.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

VERY DEEP CONVOLUTIONAL NETWORKS FOR LARGE-SCALE IMAGE RECOGNITION--论文笔记

论文笔记 论文来源 Very Deep Convolutional Networks for Large-Scale Image Recognition 代码来源 还没上传 数据集 这里采用的是猫狗数据集 还没上传 1论文摘要的翻译 在这项工作中,我们研究了卷积网络深度对其在大规模图像识别设置中的准确性的影响。我…

防火墙技术基础篇:解析入侵检测与预防系统(IDPS)功能

防火墙技术基础篇:解析入侵检测与预防系统(IDPS)功能 入侵检测与预防系统(Intrusion Detection and Prevention Systems, IDPS)作为防火墙技术的核心组成部分,扮演着保护网络安全的关键角色。本文将全面讲…

OSPF状态机及网络接口类型

、OSPF 状态机 Down一旦接收到hello 包进人下一个状态机 Init 初始化接收到的hello 包中,若存在本地的 RID,进入下一状态 2way 双向通讯--邻居关系建立的标志 条件匹配:点到点网络直接进入下一个状态机 MA 网络将进行 DR/BDR 选举(40S) 非 DR…

哪个网盘最适合个人文件长期储存?用派盘最好

派盘是一款面向个人和企业的本地云存储解决方案,专为长期文件存储而设计。这种存储方式利用了本地硬盘的存储容量,通过“云化”的方式,可以将本地硬盘变成云存储空间。它具有强大的数据保护功能,确保了数据的私密性和安全性。 派盘的主要特点 高效的存取速度:由于使用本地…

这种电脑原来这么耗电……震惊了粉丝小姐姐

前言 在今年1月份的时候,一位来自重庆的小姐姐加了小白,咨询电脑的问题: 哦豁,这个电脑看着确实闪闪发光,是真的很漂亮~(嗯,小姐姐也很漂亮) 电脑无法开机,按…

什么是流量削峰?如何解决秒杀等业务的削峰场景

文章推荐 1 作为程序员,开发用过最好用的AI工具有哪些? 2 Github Copilot正版的激活成功,终于可以chat了 3 idea,pycharm等的ai assistant已成功激活 4 新手如何拿捏 Github Copilot AI助手,帮助你提高写代码效率 5 Jetbrains的a…

数字驱动,教育先行——低代码揭秘教育机构管理数字化转型

数字化时代为教育带来了许多变革和挑战,同时也为教育创新提供了无限可能。数字化转型可以帮助教育机构应对这些变革和挑战,提高教育效率和质量,满足学生个性化需求,优化教育管理和服务,并提高教育机构的竞争力。 并且…

docker 安装 yapi

文章目录 docker 安装 yapi一、拉取镜像二、创建目录三、添加配置文件四、初始化数据库表五、启动 yapi六、测试以及修改默认密码 没有 MongDB 的可以先看这个教程:MongDB安装教程 docker 安装 yapi 版本: 1.9.5 一、拉取镜像 docker pull yapipro/y…

以及Spring中为什么会出现IOC容器?@Autowired和@Resource注解?

以及Spring中为什么会出现IOC容器?Autowired和Resource注解? IOC容器发展史 没有IOC容器之前 首先说一下在Spring之前,我们的程序里面是没有IOC容器的,这个时候我们如果想要得到一个事先已经定义的对象该怎么得到呢?…

131. 面试中关于架构设计都需要了解哪些内容?

文章目录 一、社区系统架构组件概览1. 系统拆分2. CDN、Nginx静态缓存、JVM本地缓存3. Redis缓存4. MQ5. 分库分表6. 读写分离7. ElasticSearch 二、商城系统-亿级商品如何存储三、对账系统-分布式事务一致性四、统计系统-海量计数六、系统设计 - 微软1、需求收集2、顶层设计3、…

【Django】从零开始学Django(持续更新中)

PyCharm的版本必须为专业版,社区版不具备Web开发功能的。 一. Django建站基础 Django采用MTV的框架模式,即模型(Model)、模板(Template)和视图(Views),三者之间各自负责不同的职责。 ●模型:数据存取层,处理与数据相关…

信号:MSK调制和GMSK调制

目录 一、MSK信号 1. MSK信号的第k个码元 2.MSK信号的频率间隔 3.MSK信号的相位连续性 3.1 相位路径 3.2初始相位ψk 4.MSK信号的产生 原理框图 5.MSK信号的频谱图 二、高斯最小频移键控(GMSK) 1.频率响应 2.GMSK调制产生方式 2.1 高斯滤波器法 2.2 正交调制器法…

海外私人IP和原生IP有什么区别,谁更有优势?

一、什么是海外私人IP?什么是原生IP? 1、海外私人IP: 海外私人IP是由专门的服务提供商提供的IP地址,这些IP地址通常与特定地理位置或国家相关联。这些IP地址独享私人而不用与其他用户共享。海外私人IP广泛应用与跨境电商中&#x…

【Qt】修改QToolButton图标颜色

1. 目的 修改QToolButton的图标颜色,单一颜色,效果类似于Qt Creator左边选项卡。 2. 代码 QIcon MainWindow::setIconColor(QIcon icon, QColor color) {QPixmap pixmap icon.pixmap(QSize(64,64));QPainter painter(&pixmap);painter.setCompo…

汇编:函数以及函数参数传递

汇编语言中的函数(或过程)是指一段可以被调用和执行的代码块;它们用于组织和重用代码,并使程序结构更加清晰;由于汇编语言没有高层次语言的语法糖,编写和调用函数涉及直接的堆栈操作和寄存器管理&#xff1…

多项式重构的平滑和法线估计-------PCL

多项式重构的平滑和法线估计 /// <summary> /// 多项式重构的平滑和法线估计 /// </summary> /// <param name"cloud"></param> /// <returns>输出一个包含平滑后的点云数据以及相应法线信息的数据结构</returns> pcl::PointCl…

28v电源 28V电源系统 28v航空电源系统概述

28V电源是指一种工作电压为28V的直流电源系统&#xff0c;主要用于航空电子、航天、J事和高端工业应用中。它通常用于为复杂的电子设备和系统供电&#xff0c;如飞机上的导航、通信、控制面板、计算机系统等。这些设备需要稳定的电压输入&#xff0c;而28V电压既能够保证电力供…

第12周作业--HLS入门

目录 一、HLS入门 二、HLS入门程序编程 创建项目 1、点击Vivado HLS 中的Create New Project 2、设置项目名 3、加入文件 4、仿真 3、综合 一、HLS入门 1. HLS是什么&#xff1f;与VHDL/Verilog编程技术有什么关系? HLS&#xff08;High-Level Synthesis&#xff0c…

(Askchat.ai、360智脑、鱼聪明、天工AI、DeepSeek)

目录 1、Askchat.ai - 梦想为蓝图&#xff0c;ChatGPT为笔。 2、360智脑 — 以人为本&#xff0c;安全可信 3、鱼聪明AI - 做您强大的AI助手 (yucongming.com) 4、天工AI-搜索、对话、写作、文档分析、画画、做PPT的全能AI助手 (tiangong.cn) 5、DeepSeek | 深度求索 1、Askch…

基于STM32实现智能风扇控制系统

目录 文章主题环境准备智能风扇控制系统基础代码示例&#xff1a;实现智能风扇控制系统 PWM控制风扇速度温度传感器数据读取串口通信控制应用场景&#xff1a;智能家居与环境调节问题解决方案与优化收尾与总结 1. 文章主题与命名 文章主题 本教程将详细介绍如何在STM32嵌入式…