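07_Hudi Case Study: Flink CDC Real-Time Data Capture, Presto, FineBI Report Visualization

7. Chapter 7: Hudi Case Study
7.1 Case Architecture
7.2 Business Data
7.2.1 Customer Information Table
7.2.2 Customer Intention Table
7.2.3 Customer Lead Table
7.2.4 Lead Appeal Table
7.2.5 Customer Visit and Consultation Record Table
7.3 Flink CDC Real-Time Data Capture
7.3.1 Enabling the MySQL binlog
7.3.2 Environment Preparation
7.3.3 Capturing Data in Real Time
7.3.3.1 Customer Information Table
7.3.3.2 Customer Intention Table
7.3.3.3 Customer Lead Table
7.3.3.4 Customer Appeal Table
7.3.3.5 Customer Visit and Consultation Record Table
7.4 Presto Ad Hoc Analysis
7.4.1 What Is Presto
7.4.2 Installing and Deploying Presto
7.4.3 Creating Tables in Hive
7.4.3.1 Creating the Database
7.4.3.2 Customer Information Table
7.4.3.3 Customer Intention Table
7.4.3.4 Customer Lead Table
7.4.3.5 Customer Appeal Table
7.4.3.6 Customer Visit and Consultation Record Table
7.4.4 Offline Metric Analysis
7.4.4.1 Daily Enrollments
7.4.4.2 Daily Visits
7.4.4.3 Daily Intentions
7.4.4.4 Daily Leads
7.5 Flink SQL Streaming Analysis
7.5.1 Business Requirements
7.5.2 Creating MySQL Tables
7.5.3 Real-Time Metric Analysis
7.5.3.1 Visits Today
7.5.3.2 Consultations Today
7.5.3.3 Intentions Today
7.5.3.4 Enrollments Today
7.5.3.5 Valid Leads Today
7.6 FineBI Report Visualization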

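7. Chapter 7: Hudi Case Study

The defining feature of the Itcast Education (传智教育) big data analytics platform is that it is real. The project was developed jointly by Itcast Education and third-party K12 education institutions, and it was turned into a course after it went live. The process is realistic and detailed and uses mainstream big data technologies and tools. The platform analyzes business data about customers (mainly students), including visits, consultations, leads, intentions, enrollments, and attendance. The analysis results are used to improve the platform's service quality and ultimately to meet users' needs. In short, the project applies big data technology to the education and training sector to provide data support for business operations.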

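7.1 Case Architecture

This case integrates Flink SQL with Hudi. Business data in the MySQL database is captured in real time and stored in Hudi tables. Presto is used for offline query analysis and Flink SQL for streaming queries. The resulting reports are written back to MySQL, where FineBI picks them up for visualization.

  • 1. MySQL database
    Stores the Itcast Education customer business data as well as the report results of the offline and real-time analysis, and serves as the data source for the FineBI visualization tool.

  • 2. Flink SQL engine
    Uses CDC in Flink SQL to capture MySQL table data into Hudi tables in real time. In addition, the Flink SQL connectors integrate Hudi and MySQL for data storage and querying.

  • 3. Apache Hudi: data lake framework
    The business data is ultimately stored in Hudi tables (underlying storage: the HDFS distributed file system), which manage the data files in a unified way and are later integrated with Spark and Hive for business metric analysis.

  • 4. Presto analysis engine
    A distributed SQL query engine open-sourced by Facebook, suited to interactive analytical queries over data volumes from gigabytes to petabytes.
    In this case Presto loads data directly from the Hudi tables and relies on the Hive MetaStore to manage metadata. Presto can also integrate multiple data sources, which makes cross-source data processing convenient.

  • 5. FineBI: reporting tool
    A commercial charting tool from FanRuan (帆软) that simplifies chart creation.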

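7.2 Business Data

The business data for this case comes from real customer activity (consultations, visits, enrollments, browsing, and so on) and is stored in the MySQL database itcast_nev. The case uses the five business tables described in the subsections below.

Start the MySQL database, log in from the command line, create the database first, then create the tables, and finally import the data.

[root@node1 ~]# mysql -uroot -p123456
mysql> CREATE DATABASE IF NOT EXISTS itcast_nev;
mysql> USE itcast_nev;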

7.2.1 Customer Information Table

Customer information table: customer. DDL statement to create the table:

CREATE TABLE IF NOT EXISTS itcast_nev.customer (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `customer_relationship_id` int(11) DEFAULT NULL COMMENT 'current intention id',
  `create_date_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
  `update_date_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'last update time',
  `deleted` bit(1) NOT NULL DEFAULT b'0' COMMENT 'whether deleted (disabled)',
  `name` varchar(128) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL DEFAULT '' COMMENT 'name',
  `idcard` varchar(24) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT '' COMMENT 'ID card number',
  `birth_year` int(5) DEFAULT NULL COMMENT 'birth year',
  `gender` varchar(8) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT 'MAN' COMMENT 'gender',
  `phone` varchar(24) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL DEFAULT '' COMMENT 'mobile phone number',
  `wechat` varchar(32) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT '' COMMENT 'WeChat',
  `qq` varchar(32) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT '' COMMENT 'QQ number',
  `email` varchar(56) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT '' COMMENT 'email',
  `area` varchar(128) COLLATE utf8mb4_unicode_ci DEFAULT '' COMMENT 'region',
  `leave_school_date` date DEFAULT NULL COMMENT 'school leaving date',
  `graduation_date` date DEFAULT NULL COMMENT 'graduation date',
  `bxg_student_id` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT 'Boxuegu student ID; may be unlinked or absent',
  `creator` int(11) DEFAULT NULL COMMENT 'creator ID',
  `origin_type` varchar(32) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL COMMENT 'data source',
  `origin_channel` varchar(32) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL COMMENT 'source channel',
  `tenant` int(11) NOT NULL DEFAULT '0',
  `md_id` int(11) DEFAULT '0' COMMENT 'middle platform id',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

Import the customer information data into the table in advance with the source command:

mysql> source /root/1-customer.sql ;

7.2.2 Customer Intention Table

Customer intention table: customer_relationship. DDL statement to create the table:

CREATE TABLE IF NOT EXISTS itcast_nev.customer_relationship (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `create_date_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `update_date_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'last update time',
  `deleted` bit(1) NOT NULL DEFAULT b'0' COMMENT 'whether deleted (disabled)',
  `customer_id` int(11) NOT NULL DEFAULT '0' COMMENT 'owning customer id',
  `first_id` int(11) DEFAULT NULL COMMENT 'first customer relationship id',
  `belonger` int(11) DEFAULT NULL COMMENT 'owner',
  `belonger_name` varchar(10) DEFAULT NULL COMMENT 'owner name',
  `initial_belonger` int(11) DEFAULT NULL COMMENT 'initial owner',
  `distribution_handler` int(11) DEFAULT NULL COMMENT 'assignment handler',
  `business_scrm_department_id` int(11) DEFAULT '0' COMMENT 'owning department',
  `last_visit_time` datetime DEFAULT NULL COMMENT 'last follow-up visit time',
  `next_visit_time` datetime DEFAULT NULL COMMENT 'next follow-up visit time',
  `origin_type` varchar(32) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL COMMENT 'data source',
  `itcast_school_id` int(11) DEFAULT NULL COMMENT 'campus id',
  `itcast_subject_id` int(11) DEFAULT NULL COMMENT 'subject id',
  `intention_study_type` varchar(32) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL COMMENT 'intended study mode',
  `anticipat_signup_date` date DEFAULT NULL COMMENT 'expected enrollment date',
  `level` varchar(8) DEFAULT NULL COMMENT 'customer level',
  `creator` int(11) DEFAULT NULL COMMENT 'creator',
  `current_creator` int(11) DEFAULT NULL COMMENT 'current creator: initially the creator; after retrieval from the public pool, the person who retrieved it',
  `creator_name` varchar(32) DEFAULT '' COMMENT 'creator name',
  `origin_channel` varchar(32) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL COMMENT 'source channel',
  `comment` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT '' COMMENT 'remarks',
  `first_customer_clue_id` int(11) DEFAULT '0' COMMENT 'first lead id',
  `last_customer_clue_id` int(11) DEFAULT '0' COMMENT 'last lead id',
  `process_state` varchar(32) DEFAULT NULL COMMENT 'processing state',
  `process_time` datetime DEFAULT NULL COMMENT 'processing state change time',
  `payment_state` varchar(32) DEFAULT NULL COMMENT 'payment state',
  `payment_time` datetime DEFAULT NULL COMMENT 'payment state change time',
  `signup_state` varchar(32) CHARACTER SET utf8 COLLATE utf8_bin DEFAULT NULL COMMENT 'enrollment state',
  `signup_time` datetime DEFAULT NULL COMMENT 'enrollment time',
  `notice_state` varchar(32) DEFAULT NULL COMMENT 'notification state',
  `notice_time` datetime DEFAULT NULL COMMENT 'notification state change time',
  `lock_state` bit(1) DEFAULT b'0' COMMENT 'lock state',
  `lock_time` datetime DEFAULT NULL COMMENT 'lock state change time',
  `itcast_clazz_id` int(11) DEFAULT NULL COMMENT 'EMS class id',
  `itcast_clazz_time` datetime DEFAULT NULL COMMENT 'class enrollment time',
  `payment_url` varchar(1024) DEFAULT '' COMMENT 'payment link',
  `payment_url_time` datetime DEFAULT NULL COMMENT 'payment link creation time',
  `ems_student_id` int(11) DEFAULT NULL COMMENT 'EMS student id',
  `delete_reason` varchar(64) DEFAULT NULL COMMENT 'deletion reason',
  `deleter` int(11) DEFAULT NULL COMMENT 'deleted by',
  `deleter_name` varchar(32) DEFAULT NULL COMMENT 'name of the person who deleted',
  `delete_time` datetime DEFAULT NULL COMMENT 'deletion time',
  `course_id` int(11) DEFAULT NULL COMMENT 'course ID',
  `course_name` varchar(64) DEFAULT NULL COMMENT 'course name',
  `delete_comment` varchar(255) DEFAULT '' COMMENT 'deletion reason details',
  `close_state` varchar(32) DEFAULT NULL COMMENT 'close state',
  `close_time` datetime DEFAULT NULL COMMENT 'close state change time',
  `appeal_id` int(11) DEFAULT NULL COMMENT 'appeal id',
  `tenant` int(11) NOT NULL DEFAULT '0' COMMENT 'tenant',
  `total_fee` decimal(19,0) DEFAULT NULL COMMENT 'total enrollment fee',
  `belonged` int(11) DEFAULT NULL COMMENT 'short-cycle owner',
  `belonged_time` datetime DEFAULT NULL COMMENT 'ownership time',
  `belonger_time` datetime DEFAULT NULL COMMENT 'ownership time',
  `transfer` int(11) DEFAULT NULL COMMENT 'transferred by',
  `transfer_time` datetime DEFAULT NULL COMMENT 'transfer time',
  `follow_type` int(4) DEFAULT '0' COMMENT 'assignment type: 0-automatic assignment, 1-manual assignment, 2-automatic transfer, 3-manual single transfer, 4-manual batch transfer, 5-claimed from public pool',
  `transfer_bxg_oa_account` varchar(64) DEFAULT NULL COMMENT 'OA account of the Boxuegu owner transferred to',
  `transfer_bxg_belonger_name` varchar(64) DEFAULT NULL COMMENT 'OA name of the Boxuegu owner transferred to',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Import the customer intention data into the table in advance with the source command:

mysql> source /root/2-customer_relationship.sql ;

7.2.3 Customer Lead Table

Customer lead table: customer_clue. DDL statement to create the table:

CREATE TABLE IF NOT EXISTS itcast_nev.customer_clue (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `create_date_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
  `update_date_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'last update time',
  `deleted` bit(1) NOT NULL DEFAULT b'0' COMMENT 'whether deleted (disabled)',
  `customer_id` int(11) DEFAULT NULL COMMENT 'customer id',
  `customer_relationship_id` int(11) DEFAULT NULL COMMENT 'customer relationship id',
  `session_id` varchar(48) COLLATE utf8_bin DEFAULT '' COMMENT 'Qimo session id',
  `sid` varchar(48) COLLATE utf8_bin DEFAULT '' COMMENT 'visitor id',
  `status` varchar(16) COLLATE utf8_bin DEFAULT '' COMMENT 'status (undeal: unclaimed, deal: claimed, finish: closed, changePeer: transferred)',
  `user` varchar(16) COLLATE utf8_bin DEFAULT '' COMMENT 'assigned agent',
  `create_time` datetime DEFAULT NULL COMMENT 'Qimo creation time',
  `platform` varchar(16) COLLATE utf8_bin DEFAULT '' COMMENT 'platform source (pc: website consultation | wap: WAP consultation | sdk: app consultation | weixin: WeChat consultation)',
  `s_name` varchar(32) COLLATE utf8_bin DEFAULT '' COMMENT 'user name',
  `seo_source` varchar(255) COLLATE utf8_bin DEFAULT '' COMMENT 'search source',
  `seo_keywords` varchar(255) COLLATE utf8_bin DEFAULT '' COMMENT 'keywords',
  `ip` varchar(48) COLLATE utf8_bin DEFAULT '' COMMENT 'IP address',
  `referrer` text COLLATE utf8_bin COMMENT 'referring page',
  `from_url` text COLLATE utf8_bin COMMENT 'session source page',
  `landing_page_url` text COLLATE utf8_bin COMMENT 'visitor landing page',
  `url_title` varchar(1024) COLLATE utf8_bin DEFAULT '' COMMENT 'consultation page title',
  `to_peer` varchar(255) COLLATE utf8_bin DEFAULT '' COMMENT 'assigned skill group',
  `manual_time` datetime DEFAULT NULL COMMENT 'manual service start time',
  `begin_time` datetime DEFAULT NULL COMMENT 'agent claim time',
  `reply_msg_count` int(11) DEFAULT '0' COMMENT 'number of agent reply messages',
  `total_msg_count` int(11) DEFAULT '0' COMMENT 'total number of messages',
  `msg_count` int(11) DEFAULT '0' COMMENT 'number of messages sent by the customer',
  `comment` varchar(1024) COLLATE utf8_bin DEFAULT '' COMMENT 'remarks',
  `finish_reason` varchar(255) COLLATE utf8_bin DEFAULT '' COMMENT 'end type',
  `finish_user` varchar(32) COLLATE utf8_bin DEFAULT '' COMMENT 'agent who ended the session',
  `end_time` datetime DEFAULT NULL COMMENT 'session end time',
  `platform_description` varchar(255) COLLATE utf8_bin DEFAULT '' COMMENT 'customer platform information',
  `browser_name` varchar(255) COLLATE utf8_bin DEFAULT '' COMMENT 'browser name',
  `os_info` varchar(255) COLLATE utf8_bin DEFAULT '' COMMENT 'operating system name',
  `area` varchar(255) COLLATE utf8_bin DEFAULT NULL COMMENT 'region',
  `country` varchar(16) COLLATE utf8_bin DEFAULT '' COMMENT 'country',
  `province` varchar(16) COLLATE utf8_bin DEFAULT '' COMMENT 'province',
  `city` varchar(255) COLLATE utf8_bin DEFAULT '' COMMENT 'city',
  `creator` int(11) DEFAULT '0' COMMENT 'creator',
  `name` varchar(64) COLLATE utf8_bin DEFAULT '' COMMENT 'customer name',
  `idcard` varchar(24) COLLATE utf8_bin DEFAULT '' COMMENT 'ID card number',
  `phone` varchar(24) COLLATE utf8_bin DEFAULT '' COMMENT 'mobile phone number',
  `itcast_school_id` int(11) DEFAULT NULL COMMENT 'campus id',
  `itcast_school` varchar(128) COLLATE utf8_bin DEFAULT '' COMMENT 'campus',
  `itcast_subject_id` int(11) DEFAULT NULL COMMENT 'subject id',
  `itcast_subject` varchar(128) COLLATE utf8_bin DEFAULT '' COMMENT 'subject',
  `wechat` varchar(32) COLLATE utf8_bin DEFAULT '' COMMENT 'WeChat',
  `qq` varchar(32) COLLATE utf8_bin DEFAULT '' COMMENT 'QQ number',
  `email` varchar(56) COLLATE utf8_bin DEFAULT '' COMMENT 'email',
  `gender` varchar(8) COLLATE utf8_bin DEFAULT 'MAN' COMMENT 'gender',
  `level` varchar(8) COLLATE utf8_bin DEFAULT NULL COMMENT 'customer level',
  `origin_type` varchar(32) COLLATE utf8_bin DEFAULT '' COMMENT 'data source channel',
  `information_way` varchar(32) COLLATE utf8_bin DEFAULT NULL COMMENT 'consultation method',
  `working_years` date DEFAULT NULL COMMENT 'date started working',
  `technical_directions` varchar(255) COLLATE utf8_bin DEFAULT '' COMMENT 'technical direction',
  `customer_state` varchar(32) COLLATE utf8_bin DEFAULT '' COMMENT 'current customer state',
  `valid` bit(1) DEFAULT b'0' COMMENT 'whether this lead is a valid online-consultation lead',
  `anticipat_signup_date` date DEFAULT NULL COMMENT 'expected enrollment date',
  `clue_state` varchar(32) COLLATE utf8_bin DEFAULT 'NOT_SUBMIT' COMMENT 'lead state',
  `scrm_department_id` int(11) DEFAULT NULL COMMENT 'SCRM internal department id',
  `superior_url` text COLLATE utf8_bin COMMENT 'referring page URL captured by Zhuge',
  `superior_source` varchar(1024) COLLATE utf8_bin DEFAULT NULL COMMENT 'referring page URL title captured by Zhuge',
  `landing_url` text COLLATE utf8_bin COMMENT 'landing page URL captured by Zhuge',
  `landing_source` varchar(1024) COLLATE utf8_bin DEFAULT NULL COMMENT 'landing page URL source captured by Zhuge',
  `info_url` text COLLATE utf8_bin COMMENT 'inquiry page URL captured by Zhuge',
  `info_source` varchar(255) COLLATE utf8_bin DEFAULT NULL COMMENT 'inquiry page URL title captured by Zhuge',
  `origin_channel` varchar(32) COLLATE utf8_bin DEFAULT '' COMMENT 'ad placement channel',
  `course_id` int(32) DEFAULT NULL,
  `course_name` varchar(255) COLLATE utf8_bin DEFAULT NULL,
  `zhuge_session_id` varchar(500) COLLATE utf8_bin DEFAULT NULL,
  `is_repeat` int(4) NOT NULL DEFAULT '0' COMMENT 'whether the lead is a duplicate (by phone number); 0: normal, 1: duplicate',
  `tenant` int(11) NOT NULL DEFAULT '0' COMMENT 'tenant id',
  `activity_id` varchar(16) COLLATE utf8_bin DEFAULT NULL COMMENT 'activity id',
  `activity_name` varchar(64) COLLATE utf8_bin DEFAULT NULL COMMENT 'activity name',
  `follow_type` int(4) DEFAULT '0' COMMENT 'assignment type: 0-automatic assignment, 1-manual assignment, 2-automatic transfer, 3-manual single transfer, 4-manual batch transfer, 5-claimed from public pool',
  `shunt_mode_id` int(11) DEFAULT NULL COMMENT 'matched skill group id',
  `shunt_employee_group_id` int(11) DEFAULT NULL COMMENT 'assigned routing employee group',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

Import the customer lead data into the table in advance with the source command:

mysql> source /root/3-customer_clue.sql;

7.2.4 Lead Appeal Table

Lead appeal table: customer_appeal. DDL statement to create the table:

CREATE TABLE IF NOT EXISTS itcast_nev.customer_appeal
(
  id int auto_increment primary key COMMENT 'primary key',
  customer_relationship_first_id int not NULL COMMENT 'first customer relationship id',
  employee_id int NULL COMMENT 'appellant',
  employee_name varchar(64) NULL COMMENT 'appellant name',
  employee_department_id int NULL COMMENT 'appellant department',
  employee_tdepart_id int NULL COMMENT 'department the appellant belongs to',
  appeal_status int(1) not NULL COMMENT 'appeal status; 0: pending audit, 1: invalid, 2: valid',
  audit_id int NULL COMMENT 'auditor id',
  audit_name varchar(255) NULL COMMENT 'auditor name',
  audit_department_id int NULL COMMENT 'auditor department',
  audit_department_name varchar(255) NULL COMMENT 'auditor department name',
  audit_date_time datetime NULL COMMENT 'audit time',
  create_date_time datetime DEFAULT CURRENT_TIMESTAMP NULL COMMENT 'creation time (appeal time)',
  update_date_time timestamp DEFAULT CURRENT_TIMESTAMP NULL ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
  deleted bit DEFAULT b'0' not NULL COMMENT 'deletion flag',
  tenant int DEFAULT 0 not NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;

Import the lead appeal data into the table in advance with the source command:

mysql> source /root/4-customer_appeal.sql ;

7.2.5 Customer Visit and Consultation Record Table

Customer visit and consultation record table: web_chat_ems. DDL statement to create the table:

create table IF NOT EXISTS itcast_nev.web_chat_ems (
  id int auto_increment primary key comment 'primary key',
  create_date_time timestamp null comment 'record creation time',
  session_id varchar(48) default '' not null comment 'Qimo sessionId',
  sid varchar(48) collate utf8_bin default '' not null comment 'visitor id',
  create_time datetime null comment 'session creation time',
  seo_source varchar(255) collate utf8_bin default '' null comment 'search source',
  seo_keywords varchar(512) collate utf8_bin default '' null comment 'keywords',
  ip varchar(48) collate utf8_bin default '' null comment 'IP address',
  area varchar(255) collate utf8_bin default '' null comment 'region',
  country varchar(16) collate utf8_bin default '' null comment 'country',
  province varchar(16) collate utf8_bin default '' null comment 'province',
  city varchar(255) collate utf8_bin default '' null comment 'city',
  origin_channel varchar(32) collate utf8_bin default '' null comment 'ad placement channel',
  user varchar(255) collate utf8_bin default '' null comment 'assigned agent',
  manual_time datetime null comment 'manual service start time',
  begin_time datetime null comment 'agent claim time',
  end_time datetime null comment 'session end time',
  last_customer_msg_time_stamp datetime null comment 'time of the last customer message',
  last_agent_msg_time_stamp datetime null comment 'time of the last agent reply',
  reply_msg_count int(12) default 0 null comment 'number of agent reply messages',
  msg_count int(12) default 0 null comment 'number of messages sent by the customer',
  browser_name varchar(255) collate utf8_bin default '' null comment 'browser name',
  os_info varchar(255) collate utf8_bin default '' null comment 'operating system name'
);

Import the visit and consultation records into the table in advance with the source command:

mysql> source /root/5-web_chat_ems.sql;
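
The five source steps in section 7.2 can also be run from the shell without opening the MySQL client. The sketch below only prints the commands (a dry run). The function name print_import_commands is hypothetical, and the sketch assumes that the dump files sit at the /root paths used above and that the database and tables already exist. Redirecting a file into the mysql client normally behaves like source for plain SQL dumps.

```shell
#!/usr/bin/env bash
# Dump files in the order used in section 7.2 (paths taken from the text).
DUMPS="/root/1-customer.sql /root/2-customer_relationship.sql /root/3-customer_clue.sql /root/4-customer_appeal.sql /root/5-web_chat_ems.sql"

# Print one non-interactive import command per dump file.
# Printing instead of executing keeps this a dry run.
print_import_commands() {
  local f
  for f in $DUMPS; do
    echo "mysql -uroot -p123456 itcast_nev < $f"
  done
}

print_import_commands
```

To run the imports for real, pipe the printed lines into a shell after reviewing them.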

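7.3 Flink CDC Real-Time Data Capture

Flink 1.11 introduced Flink SQL CDC, which makes it easy to capture data from RDBMS tables into storage systems such as Hudi tables in real time. The MySQL CDC connector can read both snapshot data and incremental data from a MySQL database.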

7.3.1 Enabling the MySQL binlog

MySQL CDC requires the MySQL binlog to be enabled first, followed by a restart of the MySQL service.

  • Step 1: enable the MySQL binlog
[root@node1 ~]# vim /etc/my.cnf 

Add the following under [mysqld]:

server-id=2
log-bin=mysql-bin
binlog_format=row
expire_logs_days=15
binlog_row_image=full


  • Step 2: restart MySQL Server
service mysqld restart

Then log in to the MySQL client command line and check that the settings have taken effect.
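
Before restarting, it can help to confirm that the configuration file really contains the five settings from step 1. The helper below is a minimal sketch, not part of the original setup: the function name check_binlog_cnf is hypothetical, and it assumes each setting sits on its own line with no surrounding spaces. After the restart, running `SHOW VARIABLES LIKE 'log_bin';` in the MySQL client should report ON.

```shell
#!/usr/bin/env bash
# check_binlog_cnf FILE
# Reports whether FILE contains each binlog setting from step 1.
# Returns 0 when all five are present, 1 otherwise.
check_binlog_cnf() {
  local cnf="$1" missing=0 setting
  for setting in \
    "server-id=2" \
    "log-bin=mysql-bin" \
    "binlog_format=row" \
    "expire_logs_days=15" \
    "binlog_row_image=full"
  do
    # -x matches the whole line, -F treats the setting as a literal string
    if grep -qxF -e "$setting" "$cnf"; then
      echo "ok       $setting"
    else
      echo "MISSING  $setting"
      missing=1
    fi
  done
  return "$missing"
}

# Example call (path from the text above):
# check_binlog_cnf /etc/my.cnf
```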

  • Step 3: download the Flink CDC MySQL JAR
    This case uses Flink 1.12.2, for which the currently supported Flink CDC version is 1.3.0. Add the Maven dependency:
<!-- https://mvnrepository.com/artifact/com.alibaba.ververica/flink-connector-mysql-cdc -->
<dependency>
  <groupId>com.alibaba.ververica</groupId>
  <artifactId>flink-connector-mysql-cdc</artifactId>
  <version>1.3.0</version>
</dependency>

When using the Flink SQL Client, the JAR must be placed in the $FLINK_HOME/lib directory.
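
A quick way to confirm that the JARs are in place is to look for them in the lib directory. The sketch below makes two assumptions: the function name check_flink_jars is hypothetical, and the CDC JAR file name ends in mysql-cdc-1.3.0.jar (the exact file name depends on which artifact was downloaded). The Hudi bundle name is the one passed to the SQL Client with -j in section 7.3.2.

```shell
#!/usr/bin/env bash
# check_flink_jars LIB_DIR
# Reports whether the Hudi bundle and a MySQL CDC connector JAR exist in LIB_DIR.
# Returns 0 when both are found, 1 otherwise.
check_flink_jars() {
  local lib="$1" status=0 pattern
  for pattern in "hudi-flink-bundle_2.12-0.9.0.jar" "*mysql-cdc-1.3.0.jar"; do
    # Leaving $pattern unquoted lets the shell expand the glob inside $lib
    set -- "$lib"/$pattern
    if [ -e "$1" ]; then
      echo "found    $pattern"
    else
      echo "missing  $pattern"
      status=1
    fi
  done
  return "$status"
}

# Example call (install path from section 7.3.2):
# check_flink_jars /export/server/flink/lib
```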

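7.3.2 Environment Preparation

Real-time data capture can be implemented either by writing a Java program or by running DDL statements directly.

  • Option 1: start the Flink SQL Client and execute the DDL statements; the Flink job is submitted to the Standalone cluster
# Start the HDFS services
hadoop-daemon.sh start namenode 
hadoop-daemon.sh start datanode

# Start the Flink Standalone cluster
export HADOOP_CLASSPATH=`/export/server/hadoop/bin/hadoop classpath`
/export/server/flink/bin/start-cluster.sh

# Start the SQL Client
/export/server/flink/bin/sql-client.sh embedded \
-j /export/server/flink/lib/hudi-flink-bundle_2.12-0.9.0.jar shell

-- Set properties (inside the SQL Client)
set execution.result-mode=tableau;
set execution.checkpointing.interval=3sec;
SET execution.runtime-mode = streaming; 
  • Option 2: create a Maven project in IDEA, add the required dependencies, and write a program that executes the DDL statements.
    Add the following to pom.xml: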
<repositories><repository><id>nexus-aliyun</id><name>Nexus aliyun</name><url>http://maven.aliyun.com/nexus/content/groups/public</url></repository><repository><id>central_maven</id><name>central maven</name><url>https://repo1.maven.org/maven2</url></repository><repository><id>cloudera</id><url>https://repository.cloudera.com/artifactory/cloudera-repos/</url></repository><repository><id>apache.snapshots</id><name>Apache Development Snapshot Repository</name><url>https://repository.apache.org/content/repositories/snapshots/</url><releases><enabled>false</enabled></releases><snapshots><enabled>true</enabled></snapshots></repository>
</repositories><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>${java.version}</maven.compiler.source><maven.compiler.target>${java.version}</maven.compiler.target><java.version>1.8</java.version><scala.binary.version>2.12</scala.binary.version><flink.version>1.12.2</flink.version><hadoop.version>2.7.3</hadoop.version><mysql.version>8.0.16</mysql.version>
</properties><dependencies><!-- Flink Client --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- Flink Table API & SQL --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.hudi</groupId><artifactId>hudi-flink-bundle_${scala.binary.version}</artifactId><version>0.9.0</version></dependency><dependency><groupId>com.alibaba.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>1.3.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-2-uber</artifactId><version>2.7.5-10.0</version></dependency><!-- 
MySQL--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql.version}</version></dependency><!-- slf4j and log4j --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.7</version><scope>runtime</scope></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version><scope>runtime</scope></dependency></dependencies><build><sourceDirectory>src/main/java</sourceDirectory><testSourceDirectory>src/test/java</testSourceDirectory><plugins><!-- compiler plugin --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.5.1</version><configuration><source>1.8</source><target>1.8</target><!--<encoding>${project.build.sourceEncoding}</encoding>--></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.18.1</version><configuration><useFile>false</useFile><disableXmlReport>true</disableXmlReport><includes><include>**/*Test.*</include><include>**/*Suite.*</include></includes></configuration></plugin><!-- JAR packaging plugin (bundles all dependencies) --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/></transformers></configuration></execution></executions></plugin></plugins>
</build>

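The program implements real-time data synchronization in three steps: an input table (InputTable), an output table (OutputTable), and an INSERT…SELECT statement that reads from the first and writes to the second.

For this case, to see the results more directly, start the Flink SQL Client and run the DDL and DML statements there.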

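7.3.3 Capturing Data in Real Time

Capturing data in real time with Flink CDC requires creating two tables, an input table and an output table, and then writing an INSERT…SELECT statement.

Next, the data of the five business tables in the MySQL database is captured and synchronized in real time to Hudi tables (stored on the HDFS file system).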
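
Because all five tables follow the same three-step pattern, the statements can be generated from a table name and a column list. The generator below is a sketch under stated assumptions, not the method used in this case: the function name gen_cdc_sync is hypothetical, the first column is assumed to be the primary key, every column is declared as STRING (as in this case), and column names that are SQL keywords would still need backtick quoting by hand. The connector options are copied from the statements in the subsections that follow.

```shell
#!/usr/bin/env bash
# gen_cdc_sync TABLE "col1 col2 ..."
# Prints the input table, the output table and the INSERT...SELECT statement
# for one MySQL table. Assumes the first column is the primary key.
gen_cdc_sync() {
  local table="$1" cols="$2" defs="" pk="" c
  for c in $cols; do
    if [ -z "$pk" ]; then
      pk="$c"
      defs="  $c STRING PRIMARY KEY NOT ENFORCED"
    else
      defs="$defs,
  $c STRING"
    fi
  done
  cat <<EOF
CREATE TABLE tbl_${table}_mysql (
$defs
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'node1.itcast.cn',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'server-time-zone' = 'Asia/Shanghai',
  'debezium.snapshot.mode' = 'initial',
  'database-name' = 'itcast_nev',
  'table-name' = '${table}'
);

CREATE TABLE edu_${table}_hudi (
$defs,
  part STRING
)
PARTITIONED BY (part)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://node1.itcast.cn:8020/hudi-warehouse/edu_${table}_hudi',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field' = '${pk}',
  'write.precombine.field' = 'create_date_time',
  'write.tasks' = '1',
  'write.rate.limit' = '2000',
  'compaction.tasks' = '1',
  'compaction.async.enabled' = 'true',
  'compaction.trigger.strategy' = 'num_commits',
  'compaction.delta_commits' = '1',
  'changelog.enabled' = 'true'
);

INSERT INTO edu_${table}_hudi
SELECT *, CAST(CURRENT_DATE AS STRING) AS part FROM tbl_${table}_mysql;
EOF
}

# Shortened column list, for illustration only
gen_cdc_sync customer_appeal "id employee_id appeal_status create_date_time"
```

The printed statements can be pasted into the Flink SQL Client. Generating them this way keeps the input and output column lists identical, which the `select *` in the third step relies on.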

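7.3.3.1 Customer Information Table

Synchronize the customer information table [customer] to a Hudi table by writing and executing the DDL and DML statements as described above.

  • Step 1: input table (InputTable)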
create table tbl_customer_mysql (id STRING PRIMARY KEY NOT ENFORCED,customer_relationship_id STRING,create_date_time STRING,update_date_time STRING,deleted STRING,name STRING,idcard STRING,birth_year STRING,gender STRING,phone STRING,wechat STRING,qq STRING,email STRING,area STRING,leave_school_date STRING,graduation_date STRING,bxg_student_id STRING,creator STRING,origin_type STRING,origin_channel STRING,tenant STRING,md_id STRING
)WITH ('connector' = 'mysql-cdc','hostname' = 'node1.itcast.cn','port' = '3306','username' = 'root','password' = '123456','server-time-zone' = 'Asia/Shanghai','debezium.snapshot.mode' = 'initial','database-name' = 'itcast_nev','table-name' = 'customer'
);
  • Step 2: output table (OutputTable)
CREATE TABLE edu_customer_hudi(id STRING PRIMARY KEY NOT ENFORCED,customer_relationship_id STRING,create_date_time STRING,update_date_time STRING,deleted STRING,name STRING,idcard STRING,birth_year STRING,gender STRING,phone STRING,wechat STRING,qq STRING,email STRING,area STRING,leave_school_date STRING,graduation_date STRING,bxg_student_id STRING,creator STRING,origin_type STRING,origin_channel STRING,tenant STRING,md_id STRING,part STRING
)
PARTITIONED BY (part)
WITH('connector'='hudi','path'= 'hdfs://node1.itcast.cn:8020/hudi-warehouse/edu_customer_hudi', 'table.type'= 'MERGE_ON_READ','hoodie.datasource.write.recordkey.field'= 'id', 'write.precombine.field'= 'create_date_time','write.tasks'= '1','write.rate.limit'= '2000', 'compaction.tasks'= '1', 'compaction.async.enabled'= 'true','compaction.trigger.strategy'= 'num_commits','compaction.delta_commits'= '1','changelog.enabled'= 'true'
);
  • Step 3: INSERT…SELECT statement
insert into edu_customer_hudi 
select *, CAST(CURRENT_DATE AS STRING) AS part from tbl_customer_mysql;

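This generates a Flink job that is submitted to the Standalone cluster. It first synchronizes the existing data in the table to the Hudi table and then synchronizes incremental changes in real time.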

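7.3.3.2 Customer Intention Table

Synchronize the customer intention table [customer_relationship] to a Hudi table by writing and executing the DDL and DML statements as described above.

  • Step 1: input table (InputTable)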
create table tbl_customer_relationship_mysql (id string PRIMARY KEY NOT ENFORCED,create_date_time string,update_date_time string,deleted string,customer_id string,first_id string,belonger string,belonger_name string,initial_belonger string,distribution_handler string,business_scrm_department_id string,last_visit_time string,next_visit_time string,origin_type string,itcast_school_id string,itcast_subject_id string,intention_study_type string,anticipat_signup_date string,`level` string,creator string,current_creator string,creator_name string,origin_channel string,`comment` string,first_customer_clue_id string,last_customer_clue_id string,process_state string,process_time string,payment_state string,payment_time string,signup_state string,signup_time string,notice_state string,notice_time string,lock_state string,lock_time string,itcast_clazz_id string,itcast_clazz_time string,payment_url string,payment_url_time string,ems_student_id string,delete_reason string,deleter string,deleter_name string,delete_time string,course_id string,course_name string,delete_comment string,close_state string,close_time string,appeal_id string,tenant string,total_fee string,belonged string,belonged_time string,belonger_time string,transfer string,transfer_time string,follow_type string,transfer_bxg_oa_account string,transfer_bxg_belonger_name string
)WITH('connector' = 'mysql-cdc','hostname' = 'node1.itcast.cn','port' = '3306','username' = 'root','password' = '123456','server-time-zone' = 'Asia/Shanghai','debezium.snapshot.mode' = 'initial','database-name' = 'itcast_nev','table-name' = 'customer_relationship'
);
  • Step 2: output table (OutputTable)
create table edu_customer_relationship_hudi(
id string PRIMARY KEY NOT ENFORCED,
create_date_time string,
update_date_time string,
deleted string,
customer_id string,
first_id string,
belonger string,
belonger_name string,
initial_belonger string,
distribution_handler string,
business_scrm_department_id string,
last_visit_time string,
next_visit_time string,
origin_type string,
itcast_school_id string,
itcast_subject_id string,
intention_study_type string,
anticipat_signup_date string,
`level` string,
creator string,
current_creator string,
creator_name string,
origin_channel string,
`comment` string,
first_customer_clue_id string,
last_customer_clue_id string,
process_state string,
process_time string,
payment_state string,
payment_time string,
signup_state string,
signup_time string,
notice_state string,
notice_time string,
lock_state string,
lock_time string,
itcast_clazz_id string,
itcast_clazz_time string,
payment_url string,
payment_url_time string,
ems_student_id string,
delete_reason string,
deleter string,
deleter_name string,
delete_time string,
course_id string,
course_name string,
delete_comment string,
close_state string,
close_time string,
appeal_id string,
tenant string,
total_fee string,
belonged string,
belonged_time string,
belonger_time string,
transfer string,
transfer_time string,
follow_type string,
transfer_bxg_oa_account string,
transfer_bxg_belonger_name string,
part STRING
)
PARTITIONED BY (part)
WITH(
'connector'='hudi',
'path'= 'hdfs://node1.itcast.cn:8020/hudi-warehouse/edu_customer_relationship_hudi',
'table.type'= 'MERGE_ON_READ',
'hoodie.datasource.write.recordkey.field'= 'id',
'write.precombine.field'= 'create_date_time',
'write.tasks'= '1',
'write.rate.limit'= '2000',
'compaction.tasks'= '1',
'compaction.async.enabled'= 'true',
'compaction.trigger.strategy'= 'num_commits',
'compaction.delta_commits'= '1',
'changelog.enabled'= 'true'
);

  • Step 3: INSERT…SELECT statement

insert into edu_customer_relationship_hudi 
select *, CAST(CURRENT_DATE AS STRING) AS part from tbl_customer_relationship_mysql;

Check the HDFS file system: the full data set has been synchronized into the Hudi directory.

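7.3.3.3 Customer Lead Table

Synchronize the customer lead table [customer_clue] to a Hudi table by writing and executing the DDL and DML statements as described above.

  • Step 1: input table (InputTable)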
create table tbl_customer_clue_mysql (id string PRIMARY KEY NOT ENFORCED,create_date_time string,update_date_time string,deleted string,customer_id string,customer_relationship_id string,session_id string,sid string,status string,`user` string,create_time string,platform string,s_name string,seo_source string,seo_keywords string,ip string,referrer string,from_url string,landing_page_url string,url_title string,to_peer string,manual_time string,begin_time string,reply_msg_count string,total_msg_count string,msg_count string,`comment` string,finish_reason string,finish_user string,end_time string,platform_description string,browser_name string,os_info string,area string,country string,province string,city string,creator string,name string,idcard string,phone string,itcast_school_id string,itcast_school string,itcast_subject_id string,itcast_subject string,wechat string,qq string,email string,gender string,`level` string,origin_type string,information_way string,working_years string,technical_directions string,customer_state string,valid string,anticipat_signup_date string,clue_state string,scrm_department_id string,superior_url string,superior_source string,landing_url string,landing_source string,info_url string,info_source string,origin_channel string,course_id string,course_name string,zhuge_session_id string,is_repeat string,tenant string,activity_id string,activity_name string,follow_type string,shunt_mode_id string,shunt_employee_group_id string
)WITH('connector' = 'mysql-cdc','hostname' = 'node1.itcast.cn','port' = '3306','username' = 'root','password' = '123456','server-time-zone' = 'Asia/Shanghai','debezium.snapshot.mode' = 'initial','database-name' = 'itcast_nev','table-name' = 'customer_clue'
);
  • Step 2: output table (OutputTable)
create table edu_customer_clue_hudi (id string PRIMARY KEY NOT ENFORCED,create_date_time string,update_date_time string,deleted string,customer_id string,customer_relationship_id string,session_id string,sid string,status string,`user` string,create_time string,platform string,s_name string,seo_source string,seo_keywords string,ip string,referrer string,from_url string,landing_page_url string,url_title string,to_peer string,manual_time string,begin_time string,reply_msg_count string,total_msg_count string,msg_count string,`comment` string,finish_reason string,finish_user string,end_time string,platform_description string,browser_name string,os_info string,area string,country string,province string,city string,creator string,name string,idcard string,phone string,itcast_school_id string,itcast_school string,itcast_subject_id string,itcast_subject string,wechat string,qq string,email string,gender string,`level` string,origin_type string,information_way string,working_years string,technical_directions string,customer_state string,valid string,anticipat_signup_date string,clue_state string,scrm_department_id string,superior_url string,superior_source string,landing_url string,landing_source string,info_url string,info_source string,origin_channel string,course_id string,course_name string,zhuge_session_id string,is_repeat string,tenant string,activity_id string,activity_name string,follow_type string,shunt_mode_id string,shunt_employee_group_id string,part STRING
)
PARTITIONED BY (part)
WITH('connector'='hudi','path'= 'hdfs://node1.itcast.cn:8020/hudi-warehouse/edu_customer_clue_hudi', 'table.type'= 'MERGE_ON_READ','hoodie.datasource.write.recordkey.field'= 'id', 'write.precombine.field'= 'create_date_time','write.tasks'= '1','write.rate.limit'= '2000', 'compaction.tasks'= '1', 'compaction.async.enabled'= 'true','compaction.trigger.strategy'= 'num_commits','compaction.delta_commits'= '1','changelog.enabled'= 'true'
);
  • Step 3: INSERT…SELECT statement
insert into edu_customer_clue_hudi 
select *, CAST(CURRENT_DATE AS STRING) AS part from tbl_customer_clue_mysql;

Check the HDFS file system: the full data set has been synchronized into the Hudi directory.

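7.3.3.4 Customer Appeal Table

Synchronize the customer appeal table [customer_appeal] to a Hudi table by writing and executing the DDL and DML statements as described above.

  • Step 1: input table (InputTable)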
create table tbl_customer_appeal_mysql (id string PRIMARY KEY NOT ENFORCED,customer_relationship_first_id string,employee_id string,employee_name string,employee_department_id string,employee_tdepart_id string,appeal_status string,audit_id string,audit_name string,audit_department_id string,audit_department_name string,audit_date_time string,create_date_time string,update_date_time string,deleted string,tenant string
)WITH ('connector' = 'mysql-cdc','hostname' = 'node1.itcast.cn','port' = '3306','username' = 'root','password' = '123456','server-time-zone' = 'Asia/Shanghai','debezium.snapshot.mode' = 'initial','database-name' = 'itcast_nev','table-name' = 'customer_appeal'
);
  • Step 2: output table (OutputTable)
create table edu_customer_appeal_hudi (id string PRIMARY KEY NOT ENFORCED,customer_relationship_first_id STRING,employee_id STRING,employee_name STRING,employee_department_id STRING,employee_tdepart_id STRING,appeal_status STRING,audit_id STRING,audit_name STRING,audit_department_id STRING,audit_department_name STRING,audit_date_time STRING,create_date_time STRING,update_date_time STRING,deleted STRING,tenant STRING,part STRING
)
PARTITIONED BY (part)
WITH('connector'='hudi','path'= 'hdfs://node1.itcast.cn:8020/hudi-warehouse/edu_customer_appeal_hudi', 'table.type'= 'MERGE_ON_READ','hoodie.datasource.write.recordkey.field'= 'id', 'write.precombine.field'= 'create_date_time','write.tasks'= '1','write.rate.limit'= '2000', 'compaction.tasks'= '1', 'compaction.async.enabled'= 'true','compaction.trigger.strategy'= 'num_commits','compaction.delta_commits'= '1','changelog.enabled'= 'true'
);
  • Step 3: insert-select statement
insert into edu_customer_appeal_hudi 
select *, CAST(CURRENT_DATE AS STRING) AS part from tbl_customer_appeal_mysql;

Check the HDFS file system to confirm that the full snapshot has been synchronized into the Hudi table directory.

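7.3.3.5 Customer visit and consultation record table

Synchronize the customer visit and consultation record table [web_chat_ems] to a Hudi table, writing and executing the DDL and DML statements in the same three steps as above.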

  • Step 1: input table (InputTable)
create table tbl_web_chat_ems_mysql (id string PRIMARY KEY NOT ENFORCED,create_date_time string,session_id string,sid string,create_time string,seo_source string,seo_keywords string,ip string,area string,country string,province string,city string,origin_channel string,`user` string,manual_time string,begin_time string,end_time string,last_customer_msg_time_stamp string,last_agent_msg_time_stamp string,reply_msg_count string,msg_count string,browser_name string,os_info string
)WITH('connector' = 'mysql-cdc','hostname' = 'node1.itcast.cn','port' = '3306','username' = 'root','password' = '123456','server-time-zone' = 'Asia/Shanghai','debezium.snapshot.mode' = 'initial','database-name' = 'itcast_nev','table-name' = 'web_chat_ems'
);
  • Step 2: output table (OutputTable)
create table edu_web_chat_ems_hudi (id string PRIMARY KEY NOT ENFORCED,create_date_time string,session_id string,sid string,create_time string,seo_source string,seo_keywords string,ip string,area string,country string,province string,city string,origin_channel string,`user` string,manual_time string,begin_time string,end_time string,last_customer_msg_time_stamp string,last_agent_msg_time_stamp string,reply_msg_count string,msg_count string,browser_name string,os_info string,part STRING
)
PARTITIONED BY (part)
WITH('connector'='hudi','path'= 'hdfs://node1.itcast.cn:8020/hudi-warehouse/edu_web_chat_ems_hudi', 'table.type'= 'MERGE_ON_READ','hoodie.datasource.write.recordkey.field'= 'id', 'write.precombine.field'= 'create_date_time','write.tasks'= '1','write.rate.limit'= '2000', 'compaction.tasks'= '1', 'compaction.async.enabled'= 'true','compaction.trigger.strategy'= 'num_commits','compaction.delta_commits'= '1','changelog.enabled'= 'true'
);
  • Step 3: insert-select statement
insert into edu_web_chat_ems_hudi 
select *, CAST(CURRENT_DATE AS STRING) AS part from tbl_web_chat_ems_mysql;

Check the HDFS file system to confirm that the full snapshot has been synchronized into the Hudi table directory.

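At this point the core customer-related business data of Itcast (传智教育) has been collected and synchronized into Hudi tables. The five Flink jobs keep running on the Standalone cluster, so any new business data written to the source tables is captured in real time and stored in the corresponding Hudi table.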
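Step 3 is identical for every table: copy all columns from the CDC source table and append today's date as the partition column `part`. The following Python sketch generates that statement for the five tables. It is illustrative only; the source-table names for customer and customer_relationship are assumed to follow the same `tbl_<name>_mysql` pattern as the three tables shown in this section, and `sync_statement` is a hypothetical helper.

```python
# Table names used in this chapter. The tbl_<name>_mysql / edu_<name>_hudi
# naming for customer and customer_relationship is an assumption based on
# the pattern of the other three tables.
TABLES = [
    "customer",
    "customer_relationship",
    "customer_clue",
    "customer_appeal",
    "web_chat_ems",
]


def sync_statement(table: str) -> str:
    """Build the step-3 INSERT for one table: every source column plus the partition column."""
    return (
        f"INSERT INTO edu_{table}_hudi "
        f"SELECT *, CAST(CURRENT_DATE AS STRING) AS part FROM tbl_{table}_mysql;"
    )


if __name__ == "__main__":
    for name in TABLES:
        print(sync_statement(name))
```

Each generated statement would still be submitted separately in the SQL Client, so each table keeps its own Flink job.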

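7.4 Presto ad hoc analysis

Presto is used to analyze the Hudi table data, and the results are written directly to MySQL tables. The integration has three parts:

  • First, create tables in Hive that map to the Hudi tables
  • Second, integrate Presto with Hive to load the Hive table data
  • Third, integrate Presto with MySQL to read or save data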

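7.4.1 What is Presto

Presto is an open-source OLAP query engine from Facebook with an MPP architecture: a distributed SQL execution engine that can query large datasets across different data sources. It is suited to interactive analytical queries on data volumes from gigabytes to petabytes.

  • 1. Clear architecture: it is a system that runs independently and does not depend on any other external system. For scheduling, for example, Presto monitors the cluster itself and schedules work based on that monitoring information.
  • 2. Simple data structures: columnar storage with logical rows; most data can easily be converted into the structure Presto requires.
  • 3. Rich plugin interfaces: it connects cleanly to external storage systems and allows custom functions to be added.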

Official site: https://prestodb.io/
Presto uses a typical master-slave model made up of one Coordinator node, one Discovery Server node, and multiple Worker nodes; the Discovery Server is usually embedded in the Coordinator node.

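  • 1. The coordinator (master) is responsible for metadata management, worker management, and query parsing and scheduling.
  • 2. The workers are responsible for computation and for reading and writing data.
  • 3. The discovery server is usually embedded in the coordinator node, but can also be deployed separately; it handles node heartbeats. In the rest of this section, the discovery server and the coordinator share one machine.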

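Presto data model: a three-level table structure

  • 1. catalog: corresponds to one kind of data source, such as Hive data or MySQL data
  • 2. schema: corresponds to a database in MySQL
  • 3. table: corresponds to a table in MySQL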
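The three-level model is what makes names such as `hive.edu_hudi.tbl_customer_relationship` and `mysql.itcast_rpt.stu_apply` work in the queries later in this chapter. As a rough illustration, the Python sketch below splits such a name into its three levels. `parse_qualified_name` is a hypothetical helper, not part of Presto, and it assumes plain identifiers without quoting or embedded dots.

```python
from typing import NamedTuple


class QualifiedName(NamedTuple):
    catalog: str  # data source, e.g. hive or mysql
    schema: str   # database inside that source
    table: str    # table inside that database


def parse_qualified_name(name: str) -> QualifiedName:
    """Split catalog.schema.table into its three parts (no quoted identifiers)."""
    parts = name.split(".")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected catalog.schema.table, got: {name!r}")
    return QualifiedName(*parts)


if __name__ == "__main__":
    print(parse_qualified_name("hive.edu_hudi.tbl_customer_relationship"))
    print(parse_qualified_name("mysql.itcast_rpt.stu_apply"))
```

Because the catalog is part of the name, a single Presto statement can read from one catalog (hive) and write to another (mysql).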

7.4.2 Installing and deploying Presto

Presto is deployed on a single node; server name: node1.itcast.cn, IP address: 192.168.88.100.

  • 1. Install JDK 8
java -version


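  • 2. Upload and extract the Presto package
# Create the installation directory and switch to it
mkdir -p /export/server
cd /export/server
# Install lrzsz (file upload tool) with yum
yum install -y lrzsz
# Upload presto-server-0.245.1.tar.gz to /export/server on node1
# Extract it and create a symbolic link named presto
tar -xzvf presto-server-0.245.1.tar.gz
ln -s presto-server-0.245.1 presto
# Create the directory that holds the configuration files
mkdir -p /export/server/presto/etc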
  • 3. Configure Presto
  • etc/config.properties
vim /export/server/presto/etc/config.properties

Contents:

coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8090
query.max-memory=6GB
query.max-memory-per-node=2GB
query.max-total-memory-per-node=2GB
discovery-server.enabled=true
discovery.uri=http://192.168.88.100:8090
  • etc/jvm.config
vim /export/server/presto/etc/jvm.config

Contents:

-server
-Xmx3G
-XX:+UseG1GC
-XX:G1HeapRegionSize=32M
-XX:+UseGCOverheadLimit
-XX:+ExplicitGCInvokesConcurrent
-XX:+HeapDumpOnOutOfMemoryError
-XX:+ExitOnOutOfMemoryError
  • etc/node.properties
vim /export/server/presto/etc/node.properties

Contents:

node.environment=hudipresto
node.id=presto-node1
node.data-dir=/export/server/presto/data
  • etc/catalog/hive.properties
mkdir -p /export/server/presto/etc/catalog
vim /export/server/presto/etc/catalog/hive.properties

Contents:

connector.name=hive-hadoop2
hive.metastore.uri=thrift://192.168.88.100:9083
hive.parquet.use-column-names=true
hive.config.resources=/export/server/presto/etc/catalog/core-site.xml,/export/server/presto/etc/catalog/hdfs-site.xml

  • etc/catalog/mysql.properties
vim /export/server/presto/etc/catalog/mysql.properties

Contents:

connector.name=mysql
connection-url=jdbc:mysql://node1.itcast.cn:3306
connection-user=root
connection-password=123456
  • 4. Start the service
    Run the launcher script in the bin directory of the Presto installation ($PRESTO_HOME/bin):
/export/server/presto/bin/launcher start

Use jps to check that the process exists; the process name is PrestoServer.

The web UI is also available at:

http://192.168.88.100:8090/ui/

  • 5. Presto CLI command-line client
    Download the CLI client:
presto-cli-0.245.1-executable.jar

Upload presto-cli-0.245.1-executable.jar to /export/server/presto/bin, then rename it and make it executable:

cd /export/server/presto/bin
mv presto-cli-0.245.1-executable.jar presto
chmod +x presto

Start the CLI client:

/export/server/presto/bin/presto --server 192.168.88.100:8090

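7.4.3 Creating tables in Hive

For Presto to analyze the data in the Hudi tables, the Hudi tables must be mapped to Hive tables. Next, create five Itcast customer business tables in Hive, each mapped to its Hudi table.

Start the HDFS service, the Hive MetaStore and HiveServer2 services, then run the Beeline command line: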

# Start the HDFS service
hadoop-daemon.sh start namenode
hadoop-daemon.sh start datanode
# Start the Hive services
/export/server/hive/bin/start-metastore.sh
/export/server/hive/bin/start-hiveserver2.sh
# Start the Beeline client
/export/server/hive/bin/beeline -u jdbc:hive2://node1.itcast.cn:10000 -n root -p 123456

Enable Hive local mode to make testing easier:

-- Enable Hive local mode
set hive.exec.mode.local.auto=true;
set hive.exec.mode.local.auto.tasks.max=10;
set hive.exec.mode.local.auto.inputbytes.max=50000000;

7.4.3.1 Creating the database

Create the database that stores the Itcast data:

-- Create the database
CREATE DATABASE IF NOT EXISTS edu_hudi ;
-- Switch to the database
USE edu_hudi ;

7.4.3.2 Customer information table

Write the DDL statement to create the table:

CREATE EXTERNAL TABLE edu_hudi.tbl_customer(id string,customer_relationship_id string,create_date_time string,update_date_time string,deleted string,name string,idcard string,birth_year string,gender string,phone string,wechat string,qq string,email string,area string,leave_school_date string,graduation_date string,bxg_student_id string,creator string,origin_type string,origin_channel string,tenant string,md_id string
)PARTITIONED BY (day_str string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat' 
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION '/hudi-warehouse/edu_customer_hudi' ;

Because this is a partitioned table, add the partition:

ALTER TABLE edu_hudi.tbl_customer ADD IF NOT EXISTS PARTITION(day_str='2021-11-29') 
location '/hudi-warehouse/edu_customer_hudi/2021-11-29' ;

7.4.3.3 Customer intention table

Write the DDL statement to create the table:

CREATE EXTERNAL TABLE edu_hudi.tbl_customer_relationship(id string,create_date_time string,update_date_time string,deleted string,customer_id string,first_id string,belonger string,belonger_name string,initial_belonger string,distribution_handler string,business_scrm_department_id string,last_visit_time string,next_visit_time string,origin_type string,itcast_school_id string,itcast_subject_id string,intention_study_type string,anticipat_signup_date string,`level` string,creator string,current_creator string,creator_name string,origin_channel string,`comment` string,first_customer_clue_id string,last_customer_clue_id string,process_state string,process_time string,payment_state string,payment_time string,signup_state string,signup_time string,notice_state string,notice_time string,lock_state string,lock_time string,itcast_clazz_id string,itcast_clazz_time string,payment_url string,payment_url_time string,ems_student_id string,delete_reason string,deleter string,deleter_name string,delete_time string,course_id string,course_name string,delete_comment string,close_state string,close_time string,appeal_id string,tenant string,total_fee string,belonged string,belonged_time string,belonger_time string,transfer string,transfer_time string,follow_type string,transfer_bxg_oa_account string,transfer_bxg_belonger_name string
)PARTITIONED BY (day_str string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat' 
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION '/hudi-warehouse/edu_customer_relationship_hudi' ;

Because this is a partitioned table, add the partition:

ALTER TABLE edu_hudi.tbl_customer_relationship ADD IF NOT EXISTS PARTITION(day_str='2021-11-29') 
location '/hudi-warehouse/edu_customer_relationship_hudi/2021-11-29' ;

7.4.3.4 Customer clue table

Write the DDL statement to create the table:

CREATE EXTERNAL TABLE edu_hudi.tbl_customer_clue(id string,create_date_time string,update_date_time string,deleted string,customer_id string,customer_relationship_id string,session_id string,sid string,status string,`user` string,create_time string,platform string,s_name string,seo_source string,seo_keywords string,ip string,referrer string,from_url string,landing_page_url string,url_title string,to_peer string,manual_time string,begin_time string,reply_msg_count string,total_msg_count string,msg_count string,`comment` string,finish_reason string,finish_user string,end_time string,platform_description string,browser_name string,os_info string,area string,country string,province string,city string,creator string,name string,idcard string,phone string,itcast_school_id string,itcast_school string,itcast_subject_id string,itcast_subject string,wechat string,qq string,email string,gender string,`level` string,origin_type string,information_way string,working_years string,technical_directions string,customer_state string,valid string,anticipat_signup_date string,clue_state string,scrm_department_id string,superior_url string,superior_source string,landing_url string,landing_source string,info_url string,info_source string,origin_channel string,course_id string,course_name string,zhuge_session_id string,is_repeat string,tenant string,activity_id string,activity_name string,follow_type string,shunt_mode_id string,shunt_employee_group_id string
)
PARTITIONED BY (day_str string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat' 
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION '/hudi-warehouse/edu_customer_clue_hudi' ;

Because this is a partitioned table, add the partition:

ALTER TABLE edu_hudi.tbl_customer_clue ADD IF NOT EXISTS PARTITION(day_str='2021-11-29') 
location '/hudi-warehouse/edu_customer_clue_hudi/2021-11-29' ;

7.4.3.5 Customer appeal table

Write the DDL statement to create the table:

CREATE EXTERNAL TABLE edu_hudi.tbl_customer_appeal(id string,customer_relationship_first_id STRING,employee_id STRING,employee_name STRING,employee_department_id STRING,employee_tdepart_id STRING,appeal_status STRING,audit_id STRING,audit_name STRING,audit_department_id STRING,audit_department_name STRING,audit_date_time STRING,create_date_time STRING,update_date_time STRING,deleted STRING,tenant STRING
)
PARTITIONED BY (day_str string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat' 
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION '/hudi-warehouse/edu_customer_appeal_hudi' ;

Because this is a partitioned table, add the partition:

ALTER TABLE edu_hudi.tbl_customer_appeal ADD IF NOT EXISTS PARTITION(day_str='2021-11-29') 
location '/hudi-warehouse/edu_customer_appeal_hudi/2021-11-29' ;

7.4.3.6 Customer visit and consultation record table

Write the DDL statement to create the table:

CREATE EXTERNAL TABLE edu_hudi.tbl_web_chat_ems (id string,create_date_time string,session_id string,sid string,create_time string,seo_source string,seo_keywords string,ip string,area string,country string,province string,city string,origin_channel string,`user` string,manual_time string,begin_time string,end_time string,last_customer_msg_time_stamp string,last_agent_msg_time_stamp string,reply_msg_count string,msg_count string,browser_name string,os_info string
)
PARTITIONED BY (day_str string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat' 
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION '/hudi-warehouse/edu_web_chat_ems_hudi' ;

Because this is a partitioned table, add the partition:

ALTER TABLE edu_hudi.tbl_web_chat_ems ADD IF NOT EXISTS PARTITION(day_str='2021-11-29') 
location '/hudi-warehouse/edu_web_chat_ems_hudi/2021-11-29' ;
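Each Hive table needs one partition per day, pointing at the Hudi directory of the same date, so the five ALTER TABLE statements differ only in the table name and path. The Python sketch below generates them for a given date. The table-to-directory mapping is taken from the DDL in this section; `add_partition_statements` and its parameters are hypothetical names used for illustration.

```python
from datetime import date

# Hive table -> Hudi directory under /hudi-warehouse, as declared in the DDL above.
HIVE_TO_HUDI = {
    "tbl_customer": "edu_customer_hudi",
    "tbl_customer_relationship": "edu_customer_relationship_hudi",
    "tbl_customer_clue": "edu_customer_clue_hudi",
    "tbl_customer_appeal": "edu_customer_appeal_hudi",
    "tbl_web_chat_ems": "edu_web_chat_ems_hudi",
}


def add_partition_statements(day, database="edu_hudi", warehouse="/hudi-warehouse"):
    """Return one ALTER TABLE ... ADD PARTITION statement per Hive table for the given day."""
    day_str = day.isoformat()  # yyyy-MM-dd, the same format as the Hudi partition directories
    return [
        f"ALTER TABLE {database}.{hive_table} ADD IF NOT EXISTS "
        f"PARTITION(day_str='{day_str}') "
        f"location '{warehouse}/{hudi_dir}/{day_str}' ;"
        for hive_table, hudi_dir in HIVE_TO_HUDI.items()
    ]


if __name__ == "__main__":
    for statement in add_partition_statements(date(2021, 11, 29)):
        print(statement)
```

The output can be pasted into Beeline; because of IF NOT EXISTS, rerunning it for the same date is harmless.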

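7.4.4 Offline metric analysis

To analyze Hudi table data with Presto, the integration jar hudi-presto-bundle-0.9.0.jar must be placed in the Presto plugin directory /export/server/presto/plugin/hive-hadoop2.

Start the Presto CLI client and list the databases created in Hive.

Switch to the database edu_hudi and list its tables.
Next, following the business metric requirements, use Presto to analyze the Hudi table data and save the metrics directly to MySQL.

First, create a database in MySQL dedicated to the metric tables:

-- Create the database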
CREATE DATABASE `itcast_rpt` /*!40100 DEFAULT CHARACTER SET utf8 */;

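7.4.4.1 Daily sign-ups

Analyze the customer intention table to count daily customer sign-ups: first create the MySQL table, then write the SQL, and finally save the data.

  • MySQL table: itcast_rpt.stu_apply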
CREATE TABLE  IF NOT EXISTS `itcast_rpt`.`stu_apply` (`report_date` longtext,`report_total` bigint(20) NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  • Metric SQL statement:
WITH tmp AS (SELECT format_datetime(from_unixtime(cast(payment_time as bigint) / 1000),'yyyy-MM-dd')AS day_value, customer_id FROM hive.edu_hudi.tbl_customer_relationship WHERE day_str = '2021-11-29' AND payment_time IS NOT NULL AND payment_state = 'PAID' AND deleted = 'false'
)
SELECT day_value, COUNT(customer_id) AS total FROM tmp GROUP BY day_value ;
  • Save the analysis result to the MySQL table:
INSERT INTO mysql.itcast_rpt.stu_apply (report_date, report_total) 
SELECT day_value, total FROM (SELECT day_value, COUNT(customer_id) AS total FROM (SELECT format_datetime(from_unixtime(cast(payment_time as bigint) / 1000), 'yyyy-MM-dd')AS day_value, customer_id FROM hive.edu_hudi.tbl_customer_relationship WHERE day_str = '2021-11-29' AND payment_time IS NOT NULL AND payment_state = 'PAID' AND deleted = 'false') GROUP BY day_value
) ;
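To make the metric logic concrete, here is a small Python sketch of what the query computes: keep rows that are paid and not deleted, convert `payment_time` (a string holding epoch milliseconds) to a calendar day, and count rows per day. It is a plain-Python illustration, not how Presto executes the query. The sample rows are made up, and the time zone is an explicit parameter here (UTC by default), whereas Presto formats the timestamp in the session time zone.

```python
from collections import Counter
from datetime import datetime, timezone


def daily_signups(rows, tz=timezone.utc):
    """Count paid, non-deleted rows per payment day (payment_time is epoch milliseconds as text)."""
    counts = Counter()
    for row in rows:
        if row.get("payment_time") is None or row.get("customer_id") is None:
            continue  # mirrors payment_time IS NOT NULL and COUNT(customer_id)
        if row.get("payment_state") != "PAID" or row.get("deleted") != "false":
            continue
        seconds = int(row["payment_time"]) // 1000  # milliseconds -> seconds
        day_value = datetime.fromtimestamp(seconds, tz=tz).strftime("%Y-%m-%d")
        counts[day_value] += 1
    return dict(counts)


# Hypothetical sample rows for illustration only.
SAMPLE_ROWS = [
    {"customer_id": "1", "payment_time": "1638151200000", "payment_state": "PAID", "deleted": "false"},
    {"customer_id": "2", "payment_time": "1638187200000", "payment_state": "PAID", "deleted": "false"},
    {"customer_id": "3", "payment_time": "1638230400000", "payment_state": "PAID", "deleted": "false"},
    {"customer_id": "4", "payment_time": "1638151200000", "payment_state": "PAID", "deleted": "true"},
    {"customer_id": "5", "payment_time": None, "payment_state": "UNPAID", "deleted": "false"},
]

if __name__ == "__main__":
    print(daily_signups(SAMPLE_ROWS))
```

The other daily metrics in this section follow the same shape and differ only in the source table, the timestamp column, and the filter conditions.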

View the data in the database table.

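7.4.4.2 Daily visits

Analyze the customer visit and consultation record table to count daily customer visits: first create the MySQL table, then write the SQL, and finally save the data.

  • MySQL table: itcast_rpt.web_pv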
CREATE TABLE  IF NOT EXISTS `itcast_rpt`.`web_pv` (`report_date` longtext,`report_total` bigint(20) NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  • Metric SQL statement:
WITH tmp AS (SELECT id, format_datetime(from_unixtime(cast(create_time as bigint) / 1000), 'yyyy-MM-dd') AS day_value FROM hive.edu_hudi.tbl_web_chat_ems WHERE day_str = '2021-11-29'
)
SELECT day_value, COUNT(id) AS total FROM tmp GROUP BY day_value ;
  • Save the analysis result to the MySQL table:
INSERT INTO mysql.itcast_rpt.web_pv (report_date, report_total) 
SELECT day_value, COUNT(id) AS total FROM (SELECT id, format_datetime(from_unixtime(cast(create_time as bigint) / 1000), 'yyyy-MM-dd') AS day_value FROM hive.edu_hudi.tbl_web_chat_ems WHERE day_str = '2021-11-29'
) GROUP BY day_value ;

View the data in the database table.

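7.4.4.3 Daily intentions

Analyze the customer intention table to count daily customer intentions: first create the MySQL table, then write the SQL, and finally save the data.

  • MySQL table: itcast_rpt.stu_intention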
CREATE TABLE  IF NOT EXISTS `itcast_rpt`.`stu_intention` (`report_date` longtext,`report_total` bigint(20) NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  • Metric SQL statement:
WITH tmp AS (SELECT id, format_datetime(from_unixtime(cast(create_date_time as bigint) / 1000), 'yyyy-MM-dd') AS day_value FROM hive.edu_hudi.tbl_customer_relationship WHERE day_str = '2021-11-29' AND create_date_time IS NOT NULL AND deleted = 'false'
)
SELECT day_value, COUNT(id) AS total FROM tmp GROUP BY day_value ;
  • Save the analysis result to the MySQL table:
INSERT INTO mysql.itcast_rpt.stu_intention (report_date, report_total) 
SELECT day_value, COUNT(id) AS total FROM (SELECT id, format_datetime(from_unixtime(cast(create_date_time as bigint) / 1000), 'yyyy-MM-dd') AS day_value FROM hive.edu_hudi.tbl_customer_relationship WHERE day_str = '2021-11-29' AND create_date_time IS NOT NULL AND deleted = 'false'
) GROUP BY day_value ;

View the data in the database table.

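7.4.4.4 Daily clues

Analyze the customer clue table to count daily customer clues: first create the MySQL table, then write the SQL, and finally save the data.

  • MySQL table: itcast_rpt.stu_clue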
CREATE TABLE IF NOT EXISTS `itcast_rpt`.`stu_clue` (`report_date` longtext,`report_total` bigint(20) NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  • Metric SQL statement:
WITH tmp AS (SELECT id, format_datetime(from_unixtime(cast(create_date_time as bigint) / 1000), 'yyyy-MM-dd') AS day_value FROM hive.edu_hudi.tbl_customer_clue WHERE day_str = '2021-11-29' AND clue_state IS NOT NULL AND deleted = 'false'
)
SELECT day_value, COUNT(id) AS total FROM tmp GROUP BY day_value ;
  • Save the analysis result to the MySQL table:
INSERT INTO mysql.itcast_rpt.stu_clue (report_date, report_total) 
SELECT day_value, COUNT(id) AS total FROM (SELECT id, format_datetime(from_unixtime(cast(create_date_time as bigint) / 1000), 'yyyy-MM-dd') AS day_value FROM hive.edu_hudi.tbl_customer_clue WHERE day_str = '2021-11-29' AND clue_state IS NOT NULL AND deleted = 'false'
) GROUP BY day_value ;

View the data in the database table.

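7.5 Flink SQL streaming analysis

Flink SQL runs streaming queries over today's data in the Hudi tables, computing today's real-time counterparts of the offline metrics, which are finally shown on a FineBI real-time dashboard.

Using the Flink SQL connectors for Hudi and MySQL, write the streaming SQL queries and execute the DDL and SELECT statements in the SQL Client command line.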

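7.5.1 Business requirements

Basic metrics are computed in real time over Itcast's daily customer business data.

There are five metrics in total, involving three business tables: the customer visit record table, the customer clue table, and the customer intention table. The real-time data of each metric is stored in its own MySQL table.

Each real-time metric is computed in three steps:

  • Step 1: create the input table, which loads the Hudi table data as a stream;
  • Step 2: create the output table, which saves data to the MySQL table in real time;
  • Step 3: write the SQL statement for the business rule, query the input table, and insert the result into the output table.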

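7.5.2 Creating the MySQL tables

Each real-time metric is stored in one MySQL table. First create the five tables for the five metrics; the names differ but the columns are identical. The DDL statements are as follows:

  • Metric 1: today's visits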
CREATE TABLE `itcast_rpt`.`realtime_web_pv` (`report_date` varchar(255) NOT NULL,`report_total` bigint(20) NOT NULL,PRIMARY KEY (`report_date`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  • Metric 2: today's consultations
CREATE TABLE `itcast_rpt`.`realtime_stu_consult` (`report_date` varchar(255) NOT NULL,`report_total` bigint(20) NOT NULL,PRIMARY KEY (`report_date`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  • Metric 3: today's intentions
CREATE TABLE `itcast_rpt`.`realtime_stu_intention` (`report_date` varchar(255) NOT NULL,`report_total` bigint(20) NOT NULL,PRIMARY KEY (`report_date`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  • Metric 4: today's sign-ups
CREATE TABLE `itcast_rpt`.`realtime_stu_apply` (`report_date` varchar(255) NOT NULL,`report_total` bigint(20) NOT NULL,PRIMARY KEY (`report_date`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  • Metric 5: today's valid clues
CREATE TABLE `itcast_rpt`.`realtime_stu_clue` (`report_date` varchar(255) NOT NULL,`report_total` bigint(20) NOT NULL,PRIMARY KEY (`report_date`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

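7.5.3 Real-time metric analysis

The five metrics are computed in real time from three Hudi tables:

  • 1. Today's visits and today's consultations: stream data from table edu_web_chat_ems_hudi

  • 2. Today's intentions and today's sign-ups: stream data from table edu_customer_relationship_hudi

  • 3. Today's valid clues: stream data from table edu_customer_clue_hudi

Start the HDFS service and the Standalone cluster, run the SQL Client, and set the properties: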

# Start the HDFS service
hadoop-daemon.sh start namenode
hadoop-daemon.sh start datanode
# Start the Flink Standalone cluster
export HADOOP_CLASSPATH=`/export/server/hadoop/bin/hadoop classpath`
/export/server/flink/bin/start-cluster.sh
# Start the SQL Client
/export/server/flink/bin/sql-client.sh embedded \
-j /export/server/flink/lib/hudi-flink-bundle_2.12-0.9.0.jar shell
-- Set the properties (inside the SQL Client)
set execution.result-mode=tableau;
set execution.checkpointing.interval=3sec;
-- Streaming mode
SET execution.runtime-mode = streaming;

7.5.3.1 Today's visits

First create the input table, which loads the Hudi table data as a stream:

CREATE TABLE edu_web_chat_ems_hudi (id string PRIMARY KEY NOT ENFORCED,create_date_time string,session_id string,sid string,create_time string,seo_source string,seo_keywords string,ip string,area string,country string,province string,city string,origin_channel string,`user` string,manual_time string,begin_time string,end_time string,last_customer_msg_time_stamp string,last_agent_msg_time_stamp string,reply_msg_count string,msg_count string,browser_name string,os_info string,part STRING
)
PARTITIONED BY (part)
WITH('connector'='hudi','path'= 'hdfs://node1.itcast.cn:8020/hudi-warehouse/edu_web_chat_ems_hudi', 'table.type'= 'MERGE_ON_READ','hoodie.datasource.write.recordkey.field'= 'id', 'write.precombine.field'= 'create_date_time','read.streaming.enabled' = 'true','read.streaming.check-interval' = '5','read.tasks' = '1'
);

Compute the result and store it in a view:

CREATE VIEW IF NOT EXISTS view_tmp_web_pv AS
SELECT day_value, COUNT(id) AS total FROM (SELECT FROM_UNIXTIME(CAST(create_time AS BIGINT) / 1000, 'yyyy-MM-dd') AS day_value, id FROM edu_web_chat_ems_hudi WHERE part = CAST(CURRENT_DATE AS STRING)
) GROUP BY  day_value;

Save to the MySQL database:

-- SQL Connector MySQL
CREATE TABLE realtime_web_pv_mysql (report_date STRING,report_total BIGINT, PRIMARY KEY (report_date) NOT ENFORCED
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://node1.itcast.cn:3306/itcast_rpt','driver' = 'com.mysql.cj.jdbc.Driver','username' = 'root','password' = '123456','table-name' = 'realtime_web_pv'
);
-- Insert the result with INSERT INTO
INSERT INTO  realtime_web_pv_mysql SELECT day_value, total FROM view_tmp_web_pv;
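The sink table declares `report_date` as its primary key, and Flink's JDBC connector writes in upsert mode when a primary key is defined. Each time the streaming count for a day changes, the row for that day is therefore overwritten rather than appended, so MySQL always holds one current row per date. The Python sketch below simulates that behaviour with a dictionary; `apply_upserts` and the sample values are illustrative and not part of Flink.

```python
def apply_upserts(changes):
    """Apply (report_date, report_total) rows in arrival order, keyed by report_date."""
    table = {}
    for report_date, report_total in changes:
        # Insert the row if the key is new, otherwise overwrite the existing row.
        table[report_date] = report_total
    return table


# Hypothetical sequence of results emitted by the streaming aggregation
# as three visits arrive on the same day.
updates = [("2021-11-29", 1), ("2021-11-29", 2), ("2021-11-29", 3)]

if __name__ == "__main__":
    print(apply_upserts(updates))
```

This is also why the MySQL tables in section 7.5.2 define a primary key on report_date, unlike the offline report tables.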

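7.5.3.2 Today's consultations

Today's visits and today's consultations both query the same Hudi table, edu_web_chat_ems_hudi, so the streaming input table created in the previous section is reused and does not need to be created again.
Compute the result and store it in a view: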

CREATE VIEW IF NOT EXISTS view_tmp_stu_consult AS
SELECT day_value, COUNT(id) AS total FROM (SELECT FROM_UNIXTIME(CAST(create_time AS BIGINT) / 1000, 'yyyy-MM-dd') AS day_value, id FROM edu_web_chat_ems_hudi WHERE part = CAST(CURRENT_DATE AS STRING) AND CAST(msg_count AS INT) > 0
) GROUP BY  day_value;

Save to the MySQL database:

-- SQL Connector MySQL
CREATE TABLE realtime_stu_consult_mysql (report_date STRING,report_total BIGINT, PRIMARY KEY (report_date) NOT ENFORCED
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://node1.itcast.cn:3306/itcast_rpt','driver' = 'com.mysql.cj.jdbc.Driver','username' = 'root','password' = '123456','table-name' = 'realtime_stu_consult'
);
-- Insert the result with INSERT INTO
INSERT INTO  realtime_stu_consult_mysql SELECT day_value, total FROM view_tmp_stu_consult;

7.5.3.3 Today's intentions

First create the input table, which loads the Hudi table data as a stream:

create table edu_customer_relationship_hudi(id string PRIMARY KEY NOT ENFORCED,create_date_time string,update_date_time string,deleted string,customer_id string,first_id string,belonger string,belonger_name string,initial_belonger string,distribution_handler string,business_scrm_department_id string,last_visit_time string,next_visit_time string,origin_type string,itcast_school_id string,itcast_subject_id string,intention_study_type string,anticipat_signup_date string,`level` string,creator string,current_creator string,creator_name string,origin_channel string,`comment` string,first_customer_clue_id string,last_customer_clue_id string,process_state string,process_time string,payment_state string,payment_time string,signup_state string,signup_time string,notice_state string,notice_time string,lock_state string,lock_time string,itcast_clazz_id string,itcast_clazz_time string,payment_url string,payment_url_time string,ems_student_id string,delete_reason string,deleter string,deleter_name string,delete_time string,course_id string,course_name string,delete_comment string,close_state string,close_time string,appeal_id string,tenant string,total_fee string,belonged string,belonged_time string,belonger_time string,transfer string,transfer_time string,follow_type string,transfer_bxg_oa_account string,transfer_bxg_belonger_name string,part STRING
)
PARTITIONED BY (part)
WITH('connector'='hudi','path'= 'hdfs://node1.itcast.cn:8020/hudi-warehouse/edu_customer_relationship_hudi', 'table.type'= 'MERGE_ON_READ','hoodie.datasource.write.recordkey.field'= 'id', 'write.precombine.field'= 'create_date_time','read.streaming.enabled' = 'true','read.streaming.check-interval' = '5',    'read.tasks' = '1'
);

Compute the result and store it in a view:

CREATE VIEW IF NOT EXISTS view_tmp_stu_intention AS
SELECT day_value, COUNT(id) AS total FROM (SELECT FROM_UNIXTIME(CAST(create_date_time AS BIGINT) / 1000, 'yyyy-MM-dd') AS day_value, id FROM edu_customer_relationship_hudi WHERE part = CAST(CURRENT_DATE AS STRING) AND create_date_time IS NOT NULL AND deleted = 'false'
) GROUP BY  day_value;

Save to the MySQL database:

-- SQL Connector MySQL
CREATE TABLE realtime_stu_intention_mysql (report_date STRING,report_total BIGINT, PRIMARY KEY (report_date) NOT ENFORCED
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://node1.itcast.cn:3306/itcast_rpt','driver' = 'com.mysql.cj.jdbc.Driver','username' = 'root','password' = '123456','table-name' = 'realtime_stu_intention'
);
-- Insert the result with INSERT INTO
INSERT INTO  realtime_stu_intention_mysql SELECT day_value, total 
FROM view_tmp_stu_intention;

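7.5.3.4 Today's sign-ups

Today's intentions and today's sign-ups both query the same Hudi table, edu_customer_relationship_hudi, so the streaming input table created in the previous section is reused and does not need to be created again.
Compute the result and store it in a view: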

CREATE VIEW IF NOT EXISTS view_tmp_stu_apply AS
SELECT day_value, COUNT(id) AS total FROM (SELECT FROM_UNIXTIME(CAST(payment_time AS BIGINT) / 1000, 'yyyy-MM-dd') AS day_value, id FROM edu_customer_relationship_hudi WHERE part = CAST(CURRENT_DATE AS STRING) AND payment_time IS NOT NULL 
AND payment_state = 'PAID' AND deleted = 'false'
) GROUP BY  day_value;

Save to the MySQL database:

-- SQL Connector MySQL
CREATE TABLE realtime_stu_apply_mysql (report_date STRING,report_total BIGINT, PRIMARY KEY (report_date) NOT ENFORCED
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://node1.itcast.cn:3306/itcast_rpt','driver' = 'com.mysql.cj.jdbc.Driver','username' = 'root','password' = '123456','table-name' = 'realtime_stu_apply'
);
-- Insert the result with INSERT INTO
INSERT INTO  realtime_stu_apply_mysql SELECT day_value, total FROM view_tmp_stu_apply;

7.5.3.5 Today's valid clues

First create the input table, which loads the Hudi table data as a stream:

create table edu_customer_clue_hudi(id string PRIMARY KEY NOT ENFORCED,create_date_time string,update_date_time string,deleted string,customer_id string,customer_relationship_id string,session_id string,sid string,status string,`user` string,create_time string,platform string,s_name string,seo_source string,seo_keywords string,ip string,referrer string,from_url string,landing_page_url string,url_title string,to_peer string,manual_time string,begin_time string,reply_msg_count string,total_msg_count string,msg_count string,`comment` string,finish_reason string,finish_user string,end_time string,platform_description string,browser_name string,os_info string,area string,country string,province string,city string,creator string,name string,idcard string,phone string,itcast_school_id string,itcast_school string,itcast_subject_id string,itcast_subject string,wechat string,qq string,email string,gender string,`level` string,origin_type string,information_way string,working_years string,technical_directions string,customer_state string,valid string,anticipat_signup_date string,clue_state string,scrm_department_id string,superior_url string,superior_source string,landing_url string,landing_source string,info_url string,info_source string,origin_channel string,course_id string,course_name string,zhuge_session_id string,is_repeat string,tenant string,activity_id string,activity_name string,follow_type string,shunt_mode_id string,shunt_employee_group_id string,part STRING
)
PARTITIONED BY (part)
WITH('connector'='hudi','path'= 'hdfs://node1.itcast.cn:8020/hudi-warehouse/edu_customer_clue_hudi', 'table.type'= 'MERGE_ON_READ','hoodie.datasource.write.recordkey.field'= 'id', 'write.precombine.field'= 'create_date_time','read.streaming.enabled' = 'true','read.streaming.check-interval' = '5',    'read.tasks' = '1'
);

Compute the result and store it in a view:

CREATE VIEW IF NOT EXISTS view_tmp_stu_clue AS
SELECT day_value, COUNT(id) AS total FROM (SELECT FROM_UNIXTIME(CAST(create_date_time AS BIGINT) / 1000, 'yyyy-MM-dd') AS day_value, id FROM edu_customer_clue_hudi WHERE part = CAST(CURRENT_DATE AS STRING) AND clue_state IS NOT NULL AND deleted = 'false'
) GROUP BY  day_value;

Save to the MySQL database:

-- SQL Connector MySQL
CREATE TABLE realtime_stu_clue_mysql (report_date STRING,report_total BIGINT, PRIMARY KEY (report_date) NOT ENFORCED
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://node1.itcast.cn:3306/itcast_rpt','driver' = 'com.mysql.cj.jdbc.Driver','username' = 'root','password' = '123456','table-name' = 'realtime_stu_clue'
);
-- Insert the result with INSERT INTO
INSERT INTO  realtime_stu_clue_mysql SELECT day_value, total FROM view_tmp_stu_clue;

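At this point the statistical analysis of Itcast customer data is complete, covering both offline analysis and real-time streaming analysis: today's metrics are computed by real-time stream processing (Flink SQL streaming queries), and yesterday's metrics by offline batch processing (Presto in-memory analysis).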

7.6 FineBI report visualization

Use FineBI to connect to the MySQL database, load the business metric report data, and present it in different charts.
