尚硅谷大数据项目《在线教育之实时数仓》笔记008

视频地址:尚硅谷大数据项目《在线教育之实时数仓》_哔哩哔哩_bilibili

目录

第10章 数仓开发之DWS层

P066

P067

P068

P069

P070

P071

P072

P073

P074

P075

P076

P077

P078

P079

P080

P081

P082


第10章 数仓开发之DWS层

P066

第10章 数仓开发之DWS层

设计要点:
(1)DWS层的设计参考指标体系。
(2)DWS层表名的命名规范为dws_数据域_统计粒度_业务过程_统计周期(window)。
注:window 表示窗口对应的时间范围。
10.1 流量域来源关键词粒度页面浏览各窗口汇总表
10.1.1 主要任务

    从 Kafka 页面浏览明细主题读取数据,过滤搜索行为,使用自定义 UDTF(一进多出)函数对搜索内容分词。统计各窗口各关键词出现频次,写入 ClickHouse。
10.1.2 思路分析

尚硅谷大数据项目之在线教育数仓\尚硅谷大数据项目之在线教育数仓-3实时\资料\13.总线矩阵及指标体系

在线教育实时指标体系.xlsx

P067

DwsTrafficSourceKeywordPageViewWindow

//TODO 1 创建环境设置状态后端
//TODO 2 自定义拆词函数
//TODO 3 读取kafka中的page_log数据
//TODO 4 过滤数据得到搜索的关键字
//TODO 5 使用自定义函数对关键字拆词
//TODO 6 分组开窗合并计算
//TODO 7 转换为流
//TODO 8 写出到clickHouse中
//TODO 9 运行任务

package com.atguigu.edu.realtime.util;import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;/*** @author yhm* @create 2023-04-25 16:05*/
public class KeyWordUtil {public static ArrayList<String> analyze(String text) {StringReader reader = new StringReader(text);IKSegmenter ikSegmenter = new IKSegmenter(reader, true);ArrayList<String> strings = new ArrayList<>();try {Lexeme lexeme = null;while ((lexeme = ikSegmenter.next()) != null) {String keyWord = lexeme.getLexemeText();strings.add(keyWord);}} catch (IOException e) {e.printStackTrace();}return strings;}public static void main(String[] args) {String s = "Apple iPhoneXSMax (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待";ArrayList<String> strings = analyze(s);System.out.println(strings);}
}

P068

User-defined Functions | Apache Flink

DwsTrafficSourceKeywordPageViewWindow

//TODO 1 创建环境设置状态后端
//TODO 2 自定义拆词函数
//TODO 3 读取kafka中的page_log数据
//TODO 4 过滤数据得到搜索的关键字
//TODO 5 使用自定义函数对关键字拆词

//TODO 6 分组开窗合并计算
//TODO 7 转换为流
//TODO 8 写出到clickHouse中
//TODO 9 运行任务

P069

package com.atguigu.edu.realtime.app.dws;import com.atguigu.edu.realtime.app.func.KeyWordUDTF;
import com.atguigu.edu.realtime.bean.KeywordBean;
import com.atguigu.edu.realtime.common.EduConfig;
import com.atguigu.edu.realtime.common.EduConstant;
import com.atguigu.edu.realtime.util.ClickHouseUtil;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.sql.PreparedStatement;
import java.sql.SQLException;/*** @author yhm* @create 2023-04-25 16:01*/
public class DwsTrafficSourceKeywordPageViewWindow {public static void main(String[] args) throws Exception {//TODO 1 创建环境设置状态后端StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//TODO 2 自定义拆词函数tableEnv.createTemporarySystemFunction("ik_analyze", new KeyWordUDTF());//TODO 3 读取kafka中的page_log数据String topicName = "dwd_traffic_page_log";String groupId = "dws_traffic_source_keyword_page_view_window";tableEnv.executeSql("create table page_log(\n" +"    common map<String,String>,\n" +"    page map<String,String>,\n" +"    ts bigint, \n" +"    row_time as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss')), \n" +"    WATERMARK FOR row_time AS row_time - INTERVAL '3' SECOND" +")" + KafkaUtil.getKafkaDDL(topicName, groupId));//TODO 4 过滤数据得到搜索的关键字//① page 字段下 item 字段不为 null;//② page 字段下 last_page_id 为 search;//③ page 字段下 item_type 为 keyword。Table searchTable = tableEnv.sqlQuery("select \n" +"    page['item'] full_word,\n" +"    row_time\n" +"from page_log\n" +"where page['item'] is not null \n" +"and page['item_type'] ='keyword'\n" +// "and page['last_page_id'] = 'search'" +"");tableEnv.createTemporaryView("search_table", searchTable);//TODO 5 使用自定义函数对关键字拆词Table splitTable = tableEnv.sqlQuery("select \n" +"    keyword,\n" +"    row_time\n" +"from search_table ,\n" +"lateral table (ik_analyze(full_word)) as t(keyword)");tableEnv.createTemporaryView("split_table", splitTable);tableEnv.executeSql("select * from split_table").print();//TODO 6 分组开窗合并计算//TODO 7 转换为流//TODO 8 写出到clickHouse中//TODO 9 运行任务}
}

P070

Window Aggregation | Apache Flink

P071

10.1.4 ClickHouse 建表语句

drop table if exists dws_traffic_source_keyword_page_view_window;

create table if not exists dws_traffic_source_keyword_page_view_window

(

    stt           DateTime,

    edt           DateTime,

    source        String,

    keyword       String,

    keyword_count UInt64,

    ts            UInt64

) engine = ReplacingMergeTree(ts)

      partition by toYYYYMMDD(stt)

      order by (stt, edt, source, keyword);

package com.atguigu.edu.realtime.util;import com.atguigu.edu.realtime.bean.KeywordBean;
import com.atguigu.edu.realtime.bean.TransientSink;
import com.atguigu.edu.realtime.common.EduConfig;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.lang.reflect.Field;
import java.sql.PreparedStatement;
import java.sql.SQLException;/*** @author yhm* @create 2023-04-25 18:23*/
public class ClickHouseUtil {// 设计泛型 通过传入的数据类型自动补充sql 写出到clickhousepublic static <T> SinkFunction<T> getJdbcSink(String sql) {return JdbcSink.<T>sink(sql, new JdbcStatementBuilder<T>() {@Overridepublic void accept(PreparedStatement preparedStatement, T obj) throws SQLException {// T是泛型,明文是不知道什么类型的,需要使用反射获取Field[] declaredFields = obj.getClass().getDeclaredFields();int skip = 0;for (int i = 0; i < declaredFields.length; i++) {Field field = declaredFields[i];field.setAccessible(true);// 获取属性的注解TransientSink annotation = field.getAnnotation(TransientSink.class);if (annotation != null) {skip++;continue;}// 使用类模板的属性名 get对象 获取值try {Object o = field.get(obj);preparedStatement.setObject(i + 1 - skip, o);} catch (IllegalAccessException e) {e.printStackTrace();}}}}, JdbcExecutionOptions.builder().withBatchIntervalMs(5000L).withBatchSize(5).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl(EduConfig.CLICKHOUSE_URL).withDriverName(EduConfig.CLICKHOUSE_DRIVER).build());}
}

[atguigu@node001 ~]$ sudo systemctl start clickhouse-server
[atguigu@node001 ~]$ clickhouse-client -m

node001 :) SHOW DATABASES;

node001 :) CREATE DATABASE edu_realtime;
node001 :) SHOW DATABASES;

node001 :) USE edu_realtime;
node001 :) SHOW TABLES FROM edu_realtime;

node001 :) SELECT * FROM dws_traffic_source_keyword_page_view_window;

[atguigu@node001 ~]$ sudo systemctl start clickhouse-server
[atguigu@node001 ~]$ clickhouse-client -m
ClickHouse client version 20.4.5.36 (official build).
Connecting to localhost:9000 as user default.
Connected to ClickHouse server version 20.4.5 revision 54434.node001 :) CREATE DATABASE edu_realtime;CREATE DATABASE edu_realtimeOk.0 rows in set. Elapsed: 0.044 sec. node001 :) SHOW DATABASES;SHOW DATABASES┌─name───────────────────────────┐
│ _temporary_and_external_tables │
│ default                        │
│ edu_realtime                   │
│ system                         │
└────────────────────────────────┘4 rows in set. Elapsed: 0.031 sec. node001 :) SHOW TABLES FROM edu_realtime;SHOW TABLES FROM edu_realtimeOk.0 rows in set. Elapsed: 0.028 sec. node001 :) use edu_realtime;USE edu_realtimeOk.0 rows in set. Elapsed: 0.002 sec. node001 :) create table if not exists dws_traffic_source_keyword_page_view_window
:-] (
:-]     stt           DateTime,
:-]     edt           DateTime,
:-]     source        String,
:-]     keyword       String,
:-]     keyword_count UInt64,
:-]     ts            UInt64
:-] ) engine = ReplacingMergeTree(ts)
:-]       partition by toYYYYMMDD(stt)
:-]       order by (stt, edt, source, keyword);CREATE TABLE IF NOT EXISTS dws_traffic_source_keyword_page_view_window
(`stt` DateTime, `edt` DateTime, `source` String, `keyword` String, `keyword_count` UInt64, `ts` UInt64
)
ENGINE = ReplacingMergeTree(ts)
PARTITION BY toYYYYMMDD(stt)
ORDER BY (stt, edt, source, keyword)Ok.0 rows in set. Elapsed: 0.016 sec. node001 :) use edu_realtime;
USE edu_realtimeOk.0 rows in set. Elapsed: 0.002 sec. node001 :) SHOW TABLES FROM edu_realtime;
SHOW TABLES FROM edu_realtime┌─name────────────────────────────────────────┐
│ dws_traffic_source_keyword_page_view_window │
└─────────────────────────────────────────────┘1 rows in set. Elapsed: 0.007 sec. node001 :) SELECT * FROM dws_traffic_source_keyword_page_view_window;SELECT *
FROM dws_traffic_source_keyword_page_view_window┌─────────────────stt─┬─────────────────edt─┬─source─┬─keyword─┬─keyword_count─┬────────────ts─┐
│ 2022-02-21 23:58:30 │ 2022-02-21 23:58:40 │ SEARCH │ java    │            19 │ 1699513951000 │
│ 2022-02-21 23:58:30 │ 2022-02-21 23:58:40 │ SEARCH │ 前端    │            27 │ 1699513951000 │
│ 2022-02-21 23:58:50 │ 2022-02-21 23:59:00 │ SEARCH │ 前端    │            14 │ 1699513951000 │
│ 2022-02-21 23:59:00 │ 2022-02-21 23:59:10 │ SEARCH │ 大      │            19 │ 1699513951000 │
│ 2022-02-21 23:59:00 │ 2022-02-21 23:59:10 │ SEARCH │ 数据库  │             4 │ 1699513951000 │
└─────────────────────┴─────────────────────┴────────┴─────────┴───────────────┴───────────────┘
┌─────────────────stt─┬─────────────────edt─┬─source─┬─keyword─┬─keyword_count─┬────────────ts─┐
│ 2022-02-21 23:59:50 │ 2022-02-22 00:00:00 │ SEARCH │ hadoop  │            19 │ 1699513951000 │
│ 2022-02-21 23:59:50 │ 2022-02-22 00:00:00 │ SEARCH │ python  │            19 │ 1699513951000 │
│ 2022-02-21 23:59:50 │ 2022-02-22 00:00:00 │ SEARCH │ 前端    │            39 │ 1699513951000 │
│ 2022-02-21 23:59:50 │ 2022-02-22 00:00:00 │ SEARCH │ 数据库  │            19 │ 1699513951000 │
└─────────────────────┴─────────────────────┴────────┴─────────┴───────────────┴───────────────┘
┌─────────────────stt─┬─────────────────edt─┬─source─┬─keyword─┬─keyword_count─┬────────────ts─┐
│ 2022-02-21 23:59:00 │ 2022-02-21 23:59:10 │ SEARCH │ 数据    │            19 │ 1699513951000 │
│ 2022-02-21 23:59:20 │ 2022-02-21 23:59:30 │ SEARCH │ java    │            33 │ 1699513951000 │
│ 2022-02-21 23:59:30 │ 2022-02-21 23:59:40 │ SEARCH │ flink   │            20 │ 1699513951000 │
│ 2022-02-21 23:59:30 │ 2022-02-21 23:59:40 │ SEARCH │ java    │            20 │ 1699513951000 │
│ 2022-02-21 23:59:30 │ 2022-02-21 23:59:40 │ SEARCH │ 前端    │            19 │ 1699513951000 │
└─────────────────────┴─────────────────────┴────────┴─────────┴───────────────┴───────────────┘
┌─────────────────stt─┬─────────────────edt─┬─source─┬─keyword─┬─keyword_count─┬────────────ts─┐
│ 2022-02-22 00:00:10 │ 2022-02-22 00:00:20 │ SEARCH │ 多线程  │            20 │ 1699513951000 │
└─────────────────────┴─────────────────────┴────────┴─────────┴───────────────┴───────────────┘
┌─────────────────stt─┬─────────────────edt─┬─source─┬─keyword─┬─keyword_count─┬────────────ts─┐
│ 2022-02-22 00:00:20 │ 2022-02-22 00:00:30 │ SEARCH │ flink   │             4 │ 1699513951000 │
│ 2022-02-22 00:00:20 │ 2022-02-22 00:00:30 │ SEARCH │ java    │            27 │ 1699513951000 │
└─────────────────────┴─────────────────────┴────────┴─────────┴───────────────┴───────────────┘
┌─────────────────stt─┬─────────────────edt─┬─source─┬─keyword─┬─keyword_count─┬────────────ts─┐
│ 2022-02-21 20:51:40 │ 2022-02-21 20:51:50 │ SEARCH │ 多线程  │             1 │ 1699513903000 │
│ 2022-02-21 20:52:00 │ 2022-02-21 20:52:10 │ SEARCH │ hadoop  │             1 │ 1699449059000 │
│ 2022-02-21 20:53:10 │ 2022-02-21 20:53:20 │ SEARCH │ 多线程  │             1 │ 1699447298000 │
│ 2022-02-21 20:54:20 │ 2022-02-21 20:54:30 │ SEARCH │ 大      │             1 │ 1699447298000 │
│ 2022-02-21 20:54:20 │ 2022-02-21 20:54:30 │ SEARCH │ 数据    │             1 │ 1699447298000 │
│ 2022-02-21 23:59:50 │ 2022-02-22 00:00:00 │ SEARCH │ 前端    │             3 │ 1699449067000 │
│ 2022-02-21 23:59:50 │ 2022-02-22 00:00:00 │ SEARCH │ 数据库  │             1 │ 1699449067000 │
└─────────────────────┴─────────────────────┴────────┴─────────┴───────────────┴───────────────┘
┌─────────────────stt─┬─────────────────edt─┬─source─┬─keyword─┬─keyword_count─┬────────────ts─┐
│ 2022-02-22 00:00:10 │ 2022-02-22 00:00:20 │ SEARCH │ 多线程  │             2 │ 1699449067000 │
└─────────────────────┴─────────────────────┴────────┴─────────┴───────────────┴───────────────┘1003 rows in set. Elapsed: 0.114 sec. Processed 1.00 thousand rows, 54.17 KB (8.79 thousand rows/s., 474.47 KB/s.) node001 :) 

package com.atguigu.edu.realtime.app.dws;import com.atguigu.edu.realtime.app.func.KeyWordUDTF;
import com.atguigu.edu.realtime.bean.KeywordBean;
import com.atguigu.edu.realtime.common.EduConfig;
import com.atguigu.edu.realtime.common.EduConstant;
import com.atguigu.edu.realtime.util.ClickHouseUtil;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.sql.PreparedStatement;
import java.sql.SQLException;/*** @author yhm* @create 2023-04-25 16:01*/
public class DwsTrafficSourceKeywordPageViewWindow {public static void main(String[] args) throws Exception {//TODO 1 创建环境设置状态后端StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//TODO 2 自定义拆词函数tableEnv.createTemporarySystemFunction("ik_analyze", new KeyWordUDTF());//TODO 3 读取kafka中的page_log数据String topicName = "dwd_traffic_page_log";String groupId = "dws_traffic_source_keyword_page_view_window";tableEnv.executeSql("create table page_log(\n" +"    common map<String,String>,\n" +"    page map<String,String>,\n" +"    ts bigint, \n" +"    row_time as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss')), \n" +"    WATERMARK FOR row_time AS row_time - INTERVAL '3' SECOND" +")" + KafkaUtil.getKafkaDDL(topicName, groupId));//TODO 4 过滤数据得到搜索的关键字//① page 字段下 item 字段不为 null;//② page 字段下 last_page_id 为 search;//③ page 字段下 item_type 为 keyword。Table searchTable = tableEnv.sqlQuery("select \n" +"    page['item'] full_word,\n" +"    row_time\n" +"from page_log\n" +"where page['item'] is not null \n" +"and page['item_type'] ='keyword'\n" +// "and page['last_page_id'] = 'search'" +"");tableEnv.createTemporaryView("search_table", searchTable);//TODO 5 使用自定义函数对关键字拆词Table splitTable = tableEnv.sqlQuery("select \n" +"    keyword,\n" +"    row_time\n" +"from search_table ,\n" +"lateral table (ik_analyze(full_word)) as t(keyword)");tableEnv.createTemporaryView("split_table", splitTable);//tableEnv.executeSql("select * from split_table").print();//TODO 6 分组开窗合并计算Table keywordBeanTable = tableEnv.sqlQuery("select \n" +"    date_format(TUMBLE_START(\n" +"    row_time, INTERVAL '10' second),'yyyy-MM-dd HH:mm:ss') stt,\n" +"    date_format(TUMBLE_END(\n" +"    row_time, INTERVAL '10' second),'yyyy-MM-dd HH:mm:ss') edt,\n" +"\n" + "'" + EduConstant.KEYWORD_SEARCH + "' source," +"    0 keywordLength,\n" +"    keyword,\n" +"    count(*) keyword_count,\n" +"    UNIX_TIMESTAMP()*1000 ts\n" +"from split_table\n" +"group by TUMBLE(row_time, INTERVAL '10' second),keyword");//TODO 7 转换为流DataStream<KeywordBean> keywordBeanDataStream = tableEnv.toDataStream(keywordBeanTable, KeywordBean.class);keywordBeanDataStream.print();//TODO 8 写出到clickHouse中keywordBeanDataStream.addSink(ClickHouseUtil.<KeywordBean>getJdbcSink("insert into dws_traffic_source_keyword_page_view_window values(?,?,?,?,?,?)"));//TODO 9 运行任务env.execute();}
}

P072

package com.atguigu.edu.realtime.bean;import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;/*** @author yhm* @create 2023-04-25 18:37*/
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TransientSink {
}

P073

10.2 流量域版本-来源-地区-访客类别粒度页面浏览各窗口汇总表
10.2.1 主要任务

DWS 层是为 ADS 层服务的,通过对指标体系的分析,本节汇总表中需要有会话数、页面浏览数、浏览总时长、独立访客数、跳出会话数五个度量字段。我们的任务是统计这五个指标,并将数据写入 ClickHouse 汇总表。

P074

DwsTrafficVcSourceArIsNewPageViewWindow

TODO 1 ~ TODO 6

P075

DwsTrafficVcSourceArIsNewPageViewWindow

TODO 7 ~ TODO 9

P076

PhoenixUtil、public static <T> List<T> queryList(String sql, Class<T> clazz) {}

P077

DimUtil、public static JSONObject getDimInfoNoCache(String tableName, Tuple2<String, String>... columnNamesAn {}

[atguigu@node001 ~]$ start-hbase.sh
[atguigu@node001 ~]$ cd /opt/module/hbase/apache-phoenix-5.0.0-HBase-2.0-bin/
[atguigu@node001 apache-phoenix-5.0.0-HBase-2.0-bin]$ bin/sqlline.py node001:2181

P078

DwsTrafficVcSourceArIsNewPageViewWindow

TODO 10

P079

10.2 流量域版本-来源-地区-访客类别粒度页面浏览各窗口汇总表

10.2.2 思路分析

4)旁路缓存优化

外部数据源的查询常常是流式计算的性能瓶颈。以本程序为例,每次查询都要连接 Hbase,数据传输需要做序列化、反序列化,还有网络传输,严重影响时效性。可以通过旁路缓存对查询进行优化。

P080

DimUtil、public static JSONObject getDimInfo(String tableName, Tuple2<String, String>... columnNamesAndValues) {}

P081

DimUtil 、public static void deleteCached(String tableName, String id) {}

[atguigu@node001 ~]$ redis-server ./my_redis.conf 
[atguigu@node001 ~]$ redis-cli 
127.0.0.1:6379> ping
PONG
127.0.0.1:6379> 
[atguigu@node001 ~]$ /opt/module/hbase/apache-phoenix-5.0.0-HBase-2.0-bin/bin/sqlline.py node001:2181

P082

10.2 流量域版本-来源-地区-访客类别粒度页面浏览各窗口汇总表

10.2.2 思路分析

6)异步 IO

DwsTrafficVcSourceArIsNewPageViewWindow

//TODO 10 维度关联

[atguigu@node001 ~]$ clickhouse-client -m
ClickHouse client version 20.4.5.36 (official build).
Connecting to localhost:9000 as user default.
Connected to ClickHouse server version 20.4.5 revision 54434.node001 :) show databases;SHOW DATABASES┌─name───────────────────────────┐
│ _temporary_and_external_tables │
│ default                        │
│ edu_realtime                   │
│ system                         │
└────────────────────────────────┘4 rows in set. Elapsed: 0.019 sec. node001 :) use edu_realtime;USE edu_realtimeOk.0 rows in set. Elapsed: 0.007 sec. node001 :) drop table if exists dws_traffic_vc_source_ar_is_new_page_view_window;DROP TABLE IF EXISTS dws_traffic_vc_source_ar_is_new_page_view_windowOk.0 rows in set. Elapsed: 0.007 sec. node001 :) create table dws_traffic_vc_source_ar_is_new_page_view_window(
:-] stt DateTime,
:-] edt DateTime,
:-] version_code String,
:-] source_id String,
:-] source_name String,
:-] ar String,
:-] province_name String,
:-] is_new String,
:-] uv_count UInt64,
:-] total_session_count UInt64,
:-] page_view_count UInt64,
:-] total_during_time UInt64,
:-] jump_session_count UInt64,
:-] ts UInt64
:-] ) engine = ReplacingMergeTree(ts)
:-] partition by toYYYYMMDD(stt)
:-] order by(stt, edt, version_code, source_id, source_name, ar, province_name, is_new);CREATE TABLE dws_traffic_vc_source_ar_is_new_page_view_window
(`stt` DateTime, `edt` DateTime, `version_code` String, `source_id` String, `source_name` String, `ar` String, `province_name` String, `is_new` String, `uv_count` UInt64, `total_session_count` UInt64, `page_view_count` UInt64, `total_during_time` UInt64, `jump_session_count` UInt64, `ts` UInt64
)
ENGINE = ReplacingMergeTree(ts)
PARTITION BY toYYYYMMDD(stt)
ORDER BY (stt, edt, version_code, source_id, source_name, ar, province_name, is_new)Ok.0 rows in set. Elapsed: 0.043 sec. node001 :) select * from edu_realtime.dws_traffic_vc_source_ar_is_new_page_view_window;SELECT *
FROM edu_realtime.dws_traffic_vc_source_ar_is_new_page_view_windowOk.0 rows in set. Elapsed: 0.071 sec. node001 :) 
package com.atguigu.edu.realtime.app.dws;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.app.func.DimAsyncFunction;
import com.atguigu.edu.realtime.bean.DwsTrafficForSourcePvBean;
import com.atguigu.edu.realtime.util.*;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.concurrent.TimeUnit;/*** @author yhm* @create 2023-04-26 16:08*/
public class DwsTrafficVcSourceArIsNewPageViewWindow {public static void main(String[] args) throws Exception {//TODO 1 创建环境设置状态后端StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);//TODO 2 读取pageLog主题数据String pageTopic = "dwd_traffic_page_log";String groupId = "dws_traffic_vc_source_ar_is_new_page_view_window";KafkaSource<String> pageSource = KafkaUtil.getKafkaConsumer(pageTopic, groupId);DataStreamSource<String> pageStream = env.fromSource(pageSource, WatermarkStrategy.noWatermarks(), "page_log");//TODO 3 读取独立访客数据String uvTopic = "dwd_traffic_unique_visitor_detail";KafkaSource<String> uvSource = KafkaUtil.getKafkaConsumer(uvTopic, groupId);DataStreamSource<String> uvStream = env.fromSource(uvSource, WatermarkStrategy.noWatermarks(), "uv_detail");//TODO 4 读取跳出用户数据String jumpTopic = "dwd_traffic_user_jump_detail";KafkaSource<String> jumpSource = KafkaUtil.getKafkaConsumer(jumpTopic, groupId);DataStreamSource<String> jumpStream = env.fromSource(jumpSource, WatermarkStrategy.noWatermarks(), "jump_detail");//TODO 5 转换数据结构SingleOutputStreamOperator<DwsTrafficForSourcePvBean> pageBeanStream = pageStream.map(new MapFunction<String, DwsTrafficForSourcePvBean>() {@Overridepublic DwsTrafficForSourcePvBean map(String value) throws Exception {// 将page_log的一条日志转换为一个对应的javaBeanJSONObject jsonObject = JSON.parseObject(value);JSONObject common = jsonObject.getJSONObject("common");JSONObject page = jsonObject.getJSONObject("page");Long ts = jsonObject.getLong("ts");return DwsTrafficForSourcePvBean.builder().versionCode(common.getString("vc")).sourceId(common.getString("sc")).ar(common.getString("ar")).isNew(common.getString("is_new")).uvCount(0L).totalSessionCount(page.getString("last_page_id") == null ? 1L : 0L).pageViewCount(1L).totalDuringTime(page.getLong("during_time")).jumpSessionCount(0L).ts(ts).build();}});SingleOutputStreamOperator<DwsTrafficForSourcePvBean> uvBeanStream = uvStream.map(new MapFunction<String, DwsTrafficForSourcePvBean>() {@Overridepublic DwsTrafficForSourcePvBean map(String value) throws Exception {// 将page_log的一条日志转换为一个对应的javaBeanJSONObject jsonObject = JSON.parseObject(value);JSONObject common = jsonObject.getJSONObject("common");Long ts = jsonObject.getLong("ts");return DwsTrafficForSourcePvBean.builder().versionCode(common.getString("vc")).sourceId(common.getString("sc")).ar(common.getString("ar")).isNew(common.getString("is_new")).uvCount(1L).totalSessionCount(0L).pageViewCount(0L).totalDuringTime(0L).jumpSessionCount(0L).ts(ts).build();}});SingleOutputStreamOperator<DwsTrafficForSourcePvBean> jumpBeanStream = jumpStream.map(new MapFunction<String, DwsTrafficForSourcePvBean>() {@Overridepublic DwsTrafficForSourcePvBean map(String value) throws Exception {// 将page_log的一条日志转换为一个对应的javaBeanJSONObject jsonObject = JSON.parseObject(value);JSONObject common = jsonObject.getJSONObject("common");Long ts = jsonObject.getLong("ts");return DwsTrafficForSourcePvBean.builder().versionCode(common.getString("vc")).sourceId(common.getString("sc")).ar(common.getString("ar")).isNew(common.getString("is_new")).uvCount(0L).totalSessionCount(0L).pageViewCount(0L).totalDuringTime(0L).jumpSessionCount(1L).ts(ts).build();}});//TODO 6 合并3条数据流DataStream<DwsTrafficForSourcePvBean> unionStream = pageBeanStream.union(uvBeanStream).union(jumpBeanStream);//TODO 7 添加水位线SingleOutputStreamOperator<DwsTrafficForSourcePvBean> withWaterMarkStream = unionStream.assignTimestampsAndWatermarks(WatermarkStrategy.<DwsTrafficForSourcePvBean>forBoundedOutOfOrderness(Duration.ofSeconds(15L)).withTimestampAssigner(new SerializableTimestampAssigner<DwsTrafficForSourcePvBean>() {@Overridepublic long extractTimestamp(DwsTrafficForSourcePvBean element, long recordTimestamp) {return element.getTs();}}));//TODO 8 分组开窗WindowedStream<DwsTrafficForSourcePvBean, String, TimeWindow> windowStream = withWaterMarkStream.keyBy(new KeySelector<DwsTrafficForSourcePvBean, String>() {@Overridepublic String getKey(DwsTrafficForSourcePvBean value) throws Exception {return value.getVersionCode()+ value.getSourceId()+ value.getAr()+ value.getIsNew();}}).window(TumblingEventTimeWindows.of(Time.seconds(10L)));//TODO 9 聚合统计SingleOutputStreamOperator<DwsTrafficForSourcePvBean> reduceStream = windowStream.reduce(new ReduceFunction<DwsTrafficForSourcePvBean>() {@Overridepublic DwsTrafficForSourcePvBean reduce(DwsTrafficForSourcePvBean value1, DwsTrafficForSourcePvBean value2) throws Exception {// 合并相同common信息的数据value1.setTotalSessionCount(value1.getTotalSessionCount() + value2.getTotalSessionCount());value1.setUvCount(value1.getUvCount() + value2.getUvCount());value1.setTotalDuringTime(value1.getTotalDuringTime() + value2.getTotalDuringTime());value1.setJumpSessionCount(value1.getJumpSessionCount() + value2.getJumpSessionCount());value1.setPageViewCount(value1.getPageViewCount() + value2.getPageViewCount());return value1;}}, new ProcessWindowFunction<DwsTrafficForSourcePvBean, DwsTrafficForSourcePvBean, String, TimeWindow>() {@Overridepublic void process(String s, Context context, Iterable<DwsTrafficForSourcePvBean> elements, Collector<DwsTrafficForSourcePvBean> out) throws Exception {TimeWindow timeWindow = context.window();String start = DateFormatUtil.toYmdHms(timeWindow.getStart());String end = DateFormatUtil.toYmdHms(timeWindow.getEnd());for (DwsTrafficForSourcePvBean element : elements) {element.setStt(start);element.setEdt(end);// 修正时间戳element.setTs(System.currentTimeMillis());out.collect(element);}}});reduceStream.print();//TODO 10 维度关联reduceStream.map(new MapFunction<DwsTrafficForSourcePvBean, DwsTrafficForSourcePvBean>() {@Overridepublic DwsTrafficForSourcePvBean map(DwsTrafficForSourcePvBean value) throws Exception {// 关联来源名称String sourceId = value.getSourceId();String provinceId = value.getAr();JSONObject dimBaseSource = DimUtil.getDimInfo("DIM_BASE_SOURCE", sourceId);String sourceName = dimBaseSource.getString("SOURCE_SITE");value.setSourceName(sourceName);JSONObject dimBaseProvince = DimUtil.getDimInfo("DIM_BASE_PROVINCE", provinceId);String provinceName = dimBaseProvince.getString("NAME");value.setProvinceName(provinceName);return value;}}).print();// 异步操作// 关联来源表SingleOutputStreamOperator<DwsTrafficForSourcePvBean> sourceBeanStream = AsyncDataStream.unorderedWait(reduceStream, new DimAsyncFunction<DwsTrafficForSourcePvBean>("DIM_BASE_SOURCE") {@Overridepublic void join(DwsTrafficForSourcePvBean obj, JSONObject jsonObject) throws Exception {String sourceName = jsonObject.getString("SOURCE_SITE");obj.setSourceName(sourceName);}@Overridepublic String getKey(DwsTrafficForSourcePvBean obj) {return obj.getSourceId();}}, 1, TimeUnit.MINUTES);// 关联省份SingleOutputStreamOperator<DwsTrafficForSourcePvBean> dimBeanStream = AsyncDataStream.unorderedWait(sourceBeanStream, new DimAsyncFunction<DwsTrafficForSourcePvBean>("DIM_BASE_PROVINCE") {@Overridepublic void join(DwsTrafficForSourcePvBean obj, JSONObject jsonObject) throws Exception {String provinceName = jsonObject.getString("NAME");obj.setProvinceName(provinceName);}@Overridepublic String getKey(DwsTrafficForSourcePvBean obj) {return obj.getAr();}}, 1, TimeUnit.MINUTES);//TODO 11 写出到clickHousedimBeanStream.addSink(ClickHouseUtil.getJdbcSink(" " +"insert into dws_traffic_vc_source_ar_is_new_page_view_window values" +"(?,?,?,?,?,?,?,?,?,?,?,?,?,?)"));// TODO 12 执行任务env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/175143.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

消失的数字,旋转数组(leetcode 一题多解)

目录 一、消失的数字 思路一&#xff08;暴力求解&#xff09;代码实现&#xff1a; 思路二&#xff08;数列的思想&#xff09;代码实现&#xff1a; 思路三&#xff08;异或的运用&#xff09;代码实现&#xff1a; 二、轮转数组 思路一&#xff08;暴力求解&#xff09…

Vue3 + Scss 实现主题切换效果

Vue3 Scss 实现主题切换效果 先给大家看一下主题切换的效果&#xff1a; 像这样的效果实现起来并不难&#xff0c;只是比较麻烦&#xff0c;目前我知道的有两种方式可以实现&#xff0c;分别是 CSS 变量、样式文件切换&#xff0c;下面是该效果的核心实现方法 CSS变量 给…

电脑如何定时关机?

电脑如何定时关机&#xff1f;我承认自己是个相当粗心的人&#xff0c;尤其是在急于离开时经常会忘记关闭电脑&#xff0c;结果就是电量耗尽&#xff0c;导致电脑自动关机。而且&#xff0c;在我使用电脑的时候&#xff0c;经常需要进行软件下载、更新等任务。如果我一直坐等任…

设计模式—迪米特原则(LOD)

1.背景 1987年秋天由美国Northeastern University的Ian Holland提出&#xff0c;被UML的创始者之一Booch等普及。后来&#xff0c;因为在经典著作《 The Pragmatic Programmer》而广为人知。 2.概念 迪米特法则&#xff08;Law of Demeter&#xff09;又叫作最少知识原则&…

c语言,输入整数n(行数,本例为4),按照如下规则打印数字图片 1 5 9 13 2 6 10 14 3 7 11 15 4 8 12 16

c语言&#xff0c;输入整数n(行数&#xff0c;本例为4&#xff09;&#xff0c;按照如下规则打印数字图片 1 5 9 13 2 6 10 14 3 7 11 15 4 8 12 16 以下是使用C语言编写的程序&#xff0c;根据输入的行数打印数字图片的规则&#xff1a; #include <stdio.h>int main() …

CV计算机视觉每日开源代码Paper with code速览-2023.11.22

点击CV计算机视觉&#xff0c;关注更多CV干货 论文已打包&#xff0c;点击进入—>下载界面 点击加入—>CV计算机视觉交流群 1.【语义分割】Mobile-Seed: Joint Semantic Segmentation and Boundary Detection for Mobile Robots 论文地址&#xff1a;https://arxiv.or…

7种SQL进阶用法【转】

1.自定义排序(ORDER BY FIELD) 在MySQL中ORDER BY排序除了可以用ASC和DESC之外,还可以使使用自定义排序方式来实现 CREATE TABLE movies ( id INT PRIMARY KEY AUTO_INCREMENT, movie_name VARCHAR(255), actors VARCHAR(255), price DECIMAL(10,2) DEFAULT 50, release date…

ES 8.x开始(docker-compose安装、kibana使用、java操作)

学习文档地址 一、Docker安装 这里使用docker-compose来安装&#xff0c;方便后续迁移&#xff0c;Elasticserach和kibina一起安装。 1、创建安装目录 configdataplugins 2、配置文件 配置文件有两个&#xff0c;一个是ES的配置文件&#xff0c;一个docker-compose的配置文件 …

龙芯loongarch64服务器编译安装pyarrow

1、简介 pyarrow是一个高效的Python库,用于在Python应用程序和Apache Arrow之间进行交互。Arrow是一种跨语言的内存格式,可以快速高效地转移大型数据集合。它提供了一种通用的数据格式,将数据在内存中表示为表格,并支持诸如序列化和分布式读取等功能。 龙芯的Python仓库安…

Ubuntu 22.03 LTS 安装deepin-terminal 分屏

安装 源里面自带了这个软件&#xff0c;可以直接装 sudo apt install deepin-terminal 启动 按下Win键&#xff0c;输入deep即可快速检索出图标&#xff0c;点击启动 效果 分屏 CtrlShiftH 水平分割 CtrlShiftJ 垂直分割 最多分割成四个小窗口&#xff0c;鼠标点击可以切换…

三方支付接口成为了电商竞争力的新动力

在当前快速发展的互联网时代&#xff0c;随着电子商务行业的兴起&#xff0c;支付体验已经成为企业获取竞争优势的重要因素。一个快速、安全、便捷的支付环节不仅可以提升用户的体验&#xff0c;还能有效促进交易的完成。在众多支付解决方案中&#xff0c;三方支付接口因其独特…

PCL 计算点云图中任意两点的欧式距离

目录 一、算法原理二、代码实现三、结果展示四、相关链接本文由CSDN点云侠原创,原文链接。如果你不是在点云侠的博客中看到该文章,那么此处便是不要脸的爬虫与GPT。 一、算法原理 使用PCL实现在可视化界面上用鼠标点选两个点,输出两点的坐标和两点之间的欧式距离。 二、代码…

linux 内核线程

内核线程类似于用户进程&#xff0c;通常用于并发处理些工作&#xff0c;它是一种在内核空间实现后台任务的方式&#xff0c;并且可以参与时间片轮转调度。 内核线程可以进行繁忙的异步事件处理&#xff0c;也可以睡眠等待某事件的发生&#xff0c;内核线程可以访问内核函数和…

Linux操作系统使用及C高级编程-D17D18编译与调试

编译 当有线程创建时编译&#xff1a;gcc test.c -o test -lpthread 分文件编写时主要是分为&#xff1a;.c&#xff08;函数声明的具体实现&#xff09;、.h&#xff08;说明性文件&#xff1a;#define 结构体共用体 声明&#xff09;、.c(main) 条件编译 一般情况下&#x…

激光线提取

在做单线激光三维重建&#xff0c;和多线激光三维重建的时候都会设计到激光线提取算法的实现&#xff0c;如何保持高速和高精度是关键 &#xff0c;最近优化了steger中心线提取算法&#xff0c;通过并行化实现在cpu版本可以做到2m,GPU版本可以做到0.6ms左右&#xff0c;完全可…

Flask 运用Xterm实现交互终端

Xterm是一个基于X Window System的终端仿真器&#xff08;Terminal Emulator&#xff09;。Xterm最初由MIT开发&#xff0c;它允许用户在X Window环境下运行文本终端程序。Xterm提供了一个图形界面终端&#xff0c;使用户能够在图形桌面环境中运行命令行程序。而xterm.js是一个…

Kotlin学习——kt入门合集博客 kt里的委派模式Delegation kt里的特性

Kotlin 是一门现代但已成熟的编程语言&#xff0c;旨在让开发人员更幸福快乐。 它简洁、安全、可与 Java 及其他语言互操作&#xff0c;并提供了多种方式在多个平台间复用代码&#xff0c;以实现高效编程。 https://play.kotlinlang.org/byExample/01_introduction/02_Functio…

[SpringCloud] SpringCloud配置中心的核心原理

SpringCloud是什么时候去拉取配置中心的配置中心客户端的配置信息为什么要写在bootstrap文件中对象中注入的属性是如何动态刷新的一些开源的配置中心是如何整合SpringCloud的 文章目录 1.从SpringBoot的启动过程说起1.1 大致过程 2.准备Environment的核心操作2.1 前置操作 3.pr…

SEOAI每周资讯和Linus思考 231127

欢迎查看 Linus筋斗云SEO 的每周资讯整理&#xff0c;本周的一些要点&#xff1a; Google11月核心更新和评论更新仍然没有结束9个搜索结果页的主要变化&#xff1a;图标、品牌、粉丝数、新模块GSC已索引页面狂掉&#xff1f;Google的问题&#xff0c;已修复黑五网一期间的搜索…

强化学习中的Q学习

Q学习&#xff08;Q-Learning&#xff09;是强化学习中的一种基于值的学习方法&#xff0c;用于在有限马尔可夫决策过程&#xff08;MDP&#xff09;中学习最优的动作策略。Q学习主要用于离散状态和离散动作的问题。 以下是Q学习的基本概念和步骤&#xff1a; Q-Value&#xf…