一百七十二、Flume——Flume采集Kafka数据写入HDFS中(亲测有效、附截图)

一、目的

作为日志采集工具Flume,它在项目中最常见的就是采集Kafka中的数据然后写入HDFS或者HBase中,这里就是用flume采集Kafka的数据导入HDFS中

二、各工具版本

(一)Kafka

kafka_2.13-3.0.0.tgz

(二)Hadoop(HDFS)

hadoop-3.1.3.tar.gz

(三)Flume

apache-flume-1.9.0-bin.tar.gz

三、实施步骤

(一)到flume的conf的目录下

# cd  /home/hurys/dc_env/flume190/conf

(二)创建配置文件evaluation.properties

# vi  evaluation.properties

### Name agent, source, channels and sink alias
a1.sources = s1
a1.channels = c1
a1.sinks = k1

### define kafka source
a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource

# Maximum number of messages written to Channel in one batch
a1.sources.s1.batchSize = 5000

# Maximum time (in ms) before a batch will be written to Channel The batch will be written whenever the first of size and time will be reached.
a1.sources.s1.batchDurationMillis = 2000

# set kafka broker address
a1.sources.s1.kafka.bootstrap.servers = 192.168.0.27:9092

# set kafka consumer group Id and offset consume
# 官网推荐1.9.0版本只设置了topic,但测试后不能正常消费,需要添加消费组id(自己写一个),并定义偏移量消费方式
a1.sources.s1.kafka.consumer.group.id = evaluation_group
a1.sources.s1.kafka.consumer.auto.offset.reset = earliest

# set kafka topic
a1.sources.s1.kafka.topics = topic_b_evaluation


### defind hdfs sink
a1.sinks.k1.type = hdfs
# set store hdfs path
a1.sinks.k1.hdfs.path = hdfs://hurys22:8020/rtp/evaluation/evaluation_%Y-%m-%d
# set file size to trigger roll
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.threadsPoolSize = 30
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=Text


### define channel from kafka source to hdfs sink
# memoryChannel:快速,但是当设备断电,数据会丢失
# FileChannel:速度较慢,即使设备断电,数据也不会丢失
a1.channels.c1.type = file
# 这里不单独设置checkpointDir和dataDirs文件位置,参考官网不设置会有默认位置
# channel store size
a1.channels.c1.capacity = 100000
# transaction size
a1.channels.c1.transactionCapacity = 10000


### 绑定source、channel和sink
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

(三)配置文件创建好后启动flume服务

# cd /home/hurys/dc_env/flume190/

# ./bin/flume-ng agent -n a1  -f /home/hurys/dc_env/flume190/conf/evaluation.properties

(四)到HDFS文件里验证一下

HDFS中生成evaluation_2023-09-07 文件夹,里面有很多小文件

(五)注意:小文件里的数据是JSON格式,即使我设置文件后缀名为csv也没用(可能配置文件中的文件类型设置需要优化

a1.sinks.k1.hdfs.writeFormat=Text

(六)jps查看Flume的服务

[root@hurys22 conf]# jps
16801 ResourceManager
4131 Application
18055 AlertServer
16204 DataNode
22828 Application
17999 LoggerServer
2543 launcher.jar
22224 Application
17393 QuorumPeerMain
16980 NodeManager
17942 WorkerServer
16503 SecondaryNameNode
11384 Application
32669 Application
17886 MasterServer
10590 Jps
16031 NameNode
18111 ApiApplicationServer

注意:Application就是Flume运行的任务

(七)关闭Flume服务

如果想要关闭Flume服务,直接杀死服务就好了

# kill -9 32669

(八)checkpointDir和dataDirs默认的文件位置

默认的文件位置:/root/.flume/file-channel/

总之,Flume这个工具的用法还需进一步研究优化,当然kettle也可以,所以这个项目目前还是用kettle吧!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/74916.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

javaee springMVC model的使用

项目结构图 pom依赖 <?xml version"1.0" encoding"UTF-8"?><project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org…

腾讯云CVM标准型S4服务器性能测评_CPU_网络收发包PPS详解

腾讯云服务器CVM标准型S4性能测评&#xff0c;包括S4云服务器CPU型号、处理器主频、网络收发包PPS、队列数、出入内网带宽能力性能参数说明&#xff0c;标准型 S4 实例是次新一代的标准型实例&#xff0c;CPU采用2.4GHz主频的Intel Xeon Skylake 6148处理器&#xff0c;腾讯云百…

Visual studio解决‘scanf: This function or variable may be unsafe. 问题

使用C语言的scanf函数在Visual Studio软件上运行会报如下错误&#xff1a; scanf: This function or variable may be unsafe. Consider using scanf s instead. To disable deprecation, use. CRT SECURE NO WARNINGS. See online help for details. 这个函数或变量可能是不安…

LeetCode 92. Reverse Linked List II【链表,头插法】中等

本文属于「征服LeetCode」系列文章之一&#xff0c;这一系列正式开始于2021/08/12。由于LeetCode上部分题目有锁&#xff0c;本系列将至少持续到刷完所有无锁题之日为止&#xff1b;由于LeetCode还在不断地创建新题&#xff0c;本系列的终止日期可能是永远。在这一系列刷题文章…

upload-labs1-21关文件上传通关手册

upload-labs文件上传漏洞靶场 目录 upload-labs文件上传漏洞靶场第一关pass-01&#xff1a;第二关Pass-02第三关pass-03&#xff1a;第四关pass-04&#xff1a;第五关pass-05&#xff1a;第六关pass-06&#xff1a;第七关Pass-07第八关Pass-08第九关Pass-09第十关Pass-10第十一…

苹果手机远程控制安卓手机,为什么不能发起控制?

这位用户想要用iOS设备远程控制安卓设备&#xff0c;在被控端安装好AirDroid之后&#xff0c;就在控制端的苹果手机上也安装了AirDroid&#xff0c;然而打开控制端的软件&#xff0c;却没有在手机界面上看到【远程控制】按钮&#xff0c;于是提出了以上疑问。 解答 想要让iOS设…

用postman 推送消息到GCP的pubsub

创建1个Topic 和 2个 subscription 我们可以用terraform 去创建1个topic 和 2个subscriptions # topic resource "google_pubsub_topic" "topic_a" {name "TopicA"project var.project_id }# subscriptions resource "google_pubsub_s…

【AIGC专题】Stable Diffusion 从入门到企业级实战0402

一、概述 本章是《Stable Diffusion 从入门到企业级实战》系列的第四部分能力进阶篇《Stable Diffusion ControlNet v1.1 图像精准控制》第02节&#xff0c; 利用Stable Diffusion ControlNet Openpose模型精准控制图像生成。上一节&#xff0c;我们介绍了《Stable Diffusion C…

postgresql-通用表达式

postgresql-通用表达式 入门案例简单CTE递归 CTE案例1案例2 入门案例 -- 通用表达式 with t(n) as (select 2) select * from t;简单CTE WITH cte_name (col1, col2, ...) AS (cte_query_definition ) sql_statement;WITH 表示定义 CTE&#xff0c;因此 CTE 也称为 WITH 查询…

【Unity】rotation和Quaternion学习笔记

1.rotation 赋值 Quaternion可以为transform.rotation 赋值 2. 从正轴面向原点&#xff0c;顺时针旋转&#xff0c;角度正增加 正x轴面向原点&#xff0c;顺时针旋转&#xff0c;z正轴往下&#xff0c;rotation的x正增加。 3.rotation和Quaternion的关系 1.查询 2.实践 旋转…

LVS DR模式负载均衡群集部署

目录 1 LVS-DR 模式的特点 1.1 数据包流向分析 1.2 DR 模式的特点 2 DR模式 LVS负载均衡群集部署 2.1 配置负载调度器 2.1.1 配置虚拟 IP 地址 2.1.2 调整 proc 响应参数 2.1.3 配置负载分配策略 2.2 部署共享存储 2.3 配置节点服务器 2.3.1 配置虚拟 IP 地址 2.3.2…

树形控件加自定义图标样式及指引线

记录一下留用&#xff0c;有错误请指正。 效果图如下&#xff1a; 自定义图标及指引线 代码&#xff1a; <div class"head-container" style"margin-left: -15px;"><el-tree icon-class"none"style"height:100%; overflow-y: h…

滚动菜单 flutter

想实现这个功能&#xff1a; 下面的代码可以实现&#xff1a; import package:flutter/material.dart;void main() > runApp(MyApp());class MyApp extends StatelessWidget {static const String _title Flutter Code Sample;overrideWidget build(BuildContext context)…

vcruntime140_1.dll修复的方法大全,缺失vcruntime140_1.dll解决方法分享

vcruntime140_1.dll这个文件在电脑里属于挺重要的一个文件&#xff0c;一但它缺失了&#xff0c;那么很多程序都是运行不了的&#xff0c;今天我们就来讲解一下这个vcruntime140_1.dll修复以及它的一些作用和属性。 一.vcruntime140_1.dll的作用 vcruntime140_1.dll是Microso…

在MAC电脑上将NTFS格式移动硬盘转换为ExFAT格式

注意&#xff1a;转化之前先将移动硬盘中的内容进行备份 1、点击桌面上的【前往】&#xff0c;选择【实用工具】 2、在列表中选择【磁盘工具】 3、在左侧选中你的磁盘&#xff0c;点击右侧上方的【抹掉】,注意&#xff1a;将永久抹掉储存在上面的所有数据&#xff0c;因此需要…

Alibaba(商品详情)API接口

为了进行电商平台 的API开发&#xff0c;首先我们需要做下面几件事情。 1&#xff09;开发者注册一个账号 2&#xff09;然后为每个alibaba应用注册一个应用程序键&#xff08;App Key) 。 3&#xff09;下载alibaba API的SDK并掌握基本的API基础知识和调用 4&#xff09;利…

计算机专业毕业设计项目推荐03-Wiki系统设计与实现(JavaSpring+Vue+Mysql)

Wiki系统设计与实现&#xff08;JavaSpringVueMysql&#xff09; **介绍****系统总体开发情况-功能模块****各部分模块实现** 介绍 本系列(后期可能博主会统一为专栏)博文献给即将毕业的计算机专业同学们,因为博主自身本科和硕士也是科班出生,所以也比较了解计算机专业的毕业设…

大数据技术之Hadoop:提交MapReduce任务到YARN执行(八)

目录 一、前言 二、示例程序 2.1 提交wordcount示例程序 2.2 提交求圆周率示例程序 三、写在最后 一、前言 我们前面提到了MapReduce&#xff0c;也说了现在几乎没有人再写MapReduce代码了&#xff0c;因为它已经过时了。然而不写代码不意味着它没用&#xff0c;当下很火…

103. 二叉树的锯齿形层序遍历

103. 二叉树的锯齿形层序遍历 题目-中等难度示例1. bfs 题目-中等难度 给你二叉树的根节点 root &#xff0c;返回其节点值的 锯齿形层序遍历 。&#xff08;即先从左往右&#xff0c;再从右往左进行下一层遍历&#xff0c;以此类推&#xff0c;层与层之间交替进行&#xff09…

Elasticsearch:使用 ESRE 和生成式 AI 了解 TLS 日志错误

作者&#xff1a;DAVID HOPE 本博客介绍了 Elasticsearch 相关性引擎 (ESRE​​) 及其 Elastic Learned Sparse Encoder 功能的新颖应用&#xff0c;特别是在日志分析中。 最近发布的 Elasticsearch Relevance Engine™ (ESRE™) 包含一系列重要功能&#xff0c;可增强搜索能力…