大数据技术之Flume 企业开发案例——自定义 Sink(10)

目录

自定义 Sink

1)介绍

2)需求

3)编码

4)测试


自定义 Sink

1)介绍

Sink 不断地轮询 Channel 中的事件并批量地移除它们,随后将这些事件批量写入到存储或索引系统,或者发送到另一个 Flume Agent。

Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 会用 Channel 启动一个事务。批量事件一旦成功写入到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务。事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件。

Sink 组件的目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定义等。虽然官方提供的 Sink 类型已经很多,但在实际开发中可能仍不能满足需求。此时,可以根据实际需求来自定义 Sink。

官方提供了自定义 Sink 的接口:Flume Developer Guideicon-default.png?t=N7T8https://flume.apache.org/FlumeDeveloperGuide.html#sink。自定义 MySink 需要继承 AbstractSink 类并实现 Configurable 接口。

主要实现的方法包括:

  • configure(Context context) —— 初始化 context(读取配置文件内容)
  • process() —— 从 Channel 读取获取数据(event),这个方法将被循环调用。

使用场景:例如读取 Channel 数据写入 MySQL 或其他文件系统。

2)需求

使用 Flume 接收数据,并在 Sink 端给每条数据添加前缀和后缀,输出到控制台。前后缀可以从 Flume 任务配置文件中配置。

流程分析:

  • MySink
  • process():从 Channel 中取数据,添加前后缀,写入日志。
  • 输出示例:hello:lzl:hello
  • lzl

数据流:

  • source
  • channel
  • sink

步骤:

  1. 编码
    • AbstractSink
  2. 打包到集群并编写任务配置文件
    • Configurable
    • configure():读取任务配置文件中的配置信息。

3)编码

package com.lzl;import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MySink extends AbstractSink implements Configurable {// 创建 Logger 对象private static final Logger LOG = LoggerFactory.getLogger(AbstractSink.class);private String prefix;private String suffix;@Overridepublic Status process() throws EventDeliveryException {// 声明返回值状态信息Status status;// 获取当前 Sink 绑定的 ChannelChannel ch = getChannel();// 获取事务Transaction txn = ch.getTransaction();// 声明事件Event event;// 开启事务txn.begin();// 读取 Channel 中的事件,直到读取到事件结束循环while (true) {event = ch.take();if (event != null) {break;}}try {// 处理事件(打印)LOG.info(prefix + new String(event.getBody()) + suffix);// 事务提交txn.commit();status = Status.READY;} catch (Exception e) {// 遇到异常,事务回滚txn.rollback();status = Status.BACKOFF;} finally {// 关闭事务txn.close();}return status;}@Overridepublic void configure(Context context) {// 读取配置文件内容,有默认值prefix = context.getString("prefix", "hello:");// 读取配置文件内容,无默认值suffix = context.getString("suffix");}
}

4)测试

(1)打包 将写好的代码打包,并放到 Flume 的 lib 目录(例如 /opt/module/flume)下。

(2)配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444# Describe the sink
a1.sinks.k1.type = com.lzl.MySink
#a1.sinks.k1.prefix = lzl:
a1.sinks.k1.suffix = :lzl# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(3)开启任务

[lzl@hadoop12  flume]$ bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.logger=INFO,console
[lzl@hadoop12  ~]$ nc localhost 44444
hello
OK
lzl
OK

(4)查看结果 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/52645.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

jenkins 开启控制台详细日志

1、开启控制台详细日志,查看真正报错原因 开启后生成流水线语句: 2、根本问题 使用jenkins再次构建,查看控制台日志 报错: 意思是在执行ssh命令的时候, /root/apps/jenkins/portal/portal-server/Dockerfile 路径下没…

极狐GitLab 如何管理 Kubernetes 集群?

极狐GitLab 是 GitLab 在中国的发行版,专门面向中国程序员和企业提供企业级一体化 DevOps 平台,用来帮助用户实现需求管理、源代码托管、CI/CD、安全合规,而且所有的操作都是在一个平台上进行,省事省心省钱。可以一键安装极狐GitL…

【计算机网络】电路交换、报文交换、分组交换

电路交换(Circuit Switching):通过物理线路的连接,动态地分配传输线路资源 ​​​​

《机器学习》 SVM支持向量机 推导、参数解析、可视化实现

目录 一、SVM支持向量机 1、什么是SVM 例如: 2、SVM的主要特点是: 二、SVM方程 1、超平面方程 2、标签问题 3、决策函数: 符号函数: 整合: 4、距离问题 1)点到直线距离 2)点到平面…

关于喷墨打印:液滴喷射及基材影响的那些事儿

大家好,今天我们来探讨一篇关于液滴喷射在生物应用中相关知识的文章——《Understanding droplet jetting on varying substrate for biological applications》是发表于《International Journal of Bioprinting》。在生物打印领域,了解液滴在不同基材上…

【Pytorch】Linear 层,举例:相机参数和Instance Feaure通过Linear层生成Group Weights

背景 看论文看到这个pipeline,对于相机参数和Instance Fature 的融合有点兴趣,研究如下: Linear 层 Linear 层是最基本的神经网络层之一,也称为全连接层。它将输入与每个输出神经元完全连接。每个连接都有一个权重和一个偏置。…

elasticsearch快照存储到linux本地路径或分布式存储系统mioio

一、使用linux本地目录做快照存储 1.编辑 elasticsearch.yml 文件,添加以下配置: path.repo: ["/path/to/your/backup/dir"]2.创建一个文件系统类型的快照仓库 PUT /_snapshot/my_local_repository {"type": "fs",&quo…

磷酸二氢钾溶液净化除杂,除重金属

磷酸二氢锂,化学式LiH2PO4,相对分子质量103.93,白色结晶或粉末。熔点大于100℃,相对密度2.5g/ml。每100毫升水中的溶解克数:126g/0℃。 锂离子二次电池在手提电脑、移动通讯、电动工具等方面具有广泛应用,在…

使用VScode的Git版本控制功能(图文版)

☁️ 前言 今天让我来手把手教你简单入门VScode自带的Git版本控制。 🎉 初始化仓库 初始化仓库之后,仓库里的文件发生了任何改动都会有相应的提示,这对于我们开发和维护项目非常有帮助。 🎉提交更改 初始化仓库之后&#xff…

基于web的停车场管理系统设计与实现-计算机毕设 附源码 16856

基于web的停车场管理系统设计与实现 目 录 1 绪论 1.1 研究背景和意义 1.2国内外研究现状 1.3论文结构与章节安排 2 系统分析 2.1 可行性分析 2.1.1 技术可行性分析 2.1.2 经济可行性分析 2.1.3 操作可行性分析 2.2 系统功能分析 2.2.1 功能性分析 2.2.2 非功能性分…

【自动化】考试答题自动化完成答案,如何实现100%正确呢

一、科目仿真考试不能自动答题 我的答案是可以的,电脑程序可以模拟人的操作完成所有的答题并提交结束考试 二、分析页面内容 完成一个题目,包括判断题,对与错2选1答案,单选题ABCD4选1答案,多选题大家想一想 F12查看按…

基于layui实现简单的万智牌生命计数器页面

对照手机App“旅法师营地”的万智牌生命计数器窗口(如下图所示),使用layui、jQuery等实现简单的万智牌生命计数器页面。   主要实现的功能如下:   1)点击左右两侧的-1、1、-5、5区域更新左右两侧生命值&#xff1…

【MATLAB学习笔记】绘图——自定义标记(Marker)形状,实现与MATLAB自带标记基本一致的功能(自适应缩放、自适应裁剪)

目录 前言自定义标记函数自定义标记函数的说明纵横比调整将图形大小按磅数设置平移标记点绘制标记点边界标记点不裁剪 拓展功能——标记点自适应绘图区的缩放绘图区缩放回调函数标记点大小自适应标记点裁剪自适应 示例基本绘图自定义标记函数的使用 总代码主函数自定义标记函数…

入门STM32--按键输入

上一篇博客我们介绍了如何使用GPIO配置跑马灯,根据GPIO的基本结构图,我们能够发现,他肯定不单单有输出的功能,肯定可以检测IO上的电平变化,实际上就是输入的功能。 1.按键 在大多数情况下,按键是一种简单的…

【第54课】XSS跨站Cookie盗取表单劫持网络钓鱼溯源分析项目平台框架

免责声明 本文发布的工具和脚本,仅用作测试和学习研究,禁止用于商业用途,不能保证其合法性,准确性,完整性和有效性,请根据情况自行判断。 如果任何单位或个人认为该项目的脚本可能涉嫌侵犯其权利&#xff0…

1259:【例9.3】求最长不下降序列 动态规划

1259:【例9.3】求最长不下降序列 题目链接 【输入样例】 【输入样例】 14 13 7 9 16 38 24 37 18 44 19 21 22 63 15【输出样例】 max8 7 9 16 18 19 21 22 63思路: 确定状态: a[n]数组放数据, dp[n]数组放第i个位子前最长子序…

kafka发送消息-生产者发送消息的分区策略(消息发送到哪个分区中?是什么策略)

生产者发送消息的分区策略(消息发送到哪个分区中?是什么策略) 1、默认策略,程序自动计算并指定分区1.1、指定key,不指定分区1.2、不指定key,不指定分区 2、轮询分配策略RoundRobinPartitioner2.1、创建配置…

Linux网络:网络基础

Linux网络:网络基础 一、网络诞生背景及产生的诸多问题1. 1 网络诞生背景1.2 网络诞生面临的困境 二、网络协议栈(OSI七层模型、CP/IP五层模型)2.1 TCP/IP五层(或四层)模型 三、网络和系统关系四、网络传输流程4.1 同一个局域网中的两台主机进…

折腾 Quickwit,Rust 编写的分布式搜索引擎-官方教程

快速上手 在本快速入门指南中,我们将安装 Quickwit,创建一个索引,添加文档,最后执行搜索查询。本指南中使用的所有 Quickwit 命令都在 CLI 参考文档 中进行了记录。 https://quickwit.io/docs/main-branch/reference/cli 使用 Qui…

flutter 中 ssl 双向证书校验

SSL 证书: 在处理 https 请求的时候,通常可以使用 中间人攻击的方式 获取 https 请求以及响应参数。应为通常我们是 SSL 单向认证,服务器并没有验证我们的客户端的证书。为了防止这种中间人攻击的情况。我么可以通过 ssl 双向认证的方式。即…