kite 使用 go_使用Apache Storm和Kite SDK Morphlines的可配置ETL处理

kite 使用 go

从我担任软件工程师的第一天起,我总是听到很多方面相同要求:

我们希望所有内容都可配置,我们希望在运行时更改所有内容,我们希望有一个可视化工具来应用所有这些逻辑,以便非开发人员使用和配置我们的应用程序。

我也喜欢这种通用范围,但是众所周知,软件系统的适应性不强,客户的需求也不稳定。

在过去的几年中,我们已经使用传统的框架/技术(JMX,分布式缓存,Spring或JEE等)构建了此类可配置应用程序(并非100%可配置)。

近年来,我们的体系结构中还必须包含一个附加概念,这就是大数据 (或3V或4V或任何更合适的词)的概念。 这个概念淘汰了我们熟悉并在旧的3层应用程序中应用的各种解决方案或变通方法。

有趣的是,我很多次都和十年前一样。 这是软件开发的规则,它永远不会结束,因此个人才能和新冒险也永远不会结束:-)

主要问题仍然是相同的,即如何构建可配置的ETL分布式应用程序

因此,我构建了一个小型的适应性解决方案,该解决方案在许多使用案例中可能会有所帮助。 我在大数据世界中使用了3种常用工具: JavaApache StormKite SDK Morplines 。 Java是主要的编程语言, Apache Storm是分布式流处理引擎,而Kite SDK Morphlines是可配置的ETL引擎。

风筝SDK Morplines

从其描述复制: Morphlines是一个开源框架,可减少构建和更改Hadoop ETL流处理应用程序所需的时间和精力,该应用程序可将数据提取,转换并加载到Apache Solr,HBase,HDFS,Enterprise Data Warehouse或Analytic Online Dashboards中。 morphline是一个丰富的配置文件,可轻松定义一个转换链,该转换链可使用来自任何类型数据源的任何类型的数据,处理数据并将结果加载到Hadoop组件中。 它用简单的配置步骤代替了Java编程,并相应地减少了与开发和维护定制ETL项目相关的成本和集成工作。

除了内置命令外 ,您还可以轻松实现自己的命令 ,并在吗啉配置文件中使用它。

示例Morphline配置读取一个JSON字符串,解析它,然后只记录一个特定的JSON元素:

morphlines : [{id : json_terminal_logimportCommands : ["org.kitesdk.**"]commands : [# read the JSON blob{ readJson: {} }# extract JSON objects into head fields{ extractJsonPaths {flatten: truepaths: {name: /nameage: /age}} }# log data{ logInfo {format : "name: {}, record: {}"args : ["@{name}", "@{}"]}}]
}]

风暴变身螺栓

为了在Storm中使用Morphlines,我实现了一个自定义MorphlinesBolt 。 该螺栓的主要职责是:

  • 通过配置文件初始化Morphlines处理程序
  • 初始化映射说明:
    a)从元组到Morphline输入,以及
    b)从Morphline输出到新的输出元组
  • 使用已初始化的Morplines上下文处理每个传入事件
  • 如果Bolt不是Terminal ,则使用提供的Mapper (类型“ b”),使用Morphline执行的输出发出一个新的Tuple。

简单的可配置ETL拓扑

为了测试自定义MorphlinesBolt ,我编写了2个简单的测试。 在这些测试中,您可以看到MorphlinesBolt是如何初始化的,然后是每次执行的结果。 作为输入,我使用了一个自定义的Spout(RandomJsonTestSpout),它仅每100毫秒发出一次新的JSON字符串(可配置)。

DummyJsonTerminalLogTopology

一个简单的拓扑 ,该拓扑通过配置文件和每个传入的元组的执行Morphline处理程序来配置Morphline上下文。 在此拓扑上, MorphlinesBolt被配置为端子螺栓,这意味着对于每个输入Tuple不会发出新的Tuple。

public class DummyJsonTerminalLogTopology {public static void main(String[] args) throws Exception {Config config = new Config();RandomJsonTestSpout spout = new RandomJsonTestSpout().withComplexJson(false);String2ByteArrayTupleMapper tuppleMapper = new String2ByteArrayTupleMapper();tuppleMapper.configure(CmnStormCons.TUPLE_FIELD_MSG);MorphlinesBolt morphBolt = new MorphlinesBolt().withTupleMapper(tuppleMapper).withMorphlineId("json_terminal_log").withMorphlineConfFile("target/test-classes/morphline_confs/json_terminal_log.conf");TopologyBuilder builder = new TopologyBuilder();builder.setSpout("WORD_SPOUT", spout, 1);builder.setBolt("MORPH_BOLT", morphBolt, 1).shuffleGrouping("WORD_SPOUT");if (args.length == 0) {LocalCluster cluster = new LocalCluster();cluster.submitTopology("MyDummyJsonTerminalLogTopology", config, builder.createTopology());Thread.sleep(10000);cluster.killTopology("MyDummyJsonTerminalLogTopology");cluster.shutdown();System.exit(0);} else if (args.length == 1) {StormSubmitter.submitTopology(args[0], config, builder.createTopology());} else {System.out.println("Usage: DummyJsonTerminalLogTopology <topology_name>");}}
}

DummyJson2StringTopology

一个简单的拓扑 ,该拓扑通过配置文件和每个传入的元组的执行Morphline处理程序来配置Morphline上下文。 在此拓扑上, MorphlinesBolt被配置为普通螺栓,这意味着对于每个输入Tuple,它都会发出一个新的Tuple。

public class DummyJson2StringTopology {public static void main(String[] args) throws Exception {Config config = new Config();RandomJsonTestSpout spout = new RandomJsonTestSpout().withComplexJson(false);String2ByteArrayTupleMapper tuppleMapper = new String2ByteArrayTupleMapper();tuppleMapper.configure(CmnStormCons.TUPLE_FIELD_MSG);MorphlinesBolt morphBolt = new MorphlinesBolt().withTupleMapper(tuppleMapper).withMorphlineId("json2string").withMorphlineConfFile("target/test-classes/morphline_confs/json2string.conf")//.withOutputProcessors(Arrays.asList(resultRecordHandlers));.withOutputFields(CmnStormCons.TUPLE_FIELD_MSG).withRecordMapper(RecordHandlerFactory.genDefaultRecordHandler(String.class, new JsonNode2StringResultMapper()));LoggingBolt printBolt = new LoggingBolt().withFields(CmnStormCons.TUPLE_FIELD_MSG);TopologyBuilder builder = new TopologyBuilder();builder.setSpout("WORD_SPOUT", spout, 1);builder.setBolt("MORPH_BOLT", morphBolt, 1).shuffleGrouping("WORD_SPOUT");builder.setBolt("PRINT_BOLT", printBolt, 1).shuffleGrouping("MORPH_BOLT");if (args.length == 0) {LocalCluster cluster = new LocalCluster();cluster.submitTopology("MyDummyJson2StringTopology", config, builder.createTopology());Thread.sleep(10000);cluster.killTopology("MyDummyJson2StringTopology");cluster.shutdown();System.exit(0);} else if (args.length == 1) {StormSubmitter.submitTopology(args[0], config, builder.createTopology());} else {System.out.println("Usage: DummyJson2StringTopology <topology_name>");}}
}

最后的想法

MorphlinesBolt可以用作任何可配置ETL“解决方案”的一部分(作为单处理Bolt,作为终端Bolt,作为复杂管道的一部分,等等)。

morphlines_storm_topology_examples

我在github中的示例项目集合中,源代码作为maven模块( sv-etl-storm-morphlines )提供。

最好的组合是将MorphlinesBolt与Flux一起使用。 这可能会为您提供完全可配置的ETL拓扑!!!
我还没有添加为选项,以便保持更少的依赖关系(我可能添加了范围“ test”)。

该模块不是最终模块,我将尝试对其进行改进,因此许多人会在第一个实现中发现各种错误。

对于任何其他想法或澄清,请写评论:)

这是我2016年的第一篇文章! 希望您身体健康,思想和行动更好。 一切事物的首要美德/价值是人类以及对我们所有人赖以生存的环境(社会,地球,动物,植物等)的尊重。 所有其他都是次要优先事项,不应破坏优先事项所隐含的内容。 始终牢记最重要的美德,并在您采取的任何行动或思想中考虑它们。

翻译自: https://www.javacodegeeks.com/2016/01/configurable-etl-processing-using-apache-storm-kite-sdk-morphlines.html

kite 使用 go

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/336609.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用react实现select_使用 Hooks 优化 React 组件

奇技指南本文内容主要是我之前分享的文字版&#xff0c;若想看重点的话可以看之前的Slide: https://ppt.baomitu.com/d/75fc979a本文作者奇舞团前端开发工程师李喆明。需求描述由于作者所在的业务是资讯内容类业务&#xff0c;因而在业务中会经常碰到如下场景&#xff1a;有一个…

cacti不能实时刷新流量图_介绍一种编码帧内刷新算法

0.引言本文主要介绍一种帧内刷新算法&#xff0c;解决I帧太大带来的延迟问题&#xff0c;可以在调优时&#xff0c;值得借鉴。帧内刷新技术避免 I 帧尖峰带来的带宽压力&#xff0c;可以有效地降低视频通信中的缓冲区延迟。帧内刷新算法是一种视频错误恢复的方法&#xff0c;通…

按钮自动居中布局_CSS布局技巧

css实现左右布局和居中布局显示是前端进行页面设计的基础&#xff0c;也是全面了解并学习css一个很好的切入点&#xff0c;因为其中会涉及到对许多css基础点的认知。实现css入门&#xff0c;理解左右布局的实现方式是必经之路&#xff0c;同时也能使我们在项目中涉及前端编码的…

jooq_jOOQ星期二:拉斐尔·温特豪德(Rafael Winterhalter)正在与字节好友合作字节码...

jooq欢迎来到jOOQ Tuesdays系列。 在本系列文章中&#xff0c;我们每隔一个月的第三个星期二发布一篇文章&#xff0c;从jOOQ的角度采访我们发现该行业令人兴奋的人。 这包括从事SQL&#xff0c;Java&#xff0c;开放源代码以及各种其他相关主题的人员。 我们很高兴在第七版中…

多方法接口回调_啊?Java反射遇到接口

本文适合有点Java反射基础的同学&#xff0c;在Java反射调用方法时遇到接口参数是一件很蛋疼的事情。在反射调用方法时需要传参数&#xff0c;像传递基本数据类型进去用就完事&#xff0c;传个对象进去怎么整都没关系&#xff0c;因为你在外部有对象的引用&#xff0c;但 如果需…

0与1世界的初级编程篇之C语言

C语言是一门面向过程的计算机编程语言&#xff0c;与C、Java等面向对象编程语言有所不同。C语言的设计目标是提供一种能以简易的方式编译、处理低级存储器、仅产生少量的机器码以及不需要任何运行环境支持便能运行的编程语言。C语言描述问题比汇编语言迅速&#xff0c;工作量小…

QT 手动创建信号函数 与 槽函数

自定义信号槽必须要有Q_OBJCT 自己通过关键字signals创建信号函数,只声明信号函数即可&#xff0c;系统会自动定义它。 利用 public slots: 声明一个槽函数&#xff0c;槽函数不会自动生成&#xff0c;并且去定义它。

九九乘法表代码口述_利用随机函数实现座次表的随机排座

昨天听完讲座&#xff0c;晚上忍不住写了份学后感&#xff0c;今天有老师在文章下留言问如何实现随机排座&#xff0c;上午在快速理赔中心处理交通事故&#xff0c;处理完后将车开去4S店维修&#xff0c;回来后给娃做完中饭就开始写代码实现这个功能&#xff0c;因为有C功底&am…

jep使用_JEP 277“增强弃用”非常好。 但这是一个更好的选择

jep使用维护API很难。 我们正在维护非常复杂的jOOQ API。 但是就语义版本而言&#xff0c;我们遵循相对宽松的规则 。 当您阅读Brian Goetz和其他人关于在JDK中保持向后兼容性的评论时&#xff0c;我只能对他们的工作表示敬意。 显然&#xff0c;我们都希望最终移除Vector &a…

古巴比伦乘法_古巴平台中的通用过滤器–类固醇上的excel过滤器

古巴比伦乘法正如我上次承诺的那样&#xff0c;我计划浏览该平台的某些功能&#xff0c;这些功能我认为非常有价值。 所以我将在这里做一些系列。 从明显的用户界面&#xff0c;过滤&#xff0c;安全性到一些高级功能&#xff08;如Web Portal&#xff0c;可扩展性&#xff0c;…

excel运行没反应_Excel数据很少文件却很大,问题出在哪里呢?两种方法轻松解决...

经常和Excel打交道的小伙伴可能会有这样的困惑&#xff0c;一个Excel工作簿中的数据明明很少&#xff0c;文件所占的空间却很大。打开这种Excel工作簿后&#xff0c;电脑CPU占用率瞬间飙升&#xff0c;甚至遇见工作簿没有响应的情况出现。遇见这样的工作簿让人窝火&#xff0c;…

java lambda::_基准测试:Java 8 Lambda和流如何使您的代码慢5倍

java lambda::与长期的实现相比&#xff0c;Java 8 lambda和流的性能如何&#xff1f; Lambda表达式和流在Java 8中受到了热烈的欢迎。这些是迄今为止很激动人心的功能&#xff0c;很长一段时间以来&#xff0c;它们就已经应用到Java中了。 新的语言功能使我们可以在代码中采用…

如何在java中实现小数点自增_java编个计算器怎么在加入小数点

展开全部我做的可以运行&#xff0c;你看看吧&#xff01;import java.awt.*;import java.awt.event.*;import java.lang.*;import javax.swing.*;public class Counter extends Frame{//声明三个面板的布局GridLayout gl1,gl2,gl3;Panel p0,p1,p2,p3;JTextField tf1;TextField…

在Java中使用FileChannel和ByteBuffer对文件进行读写

过去&#xff0c;我讨论过RandomAccessFile以及如何将其用于在Java中进行更快的IO&#xff0c;在本Java NIO教程中&#xff0c;我们将了解如何通过使用FileChannel和ByteBuffer来使用读/写数据。 Channel提供了一种从文件读取数据的替代方法&#xff0c;它提供了比InputStream…

tp5防止sql注入mysql_PHP+Mysql防止SQL注入的方法(life)

这篇文章介绍的内容是关于PHPMysql防止SQL注入的方法&#xff0c;有着一定的参考价值&#xff0c;现在分享给大家&#xff0c;有需要的朋友可以参考一下我的官方群点击此处。方法一&#xff1a;mysql_real_escape_string -- 转义 SQL 语句中使用的字符串中的特殊字符&#xff0…

neo4j 关系属性_Neo4j:特定关系与一般关系+属性

neo4j 关系属性为了在Neo4j查询中获得最佳的遍历速度&#xff0c;我们应该使关系类型尽可能具体 。 让我们看一下几周前我在Skillsmatter上发表的“ 建模建议引擎建模 ”演讲中的一个例子。 我需要决定如何为成员和事件之间的“ RSVP”关系建模。 一个人可以对某个事件表示“…

2008r2配置 iis mysql php_Windows 2008 R2服务器配置文档iis+php+mysql

关闭防火墙关闭防火墙打开桌面远程连接修复系统依赖文件IIS配置添加角色创建一个网站验证iis是否成功删除默认创建的网站添加网站PHP配置安装解压&#xff0c;改名创建一个php.ini&#xff0c;php.ini由php.ini-production改名得到如果输入php -v后出现如下报错解决方法如下(安…

接口入口在什么地方_弱电工程施工图审查要点?有哪些地方需要审核?审核要求是什么?...

前言&#xff1a;弱电工程施工图审核标准是什么呢&#xff1f;需要审核哪些方面呢&#xff1f;有没有可以参考的地方&#xff1f;今天就分享一套弱电系统施工图审核技术要求&#xff0c;可以参考正文&#xff1a;1.设计文件设计文件是否完整(包括设计说明、平面图、系统图(单体…

drill apache_Apache Drill 1.4性能增强的简要概述

drill apache今天&#xff0c;我们很高兴地宣布&#xff0c;MapR发行版中现已提供Apache Drill 1.4。 钻1.4是MAPR生产就绪和支持的版本&#xff0c;可以从下载这里 &#xff0c;找到1.4版本说明这里 。 Drill 1.4以其高度灵活和可扩展的体系结构为基础&#xff0c;带来了多种…

docker java mysql_Docker 搭建 MySQL 服务

安装 Docker请参考我的另一篇文章建立镜像拉取镜像# 拉取最新版本镜像docker pull mysql# 拉取执行版本镜像docker pull mysql:版本号检查拉取是否成功docker images创建数据库容器(不建立数据映射)docker run -d --name mysql --rm -p 3306:3306 -e MYSQL_ROOT_PASSWORD123456…