Flink实时电商数仓之DWS层

需求分析

  • 关键词
    在这里插入图片描述
  • 统计关键词出现的频率

IK分词

进行分词需要引入IK分词器,使用它时需要引入相关的依赖。它能够将搜索的关键字按照日常的使用习惯进行拆分。比如将苹果iphone 手机,拆分为苹果,iphone, 手机。

<dependency><groupId>org.apache.doris</groupId><artifactId>flink-doris-connector-1.17</artifactId>
</dependency><dependency><groupId>com.janeluo</groupId><artifactId>ikanalyzer</artifactId>
</dependency>

测试代码如下:

public class IkUtil {public static void main(String[] args) throws IOException {String s = "Apple 苹果15 5G手机";StringReader stringReader = new StringReader(s);IKSegmenter ikSegmenter = new IKSegmenter(stringReader, true);//第二个参数表示是否再对拆分后的单词再进行拆分,true时表示不在继续拆分Lexeme next = ikSegmenter.next();while (next!= null) {System.out.println(next.getLexemeText());next = ikSegmenter.next();}}
}

整体流程

  1. 创建自定义分词工具类IKUtil,IK是一个分词工具依赖
  2. 创建自定义函数类
  3. 注册函数
  4. 消费kafka DWD页面主题数据并设置水位线
  5. 从主流中过滤搜索行为
    • page[‘item’] is not null
    • item_type : “keyword”
    • last_page_id: “search”
  6. 使用分词函数对keyword进行拆分
  7. 对keyword进行分组开窗聚合
  8. 写出到doris
    • 创建doris sink
    • flink需要打开检查点才能将数据写出到doris

在这里插入图片描述

具体实现

import com.atguigu.gmall.realtime.common.base.BaseSQLApp;
import com.atguigu.gmall.realtime.common.constant.Constant;
import com.atguigu.gmall.realtime.common.util.SQLUtil;
import com.atguigu.gmall.realtime.dws.function.KwSplit;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;/*** title:** @Author 浪拍岸* @Create 28/12/2023 上午11:06* @Version 1.0*/
public class DwsTrafficSourceKeywordPageViewWindow extends BaseSQLApp {public static void main(String[] args) {new DwsTrafficSourceKeywordPageViewWindow().start(10021,4,"dws_traffic_source_keyword_page_view_window");}@Overridepublic void handle(StreamExecutionEnvironment env, TableEnvironment tableEnv, String groupId) {//1. 读取主流dwd页面主题数据tableEnv.executeSql("create table page_info(\n" +"    `common` map<string,string>,\n" +"    `page` map<string,string>,\n" +"    `ts` bigint,\n" +"    `row_time` as to_timestamp_ltz(ts,3),\n" +"     WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND\n" +")" + SQLUtil.getKafkaSourceSQL(Constant.TOPIC_DWD_TRAFFIC_PAGE, groupId));//测试是否获取到数据//tableEnv.executeSql("select * from page_info").print();//2. 筛选出关键字keywordsTable keywrodTable = tableEnv.sqlQuery("select\n" +"    page['item'] keywords,\n" +"    `row_time`,\n" +"    ts\n" +" from page_info\n" +" where page['last_page_id'] = 'search'\n" +" and page['item_type'] = 'keyword'\n" +" and page['item'] is not null");tableEnv.createTemporaryView("keywords_table", keywrodTable);// 测试是否获取到数据//tableEnv.executeSql("select * from keywords_table").print();//3. 自定义分词函数并注册tableEnv.createTemporarySystemFunction("kwSplit", KwSplit.class );//4. 调用分词函数对keywords进行拆分Table splitKwTable = tableEnv.sqlQuery("select keywords, keyword, `row_time`" +" from keywords_table" +" left join lateral Table(kwSplit(keywords)) on true");tableEnv.createTemporaryView("split_kw_table", splitKwTable);//tableEnv.executeSql("select * from split_kw_table").print();//5. 对keyword进行分组开窗聚合Table windowAggTable = tableEnv.sqlQuery("select\n" +"    keyword,\n" +"    cast(tumble_start(row_time,interval '10' second ) as string) wStart,\n" +"    cast(tumble_end(row_time,interval '10' second ) as string) wEnd,\n" +"    cast(current_date as string)  cur_date,\n" +"    count(*) keyword_count\n" +"from split_kw_table\n" +"group by tumble(row_time, interval '10' second), keyword");//tableEnv.createTemporaryView("result_table",table);//tableEnv.executeSql("select keyword,keyword_count+1 from result_table").print();//6. 写出到doristableEnv.executeSql("create table doris_sink\n" +"(\n" +"    keyword                STRING,\n" +"    wStart                 STRING,\n" +"    wEnd                   STRING,\n" +"    cur_date               STRING,\n" +"    keyword_count          BIGINT\n" +")" + SQLUtil.getDorisSinkSQL(Constant.DWS_TRAFFIC_SOURCE_KEYWORD_PAGE_VIEW_WINDOW));windowAggTable.insertInto("doris_sink").execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/583362.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MIT线性代数笔记-第31讲-线性变换及对应矩阵

目录 31.线性变换及对应矩阵打赏 31.线性变换及对应矩阵 线性变换相当于是矩阵的抽象表示&#xff0c;每个线性变换都对应着一个矩阵 例&#xff1a; 考虑一个变换 T T T&#xff0c;使得平面上的一个向量投影为平面上的另一个向量&#xff0c;即 T : R 2 → R 2 T:R^2 \to R…

用ChatGPT挑选钻石!著名珠宝商推出-珠宝GPT

根据Salesforce最新发布的第五版《互联网购物报告》显示&#xff0c;ChatGPT等生成式AI的出现、快速发展&#xff0c;对零售行业和购物者产生了较大影响。可有效简化业务流程实现降本增效&#xff0c;并改善购物体验。 著名珠宝商James Allen为了积极拥抱生成式AI全面提升销售…

软件测试/测试开发丨Python闭包函数和计时器学习笔记

闭包函数 闭包的内部函数中&#xff0c;对外部作用域的变量进行引用闭包无法修改外部函数的局部变量闭包可以保存当前的运行环境 # 普通方法实现 def output_student(name, gender, grade1):print(F"新学期开学啦&#xff0c;学生{name}是{gender}&#xff0c;他是{grad…

AI时代下,如何看待“算法利维坦”?

ChatGPT的浪潮从2022年袭来后&#xff0c;至今热度不减&#xff0c;呈现出蓬勃发展的趋势。AI家居、医疗、教育、金融、公益、农业、艺术......AI真的已经走进了生活的方方面面&#xff0c;我们仿佛已经进入了AI时代&#xff0c;势不可挡。人工智能水平如此之高&#xff0c;不禁…

OpenCV-Python(21):OpenCV中的轮廓性质

3.轮廓的性质 本文我们将主要学习基于轮廓来提取一些经常使用的对象特征。 3.1 长宽比 边界矩形的宽高比&#xff1a; x,y,w,h cv2.boundingRect(cnt) aspect_ratio float(w)/h 3.2 Extent 轮廓面积与边界矩形面积的比。 area cv2.contourArea(cnt) x,y,w,h cv2.bounding…

王道考研计算机网络——应用层

如何为用户提供服务&#xff1f; CS/P2P 提高域名解析的速度&#xff1a;local name server高速缓存&#xff1a;直接地址映射/低级的域名服务器的地址 本机也有告诉缓存&#xff1a;本机开机的时候从本地域名服务器当中下载域名和地址的对应数据库&#xff0c;放到本地的高…

C语言实现RSA算法加解密

使用c语言实现了RSA加解密算法&#xff0c;可以加解密文件和字符串。 rsa算法原理 选择两个大素数p和q&#xff1b;计算n p * q;计算φ(n)(p-1)(q-1)&#xff1b;选择与φ(n)互素的整数d&#xff1b;由de1 mod φ(n)计算得到e&#xff1b;公钥是(e, n), 私钥是(d, n);假设明…

Microsoft .NET Framework 4.5.1 离线安装包

Microsoft .NET Framework 4.5.1 安装包&#xff1a; 一、离线安装包&#xff1a; 百度网盘 链接: https://pan.baidu.com/s/1IGEYT1vyruY6KFu6XEmerA 提取码: m6ix 离线安装包官方地址&#xff1a; https://www.microsoft.com/zh-cn/download/details.aspx?id40779 二、在…

如何在VSCode搭建ESP-IDF开发ESP32

文章目录 概要安装VScode安装ESP-IDF插件使用官方例程小结 概要 ESP-IDF(Espressif IoT Development Framework) 即乐鑫物联网开发框架&#xff0c;它基于 C/C 语言提供了一个自给自足的 SDK&#xff0c;可为在 Windows、Linux 和 macOS 系统平台上开发 ESP32 应用程序提供工具…

跳跃表原理及实现

一、跳表数据结构 跳表是有序表的一种&#xff0c;其底层是通过链表实现的。链表的特点是插入删除效率高&#xff0c;但是查找节点效率很低&#xff0c;最坏的时间复杂度是O(N)&#xff0c;那么跳表就是解决这一痛点而生的。 为了提高查询效率&#xff0c;我们可以给链表加上索…

天翼云云间高速实现租户跨地域内网互通

一、业务需求 用户业务在襄阳、武汉两个云池部署&#xff0c;希望通过云间高速产品将两个资源池云内资源通过云内专网实现内网互通。要求内网双向互通。 二、测试环境配置 云池vpc名称vpc网段子网内网ip/gweip主机名互联网带宽襄阳ceshi192.168.0.0/16192.168.1.0/24192.168.…

unity学习笔记----游戏练习02

一、阳光值的展示和消耗 1.创建一个文本组件用于显示阳光的数值&#xff0c;然后在脚本中得到这个UI。 在SunManger中得到这个组件的引用 public TextMeshProUGUI sunPointText; 写一个用于更新显示的方法 public void UpdataSunPointText() { sunPointText.tex…

FileZilla的使用主动模式与被动模式

&#x1f3ac; 艳艳耶✌️&#xff1a;个人主页 &#x1f525; 个人专栏 &#xff1a;《产品经理如何画泳道图&流程图》 ⛺️ 越努力 &#xff0c;越幸运 目录 一、FileZilla简介 1、FileZilla是什么&#xff1f; 2、FileZilla的应用场景 二、FileZilla的安装 1、下…

JVM篇:JVM内存结构

程序计数器 程序计数器英文名叫&#xff1a;Program Counter Register 作用&#xff1a;用来记录下一条jvm指令的地址行号。 先来查看一段jvm指令&#xff0c;这些指令对应的java代码就是输出1-5 操作系统运行该Java程序时具体流程如下 语言解释&#xff1a;源文件通过编译转…

光伏逆变器MPPT的作用、原理及算法

MPPT是逆变器非常核心的技术&#xff0c;MPPT电压在进行光伏电站设计时一项非常关键的参数。 一、什么是MPPT&#xff1f; &#xff08;单块光伏组件的I-V、P-V曲线&#xff09; 上图中&#xff0c;光伏组件的输出电压和电流遵循I-V曲线(绿色)、P-V曲线(蓝色)&#xff0c;如果…

NVMe over Fabrics:概念、应用和实现

对于大部分人来说&#xff0c;NVMe over Fabrics&#xff08;简称NVMf&#xff09;还是个新东西&#xff0c;因为其第一个正式版本的协议在今年6月份才发布。但是这并不影响人们对NVMf的关注&#xff0c;因为这项依托于NVMe的技术很可能继续改变存储市场格局。 NVMf的贡献在于…

labuladong日常刷题-双指针 | LeetCode 83删除排序链表中的重复元素 5最长回文子串

双指针操作链表与字符串 LeetCode 83 删除排序链表中的重复元素 2023.12.28 题目链接labuladong讲解[链接] ListNode* deleteDuplicates(ListNode* head) {/*暴力求解ListNode* cur new ListNode();ListNode* prenode cur;cur->next head;cur cur->next;while(cu…

web自动化(4)——POM设计重构

1. 什么是POM Page Object Model 是ui自动化测试中常见的封装方式。 原理&#xff1a;将页面封装为PO对象&#xff0c;然后通过面向对象的方式实现UI自动化 2. 封装原则 PO无需包含全部UI元素PO应当验证元素PO不应该包含断言PO不应该暴露元素 3. 怎么进行POM封装 面向对象…

架构设计系列 5:常见架构介绍

前面讲了架构是什么&#xff0c;架构的发展史&#xff0c;架构设计的基础理论&#xff0c;这次针对常见架构设计风格进行介绍和分析。 一、MVC&#xff1a;三层架构经典 经典的 MVC 架构&#xff08;Model-View-Controller&#xff09;架构是软件系统架构设计中的经典&#xf…

数据结构与算法教程,数据结构C语言版教程!(第二部分、线性表详解:数据结构线性表10分钟入门)一

第二部分、线性表详解&#xff1a;数据结构线性表10分钟入门 线性表&#xff0c;数据结构中最简单的一种存储结构&#xff0c;专门用于存储逻辑关系为"一对一"的数据。 线性表&#xff0c;基于数据在实际物理空间中的存储状态&#xff0c;又可细分为顺序表&#xff…