基于华为MRS3.2.0实时Flink消费Kafka落盘至HDFS的Hive外部表的调度方案

文章目录

  • 1 Kafka
    • 1.1 Kerberos安全模式的认证与环境准备
    • 1.2 创建一个测试主题
    • 1.3 消费主题的接收测试
  • 2 Flink
    • 1.1 Kerberos安全模式的认证与环境准备
    • 1.2 Flink任务的开发
  • 3 HDFS与Hive
    • 3.1 Shell脚本的编写思路
    • 3.2 脚本测试方法
  • 4 DolphinScheduler


该需求为实时接收对手Topic,并进行消费落盘至Hive。

在具体的实施中,基于华为MRS 3.2.0安全模式带kerberos认证的Kafka2.4、Flink1.15、Hadoop3.3.1、Hive3.1,调度平台为开源dolphinscheduler。
在这里插入图片描述

本需求的完成全部参考华为官方MRS3.2.0开发文档,相关章节是普通版的安全模式。

华为官方文档:https://support.huaweicloud.com/cmpntguide-mrs/mrs_01_1031.html

1 Kafka

1.1 Kerberos安全模式的认证与环境准备

着手开发前,需要将FushionInsight租户加入kafkaadmin组,保证有创建主题和消费主题的权限,在得到此权限时,切勿对集群中的主题进行危险操作。

保证租户权限后,开始准备开发环境。该步骤需要安装Idea客户端在windows本地,同时安装兼容的maven版本,华为MRS需要安装至少OpenJDK 1.8.0_332的版本。

运行环境的配置则需要在FushionInsight的web管理界面下载kafka的完整客户端,包括config配置文件也需要下载。另外windows本地的hosts文件中要和FushionInsight中的集群地址有映射,可手动添加,同时应保证本地和集群能ping通。

参考文档:https://support.huaweicloud.com/devg3-mrs/mrs_07_130006.html

1.2 创建一个测试主题

在Linux环境中执行:

bin/kafka-topics.sh --create --bootstrap-server <Kafka集群IP:21007> --command-config config/client.properties --partitions 1 --replication-factor 1 --topic testTopic

创建一个测试testTopic,创建成功后,FushionInsight的web界面会报topic只有一个分区副本的警告,请忽略它。

同时也可以开启两个新的终端窗口用于测试生产者和消费者:

  1. bin/kafka-console-producer.sh --broker-list <Kafka集群IP:21007> --topic <Topic名称> --producer.config config/producer.properties
  2. bin/kafka-console-consumer.sh --topic <Topic名称> --bootstrap-server <Kafka集群IP:21007> --consumer.config config/consumer.properties

参考文档:https://support.huaweicloud.com/devg3-mrs/mrs_07_130031.html

1.3 消费主题的接收测试

通过以下网站下载华为MRS所需的样例代码:

https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.2.0

下载样例代码之后需要在华为镜像站下载代码所需依赖,华为MRS所需的组件依赖不同于apache的开源版本,需要单独配置maven的setting文件华为中央仓库进行下载,在开发时,组件相关的依赖都需要用下载华为的。

镜像地址:

https://repo.huaweicloud.com/repository/maven/huaweicloudsdk/org/apache/

华为开源镜像站:

https://mirrors.huaweicloud.com/home

完成依赖和样例代码项目创建即可开发,在开发程序时,需要将用于安全认证的keytab文件即“user.keytab”和“krb5.conf”文件以及config所有配置文件放置在样例工程的“kafka-examples\src\main\resources”目录下。

在开发时,这些安全认证只需要生成一个jaas文件并设置相关环境变量即可。华为提供了LoginUtil相关接口来完成这些配置,样例代码中只需要配置用户自己租户名称和对应的keytab文件名称即可。

创建生产测试时,首先需要修改KafkaProperties类中的生产主题名,接下来在com.huawei.bigdata.kafka.example.Producer类中修改租户账号,keytab位置即可,运行成功后,会向主题推送100条测试数据,此时,我们在1.2小节中开启的消费者窗口就能接受到生产的数据。

在具体的测试中,需要控制消息发送的间隔和消息次数,方便后续开发Flink。一般来说,每秒发送一条,一直发送即可。

至此,Kafka的主题消费测试完成,接下来需要用Flink将主题落盘到HDFS。

如果运行代码时报和clock相关的错误,是因为本地时间和FushionInsight集群时间不一致所致,请将本地时间和服务器时间差控制在5分钟内。

参考文档:
https://support.huaweicloud.com/devg3-mrs/mrs_07_130012.html

2 Flink

1.1 Kerberos安全模式的认证与环境准备

用户在提交Flink应用程序时,需要与Yarn、HDFS等之间进行通信。那么提交Flink的应用程序中需要设置安全认证,确保Flink程序能够正常运行。
在这里插入图片描述
图为Flink在华为MRS安全模式的认证体系。

对于Kafka的权限在章节1.1已经获取,另外要保证有yarn资源的使用权限,还需要对HDFS的/flink/flink-checkpoint目录获取权限,保证执行。有了相关权限之后,再下载kerberos认证凭据文件,keytab和conf。准备运行环境同Kafka类似,需要对Flink客户端进行配置,注意config文件应该在权限修改之后获取。

Flink整个系统存在三种认证方式,使用kerberos认证、使用security cookie进行认证、使用YARN内部的认证机制。在进行安全认证时,可以用flink自带的wordcount样例程序进行提交测试,根据提交结果反馈再进行适配,直到提交成功。如果报auth相关的错误,可能还是权限问题,可以尝试先将租户权限给到最大,谨慎操作,先保证代码能通。

参考文档:
https://support.huaweicloud.com/devg3-mrs/mrs_07_050010.html

1.2 Flink任务的开发

最终在yarn队列运行的flink程序是从本地idea打包,通过flink run提交的。前面安全模式已经打通,在开发时仍然是使用华为官方的flink样例代码进行修改调试。

在具体的flink程序开发中,由于是开启了kerberos认证的安全模式,需要加入判断安全模式登录的代码段在main方法,以下代码来自华为官方样例:

 if (LoginUtil.isSecurityModel()) {try {LOG.info("Securitymode start.");//!!注意,安全认证时,需要用户手动修改为自己申请的机机账号LoginUtil.securityPrepare(USER_PRINCIPAL, USER_KEYTAB_FILE);} catch (IOException e) {LOG.error("Security prepare failure.");LOG.error("The IOException occured : {}.", e);return;}LOG.info("Security prepare success.");}

对于具体需求的开发参照开源Flink的apache官方文档即可,只需要保证依赖是华为官方镜像站的。

在该需求中,是将消费的数据落盘到HDFS中。开发中要用到FlinkKafkaConsumer方法创建kafka消费者,拿到流数据。该方法在Flink1.17版本被弃用,但是Flink1.15仍然可以用,具体开发方法可参考Flink1.13的官方文档Apache Kafka 连接器。

FlinkKafkaConsumer方法参考文档:
https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/kafka/

接收的Kafka数据,我们不需要处理,测试时直接测试主题的数据写入HDFS即可,需要用StreamingFileSink方法。该方法可以设置按照日期分桶,我们设置.withBucketAssigner为每天一个桶,保证每天消费的数据在一个文件中,同时用该方法传入日期格式参数yyyy-MM-dd,这样便于使用shell调度每日增量数据时日期变量的传递。

FileSink方法参考文档:
https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/file_sink/

另外,关于Sink到HDFS的数据文件(part file) 生命周期有几种状态,其中当文件名为in-progress表示当前文件正在写入中,此时的文件是不能被Hive读到的,我们需要将该文件的状态通过checkpoint机制转变为Finished。需要配置env.enableCheckpointing(60000)开启checkpoint,该参数是60秒开启一次。

完成代码开发后无法在本地测试,只能通过maven打包到华为服务器,通过flink run提交到yarn,此时可以指定并行度及其他配置。

通过以上方法即可实现将我们测试主题中的数据存储在按照每天一个yyyy-mm-dd命名的文件夹中。

3 HDFS与Hive

HDFS与Hive的交互也可以使用FlinkSQL,但是考虑到未来对数据的加工过滤,在此需求中选择将数据落盘HDFS再通过Shell命令调度至Hive。

3.1 Shell脚本的编写思路

  1. source华为的环境,认证状态成功;

  2. 创建日期变量:c_date=$(date '+%Y-%m-%d')

  3. 在beeline -u中执行HiveSQL代码:

    • 使用beeline的变量函数--hivevar将在外部注册的c_date变量注册为hive beeline的变量;

    • 创建临时外部表,映射字段一行数据,建表语句中指定位置为Flink写入的当日日期变量的HDFS数据文件夹;

    • 将临时表中的数据解析,一般是json数据,可通过get_json_object函数解析为字段,insert into table存入贴源层正式表;

    • 删除临时表;

  4. 有需要的话,也可以添加日志路径,将执行结果追加至日志。

3.2 脚本测试方法

该脚本的执行原理是首先在刷新华为租户环境,然后创建时间变量,并且是yyyy-mm-dd格式,与flink写入在HDFS中的每日增量文件夹名相同;

然后在beeline客户端中注册beeline的变量,将linux的时间变量传入beeline;

解下来是建临时表,将HDFS中的增量数据先写入,再解析字段到下一层标准表,同时删除临时表,通过此方法即完成每天新增数据的导入。

需要注意的是,hive -e命令似乎由于认证安全设置,无法在华为集群节点机使用。

4 DolphinScheduler

通过将脚本文件挂在DS调度中,每天在Flink完成消费落盘后,即可执行该shell。DS的部署不在华为MRS集群,在客户端节点中,使用开源版本即可,DS更方便查看每天的调度执行日志。

需要注意的是,目前我的需求中每天的新增数据大约2000-10000条,可以在短时间内完成调度执行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/637527.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用vscode在wsl2中配置clangd环境

在vscode中安装这三个插件&#xff08;clangd需要科学上网或者从VSIX安装&#xff09; 之后创建一个空目录并进去。 使用快捷键ctrlshiftp&#xff0c;输入命令 Cmake:Quick Start 根据步骤选择。注意在创建CMakeLists.txt这一步选择跳过&#xff0c;直接输入enter&#xff0c…

Linux 驱动开发基础知识——认识LED驱动程序 (二)

个人名片&#xff1a; &#x1f981;作者简介&#xff1a;一名喜欢分享和记录学习的在校大学生 &#x1f42f;个人主页&#xff1a;妄北y &#x1f427;个人QQ&#xff1a;2061314755 &#x1f43b;个人邮箱&#xff1a;2061314755qq.com &#x1f989;个人WeChat&#xff1a;V…

Linux之进程间通信(管道)

目录 一、进程间通信 1、进程间通信的概念 2、进程间通信的目的 3、进程间通信的分类 二、管道 1、管道基本介绍 2、匿名管道 3、命名管道 一、进程间通信 1、进程间通信的概念 什么是进程间通信&#xff1f; 我们在学习了进程的相关知识后&#xff0c;知道&#xff…

树的一些经典 Oj题 讲解

关于树的遍历 先序遍历 我们知道 树的遍历有 前序遍历 中序遍历 后序遍历 然后我们如果用递归的方式去解决&#xff0c;对我们来说应该是轻而易举的吧&#xff01;那我们今天要讲用迭代&#xff08;非递归&#xff09;实现 树的相关遍历 首先呢 我们得知道 迭代解法 本质上也…

浅析云服务oss/obs/cos对象存储安全攻防

文章目录 前言云存储服务1.1 初识对象存储1.2 腾讯云COS桶1.3 公开读取风险 对象存储桶风险2.1 Bucket Object遍历2.2 Bucket 名称的爆破2.3 Bucket ACL可读写2.4 任意写与文件覆盖2.5 Bucket 域名的接管 AccessKey凭证泄露3.1 行云管家接管主机3.2 Github泄露AK/SK3.3 客户端程…

Chatgpt+Comfyui绘图源码说明及本地部署文档

其他文档地址&#xff1a; ChatgptComfyui绘图源码运营文档 ChatgptComfyui绘图源码线上部署文档 一、源码说明 1、源码目录说明 app_home&#xff1a;app官网源码chatgpt-java&#xff1a;管理后台服务端源码、用户端的服务端源码chatgpt-pc&#xff1a;电脑网页前端源码cha…

两条链表相同位数相加[中等]

优质博文IT-BLOG-CN 一、题目 给你两个非空的链表&#xff0c;表示两个非负的整数。它们每位数字都是按照逆序的方式存储的&#xff0c;并且每个节点只能存储一位数字。请你将两个数相加&#xff0c;并以相同形式返回一个表示和的链表。你可以假设除了数字0之外&#xff0c;这…

【征服Redis12】redis的主从复制问题

从现在开始&#xff0c;我们来讨论redis集群的问题&#xff0c;在前面我们介绍了RDB和AOF两种同步机制&#xff0c;那你是否考虑过这两个机制有什么用呢&#xff1f;其中的一个重要作用就是为了集群同步设计的。 Redis是一个高性能的键值存储系统&#xff0c;广泛应用于Web应用…

【React】Redux的使用详解

文章目录 Redux的三大原则Redux官方图react-redux使用 1、创建store管理全局状态​ 2、在项目index.js根节点引用 3、 在需要使用redux的页面或者组件中&#xff0c;通过connect高阶组件映射到该组件的props中 redux中异步操作如何使用redux-thunkcombineReducers函数 Re…

数据结构和算法笔记4:排序算法-归并排序

归并排序算法完全遵循分治模式。直观上其操作如下&#xff1a; 分解&#xff1a;分解待排序的n个元素的序列成各具n/2个元素的两个子序列。解决&#xff1a;使用归并排序递归地排序两个子序列。合并&#xff1a;合并两个已排序的子序列以产生已排序的答案。 我们直接来看例子…

Flutter 与 Android原生 相互通信:BasicMessageChannel、MethodChannel、EventChannel

前言 本文主要讲解&#xff0c;使用不同的 Channel 让 Flutter 和 Android原生 进行通信&#xff0c;由于只是讲解两端通信&#xff0c;所以可视化效果不好&#xff1b; 不过我写了一篇专门讲解 Flutter 嵌入 Android原生View的文章 Flutter 页面嵌入 Android原生 View-CSDN…

小程序使用echarts图表-雷达图

本文介绍下小程序中如何使用echarts 如果是通过npm安装&#xff0c;这样是全部安装的&#xff0c;体积有点大 我这边是使用echarts中的一个组件来实现的&#xff0c;下边是具体流程&#xff0c;实际效果是没有外边的红色边框的&#xff0c;加红色边框的效果是这篇说明 1.echa…

IDEA的database使用

一、数据据库 在使用database之前&#xff0c;首先你的电脑要安装好了数据库并且启动。 MySQL卸载手册 链接&#xff1a;https://pan.baidu.com/doc/share/AVXW5SG6T76puBOWnPegmw-602323264797863 提取码&#xff1a;hlgf MySQL安装图解 链接&#xff1a;https://pan.baidu.…

机器学习笔记——机器学习的分类

1 机器学习是啥 机器学习是人工智能的一个分支&#xff0c;它是一门研究机器获取新知识和新技能&#xff0c;并识别现有知识的学问。 机器学习已广泛应用于数据挖掘、计算机视觉、自然语言处理、生物特征识别、搜索引擎、医学诊断、检测信用卡欺诈、证券市场分析、DNA 序列测…

用Python实现Excel中的Vlookup功能

目录 一、引言 二、准备工作 三、实现Vlookup功能 1、导入pandas库 2、准备数据 3、实现Vlookup功能 4、处理结果 5、保存结果 四、完整代码示例 五、注意事项 六、总结 一、引言 在Excel中&#xff0c;Vlookup是一个非常实用的函数&#xff0c;它可以帮助我们在表…

Web概述

Web 概述&#xff1a;Web是World Wide Web的简称&#xff0c;是一个由许多互联网服务组成的信息空间。它由超文本文档、图像、视频和其他多媒体资源组成&#xff0c;并通过超文本传输协议&#xff08;HTTP&#xff09;进行传输。特点&#xff1a;Web的主要特点是其开放性和可访…

java数据结构与算法刷题-----LeetCode485. 最大连续 1 的个数

java数据结构与算法刷题目录&#xff08;剑指Offer、LeetCode、ACM&#xff09;-----主目录-----持续更新(进不去说明我没写完)&#xff1a;https://blog.csdn.net/grd_java/article/details/123063846 文章目录 1. 法一&#xff0c;双指针2. 法二&#xff1a;变量计数 1. 法一…

面对不平衡二元分类问题是否需要使用SMOTE技术?

摘要 在训练分类模型之前平衡数据是解决表格数据中不平衡二元分类任务的流行技术。平衡通常是通过复制少数样本或生成合成少数样本来实现的。虽然众所周知&#xff0c;平衡对每个分类模型的影响不同&#xff0c;但大多数先进的实证研究并未将强大的最先进&#xff08;SOTA&…

Qt6入门教程 8:信号和槽机制(连接方式)

目录 一.一个信号与槽连接的例子 二.第五个参数 1.Qt::AutoConnection 2.Qt::DirectConnection 3.Qt::QueuedConnection 4.Qt::BlockingQueuedConnection 5.Qt::UniqueConnection 三.信号 四.connect函数原型 五.信号与槽的多种用法 六.槽的属性 一.一个信号与槽连接…

R语言【cli】——builtin_theme():内置的CLI主题

Package cli version 3.6.0 Description 此主题始终处于活动状态&#xff0c;并且位于主题堆栈的底部。 Usage builtin_theme(dark getOption("cli.theme_dark", "auto")) Argument 参数【dark】&#xff1a;是否使用黑暗主题。cli.theme_dark选项可用…