flink sql设置并行度_Flink集成Hivestream模式用例

01

背景

基于前面的文章

Flink集成hive bath模式用例

knowfarhhy,公众号:大数据摘文Flink 集成Hive

,我们继续介绍stream模式下的用例。

02

流模式读取Hive

        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();        TableEnvironment tableEnv = TableEnvironment.create(bsSettings);                //增加hive支持        String name            = "myhive";        String defaultDatabase = "dim";        String version         = "1.2.1";         String hivecondir  = System.getenv("HIVE_CONF_DIR");        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hivecondir, version);        log.info("注册catalog");        tableEnv.registerCatalog(name, hive);        log.info("注册catalog完成");        log.info("使用catalog");        tableEnv.useCatalog(name);        log.info("注册database");        tableEnv.useDatabase(defaultDatabase);                tableEnv.sqlQuery("SELECT name,age,score,dt FROM myhive.dim.dim_flink_test").printSchema();        String[] fields = new String[4];        fields[0] = "name";        fields[1] = "age";        fields[2] = "score";        fields[3] = "dt";        TypeInformation[] fieldType = new TypeInformation[4];        fieldType[0] = Types.STRING;        fieldType[1] = Types.INT;        fieldType[2] = Types.LONG;        fieldType[3] = Types.STRING;        PrintTableUpsertSink sink = new PrintTableUpsertSink(fields,fieldType,true);        tableEnv.registerTableSink("inserttable",sink);               tableEnv.sqlUpdate("INSERT INTO inserttable SELECT name,age,score,dt FROM myhive.dim.dim_flink_test");               tableEnv.execute("stream_read_hive");

03

运行拓扑

afb233a70ad7173f8f3a4937ee0ede28.png

上图展示了第二节中的测试用例任务的拓扑图,我们会发现在流任务中出现了Finish这样的最终状态,而不是一个Running状态,这个主要是目前1.10版本支持Hive的功能没有那么完善,无法真正的实时读取Hive数据,以及无法检测Hive数据发生改变情况,只会在任务运行时候读取一次表数据,然后Hive相关的算子任务便会结束。如果想要更好的使用Hive,建议大家还是用Flink 1.11之后,功能更加强大完善。

为了方便看流任务,有Finished状态,提供另外一个流任务的拓扑图,便于看到区别:

d132cbde661e9121a68a065526d82269.png

具体的流方式读取Hive,即Hive Streaming,在Flink 1.11进行了相关的支持,这里提供几篇参考文章

f19ff18f78f2c41074c0b0e40347efee.png

相关Hive Streaming文章

Flink 1.11 新特性之 SQL Hive Streaming 简单示例

LittleMagic,公众号:Flink 中文社区Flink 1.11 新特性之 SQL Hive Streaming 简单示例

Flink x Zeppelin ,Hive Streaming 实战解析

狄杰@蘑菇街,公众号:Flink 中文社区Flink x Zeppelin ,Hive Streaming 实战解析

Flink SQL FileSystem Connector 分区提交与自定义小文件合并策略 

LittleMagic,公众号:Flink 中文社区Flink SQL FileSystem Connector 分区提交与自定义小文件合并策略 ​

Flink 1.11 SQL 使用攻略

李劲松,公众号:Flink 中文社区Flink 1.11 SQL 使用攻略

04

注意事项

任务运行环境:

(1)设置Job默认并行度为2

(2)基于K8s运行,申请了一个JobManager 以及一个 TaskManager

(3)TaskManager设置了8个Slot

上面的拓扑中,我们可以看到第一个算子的并行度是8,第二个算子是2,任务正常执行,是因为增加了其他设置才使得任务正常运行。

但是你可能会遇到下面情况:

c3bee7bcc7ab2f419f0ca94d9d34a9f9.png

第一个算子并行度是10,第二个算子并行度是2,因为集群只有8个Slot可用,就会导致资源不够,任务一直处于created状态,最终超时失败。

问题分析:

下面展示了设置HiveTableSource的并行度:

        int parallelism = conf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);    if (conf.getBoolean(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM)) {      int max = conf.getInteger(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX);      if (max < 1) {        throw new IllegalConfigurationException(            HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX.key() +                " cannot be less than 1");      }      int splitNum;      try {        long nano1 = System.nanoTime();        splitNum = inputFormat.createInputSplits(0).length;        long nano2 = System.nanoTime();        LOG.info(            "Hive source({}}) createInputSplits use time: {} ms",            tablePath,            (nano2 - nano1) / 1_000_000);      } catch (IOException e) {        throw new FlinkHiveException(e);      }      parallelism = Math.min(splitNum, max);    }    parallelism = limit > 0 ? Math.min(parallelism, (int) limit / 1000) : parallelism;    parallelism = Math.max(1, parallelism);    source.setParallelism(parallelism);

涉及的相关参数:

public class HiveOptions {  public static final ConfigOption TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER =      key("table.exec.hive.fallback-mapred-reader")          .defaultValue(false)          .withDescription(              "If it is false, using flink native vectorized reader to read orc files; " +                  "If it is true, using hadoop mapred record reader to read orc files.");  public static final ConfigOption TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM =      key("table.exec.hive.infer-source-parallelism")          .defaultValue(true)          .withDescription(              "If is false, parallelism of source are set by config.\n" +              "If is true, source parallelism is inferred according to splits number.\n");  public static final ConfigOption TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX =      key("table.exec.hive.infer-source-parallelism.max")          .defaultValue(1000)          .withDescription("Sets max infer parallelism for source operator.");}
table.exec.hive.infer-source-parallelism      - true : 并行度通过推导得到,依赖splits 数量      -  false : 通过config获得并行度table.exec.resource.default-parallelism    设置所有Operator(例如join agg filter等)的默认并行度table.exec.hive.infer-source-parallelism.max     设置HiveTableSource的最大并行度,默认值1000

(1)首先从config中获取所有算子的默认并行度

int parallelism = conf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);

(2)如果没有开启并行度自动推导,那么使用这个默认并行度

(3)如果开启了并行度推导,会根据计算的split数量与设置的最大并行度取最小值:

parallelism = Math.min(splitNum, max);
splitNum大小为下面方法返回数组的长度
  public HiveTableInputSplit[] createInputSplits(int minNumSplits)      throws IOException {    List hiveSplits = new ArrayList<>();    int splitNum = 0;    for (HiveTablePartition partition : partitions) {      StorageDescriptor sd = partition.getStorageDescriptor();      InputFormat format;      try {        format = (InputFormat)          Class.forName(sd.getInputFormat(), true, Thread.currentThread().getContextClassLoader()).newInstance();      } catch (Exception e) {        throw new FlinkHiveException("Unable to instantiate the hadoop input format", e);      }      ReflectionUtils.setConf(format, jobConf);      jobConf.set(INPUT_DIR, sd.getLocation());      //TODO: we should consider how to calculate the splits according to minNumSplits in the future.      org.apache.hadoop.mapred.InputSplit[] splitArray = format.getSplits(jobConf, minNumSplits);      for (org.apache.hadoop.mapred.InputSplit inputSplit : splitArray) {        hiveSplits.add(new HiveTableInputSplit(splitNum++, inputSplit, jobConf, partition));      }    }    return hiveSplits.toArray(new HiveTableInputSplit[0]);  }

通过上面的参数配置 ,我们可以合理的控制HiveTableSource的并行度,不至于超过集群的资源配置,无法启动任务。

e3beee503d4bec7151b72fea95c8f5af.png69926e12122c626c78cebe899b9c8850.pngfcbc6e99e31c447ed7bf063eab8e914d.gif

 !关注不迷路~ 各种福利、资源定期分享

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/513668.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

微软副总裁、Kubernetes 头号贡献者的他,给云原生带来奇迹!

我们做了一个街头采访&#xff0c;调查路人眼中的程序员到底是怎样的&#xff1f;提到程序员&#xff0c;大家似乎都有刻板印象&#xff1a;总是格子衬衫牛仔裤双肩包打扮&#xff0c;总是埋头敲代码&#xff0c;加班是常态……谁说程序员呆板木讷&#xff0c;只会埋头敲一行行…

云原生新边界——阿里云边缘计算云原生落地实践

简介&#xff1a; 日前&#xff0c;在由全球分布式云联盟主办的“Distributed Cloud | 2021 全球分布式云大会云原生论坛”上&#xff0c;阿里云高级技术专家黄玉奇发表了题为《云原生新边界&#xff1a;阿里云边缘计算云原生落地实践》的主题演讲。 作者 | 黄玉奇 来源 | 阿里…

HTTPS 协议到底比 HTTP 协议多些什么?

来源&#xff1a;杰哥的IT之旅作者&#xff1a;阿拉斯加最近卷了一篇 HTTP 协议的相关知识&#xff0c;大家可以一起来看一下~HTTP 简介HTTP 协议是 Hyper Text Transfer Protocol&#xff08;超文本传输协议&#xff09;的缩写&#xff0c;是用于从万维网&#xff08;WWW:Worl…

独家深度 | 一文看懂 ClickHouse vs Elasticsearch:谁更胜一筹?

简介&#xff1a; 本文的主旨在于通过彻底剖析ClickHouse和Elasticsearch的内核架构&#xff0c;从原理上讲明白两者的优劣之处&#xff0c;同时会附上一份覆盖多场景的测试报告给读者作为参考。 作者&#xff1a;阿里云数据库OLAP产品部 仁劼 Clickhouse是俄罗斯搜索巨头Yan…

golang 排序_堆 堆排序 优先队列 图文详解(Golang实现)

引入在实际应用中&#xff0c;我们经常需要从一组对象中查找 最大值 或 最小值 。当然我们可以每次都先排序&#xff0c;然后再进行查找&#xff0c;但是这种做法效率很低。哪么有没有一种特殊的数据结构&#xff0c;可以高效率的实现我们的需求呢&#xff0c;答案就是 堆(heap…

看懂 IPv6+,这篇就够了

来源&#xff1a;鲜枣课堂作者&#xff1a;小枣君5G网络的不断建设和普及&#xff0c;加速了我们迈入万物互联时代的步伐。我们的整个互联网络&#xff0c;正在发生翻天覆地的变化。急剧增加的网络连接数和流量&#xff0c;对网络的承载和传送能力&#xff0c;提出了前所未有的…

高德打车通用可编排订单状态机引擎设计

简介&#xff1a; 订单状态流转是交易系统的最为核心的工作&#xff0c;订单系统往往都会存在状态多、链路长、逻辑复杂的特点&#xff0c;还存在多场景、多类型、多业务维度等业务特性。在保证订单状态流转稳定性的前提下、可扩展性和可维护性是我们需要重点关注和解决的问题。…

边开飞机边换引擎?我们造了个新功能保障业务流量无损迁移

简介&#xff1a; 容器化部署应用可以降低企业成本&#xff0c;提升研发效率&#xff0c;解放运维人员。据 Gartner 预计&#xff0c;到 2022 年&#xff0c;将有 75&#xff05; 的企业将在生产中运行容器化应用程序。Kubernetes 是企业部署容器化应用的首选框架。由于 Kubern…

专访百度集团副总裁袁佛玉:科技创新对普惠金融正在充分发挥“乘数效应”

图为百度集团副总裁袁佛玉在发表演讲 “随着我国数字经济的腾飞&#xff0c;科技创新正在充分发挥对于普惠金融的“乘数效应”&#xff0c;加速普惠金融拓展的深度和广度。”百度集团副总裁袁佛玉在10月22日举办的2021金融街论坛“‘一带一路’金融减贫论坛”上表示。 袁佛玉…

Spring Cloud Stream 体系及原理介绍

简介&#xff1a; Spring Cloud Stream在 Spring Cloud 体系内用于构建高度可扩展的基于事件驱动的微服务&#xff0c;其目的是为了简化消息在 Spring Cloud 应用程序中的开发。 作者 | 洛夜 来源 | 阿里巴巴云原生公众号 Spring Cloud Stream在 Spring Cloud 体系内用于构建高…

工商银行分布式服务C10K场景的解决方案

简介&#xff1a; 未来&#xff0c;中国工商银行将持续致力于 Dubbo 的金融级规模化应用。 作者&#xff1a;颜高飞&#xff0c;微服务领域架构师&#xff0c;主要从事服务发现、高性能网络通信等研发工作&#xff0c;擅长 ZooKeeper、Dubbo、RPC 协议等技术方向。 Dubbo是一款…

使用html() undefined_SweetAlert2使用教程

SweetAlert2是一款功能强大的纯Js模态消息对话框插件。SweetAlert2用于替代浏览器默认的弹出对话框&#xff0c;它提供各种参数和方法&#xff0c;支持嵌入图片&#xff0c;背景&#xff0c;HTML标签等&#xff0c;并提供5种内置的情景类&#xff0c;功能非常强大。SweetAlert2…

埃森哲携手阿里云,采用K8s容器云服务为客户提供无限弹性

简介&#xff1a; 埃森哲作为全球领先的专业服务公司&#xff0c;在数字化、云计算等领域拥有全球领先的能力&#xff0c;我们在多年的实际客户项目中&#xff0c;找到并沉淀出了适合企业数字化转型的方法论&#xff0c;积累了丰富的落地经验。 作者&#xff1a;姚迪、周警伟 …

4阶范德蒙德行列式例题_线性代数入门——“爪型行列式”的计算及其应用

系列简介&#xff1a;这个系列文章讲解线性代数的基础内容&#xff0c;注重学习方法的培养。线性代数课程的一个重要特点(也是难点)是概念众多&#xff0c;而且各概念间有着千丝万缕的联系&#xff0c;对于初学者不易理解的问题我们会不惜笔墨加以解释。在内容上&#xff0c;以…

如何使用Arthas提高日常开发效率?

简介&#xff1a; 1. Arthas有什么功能&#xff0c;怎么用&#xff0c;请看&#xff1a;Arthas使用手册 2. Arthas命令比较复杂&#xff0c;一个帮助生成命令的IDEA插件&#xff1a;arthas idea plugin 使用文档 3. 基于Arthas实现的简单好用的热部署插件&#xff1a;ArthasHot…

stringutils 用哪个包 apache spring_spring整合mq、jsonp跨越、httpclient工具的使用

训练大纲(第087天)大家如果想快速有效的学习&#xff0c;思想核心是“以建立知识体系为核心”&#xff0c;具体方法是“守破离”。确保老师课堂上做的操作&#xff0c;反复练习直到熟练。第173次(ActiveMQ)学习主题&#xff1a;ActiveMQ学习目标&#xff1a;1 掌握什么是spring…

几种Java常用序列化框架的选型与对比

简介&#xff1a; 序列化与反序列化是我们日常数据持久化和网络传输中经常使用的技术&#xff0c;但是目前各种序列化框架让人眼花缭乱&#xff0c;不清楚什么场景到底采用哪种序列化框架。本文会将业界开源的序列化框架进行对比测试&#xff0c;分别从通用性、易用性、可扩展性…

12v小型电机型号大全_电动机型号参数大全,再也不怕看不懂电机型号了

电动机型号是便于使用、设计、制造等部门进行业务联系和简化技术文件中产品名称、规格、型式等叙述而引用的一种代号。下面为大家介绍电动机型号含义等信息。1电动机型号组成及含义由电机类型代号、电机特点代号、设计序号和励磁方式代号等四个小节顺序组成。1、类型代号是表征…

基于DataWorks搭建新零售数据中台

文章作者&#xff1a;许日&#xff08;欢伯&#xff09;&#xff0c;在2016年盒马早期的时候&#xff0c;转到盒马事业部作为在线数据平台的研发负责人&#xff0c;现任阿里云计算平台DataWorks建模引擎团队负责人。 文章简介&#xff1a;本篇文章向大家分享新零售企业如何基于…

身份云平台 Authing 完成 2300 万美元 A 轮融资

10 月 24 日&#xff0c;身份云平台 Authing 宣布完成 2300 万美元 A 轮融资。本轮融资由老虎环球基金领投&#xff0c;鼎晖VGC&#xff08;创新与成长基金&#xff09;、声网 Agora、老股东 GGV纪源资本和奇绩创坛跟投&#xff0c;跃为资本担任独家财务顾问。Authing 表示&…