从我担任软件工程师的第一天起,我总是听到很多方面的相同要求:
“ 我们希望所有内容都可配置,我们希望在运行时更改所有内容,我们希望有一个可视化工具来应用所有这些逻辑,以便非开发人员使用和配置我们的应用程序。 ”
我也喜欢这种通用范围,但是众所周知,软件系统的适应性不强,客户的需求也不稳定。
在过去的几年中,我们已经使用传统的框架/技术(JMX,分布式缓存,Spring或JEE等)构建了此类可配置应用程序(并非100%可配置)。
近年来,我们的体系结构中还必须包含一个附加概念,这就是大数据 (或3V或4V或任何更合适的词)的概念。 这个新概念淘汰了我们熟悉并在旧的3层应用程序中应用的各种解决方案或变通方法。
有趣的是,我很多次都和十年前一样。 这是软件开发的规则,它永远不会结束,因此个人才能和新冒险也永远不会结束:-)
主要问题仍然是相同的,即如何构建可配置的ETL分布式应用程序 。
因此,我建立了一个小型的适应性强的解决方案,该解决方案在许多用例中可能会有所帮助。 我在大数据世界中使用了3种常用工具: Java , Apache Storm和Kite SDK Morplines 。 Java是主要的编程语言, Apache Storm是分布式流处理引擎,而Kite SDK Morphlines是可配置的ETL引擎。
风筝SDK Morplines
从其描述复制而来: Morphlines是一个开源框架,它减少了构建和更改Hadoop ETL流处理应用程序所需的时间和精力,该应用程序可将数据提取,转换并加载到Apache Solr,HBase,HDFS,Enterprise Data Warehouse或Analytic Online Dashboards中。 morphline是一个丰富的配置文件,可以轻松定义一个转换链,该转换链可以使用来自任何类型数据源的任何类型的数据,处理数据并将结果加载到Hadoop组件中。 它用简单的配置步骤代替了Java编程,并相应地减少了与开发和维护定制ETL项目相关的成本和集成工作。
除了内置命令外 ,您还可以轻松实现自己的命令 ,并在吗啉配置文件中使用它。
示例Morphline配置读取一个JSON字符串,解析它,然后只记录一个特定的JSON元素:
morphlines : [{id : json_terminal_logimportCommands : ["org.kitesdk.**"]commands : [# read the JSON blob{ readJson: {} }# extract JSON objects into head fields{ extractJsonPaths {flatten: truepaths: {name: /nameage: /age}} }# log data{ logInfo {format : "name: {}, record: {}"args : ["@{name}", "@{}"]}}]
}]
风暴变身螺栓
为了在Storm中使用Morphlines,我实现了一个自定义MorphlinesBolt 。 该螺栓的主要职责是:
- 通过配置文件初始化Morphlines处理程序
- 初始化映射说明:
a)从元组到吗啉输入,以及
b)从Morphline输出到新的输出元组 - 使用已初始化的Morplines上下文处理每个传入事件
- 如果Bolt不是Terminal ,则使用提供的Mapper (类型“ b”),使用Morphline执行的输出发出一个新的Tuple。
简单的可配置ETL拓扑
为了测试自定义MorphlinesBolt ,我编写了2个简单的测试。 在这些测试中,您可以看到MorphlinesBolt是如何初始化的,然后是每次执行的结果。 作为输入,我使用了一个自定义的Spout(RandomJsonTestSpout),它仅每100毫秒发出一次新的JSON字符串(可配置)。
DummyJsonTerminalLogTopology
一个简单的拓扑 ,该拓扑通过配置文件和每个传入的元组的执行Morphline处理程序来配置Morphline上下文。 在此拓扑上, MorphlinesBolt被配置为终端螺栓,这意味着对于每个输入Tuple不会发出新的Tuple。
public class DummyJsonTerminalLogTopology {public static void main(String[] args) throws Exception {Config config = new Config();RandomJsonTestSpout spout = new RandomJsonTestSpout().withComplexJson(false);String2ByteArrayTupleMapper tuppleMapper = new String2ByteArrayTupleMapper();tuppleMapper.configure(CmnStormCons.TUPLE_FIELD_MSG);MorphlinesBolt morphBolt = new MorphlinesBolt().withTupleMapper(tuppleMapper).withMorphlineId("json_terminal_log").withMorphlineConfFile("target/test-classes/morphline_confs/json_terminal_log.conf");TopologyBuilder builder = new TopologyBuilder();builder.setSpout("WORD_SPOUT", spout, 1);builder.setBolt("MORPH_BOLT", morphBolt, 1).shuffleGrouping("WORD_SPOUT");if (args.length == 0) {LocalCluster cluster = new LocalCluster();cluster.submitTopology("MyDummyJsonTerminalLogTopology", config, builder.createTopology());Thread.sleep(10000);cluster.killTopology("MyDummyJsonTerminalLogTopology");cluster.shutdown();System.exit(0);} else if (args.length == 1) {StormSubmitter.submitTopology(args[0], config, builder.createTopology());} else {System.out.println("Usage: DummyJsonTerminalLogTopology <topology_name>");}}
}
DummyJson2StringTopology
一个简单的拓扑 ,该拓扑通过配置文件和每个传入的元组的执行Morphline处理程序来配置Morphline上下文。 在此拓扑上, MorphlinesBolt被配置为普通螺栓,这意味着对于每个输入Tuple,它都会发出一个新的Tuple。
public class DummyJson2StringTopology {public static void main(String[] args) throws Exception {Config config = new Config();RandomJsonTestSpout spout = new RandomJsonTestSpout().withComplexJson(false);String2ByteArrayTupleMapper tuppleMapper = new String2ByteArrayTupleMapper();tuppleMapper.configure(CmnStormCons.TUPLE_FIELD_MSG);MorphlinesBolt morphBolt = new MorphlinesBolt().withTupleMapper(tuppleMapper).withMorphlineId("json2string").withMorphlineConfFile("target/test-classes/morphline_confs/json2string.conf")//.withOutputProcessors(Arrays.asList(resultRecordHandlers));.withOutputFields(CmnStormCons.TUPLE_FIELD_MSG).withRecordMapper(RecordHandlerFactory.genDefaultRecordHandler(String.class, new JsonNode2StringResultMapper()));LoggingBolt printBolt = new LoggingBolt().withFields(CmnStormCons.TUPLE_FIELD_MSG);TopologyBuilder builder = new TopologyBuilder();builder.setSpout("WORD_SPOUT", spout, 1);builder.setBolt("MORPH_BOLT", morphBolt, 1).shuffleGrouping("WORD_SPOUT");builder.setBolt("PRINT_BOLT", printBolt, 1).shuffleGrouping("MORPH_BOLT");if (args.length == 0) {LocalCluster cluster = new LocalCluster();cluster.submitTopology("MyDummyJson2StringTopology", config, builder.createTopology());Thread.sleep(10000);cluster.killTopology("MyDummyJson2StringTopology");cluster.shutdown();System.exit(0);} else if (args.length == 1) {StormSubmitter.submitTopology(args[0], config, builder.createTopology());} else {System.out.println("Usage: DummyJson2StringTopology <topology_name>");}}
}
最后的想法
MorphlinesBolt可以用作任何可配置ETL“解决方案”的一部分(作为单处理Bolt,作为终端Bolt,作为复杂管道的一部分,等等)。
在github中的示例项目集中,源代码作为Maven模块( sv-etl-storm-morphlines )提供。
最好的组合是将MorphlinesBolt与Flux一起使用。 这可能会为您提供完全可配置的ETL拓扑!!!
我还没有添加为选项,以便保持较少的依赖关系(我可以添加范围“ test”)。
该模块不是最终模块,我将尝试对其进行改进,因此许多人会在第一个实现中发现各种错误。
对于任何其他想法或说明,请写评论:)
这是我2016年的第一篇文章! 希望您身体健康,思想和行动更好。 一切的第一项美德/价值是人类以及对我们所生活的环境(社会,地球,动物,植物等)的尊重。 所有其他都是次要优先事项,不应破坏优先事项所隐含的内容。 始终牢记最重要的美德,并在您采取的任何行动或思想中考虑它们。
翻译自: https://www.javacodegeeks.com/2016/01/configurable-etl-processing-using-apache-storm-kite-sdk-morphlines.html