使用Apache Storm和Kite SDK Morphlines的可配置ETL处理

从我担任软件工程师的第一天起,我总是听到很多方面相同要求:

我们希望所有内容都可配置,我们希望在运行时更改所有内容,我们希望有一个可视化工具来应用所有这些逻辑,以便非开发人员使用和配置我们的应用程序。

我也喜欢这种通用范围,但是众所周知,软件系统的适应性不强,客户的需求也不稳定。

在过去的几年中,我们已经使用传统的框架/技术(JMX,分布式缓存,Spring或JEE等)构建了此类可配置应用程序(并非100%可配置)。

近年来,我们的体系结构中还必须包含一个附加概念,这就是大数据 (或3V或4V或任何更合适的词)的概念。 这个概念淘汰了我们熟悉并在旧的3层应用程序中应用的各种解决方案或变通方法。

有趣的是,我很多次都和十年前一样。 这是软件开发的规则,它永远不会结束,因此个人才能和新冒险也永远不会结束:-)

主要问题仍然是相同的,即如何构建可配置的ETL分布式应用程序

因此,我建立了一个小型的适应性强的解决方案,该解决方案在许多用例中可能会有所帮助。 我在大数据世界中使用了3种常用工具: JavaApache StormKite SDK Morplines 。 Java是主要的编程语言, Apache Storm是分布式流处理引擎,而Kite SDK Morphlines是可配置的ETL引擎。

风筝SDK Morplines

从其描述复制而来: Morphlines是一个开源框架,它减少了构建和更改Hadoop ETL流处理应用程序所需的时间和精力,该应用程序可将数据提取,转换并加载到Apache Solr,HBase,HDFS,Enterprise Data Warehouse或Analytic Online Dashboards中。 morphline是一个丰富的配置文件,可以轻松定义一个转换链,该转换链可以使用来自任何类型数据源的任何类型的数据,处理数据并将结果加载到Hadoop组件中。 它用简单的配置步骤代替了Java编程,并相应地减少了与开发和维护定制ETL项目相关的成本和集成工作。

除了内置命令外 ,您还可以轻松实现自己的命令 ,并在吗啉配置文件中使用它。

示例Morphline配置读取一个JSON字符串,解析它,然后只记录一个特定的JSON元素:

morphlines : [{id : json_terminal_logimportCommands : ["org.kitesdk.**"]commands : [# read the JSON blob{ readJson: {} }# extract JSON objects into head fields{ extractJsonPaths {flatten: truepaths: {name: /nameage: /age}} }# log data{ logInfo {format : "name: {}, record: {}"args : ["@{name}", "@{}"]}}]
}]

风暴变身螺栓

为了在Storm中使用Morphlines,我实现了一个自定义MorphlinesBolt 。 该螺栓的主要职责是:

  • 通过配置文件初始化Morphlines处理程序
  • 初始化映射说明:
    a)从元组到吗啉输入,以及
    b)从Morphline输出到新的输出元组
  • 使用已初始化的Morplines上下文处理每个传入事件
  • 如果Bolt不是Terminal ,则使用提供的Mapper (类型“ b”),使用Morphline执行的输出发出一个新的Tuple。

简单的可配置ETL拓扑

为了测试自定义MorphlinesBolt ,我编写了2个简单的测试。 在这些测试中,您可以看到MorphlinesBolt是如何初始化的,然后是每次执行的结果。 作为输入,我使用了一个自定义的Spout(RandomJsonTestSpout),它仅每100毫秒发出一次新的JSON字符串(可配置)。

DummyJsonTerminalLogTopology

一个简单的拓扑 ,该拓扑通过配置文件和每个传入的元组的执行Morphline处理程序来配置Morphline上下文。 在此拓扑上, MorphlinesBolt被配置为终端螺栓,这意味着对于每个输入Tuple不会发出新的Tuple。

public class DummyJsonTerminalLogTopology {public static void main(String[] args) throws Exception {Config config = new Config();RandomJsonTestSpout spout = new RandomJsonTestSpout().withComplexJson(false);String2ByteArrayTupleMapper tuppleMapper = new String2ByteArrayTupleMapper();tuppleMapper.configure(CmnStormCons.TUPLE_FIELD_MSG);MorphlinesBolt morphBolt = new MorphlinesBolt().withTupleMapper(tuppleMapper).withMorphlineId("json_terminal_log").withMorphlineConfFile("target/test-classes/morphline_confs/json_terminal_log.conf");TopologyBuilder builder = new TopologyBuilder();builder.setSpout("WORD_SPOUT", spout, 1);builder.setBolt("MORPH_BOLT", morphBolt, 1).shuffleGrouping("WORD_SPOUT");if (args.length == 0) {LocalCluster cluster = new LocalCluster();cluster.submitTopology("MyDummyJsonTerminalLogTopology", config, builder.createTopology());Thread.sleep(10000);cluster.killTopology("MyDummyJsonTerminalLogTopology");cluster.shutdown();System.exit(0);} else if (args.length == 1) {StormSubmitter.submitTopology(args[0], config, builder.createTopology());} else {System.out.println("Usage: DummyJsonTerminalLogTopology <topology_name>");}}
}

DummyJson2StringTopology

一个简单的拓扑 ,该拓扑通过配置文件和每个传入的元组的执行Morphline处理程序来配置Morphline上下文。 在此拓扑上, MorphlinesBolt被配置为普通螺栓,这意味着对于每个输入Tuple,它都会发出一个新的Tuple。

public class DummyJson2StringTopology {public static void main(String[] args) throws Exception {Config config = new Config();RandomJsonTestSpout spout = new RandomJsonTestSpout().withComplexJson(false);String2ByteArrayTupleMapper tuppleMapper = new String2ByteArrayTupleMapper();tuppleMapper.configure(CmnStormCons.TUPLE_FIELD_MSG);MorphlinesBolt morphBolt = new MorphlinesBolt().withTupleMapper(tuppleMapper).withMorphlineId("json2string").withMorphlineConfFile("target/test-classes/morphline_confs/json2string.conf")//.withOutputProcessors(Arrays.asList(resultRecordHandlers));.withOutputFields(CmnStormCons.TUPLE_FIELD_MSG).withRecordMapper(RecordHandlerFactory.genDefaultRecordHandler(String.class, new JsonNode2StringResultMapper()));LoggingBolt printBolt = new LoggingBolt().withFields(CmnStormCons.TUPLE_FIELD_MSG);TopologyBuilder builder = new TopologyBuilder();builder.setSpout("WORD_SPOUT", spout, 1);builder.setBolt("MORPH_BOLT", morphBolt, 1).shuffleGrouping("WORD_SPOUT");builder.setBolt("PRINT_BOLT", printBolt, 1).shuffleGrouping("MORPH_BOLT");if (args.length == 0) {LocalCluster cluster = new LocalCluster();cluster.submitTopology("MyDummyJson2StringTopology", config, builder.createTopology());Thread.sleep(10000);cluster.killTopology("MyDummyJson2StringTopology");cluster.shutdown();System.exit(0);} else if (args.length == 1) {StormSubmitter.submitTopology(args[0], config, builder.createTopology());} else {System.out.println("Usage: DummyJson2StringTopology <topology_name>");}}
}

最后的想法

MorphlinesBolt可以用作任何可配置ETL“解决方案”的一部分(作为单处理Bolt,作为终端Bolt,作为复杂管道的一部分,等等)。

morphlines_storm_topology_examples

在github中的示例项目集中,源代码作为Maven模块( sv-etl-storm-morphlines )提供。

最好的组合是将MorphlinesBolt与Flux一起使用。 这可能会为您提供完全可配置的ETL拓扑!!!
我还没有添加为选项,以便保持较少的依赖关系(我可以添加范围“ test”)。

该模块不是最终模块,我将尝试对其进行改进,因此许多人会在第一个实现中发现各种错误。

对于任何其他想法或说明,请写评论:)

这是我2016年的第一篇文章! 希望您身体健康,思想和行动更好。 一切的第一项美德/价值是人类以及对我们所生活的环境(社会,地球,动物,植物等)的尊重。 所有其他都是次要优先事项,不应破坏优先事项所隐含的内容。 始终牢记最重要的美德,并在您采取的任何行动或思想中考虑它们。

翻译自: https://www.javacodegeeks.com/2016/01/configurable-etl-processing-using-apache-storm-kite-sdk-morphlines.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/355848.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

oracle 闪回操作(flashback)

234390216 的留下学习 原文地址 http://haohaoxuexi.iteye.com/blog/1594391 Oracle的闪回功能可以在对数据库进行不完全恢复的情况下&#xff0c;对某一个指定的表进行恢复。闪回数据库是进行时间点恢复的新方法&#xff0c;它能够快速将Oracle恢复到以前的时间&#xff0c;以…

用法与区别_生抽老抽、蚝油味极鲜,总算搞清楚区别了,用法大不同,别用错了...

生抽老抽、蚝油味极鲜&#xff0c;总算搞清楚区别了&#xff0c;用法大不同&#xff0c;别再用错了各位读者朋友们大家好&#xff0c;感谢阅读我分享的美食文章&#xff0c;经验和大家一起共享&#xff0c;今天我要和大家分享的内容是&#xff1a;『生抽老抽、蚝油味极鲜&#…

centos yum安装_centos7上yum安装碰到的坑

估计是之前更改过这个虚机的配置&#xff0c;故现在正常步骤安装完yum之后&#xff0c;无法使用&#xff0c;具体表现在 执行yum makecache的时候报错&#xff0c;现在no module name yum结果发现&#xff0c;无论怎么修改 usr/bin/yum的配置文件&#xff0c;都无法成功解决上面…

poj 1330 Nearest Common Ancestors LCA/DFS

题目链接: http://poj.org/problem?id1330 题意: 求出两点间的最近公共祖先。 题解: 第一种&#xff1a; 并查集维护&#xff1a;http://www.cnblogs.com/procedure2012/archive/2012/01/29/2331468.html 利用并查集在每次对子树进行遍历时进行合并&#xff0c;因为对以x为根…

设计模式的Java 8 Lambda表达式–装饰器设计模式

Decorator模式&#xff08;也称为Wrapper &#xff09;允许将行为静态或动态地添加到单个对象&#xff0c;而不会影响同一类中其他对象的行为。 可以将其视为子类的替代方法。 我们知道子类在编译时会增加行为&#xff0c;并且更改会影响原始类的所有实例。 另一方面&#xff0…

ie浏览器网页版进入_IE浏览器打开网页提示无法打开Internet站点的解决办法

IE浏览器打开网页时提示Internet explorer无法打开站点&#xff0c;这是什么问题&#xff1f;Internet explorer打不开网页提示无法打开站点怎么办&#xff1f;请看下文五种解决办法。方法一&#xff1a;管理加载项打开浏览器—工具—Internet选项这时会出现“Internet选项”对…

epic转移游戏_Epic游戏商城更改退款政策 和steam一模一样

游侠网关注我们&#xff0c;获得最快的游戏资讯Epic正在打造自己的数字游戏商城&#xff0c;一方面对开发者提供更慷慨的销售分成&#xff0c;另一方面对玩家提供每两个月更新一次的免费游戏。虽然Epic游戏商城中的作品数量还在起步阶段&#xff0c;也缺少一些关键的常用功能&a…

计算机组成原理实验三报告,计算机组成原理实验三报告

计算机组成原理实验三报告 实 验 报 告 三课 程 计算机组成原理 姓 名 学 号实验项目 存储器实验 同组姓名 学 号指导教师 专业班级 计算机科学与技术 09 实验时间 2011-6-6实验三 存储器实验一、实验目的1&#xff0e;掌握存储器的功能和构成。2&#xff0e;了解静态随机存储器…

防盗Java EE –保护您的Java EE企业应用程序

redev离我们仅有几天的路程&#xff0c;我受邀作了两次演讲。 其中之一是关于我最喜欢的主题&#xff1a;安全性和Java EE。 它旨在实现两个目标。 一方面向典型的Java EE开发人员介绍整个应用程序安全过程和主要目标。 而且还要查看有关Java EE在满足典型需求时必须提供的内容…

wsl nvidia驱动_WIN10安装NVIDIA面板两种方式

无应用商店安装NVIDIA面板进入NVIDIA官网下载标准版驱动https://www.nvidia.cn/Download/Find.aspx?langcn​www.nvidia.cn驱动类型选择标准下载的名称选择NVIDIA Studoio Driver SD这样下载出来的驱动就带有NVIDIA面板控制面有应用商店但是下载不动的可以使用下面的方式打开网…

Python之内置函数

Python内置函数 #1、语法 # eval(str,[,globasl[,locals]]) # exec(str,[,globasl[,locals]])#2、区别 #示例一&#xff1a; s123 print(eval(s)) #eval用来执行表达式&#xff0c;并返回表达式执行的结果 print(exec(s)) #exec用来执行语句&#xff0c;不会返回任何值6 None …

latex 无穷_《天龙3D》新资料片“骑乐无穷”即将上线

驭风逐战&#xff0c;骑乐无穷。11月12日金庸正版授权、全民第一武侠RPG手游《天龙3D》新资料片“骑乐无穷”即将上线!全新坐骑装备系统开启&#xff0c;升星养成坐骑装备;全新坐骑相关副本四绝夺魁&#xff0c;多人闯关PVP、PVE组队竞技;元旦主题月即将开启&#xff0c;趣味游…

广州大学计算机学院毕业设计,【广州大学】毕业设计(计算机科学与技术)专业要求...

广州大学成人高等教育毕业设计专业要求【专业名称】计算机科学与技术【适用范围】非学位论文【执笔者】谷岩【完成形式】个人独立【写作形式】毕业设计【写作要求】1&#xff0e;目标毕业设计是计算机科学与技术专业人才培养的重要环节。其主要目标是培养学生综合应用计算机科学…

实验二+140+阮晨曦

---恢复内容开始--- 一、实验目的 掌握覆盖测试的基本方法和实践 二、实验要求 运用逻辑覆盖测试的覆盖准则设计被测程序的测试用例&#xff0c;并运行测试用例检查程序的正确与否&#xff0c;给出程序缺陷小结。 三、实验内容 &#xff08;1&#xff09;设计某程序的路径覆盖测…

hadoop重命名文件_Hadoop -- 3. 从Flume到HDFS

提起Flume, 就先讲一下它的基本作用, 它可以从不同的数据源导入到一个集中的地方存放起来,基本架构如下图所示*上图为Flume Data Flow Model, Ref: Flume 1.9.0 User Guide本篇文章会做一个小demo, 数据从spooling directory来(而不是官网图中画的Web Server), 先经过channel, …

电脑计算器_CPA考生注意!2020考场只允许带这种计算器

注册会计师每年采用闭卷、计算机化考试方式。根据往年考生的反应&#xff0c;计算器的使用在考场上发挥了非常大的作用。值得大家注意的是&#xff0c;并不是所有的计算器都能带进考场&#xff0c;考试对计算器有什么要求?如何挑选到正确的计算器&#xff1f;我们一起来看看&a…

最早的齿轮计算机,世界最古老“计算机”出土后110年,科学家终于解开它的秘密...

伦敦大学学院(UCL)的研究团队&#xff0c;提出了“一个激进的新模型&#xff0c;与所有数据相匹配&#xff0c;并最终优雅地展示出了古希腊人眼中的宇宙”。1901年&#xff0c;在希腊岛屿安提基特拉的海岸&#xff0c;潜水员偶然发现了一艘古代沉船。沉船中的一件文物&#xff…

通过Java 8中的Applicative Builder组合多个异步结果

几个月前&#xff0c;我发布了一个出版物 &#xff0c;在其中详细解释了我提出的名为Outcome的抽象&#xff0c;它通过强制使用语义帮助了我很多 没有副作用的代码。 通过遵循这种简单&#xff08;但功能强大&#xff09;的约定&#xff0c;我最终将任何类型的故障&#xff08;…

diskgenius 接触“只读“失败_相亲总是失败,这三个步骤你都做了吗?

原标题&#xff1a;相亲总是失败&#xff0c;这三个步骤你都做了吗&#xff1f;虽然现在爱情很稀缺&#xff0c;但想必大家都想找个男女朋友&#xff0c;想谈一场甜甜的恋爱。 而相亲&#xff0c;无疑是脱单最直接、最有效的方式&#xff0c;没有之一。但不管是经人介绍&#x…

agv系统介绍_重载AGV小车主要结构及导航原理是什么?

相信对AGV有过了解的朋友都知道&#xff0c;当我们在进行工业生产过程时&#xff0c;重载AGV小车可以帮我们实现无人驾驶搬运的一个工作&#xff0c;可以保证AGV在运行时不用通过人工干预的情况下来完成现场的搬运工作,通过无人驾驶技术进行自主导航将货物自动从起始位置搬运到…