flume sink 简介及官方用例

1、HDFS Sink

此sink将事件写入 Hadoop 分布式文件系统 (HDFS) 中。它目前支持创建文本和序列文件。它支持两种文件类型的压缩。可以根据经过的时间或数据大小或事件数定期滚动文件(关闭当前文件并创建一个新文件)。它还按事件起源的时间戳或计算机等属性对数据进行存储/分区。HDFS 目录路径可能包含格式转义序列,这些转义序列将由 HDFS 接收器替换,以生成用于存储事件的目录/文件名。使用此 sink 需要安装 hadoop,以便 Flume 可以使用 Hadoop jar 与 HDFS 集群进行通信。请注意,需要支持 sync() 调用的 Hadoop 版本。

注意 对于所有与时间相关的转义序列,事件的标头中必须存在键为“timestamp”的标头(除非 hdfs.useLocalTimeStamp 设置为 true)。自动添加此功能的一种方法是使用 TimestampInterceptor。

Name

Default

Description

channel

type

The component type name, needs to be hdfs

hdfs.path

HDFS directory path (eg hdfs://namenode/flume/webdata/)

Example for agent named a1:

a1.channels = c1

a1.sinks = k1

a1.sinks.k1.type = hdfs

a1.sinks.k1.channel = c1

a1.sinks.k1.hdfs.path = /flume/events/%Y-%m-%d/%H%M/%S

a1.sinks.k1.hdfs.filePrefix = events-

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue = 10

a1.sinks.k1.hdfs.roundUnit = minute

上述配置会将时间戳四舍五入到最后 10 分钟。例如,如果事件的时间戳为 2012 年 6 月 12 日上午 11:54:34,则 hdfs 路径将变为 /flume/events/2012-06-12/1150/00。

2、Hive Sink

此接收器将包含分隔文本或 JSON 数据的事件直接流式传输到 Hive 表或分区中。事件是使用 Hive 事务写入的。一旦将一组事件提交到 Hive,Hive 查询就会立即看到这些事件。Flume 将流式传输到的分区可以预先创建,也可以选择性地在缺少分区时创建它们。传入事件数据中的字段将映射到 Hive 表中的相应列。

Name

Default

Description

channel

type

The component type name, needs to be hive

hive.metastore

Hive metastore URI (eg thrift://a.b.com:9083 )

hive.database

Hive database name

hive.table

Hive table name

hive.partition

Comma separate list of partition values identifying the partition to write to. May contain escape sequences. E.g: If the table is partitioned by (continent: string, country :string, time : string) then ‘Asia,India,2014-02-26-01-21’ will indicate continent=Asia,country=India,time=2014-02-26-01-21

serializer

Serializer is responsible for parsing out field from the event and mapping them to columns in the hive table. Choice of serializer depends upon the format of the data in the event. Supported serializers: DELIMITED and JSON

Example Hive table :

create table weblogs ( id int , msg string )

    partitioned by (continent string, country string, time string)

    clustered by (id) into 5 buckets

    stored as orc;

Example for agent named a1:

a1.channels = c1

a1.channels.c1.type = memory

a1.sinks = k1

a1.sinks.k1.type = hive

a1.sinks.k1.channel = c1

a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083

a1.sinks.k1.hive.database = logsdb

a1.sinks.k1.hive.table = weblogs

a1.sinks.k1.hive.partition = asia,%{country},%Y-%m-%d-%H-%M

a1.sinks.k1.useLocalTimeStamp = false

a1.sinks.k1.round = true

a1.sinks.k1.roundValue = 10

a1.sinks.k1.roundUnit = minute

a1.sinks.k1.serializer = DELIMITED

a1.sinks.k1.serializer.delimiter = "\t"

a1.sinks.k1.serializer.serdeSeparator = '\t'

a1.sinks.k1.serializer.fieldnames =id,,msg

上述配置会将时间戳四舍五入到最后 10 分钟。例如,如果事件的时间戳标头设置为 2012 年 6 月 12 日上午 11:54:34,并且“country”标头设置为“india”,则该事件的计算结果将达到分区 (continent='asia',country='india',time='2012-06-12-11-50')。序列化程序配置为接受包含三个字段的制表符分隔输入,并跳过第二个字段。

3、Logger Sink

在 INFO 级别记录事件。通常可用于测试/调试目的。

Property Name

Default

Description

channel

type

The component type name, needs to be logger

maxBytesToLog

16

Maximum number of bytes of the Event body to log

Example for agent named a1:

a1.channels = c1

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

4、Avro Sink

发送到此sink的 Flume 事件将转换为 Avro 事件,并发送到配置的主机名/端口对。事件以配置的批处理大小的批处理方式从配置的通道中获取。

Property Name

Default

Description

channel

type

The component type name, needs to be avro.

hostname

The hostname or IP address to bind to.

port

The port # to listen on.

Example for agent named a1:

a1.channels = c1

a1.sinks = k1

a1.sinks.k1.type = avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname = 10.10.10.10

a1.sinks.k1.port = 4545

5、Thrift Sink

发送到此接收器的 Flume 事件将转换为 Thrift 事件,并发送到配置的主机名/端口对。事件从配置的通道中按配置的批处理大小批量获取

通过启用 kerberos 身份验证,可以将 Thrift sink 配置为在安全模式下启动。要与以安全模式启动的 Thrift 源进行通信,Thrift 接收器也应在安全模式下运行。client-principal 和 client-keytab 是 Thrift 接收器用于向 kerberos KDC 进行身份验证的属性。server-principal 表示此接收器配置为在安全模式下连接到的 Thrift 源的主体。

Property Name

Default

Description

channel

type

The component type name, needs to be thrift.

hostname

The hostname or IP address to bind to.

port

The port # to listen on.

Example for agent named a1:

a1.channels = c1

a1.sinks = k1

a1.sinks.k1.type = thrift

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname = 10.10.10.10

a1.sinks.k1.port = 4545

6、File Roll Sink

将事件存储在本地文件系统上

Property Name

Default

Description

channel

type

The component type name, needs to be file_roll.

sink.directory

The directory where files will be stored

Example for agent named a1:

a1.channels = c1

a1.sinks = k1

a1.sinks.k1.type = file_roll

a1.sinks.k1.channel = c1

a1.sinks.k1.sink.directory = /var/log/flume

7、IRC Sink

IRC 接收器从附加通道获取消息,并将这些消息中继到配置的 IRC 目标。

Property Name

Default

Description

channel

type

The component type name, needs to be irc

hostname

The hostname or IP address to connect to

port

6667

The port number of remote host to connect

nick

Nick name

user

User name

password

User password

chan

channel

Example for agent named a1:

a1.channels = c1

a1.sinks = k1

a1.sinks.k1.type = irc

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname = irc.yourdomain.com

a1.sinks.k1.nick = flume

a1.sinks.k1.chan = #flume

8、HBaseSinks¶.

此接收器将数据写入 HBase。Hbase 配置是从类路径中遇到的第一个hbase-site.xml中选取的。实现 HbaseEventSerializer 的类(由配置指定)用于将事件转换为 HBase 全量和/或增量。然后将这些全量和增量写入 HBase。此接收器提供与 HBase 相同的一致性保证,HBase 目前是逐行原子性。如果 Hbase 无法写入某些事件,接收器将回滚该事务中的所有事件。

HBaseSink 支持写入数据以保护 HBase。若要写入安全 HBase,代理正在运行的用户必须对接收器配置为写入的表具有写入权限。可以在配置中指定用于对 KDC 进行身份验证的主体和密钥表。Flume 代理类路径中的hbase-site.xml必须将身份验证设置为 kerberos(有关如何执行此操作的详细信息,请参阅 HBase 文档)

为方便起见,Flume 提供了两个序列化器。SimpleHbaseEventSerializer (org.apache.flume.sink.hbase.SimpleHbaseEventSerializer) 将事件正文按原样写入 HBase,并选择性地在 Hbase 中递增列。RegexHbaseEventSerializer (org.apache.flume.sink.hbase.RegexHbaseEventSerializer) 根据给定的正则表达式中断事件正文,并将每个部分写入不同的列中。类型为 FQCN:org.apache.flume.sink.hbase.HBaseSink。

Property Name

Default

Description

channel

type

The component type name, needs to be hbase

table

The name of the table in Hbase to write to.

columnFamily

The column family in Hbase to write to.

Example for agent named a1:

a1.channels = c1

a1.sinks = k1

a1.sinks.k1.type = hbase

a1.sinks.k1.table = foo_table

a1.sinks.k1.columnFamily = bar_cf

a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer

a1.sinks.k1.channel = c1

9、HBase2Sink

HBase2Sink 相当于 HBase 版本 2 的 HBaseSink。提供的功能和配置参数与 HBaseSink 相同(sink 类型中的 hbase2 标记和包/类名称除外)。类型为FQCN: org.apache.flume.sink.hbase2.HBase2Sink

Property Name

Default

Description

channel

type

The component type name, needs to be hbase2

table

The name of the table in HBase to write to.

columnFamily

The column family in HBase to write to.

Example for agent named a1:

a1.channels = c1

a1.sinks = k1

a1.sinks.k1.type = hbase2

a1.sinks.k1.table = foo_table

a1.sinks.k1.columnFamily = bar_cf

a1.sinks.k1.serializer = org.apache.flume.sink.hbase2.RegexHBase2EventSerializer

a1.sinks.k1.channel = c1

10、AsyncHBaseSink

此接收器使用异步模型将数据写入 HBase。实现 AsyncHbaseEventSerializer 的类(由配置指定)用于将事件转换为 HBase 全量和/或增量。然后将这些看全量和增量写入 HBase。此接收器使用 Asynchbase API 写入 HBase。此接收器提供与 HBase 相同的一致性保证,HBase 目前是逐行原子性。如果 Hbase 无法写入某些事件,接收器将回滚该事务中的所有事件。AsyncHBaseSink 只能与 HBase 1.x 一起使用。AsyncHBaseSink 使用的异步客户端库不适用于 HBase 2。类型为 FQCN:org.apache.flume.sink.hbase.AsyncHBaseSink。

Property Name

Default

Description

channel

type

The component type name, needs to be asynchbase

table

The name of the table in Hbase to write to.

columnFamily

The column family in Hbase to write to.

Example for agent named a1:

a1.channels = c1

a1.sinks = k1

a1.sinks.k1.type = asynchbase

a1.sinks.k1.table = foo_table

a1.sinks.k1.columnFamily = bar_cf

a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer

a1.sinks.k1.channel = c1

11、MorphlineSolrSink

该sink从 Flume 事件中提取数据,对其进行转换,并将其近乎实时地加载到 Apache Solr 服务器中,而 Apache Solr 服务器又向最终用户或搜索应用程序提供查询。

此接收器非常适合将原始数据流式传输到 HDFS(通过 HdfsSink)并同时提取、转换和加载相同数据到 Solr(通过 MorphlineSolrSink)的用例。具体而言,此sink可以处理来自不同数据源的任意异构原始数据,并将其转换为对搜索应用程序有用的数据模型。

ETL 功能可使用 morphline 配置文件进行自定义,该文件定义了一系列转换命令,这些命令将事件记录从一个命令传递到另一个命令。

Morphlines 可以看作是 Unix 管道的演变,其中数据模型被推广为处理通用记录流,包括任意二进制有效负载。morphline 命令有点像 Flume Interceptor。Morphlines 可以嵌入到 Hadoop 组件(如 Flume)中

提供用于解析和转换一组标准数据格式(如日志文件、Avro、CSV、文本、HTML、XML、PDF、Word、Excel 等)的命令,并且可以添加用于其他数据格式的其他自定义命令和解析器作为 morphline 插件。可以对任何类型的数据格式进行索引,并且可以生成任何类型的Solr模式的任何Solr文档,并且可以注册和执行任何自定义ETL逻辑。

Morphlines 操作连续的记录流。数据模型可以描述如下:记录是一组命名字段,其中每个字段都有一个或多个值的有序列表。值可以是任何 Java 对象。也就是说,记录本质上是一个哈希表,其中每个哈希表条目都包含一个 String 键和一个 Java 对象列表作为值。(该实现使用 Guava 的 ArrayListMultimap,即 ListMultimap)。请注意,一个字段可以有多个值,并且任何两个记录都不需要使用通用字段名称。

此sink将 Flume 事件的主体填充到 morphline 记录的 _attachment_body 字段中,并将 Flume 事件的标头复制到同名的记录字段中。然后,命令可以对此数据执行操作。

支持路由到 SolrCloud 集群以提高可扩展性。索引负载可以分布在大量 MorphlineSolrSink 上,以提高可伸缩性。索引负载可以在多个MorphlineSolrSinks之间复制,以实现高可用性,例如使用Flume功能,如负载平衡Sink Processor。MorphlineInterceptor 还可以帮助实现到多个 Solr 集合的动态路由(例如,用于多租户)。

您的环境所需的 morphline 和 solr jar 必须放在 Apache Flume 安装的 lib 目录中。

The type is the FQCN: org.apache.flume.sink.solr.morphline.MorphlineSolrSink

Property Name

Default

Description

channel

type

The component type name, needs to be org.apache.flume.sink.solr.morphline.MorphlineSolrSink

morphlineFile

The relative or absolute path on the local file system to the morphline configuration file. Example: /etc/flume-ng/conf/morphline.conf

Example for agent named a1:

a1.channels = c1

a1.sinks = k1

a1.sinks.k1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink

a1.sinks.k1.channel = c1

a1.sinks.k1.morphlineFile = /etc/flume-ng/conf/morphline.conf

# a1.sinks.k1.morphlineId = morphline1

# a1.sinks.k1.batchSize = 1000

# a1.sinks.k1.batchDurationMillis = 1000

12、Kafka Sink

可以将数据发布到 Kafka topic。其中一个目标是将 Flume 与 Kafka 集成,以便基于拉取的处理系统可以处理来自各种 Flume 源的数据。

目前支持 Kafka 服务器版本 0.10.1.0 或更高版本。测试完成到 2.0.1,这是发布时最高的可用版本。

Property Name

Default

Description

type

Must be set to org.apache.flume.sink.kafka.KafkaSink

kafka.bootstrap.servers

List of brokers Kafka-Sink will connect to, to get the list of topic partitions This can be a partial list of brokers, but we recommend at least two for HA. The format is comma separated list of hostname:port

kafka Sink 使用 FlumeEvent 标头中的主题和键属性将事件发送到 Kafka。如果标头中存在主题,则事件将发送到该特定主题,覆盖为 Sink 配置的主题。如果标头中存在 key,则 Kafka 将使用该 key 在主题分区之间对数据进行分区。具有相同键的事件将发送到同一分区。如果键为 null,则事件将发送到随机分区。

下面给出了 Kafka 接收器的示例配置。以前缀 kafka.producer 开头的属性,即 Kafka 生产者。创建 Kafka 生产者时传递的属性不限于此示例中给出的属性。此外,还可以在此处包含您的自定义属性,并通过作为方法参数传入的 Flume Context 对象在预处理器中访问它们。

a1.sinks.k1.channel = c1

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.kafka.topic = mytopic

a1.sinks.k1.kafka.bootstrap.servers = localhost:9092

a1.sinks.k1.kafka.flumeBatchSize = 20

a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k1.kafka.producer.linger.ms = 1

a1.sinks.k1.kafka.producer.compression.type = snappy

Flume 和 Kafka 之间的通信通道支持安全认证和数据加密。对于安全身份验证,可以从 Kafka 版本 0.9.0 开始使用 SASL/GSSAPI (Kerberos V5) 或 SSL(即使参数名为 SSL,实际协议是 TLS 实现)。

截至目前,数据加密仅由 SSL/TLS 提供。

将 kafka.producer.security.protocol 设置为以下任一值意味着:

SASL_PLAINTEXT - Kerberos 或纯文本身份验证,无需数据加密

SASL_SSL - 具有数据加密功能的 Kerberos 或明文身份验证

SSL - 基于 TLS 的加密,具有可选身份验证。

警告 启用 SSL 时性能会下降,其程度取决于 CPU 类型和 JVM 实现。参考:Kafka 安全概述和用于跟踪此问题的 Jira:KAFKA-2561

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/14572.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AI图书推荐:用100个ChatGPT提示词掌握Python编程

《用100个ChatGPT提示词掌握Python编程》(ChatGPT:Your Python Coach Mastering the Essentials in 100 Prompts) 塞尔吉奥罗哈斯-加莱亚诺(Sergio Rojas-Galeano)是一位热情的计算机科学家,对人工智能、机器学习、进化…

C++中获取int最大与最小值(补)

上文中,我们学习了C中获取int最大与最小值的两种方法:C库和移位运算,这篇文章将解决在移位运算中遇到的各种报错,并提出一种新的生成int最值的方法 上文链接:http://t.csdnimg.cn/cn7Ad 移位运算取最值常见报错 Dev…

汇编语言(STC89C52)

指令是计算机计算CPU根据人的意图来执行某种操作的命令。一台计算机所执行的全部指令的集合,称为这个CPU的指令系统。而想要使计算机按照人们的要求完成一项工作,就必须让CPU按顺序执行预设的操作,即逐条执行人们编写的指令。这种按照人民要求…

C++ 写的_string类,兼容std::string, MFC CString和 C# 的string

代码例子: using namespace lf; int main() { CString s1 _t("http://www.csdn.net"); _string s2 s1; CString s3 s2; _pcn(s1); _pcn(s2); _pcn(s3); return 0; } 输出: _Str.h /***************************************…

网创教程:WordPress插件网创自动采集并发布

网创教程:WordPress插件网创自动采集并发布 使用插件注意事项: 如果遇到404错误,请先检查并调整网站的伪静态设置,这是最常见的问题。需要定制化服务,请随时联系我。 本次更新内容 我们进行了多项更新和优化&#x…

深入解析kube-scheduler的算法自定义插件

目录 ​编辑 一、问题引入 二、自定义步骤 三、最佳实践考虑 一、问题引入 当涉及到 Kubernetes 集群的调度和资源分配时,kube-scheduler 是一个关键组件。kube-scheduler 负责根据集群的调度策略,将 Pod 分配到适当的节点上。kube-scheduler 默认使…

python爬虫学习代码1

百度翻译:利用爬虫技术模拟人工查询英文单词,将查到的信息保存到本地 import requests import json # 1.指定url post_url https://fanyi.baidu.com/sug # 2.UA标识 headers {"User-Agent": Mozilla/5.0 (Windows NT 10.0; Win64; x64) Appl…

pyqt6入门案例

效果预览 hello.ui <?xml version"1.0" encoding"UTF-8"?> <ui version"4.0"><class>Dialog</class><widget class"QDialog" name"Dialog"><property name"geometry"><…

android studio接入facebook踩坑1

今天在接入facebook第三方登录的时候&#xff0c;点击登录按钮&#xff0c;APP闪退&#xff0c;并报错 java.lang.RuntimeException Failure delivering result ResultInfo{whonull,request64206,result-1} 新文章链接https://lengmo714.top/facebook1.html 如下图&#xff1a;…

OpenGL学习入门及开发环境搭建

最近学习OpenGL开发&#xff0c;被各种openGL库搞得晕头转向&#xff0c;什么glut, glew glfw glad等等。 可以参考这边博客:OpenGL 下面的 glut freeglut glfw 都是个啥_glx wgl的中文-CSDN博客 glfw是glut的升级版&#xff0c;跨平台的主要处理窗口 事件相关。 glad是glew…

React项目知识积累(四)

1.useMemo( ) 在 React 中&#xff0c;useMemo 是一个 Hook&#xff0c;用于记忆计算结果&#xff0c;只有当依赖项之一发生变化时&#xff0c;才会重新计算。这有助于避免不必要的计算和渲染&#xff0c;从而提高应用程序的性能。 基本语法如下&#xff1a; const memoized…

html多节点生成图片并导出zip包

html多节点生成图片并导出zip包 背景 在做项目时遇到一个要将html节点展示的图片列表统一导出为zip包的需求。 难点 将html节点生成图片将多张图片加入zip包中&#xff0c;然后下载 解决html生成图片问题 参考html截图的思路使用 pnpm add html-to-image如何将图片资源生成z…

鸿蒙OS开发:【一次开发,多端部署】(多设备自适应能力)简单介绍

多设备自适应能力 介绍 本示例是《一次开发&#xff0c;多端部署》的配套示例代码&#xff0c;展示了[页面开发的一多能力]&#xff0c;包括自适应布局、响应式布局、典型布局场景以及资源文件使用。 名称简介 开发前请熟悉鸿蒙开发指导文档&#xff1a;gitee.com/li-shizhe…

数据可视化技术头歌测试合集

努力是为了不平庸~ 学习的最大理由是想摆脱平庸&#xff0c;早一天就多一份人生的精彩&#xff1b;迟一天就多一天平庸的困扰 目录 时间趋势可视化-柱形图 第1关&#xff1a;“大胃王”比赛数据柱形图绘制——绘制柱形图的基本步骤 任务描述 相关知识 观察和处理数据 绘…

Linux中gcc/g++的基本使用

目录 gcc/g的使用gcc/g是如何生成可执行文件的预处理编译汇编链接 库.o文件是如何与库链接的&#xff1f; debug版本和release版本 gcc/g的使用 在windows中&#xff0c;我们在VS中编写好了代码之后就可以直接在VS中对源码进行编译等操作后运行 而在Linux下&#xff0c;我们可…

LeetCode 279 —— 完全平方数

阅读目录 1. 题目2. 解题思路3. 代码实现 1. 题目 2. 解题思路 此图利用动态规划进行求解&#xff0c;首先&#xff0c;我们求出小于 n n n 的所有完全平方数&#xff0c;存放在数组 squareNums 中。 定义 dp[n] 为和为 n n n 的完全平方数的最小数量&#xff0c;那么有状态…

vue 展示svg矢量图可缩放拖动

使用插件&#xff1a;svg-pan-zoom <template> <!-- svg图--><div id"svgContainer"></div> </template><script> import svgPanZoom from svg-pan-zoom import svgFile from ../datav/img/220kVscb.svg // 路径根据实际情况调…

MySQL存储过程实现累加运算 1+2+…+n 等于多少?

MySQL创建存储过程&#xff0c;实现累加运算&#xff0c;计算 12…n 等于多少。具体的代码如下 1、实现计算123…n的和 DELIMITER // CREATE PROCEDURE sp_add_sum_num(IN n INT) BEGIN DECLARE i INT; DECLARE sum INT; SET i 1; SET sum 0;WHILE i < n DO SET sum …

若依框架实战指南:从入门到精通

在当今快节奏的软件开发环境中&#xff0c;选择一个高效、可靠的开发框架至关重要。若依框架&#xff08;RuoYi&#xff09;作为一个基于Spring Boot和MyBatis的快速开发平台&#xff0c;以其强大的功能和易用性受到了广泛欢迎。本文将详细介绍若依框架的使用方式&#xff0c;包…

计算机组成结构—中断和异常

一、基本概念和分类 计算机在执行程序的过程中&#xff0c;有时会遇到一些异常情况或者特殊请求&#xff1b;这时就需要计算机暂停正在运行的程序&#xff0c;转而先去处理这些异常或特殊请求&#xff0c;处理结束之后再返回程序的断点处继续执行。这种处理方式就被称为 “中断…