Flume之核心架构深入解析

 

我们一起来了解Source、Channel和Sink的全链路过程。

一、Flume架构分析

这个图中核心的组件是:

Source,ChannelProcessor,Channel,Sink。他们的关系结构如下:

Source  {ChannelProcessor  {Channel  ch1Channel  ch2…}
} 
Sink  {Channel  ch; 
} 
SinkGroup {Channel ch;Sink s1;Sink s2;…
}

二、各组件详细介绍

1、Source组件

Source是数据源的总称,我们往往设定好源后,数据将源源不断的被抓取或者被推送。

常见的数据源有:ExecSource,KafkaSource,HttpSource,NetcatSource,JmsSource,AvroSource等等。

所有的数据源统一实现一个接口类如下:

@InterfaceAudience.Public
@InterfaceStability.Stable public interface Source extends LifecycleAware, NamedComponent { /** * Specifies which channel processor will handle this source's events. * * @param channelProcessor */ public void setChannelProcessor(ChannelProcessor channelProcessor); /** * Returns the channel processor that will handle this source's events. */ public ChannelProcessor getChannelProcessor(); } 

Source提供了两种机制: PollableSource(轮询拉取)和EventDrivenSource(事件驱动):

上图展示的Source继承关系类图。

通过类图我们可以看到NetcatSource,ExecSource和HttpSource属于事件驱动模型。KafkaSource,SequenceGeneratorSource和JmsSource属于轮询拉取模型。

Source接口继承了LifecycleAware接口,它的的所有逻辑的实现在接口的start和stop方法中进行。

下图是类关系方法图:

Source接口定义的是最终的实现过程,比如通过日志抓取日志,这个抓取的过程和实际操作就是在对应的Source实现中,比如:ExecSource。那么这些Source实现由谁来驱动的呢?现在我们将介绍SourceRunner类。看一下类继承结构图:

我们看一下PollableSourceRunner和EventDrivenSourceRunner的具体实现:

//PollableSourceRunner:
public void start() { PollableSource source = (PollableSource) getSource(); ChannelProcessor cp = source.getChannelProcessor(); cp.initialize(); source.start(); runner = new PollingRunner(); runner.source = source; //Source实现类就在这里被赋与。 runner.counterGroup = counterGroup; runner.shouldStop = shouldStop; runnerThread = new Thread(runner); runnerThread.setName(getClass().getSimpleName() + "-" + source.getClass().getSimpleName() + "-" + source.getName()); runnerThread.start(); lifecycleState = LifecycleState.START; } //EventDrivenSourceRunner: @Override public void start() { Source source = getSource(); ChannelProcessor cp = source.getChannelProcessor(); cp.initialize(); source.start(); lifecycleState = LifecycleState.START; } 

注:其实所有的Source实现类内部都维护着线程,执行source.start()其实就是启动了相应的线程。

刚才我们看代码,代码中一直都在展示channelProcessor这个类,同时最上面架构设计图里面也提到了这个类,那它到底是干什么呢,下面我们就对其分解。

2、Channel组件

Channel用于连接Source和Sink,Source将日志信息发送到Channel,Sink从Channel消费日志信息;Channel是中转日志信息的一个临时存储,保存有Source组件传递过来的日志信息。

先看代码如下:

ChannelSelectorConfiguration selectorConfig = config.getSelectorConfiguration(); ChannelSelector selector = ChannelSelectorFactory.create(sourceChannels, selectorConfig); ChannelProcessor channelProcessor = new ChannelProcessor(selector); Configurables.configure(channelProcessor, config); source.setChannelProcessor(channelProcessor); 

ChannelSelectorFactory.create方法实现如下:

public static ChannelSelector create(List<Channel> channels, ChannelSelectorConfiguration conf) { String type = ChannelSelectorType.REPLICATING.toString(); if (conf != null){ type = conf.getType(); } ChannelSelector selector = getSelectorForType(type); selector.setChannels(channels); Configurables.configure(selector, conf); return selector; } 

其中我们看一下ChannelSelectorType这个枚举类,包括了几种类型:

public enum ChannelSelectorType {/** * Place holder for custom channel selectors not part of this enumeration. */ OTHER(null), /** * 复用通道选择器 */ REPLICATING("org.apache.flume.channel.ReplicatingChannelSelector"), /** * 多路通道选择器 */ MULTIPLEXING("org.apache.flume.channel.MultiplexingChannelSelector"); } 

ChannelSelector的类结构图如下所示:

注:RelicatingChannelSelector和MultiplexingChannelSelector是二个通道选择器,第一个是复用型通道选择器,也就是的默认的方式,会把接收到的消息发送给其他每个channel。第二个是多路通道选择器,这个会根据消息header中的参数进行通道选择。

说完通道选择器,正式来解释Channel是什么,先看一个接口类:

public interface Channel extends LifecycleAware, NamedComponent { public void put(Event event) throws ChannelException; public Event take() throws ChannelException; public Transaction getTransaction(); } 

注:put方法是用来发送消息,take方法是获取消息,transaction是用于事务操作。

类结构图如下:

3、Sink组件

Sink负责取出Channel中的消息数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。

Sink在设置存储数据时,可以向文件系统中,数据库中,hadoop中储数据,在日志数据较少时,可以将数据存储在文件系中,并且设定一定的时间间隔保存数据。在日志数据较多时,可以将相应的日志数据存储到Hadoop中,便于日后进行相应的数据分析。

Sink接口类内容如下:

public interface Sink extends LifecycleAware, NamedComponent { public void setChannel(Channel channel); public Channel getChannel(); public Status process() throws EventDeliveryException; public static enum Status { READY, BACKOFF } } 

Sink是通过如下代码进行的创建:

Sink sink = sinkFactory.create(comp.getComponentName(), comp.getType()); 

DefaultSinkFactory.create方法如下:

public Sink create(String name, String type) throws FlumeException { Preconditions.checkNotNull(name, "name"); Preconditions.checkNotNull(type, "type"); logger.info("Creating instance of sink: {}, type: {}", name, type); Class<? extends Sink> sinkClass = getClass(type); try { Sink sink = sinkClass.newInstance(); sink.setName(name); return sink; } catch (Exception ex) { System.out.println(ex); throw new FlumeException("Unable to create sink: " + name + ", type: " + type + ", class: " + sinkClass.getName(), ex); } } 

注:Sink是通过SinkFactory工厂来创建,提供了DefaultSinkFactory默认工厂,程序会查找org.apache.flume.conf.sink.SinkType这个枚举类找到相应的Sink处理类,比如:org.apache.flume.sink.LoggerSink,如果没找到对应的处理类,直接通过Class.forName(className)进行直接查找实例化实现类。

Sink的类结构图如下:

与ChannelProcessor处理类对应的是SinkProcessor,由SinkProcessorFactory工厂类负责创建,SinkProcessor的类型由一个枚举类提供,看下面代码:

public enum SinkProcessorType {/** * Place holder for custom sinks not part of this enumeration. */ OTHER(null), /** * 故障转移 processor * * @see org.apache.flume.sink.FailoverSinkProcessor */ FAILOVER("org.apache.flume.sink.FailoverSinkProcessor"), /** * 默认processor * * @see org.apache.flume.sink.DefaultSinkProcessor */ DEFAULT("org.apache.flume.sink.DefaultSinkProcessor"), /** * 负载processor * * @see org.apache.flume.sink.LoadBalancingSinkProcessor */ LOAD_BALANCE("org.apache.flume.sink.LoadBalancingSinkProcessor"); private final String processorClassName; private SinkProcessorType(String processorClassName) { this.processorClassName = processorClassName; } public String getSinkProcessorClassName() { return processorClassName; } } 

SinkProcessor的类结构图如下:

说明:

1、FailoverSinkProcessor是故障转移处理器,当sink从通道拿数据信息时出错进行的相关处理,代码如下:

public Status process() throws EventDeliveryException { // 经过了冷却时间,再次发起重试 Long now = System.currentTimeMillis(); while(!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) { //从失败队列中获取sink节点 FailedSink cur = failedSinks.poll(); Status s; try { //调用相应sink进行处理,比如将channel的数据读取存放到文件中, //这个存放文件的动作就在process中进行。 s = cur.getSink().process(); if (s == Status.READY) { //如果处理成功,则放到存活队列中 liveSinks.put(cur.getPriority(), cur.getSink()); activeSink = liveSinks.get(liveSinks.lastKey()); logger.debug("Sink {} was recovered from the fail list", cur.getSink().getName()); } else { // if it's a backoff it needn't be penalized. //如果处理失败,则继续放到失败队列中 failedSinks.add(cur); } return s; } catch (Exception e) { cur.incFails(); failedSinks.add(cur); } } Status ret = null; while(activeSink != null) { try { ret = activeSink.process(); return ret; } catch (Exception e) { logger.warn("Sink {} failed and has been sent to failover list", activeSink.getName(), e); activeSink = moveActiveToDeadAndGetNext(); } } 

2、LoadBalancingSinkProcessor是负载Sink处理器

首先我们和ChannelProcessor一样,我们也要重点说明一下SinkSelector这个选择器。

先看一下SinkSelector.configure方法的部分代码:

if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_ROUND_ROBIN)) { selector = new RoundRobinSinkSelector(shouldBackOff); } else if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_RANDOM)) { selector = new RandomOrderSinkSelector(shouldBackOff); } else { try { @SuppressWarnings("unchecked") Class<? extends SinkSelector> klass = (Class<? extends SinkSelector>) Class.forName(selectorTypeName); selector = klass.newInstance(); } catch (Exception ex) { throw new FlumeException("Unable to instantiate sink selector: " + selectorTypeName, ex); } } 

结合上面的代码,再看类结构图如下:

注:RoundRobinSinkSelector是轮询选择器,RandomOrderSinkSelector是随机分配选择器。

最后我们以KafkaSink为例看一下Sink里面的具体实现:

public Status process() throws EventDeliveryException { Status result = Status.READY; Channel channel = getChannel(); Transaction transaction = null; Event event = null; String eventTopic = null; String eventKey = null; try { long processedEvents = 0; transaction = channel.getTransaction(); transaction.begin(); messageList.clear(); for (; processedEvents < batchSize; processedEvents += 1) { event = channel.take(); if (event == null) { // no events available in channel break; } byte[] eventBody = event.getBody(); Map<String, String> headers = event.getHeaders(); if ((eventTopic = headers.get(TOPIC_HDR)) == null) { eventTopic = topic; } eventKey = headers.get(KEY_HDR); if (logger.isDebugEnabled()) { logger.debug("{Event} " + eventTopic + " : " + eventKey + " : " + new String(eventBody, "UTF-8")); logger.debug("event #{}", processedEvents); } // create a message and add to buffer KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]> (eventTopic, eventKey, eventBody); messageList.add(data); } // publish batch and commit. if (processedEvents > 0) { long startTime = System.nanoTime(); producer.send(messageList); long endTime = System.nanoTime(); counter.addToKafkaEventSendTimer((endTime-startTime)/(1000*1000)); counter

转载于:https://www.cnblogs.com/hd-zg/p/5975399.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/470007.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java连接cdh集群_有一台电脑需要通过beeline的方式连接到CDHclouderahadoop集群,提示报错。...

有一台电脑需要通过beeline的方式连接到CDHclouderahadoop集群&#xff0c;提示报错。码农的苦恼2020-02-24 10:25:48目前客户端服务器已经开通了3000-60000的端口访问权限&#xff0c;请问还需要开通哪些权限。谢谢&#xff01;17/11/14 09:19:40 WARN conf.HiveConf: DEPRECA…

最长公共子序列及其引申问题

最长公共子序列是经典的动态规划问题&#xff0c;在很多书籍和文章中都有介绍&#xff0c;这里对这一经典算法进行回顾并对两个follow up questions进行总结和分析。 1. 回顾LCS&#xff08;longest common subsequence&#xff09;解法&#xff0c;求LCS长度 典型的双序列动态…

怎么用java实现打字功能_怎么用JAVA编写一个打字游戏

该楼层疑似违规已被系统折叠 隐藏此楼查看此楼这个帖子是一年前发的&#xff0c;当时对Java也不是很懂&#xff0c;我看大家都对这个感兴趣&#xff0c;我把代码改了一下&#xff0c;发布出来&#xff0c;其实还有很多的地方还没有完善...&#xff0c;这个只能算一个Demo&#…

pythonfor循环列表排序_Python Day4950(for循环语句整理)

班长的图Python for循环可以遍历任何序列的项目&#xff0c;如一个列表或者一个字符串。一、Python 循环遍历列表元素1.for i in list():2.for i in enumerate(list):2.for i in range(len(list)):for i in list():for i in enumerate(list):for i in range(len(list)):二、Pyt…

Java异常处理深入理解_关于java异常处理机制的深入理解.doc

关于java异常处理机制的深入理解.doc 关于JAVA异常处理机制的深入理解1引子TRYCATCHFINALLY恐怕是大家再熟悉不过的语句了&#xff0c;而且感觉用起来也是很简单&#xff0c;逻辑上似乎也是很容易理解。不过&#xff0c;我亲自体验的“教训”告诉我&#xff0c;这个东西可不是想…

java 格式化 布尔型_这么久才知道Java中的format很强大!

Java中允许我们对指定的对象进行某种格式化&#xff0c;从而得到我们想要的格式化样式。Format首先介绍java.text包中的FormatForamt是一个抽象基类&#xff0c;其具体子类必须实现format(Object obj, StringBuffer toAppendTo, FieldPosition pos)和parseObject(String source…

【腾讯优测干货分享】从压测工具谈并发、压力、吞吐量

本文来自于腾讯bugly开发者社区&#xff0c;非经作者同意&#xff0c;请勿转载&#xff0c;原文地址&#xff1a;http://dev.qq.com/topic/580d914e07b7fc1c26a0cf7c 前言 随着部门业务的拓展&#xff0c;我们有了很多性能测试的机会&#xff0c;但在实战中&#xff0c;慢慢发现…

MySQL5.6 PERFORMANCE_SCHEMA 说明

背景&#xff1a; MySQL 5.5开始新增一个数据库&#xff1a;PERFORMANCE_SCHEMA&#xff0c;主要用于收集数据库服务器性能参数。并且库里表的存储引擎均为PERFORMANCE_SCHEMA&#xff0c;而用户是不能创建存储引擎为PERFORMANCE_SCHEMA的表。MySQL5.5默认是关闭的&#xff0c;…

04_类与对象_课程动手动脑问题以及课后实验性问题及解答集锦

Answer: 动手动脑&#xff1a; 1——以下代码为何无法通过编译&#xff1f;哪儿出错了&#xff1f; Answer: 因为类Foo的构造函数是有一个参数的&#xff0c;所以我们在new一个Foo类的对象时必须赋予一个符合条件的实参。 2—— 请运行TestStaticInitializeBlock.java示例&…

php如果能编译就完美了,centos7 完美编译PHP7 php-7.2.10.tar.gz

1.下载去官网下载。2、上传并解压tar -zxvf php-7.2.10.tar.gz3、进入文件夹cd php-7.2.104、安装相关依赖包yum install pcre pcre-devel zlib zlib-devel openssl openssl-devel gd gd-devel libjpeg libjpeg-devel libpng libpng-devel freetype freetype-devel e2fsprogs e…

2017年php还能火多久,PHP还会火吗?

据不完全数据得知&#xff0c;我国对PHP人才非常紧缺&#xff0c;大约每年有50万人左右。伴随着近几年信息化&#xff0c;智能化&#xff0c;网络化的发展&#xff0c;PHP的发展前景也是不可估量的&#xff0c;那么&#xff0c;你知道是什么影响PHP继续火热的吗?下面我们就来分…

[JZOJ P1288] [DP]矩阵取数

kaike 传送门 07年noipT3&#xff1f; 要我写我肯定放弃 嗯没错就是这么果断 据说要 高精 DP 状态&#xff1f; 举例说明&#xff0c;假设有矩阵 a1,a2,a3,a4....an b1,b2,b3,b4....bn 假设矩阵的最大得分取法为 a1*2b1*2a2*4b2*4a3*8b3*8.....an*2^nb2*2^n&#xff1b; 可以转…

Linux命令入门

// 查看日历cal // 修改密码passwd // 查看目录和文件ls -lls // 查看当前用户信息whoami // 查看当前在线用户userswho 在Linux中&#xff0c;可以使用 vi 编辑器创建一个文本文件&#xff0c;例如&#xff1a;$ vi filename上面的命令会创建文件 filename 并打开&#xff0c;…

Bug2算法的实现(RobotBASIC环境中仿真)

移动机器人智能的一个重要标志就是自主导航&#xff0c;而实现机器人自主导航有个基本要求——避障。之前简单介绍过Bug避障算法&#xff0c;但仅仅了解大致理论而不亲自动手实现一遍很难有深刻的印象&#xff0c;只能说似懂非懂。我不是天才&#xff0c;不能看几遍就理解理论中…

策略模式场景举例

容错恢复机制 容错恢复机制是应用程序开发中非常常见的功能。那么什么是容错恢复呢&#xff1f;简单点说就是&#xff1a;程序运行的时候&#xff0c;正常情况下应该按照某种方式来做&#xff0c;如果按照某种方式来做发生错误的话&#xff0c;系统并不会崩溃&#xff0…

php写抢票脚本,火车票抢票python代码公开揭秘!

市场上很多火车票抢票软件大家应该非常熟悉&#xff0c;但很少有人研究具体是怎么实现的&#xff0c;所以觉得很神秘&#xff0c;其实很简单。下面使用Python模拟抢票程序&#xff0c;给大家揭秘抢票到底是怎么回事。该代码仅供参考&#xff0c;主要用于大家沟通交流&#xff0…

代理模式——HeadFirst设计模式学习笔记

代理模式&#xff1a;为另一个对象提供一个替身或占位符控制这个对象的访问 特点&#xff1a; 让代理对象控制对象的访问&#xff0c;被代理对象可以是远程对象&#xff08;远程代理&#xff09;&#xff0c;创建开销较大对象&#xff08;虚拟代理&#xff09;&#xff0c;或需…

cursor用法java,Cursor的基本使用方法

Cursor的基本使用方法今天在用到Cursor的时候发现&#xff0c;有很多游标相关的知识还是有欠缺&#xff0c;在网上搜了篇基础讲解的文&#xff0c;觉得还不错&#xff0c;自己整理了一下发上来。虽然很基础&#xff0c;但是有一些内容之前确实没有很扎实得掌握&#xff0c;所以…

win7(64位)php5.5-Apache2.4-mysql5.6环境安装

win7&#xff08;64位&#xff09;安装搭建 php-5.5.10 apache2.4.7 mysql-5.6.16 环境 工具/原料 php-5.5.10-Win32-VC11-x64.zip 下载地址: http://windows.php.net/download/ httpd-2.4.7-win64-VC11.zip 下载地址: http://www.apachelounge.com/download/ mysql-5.6.16-win…

php开发环境 ubuntu,Ubuntu配置PHP开发环境

开发环境安装目前web服务器有很多&#xff0c;本文安装Apache服务器&#xff1b;本文使用的服务器是Mysql服务器。sudo apt install apache2常用命令重启Apache&#xff1a;sudo /etc/init.d/apache2 restart重启php&#xff1a;sudo /etc/init.d/php-fapm restart配置apache服…