一百七十二、Flume——Flume采集Kafka数据写入HDFS中(亲测有效、附截图)

一、目的

作为日志采集工具Flume,它在项目中最常见的就是采集Kafka中的数据然后写入HDFS或者HBase中,这里就是用flume采集Kafka的数据导入HDFS中

二、各工具版本

(一)Kafka

kafka_2.13-3.0.0.tgz

(二)Hadoop(HDFS)

hadoop-3.1.3.tar.gz

(三)Flume

apache-flume-1.9.0-bin.tar.gz

三、实施步骤

(一)到flume的conf的目录下

# cd  /home/hurys/dc_env/flume190/conf

(二)创建配置文件evaluation.properties

# vi  evaluation.properties

### Name agent, source, channels and sink alias
a1.sources = s1
a1.channels = c1
a1.sinks = k1

### define kafka source
a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource

# Maximum number of messages written to Channel in one batch
a1.sources.s1.batchSize = 5000

# Maximum time (in ms) before a batch will be written to Channel The batch will be written whenever the first of size and time will be reached.
a1.sources.s1.batchDurationMillis = 2000

# set kafka broker address
a1.sources.s1.kafka.bootstrap.servers = 192.168.0.27:9092

# set kafka consumer group Id and offset consume
# 官网推荐1.9.0版本只设置了topic,但测试后不能正常消费,需要添加消费组id(自己写一个),并定义偏移量消费方式
a1.sources.s1.kafka.consumer.group.id = evaluation_group
a1.sources.s1.kafka.consumer.auto.offset.reset = earliest

# set kafka topic
a1.sources.s1.kafka.topics = topic_b_evaluation


### defind hdfs sink
a1.sinks.k1.type = hdfs
# set store hdfs path
a1.sinks.k1.hdfs.path = hdfs://hurys22:8020/rtp/evaluation/evaluation_%Y-%m-%d
# set file size to trigger roll
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.threadsPoolSize = 30
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=Text


### define channel from kafka source to hdfs sink
# memoryChannel:快速,但是当设备断电,数据会丢失
# FileChannel:速度较慢,即使设备断电,数据也不会丢失
a1.channels.c1.type = file
# 这里不单独设置checkpointDir和dataDirs文件位置,参考官网不设置会有默认位置
# channel store size
a1.channels.c1.capacity = 100000
# transaction size
a1.channels.c1.transactionCapacity = 10000


### 绑定source、channel和sink
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

(三)配置文件创建好后启动flume服务

# cd /home/hurys/dc_env/flume190/

# ./bin/flume-ng agent -n a1  -f /home/hurys/dc_env/flume190/conf/evaluation.properties

(四)到HDFS文件里验证一下

HDFS中生成evaluation_2023-09-07 文件夹,里面有很多小文件

(五)注意:小文件里的数据是JSON格式,即使我设置文件后缀名为csv也没用(可能配置文件中的文件类型设置需要优化

a1.sinks.k1.hdfs.writeFormat=Text

(六)jps查看Flume的服务

[root@hurys22 conf]# jps
16801 ResourceManager
4131 Application
18055 AlertServer
16204 DataNode
22828 Application
17999 LoggerServer
2543 launcher.jar
22224 Application
17393 QuorumPeerMain
16980 NodeManager
17942 WorkerServer
16503 SecondaryNameNode
11384 Application
32669 Application
17886 MasterServer
10590 Jps
16031 NameNode
18111 ApiApplicationServer

注意:Application就是Flume运行的任务

(七)关闭Flume服务

如果想要关闭Flume服务,直接杀死服务就好了

# kill -9 32669

(八)checkpointDir和dataDirs默认的文件位置

默认的文件位置:/root/.flume/file-channel/

总之,Flume这个工具的用法还需进一步研究优化,当然kettle也可以,所以这个项目目前还是用kettle吧!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/74916.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

javaee springMVC model的使用

项目结构图 pom依赖 <?xml version"1.0" encoding"UTF-8"?><project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org…

腾讯云CVM标准型S4服务器性能测评_CPU_网络收发包PPS详解

腾讯云服务器CVM标准型S4性能测评&#xff0c;包括S4云服务器CPU型号、处理器主频、网络收发包PPS、队列数、出入内网带宽能力性能参数说明&#xff0c;标准型 S4 实例是次新一代的标准型实例&#xff0c;CPU采用2.4GHz主频的Intel Xeon Skylake 6148处理器&#xff0c;腾讯云百…

Visual studio解决‘scanf: This function or variable may be unsafe. 问题

使用C语言的scanf函数在Visual Studio软件上运行会报如下错误&#xff1a; scanf: This function or variable may be unsafe. Consider using scanf s instead. To disable deprecation, use. CRT SECURE NO WARNINGS. See online help for details. 这个函数或变量可能是不安…

LeetCode 92. Reverse Linked List II【链表,头插法】中等

本文属于「征服LeetCode」系列文章之一&#xff0c;这一系列正式开始于2021/08/12。由于LeetCode上部分题目有锁&#xff0c;本系列将至少持续到刷完所有无锁题之日为止&#xff1b;由于LeetCode还在不断地创建新题&#xff0c;本系列的终止日期可能是永远。在这一系列刷题文章…

React 防抖与节流用法

在React中&#xff0c;防抖和节流是优化性能和提升用户体验的常用技术。下面是它们的用法&#xff1a; 防抖&#xff08;Debounce&#xff09;&#xff1a;防抖是指在某个事件触发后&#xff0c;等待一段时间后执行回调函数。如果在等待时间内再次触发该事件&#xff0c;将重新…

upload-labs1-21关文件上传通关手册

upload-labs文件上传漏洞靶场 目录 upload-labs文件上传漏洞靶场第一关pass-01&#xff1a;第二关Pass-02第三关pass-03&#xff1a;第四关pass-04&#xff1a;第五关pass-05&#xff1a;第六关pass-06&#xff1a;第七关Pass-07第八关Pass-08第九关Pass-09第十关Pass-10第十一…

苹果手机远程控制安卓手机,为什么不能发起控制?

这位用户想要用iOS设备远程控制安卓设备&#xff0c;在被控端安装好AirDroid之后&#xff0c;就在控制端的苹果手机上也安装了AirDroid&#xff0c;然而打开控制端的软件&#xff0c;却没有在手机界面上看到【远程控制】按钮&#xff0c;于是提出了以上疑问。 解答 想要让iOS设…

自动驾驶:轨迹预测综述

自动驾驶&#xff1a;轨迹预测综述 轨迹预测的定义轨迹预测的分类基于物理的方法&#xff08;Physics-based&#xff09;基于机器学习的方法&#xff08;Classic Machine Learning-based&#xff09;基于深度学习的方法&#xff08;Deep Learning-based&#xff09;基于强化学习…

用postman 推送消息到GCP的pubsub

创建1个Topic 和 2个 subscription 我们可以用terraform 去创建1个topic 和 2个subscriptions # topic resource "google_pubsub_topic" "topic_a" {name "TopicA"project var.project_id }# subscriptions resource "google_pubsub_s…

【AIGC专题】Stable Diffusion 从入门到企业级实战0402

一、概述 本章是《Stable Diffusion 从入门到企业级实战》系列的第四部分能力进阶篇《Stable Diffusion ControlNet v1.1 图像精准控制》第02节&#xff0c; 利用Stable Diffusion ControlNet Openpose模型精准控制图像生成。上一节&#xff0c;我们介绍了《Stable Diffusion C…

numpy详解

Numpy 简介 Numpy&#xff08;Numerical Python&#xff09;是一个在Python领域做数值计算非常重要的库&#xff0c; Pandas、Matplotlib、Statmodels、Scikit-learn和其它一些科学计算库都依赖Numpy 也就是说有时候你可能没有直接import numpy as np&#xff0c;但是却在背后…

Spring Bean的获取方式

参考https://juejin.cn/post/7251780545972994108?searchId2023091105493913AF7C1E3479BB943C80#heading-12 记录并补充 1.通过BeanFactoryAware package com.toryxu.demo1.beans;import org.springframework.beans.BeansException; import org.springframework.beans.facto…

postgresql-通用表达式

postgresql-通用表达式 入门案例简单CTE递归 CTE案例1案例2 入门案例 -- 通用表达式 with t(n) as (select 2) select * from t;简单CTE WITH cte_name (col1, col2, ...) AS (cte_query_definition ) sql_statement;WITH 表示定义 CTE&#xff0c;因此 CTE 也称为 WITH 查询…

【Unity】rotation和Quaternion学习笔记

1.rotation 赋值 Quaternion可以为transform.rotation 赋值 2. 从正轴面向原点&#xff0c;顺时针旋转&#xff0c;角度正增加 正x轴面向原点&#xff0c;顺时针旋转&#xff0c;z正轴往下&#xff0c;rotation的x正增加。 3.rotation和Quaternion的关系 1.查询 2.实践 旋转…

LVS DR模式负载均衡群集部署

目录 1 LVS-DR 模式的特点 1.1 数据包流向分析 1.2 DR 模式的特点 2 DR模式 LVS负载均衡群集部署 2.1 配置负载调度器 2.1.1 配置虚拟 IP 地址 2.1.2 调整 proc 响应参数 2.1.3 配置负载分配策略 2.2 部署共享存储 2.3 配置节点服务器 2.3.1 配置虚拟 IP 地址 2.3.2…

树形控件加自定义图标样式及指引线

记录一下留用&#xff0c;有错误请指正。 效果图如下&#xff1a; 自定义图标及指引线 代码&#xff1a; <div class"head-container" style"margin-left: -15px;"><el-tree icon-class"none"style"height:100%; overflow-y: h…

flutter 网络地址URL转file

方法1 import dart:io; import package:http/http.dart as http; import package:path/path.dart; import package:path_provider/path_provider.dart;Future<File> _fileFromImageUrl() async {final response await http.get(Uri.parse(https://example.com/xyz.jpg)…

滚动菜单 flutter

想实现这个功能&#xff1a; 下面的代码可以实现&#xff1a; import package:flutter/material.dart;void main() > runApp(MyApp());class MyApp extends StatelessWidget {static const String _title Flutter Code Sample;overrideWidget build(BuildContext context)…

vcruntime140_1.dll修复的方法大全,缺失vcruntime140_1.dll解决方法分享

vcruntime140_1.dll这个文件在电脑里属于挺重要的一个文件&#xff0c;一但它缺失了&#xff0c;那么很多程序都是运行不了的&#xff0c;今天我们就来讲解一下这个vcruntime140_1.dll修复以及它的一些作用和属性。 一.vcruntime140_1.dll的作用 vcruntime140_1.dll是Microso…

在MAC电脑上将NTFS格式移动硬盘转换为ExFAT格式

注意&#xff1a;转化之前先将移动硬盘中的内容进行备份 1、点击桌面上的【前往】&#xff0c;选择【实用工具】 2、在列表中选择【磁盘工具】 3、在左侧选中你的磁盘&#xff0c;点击右侧上方的【抹掉】,注意&#xff1a;将永久抹掉储存在上面的所有数据&#xff0c;因此需要…