emr上使用sparkrunner运行beam数据流水线

参考资料

  • https://time.geekbang.org/column/intro/167?tab=catalog

Apache Beam和其他开源项目不太一样,它并不是一个数据处理平台,本身也无法对数据进行处理。Beam所提供的是一个统一的编程模型思想,而我们可以通过这个统一出来的接口来编写符合自己需求的处理逻辑,这个处理逻辑将会被转化成为底层运行引擎相应的API去运行

beam的编程模型需要让我们根据“WWWH”这四个问题来进行数据处理逻辑的编写

在这里插入图片描述

  1. 是现在已有的各种大数据处理平台(例如Apache Spark或者Apache Flink),在Beam中它们也被称为Runner
  2. 是可移植的统一模型层,各个Runners将会依据中间抽象出来的这个模型思想,提供一套符合这个模型的APIs出来,以供上层转换。
  3. 是SDK层。SDK层将会给工程师提供不同语言版本的API来编写数据处理逻辑,这些逻辑就会被转化成Runner中相应的API来运行。
  4. 是可扩展库层。工程师可以根据已有的Beam SDK,贡献分享出更多的新开发者SDK、IO连接器、转换操作库等等。
  5. 是应用层,各种应用将会通过下层的Beam SDK或工程师贡献的开发者SDK来实现。
  6. 社区层。全世界的工程师可以提出问题,解决问题,实现解决问题的思路。

beam编程模型主要逻辑为What、Where、When、How

  • what,要做什么计算?得到什么样的结果?Beam SDK中各种transform操作就是用来回答这个问题的。这包括我们经常使用到批处理逻辑,训练机器学习模型的逻辑等等。

  • where,计算什么时间范围的数据?这里的“时间”指的是数据的事件时间。

  • when,何时将计算结果输出?我们可以通过使用水位线和触发器配合触发计算

  • how,后续数据的处理结果如何影响之前的处理结果呢?这个问题可以通过累加模式来解决,常见的累加模式有:丢弃(结果之间是独立且不同的)、累积(后来的结果建立在先前的结果上)等等。

Beam的编程模型将所有的数据处理逻辑都分割成了这四个纬度,统一成了Beam SDK。我们在基于Beam SDK构建数据处理业务逻辑时,只需要根据业务需求,按照这四个维度调用具体的API,即可生成符合自己要求的数据处理逻辑。Beam会自动转化数据处理逻辑,并提交到具体的Runner上去执行

Beam将数据封装为PCollection,就是Parallel Collection,意思是可并行计算的数据集(PCollection和RDD十分相似)

  1. PCollection的创建完全取决于需求。比如,在测试中PCollection往往来自于代码生成的伪造数据,或者从文件中读取
PCollection<String> lines = p.apply("ReadMyFile", TextIO.read().from("protocol://path/to/some/inputData.txt"));
  1. 需要为PCollection的元素编写Coder。计算流程最终会运行在一个分布式系统。所有的数据都有可能在网络上的计算机之间相互传递。Coder就是在告诉Beam,怎样把数据类型序列化和逆序列化以方便在网络上传输。
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
CoderRegistry cr = p.getCoderRegistry();
cr.registerCoder(Integer.class, BigEndianIntegerCoder.class);
  1. PCollection是无序的
  2. PCollection没有固定大小。PCollection可以是有界的,也可以是无界的,Beam也是用window来分割持续更新的无界数据
  3. PCollection具有不可变性,PCollection不提供任何修改它所承载数据的方式。Beam的PCollection都是延迟执行(deferred execution)的模式

进一步,Beam把数据转换抽象成了有向图。PCollection是有向图中的边,而Transform是有向图里的节点(不符合直觉),因为区分节点和边的关键是看一个Transform是不是会有一个多余的输入和输出

Beam中所有的数据处理逻辑都会被抽象成数据流水线(Pipeline)来运行。数据流水线是对于数据处理逻辑的一个封装,它包括了从读取数据集,将数据集转换成想要的结果和输出结果数据集这样的一整套流程

  • 创建Beam数据流水线的同时,必须给这个流水线定义一个选项(Options)。告诉Beam用户的Pipeline应该如何运行。例如,是在本地的内存上运行,还是在Apache Flink上运行
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
  • 数据流水线中,每次PCollection经过一个Transform之后,流水线都会新创建一个PCollection出来。而这个新的PCollection又将成为下一个Transform的新输入。也可以使三个不同的Transform应用在它之上,从而再产生出三个不同的PCollection2、PCollection3和PCollection4出来

    在这里插入图片描述

流水线的底层思想其实还是动用了MapReduce的原理,在分布式环境下,整个数据流水线会启动N个Workers来同时处理PCollection。而在具体处理某一个特定Transform的时候,数据流水线会将这个Transform的输入数据集PCollection里面的元素分割成不同的Bundle,将这些Bundle分发给不同的Worker来处理

  • 具体会分配多少个Worker,以及将一个PCollection分割成多少个Bundle都是随机的,Beam数据流水线会尽可能地让整个处理流程达到完美并行(Embarrassingly Parallel)
  • 每一个Bundle在一个Worker机器里经过Transform逻辑后,也会产生出来一个新的Bundle

Beam的运行模式有直接运行模式,spark运行模式,flink运行模式等等

  • 直接运行模式的时候,Beam会在单机上用多线程来模拟分布式的并行处理。

  • spark运行模式则提供了和原生spark应用相同的数据管道

    The Apache Spark Runner can be used to execute Beam pipelines using Apache Spark. The Spark Runner can execute Spark pipelines just like a native Spark application; deploying a self-contained application for local mode, running on Spark’s Standalone RM, or using YARN or Mesos.

Beam目前只支持3.2.x版本spark,并且Beam和spark的版本对应关系不一致会引起让人困惑的问题

控制台启动emr-6.7.0集群,spark版本为3.2.1

在本地ide中进行如下编码

pom依赖配置

  • beam依赖为2.36.0,https://beam.apache.org/documentation/runners/spark/#deploying-spark-with-your-application
  • beam2.1.x版本持续出现beam java.lang.NoSuchMethodError: org.apache.spark.api.java.JavaPairRDD.flatMapValues报错
  • 如果使用directrunner本地测试需要beam-sdks-java-io-amazon-web-services依赖,否则出现s3 filesystem找不到错误
  • 指定awsreigon配置项需要beam-sdks-java-io-amazon-web-services依赖
  • 通过maven-shade-plugin插件将代码打包为super jar
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>athenaconnect</artifactId><version>1.0</version><name>Archetype - athenaconnect</name><url>http://maven.apache.org</url><dependencies><dependency><groupId>org.apache.beam</groupId><artifactId>beam-runners-spark</artifactId><version>2.36.0</version></dependency><dependency><groupId>org.apache.beam</groupId><artifactId>beam-sdks-java-io-amazon-web-services</artifactId><version>2.36.0</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>2.0.5</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-reload4j</artifactId><version>2.0.5</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.2.4</version><configuration><createDependencyReducedPom>false</createDependencyReducedPom><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters></configuration><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><shadedArtifactAttached>true</shadedArtifactAttached><shadedClassifierName>shaded</shadedClassifierName><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/></transformers></configuration></execution></executions></plugin></plugins></build>
</project>

仿照官方文档示例编写wordcount,https://beam.apache.org/get-started/quickstart-java/

package com.example;//import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.aws.options.AwsOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;import java.util.Arrays;public class BeamWC {public static void main(String[] args) {PipelineOptions options = PipelineOptionsFactory.create();options.setRunner(SparkRunner.class); //强制指定SparkRunner,也可以在运行时通过--runner SparkRunner指定options.as(AwsOptions.class).setAwsRegion("cn-north-1");//options.setRunner(DirectRunner.class); // 本地测试强制指定为direct runnerPipeline p = Pipeline.create(options);PCollection<String> lines = p.apply(TextIO.read().from("s3://bucketname/shakespeare/*"));PCollection<String> words = lines.apply("ExtractWords", FlatMapElements.into(TypeDescriptors.strings()).via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))));PCollection<KV<String, Long>> counts = words.apply(Count.<String>perElement());PCollection<String> formatted = counts.apply("FormatResults", MapElements.into(TypeDescriptors.strings()).via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()));formatted.apply(TextIO.write().to("s3://bucketname/beamoutput/shakespeare"));p.run().waitUntilFinish();}
}

通过maven打包为jar包,并上传到master节点上,提交任务

  • 由于代码中已经指定了runner,因此这里没有使用–runner
  • --deploy-mode client将driver启动在master节点方便查看日志
spark-submit --class com.example.BeamWC --master yarn --deploy-mode client athenaconnect-1.0-shaded.jar

一些关键的日志

23/12/21 06:08:27 INFO FileBasedSource: Filepattern s3://bucketname/shakespeare/* matched 2 files with total size 300
23/12/21 06:08:27 INFO FileBasedSource: Splitting filepattern s3://bucketname/shakespeare/* into bundles of size 150 took 44 ms and produced 2 files and 2 bundles

确认spark任务的运行模式为yarn

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/59952.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AUTOSAR CP SocketAdaptor(SoAd)规范导读

《AUTOSAR_SWS_SocketAdaptor》规范的主要内容包括&#xff1a; 简介和功能概述&#xff1a;说明了 AUTOSAR 基本软件模块 Socket Adaptor&#xff08;SoAd&#xff09;的功能、API 和配置。数据传输的 TCP/IP 概念在计算和电信环境中已成为标准&#xff0c;应用程序的寻址等…

代码随想录-栈和队列-用栈实现队列

问题描述 题目描述中有说不存在空栈的pop和peek&#xff0c;所以无需判断这个 解析 重点在于思路&#xff0c;代码白给。 要用栈实现队列&#xff0c;肯定是两个栈才可以。一个做入队操作&#xff0c;一个做出队操作。 首先入队简单&#xff0c;往栈里加就完事了。 出队复…

【设计模式】结构型模式(四):组合模式、享元模式

《设计模式之结构型模式》系列&#xff0c;共包含以下文章&#xff1a; 结构型模式&#xff08;一&#xff09;&#xff1a;适配器模式、装饰器模式结构型模式&#xff08;二&#xff09;&#xff1a;代理模式结构型模式&#xff08;三&#xff09;&#xff1a;桥接模式、外观…

轻型民用无人驾驶航空器安全操控------理论考试多旋翼部分笔记

官网&#xff1a;民用无人驾驶航空器综合管理平台 (caac.gov.cn) 说明&#xff1a;一是法规部分&#xff1b;二是多旋翼部分 本笔记全部来源于轻型民用无人驾驶航空器安全操控视频讲解平台 目录 官网&#xff1a;民用无人驾驶航空器综合管理平台 (caac.gov.cn) 一、轻型民用无人…

【leetcode练习·二叉树】用「分解问题」思维解题 I

本文参考labuladong算法笔记[【强化练习】用「分解问题」思维解题 I | labuladong 的算法笔记] 105. 从前序与中序遍历序列构造二叉树 | 力扣 | LeetCode | 给定两个整数数组 preorder 和 inorder &#xff0c;其中 preorder 是二叉树的先序遍历&#xff0c; inorder 是同一棵…

深入解析四种核心网络设备:集线器、桥接器、路由器和交换机

计算机网络系列课程《网络核心设备》 在现代网络技术中&#xff0c;集线器、桥接器、路由器和交换机扮演着至关重要的角色。本文&#xff0c;将深入探讨这四种设备的功能、工作原理及其在网络架构中的重要性。 集线器&#xff1a;基础网络连接设备 集线器&#xff08;Hub&…

宏景eHR uploadLogo.do 任意文件上传致RCE漏洞复现

0x01 产品简介 宏景eHR人力资源管理软件是一款专为复杂单组织或多组织客户设计的人力资源管理软件,融合了最新的互联网技术和先进的人力资源管理理念和实践。宏景eHR软件支持B/S架构,特别适合集团化管理和跨地域使用。它提供了全面的人力资源管理功能,包括人员、组织机构、…

ssm基于JAVA的网上订餐管理系统+vue

系统包含&#xff1a;源码论文 所用技术&#xff1a;SpringBootVueSSMMybatisMysql 免费提供给大家参考或者学习&#xff0c;获取源码看文章最下面 需要定制看文章最下面 目 录 目 录 I 摘 要 III ABSTRACT IV 1 绪论 1 1.1 课题背景 1 1.2 研究现状 1 1.3 研究内容…

NVR设备ONVIF接入平台EasyCVR私有化部署视频平台如何安装欧拉OpenEuler 20.3 MySQL

在当今数字化时代&#xff0c;安防视频监控系统已成为保障公共安全和个人财产安全的重要工具。NVR设备ONVIF接入平台EasyCVR作为一款功能强大的智能视频监控管理平台&#xff0c;它不仅提供了视频远程监控、录像、存储与回放等基础功能&#xff0c;还涵盖了视频转码、视频快照、…

测试网空投进行中 — 全面了解 DePIN 赛道潜力项目 ICN Protocol 及其不可错过的早期红利

随着云计算技术的飞速发展&#xff0c;越来越多的企业和个人对云服务的需求变得多样化且复杂化。然而&#xff0c;传统的中心化云服务平台&#xff08;如AWS、微软Azure等&#xff09;往往存在着高成本、数据隐私保护不足以及灵活性差等问题。 为了解决这些挑战&#xff0c;Imp…

CulturalBench :一个旨在评估大型语言模型在全球不同文化背景下知识掌握情况的基准测试数据集

2024-10-04&#xff0c;为了提升大型语言模型在不同文化背景下的实用性&#xff0c;华盛顿大学、艾伦人工智能研究所等机构联合创建了CulturalBench。这个数据集包含1,227个由人类编写和验证的问题&#xff0c;覆盖了包括被边缘化地区在内的45个全球区域。CulturalBench的推出&…

CAD VBA 图元颜色跟随图层

效果如下&#xff1a; 一、所有图元颜色为bylayer Sub 图元颜色跟随图层() Dim item As AcadEntityFor Each item In ThisDrawing.ModelSpace item.color acByLayer Next ThisDrawing.Regen acActiveViewport End Sub二、与图层颜色相同&#xff08;不是bylayer&#xff09;:…

介绍一下数组(c基础)(smart 版)

c初期&#xff0c;记住规则&#xff0c;用规则。 我只是介绍规则。&#xff08;有详细版&#xff0c;这适合smart人看&#xff09; 数组&#xff08;同类型&#xff09; int arr[n] {} ; int 是 元素类型。 int arr[n] {} ; arr为标识符。 {} 集合&#xff0c;元素有次…

【数据结构】插入排序——直接插入排序 和 希尔排序

直接插入排序 和 希尔排序 一、直接插入排序二、直接插入排序的弊端三、希尔排序&#xff08;1&#xff09;对插入排序的联想&#xff08;2&#xff09;希尔排序的思路 四、直接插入排序和希尔排序效率对比1>随机生成10000个数2>我们随机生成100000个数3>我们随机生成…

python使用turtle画图快速入门,轻松完成作业练习

turtle介绍 turtle是一个绘图库&#xff0c;可以通过编程进行绘图。其模拟了一个乌龟在屏幕上的运动过程。该库通常用于给青少年学习编程&#xff0c;当然&#xff0c;也可以使用其进行作图。 在一些学校中&#xff0c;可能在python学习的课程中&#xff0c;要求完成turtle绘…

K8S群集调度二

一、污点(Taint) 和 容忍(Tolerations) 1.1、污点(Taint) 设置在node上是对pod的一种作用 节点的亲和性&#xff0c;是Pod的一种属性&#xff08;偏好或硬性要求&#xff09;&#xff0c;它使Pod被吸引到一类特定的节点 而Taint 则相反&#xff0c;它使节点能够排斥一类特…

分布式唯一ID生成(二): leaf

文章目录 本系列前言号段模式双buffer优化biz优化动态step源码走读 雪花算法怎么设置workerId解决时钟回拨源码走读 总结 本系列 漫谈分布式唯一ID分布式唯一ID生成&#xff08;二&#xff09;&#xff1a;leaf&#xff08;本文&#xff09;分布式唯一ID生成&#xff08;三&am…

MVDR:最小方差无失真响应技术解析

目录 什么是MVDR&#xff1f;MVDR的工作原理主要步骤MVDR的应用场景MVDR的优势与挑战结论 什么是MVDR&#xff1f; MVDR&#xff08;Minimum Variance Distortionless Response&#xff0c;最小方差无失真响应&#xff09;是一种用于信号处理中的自适应滤波技术&#xff0c;广…

Flink安装和Flink CDC实现数据同步

一&#xff0c;Flink 和Flink CDC 1&#xff0c; Flink Apache Flink是一个框架和分布式处理引擎&#xff0c;用于对无界和有界数据流进行有状态计算。 中文文档 Apache Flink Documentation | Apache Flink 官方文档 &#xff1a;https://flink.apache.org Flink 中文社区…

【React.js】AntDesignPro左侧菜单栏栏目名称不显示的解决方案

作者&#xff1a;CSDN-PleaSure乐事 欢迎大家阅读我的博客 希望大家喜欢 使用环境&#xff1a;WebStorm 目录 问题概述 原因 解决方案 解决方法 潜在问题修改 最终效果呈现 额外内容 管理员界面路由配置 WebStorm背景更换 法一&#xff1a; 法二&#xff1a; 问题概…