大数据技术之Flume 企业开发案例——自定义 Sink(10)

目录

自定义 Sink

1)介绍

2)需求

3)编码

4)测试


自定义 Sink

1)介绍

Sink 不断地轮询 Channel 中的事件并批量地移除它们,随后将这些事件批量写入到存储或索引系统,或者发送到另一个 Flume Agent。

Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 会用 Channel 启动一个事务。批量事件一旦成功写入到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务。事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件。

Sink 组件的目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定义等。虽然官方提供的 Sink 类型已经很多,但在实际开发中可能仍不能满足需求。此时,可以根据实际需求来自定义 Sink。

官方提供了自定义 Sink 的接口:Flume Developer Guideicon-default.png?t=N7T8https://flume.apache.org/FlumeDeveloperGuide.html#sink。自定义 MySink 需要继承 AbstractSink 类并实现 Configurable 接口。

主要实现的方法包括:

  • configure(Context context) —— 初始化 context(读取配置文件内容)
  • process() —— 从 Channel 读取获取数据(event),这个方法将被循环调用。

使用场景:例如读取 Channel 数据写入 MySQL 或其他文件系统。

2)需求

使用 Flume 接收数据,并在 Sink 端给每条数据添加前缀和后缀,输出到控制台。前后缀可以从 Flume 任务配置文件中配置。

流程分析:

  • MySink
  • process():从 Channel 中取数据,添加前后缀,写入日志。
  • 输出示例:hello:lzl:hello
  • lzl

数据流:

  • source
  • channel
  • sink

步骤:

  1. 编码
    • AbstractSink
  2. 打包到集群并编写任务配置文件
    • Configurable
    • configure():读取任务配置文件中的配置信息。

3)编码

package com.lzl;import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MySink extends AbstractSink implements Configurable {// 创建 Logger 对象private static final Logger LOG = LoggerFactory.getLogger(AbstractSink.class);private String prefix;private String suffix;@Overridepublic Status process() throws EventDeliveryException {// 声明返回值状态信息Status status;// 获取当前 Sink 绑定的 ChannelChannel ch = getChannel();// 获取事务Transaction txn = ch.getTransaction();// 声明事件Event event;// 开启事务txn.begin();// 读取 Channel 中的事件,直到读取到事件结束循环while (true) {event = ch.take();if (event != null) {break;}}try {// 处理事件(打印)LOG.info(prefix + new String(event.getBody()) + suffix);// 事务提交txn.commit();status = Status.READY;} catch (Exception e) {// 遇到异常,事务回滚txn.rollback();status = Status.BACKOFF;} finally {// 关闭事务txn.close();}return status;}@Overridepublic void configure(Context context) {// 读取配置文件内容,有默认值prefix = context.getString("prefix", "hello:");// 读取配置文件内容,无默认值suffix = context.getString("suffix");}
}

4)测试

(1)打包 将写好的代码打包,并放到 Flume 的 lib 目录(例如 /opt/module/flume)下。

(2)配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444# Describe the sink
a1.sinks.k1.type = com.lzl.MySink
#a1.sinks.k1.prefix = lzl:
a1.sinks.k1.suffix = :lzl# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(3)开启任务

[lzl@hadoop12  flume]$ bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.logger=INFO,console
[lzl@hadoop12  ~]$ nc localhost 44444
hello
OK
lzl
OK

(4)查看结果 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/52645.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HTML中渲染空格和换行符样式的实现方式

在HTML中&#xff0c;连续的空格和换行符会被合并为一个空格&#xff0c;所以无法直接通过添加换行符来实现缩进效果。如果您希望在HTML中显示缩进的效果&#xff0c;可以使用CSS样式中的white-space: pre属性。 以下是使用<pre>标签和CSS样式实现缩进效果的示例&#x…

jenkins 开启控制台详细日志

1、开启控制台详细日志&#xff0c;查看真正报错原因 开启后生成流水线语句&#xff1a; 2、根本问题 使用jenkins再次构建&#xff0c;查看控制台日志 报错&#xff1a; 意思是在执行ssh命令的时候&#xff0c; /root/apps/jenkins/portal/portal-server/Dockerfile 路径下没…

极狐GitLab 如何管理 Kubernetes 集群?

极狐GitLab 是 GitLab 在中国的发行版&#xff0c;专门面向中国程序员和企业提供企业级一体化 DevOps 平台&#xff0c;用来帮助用户实现需求管理、源代码托管、CI/CD、安全合规&#xff0c;而且所有的操作都是在一个平台上进行&#xff0c;省事省心省钱。可以一键安装极狐GitL…

【计算机网络】电路交换、报文交换、分组交换

电路交换&#xff08;Circuit Switching&#xff09;&#xff1a;通过物理线路的连接&#xff0c;动态地分配传输线路资源 ​​​​

python——requests

Python requests 库 一、什么是requests库&#xff1f; Python的requests库是一个用于发送HTTP请求的第三方库。它简单易用&#xff0c;封装了许多底层操作&#xff0c;能够帮助开发者更轻松地与Web服务进行通信。requests库支持发送各种HTTP请求&#xff0c;比如GET、POST、…

《机器学习》 SVM支持向量机 推导、参数解析、可视化实现

目录 一、SVM支持向量机 1、什么是SVM 例如&#xff1a; 2、SVM的主要特点是&#xff1a; 二、SVM方程 1、超平面方程 2、标签问题 3、决策函数&#xff1a; 符号函数&#xff1a; 整合&#xff1a; 4、距离问题 1&#xff09;点到直线距离 2&#xff09;点到平面…

关于喷墨打印:液滴喷射及基材影响的那些事儿

大家好&#xff0c;今天我们来探讨一篇关于液滴喷射在生物应用中相关知识的文章——《Understanding droplet jetting on varying substrate for biological applications》是发表于《International Journal of Bioprinting》。在生物打印领域&#xff0c;了解液滴在不同基材上…

【Pytorch】Linear 层,举例:相机参数和Instance Feaure通过Linear层生成Group Weights

背景 看论文看到这个pipeline&#xff0c;对于相机参数和Instance Fature 的融合有点兴趣&#xff0c;研究如下&#xff1a; Linear 层 Linear 层是最基本的神经网络层之一&#xff0c;也称为全连接层。它将输入与每个输出神经元完全连接。每个连接都有一个权重和一个偏置。…

GWASinspector简单教程

在进行GWAS meta分析前&#xff0c;对GWAS summary data数据进行QC非常重要&#xff0c;最近文章提出了一个pipeline可以进行相关的操作&#xff1a;GWASinspector&#xff08;文章连接&#xff1a;GWASinspector: comprehensive quality control of genome-wide association s…

elasticsearch快照存储到linux本地路径或分布式存储系统mioio

一、使用linux本地目录做快照存储 1.编辑 elasticsearch.yml 文件&#xff0c;添加以下配置&#xff1a; path.repo: ["/path/to/your/backup/dir"]2.创建一个文件系统类型的快照仓库 PUT /_snapshot/my_local_repository {"type": "fs",&quo…

【手写数据库内核组件】0102 链表的类型,单向链表,双向链表,循环链表,二叉树,多路树等类型以及它们的特点,物尽其用

0102 链表的类型 ​专栏内容: postgresql使用入门基础手写数据库toadb并发编程个人主页:我的主页 管理社区:开源数据库 座右铭:天行健,君子以自强不息;地势坤,君子以厚德载物. 文章目录 0102 链表的类型一、概述 二、链表的类型与特点 2.1 单链表 2.2 双向链表 2.3 循环…

磷酸二氢钾溶液净化除杂,除重金属

磷酸二氢锂&#xff0c;化学式LiH2PO4&#xff0c;相对分子质量103.93&#xff0c;白色结晶或粉末。熔点大于100℃&#xff0c;相对密度2.5g/ml。每100毫升水中的溶解克数&#xff1a;126g/0℃。 锂离子二次电池在手提电脑、移动通讯、电动工具等方面具有广泛应用&#xff0c;在…

CSS @layer:深入理解与实战应用

CSS layer&#xff1a;深入理解与实战应用 在CSS的不断发展中&#xff0c;layer 规则的引入为样式表的组织和性能优化提供了全新的解决方案。本文将详细介绍CSS layer 的使用方法&#xff0c;以及它能解决的实际问题&#xff0c;帮助开发者更好地理解和应用这一特性。 一、lay…

使用VScode的Git版本控制功能(图文版)

☁️ 前言 今天让我来手把手教你简单入门VScode自带的Git版本控制。 &#x1f389; 初始化仓库 初始化仓库之后&#xff0c;仓库里的文件发生了任何改动都会有相应的提示&#xff0c;这对于我们开发和维护项目非常有帮助。 &#x1f389;提交更改 初始化仓库之后&#xff…

基于web的停车场管理系统设计与实现-计算机毕设 附源码 16856

基于web的停车场管理系统设计与实现 目 录 1 绪论 1.1 研究背景和意义 1.2国内外研究现状 1.3论文结构与章节安排 2 系统分析 2.1 可行性分析 2.1.1 技术可行性分析 2.1.2 经济可行性分析 2.1.3 操作可行性分析 2.2 系统功能分析 2.2.1 功能性分析 2.2.2 非功能性分…

【自动化】考试答题自动化完成答案,如何实现100%正确呢

一、科目仿真考试不能自动答题 我的答案是可以的&#xff0c;电脑程序可以模拟人的操作完成所有的答题并提交结束考试 二、分析页面内容 完成一个题目&#xff0c;包括判断题&#xff0c;对与错2选1答案&#xff0c;单选题ABCD4选1答案&#xff0c;多选题大家想一想 F12查看按…

电商行业虚拟公户供应商分账

在快速发展的电商时代&#xff0c;资金流动的高效与安全成为了企业运营中不可忽视的重要环节。电商虚拟公户供应商分账系统的出现&#xff0c;正是为解决这一难题而设计的创新解决方案。本文将深入探讨电商虚拟公户供应商分账的概念、优势及其在电商行业中的应用。 电商虚拟公…

网页html版——在线查字典的一个web服务器

HTML&#xff08;HyperText Markup Language&#xff09; HTML是一种用于创建网页的标准标记语言。可以用dreamwave这个工具来写 使用文本编辑器&#xff08;如Notepad、Sublime Text、Visual Studio Code等&#xff09;创建一个新的文件&#xff0c;并将其保存为 .html 文件…

基于layui实现简单的万智牌生命计数器页面

对照手机App“旅法师营地”的万智牌生命计数器窗口&#xff08;如下图所示&#xff09;&#xff0c;使用layui、jQuery等实现简单的万智牌生命计数器页面。   主要实现的功能如下&#xff1a;   1&#xff09;点击左右两侧的-1、1、-5、5区域更新左右两侧生命值&#xff1…

【MATLAB学习笔记】绘图——自定义标记(Marker)形状,实现与MATLAB自带标记基本一致的功能(自适应缩放、自适应裁剪)

目录 前言自定义标记函数自定义标记函数的说明纵横比调整将图形大小按磅数设置平移标记点绘制标记点边界标记点不裁剪 拓展功能——标记点自适应绘图区的缩放绘图区缩放回调函数标记点大小自适应标记点裁剪自适应 示例基本绘图自定义标记函数的使用 总代码主函数自定义标记函数…