视频地址:尚硅谷大数据项目《在线教育之实时数仓》_哔哩哔哩_bilibili
目录
第10章 数仓开发之DWS层
P066
P067
P068
P069
P070
P071
P072
P073
P074
P075
P076
P077
P078
P079
P080
P081
P082
第10章 数仓开发之DWS层
P066
第10章 数仓开发之DWS层
设计要点:
(1)DWS层的设计参考指标体系。
(2)DWS层表名的命名规范为dws_数据域_统计粒度_业务过程_统计周期(window)。
注:window 表示窗口对应的时间范围。
10.1 流量域来源关键词粒度页面浏览各窗口汇总表
10.1.1 主要任务
从 Kafka 页面浏览明细主题读取数据,过滤搜索行为,使用自定义 UDTF(一进多出)函数对搜索内容分词。统计各窗口各关键词出现频次,写入 ClickHouse。
10.1.2 思路分析
尚硅谷大数据项目之在线教育数仓\尚硅谷大数据项目之在线教育数仓-3实时\资料\13.总线矩阵及指标体系
在线教育实时指标体系.xlsx
P067
DwsTrafficSourceKeywordPageViewWindow
//TODO 1 创建环境设置状态后端
//TODO 2 自定义拆词函数
//TODO 3 读取kafka中的page_log数据
//TODO 4 过滤数据得到搜索的关键字
//TODO 5 使用自定义函数对关键字拆词
//TODO 6 分组开窗合并计算
//TODO 7 转换为流
//TODO 8 写出到clickHouse中
//TODO 9 运行任务
package com.atguigu.edu.realtime.util;import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;/*** @author yhm* @create 2023-04-25 16:05*/
public class KeyWordUtil {public static ArrayList<String> analyze(String text) {StringReader reader = new StringReader(text);IKSegmenter ikSegmenter = new IKSegmenter(reader, true);ArrayList<String> strings = new ArrayList<>();try {Lexeme lexeme = null;while ((lexeme = ikSegmenter.next()) != null) {String keyWord = lexeme.getLexemeText();strings.add(keyWord);}} catch (IOException e) {e.printStackTrace();}return strings;}public static void main(String[] args) {String s = "Apple iPhoneXSMax (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待";ArrayList<String> strings = analyze(s);System.out.println(strings);}
}
P068
User-defined Functions | Apache Flink
DwsTrafficSourceKeywordPageViewWindow
//TODO 1 创建环境设置状态后端
//TODO 2 自定义拆词函数
//TODO 3 读取kafka中的page_log数据
//TODO 4 过滤数据得到搜索的关键字
//TODO 5 使用自定义函数对关键字拆词
//TODO 6 分组开窗合并计算
//TODO 7 转换为流
//TODO 8 写出到clickHouse中
//TODO 9 运行任务
P069
package com.atguigu.edu.realtime.app.dws;import com.atguigu.edu.realtime.app.func.KeyWordUDTF;
import com.atguigu.edu.realtime.bean.KeywordBean;
import com.atguigu.edu.realtime.common.EduConfig;
import com.atguigu.edu.realtime.common.EduConstant;
import com.atguigu.edu.realtime.util.ClickHouseUtil;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.sql.PreparedStatement;
import java.sql.SQLException;/*** @author yhm* @create 2023-04-25 16:01*/
public class DwsTrafficSourceKeywordPageViewWindow {public static void main(String[] args) throws Exception {//TODO 1 创建环境设置状态后端StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//TODO 2 自定义拆词函数tableEnv.createTemporarySystemFunction("ik_analyze", new KeyWordUDTF());//TODO 3 读取kafka中的page_log数据String topicName = "dwd_traffic_page_log";String groupId = "dws_traffic_source_keyword_page_view_window";tableEnv.executeSql("create table page_log(\n" +" common map<String,String>,\n" +" page map<String,String>,\n" +" ts bigint, \n" +" row_time as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss')), \n" +" WATERMARK FOR row_time AS row_time - INTERVAL '3' SECOND" +")" + KafkaUtil.getKafkaDDL(topicName, groupId));//TODO 4 过滤数据得到搜索的关键字//① page 字段下 item 字段不为 null;//② page 字段下 last_page_id 为 search;//③ page 字段下 item_type 为 keyword。Table searchTable = tableEnv.sqlQuery("select \n" +" page['item'] full_word,\n" +" row_time\n" +"from page_log\n" +"where page['item'] is not null \n" +"and page['item_type'] ='keyword'\n" +// "and page['last_page_id'] = 'search'" +"");tableEnv.createTemporaryView("search_table", searchTable);//TODO 5 使用自定义函数对关键字拆词Table splitTable = tableEnv.sqlQuery("select \n" +" keyword,\n" +" row_time\n" +"from search_table ,\n" +"lateral table (ik_analyze(full_word)) as t(keyword)");tableEnv.createTemporaryView("split_table", splitTable);tableEnv.executeSql("select * from split_table").print();//TODO 6 分组开窗合并计算//TODO 7 转换为流//TODO 8 写出到clickHouse中//TODO 9 运行任务}
}
P070
Window Aggregation | Apache Flink
P071
10.1.4 ClickHouse 建表语句
drop table if exists dws_traffic_source_keyword_page_view_window;
create table if not exists dws_traffic_source_keyword_page_view_window
(
stt DateTime,
edt DateTime,
source String,
keyword String,
keyword_count UInt64,
ts UInt64
) engine = ReplacingMergeTree(ts)
partition by toYYYYMMDD(stt)
order by (stt, edt, source, keyword);
package com.atguigu.edu.realtime.util;import com.atguigu.edu.realtime.bean.KeywordBean;
import com.atguigu.edu.realtime.bean.TransientSink;
import com.atguigu.edu.realtime.common.EduConfig;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.lang.reflect.Field;
import java.sql.PreparedStatement;
import java.sql.SQLException;/*** @author yhm* @create 2023-04-25 18:23*/
public class ClickHouseUtil {// 设计泛型 通过传入的数据类型自动补充sql 写出到clickhousepublic static <T> SinkFunction<T> getJdbcSink(String sql) {return JdbcSink.<T>sink(sql, new JdbcStatementBuilder<T>() {@Overridepublic void accept(PreparedStatement preparedStatement, T obj) throws SQLException {// T是泛型,明文是不知道什么类型的,需要使用反射获取Field[] declaredFields = obj.getClass().getDeclaredFields();int skip = 0;for (int i = 0; i < declaredFields.length; i++) {Field field = declaredFields[i];field.setAccessible(true);// 获取属性的注解TransientSink annotation = field.getAnnotation(TransientSink.class);if (annotation != null) {skip++;continue;}// 使用类模板的属性名 get对象 获取值try {Object o = field.get(obj);preparedStatement.setObject(i + 1 - skip, o);} catch (IllegalAccessException e) {e.printStackTrace();}}}}, JdbcExecutionOptions.builder().withBatchIntervalMs(5000L).withBatchSize(5).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl(EduConfig.CLICKHOUSE_URL).withDriverName(EduConfig.CLICKHOUSE_DRIVER).build());}
}
[atguigu@node001 ~]$ sudo systemctl start clickhouse-server
[atguigu@node001 ~]$ clickhouse-client -mnode001 :) SHOW DATABASES;
node001 :) CREATE DATABASE edu_realtime;
node001 :) SHOW DATABASES;node001 :) USE edu_realtime;
node001 :) SHOW TABLES FROM edu_realtime;node001 :) SELECT * FROM dws_traffic_source_keyword_page_view_window;
[atguigu@node001 ~]$ sudo systemctl start clickhouse-server
[atguigu@node001 ~]$ clickhouse-client -m
ClickHouse client version 20.4.5.36 (official build).
Connecting to localhost:9000 as user default.
Connected to ClickHouse server version 20.4.5 revision 54434.node001 :) CREATE DATABASE edu_realtime;CREATE DATABASE edu_realtimeOk.0 rows in set. Elapsed: 0.044 sec. node001 :) SHOW DATABASES;SHOW DATABASES┌─name───────────────────────────┐
│ _temporary_and_external_tables │
│ default │
│ edu_realtime │
│ system │
└────────────────────────────────┘4 rows in set. Elapsed: 0.031 sec. node001 :) SHOW TABLES FROM edu_realtime;SHOW TABLES FROM edu_realtimeOk.0 rows in set. Elapsed: 0.028 sec. node001 :) use edu_realtime;USE edu_realtimeOk.0 rows in set. Elapsed: 0.002 sec. node001 :) create table if not exists dws_traffic_source_keyword_page_view_window
:-] (
:-] stt DateTime,
:-] edt DateTime,
:-] source String,
:-] keyword String,
:-] keyword_count UInt64,
:-] ts UInt64
:-] ) engine = ReplacingMergeTree(ts)
:-] partition by toYYYYMMDD(stt)
:-] order by (stt, edt, source, keyword);CREATE TABLE IF NOT EXISTS dws_traffic_source_keyword_page_view_window
(`stt` DateTime, `edt` DateTime, `source` String, `keyword` String, `keyword_count` UInt64, `ts` UInt64
)
ENGINE = ReplacingMergeTree(ts)
PARTITION BY toYYYYMMDD(stt)
ORDER BY (stt, edt, source, keyword)Ok.0 rows in set. Elapsed: 0.016 sec. node001 :) use edu_realtime;
USE edu_realtimeOk.0 rows in set. Elapsed: 0.002 sec. node001 :) SHOW TABLES FROM edu_realtime;
SHOW TABLES FROM edu_realtime┌─name────────────────────────────────────────┐
│ dws_traffic_source_keyword_page_view_window │
└─────────────────────────────────────────────┘1 rows in set. Elapsed: 0.007 sec. node001 :) SELECT * FROM dws_traffic_source_keyword_page_view_window;SELECT *
FROM dws_traffic_source_keyword_page_view_window┌─────────────────stt─┬─────────────────edt─┬─source─┬─keyword─┬─keyword_count─┬────────────ts─┐
│ 2022-02-21 23:58:30 │ 2022-02-21 23:58:40 │ SEARCH │ java │ 19 │ 1699513951000 │
│ 2022-02-21 23:58:30 │ 2022-02-21 23:58:40 │ SEARCH │ 前端 │ 27 │ 1699513951000 │
│ 2022-02-21 23:58:50 │ 2022-02-21 23:59:00 │ SEARCH │ 前端 │ 14 │ 1699513951000 │
│ 2022-02-21 23:59:00 │ 2022-02-21 23:59:10 │ SEARCH │ 大 │ 19 │ 1699513951000 │
│ 2022-02-21 23:59:00 │ 2022-02-21 23:59:10 │ SEARCH │ 数据库 │ 4 │ 1699513951000 │
└─────────────────────┴─────────────────────┴────────┴─────────┴───────────────┴───────────────┘
┌─────────────────stt─┬─────────────────edt─┬─source─┬─keyword─┬─keyword_count─┬────────────ts─┐
│ 2022-02-21 23:59:50 │ 2022-02-22 00:00:00 │ SEARCH │ hadoop │ 19 │ 1699513951000 │
│ 2022-02-21 23:59:50 │ 2022-02-22 00:00:00 │ SEARCH │ python │ 19 │ 1699513951000 │
│ 2022-02-21 23:59:50 │ 2022-02-22 00:00:00 │ SEARCH │ 前端 │ 39 │ 1699513951000 │
│ 2022-02-21 23:59:50 │ 2022-02-22 00:00:00 │ SEARCH │ 数据库 │ 19 │ 1699513951000 │
└─────────────────────┴─────────────────────┴────────┴─────────┴───────────────┴───────────────┘
┌─────────────────stt─┬─────────────────edt─┬─source─┬─keyword─┬─keyword_count─┬────────────ts─┐
│ 2022-02-21 23:59:00 │ 2022-02-21 23:59:10 │ SEARCH │ 数据 │ 19 │ 1699513951000 │
│ 2022-02-21 23:59:20 │ 2022-02-21 23:59:30 │ SEARCH │ java │ 33 │ 1699513951000 │
│ 2022-02-21 23:59:30 │ 2022-02-21 23:59:40 │ SEARCH │ flink │ 20 │ 1699513951000 │
│ 2022-02-21 23:59:30 │ 2022-02-21 23:59:40 │ SEARCH │ java │ 20 │ 1699513951000 │
│ 2022-02-21 23:59:30 │ 2022-02-21 23:59:40 │ SEARCH │ 前端 │ 19 │ 1699513951000 │
└─────────────────────┴─────────────────────┴────────┴─────────┴───────────────┴───────────────┘
┌─────────────────stt─┬─────────────────edt─┬─source─┬─keyword─┬─keyword_count─┬────────────ts─┐
│ 2022-02-22 00:00:10 │ 2022-02-22 00:00:20 │ SEARCH │ 多线程 │ 20 │ 1699513951000 │
└─────────────────────┴─────────────────────┴────────┴─────────┴───────────────┴───────────────┘
┌─────────────────stt─┬─────────────────edt─┬─source─┬─keyword─┬─keyword_count─┬────────────ts─┐
│ 2022-02-22 00:00:20 │ 2022-02-22 00:00:30 │ SEARCH │ flink │ 4 │ 1699513951000 │
│ 2022-02-22 00:00:20 │ 2022-02-22 00:00:30 │ SEARCH │ java │ 27 │ 1699513951000 │
└─────────────────────┴─────────────────────┴────────┴─────────┴───────────────┴───────────────┘
┌─────────────────stt─┬─────────────────edt─┬─source─┬─keyword─┬─keyword_count─┬────────────ts─┐
│ 2022-02-21 20:51:40 │ 2022-02-21 20:51:50 │ SEARCH │ 多线程 │ 1 │ 1699513903000 │
│ 2022-02-21 20:52:00 │ 2022-02-21 20:52:10 │ SEARCH │ hadoop │ 1 │ 1699449059000 │
│ 2022-02-21 20:53:10 │ 2022-02-21 20:53:20 │ SEARCH │ 多线程 │ 1 │ 1699447298000 │
│ 2022-02-21 20:54:20 │ 2022-02-21 20:54:30 │ SEARCH │ 大 │ 1 │ 1699447298000 │
│ 2022-02-21 20:54:20 │ 2022-02-21 20:54:30 │ SEARCH │ 数据 │ 1 │ 1699447298000 │
│ 2022-02-21 23:59:50 │ 2022-02-22 00:00:00 │ SEARCH │ 前端 │ 3 │ 1699449067000 │
│ 2022-02-21 23:59:50 │ 2022-02-22 00:00:00 │ SEARCH │ 数据库 │ 1 │ 1699449067000 │
└─────────────────────┴─────────────────────┴────────┴─────────┴───────────────┴───────────────┘
┌─────────────────stt─┬─────────────────edt─┬─source─┬─keyword─┬─keyword_count─┬────────────ts─┐
│ 2022-02-22 00:00:10 │ 2022-02-22 00:00:20 │ SEARCH │ 多线程 │ 2 │ 1699449067000 │
└─────────────────────┴─────────────────────┴────────┴─────────┴───────────────┴───────────────┘1003 rows in set. Elapsed: 0.114 sec. Processed 1.00 thousand rows, 54.17 KB (8.79 thousand rows/s., 474.47 KB/s.) node001 :)
package com.atguigu.edu.realtime.app.dws;import com.atguigu.edu.realtime.app.func.KeyWordUDTF;
import com.atguigu.edu.realtime.bean.KeywordBean;
import com.atguigu.edu.realtime.common.EduConfig;
import com.atguigu.edu.realtime.common.EduConstant;
import com.atguigu.edu.realtime.util.ClickHouseUtil;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.sql.PreparedStatement;
import java.sql.SQLException;/*** @author yhm* @create 2023-04-25 16:01*/
public class DwsTrafficSourceKeywordPageViewWindow {public static void main(String[] args) throws Exception {//TODO 1 创建环境设置状态后端StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//TODO 2 自定义拆词函数tableEnv.createTemporarySystemFunction("ik_analyze", new KeyWordUDTF());//TODO 3 读取kafka中的page_log数据String topicName = "dwd_traffic_page_log";String groupId = "dws_traffic_source_keyword_page_view_window";tableEnv.executeSql("create table page_log(\n" +" common map<String,String>,\n" +" page map<String,String>,\n" +" ts bigint, \n" +" row_time as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss')), \n" +" WATERMARK FOR row_time AS row_time - INTERVAL '3' SECOND" +")" + KafkaUtil.getKafkaDDL(topicName, groupId));//TODO 4 过滤数据得到搜索的关键字//① page 字段下 item 字段不为 null;//② page 字段下 last_page_id 为 search;//③ page 字段下 item_type 为 keyword。Table searchTable = tableEnv.sqlQuery("select \n" +" page['item'] full_word,\n" +" row_time\n" +"from page_log\n" +"where page['item'] is not null \n" +"and page['item_type'] ='keyword'\n" +// "and page['last_page_id'] = 'search'" +"");tableEnv.createTemporaryView("search_table", searchTable);//TODO 5 使用自定义函数对关键字拆词Table splitTable = tableEnv.sqlQuery("select \n" +" keyword,\n" +" row_time\n" +"from search_table ,\n" +"lateral table (ik_analyze(full_word)) as t(keyword)");tableEnv.createTemporaryView("split_table", splitTable);//tableEnv.executeSql("select * from split_table").print();//TODO 6 分组开窗合并计算Table keywordBeanTable = tableEnv.sqlQuery("select \n" +" date_format(TUMBLE_START(\n" +" row_time, INTERVAL '10' second),'yyyy-MM-dd HH:mm:ss') stt,\n" +" date_format(TUMBLE_END(\n" +" row_time, INTERVAL '10' second),'yyyy-MM-dd HH:mm:ss') edt,\n" +"\n" + "'" + EduConstant.KEYWORD_SEARCH + "' source," +" 0 keywordLength,\n" +" keyword,\n" +" count(*) keyword_count,\n" +" UNIX_TIMESTAMP()*1000 ts\n" +"from split_table\n" +"group by TUMBLE(row_time, INTERVAL '10' second),keyword");//TODO 7 转换为流DataStream<KeywordBean> keywordBeanDataStream = tableEnv.toDataStream(keywordBeanTable, KeywordBean.class);keywordBeanDataStream.print();//TODO 8 写出到clickHouse中keywordBeanDataStream.addSink(ClickHouseUtil.<KeywordBean>getJdbcSink("insert into dws_traffic_source_keyword_page_view_window values(?,?,?,?,?,?)"));//TODO 9 运行任务env.execute();}
}
P072
package com.atguigu.edu.realtime.bean;import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;/*** @author yhm* @create 2023-04-25 18:37*/
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TransientSink {
}
P073
10.2 流量域版本-来源-地区-访客类别粒度页面浏览各窗口汇总表
10.2.1 主要任务
DWS 层是为 ADS 层服务的,通过对指标体系的分析,本节汇总表中需要有会话数、页面浏览数、浏览总时长、独立访客数、跳出会话数五个度量字段。我们的任务是统计这五个指标,并将数据写入 ClickHouse 汇总表。
P074
DwsTrafficVcSourceArIsNewPageViewWindow
TODO 1 ~ TODO 6
P075
DwsTrafficVcSourceArIsNewPageViewWindow
TODO 7 ~ TODO 9
P076
PhoenixUtil、public static <T> List<T> queryList(String sql, Class<T> clazz) {}
P077
DimUtil、public static JSONObject getDimInfoNoCache(String tableName, Tuple2<String, String>... columnNamesAn {}
[atguigu@node001 ~]$ start-hbase.sh
[atguigu@node001 ~]$ cd /opt/module/hbase/apache-phoenix-5.0.0-HBase-2.0-bin/
[atguigu@node001 apache-phoenix-5.0.0-HBase-2.0-bin]$ bin/sqlline.py node001:2181
P078
DwsTrafficVcSourceArIsNewPageViewWindow
TODO 10
P079
10.2 流量域版本-来源-地区-访客类别粒度页面浏览各窗口汇总表
10.2.2 思路分析
4)旁路缓存优化
外部数据源的查询常常是流式计算的性能瓶颈。以本程序为例,每次查询都要连接 Hbase,数据传输需要做序列化、反序列化,还有网络传输,严重影响时效性。可以通过旁路缓存对查询进行优化。
P080
DimUtil、public static JSONObject getDimInfo(String tableName, Tuple2<String, String>... columnNamesAndValues) {}
P081
DimUtil 、public static void deleteCached(String tableName, String id) {}
[atguigu@node001 ~]$ redis-server ./my_redis.conf
[atguigu@node001 ~]$ redis-cli
127.0.0.1:6379> ping
PONG
127.0.0.1:6379>
[atguigu@node001 ~]$ /opt/module/hbase/apache-phoenix-5.0.0-HBase-2.0-bin/bin/sqlline.py node001:2181
P082
10.2 流量域版本-来源-地区-访客类别粒度页面浏览各窗口汇总表
10.2.2 思路分析
6)异步 IO
DwsTrafficVcSourceArIsNewPageViewWindow
//TODO 10 维度关联
[atguigu@node001 ~]$ clickhouse-client -m
ClickHouse client version 20.4.5.36 (official build).
Connecting to localhost:9000 as user default.
Connected to ClickHouse server version 20.4.5 revision 54434.node001 :) show databases;SHOW DATABASES┌─name───────────────────────────┐
│ _temporary_and_external_tables │
│ default │
│ edu_realtime │
│ system │
└────────────────────────────────┘4 rows in set. Elapsed: 0.019 sec. node001 :) use edu_realtime;USE edu_realtimeOk.0 rows in set. Elapsed: 0.007 sec. node001 :) drop table if exists dws_traffic_vc_source_ar_is_new_page_view_window;DROP TABLE IF EXISTS dws_traffic_vc_source_ar_is_new_page_view_windowOk.0 rows in set. Elapsed: 0.007 sec. node001 :) create table dws_traffic_vc_source_ar_is_new_page_view_window(
:-] stt DateTime,
:-] edt DateTime,
:-] version_code String,
:-] source_id String,
:-] source_name String,
:-] ar String,
:-] province_name String,
:-] is_new String,
:-] uv_count UInt64,
:-] total_session_count UInt64,
:-] page_view_count UInt64,
:-] total_during_time UInt64,
:-] jump_session_count UInt64,
:-] ts UInt64
:-] ) engine = ReplacingMergeTree(ts)
:-] partition by toYYYYMMDD(stt)
:-] order by(stt, edt, version_code, source_id, source_name, ar, province_name, is_new);CREATE TABLE dws_traffic_vc_source_ar_is_new_page_view_window
(`stt` DateTime, `edt` DateTime, `version_code` String, `source_id` String, `source_name` String, `ar` String, `province_name` String, `is_new` String, `uv_count` UInt64, `total_session_count` UInt64, `page_view_count` UInt64, `total_during_time` UInt64, `jump_session_count` UInt64, `ts` UInt64
)
ENGINE = ReplacingMergeTree(ts)
PARTITION BY toYYYYMMDD(stt)
ORDER BY (stt, edt, version_code, source_id, source_name, ar, province_name, is_new)Ok.0 rows in set. Elapsed: 0.043 sec. node001 :) select * from edu_realtime.dws_traffic_vc_source_ar_is_new_page_view_window;SELECT *
FROM edu_realtime.dws_traffic_vc_source_ar_is_new_page_view_windowOk.0 rows in set. Elapsed: 0.071 sec. node001 :)
package com.atguigu.edu.realtime.app.dws;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.app.func.DimAsyncFunction;
import com.atguigu.edu.realtime.bean.DwsTrafficForSourcePvBean;
import com.atguigu.edu.realtime.util.*;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.concurrent.TimeUnit;/*** @author yhm* @create 2023-04-26 16:08*/
public class DwsTrafficVcSourceArIsNewPageViewWindow {public static void main(String[] args) throws Exception {//TODO 1 创建环境设置状态后端StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);//TODO 2 读取pageLog主题数据String pageTopic = "dwd_traffic_page_log";String groupId = "dws_traffic_vc_source_ar_is_new_page_view_window";KafkaSource<String> pageSource = KafkaUtil.getKafkaConsumer(pageTopic, groupId);DataStreamSource<String> pageStream = env.fromSource(pageSource, WatermarkStrategy.noWatermarks(), "page_log");//TODO 3 读取独立访客数据String uvTopic = "dwd_traffic_unique_visitor_detail";KafkaSource<String> uvSource = KafkaUtil.getKafkaConsumer(uvTopic, groupId);DataStreamSource<String> uvStream = env.fromSource(uvSource, WatermarkStrategy.noWatermarks(), "uv_detail");//TODO 4 读取跳出用户数据String jumpTopic = "dwd_traffic_user_jump_detail";KafkaSource<String> jumpSource = KafkaUtil.getKafkaConsumer(jumpTopic, groupId);DataStreamSource<String> jumpStream = env.fromSource(jumpSource, WatermarkStrategy.noWatermarks(), "jump_detail");//TODO 5 转换数据结构SingleOutputStreamOperator<DwsTrafficForSourcePvBean> pageBeanStream = pageStream.map(new MapFunction<String, DwsTrafficForSourcePvBean>() {@Overridepublic DwsTrafficForSourcePvBean map(String value) throws Exception {// 将page_log的一条日志转换为一个对应的javaBeanJSONObject jsonObject = JSON.parseObject(value);JSONObject common = jsonObject.getJSONObject("common");JSONObject page = jsonObject.getJSONObject("page");Long ts = jsonObject.getLong("ts");return DwsTrafficForSourcePvBean.builder().versionCode(common.getString("vc")).sourceId(common.getString("sc")).ar(common.getString("ar")).isNew(common.getString("is_new")).uvCount(0L).totalSessionCount(page.getString("last_page_id") == null ? 1L : 0L).pageViewCount(1L).totalDuringTime(page.getLong("during_time")).jumpSessionCount(0L).ts(ts).build();}});SingleOutputStreamOperator<DwsTrafficForSourcePvBean> uvBeanStream = uvStream.map(new MapFunction<String, DwsTrafficForSourcePvBean>() {@Overridepublic DwsTrafficForSourcePvBean map(String value) throws Exception {// 将page_log的一条日志转换为一个对应的javaBeanJSONObject jsonObject = JSON.parseObject(value);JSONObject common = jsonObject.getJSONObject("common");Long ts = jsonObject.getLong("ts");return DwsTrafficForSourcePvBean.builder().versionCode(common.getString("vc")).sourceId(common.getString("sc")).ar(common.getString("ar")).isNew(common.getString("is_new")).uvCount(1L).totalSessionCount(0L).pageViewCount(0L).totalDuringTime(0L).jumpSessionCount(0L).ts(ts).build();}});SingleOutputStreamOperator<DwsTrafficForSourcePvBean> jumpBeanStream = jumpStream.map(new MapFunction<String, DwsTrafficForSourcePvBean>() {@Overridepublic DwsTrafficForSourcePvBean map(String value) throws Exception {// 将page_log的一条日志转换为一个对应的javaBeanJSONObject jsonObject = JSON.parseObject(value);JSONObject common = jsonObject.getJSONObject("common");Long ts = jsonObject.getLong("ts");return DwsTrafficForSourcePvBean.builder().versionCode(common.getString("vc")).sourceId(common.getString("sc")).ar(common.getString("ar")).isNew(common.getString("is_new")).uvCount(0L).totalSessionCount(0L).pageViewCount(0L).totalDuringTime(0L).jumpSessionCount(1L).ts(ts).build();}});//TODO 6 合并3条数据流DataStream<DwsTrafficForSourcePvBean> unionStream = pageBeanStream.union(uvBeanStream).union(jumpBeanStream);//TODO 7 添加水位线SingleOutputStreamOperator<DwsTrafficForSourcePvBean> withWaterMarkStream = unionStream.assignTimestampsAndWatermarks(WatermarkStrategy.<DwsTrafficForSourcePvBean>forBoundedOutOfOrderness(Duration.ofSeconds(15L)).withTimestampAssigner(new SerializableTimestampAssigner<DwsTrafficForSourcePvBean>() {@Overridepublic long extractTimestamp(DwsTrafficForSourcePvBean element, long recordTimestamp) {return element.getTs();}}));//TODO 8 分组开窗WindowedStream<DwsTrafficForSourcePvBean, String, TimeWindow> windowStream = withWaterMarkStream.keyBy(new KeySelector<DwsTrafficForSourcePvBean, String>() {@Overridepublic String getKey(DwsTrafficForSourcePvBean value) throws Exception {return value.getVersionCode()+ value.getSourceId()+ value.getAr()+ value.getIsNew();}}).window(TumblingEventTimeWindows.of(Time.seconds(10L)));//TODO 9 聚合统计SingleOutputStreamOperator<DwsTrafficForSourcePvBean> reduceStream = windowStream.reduce(new ReduceFunction<DwsTrafficForSourcePvBean>() {@Overridepublic DwsTrafficForSourcePvBean reduce(DwsTrafficForSourcePvBean value1, DwsTrafficForSourcePvBean value2) throws Exception {// 合并相同common信息的数据value1.setTotalSessionCount(value1.getTotalSessionCount() + value2.getTotalSessionCount());value1.setUvCount(value1.getUvCount() + value2.getUvCount());value1.setTotalDuringTime(value1.getTotalDuringTime() + value2.getTotalDuringTime());value1.setJumpSessionCount(value1.getJumpSessionCount() + value2.getJumpSessionCount());value1.setPageViewCount(value1.getPageViewCount() + value2.getPageViewCount());return value1;}}, new ProcessWindowFunction<DwsTrafficForSourcePvBean, DwsTrafficForSourcePvBean, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<DwsTrafficForSourcePvBean> elements, Collector<DwsTrafficForSourcePvBean> out) throws Exception {TimeWindow timeWindow = context.window();String start = DateFormatUtil.toYmdHms(timeWindow.getStart());String end = DateFormatUtil.toYmdHms(timeWindow.getEnd());for (DwsTrafficForSourcePvBean element : elements) {element.setStt(start);element.setEdt(end);// 修正时间戳element.setTs(System.currentTimeMillis());out.collect(element);}}});reduceStream.print();//TODO 10 维度关联reduceStream.map(new MapFunction<DwsTrafficForSourcePvBean, DwsTrafficForSourcePvBean>() {@Overridepublic DwsTrafficForSourcePvBean map(DwsTrafficForSourcePvBean value) throws Exception {// 关联来源名称String sourceId = value.getSourceId();String provinceId = value.getAr();JSONObject dimBaseSource = DimUtil.getDimInfo("DIM_BASE_SOURCE", sourceId);String sourceName = dimBaseSource.getString("SOURCE_SITE");value.setSourceName(sourceName);JSONObject dimBaseProvince = DimUtil.getDimInfo("DIM_BASE_PROVINCE", provinceId);String provinceName = dimBaseProvince.getString("NAME");value.setProvinceName(provinceName);return value;}}).print();// 异步操作// 关联来源表SingleOutputStreamOperator<DwsTrafficForSourcePvBean> sourceBeanStream = AsyncDataStream.unorderedWait(reduceStream, new DimAsyncFunction<DwsTrafficForSourcePvBean>("DIM_BASE_SOURCE") {@Overridepublic void join(DwsTrafficForSourcePvBean obj, JSONObject jsonObject) throws Exception {String sourceName = jsonObject.getString("SOURCE_SITE");obj.setSourceName(sourceName);}@Overridepublic String getKey(DwsTrafficForSourcePvBean obj) {return obj.getSourceId();}}, 1, TimeUnit.MINUTES);// 关联省份SingleOutputStreamOperator<DwsTrafficForSourcePvBean> dimBeanStream = AsyncDataStream.unorderedWait(sourceBeanStream, new DimAsyncFunction<DwsTrafficForSourcePvBean>("DIM_BASE_PROVINCE") {@Overridepublic void join(DwsTrafficForSourcePvBean obj, JSONObject jsonObject) throws Exception {String provinceName = jsonObject.getString("NAME");obj.setProvinceName(provinceName);}@Overridepublic String getKey(DwsTrafficForSourcePvBean obj) {return obj.getAr();}}, 1, TimeUnit.MINUTES);//TODO 11 写出到clickHousedimBeanStream.addSink(ClickHouseUtil.getJdbcSink(" " +"insert into dws_traffic_vc_source_ar_is_new_page_view_window values" +"(?,?,?,?,?,?,?,?,?,?,?,?,?,?)"));// TODO 12 执行任务env.execute();}
}