尚硅谷大数据项目《在线教育之实时数仓》笔记005

视频地址:尚硅谷大数据项目《在线教育之实时数仓》_哔哩哔哩_bilibili

目录

第9章 数仓开发之DWD层

P031

P032

P033

P034

P035

P036

P037

P038

P039

P040


第9章 数仓开发之DWD层

P031

DWD层设计要点:

(1)DWD层的设计依据是维度建模理论,该层存储维度模型的事实表。

(2)DWD层表名的命名规范为dwd_数据域_表名

存放事实表,从kafka的topic_log和topic_db中读取需要用到的业务流程相关数据,将业务流程关联起来做成明细数据写回kafka当中。

尚硅谷大数据学科全套教程\3.尚硅谷大数据学科--项目实战\尚硅谷大数据项目之在线教育数仓\尚硅谷大数据项目之在线教育数仓-3实时\资料\13.总线矩阵及指标体系

在线教育实时业务总线矩阵.xlsx

9.1.3 图解

P032

package com.atguigu.edu.realtime.app.dwd.log;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.util.DateFormatUtil;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SideOutputDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;/*** @author * @create 2023-04-21 14:01*/
public class BaseLogApp {public static void main(String[] args) throws Exception {//TODO 1 创建环境设置状态后端StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);//TODO 2 从kafka中读取主流数据String topicName = "topic_log";String groupId = "base_log_app";DataStreamSource<String> baseLogSource = env.fromSource(KafkaUtil.getKafkaConsumer(topicName, groupId),WatermarkStrategy.noWatermarks(),"base_log_source");//TODO 3 对数据进行清洗转换// 3.1 定义侧输出流OutputTag<String> dirtyStreamTag = new OutputTag<String>("dirtyStream") {};// 3.2 清洗转换SingleOutputStreamOperator<JSONObject> cleanedStream = baseLogSource.process(new ProcessFunction<String, JSONObject>() {@Overridepublic void processElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {try {JSONObject jsonObject = JSON.parseObject(value);out.collect(jsonObject);} catch (Exception e) {ctx.output(dirtyStreamTag, value);}}});// 3.3 将脏数据写出到kafka对应的主题SideOutputDataStream<String> dirtyStream = cleanedStream.getSideOutput(dirtyStreamTag);String dirtyTopicName = "dirty_data";dirtyStream.sinkTo(KafkaUtil.getKafkaProducer(dirtyTopicName, "dirty_trans"));//TODO 4 新老访客标记修复//TODO 5 数据分流//TODO 6 写出到kafka不同的主题//TODO 7 执行任务}
}

P033

KafkaUtil.java

P034

新老访客逻辑介绍

P035

BaseLogApp.java

//TODO 4 新老访客标记修复

[atguigu@node001 log]$ pwd
/opt/module/data_mocker/01-onlineEducation/log
[atguigu@node001 log]$ cat -n 200 app.2023-09-19.log
{"common":{"ar":"26","ba":"iPhone","ch":"Appstore","is_new":"0","md":"iPhone 8","mid":"mid_188","os":"iOS 13.3.1","sc":"1","sid":"b4d6c8eb-d025-4855-af0a-fe351ff16ef9","uid":"20","vc":"v2.1.134"},"page":{"during_time":901000,"item":"173","item_type":"paper_id","last_page_id":"course_detail","page_id":"exam"},"ts":1645456489411}
{"common":{"ar":"26","ba":"iPhone","ch":"Appstore","is_new":"0","md":"iPhone 8","mid":"mid_188","os":"iOS 13.3.1","sc":"1","sid":"b4d6c8eb-d025-4855-af0a-fe351ff16ef9","uid":"20","vc":"v2.1.134"},"page":{"during_time":901000,"item":"173","item_type":"paper_id","last_page_id":"course_detail","page_id":"exam"},"ts":1645456489411
}

P036

BaseLogApp.java

//TODO 5 数据分流

P037

//TODO 6 写出到kafka不同的主题

hadoop、zookeeper、kafka。

  1. [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic page_topic

  2. [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic action_topic

  3. [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic display_topic

  4. [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic start_topic

  5. [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic error_topic

  6. [atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic appVideo_topic

[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic page_topic
[2023-11-01 14:36:17,581] WARN [Consumer clientId=consumer-console-consumer-7492-1, groupId=console-consumer-7492] Error while fetching metadata with correlation id 2 : {page_topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2023-11-01 14:36:18,710] WARN [Consumer clientId=consumer-console-consumer-7492-1, groupId=console-consumer-7492] Error while fetching metadata with correlation id 6 : {page_topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2023-11-01 14:36:18,720] WARN [Consumer clientId=consumer-console-consumer-7492-1, groupId=console-consumer-7492] The following subscribed topics are not assigned to any members: [page_topic]  (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[atguigu@node001 ~]$ f1.sh start-------- 启动 node001 采集flume启动 -------
[atguigu@node001 ~]$ cd /opt/module/data
data/        data_mocker/ datax/       
[atguigu@node001 ~]$ cd /opt/module/data
data/        data_mocker/ datax/       
[atguigu@node001 ~]$ cd /opt/module/data_mocker/
[atguigu@node001 data_mocker]$ cd 01-onlineEducation/
[atguigu@node001 01-onlineEducation]$ ll
总用量 30460
-rw-rw-r-- 1 atguigu atguigu     2223 9月  19 10:43 application.yml
-rw-rw-r-- 1 atguigu atguigu  4057995 7月  25 10:28 edu0222.sql
-rw-rw-r-- 1 atguigu atguigu 27112074 7月  25 10:28 edu2021-mock-2022-06-18.jar
drwxrwxr-x 2 atguigu atguigu     4096 10月 26 14:01 log
-rw-rw-r-- 1 atguigu atguigu     1156 7月  25 10:44 logback.xml
-rw-rw-r-- 1 atguigu atguigu      633 7月  25 10:45 path.json
[atguigu@node001 01-onlineEducation]$ java -jar edu2021-mock-2022-06-18.jar 
SLF4J: Class path contains multiple SLF4J bindings.

P038

9.2 流量域独立访客事务事实表

P039

package com.atguigu.edu.realtime.app.dwd.log;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONAware;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.util.DateFormatUtil;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** @author yhm* @create 2023-04-21 16:24*/
public class DwdTrafficUniqueVisitorDetail {public static void main(String[] args) throws Exception {// TODO 1 创建环境设置状态后端StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(4);// TODO 2 读取kafka日志主题数据String topicName = "dwd_traffic_page_log";DataStreamSource<String> pageLogStream = env.fromSource(KafkaUtil.getKafkaConsumer(topicName, "dwd_traffic_unique_visitor_detail"), WatermarkStrategy.noWatermarks(), "unique_visitor_source");// TODO 3 转换结构,过滤last_page_id不为空的数据SingleOutputStreamOperator<JSONObject> firstPageStream = pageLogStream.flatMap(new FlatMapFunction<String, JSONObject>() {@Overridepublic void flatMap(String value, Collector<JSONObject> out) throws Exception {try {JSONObject jsonObject = JSON.parseObject(value);String lastPageID = jsonObject.getJSONObject("page").getString("last_page_id");if (lastPageID == null) {out.collect(jsonObject);}} catch (Exception e) {e.printStackTrace();}}});// TODO 4 安装mid分组KeyedStream<JSONObject, String> keyedStream = firstPageStream.keyBy(new KeySelector<JSONObject, String>() {@Overridepublic String getKey(JSONObject value) throws Exception {return value.getJSONObject("common").getString("mid");}});// TODO 5 判断独立访客SingleOutputStreamOperator<JSONObject> filteredStream = keyedStream.filter(new RichFilterFunction<JSONObject>() {ValueState<String> lastVisitDtState;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);ValueStateDescriptor<String> stringValueStateDescriptor = new ValueStateDescriptor<>("last_visit_dt", String.class);// 设置状态的存活时间stringValueStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(1L))// 设置状态的更新模式为创建及写入// 每次重新写入的时候记录时间  到1天删除状态.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).build());lastVisitDtState = getRuntimeContext().getState(stringValueStateDescriptor);}@Overridepublic boolean filter(JSONObject jsonObject) throws Exception {String visitDt = DateFormatUtil.toDate(jsonObject.getLong("ts"));String lastVisitDt = lastVisitDtState.value();// 对于迟到的数据,last日期会大于visit日期,数据也不要if (lastVisitDt == null || (DateFormatUtil.toTs(lastVisitDt) < DateFormatUtil.toTs(visitDt))) {lastVisitDtState.update(visitDt);return true;}return false;}});// TODO 6 将独立访客数据写出到对应的kafka主题String targetTopic = "dwd_traffic_unique_visitor_detail";SingleOutputStreamOperator<String> sinkStream = filteredStream.map((MapFunction<JSONObject, String>) JSONAware::toJSONString);sinkStream.sinkTo(KafkaUtil.getKafkaProducer(targetTopic, "unique_visitor_trans"));// TODO 7 运行任务env.execute();}
}

P040

[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic dwd_traffic_unique_visitor_detail
[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic dwd_traffic_page_log[atguigu@node001 01-onlineEducation]$ cd /opt/module/data_mocker/01-onlineEducation/
[atguigu@node001 01-onlineEducation]$ java -jar edu2021-mock-2022-06-18.jar

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/128364.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

什么是配电室电能监测系统?

为了保证电力系统的安全、稳定、经济运行成为了当务之急。配电室电能监测系统作为一种新兴技术&#xff0c;有效提高了配电室的运行管理水平&#xff0c;降低了电力系统的风险。接下来&#xff0c;小编来为大家介绍下配电室电能监测系统&#xff0c;一起来看下吧&#xff01; 一…

【Java 进阶篇】Java ServletContext详解:获取MIME类型

MIME&#xff08;Multipurpose Internet Mail Extensions&#xff09;类型是一种标识文件类型的文本标签&#xff0c;通常用于指示浏览器如何处理Web服务器返回的文件。在Java Web应用程序中&#xff0c;ServletContext对象提供了一种方便的方法来获取文件的MIME类型。本篇博客…

el-input输入校验插件(正则表达式)

使用方法&#xff1a;在main.js文件中注册插件然后直接在<el-input>加入‘v-插件名’ (1)在main.js文件&#xff1a; // 只能输入数字指令 import onlyNumber from /directive/only-number; Vue.use(onlyNumber); &#xff08;2&#xff09;在src/directive文件夹中 &a…

docker部署elk

目录 前言 一、创建程序工作路径 二、创建私有网络 三、部署elasticsearch 1.先搜速后下载 2.创建一个基础的容器&#xff08;此步骤是为了拷贝容器里的文件&#xff09; 3.拷贝文件到宿主机 3.1进入容器 3.2拷贝并授权 3.3删除基础容器 4.创建容器 5.访问9200测试 …

Redis入门指南学习笔记(2):常用数据类型解析

一.前言 本文主要介绍Redis中包含几种主要数据类型&#xff1a;字符串类型、哈希类型、列表类型、集合类型和有序集合类型。 二.字符串类型 字符串类型是Redis中最基本的数据类型&#xff0c;它是其他4种数据类型的基础&#xff0c;其他数据类型与字符串类型的差别从某种角度…

【Linux学习笔记】进程概念(上)

1. 冯诺依曼体系结构2. 操作系统的作用3. 进程 1. 冯诺依曼体系结构 如图&#xff0c;这是一个冯诺依曼体系结构简图 其中这里的存储器指的是内存&#xff01; 用通俗的话来解释这个图&#xff0c;就是数据从输入设备进入&#xff0c;然后进入到存储器&#xff0c;CPU从存储器…

阿里云OS系统Alibaba Cloud Linux 3系统的安全更新命令

给客户部署的服务&#xff0c;进入运维阶段&#xff0c;但是经常被客户监测到服务器漏洞&#xff0c;现在整理一下&#xff0c;服务器漏洞问题更新命令步骤。 服务器系统&#xff1a; 阿里云linux服务器&#xff1a;Alibaba Cloud Linux 3 漏洞类型和描述&#xff1a; #3214…

新体验:万圣节夜晚的新游戏!--愤怒的南瓜

引言&#xff1a; Chatgpt4.0 所带来的冲击似乎远超出人们想象&#xff0c;网页小游戏《愤怒的南瓜》在昨日&#xff08;万圣节夜晚&#xff09;火爆了外网。一位名为 Javi Lopez 的外国小哥使用 Midjourney、DALL•E 3 和 GPT-4 打开了一个无限可能的世界&#xff0c;重新演绎…

【Python全栈_公开课学习记录】

一、初识python (一).Python起源 Python创始人为吉多范罗苏姆&#xff08;荷兰&#xff09;&#xff0c;Python崇尚优美、清晰、简明的编辑风格。Python语言结构清晰简单、数据库丰富、运行成熟稳定&#xff0c;科学计算统计分析领先。目前广泛应用于云计算、Web开发、科学运算…

DSP 开发例程(5): tcp_server

目录 DSP 开发例程(5): tcp_server创建工程源码编辑tcp_echo.chelloWorld.c 调试说明 DSP 开发例程(5): tcp_server 此例程实现在 EVM6678L 开发板上创建 TCP Server进程, 完成计算机与开发板之间的 TCP/IP 通信. 例程源码可从我的 gitee 仓库上克隆或下载. 点击 DSP 开发教程…

【机器学习合集】模型设计之注意力机制动态网络 ->(个人学习记录笔记)

文章目录 注意力机制1. 注意力机制及其应用1.1 注意力机制的定义1.2 注意力机制的典型应用 2. 注意力模型设计2.1 空间注意力机制2.2 空间注意力模型2.3 通道注意力机制2.4 空间与通道注意力机制2.5 自注意力机制2.5 级联attention 动态网络1. 动态网络的定义2. 基于丢弃策略的…

PostgreSQL逻辑管理结构

1.数据库逻辑结构介绍 2.数据库基本操作 2.1 创建数据库 CREATE DATABASE name [ [ WITH ] [ OWNER [] user_name ] [ TEMPLATE [] template ] [ ENCODING [] encoding ] [ LC_COLLATE [] lc_collate ] [ LC_CTYPE [] lc_ctype ] [ TABLESPACE [] tablespace ] [ CONNECTION L…

Day17力扣打卡

打卡记录 参加会议的最多员工数&#xff08;拓扑排序 分类讨论&#xff09; 链接 计算内向基环树的最大基环&#xff0c;基环树基环为2的情况分类讨论。 class Solution { public:int maximumInvitations(vector<int> &favorite) {int n favorite.size();vector…

4.多层感知机-3GPT版

#pic_center R 1 R_1 R1​ R 2 R^2 R2 目录 知识框架No.1 多层感知机一、感知机1、感知机2、训练感知机3、图形解释4、收敛定理5、XOR问题6、总结 二、多层感知机1、XOR2、单隐藏层3、单隐藏层-单分类4、为什么需要非线性激活函数5、Sigmoid函数6、Tanh函数7、ReLU函数8、多类分…

SDK是什么

SDK 是“Software Development Kit”&#xff08;软件开发工具包&#xff09;的缩写&#xff0c;它是一组用于开发特定软件应用、硬件平台、计算机系统或操作系统的开发工具的集合。SDK 通常包括一组开发工具、库、文档和示例代码&#xff0c;以帮助开发者更快地开发和部署应用…

SAML- 安全断言标记语言

一、概念 安全断言标记语言&#xff08;SAML&#xff09;是一种开放标准&#xff0c;用于在各方之间&#xff08;特别是身份提供商和服务提供商之间&#xff09;交换身份验证和授权数据。SAML 是一种基于XML的安全断言标记语言&#xff08;服务提供商用来做出访问控制决策的语句…

HTML标签、CSS介绍

标签的分类: 块级/行内 # 块级标签: 独占一行 h1~h6 p div """ 块儿级标签可以修改长宽. 行内标签不可以, 就算修改了也不会变化.块级标签内部可以嵌套任意的块级标签和行内标签. 特例: 是p标签虽然是块级标签 但是它只能嵌套行内标签 不能嵌套块级标签. 如…

linux安装apache并配置userid站点

目录 一、linux安装apache的方式 1、安装wget 2、下载CentOS 7的repo文件 3、更新镜像源 二、安装apache 1.通过命令直接安装apache(linux的软件包为httpd) 2.启动httpd服务 3.访问一下 三、apache配置文件 1.主配置文件 2.修改根目录 3.修改下端口 4.apache的工作…

BUUCTF 数据包中的线索 1

BUUCTF:https://buuoj.cn/challenges 题目描述&#xff1a; 公安机关近期截获到某网络犯罪团伙在线交流的数据包&#xff0c;但无法分析出具体的交流内容&#xff0c;聪明的你能帮公安机关找到线索吗&#xff1f; 密文&#xff1a; 下载附件&#xff0c;解压得到一个.pcapng文…

【兔子王赠书第5期】ChatGPT速学通:文案写作+PPT制作+数据分析+知识学习与变现

文章目录 前言ChatGPT推荐图书作者简介内容简介推荐理由 粉丝福利尾声 前言 程序员如果有一天代码写不动了&#xff0c;还能干什么&#xff1f; 一位 80 后女程序员“兰猫”给出了她的答案——转型 AI 写手。兰猫从事程序员工作十余年&#xff0c;在繁重的工作压力下&#xf…