kite 使用 go_使用Apache Storm和Kite SDK Morphlines的可配置ETL处理

kite 使用 go

从我担任软件工程师的第一天起,我总是听到很多方面相同要求:

我们希望所有内容都可配置,我们希望在运行时更改所有内容,我们希望有一个可视化工具来应用所有这些逻辑,以便非开发人员使用和配置我们的应用程序。

我也喜欢这种通用范围,但是众所周知,软件系统的适应性不强,客户的需求也不稳定。

在过去的几年中,我们已经使用传统的框架/技术(JMX,分布式缓存,Spring或JEE等)构建了此类可配置应用程序(并非100%可配置)。

近年来,我们的体系结构中还必须包含一个附加概念,这就是大数据 (或3V或4V或任何更合适的词)的概念。 这个概念淘汰了我们熟悉并在旧的3层应用程序中应用的各种解决方案或变通方法。

有趣的是,我很多次都和十年前一样。 这是软件开发的规则,它永远不会结束,因此个人才能和新冒险也永远不会结束:-)

主要问题仍然是相同的,即如何构建可配置的ETL分布式应用程序

因此,我构建了一个小型的适应性解决方案,该解决方案在许多使用案例中可能会有所帮助。 我在大数据世界中使用了3种常用工具: JavaApache StormKite SDK Morplines 。 Java是主要的编程语言, Apache Storm是分布式流处理引擎,而Kite SDK Morphlines是可配置的ETL引擎。

风筝SDK Morplines

从其描述复制: Morphlines是一个开源框架,可减少构建和更改Hadoop ETL流处理应用程序所需的时间和精力,该应用程序可将数据提取,转换并加载到Apache Solr,HBase,HDFS,Enterprise Data Warehouse或Analytic Online Dashboards中。 morphline是一个丰富的配置文件,可轻松定义一个转换链,该转换链可使用来自任何类型数据源的任何类型的数据,处理数据并将结果加载到Hadoop组件中。 它用简单的配置步骤代替了Java编程,并相应地减少了与开发和维护定制ETL项目相关的成本和集成工作。

除了内置命令外 ,您还可以轻松实现自己的命令 ,并在吗啉配置文件中使用它。

示例Morphline配置读取一个JSON字符串,解析它,然后只记录一个特定的JSON元素:

morphlines : [{id : json_terminal_logimportCommands : ["org.kitesdk.**"]commands : [# read the JSON blob{ readJson: {} }# extract JSON objects into head fields{ extractJsonPaths {flatten: truepaths: {name: /nameage: /age}} }# log data{ logInfo {format : "name: {}, record: {}"args : ["@{name}", "@{}"]}}]
}]

风暴变身螺栓

为了在Storm中使用Morphlines,我实现了一个自定义MorphlinesBolt 。 该螺栓的主要职责是:

  • 通过配置文件初始化Morphlines处理程序
  • 初始化映射说明:
    a)从元组到Morphline输入,以及
    b)从Morphline输出到新的输出元组
  • 使用已初始化的Morplines上下文处理每个传入事件
  • 如果Bolt不是Terminal ,则使用提供的Mapper (类型“ b”),使用Morphline执行的输出发出一个新的Tuple。

简单的可配置ETL拓扑

为了测试自定义MorphlinesBolt ,我编写了2个简单的测试。 在这些测试中,您可以看到MorphlinesBolt是如何初始化的,然后是每次执行的结果。 作为输入,我使用了一个自定义的Spout(RandomJsonTestSpout),它仅每100毫秒发出一次新的JSON字符串(可配置)。

DummyJsonTerminalLogTopology

一个简单的拓扑 ,该拓扑通过配置文件和每个传入的元组的执行Morphline处理程序来配置Morphline上下文。 在此拓扑上, MorphlinesBolt被配置为端子螺栓,这意味着对于每个输入Tuple不会发出新的Tuple。

public class DummyJsonTerminalLogTopology {public static void main(String[] args) throws Exception {Config config = new Config();RandomJsonTestSpout spout = new RandomJsonTestSpout().withComplexJson(false);String2ByteArrayTupleMapper tuppleMapper = new String2ByteArrayTupleMapper();tuppleMapper.configure(CmnStormCons.TUPLE_FIELD_MSG);MorphlinesBolt morphBolt = new MorphlinesBolt().withTupleMapper(tuppleMapper).withMorphlineId("json_terminal_log").withMorphlineConfFile("target/test-classes/morphline_confs/json_terminal_log.conf");TopologyBuilder builder = new TopologyBuilder();builder.setSpout("WORD_SPOUT", spout, 1);builder.setBolt("MORPH_BOLT", morphBolt, 1).shuffleGrouping("WORD_SPOUT");if (args.length == 0) {LocalCluster cluster = new LocalCluster();cluster.submitTopology("MyDummyJsonTerminalLogTopology", config, builder.createTopology());Thread.sleep(10000);cluster.killTopology("MyDummyJsonTerminalLogTopology");cluster.shutdown();System.exit(0);} else if (args.length == 1) {StormSubmitter.submitTopology(args[0], config, builder.createTopology());} else {System.out.println("Usage: DummyJsonTerminalLogTopology <topology_name>");}}
}

DummyJson2StringTopology

一个简单的拓扑 ,该拓扑通过配置文件和每个传入的元组的执行Morphline处理程序来配置Morphline上下文。 在此拓扑上, MorphlinesBolt被配置为普通螺栓,这意味着对于每个输入Tuple,它都会发出一个新的Tuple。

public class DummyJson2StringTopology {public static void main(String[] args) throws Exception {Config config = new Config();RandomJsonTestSpout spout = new RandomJsonTestSpout().withComplexJson(false);String2ByteArrayTupleMapper tuppleMapper = new String2ByteArrayTupleMapper();tuppleMapper.configure(CmnStormCons.TUPLE_FIELD_MSG);MorphlinesBolt morphBolt = new MorphlinesBolt().withTupleMapper(tuppleMapper).withMorphlineId("json2string").withMorphlineConfFile("target/test-classes/morphline_confs/json2string.conf")//.withOutputProcessors(Arrays.asList(resultRecordHandlers));.withOutputFields(CmnStormCons.TUPLE_FIELD_MSG).withRecordMapper(RecordHandlerFactory.genDefaultRecordHandler(String.class, new JsonNode2StringResultMapper()));LoggingBolt printBolt = new LoggingBolt().withFields(CmnStormCons.TUPLE_FIELD_MSG);TopologyBuilder builder = new TopologyBuilder();builder.setSpout("WORD_SPOUT", spout, 1);builder.setBolt("MORPH_BOLT", morphBolt, 1).shuffleGrouping("WORD_SPOUT");builder.setBolt("PRINT_BOLT", printBolt, 1).shuffleGrouping("MORPH_BOLT");if (args.length == 0) {LocalCluster cluster = new LocalCluster();cluster.submitTopology("MyDummyJson2StringTopology", config, builder.createTopology());Thread.sleep(10000);cluster.killTopology("MyDummyJson2StringTopology");cluster.shutdown();System.exit(0);} else if (args.length == 1) {StormSubmitter.submitTopology(args[0], config, builder.createTopology());} else {System.out.println("Usage: DummyJson2StringTopology <topology_name>");}}
}

最后的想法

MorphlinesBolt可以用作任何可配置ETL“解决方案”的一部分(作为单处理Bolt,作为终端Bolt,作为复杂管道的一部分,等等)。

morphlines_storm_topology_examples

我在github中的示例项目集合中,源代码作为maven模块( sv-etl-storm-morphlines )提供。

最好的组合是将MorphlinesBolt与Flux一起使用。 这可能会为您提供完全可配置的ETL拓扑!!!
我还没有添加为选项,以便保持更少的依赖关系(我可能添加了范围“ test”)。

该模块不是最终模块,我将尝试对其进行改进,因此许多人会在第一个实现中发现各种错误。

对于任何其他想法或澄清,请写评论:)

这是我2016年的第一篇文章! 希望您身体健康,思想和行动更好。 一切事物的首要美德/价值是人类以及对我们所有人赖以生存的环境(社会,地球,动物,植物等)的尊重。 所有其他都是次要优先事项,不应破坏优先事项所隐含的内容。 始终牢记最重要的美德,并在您采取的任何行动或思想中考虑它们。

翻译自: https://www.javacodegeeks.com/2016/01/configurable-etl-processing-using-apache-storm-kite-sdk-morphlines.html

kite 使用 go

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/336609.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用react实现select_使用 Hooks 优化 React 组件

奇技指南本文内容主要是我之前分享的文字版&#xff0c;若想看重点的话可以看之前的Slide: https://ppt.baomitu.com/d/75fc979a本文作者奇舞团前端开发工程师李喆明。需求描述由于作者所在的业务是资讯内容类业务&#xff0c;因而在业务中会经常碰到如下场景&#xff1a;有一个…

Windows内存修改初篇

​ #define _CRT_SECURE_NO_WARNINGS #include <stdio.h> #include <windows.h>BOOL FindFirst(DWORD dwValue);//对目标进程空间进行查找 BOOL FindNext(DWORD dwValue);//对目标空间进行2&#xff0c;3&#xff0c;4。。查找//查找数据的地址列表 DWORD g_arList…

斐波那契实现阶乘js_功能与命令式编程。 Java 8中的斐波那契,素数和阶乘

斐波那契实现阶乘js有多种编程风格/范例&#xff0c;但是两个著名的风格是Imperative和Functional 。 命令式编程是最主要的范例&#xff0c;因为几乎所有主流语言&#xff08;C &#xff0c;Java&#xff0c;C&#xff03;&#xff09;都在推广它。 但是在最近几年中&#xf…

cacti不能实时刷新流量图_介绍一种编码帧内刷新算法

0.引言本文主要介绍一种帧内刷新算法&#xff0c;解决I帧太大带来的延迟问题&#xff0c;可以在调优时&#xff0c;值得借鉴。帧内刷新技术避免 I 帧尖峰带来的带宽压力&#xff0c;可以有效地降低视频通信中的缓冲区延迟。帧内刷新算法是一种视频错误恢复的方法&#xff0c;通…

python kotlin_Java和Python中类似Kotlin的生成器,续:附加参数

python kotlin介绍 在今天的文章中&#xff0c;我们将继续上周的文章&#xff0c;内容涉及使用Java和Python创建类似Kotlin的构建器&#xff0c;并扩展了构建器API以采用一些可选参数以提高灵活性。 我们继续我们HTML示例&#xff0c;尝试添加标记属性&#xff0c;例如class&am…

按钮自动居中布局_CSS布局技巧

css实现左右布局和居中布局显示是前端进行页面设计的基础&#xff0c;也是全面了解并学习css一个很好的切入点&#xff0c;因为其中会涉及到对许多css基础点的认知。实现css入门&#xff0c;理解左右布局的实现方式是必经之路&#xff0c;同时也能使我们在项目中涉及前端编码的…

jooq_jOOQ星期二:拉斐尔·温特豪德(Rafael Winterhalter)正在与字节好友合作字节码...

jooq欢迎来到jOOQ Tuesdays系列。 在本系列文章中&#xff0c;我们每隔一个月的第三个星期二发布一篇文章&#xff0c;从jOOQ的角度采访我们发现该行业令人兴奋的人。 这包括从事SQL&#xff0c;Java&#xff0c;开放源代码以及各种其他相关主题的人员。 我们很高兴在第七版中…

多方法接口回调_啊?Java反射遇到接口

本文适合有点Java反射基础的同学&#xff0c;在Java反射调用方法时遇到接口参数是一件很蛋疼的事情。在反射调用方法时需要传参数&#xff0c;像传递基本数据类型进去用就完事&#xff0c;传个对象进去怎么整都没关系&#xff0c;因为你在外部有对象的引用&#xff0c;但 如果需…

0与1世界的初级编程篇之C语言

C语言是一门面向过程的计算机编程语言&#xff0c;与C、Java等面向对象编程语言有所不同。C语言的设计目标是提供一种能以简易的方式编译、处理低级存储器、仅产生少量的机器码以及不需要任何运行环境支持便能运行的编程语言。C语言描述问题比汇编语言迅速&#xff0c;工作量小…

swarm部署集群_WildFly Swarm –将Java EE应用程序部署为独立的Jar

swarm部署集群WildFly Swarm提供了一个简单的解决方案&#xff0c;用于将Java EE应用程序部署为独立的Jar文件。 这使得部署应用程序非常容易&#xff0c;尤其是REST或Web服务。 Swarm在这方面与Spring Boot非常相似&#xff0c;因为它可以快速开发Web&#xff08;.War&#xf…

QT 手动创建信号函数 与 槽函数

自定义信号槽必须要有Q_OBJCT 自己通过关键字signals创建信号函数,只声明信号函数即可&#xff0c;系统会自动定义它。 利用 public slots: 声明一个槽函数&#xff0c;槽函数不会自动生成&#xff0c;并且去定义它。

九九乘法表代码口述_利用随机函数实现座次表的随机排座

昨天听完讲座&#xff0c;晚上忍不住写了份学后感&#xff0c;今天有老师在文章下留言问如何实现随机排座&#xff0c;上午在快速理赔中心处理交通事故&#xff0c;处理完后将车开去4S店维修&#xff0c;回来后给娃做完中饭就开始写代码实现这个功能&#xff0c;因为有C功底&am…

jep使用_JEP 277“增强弃用”非常好。 但这是一个更好的选择

jep使用维护API很难。 我们正在维护非常复杂的jOOQ API。 但是就语义版本而言&#xff0c;我们遵循相对宽松的规则 。 当您阅读Brian Goetz和其他人关于在JDK中保持向后兼容性的评论时&#xff0c;我只能对他们的工作表示敬意。 显然&#xff0c;我们都希望最终移除Vector &a…

python 港股交易数据_GitHub - 116pythonZS/futuquant: 富途量化平台 API

FutuQuant - 富途量化投资平台 (Futu Quant Trading API)简介​FutuQuant开源项目可以满足使用富途牛牛软件进行量化投资的需求, 提供包括Python接口、Json接口的行情及交易的API。安装pip install futuquant注: 本API同时兼容Python2和Python3, 推荐安装anaconda环境&#xff…

LeetCode 16.01 交换两数

原题链接 交换 a,b两个数&#xff0c;不开辟额外控件 解析&#xff1a; 设a甲 b乙 aa^b; 转换 a甲^乙 b乙 ba^b; 转换 b甲^乙^乙 因为 乙^乙0 甲^0甲 &#xff0c;所以b甲 aa^b; 转换 a甲^乙^甲 同上所述&#xff0c;所以 a乙 至此…

古巴比伦乘法_古巴平台中的通用过滤器–类固醇上的excel过滤器

古巴比伦乘法正如我上次承诺的那样&#xff0c;我计划浏览该平台的某些功能&#xff0c;这些功能我认为非常有价值。 所以我将在这里做一些系列。 从明显的用户界面&#xff0c;过滤&#xff0c;安全性到一些高级功能&#xff08;如Web Portal&#xff0c;可扩展性&#xff0c;…

excel运行没反应_Excel数据很少文件却很大,问题出在哪里呢?两种方法轻松解决...

经常和Excel打交道的小伙伴可能会有这样的困惑&#xff0c;一个Excel工作簿中的数据明明很少&#xff0c;文件所占的空间却很大。打开这种Excel工作簿后&#xff0c;电脑CPU占用率瞬间飙升&#xff0c;甚至遇见工作簿没有响应的情况出现。遇见这样的工作簿让人窝火&#xff0c;…

约瑟夫环问题题解

按照1-8顺寻存储&#xff0c;起始位置为3&#xff0c;数到4的人出列。 #include<iostream> using namespace std; typedef struct node {int num;struct node* next; }Node; int main() {int n 8, k 3, m 4;Node*h (Node*)malloc(sizeof(Node));h->num 1;h->n…

java lambda::_基准测试:Java 8 Lambda和流如何使您的代码慢5倍

java lambda::与长期的实现相比&#xff0c;Java 8 lambda和流的性能如何&#xff1f; Lambda表达式和流在Java 8中受到了热烈的欢迎。这些是迄今为止很激动人心的功能&#xff0c;很长一段时间以来&#xff0c;它们就已经应用到Java中了。 新的语言功能使我们可以在代码中采用…

如何在java中实现小数点自增_java编个计算器怎么在加入小数点

展开全部我做的可以运行&#xff0c;你看看吧&#xff01;import java.awt.*;import java.awt.event.*;import java.lang.*;import javax.swing.*;public class Counter extends Frame{//声明三个面板的布局GridLayout gl1,gl2,gl3;Panel p0,p1,p2,p3;JTextField tf1;TextField…