Flume 之自定义 Source

1、简介

        Flume 自带 Source 有 Avro、Thrift、Netcat、Taildir、Kafka、Http等,有些场合比如我们指定访问接口获取数据当做 Flume 的 Source,像这种定制化的 Source 需要我们自己实现,下面我将介绍如何自定义实现 Source。

2、自定义实现 Flume 的 Source 

        使用自定义 Source ,访问自身的web 服务,并且发送至 logger 的 Sink。

2.1、引入依赖
<dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.11.0</version></dependency>
</dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>3.2.1</version><configuration><skip>true</skip></configuration></plugin></plugins>
</build>
2.2、自定义Source
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;public class MySource extends AbstractSource implements Configurable, PollableSource {private String path;private final static Logger log = LoggerFactory.getLogger(MySource.class);@Overridepublic Status process() throws EventDeliveryException {Status status = null;try{URL url = new URL(this.path);HttpURLConnection connection = (HttpURLConnection) url.openConnection();connection.setRequestMethod("GET");connection.connect();InputStream is = connection.getInputStream();BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is, "utf-8"));StringBuffer stringBuffer = new StringBuffer();String line = null;while ((line = bufferedReader.readLine()) != null){stringBuffer.append(line);stringBuffer.append("\t");}log.info("test =======================: {}" , stringBuffer.toString());Event event = EventBuilder.withBody(stringBuffer.toString().getBytes());getChannelProcessor().processEvent(event);status = Status.READY;// 2s调用一次Thread.sleep(2000);}catch (Exception ex){log.info("出错了!, {}", ex.getMessage());status = Status.BACKOFF;}return status;}@Overridepublic long getBackOffSleepIncrement() {return 0;}@Overridepublic long getMaxBackOffSleepInterval() {return 0;}@Overridepublic void configure(Context context) {// 从配置文件中读取String path = context.getString("path", "http://baidu.com");log.info("path ==========================: {}", path);this.path = path;}
}
2.3、将自定义Source 打包放到 flume 的 lib目录下

2.4、flume 的配置文件编写 

        vim flume-self-source.conf

a1.sources = r1
a1.channels = c1
a1.sinks=k1
# source
a1.sources.r1.type = com.weilong.flumeselfdefinition.MySource # 自定义 Source 的全限定类名
a1.sources.r1.path = http://192.168.30.33:8088/hello # 自定义参数
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Sink
a1.sinks.k1.type = logger
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2.5、测试
2.5.1、启动web服务

2.5.2、启动flume
bin/flume-ng agent -c conf/ -n a1 -f testconf/flume-self-source.conf -Dflume.root.logger=INFO,console
2.5.3、结果 

 

3、总结

        本文详细介绍 Flume 如何实现自定义 Source,帮助大家进一步了解Flume的使用。

        本人是一个从小白自学计算机技术,对运维、后端、各种中间件技术、大数据等有一定的学习心得,想获取自学总结资料(pdf版本)或者希望共同学习,关注微信公众号:it自学社团。后台回复相应技术名称/技术点即可获得。(本人学习宗旨:学会了就要免费分享)

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/628533.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Git 是什么?

Git 是什么&#xff1f; Git 是一个开源的分布式版本控制系统&#xff0c;用于敏捷高效地处理任何或小或大的项目。 Git 是 Linus Torvalds 为了帮助管理 Linux 内核开发而开发的一个开放源码的版本控制软件。 Git 与常用的版本控制工具 CVS, Subversion 等不同&#xff0c;…

Cesium 模型压平

最近整理了下手上的代码&#xff0c;以下是对模型压平的说明。 原理是使用了customShader来重新设置了模型的着色器&#xff0c;通过修改模型顶点的坐标来实现了压平。 废话不多说&#xff0c;下面上代码&#xff1a; /*** class* description 3dtiles模型压平*/ class Flat…

推荐给前端程序员的5款浏览器插件

所谓“工欲善其事&#xff0c;必先利其器”。Chrome&#xff08;谷歌浏览器&#xff09; 应该是程序员或者互联网行业人员使用最多的浏览器了。而在日常开发中&#xff0c;下面几款 浏览器 扩展也许能让你的开发工作事半功倍 。 1、Vimium vimium 是一个旨在将你的双手从鼠标…

C++核心编程(包含:内存、函数、引用、类与对象、文件操作等)【持续更新】

&#x1f308;个人主页&#xff1a;godspeed_lucip &#x1f525; 系列专栏&#xff1a;C从基础到进阶 C核心编程&#x1f30f;1 内存分区模型&#x1f384;1.1 程序运行前&#x1f384;1.2 程序运行后&#x1f384;1.3 new操作符 &#x1f30f;2 引用&#x1f384;2.1 引用的基…

【Golang开源项目】Golang高性能内存缓存库BigCache设计与分析

项目地址 BigCache 是一个快速&#xff0c;支持并发访问&#xff0c;自淘汰的内存型缓存&#xff0c;可以在存储大量元素时依然保持高性能。BigCache将元素保存在堆上却避免了GC的开销。 背景介绍 BigCache的作者在项目里遇到了如下的需求&#xff1a; 支持http协议支持 10…

Linux shell编程学习笔记39:df命令

0 前言1 df命令的功能、格式和选项说明 1.1 df命令的功能1.2 df命令的格式1.3 df命令选项说明 2 df命令使用实例 2.1 df&#xff1a;显示主要文件系统信息2.2 df -a&#xff1a;显示所有文件系统信息2.3 df -t[]TYPE或--type[]TYPE&#xff1a;显示TYPE指定类型的文件系统信…

解决英特尔无线网卡WiFi或者蓝牙突然消失问题

winR&#xff0c;输入“devmgmt.msc”&#xff0c;检查设备管理器中的无线网卡驱动是否安装好。 访问https://www.intel.cn/content/www/cn/zh/download/19351/windows-10-and-windows-11-wi-fi-drivers-for-intel-wireless-adapters.html下载对应系统版本的英特尔无线网卡WiFi…

遇到问题不要慌,轻松搞定内存泄露

当一个系统在发生 OOM 的时候&#xff0c;行为可能会让你感到非常困惑。因为 JVM 是运行在操作系统之上的&#xff0c;操作系统的一些限制&#xff0c;会严重影响 JVM 的行为。故障排查是一个综合性的技术问题&#xff0c;在日常工作中要增加自己的知识广度。多总结、多思考、多…

基于PyQT的图片批处理系统

项目背景&#xff1a; 随着数字摄影技术的普及&#xff0c;人们拍摄和处理大量图片的需求也越来越高。为了提高效率&#xff0c;开发一个基于 PyQt 的图片批处理系统是很有意义的。该系统可以提供一系列图像增强、滤波、水印、翻转、放大缩小、旋转等功能&#xff0c;使用户能够…

SpringBoot:详解依赖注入和使用配置文件

&#x1f3e1;浩泽学编程&#xff1a;个人主页 &#x1f525; 推荐专栏&#xff1a;《深入浅出SpringBoot》《java项目分享》 《RabbitMQ》《Spring》《SpringMVC》 &#x1f6f8;学无止境&#xff0c;不骄不躁&#xff0c;知行合一 文章目录 前言一、&#x1f3…

力扣精选算法100题——等于目标值的两个数or三数之和(双指针专题)

目录 &#x1f6a9;等于目标值的俩个数 第一步&#xff1a;了解题意 第二步&#xff1a;算法原理 第三步&#xff1a;代码实现 &#x1f6a9;三数之和 第一步&#xff1a;了解题意 第二步&#xff1a;算法原理 思路&#xff1a; ❗不漏&#xff1a; ❗去重: &#xf…

Simulink旧版本如何打开新版的模型文件

Simulink旧版本如何打开新版的模型文件 当用旧版本Simulink软件打开模型时会报错&#xff0c;是因为版本不兼容造成的 解决办法 在simulink的选项中去掉 do not load models created with newer version of Simulink

计算机视觉的应用

计算机视觉&#xff08;Computer Vision&#xff09;是一门研究如何让计算机能够理解和分析数字图像或视频的学科。简单来说&#xff0c;计算机视觉的目标是让计算机能够像人类一样对视觉信息进行处理和理解。为实现这个目标&#xff0c;计算机视觉结合了图像处理、机器学习、模…

分享用 vector的vector实现一个二维数组并初始化的逆置矩阵问题

题目名称 867.转置矩阵 目录 题目名称 867.转置矩阵 1.题目 2.题目分析 3.题目知识点 3.1vector的构造函数 3.2vector构造二维数组 最后&#x1f490; 推荐阅读顺序: 1.题目->2.题目分析->3.题目知识点 1.题目 如果矩阵 matrix为 m 行 n列&#xff0c;则转置后的矩…

【Python学习】Python学习15-模块

目录 【Python学习】Python学习15-模块 前言创建语法引入模块from…import 语句from…import* 语句搜索路径PYTHONPATH 变量-*- coding: UTF-8 -*-导入模块现在可以调用模块里包含的函数了PYTHONPATH 变量命名空间和作用域dir()函数globals() 和 locals() 函数reload() 函数Py…

Python中如何简化if...else...语句

一、引言 我们通常在Python中采用if...else..语句对结果进行判断&#xff0c;根据条件来返回不同的结果&#xff0c;如下面的例子。这段代码是一个简单的Python代码片段&#xff0c;让用户输入姓名并将其赋值给变量user_input。我们能不能把这几行代码进行简化&#xff0c;优化…

RocketMQ源码阅读-Message拉取与消费-Consumer篇

RocketMQ源码阅读-Message拉取与消费-Consumer篇 1. Consumer2. PushConsumer3. PushConsumer 订阅3.1 subscribe订阅3.2 registerMessageListener注册监听器 4. PushConsumer 消息队列Rebalance4.1 Rebalance流程4.2 Rebalance策略AllocateMessageQueueAveragelyAllocateMessa…

CSS 动态邮件查收效果

<template><view class="content"><view class="tooltip-container"><text class="tooltip">查看</text><text class="text">@</text></view></view> </template><sc…

Visual Studio调试模式下无法使用右键菜单将ppt转换到pdf

Visual Studio调试模式下无法使用右键菜单将ppt转换到pdf 症状 Visual Studio调试模式下&#xff0c;程序停在断点时&#xff0c;我临时需要将ppt转为pdf&#xff0c;遂右键单击文件&#xff0c;想直接转pdf&#xff0c;奈何光标转了几秒钟&#xff0c;毫无反应。 解决方法 …

未来科技五年人工智能行业产业发展趋势最新竞争力

人工智能&#xff08;Artificial Intelligence&#xff0c;AI&#xff09;是近年来快速发展的热门领域&#xff0c;被广泛应用于各个行业。随着技术的不断创新和突破&#xff0c;人工智能行业的竞争力也在不断提升。本文将分析未来科技五年人工智能行业产业发展趋势&#xff0c…