大数据挑战
在公司需要处理不断增长的数据量的各个领域中,对大数据的概念有不同的理解。 在大多数这些情况下,需要以某种方式设计所考虑的系统,以便能够处理该数据,而不会随着数据大小的增加而牺牲吞吐量。 从本质上讲,这导致需要构建高度可伸缩的系统,以便可以根据在给定时间点需要处理的数据量来分配更多资源。
构建这样的系统是一项耗时且复杂的活动,因此,可以使用第三方框架和库来提供现成的可伸缩性要求。 在Java应用程序中已经有很多不错的选择,本文将简要讨论一些最受欢迎的选择:
行动框架
我们将通过实现一个简单的管道来处理每个设备的数据,以测量给定区域的空气质量指数,从而演示每个框架。 为简单起见,我们假定来自设备的数字数据是分批接收或以流方式接收的。 在整个示例中,我们将使用THRESHOLD常量表示该值,在该值之上,我们认为一个区域被污染。
阿帕奇火花
在Spark中,我们需要先将数据转换为正确的格式。 我们将使用数据集,但我们也可以选择数据帧或RDD(弹性分布式数据集)作为数据表示的替代方法。 然后,我们可以应用许多Spark转换和操作,以便以分布式方式处理数据。
public long countPollutedRegions(String[] numbers) { // runs a Spark master that takes up 4 cores SparkSession session = SparkSession.builder(). appName( "AirQuality" ). master( "local[4]" ). getOrCreate(); // converts the array of numbers to a Spark dataset Dataset numbersSet = session.createDataset(Arrays.asList(numbers), Encoders.STRING()); // runs the data pipeline on the local spark long pollutedRegions = numbersSet.map(number -> Integer.valueOf(number), Encoders. INT ()) .filter(number -> number > THRESHOLD).count(); return pollutedRegions; }
如果要更改上述应用程序以从外部源读取数据,写入外部数据源并在Spark集群而不是本地Spark实例上运行,我们将具有以下执行流程:
Spark驱动程序可以是单独的实例,也可以是Spark群集的一部分。
Apache Flink
与Spark相似,我们需要在Flink DataSet中表示数据,然后对其应用必要的转换和操作:
public long countPollutedRegions(String[] numbers) throws Exception { // creates a Flink execution environment with proper configuration StreamExecutionEnvironment env = StreamExecutionEnvironment. createLocalEnvironment(); // converts the array of numbers to a Flink dataset and creates // the data pipiline DataStream stream = env.fromCollection(Arrays.asList(numbers)). map(number -> Integer.valueOf(number)) .filter(number -> number > THRESHOLD).returns(Integer. class ); long pollutedRegions = 0; Iterator numbersIterator = DataStreamUtils.collect(stream); while (numbersIterator.hasNext()) { pollutedRegions++; numbersIterator.next(); } return pollutedRegions; }
如果要更改上述应用程序以从外部源读取数据,写入外部数据源并在Flink群集上运行,我们将具有以下执行流程:
将应用程序提交到Flink群集的Flink客户端是Flink CLI实用程序或JobManager的UI。
阿帕奇风暴
在Storm中,数据管道被创建为Spouts(数据源)和Bolts(数据处理单元)的拓扑。 由于Storm通常会处理无限制的数据流,因此我们会将空气质量指数编号数组的处理模拟为有限制的流:
public void countPollutedRegions(String[] numbers) throws Exception { // builds the topology as a combination of spouts and bolts TopologyBuilder builder = new TopologyBuilder(); builder.setSpout( "numbers-spout" StormAirQualitySpout(numbers)); "numbers-spout" , new StormAirQualitySpout(numbers)); builder.setBolt( "number-bolt" , new StormAirQualityBolt()). shuffleGrouping( "numbers-spout" shuffleGrouping( "numbers-spout" ); // prepares Storm conf and along with the topology submits it for // execution to a local Storm cluster Config conf = new Config(); conf.setDebug( true ); LocalCluster localCluster = null; try { localCluster = new LocalCluster(); localCluster.submitTopology( "airquality-topology" , conf, builder.createTopology()); Thread.sleep(10000); localCluster.shutdown(); } catch (InterruptedException ex) { localCluster.shutdown(); } }
我们有一个喷嘴可以为空气质量指数编号的数组提供数据源,还有一个仅过滤指示污染区域的螺栓:
public class StormAirQualitySpout extends BaseRichSpout { private boolean emitted = false ; private SpoutOutputCollector collector; private String[] numbers; public StormAirQualitySpout(String[] numbers) { this .numbers = numbers; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare( new Fields( "number" )); } @Override public void open(Map paramas, TopologyContext context, SpoutOutputCollector collector) { this .collector = collector; } @Override public void nextTuple() { // we make sure that the numbers array is processed just once by // the spout if (!emitted) { for (String number : numbers) { collector.emit( new Values(number)); } emitted = true ; } } }
public class StormAirQualityBolt extends BaseRichBolt { private static final int THRESHOLD = 10; private int pollutedRegions = 0; @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare( new Fields( "number" )); } @Override public void prepare(Map params, TopologyContext context, OutputCollector collector) { } @Override public void execute(Tuple tuple) { String number = tuple.getStringByField( "number" ); Integer numberInt = Integer.valueOf(number); if (numberInt > THRESHOLD) { pollutedRegions++; } } }
我们正在使用LocalCluster实例提交到本地Storm集群,这对于开发很方便,但是我们想将Storm拓扑提交到生产集群。 在这种情况下,我们将具有以下执行流程:
阿帕奇点燃
在Ignite中,我们需要先将数据放入分布式缓存中,然后再运行数据处理管道,该管道是在Ignite群集上以分布式方式执行的SQL查询的前者:
public long countPollutedRegions(String[] numbers) { IgniteConfiguration igniteConfig = new IgniteConfiguration(); CacheConfiguration cacheConfig = new CacheConfiguration(); // cache key is number index in the array and value is the number cacheConfig.setIndexedTypes(Integer. class , String. class ); cacheConfig.setName(NUMBERS_CACHE); igniteConfig.setCacheConfiguration(cacheConfig); try (Ignite ignite = Ignition.start(igniteConfig)) { IgniteCache cache = ignite.getOrCreateCache(NUMBERS_CACHE); // adds the numbers to the Ignite cache try (IgniteDataStreamer streamer = ignite.dataStreamer(cache.getName())) { int key = 0; for (String number : numbers) { streamer.addData(key++, number); } } // performs an SQL query over the cached numbers SqlFieldsQuery query = new SqlFieldsQuery( "select * from String where _val > " + THRESHOLD); FieldsQueryCursor<List> cursor = cache.query(query); int pollutedRegions = cursor.getAll().size(); return pollutedRegions; } }
如果我们要在Ignite群集中运行应用程序,它将具有以下执行流程:
榛树喷射机
Hazelcast Jet在Hazelcast IMDG之上运行,并且与Ignite相似,如果我们要处理数据,我们需要首先将其放入Hazelcast IMDG群集中:
public long countPollutedRegions(String[] numbers) { // prepares the Jet data processing pipeline Pipeline p = Pipeline.create(); p.drawFrom(Sources.list( "numbers" )). map(number -> Integer.valueOf((String) number)) .filter(number -> number > THRESHOLD).drainTo(Sinks.list( "filteredNumbers" )); JetInstance jet = Jet.newJetInstance(); IList numbersList = jet.getList( "numbers" ); numbersList.addAll(Arrays.asList(numbers)); try { // submits the pipeline in the Jet cluster jet.newJob(p).join(); // gets the filtered data from Hazelcast IMDG List filteredRecordsList = jet.getList( "filteredNumbers" ); int pollutedRegions = filteredRecordsList.size(); return pollutedRegions; } finally { Jet.shutdownAll(); } }
但是请注意,Jet还提供集成而无需外部数据源,并且不需要将数据存储在IMDG群集中。 您也可以在不首先将数据存储到列表中的情况下进行聚合(查看Github中包含改进版本的完整示例)。 感谢Hazelcast工程团队的Jaromir和Can的宝贵意见。
如果我们要在Hazelcast Jet集群中运行该应用程序,它将具有以下执行流程:
卡夫卡流
Kafka Streams是一个客户端库,使用Kafka主题作为数据处理管道的源和接收器。 为了在我们的方案中使用Kafka Streams库,我们将把空气质量指数数字放入数字 Kafka主题中:
public long countPollutedRegions() { List result = new LinkedList(); // key/value pairs contain string items final Serde stringSerde = Serdes.String(); // prepares and runs the data processing pipeline final StreamsBuilder builder = new StreamsBuilder(); builder.stream( "numbers" , Consumed.with(stringSerde, stringSerde)) .map((key, value) -> new KeyValue(key, Integer.valueOf(value))). filter((key, value) -> value > THRESHOLD) .foreach((key, value) -> { result.add(value.toString()); }); final Topology topology = builder.build(); final KafkaStreams streams = new KafkaStreams(topology, createKafkaStreamsConfiguration()); streams.start(); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } int pollutedRegions = result.size(); System.out.println( "Number of severely polluted regions: " + pollutedRegions); streams.close(); return pollutedRegions; } private Properties createKafkaStreamsConfiguration() { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "text-search-config" ); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" ); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); return props; }
我们的Kafka Stream应用程序实例将具有以下执行流程:
脉冲星函数
Apache Pulsar函数是轻量级的计算过程,可与Apache Pulsar集群一起以无服务器的方式工作。 假设我们在Pulsar集群中传输空气质量指数,我们可以编写一个函数来计算超出给定阈值的指数数量,并将结果写回到Pulsar,如下所示:
public class PulsarFunctionsAirQualityApplication implements Function { private static final int HIGH_THRESHOLD = 10; @Override public Void process(String input, Context context) throws Exception { int number = Integer.valueOf(input); if (number > HIGH_THRESHOLD) { context.incrCounter( "pollutedRegions" , 1); } return null; } }
该函数以及Pulsar集群的执行流程如下:
Pulsar函数可以在Pulsar群集中运行,也可以作为单独的应用程序运行。
摘要
在本文中,我们简要回顾了一些可用于在Java中实现大数据处理系统的最受欢迎的框架。 所提供的每个框架都相当大,值得单独发表一篇文章。 尽管非常简单,但我们的空气质量指数数据管道却展示了这些框架的运行方式,您可以以此为基础来扩展您可能会进一步感兴趣的每个框架中的知识。 您可以在此处查看完整的代码示例。
翻译自: https://www.javacodegeeks.com/2019/12/popular-frameworks-for-big-data-processing-in-java.html