Java大数据处理的流行框架

大数据挑战

在公司需要处理不断增长的数据量的各个领域中,对大数据的概念有不同的理解。 在大多数这些情况下,需要以某种方式设计所考虑的系统,以便能够处理该数据,而不会随着数据大小的增加而牺牲吞吐量。 从本质上讲,这导致需要构建高度可伸缩的系统,以便可以根据在给定时间点需要处理的数据量来分配更多资源。

构建这样的系统是一项耗时且复杂的活动,因此,可以使用第三方框架和库来提供现成的可伸缩性要求。 在Java应用程序中已经有很多不错的选择,本文将简要讨论一些最受欢迎的选择:

1个大数据

行动框架

我们将通过实现一个简单的管道来处理每个设备的数据,以测量给定区域的空气质量指数,从而演示每个框架。 为简单起见,我们假定来自设备的数字数据是分批接收或以流方式接收的。 在整个示例中,我们将使用THRESHOLD常量表示该值,在该值之上,我们认为一个区域被污染。

阿帕奇火花

在Spark中,我们需要先将数据转换为正确的格式。 我们将使用数据集,但我们也可以选择数据帧或RDD(弹性分布式数据集)作为数据表示的替代方法。 然后,我们可以应用许多Spark转换和操作,以便以分布式方式处理数据。

 public long countPollutedRegions(String[] numbers) { // runs a Spark master that takes up 4 cores SparkSession session = SparkSession.builder(). appName( "AirQuality" ). master( "local[4]" ). getOrCreate(); // converts the array of numbers to a Spark dataset Dataset numbersSet = session.createDataset(Arrays.asList(numbers), Encoders.STRING());         // runs the data pipeline on the local spark long pollutedRegions = numbersSet.map(number -> Integer.valueOf(number), Encoders. INT ()) .filter(number -> number > THRESHOLD).count();                 return pollutedRegions; } 

如果要更改上述应用程序以从外部源读取数据,写入外部数据源并在Spark集群而不是本地Spark实例上运行,我们将具有以下执行流程:

2大数据

Spark驱动程序可以是单独的实例,也可以是Spark群集的一部分。

Apache Flink

与Spark相似,我们需要在Flink DataSet中表示数据,然后对其应用必要的转换和操作:

 public long countPollutedRegions(String[] numbers) throws Exception { // creates a Flink execution environment with proper configuration StreamExecutionEnvironment env = StreamExecutionEnvironment. createLocalEnvironment(); // converts the array of numbers to a Flink dataset and creates // the data pipiline DataStream stream = env.fromCollection(Arrays.asList(numbers)). map(number -> Integer.valueOf(number)) .filter(number -> number > THRESHOLD).returns(Integer. class ); long pollutedRegions = 0; Iterator numbersIterator = DataStreamUtils.collect(stream); while (numbersIterator.hasNext()) { pollutedRegions++; numbersIterator.next(); } return pollutedRegions; } 

如果要更改上述应用程序以从外部源读取数据,写入外部数据源并在Flink群集上运行,我们将具有以下执行流程:

3大数据

将应用程序提交到Flink群集的Flink客户端是Flink CLI实用程序或JobManager的UI。

阿帕奇风暴

在Storm中,数据管道被创建为Spouts(数据源)和Bolts(数据处理单元)的拓扑。 由于Storm通常会处理无限制的数据流,因此我们会将空气质量指数编号数组的处理模拟为有限制的流:

 public void countPollutedRegions(String[] numbers) throws Exception { // builds the topology as a combination of spouts and bolts TopologyBuilder builder = new TopologyBuilder(); builder.setSpout( "numbers-spout" StormAirQualitySpout(numbers)); "numbers-spout" , new StormAirQualitySpout(numbers)); builder.setBolt( "number-bolt" , new StormAirQualityBolt()). shuffleGrouping( "numbers-spout" shuffleGrouping( "numbers-spout" );         // prepares Storm conf and along with the topology submits it for // execution to a local Storm cluster Config conf = new Config(); conf.setDebug( true ); LocalCluster localCluster = null; try { localCluster = new LocalCluster(); localCluster.submitTopology( "airquality-topology" , conf, builder.createTopology()); Thread.sleep(10000); localCluster.shutdown(); } catch (InterruptedException ex) { localCluster.shutdown(); } } 

我们有一个喷嘴可以为空气质量指数编号的数组提供数据源,还有一个仅过滤指示污染区域的螺栓:

 public class StormAirQualitySpout extends BaseRichSpout { private boolean emitted = false ; private SpoutOutputCollector collector; private String[] numbers; public StormAirQualitySpout(String[] numbers) { this .numbers = numbers; }     @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare( new Fields( "number" )); } @Override public void open(Map paramas, TopologyContext context, SpoutOutputCollector collector) { this .collector = collector; } @Override public void nextTuple() { // we make sure that the numbers array is processed just once by // the spout if (!emitted) { for (String number : numbers) { collector.emit( new Values(number)); } emitted = true ; } }  } 
 public class StormAirQualityBolt extends BaseRichBolt { private static final int THRESHOLD = 10; private int pollutedRegions = 0; @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare( new Fields( "number" )); } @Override public void prepare(Map params,  TopologyContext context,  OutputCollector collector) { } @Override public void execute(Tuple tuple) { String number = tuple.getStringByField( "number" ); Integer numberInt = Integer.valueOf(number); if (numberInt > THRESHOLD) { pollutedRegions++; } }  } 

我们正在使用LocalCluster实例提交到本地Storm集群,这对于开发很方便,但是我们想将Storm拓扑提交到生产集群。 在这种情况下,我们将具有以下执行流程:

4大数据

阿帕奇点燃

在Ignite中,我们需要先将数据放入分布式缓存中,然后再运行数据处理管道,该管道是在Ignite群集上以分布式方式执行的SQL查询的前者:

 public long countPollutedRegions(String[] numbers) { IgniteConfiguration igniteConfig = new IgniteConfiguration(); CacheConfiguration cacheConfig = new CacheConfiguration(); // cache key is number index in the array and value is the number cacheConfig.setIndexedTypes(Integer. class , String. class ); cacheConfig.setName(NUMBERS_CACHE); igniteConfig.setCacheConfiguration(cacheConfig);         try (Ignite ignite = Ignition.start(igniteConfig)) { IgniteCache cache = ignite.getOrCreateCache(NUMBERS_CACHE); // adds the numbers to the Ignite cache try (IgniteDataStreamer streamer = ignite.dataStreamer(cache.getName())) { int key = 0; for (String number : numbers) { streamer.addData(key++, number); } } // performs an SQL query over the cached numbers SqlFieldsQuery query = new SqlFieldsQuery( "select * from String where _val > " + THRESHOLD);             FieldsQueryCursor<List> cursor = cache.query(query); int pollutedRegions = cursor.getAll().size(); return pollutedRegions; }  } 

如果我们要在Ignite群集中运行应用程序,它将具有以下执行流程:

榛树喷射机

Hazelcast Jet在Hazelcast IMDG之上运行,并且与Ignite相似,如果我们要处理数据,我们需要首先将其放入Hazelcast IMDG群集中:

 public long countPollutedRegions(String[] numbers) { // prepares the Jet data processing pipeline Pipeline p = Pipeline.create(); p.drawFrom(Sources.list( "numbers" )). map(number -> Integer.valueOf((String) number)) .filter(number -> number > THRESHOLD).drainTo(Sinks.list( "filteredNumbers" )); JetInstance jet = Jet.newJetInstance(); IList numbersList = jet.getList( "numbers" ); numbersList.addAll(Arrays.asList(numbers)); try { // submits the pipeline in the Jet cluster jet.newJob(p).join(); // gets the filtered data from Hazelcast IMDG List filteredRecordsList = jet.getList( "filteredNumbers" ); int pollutedRegions = filteredRecordsList.size(); return pollutedRegions; } finally { Jet.shutdownAll(); } } 

但是请注意,Jet还提供集成而无需外部数据源,并且不需要将数据存储在IMDG群集中。 您也可以在不首先将数据存储到列表中的情况下进行聚合(查看Github中包含改进版本的完整示例)。 感谢Hazelcast工程团队的Jaromir和Can的宝贵意见。

如果我们要在Hazelcast Jet集群中运行该应用程序,它将具有以下执行流程:

卡夫卡流

Kafka Streams是一个客户端库,使用Kafka主题作为数据处理管道的源和接收器。 为了在我们的方案中使用Kafka Streams库,我们将把空气质量指数数字放入数字 Kafka主题中:

 public long countPollutedRegions() { List result = new LinkedList(); // key/value pairs contain string items final Serde stringSerde = Serdes.String(); // prepares and runs the data processing pipeline final StreamsBuilder builder = new StreamsBuilder(); builder.stream( "numbers" , Consumed.with(stringSerde, stringSerde)) .map((key, value) -> new KeyValue(key, Integer.valueOf(value))). filter((key, value) -> value > THRESHOLD) .foreach((key, value) -> { result.add(value.toString()); });     final Topology topology = builder.build(); final KafkaStreams streams = new KafkaStreams(topology, createKafkaStreamsConfiguration()); streams.start(); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } int pollutedRegions = result.size(); System.out.println( "Number of severely polluted regions: " + pollutedRegions); streams.close(); return pollutedRegions; } private Properties createKafkaStreamsConfiguration() { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "text-search-config" ); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" ); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); return props; } 

我们的Kafka Stream应用程序实例将具有以下执行流程:

脉冲星函数

Apache Pulsar函数是轻量级的计算过程,可与Apache Pulsar集群一起以无服务器的方式工作。 假设我们在Pulsar集群中传输空气质量指数,我们可以编写一个函数来计算超出给定阈值的指数数量,并将结果写回到Pulsar,如下所示:

 public class PulsarFunctionsAirQualityApplication implements Function { private static final int HIGH_THRESHOLD = 10; @Override public Void process(String input, Context context) throws Exception {         int number = Integer.valueOf(input);         if (number > HIGH_THRESHOLD) { context.incrCounter( "pollutedRegions" , 1); } return null; }  } 

该函数以及Pulsar集群的执行流程如下:

Pulsar函数可以在Pulsar群集中运行,也可以作为单独的应用程序运行。

摘要

在本文中,我们简要回顾了一些可用于在Java中实现大数据处理系统的最受欢迎的框架。 所提供的每个框架都相当大,值得单独发表一篇文章。 尽管非常简单,但我们的空气质量指数数据管道却展示了这些框架的运行方式,您可以以此为基础来扩展您可能会进一步感兴趣的每个框架中的知识。 您可以在此处查看完整的代码示例。

翻译自: https://www.javacodegeeks.com/2019/12/popular-frameworks-for-big-data-processing-in-java.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/340576.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

带有Prometheus的Spring Boot和测微表第6部分:保护指标

以前&#xff0c;我们使用Prometheus成功启动了Spring Boot应用程序。 Spring应用程序中的一个端点正在公开我们的指标数据&#xff0c;以便Prometheus能够检索它们。 想到的主要问题是如何保护此信息。 Spring已经为我们提供了强大的安全框架 因此&#xff0c;将其轻松用于…

使用AWS Elastic Beanstalk轻松进行Spring Boot部署

朋友不允许朋友写用户身份验证。 厌倦了管理自己的用户&#xff1f; 立即尝试Okta的API和Java SDK。 在几分钟之内即可对任何应用程序中的用户进行身份验证&#xff0c;管理和保护。 几乎所有应用程序都依赖于身份验证。 开发人员以及雇用他们的公司都想确认谁在发出请求&…

mysql报错乱码_连接mysql服务器报错时,出现乱码

页头用了header(content-type:text/html;charsetutf-8);try{$this->dbonew PDO($dsn,$dbuser,$dbpassword);}catch(Exception $e){echo $e->getMessage();}连接失败时会报错&#xff0c;但是乱码&#xff0c;IE下编码查看是UTF-8&#xff0c;但是是乱码&#xff0c;如果选…

zookeeper 负载_ZooKeeper,策展人以及微服务负载平衡的工作方式

zookeeper 负载Zookeeper如何确保每个工人都能从工作委托经理那里愉快地完成工作。 Apache ZooKeeper是注册&#xff0c;管理和发现在不同计算机上运行的服务的工具。 当我们必须处理具有许多节点的分布式系统时&#xff0c;它是技术堆栈中必不可少的成员&#xff0c;这些节点…

高效的企业测试-集成测试(3/6)

本系列的这一部分将展示如何通过代码级以及系统级集成测试来验证我们的应用程序。 &#xff08;代码级&#xff09;集成测试 集成测试一词有时在不同的上下文中使用不同。 根据Wikipedia的定义&#xff0c;我指的是在代码级别上验证多个组件之间相互作用的测试。 通常&#x…

带Prometheus的Spring Boot和测微表第4部分:基础项目

在以前的文章中&#xff0c;我们介绍了Spring Micrometer和InfluxDB。 所以你要问我为什么普罗米修斯。 原因是Prometheus在InfluxDB的拉模型与推模型上进行操作。 这意味着&#xff0c;如果将千分尺与InfluxDB一起使用&#xff0c;则在将结果推送到数据库中时肯定会有一些开…

前端如何实现网络速度测试功能_分析Web前端测试要点,从架构原理上进行分析,希望大家能够掌握...

基于Web前端分析过程&#xff0c;大概有十几个测试要点&#xff0c;我们今天主要来讲解结合前五个要点进行详细解说。前端测试点主要针对前端展开&#xff0c;什么叫前端分析呢&#xff1f;就是我们所有的分析和测试要点所站的视角都是针对客户端或者浏览器来对系统进行分析和测…

将Websocket与Spring Framework和Vuejs结合使用

Websocket是客户端和服务器之间的全双工&#xff08;持久&#xff09;连接&#xff0c;因此两者可以彼此共享信息&#xff0c;而无需重复建立新的连接。 这消除了从客户端重复轮询以从服务器获取更新的需要。 并非所有浏览器都支持Websocket&#xff0c;因此我们利用SockJS ja…

python函数和模块的使用方法_Python学习06_函数和模块的使用

引入在写有些代码的时候&#xff0c;会发现有些步骤重复了多次&#xff0c;他也不像循环&#xff0c;都是相同的东西在重复&#xff0c;而是指做某件事情的步骤方法&#xff0c;做事的人或对象发生了改变&#xff0c;但是方法却没有改变。要想写出高质量的代码&#xff0c;首先…

tmemo 选择消除行_Divi模块,行和部分加入高级动画选项

一切元素的动画选项每个Divi模块&#xff0c;行和部分都带有高级动画选项&#xff0c;你可以使用这些选项来吸引访问者并使页面更加耀眼。Divi引入一个全新的动画系统&#xff0c;并将这些高级动画选项扩展到每个Divi模块&#xff0c;行和部分&#xff01;这些新选项已合并到一…

java8 streams_Java 8 Friday:使用Streams API时的10个细微错误

java8 streams在Data Geekery &#xff0c;我们喜欢Java。 而且&#xff0c;由于我们真的很喜欢jOOQ的流畅的API和查询DSL &#xff0c;我们对Java 8将为我们的生态系统带来什么感到非常兴奋。 Java 8星期五 每个星期五&#xff0c;我们都会向您展示一些不错的教程风格的Java …

python带参数装饰器 函数名_python 全栈开发,Day11(函数名应用,闭包,装饰器初识,带参数以及带返回值的装饰器)...

一、函数名应用函数名是什么&#xff1f;函数名是函数的名字&#xff0c;本质&#xff1a;变量&#xff0c;特殊的变量。函数名()&#xff0c;执行此函数。python 规范写法1. #后面加一个空格&#xff0c;再写内容&#xff0c;就没有波浪线了。2.一行代码写完&#xff0c;下面一…

python逐行写入excel_快来看看Python如何玩转Excel

来源&#xff1a;ID(innerV)如何用Python来操作Excel文件呢&#xff1f;首先&#xff0c;使用pip 包管理器来安装两个包&#xff0c;安装命令&#xff1a;pip install xlrd pip install xlwt我们来看读取excel的例子&#xff0c;第1行&#xff0c;import 导入xlrd包第4行&#…

Java面试准备:15个Java面试问题

并非所有的访谈都将重点放在算法和数据结构上—通常&#xff0c;访谈通常只侧重于您声称是专家的语言或技术。在此类访谈中&#xff0c;通常没有任何“陷阱”问题&#xff0c;而是它们要求您利用内存和使用该语言的经验–换句话说&#xff0c;它们测试您对编程语言的了解。 但…

mysql排插问题_MySQL一次数据插入故障记录

某天突然收到报警&#xff0c;数据库大量事务等待&#xff0c;进到数据库后发线大量的插入操作被阻塞&#xff0c;且都是同一个表的。通过 show engine innodb status 发现插入操作都是在等待索引 idx_create_time(create_time) 的 insert intention lock(跟 gap 锁互斥)&#…

纯净pe工具_微PE工具箱2.0

(特殊时期&#xff0c;在家时间多一些&#xff0c;突然想到多年的公众号&#xff0c;重启试试&#xff0c;嗯就先每一天推荐一个软件吧)微PE工具箱(WinPE)是一款非常好用的PE系统(独立的预安装环境)&#xff0c;非常纯净&#xff0c;是装机维护得力的助手。安装简单&#xff0c…

sping jdbc 链接mysql_Spring Boot JDBC 连接数据库示例

文本将对在spring Boot构建的Web应用中&#xff0c;基于MySQL数据库的几种数据库连接方式进行介绍。包括JDBC、JPA、MyBatis、多数据源和事务。JDBC 连接数据库1、属性配置文件(application.properties)spring.datasource.urljdbc:mysql://localhost:3306/testspring.datasourc…

二分查找递归与非递归的时间比较_我们说一说Python的查找算法!

相信大家在面试开发岗和算法岗时&#xff0c;评委最喜欢问的就是&#xff1a;您能给我说一下查找和排序算法有哪些&#xff1f;今天咱们就说一说Python中最常用的查找算法&#xff0c;下期我们再推出排序算法。首先要明白查找是查什么&#xff1f;我们希望能给定一个值&#xf…

jsf 自定义属性_如何在JSF中实现自定义密码强度指示器

jsf 自定义属性使用JavaScript验证密码强度是一项常见任务。 在本文中&#xff0c;我将展示如何向基于JSF的Web应用程序添加密码强度指示器。 的 PrimeFaces中的密码组件已经具有密码强度的反馈指示符&#xff0c;但是它有两个主要缺点&#xff1a; 反馈指示器没有响应&#…

OAuth 2.0 Java指南:5分钟保护您的应用程序安全

使用Okta的身份管理平台轻松部署您的应用程序 使用Okta的API在几分钟之内即可对任何应用程序中的用户进行身份验证&#xff0c;管理和保护。 今天尝试Okta。 现代应用程序依赖于用户身份验证&#xff0c;但是它可能给Java开发人员带来困难的挑战&#xff0c;以及一系列特定于框…