Java大数据处理的流行框架

大数据挑战

在公司需要处理不断增长的数据量的各个领域中,对大数据的概念有不同的理解。 在大多数这些情况下,需要以某种方式设计所考虑的系统,以便能够处理该数据,而不会随着数据大小的增加而牺牲吞吐量。 从本质上讲,这导致需要构建高度可伸缩的系统,以便可以根据在给定时间点需要处理的数据量来分配更多资源。

构建这样的系统是一项耗时且复杂的活动,因此,可以使用第三方框架和库来提供现成的可伸缩性要求。 在Java应用程序中已经有很多不错的选择,本文将简要讨论一些最受欢迎的选择:

1个大数据

行动框架

我们将通过实现一个简单的管道来处理每个设备的数据,以测量给定区域的空气质量指数,从而演示每个框架。 为简单起见,我们假定来自设备的数字数据是分批接收或以流方式接收的。 在整个示例中,我们将使用THRESHOLD常量表示该值,在该值之上,我们认为一个区域被污染。

阿帕奇火花

在Spark中,我们需要先将数据转换为正确的格式。 我们将使用数据集,但我们也可以选择数据帧或RDD(弹性分布式数据集)作为数据表示的替代方法。 然后,我们可以应用许多Spark转换和操作,以便以分布式方式处理数据。

 public long countPollutedRegions(String[] numbers) { // runs a Spark master that takes up 4 cores SparkSession session = SparkSession.builder(). appName( "AirQuality" ). master( "local[4]" ). getOrCreate(); // converts the array of numbers to a Spark dataset Dataset numbersSet = session.createDataset(Arrays.asList(numbers), Encoders.STRING());         // runs the data pipeline on the local spark long pollutedRegions = numbersSet.map(number -> Integer.valueOf(number), Encoders. INT ()) .filter(number -> number > THRESHOLD).count();                 return pollutedRegions; } 

如果要更改上述应用程序以从外部源读取数据,写入外部数据源并在Spark集群而不是本地Spark实例上运行,我们将具有以下执行流程:

2大数据

Spark驱动程序可以是单独的实例,也可以是Spark群集的一部分。

Apache Flink

与Spark相似,我们需要在Flink DataSet中表示数据,然后对其应用必要的转换和操作:

 public long countPollutedRegions(String[] numbers) throws Exception { // creates a Flink execution environment with proper configuration StreamExecutionEnvironment env = StreamExecutionEnvironment. createLocalEnvironment(); // converts the array of numbers to a Flink dataset and creates // the data pipiline DataStream stream = env.fromCollection(Arrays.asList(numbers)). map(number -> Integer.valueOf(number)) .filter(number -> number > THRESHOLD).returns(Integer. class ); long pollutedRegions = 0; Iterator numbersIterator = DataStreamUtils.collect(stream); while (numbersIterator.hasNext()) { pollutedRegions++; numbersIterator.next(); } return pollutedRegions; } 

如果要更改上述应用程序以从外部源读取数据,写入外部数据源并在Flink群集上运行,我们将具有以下执行流程:

3大数据

将应用程序提交到Flink群集的Flink客户端是Flink CLI实用程序或JobManager的UI。

阿帕奇风暴

在Storm中,数据管道被创建为Spouts(数据源)和Bolts(数据处理单元)的拓扑。 由于Storm通常会处理无限制的数据流,因此我们会将空气质量指数编号数组的处理模拟为有限制的流:

 public void countPollutedRegions(String[] numbers) throws Exception { // builds the topology as a combination of spouts and bolts TopologyBuilder builder = new TopologyBuilder(); builder.setSpout( "numbers-spout" StormAirQualitySpout(numbers)); "numbers-spout" , new StormAirQualitySpout(numbers)); builder.setBolt( "number-bolt" , new StormAirQualityBolt()). shuffleGrouping( "numbers-spout" shuffleGrouping( "numbers-spout" );         // prepares Storm conf and along with the topology submits it for // execution to a local Storm cluster Config conf = new Config(); conf.setDebug( true ); LocalCluster localCluster = null; try { localCluster = new LocalCluster(); localCluster.submitTopology( "airquality-topology" , conf, builder.createTopology()); Thread.sleep(10000); localCluster.shutdown(); } catch (InterruptedException ex) { localCluster.shutdown(); } } 

我们有一个喷嘴可以为空气质量指数编号的数组提供数据源,还有一个仅过滤指示污染区域的螺栓:

 public class StormAirQualitySpout extends BaseRichSpout { private boolean emitted = false ; private SpoutOutputCollector collector; private String[] numbers; public StormAirQualitySpout(String[] numbers) { this .numbers = numbers; }     @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare( new Fields( "number" )); } @Override public void open(Map paramas, TopologyContext context, SpoutOutputCollector collector) { this .collector = collector; } @Override public void nextTuple() { // we make sure that the numbers array is processed just once by // the spout if (!emitted) { for (String number : numbers) { collector.emit( new Values(number)); } emitted = true ; } }  } 
 public class StormAirQualityBolt extends BaseRichBolt { private static final int THRESHOLD = 10; private int pollutedRegions = 0; @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare( new Fields( "number" )); } @Override public void prepare(Map params,  TopologyContext context,  OutputCollector collector) { } @Override public void execute(Tuple tuple) { String number = tuple.getStringByField( "number" ); Integer numberInt = Integer.valueOf(number); if (numberInt > THRESHOLD) { pollutedRegions++; } }  } 

我们正在使用LocalCluster实例提交到本地Storm集群,这对于开发很方便,但是我们想将Storm拓扑提交到生产集群。 在这种情况下,我们将具有以下执行流程:

4大数据

阿帕奇点燃

在Ignite中,我们需要先将数据放入分布式缓存中,然后再运行数据处理管道,该管道是在Ignite群集上以分布式方式执行的SQL查询的前者:

 public long countPollutedRegions(String[] numbers) { IgniteConfiguration igniteConfig = new IgniteConfiguration(); CacheConfiguration cacheConfig = new CacheConfiguration(); // cache key is number index in the array and value is the number cacheConfig.setIndexedTypes(Integer. class , String. class ); cacheConfig.setName(NUMBERS_CACHE); igniteConfig.setCacheConfiguration(cacheConfig);         try (Ignite ignite = Ignition.start(igniteConfig)) { IgniteCache cache = ignite.getOrCreateCache(NUMBERS_CACHE); // adds the numbers to the Ignite cache try (IgniteDataStreamer streamer = ignite.dataStreamer(cache.getName())) { int key = 0; for (String number : numbers) { streamer.addData(key++, number); } } // performs an SQL query over the cached numbers SqlFieldsQuery query = new SqlFieldsQuery( "select * from String where _val > " + THRESHOLD);             FieldsQueryCursor<List> cursor = cache.query(query); int pollutedRegions = cursor.getAll().size(); return pollutedRegions; }  } 

如果我们要在Ignite群集中运行应用程序,它将具有以下执行流程:

榛树喷射机

Hazelcast Jet在Hazelcast IMDG之上运行,并且与Ignite相似,如果我们要处理数据,我们需要首先将其放入Hazelcast IMDG群集中:

 public long countPollutedRegions(String[] numbers) { // prepares the Jet data processing pipeline Pipeline p = Pipeline.create(); p.drawFrom(Sources.list( "numbers" )). map(number -> Integer.valueOf((String) number)) .filter(number -> number > THRESHOLD).drainTo(Sinks.list( "filteredNumbers" )); JetInstance jet = Jet.newJetInstance(); IList numbersList = jet.getList( "numbers" ); numbersList.addAll(Arrays.asList(numbers)); try { // submits the pipeline in the Jet cluster jet.newJob(p).join(); // gets the filtered data from Hazelcast IMDG List filteredRecordsList = jet.getList( "filteredNumbers" ); int pollutedRegions = filteredRecordsList.size(); return pollutedRegions; } finally { Jet.shutdownAll(); } } 

但是请注意,Jet还提供集成而无需外部数据源,并且不需要将数据存储在IMDG群集中。 您也可以在不首先将数据存储到列表中的情况下进行聚合(查看Github中包含改进版本的完整示例)。 感谢Hazelcast工程团队的Jaromir和Can的宝贵意见。

如果我们要在Hazelcast Jet集群中运行该应用程序,它将具有以下执行流程:

卡夫卡流

Kafka Streams是一个客户端库,使用Kafka主题作为数据处理管道的源和接收器。 为了在我们的方案中使用Kafka Streams库,我们将把空气质量指数数字放入数字 Kafka主题中:

 public long countPollutedRegions() { List result = new LinkedList(); // key/value pairs contain string items final Serde stringSerde = Serdes.String(); // prepares and runs the data processing pipeline final StreamsBuilder builder = new StreamsBuilder(); builder.stream( "numbers" , Consumed.with(stringSerde, stringSerde)) .map((key, value) -> new KeyValue(key, Integer.valueOf(value))). filter((key, value) -> value > THRESHOLD) .foreach((key, value) -> { result.add(value.toString()); });     final Topology topology = builder.build(); final KafkaStreams streams = new KafkaStreams(topology, createKafkaStreamsConfiguration()); streams.start(); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } int pollutedRegions = result.size(); System.out.println( "Number of severely polluted regions: " + pollutedRegions); streams.close(); return pollutedRegions; } private Properties createKafkaStreamsConfiguration() { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "text-search-config" ); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" ); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); return props; } 

我们的Kafka Stream应用程序实例将具有以下执行流程:

脉冲星函数

Apache Pulsar函数是轻量级的计算过程,可与Apache Pulsar集群一起以无服务器的方式工作。 假设我们在Pulsar集群中传输空气质量指数,我们可以编写一个函数来计算超出给定阈值的指数数量,并将结果写回到Pulsar,如下所示:

 public class PulsarFunctionsAirQualityApplication implements Function { private static final int HIGH_THRESHOLD = 10; @Override public Void process(String input, Context context) throws Exception {         int number = Integer.valueOf(input);         if (number > HIGH_THRESHOLD) { context.incrCounter( "pollutedRegions" , 1); } return null; }  } 

该函数以及Pulsar集群的执行流程如下:

Pulsar函数可以在Pulsar群集中运行,也可以作为单独的应用程序运行。

摘要

在本文中,我们简要回顾了一些可用于在Java中实现大数据处理系统的最受欢迎的框架。 所提供的每个框架都相当大,值得单独发表一篇文章。 尽管非常简单,但我们的空气质量指数数据管道却展示了这些框架的运行方式,您可以以此为基础来扩展您可能会进一步感兴趣的每个框架中的知识。 您可以在此处查看完整的代码示例。

翻译自: https://www.javacodegeeks.com/2019/12/popular-frameworks-for-big-data-processing-in-java.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/340576.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

工信部python证书多少钱_python requests SSL证书问题

错误信息如下&#xff1a;requests.exceptions.SSLError: ("bad handshake: Error([(SSL routines, tls_process_server_certificate, certificate verify failed)],)",)python做爬虫&#xff0c;对于有的网站&#xff0c;需要验证证书&#xff0c;比如&#xff1a;1…

mysql binlog线程恢复_使用MySQL SQL线程回放Binlog实现恢复

[toc]1. 需求部分1.1 基于MySQL复制同步特性&#xff0c;尝试使用Replication的SQL线程来回放binlog&#xff0c;可基于以下逻辑模拟场景做全量xtrabackup备份模拟日常备份执行sysbench压测4张表&#xff0c;20个线程&#xff0c;压测10分钟&#xff0c;模拟大量binlog删除实例…

带有Prometheus的Spring Boot和测微表第6部分:保护指标

以前&#xff0c;我们使用Prometheus成功启动了Spring Boot应用程序。 Spring应用程序中的一个端点正在公开我们的指标数据&#xff0c;以便Prometheus能够检索它们。 想到的主要问题是如何保护此信息。 Spring已经为我们提供了强大的安全框架 因此&#xff0c;将其轻松用于…

列举python中常用的数据类型_列举Python常用数据类型并尽量多的写出其中的方法...

#1 把字符串的第一个字符大写string.capitalize()#2 返回一个原字符串居中,并使用空格填充至长度 width 的新字符串string.center(width)#3 返回 str 在 string 里面出现的次数&#xff0c;如果 beg 或者 end 指定则返回指定范围内 str 出现的次数string.count(str, beg0, endl…

mysql 二元分词_MySQL 中文分词原理

一&#xff0c;首先我们来了解一下其他几个知识点&#xff1a;1. Mysql的索引意义&#xff1f;索引是加快访问表内容的基本手段&#xff0c;尤其是在涉及多个表的关联查询里。当然&#xff0c;索引可以加快检索速度&#xff0c;但是它也同时降低了索引列的插入&#xff0c;删除…

python 元类 type_Python 使用元类type创建类对象常见应用详解

本文实例讲述了Python 使用元类type创建类对象。分享给大家供大家参考&#xff0c;具体如下&#xff1a;type("123") 可以查看变量的类型;同时 type("类名",(父类),{类属性:值,类属性2:值}) 可以创建一个类。在Python中不建议一个函数具有不同的功能(重载)…

使用AWS Elastic Beanstalk轻松进行Spring Boot部署

朋友不允许朋友写用户身份验证。 厌倦了管理自己的用户&#xff1f; 立即尝试Okta的API和Java SDK。 在几分钟之内即可对任何应用程序中的用户进行身份验证&#xff0c;管理和保护。 几乎所有应用程序都依赖于身份验证。 开发人员以及雇用他们的公司都想确认谁在发出请求&…

mysql报错乱码_连接mysql服务器报错时,出现乱码

页头用了header(content-type:text/html;charsetutf-8);try{$this->dbonew PDO($dsn,$dbuser,$dbpassword);}catch(Exception $e){echo $e->getMessage();}连接失败时会报错&#xff0c;但是乱码&#xff0c;IE下编码查看是UTF-8&#xff0c;但是是乱码&#xff0c;如果选…

自学python条件_自学Python2.8-条件(if、if...else)

自学Python2.8-条件(if、if...else)1.if 判断语句if语句是用来进行判断的&#xff0c;其使用格式如下&#xff1a;if 要判断的条件:条件成立时&#xff0c;要做的事情当“判断条件”成立(True)时&#xff0c;才执行语句&#xff1b;反之&#xff0c;则不执行。执行语句可以为多…

mac lion 安装 mysql_mac osx下安装mysql

操作系统版本&#xff1a;mac osx 10.11mysql版本&#xff1a;官网下载dmg v5.6.33 https://www.mysql.com/安装步骤1.双击dmg安装2.开启mysql服务系统偏好设置-底部-mysql-打开服务这个时候还不能使用mysql命令&#xff0c;需要配置mysql命令的路径。3.配置环境变量mysql的路…

python爬虫实训日志_Python学习学习日志——爬虫《第一篇》(BeautifulSoup)

爬虫简介(学习日志第一篇)一、爬虫介绍爬虫&#xff1a;一段自动抓取互联网信息的程序&#xff0c;从互联网上抓取对于我们有价值的信息。二、Pyyhon爬虫架构Python 爬虫架构主要由五个部分组成&#xff0c;分别是调度器、URL管理器、网页下载器、网页解析器、应用程序(爬取的有…

zookeeper 负载_ZooKeeper,策展人以及微服务负载平衡的工作方式

zookeeper 负载Zookeeper如何确保每个工人都能从工作委托经理那里愉快地完成工作。 Apache ZooKeeper是注册&#xff0c;管理和发现在不同计算机上运行的服务的工具。 当我们必须处理具有许多节点的分布式系统时&#xff0c;它是技术堆栈中必不可少的成员&#xff0c;这些节点…

mysql error handler_MySql错误处理(二) - Condition Handle

20.2.10.2. DECLARE处理程序DECLARE handler_type HANDLER FOR condition_value[,...] sp_statementhandler_type:CONTINUE| EXIT| UNDOcondition_value:SQLSTATE [VALUE] sqlstate_value| condition_name| SQLWARNING| NOT FOUND| SQLEXCEPTION| mysql_error_code这个语句指定…

方程组的直接解法和迭代法 python_数据与算法总结——基本数值算法2(线性方程组)...

4 基本数值算法4.2 线性方程组4.2.1 线性方程组的特性解的存在性和唯一性满足下面条件之一&#xff0c;A非奇异&#xff0c;可逆&#xff1a;如果b属于A的列向量张成的空间&#xff0c;则称方程组是相容的。范数需要满足次可加性&#xff08;三角不等式&#xff09;。对于n维矢…

高效的企业测试-集成测试(3/6)

本系列的这一部分将展示如何通过代码级以及系统级集成测试来验证我们的应用程序。 &#xff08;代码级&#xff09;集成测试 集成测试一词有时在不同的上下文中使用不同。 根据Wikipedia的定义&#xff0c;我指的是在代码级别上验证多个组件之间相互作用的测试。 通常&#x…

mysql level用法_MYSQL使用方法

1.查询一张表&#xff1a; select * from 表名&#xff1b;2.查询指定字段&#xff1a;select 字段1&#xff0c;字段2&#xff0c;字段3….from 表名&#xff1b;3.where条件查询&#xff1a;select 字段1&#xff0c;字段2&#xff0c;字段3 frome 表名 where 条件表达式…

python程序设计之文件_Python程序设计之文件操作(2)

print(sub_path)if os.path.isdir(sub_path):visitdir(sub_path)path1C:UsersQinHsiuPythonProjectsStringoovisitdir(path1)方法二&#xff1a;使用walk()函数来实现#方法二,通过walk()方法指定遍历目录def visidir2(path):if not os.path.isdir(path):print(error!,endn)retu…

带Prometheus的Spring Boot和测微表第4部分:基础项目

在以前的文章中&#xff0c;我们介绍了Spring Micrometer和InfluxDB。 所以你要问我为什么普罗米修斯。 原因是Prometheus在InfluxDB的拉模型与推模型上进行操作。 这意味着&#xff0c;如果将千分尺与InfluxDB一起使用&#xff0c;则在将结果推送到数据库中时肯定会有一些开…

前端如何实现网络速度测试功能_分析Web前端测试要点,从架构原理上进行分析,希望大家能够掌握...

基于Web前端分析过程&#xff0c;大概有十几个测试要点&#xff0c;我们今天主要来讲解结合前五个要点进行详细解说。前端测试点主要针对前端展开&#xff0c;什么叫前端分析呢&#xff1f;就是我们所有的分析和测试要点所站的视角都是针对客户端或者浏览器来对系统进行分析和测…

mysql数据库表中的类型_MySQL数据库中表类型MyISAM与InnoDB的区别

MyISAM 和 InnoDB 讲解InnoDB和MyISAM是许多人在使用MySQL时最常用的两个表类型&#xff0c;这两个表类型各有优劣&#xff0c;视具体应用而定。基本的差别为&#xff1a;MyISAM类型不支持事务处理等高级处理&#xff0c;而InnoDB类型支持。MyISAM类型的表强调的是性能&#xf…