计算机毕设 flink大数据淘宝用户行为数据实时分析与可视化

文章目录

  • 0 前言
  • 1、环境准备
    • 1.1 flink 下载相关 jar 包
    • 1.2 生成 kafka 数据
    • 1.3 开发前的三个小 tip
  • 2、flink-sql 客户端编写运行 sql
    • 2.1 创建 kafka 数据源表
    • 2.2 指标统计:每小时成交量
      • 2.2.1 创建 es 结果表, 存放每小时的成交量
      • 2.2.2 执行 sql ,统计每小时的成交量
    • 2.3 指标统计:每10分钟累计独立用户数
      • 2.3.1 创建 es 结果表,存放每10分钟累计独立用户数
      • 2.3.2 创建视图
      • 2.3.3 执行 sql ,统计每10分钟的累计独立用户数
    • 2.4 指标统计:商品类目销量排行
      • 2.4.1 创建商品类目维表
      • 2.4.1 创建 es 结果表,存放商品类目排行表
      • 2.4.2 创建视图
      • 2.4.3 执行 sql , 统计商品类目销量排行
  • 3、最终效果与体验心得
    • 3.1 最终效果
    • 3.2 体验心得
      • 3.2.1 执行
      • 3.2.2 存储
  • 4 最后


0 前言

🔥 这两年开始毕业设计和毕业答辩的要求和难度不断提升,传统的毕设题目缺少创新和亮点,往往达不到毕业答辩的要求,这两年不断有学弟学妹告诉学长自己做的项目系统达不到老师的要求。

为了大家能够顺利以及最少的精力通过毕设,学长分享优质毕业设计项目,今天要分享的是

🚩 flink大数据淘宝用户行为数据实时分析与可视化

🥇学长这里给一个题目综合评分(每项满分5分)

  • 难度系数:3分
  • 工作量:3分
  • 创新点:4分

1、环境准备

1.1 flink 下载相关 jar 包

flink-sql 连接外部系统时,需要依赖特定的 jar 包,所以需要事先把这些 jar 包准备好。说明与下载入口

本项目使用到了以下的 jar 包 ,下载后直接放在了 flink/lib 里面。

需要注意的是 flink-sql 执行时,是转化为 flink-job 提交到集群执行的,所以 flink 集群的每一台机器都要添加以下的 jar 包。

外部版本jar
kafka4.1flink-sql-connector-kafka_2.11-1.10.2.jar
flink-json-1.10.2-sql-jar.jar
elasticsearch7.6flink-sql-connector-elasticsearch7_2.11-1.10.2.jar
mysql5.7flink-jdbc_2.11-1.10.2.jar
mysql-connector-java-8.0.11.jar

1.2 生成 kafka 数据

用户行为数据来源: 阿里云天池公开数据集

网盘:https://pan.baidu.com/s/1wDVQpRV7giIlLJJgRZAInQ 提取码:gja5

商品类目纬度数据来源: category.sql

数据生成器:datagen.py

有了数据文件之后,使用 python 读取文件数据,然后并发写入到 kafka。

修改生成器中的 kafka 地址配置,然后运行 以下命令,开始不断往 kafka 写数据

# 5000 并发
nohup python3 datagen.py 5000 &                  

1.3 开发前的三个小 tip

  • 生成器往 kafka 写数据,会自动创建主题,无需事先创建

  • flink 往 elasticsearch 写数据,会自动创建索引,无需事先创建

  • Kibana 使用索引模式从 Elasticsearch 索引中检索数据,以实现诸如可视化等功能。

使用的逻辑为:创建索引模式 》Discover (发现) 查看索引数据 》visualize(可视化)创建可视化图表》dashboards(仪表板)创建大屏,即汇总多个可视化的图表

2、flink-sql 客户端编写运行 sql

# 进入 flink-sql 客户端, 需要指定刚刚下载的 jar 包目录
./bin/sql-client.sh embedded -l lib

2.1 创建 kafka 数据源表

-- 创建 kafka 表, 读取 kafka 数据
CREATE TABLE user_behavior (user_id BIGINT,item_id BIGINT,category_id BIGINT,behavior STRING,ts TIMESTAMP(3),proctime as PROCTIME(),WATERMARK FOR ts as ts - INTERVAL '5' SECOND  
) WITH ('connector.type' = 'kafka', 'connector.version' = 'universal',  'connector.topic' = 'user_behavior',  'connector.startup-mode' = 'earliest-offset', 'connector.properties.zookeeper.connect' = '172.16.122.24:2181', 'connector.properties.bootstrap.servers' = '172.16.122.17:9092', 'format.type' = 'json'  
);
SELECT * FROM user_behavior;

2.2 指标统计:每小时成交量

2.2.1 创建 es 结果表, 存放每小时的成交量

CREATE TABLE buy_cnt_per_hour (hour_of_day BIGINT,buy_cnt BIGINT
) WITH ('connector.type' = 'elasticsearch', 'connector.version' = '7',  'connector.hosts' = 'http://172.16.122.13:9200',  'connector.index' = 'buy_cnt_per_hour','connector.document-type' = 'user_behavior','connector.bulk-flush.max-actions' = '1','update-mode' = 'append','format.type' = 'json'
);

2.2.2 执行 sql ,统计每小时的成交量

INSERT INTO buy_cnt_per_hour
SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*)
FROM user_behavior
WHERE behavior = 'buy'
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);

2.3 指标统计:每10分钟累计独立用户数

2.3.1 创建 es 结果表,存放每10分钟累计独立用户数

CREATE TABLE cumulative_uv (time_str STRING,uv BIGINT
) WITH ('connector.type' = 'elasticsearch', 'connector.version' = '7',  'connector.hosts' = 'http://172.16.122.13:9200',  'connector.index' = 'cumulative_uv','connector.document-type' = 'user_behavior',    'update-mode' = 'upsert','format.type' = 'json'
);

2.3.2 创建视图

CREATE VIEW uv_per_10min AS
SELECTMAX(SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0') OVER w AS time_str,COUNT(DISTINCT user_id) OVER w AS uv
FROM user_behavior
WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);

2.3.3 执行 sql ,统计每10分钟的累计独立用户数

INSERT INTO cumulative_uv
SELECT time_str, MAX(uv)
FROM uv_per_10min
GROUP BY time_str;

2.4 指标统计:商品类目销量排行

2.4.1 创建商品类目维表

先在 mysql 创建一张商品类目的维表,然后配置 flink 读取 mysql。

CREATE TABLE category_dim (sub_category_id BIGINT,parent_category_name STRING
) WITH ('connector.type' = 'jdbc','connector.url' = 'jdbc:mysql://172.16.122.25:3306/flink','connector.table' = 'category','connector.driver' = 'com.mysql.jdbc.Driver','connector.username' = 'root','connector.password' = 'root','connector.lookup.cache.max-rows' = '5000','connector.lookup.cache.ttl' = '10min'
);

2.4.1 创建 es 结果表,存放商品类目排行表

CREATE TABLE top_category  (category_name  STRING,buy_cnt  BIGINT
) WITH ('connector.type' = 'elasticsearch', 'connector.version' = '7',  'connector.hosts' = 'http://172.16.122.13:9200',  'connector.index' = 'top_category','connector.document-type' = 'user_behavior','update-mode' = 'upsert','format.type' = 'json'
);

2.4.2 创建视图

CREATE VIEW rich_user_behavior AS
SELECT U.user_id, U.item_id, U.behavior, C.parent_category_name as category_name
FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.sub_category_id;

2.4.3 执行 sql , 统计商品类目销量排行

INSERT INTO top_category
SELECT category_name, COUNT(*) buy_cnt
FROM rich_user_behavior
WHERE behavior = 'buy'
GROUP BY category_name;

3、最终效果与体验心得

3.1 最终效果

整个开发过程,只用到了 flink-sql ,无需写 java 或者其它代码,就完成了这样一个实时报表。

image-20201201175438743

3.2 体验心得

3.2.1 执行

  • flink-sql 的 ddl 语句不会触发 flink-job , 同时创建的表、视图仅在会话级别有效。

  • 对于连接表的 insert、select 等操作,则会触发相应的流 job, 并自动提交到 flink 集群,无限地运行下去,直到主动取消或者 job 报错。

  • flink-sql 客户端关闭后,对于已经提交到 flink 集群的 job 不会有任何影响。

本次开发,执行了 3 个 insert , 因此打开 flink 集群面板,可以看到有 3 个无限的流 job 。即使 kafka 数据全部写入完毕,关闭 flink-sql 客户端,这个 3 个 job 都不会停止。
image-20201201175523916

3.2.2 存储

  • flnik 本身不存储业务数据,只作为流批一体的引擎存在,所以主要的用法为读取外部系统的数据,处理后,再写到外部系统。

  • flink 本身的元数据,包括表、函数等,默认情况下只是存放在内存里面,所以仅会话级别有效。但是,似乎可以存储到 Hive Metastore 中,关于这一点就留到以后再实践。

4 最后

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/122147.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Servlet 与Spring对比!

前言: Spring相关的框架知识,算是目前公司在用的前沿知识了,很重要!! 那么以Spring为基础的框架有几个? 以Spring为基础的框架包括若干模块,其中主要的有Spring Framework、Spring Boot、Spring…

十七、模型构建器(ModelBuilder)快速提取城市建成区——批量将夜光数据投影、转为整型(基于参考比较法)

一、前言 本文以参考比较法提取城市建成区为例,详细介绍如何使用模型构建器(ModelBuilder)提高效率,特别说明一下,模型构建器构建的模型是可以保存成为工具,日后需要使用的时候可以直接使用工具不用重复构建,因为模型构建器构建的工作流如果不保存,下次就不能使用,需…

Mybatis基础

文章目录 Mybatis基础XML语言概述使用Mybatis配置Mybatis增删改查复杂查询事务操作动态 SQLifchoose、when、otherwise 缓存机制注解开发 Mybatis基础 虽然我们能够通过JDBC来连接和操作数据库,但是哪怕只是完成一个SQL语句的执行,都需要编写大量的代码…

【Python入门教程】基于OpenCV视频分解成图片+图片组合成视频(视频抽帧组帧)

在人工智能爆火的今天,深度学习被广泛应用于各个领域。深度学习的模型训练离不开大量的样本库。我之前分享过【Python爬虫】批量爬取网页的图片&制作数据集,今天跟大家分享一下如何使用OpenCV库对视频进行抽帧,从而增加样本图片的数量。正…

Operator开发之operator-sdk入门

1 operator-sdk 除了kubebuilder,operator-sdk是另一个常用的用于开发Operator的框架,不过operator-sdk还是基于kubebuilder,因此,通常还是建议使用kubebuilder开发Operator。 2 环境准备 跟kubebuilder类似,需要安…

Tensorflow2 中模型训练标签顺序和预测结果标签顺序不一致问题解决办法

本篇文章将详细介绍Tensorflow2.x中模型训练标签顺序和预测结果标签顺序不一致问题,这个问题如果考虑不周,或者标签顺序没有控制好的情况下会出现预测结果精度极其不准确的情况。 训练数据集的结构:数据集有超过10的类别数,这里包…

【Java 进阶篇】Java HTTP 请求消息详解

HTTP(Hypertext Transfer Protocol)是一种用于传输超文本的应用层协议,广泛用于构建互联网应用。在Java中,我们经常需要发送HTTP请求来与远程服务器进行通信。本文将详细介绍Java中HTTP请求消息的各个部分,包括请求行、…

shouldComponentUpdate 是做什么的?

目录 前言 生命周期函数 shouldComponentUpdate 的写法和用法 代码 事件和API 优缺点 方法 总结 理论 结论 shouldComponentUpdate 是 React 类组件中的一个生命周期方法,用于决定一个组件的 props 或 state 发生变化时是否应该重新渲染。默认情况下&…

什么是离岸金融 (OFFSHORE FINANCE)

什么是离岸金融 离岸金融(Offshore Finance)是指在国外或离岸地区开设金融账户、进行金融交易或管理金融资产的金融实践。这通常涉及将资金、投资、银行账户或金融交易放置在国外的特殊地区或国家,以获得各种金融和税收优惠。离岸金融的目的…

设计模式--7个原则

单一职责原则:一个类负责一项职责。 里氏替换原则:继承与派生的规则。 依赖倒置原则:高层模块不应该依赖基层模块,二者都应该依赖其抽象;抽象不应该依赖细节;细节应该依赖抽象。即针对接口编程&#xff0…

Linux系统下DHCP服务安装部署和使用实例详解(蜜罐)

目录 一、概述 二、具体配置如下: 一、概述 DHCP :动态主机设置协议(英语:Dynamic Host Configuration Protocol,DHCP)是一个局域网的网络协议,使用UDP协议工作,主要有两个用途&…

公司电脑如何限制安装软件

公司电脑如何限制安装软件 安企神终端管理系统下载使用 在企业环境中,电脑已经成为企业中必不可少的办公工具,确保员工的生产力和公司的信息安全是至关重要的。为了实现这一目标,公司可能会限制员工在某些情况下安装软件或者由管理员来为终…

可以实时监控屏幕的电脑监控软件

电脑已经成为了人们工作和生活不可或缺的工具。然而,这也带来了诸多安全问题。一些人可能会利用电脑进行不恰当的操作,如聊天、游戏、观看视频等,甚至会泄露公司的商业机密。 电脑监控软件的定义 电脑监控软件是一种用于监控电脑使用情况的软…

【Docker】Docker的网络

Docker提供了多种内置的网络模式,用于在容器之间建立网络连接。这些网络模式,包括桥接网络、主机网络、无网络模式。我们将主要探讨每种网络模式的优缺点、适用场景。 桥接网络 桥接网络是Docker的默认网络模式。在桥接网络中,Docker会为每…

数字音频工作站软件 Ableton Live 11 mac中文软件特点与功能

Ableton Live 11 mac是一款数字音频工作站软件,用于音乐制作、录音、混音和现场演出。它由Ableton公司开发,是一款极其流行的音乐制作软件之一。 Ableton Live 11 mac软件特点和功能 Comping功能:Live 11增加了Comping功能,允许用…

PHP:json_encode和json_decode用法

json_encode 函数用于将 PHP 数据结构转换为 JSON 字符串。json_decode 函数用于将 JSON 字符串转换为 PHP 数据结构。 // 将 PHP 数据结构转换为 JSON 字符串 $data ["name" > "John","age" > 25,"city" > "New York&…

【Ansible自动化运维工具 1】Ansible常用模块详解(附各模块应用实例和Ansible环境安装部署)

Ansible常用模块 一、Ansible1.1 简介1.2 工作原理1.3 Ansible的特性1.3.1 特性一:Agentless,即无Agent的存在1.3.2 特性二:幂等性 1.4 Ansible的基本组件 二、Ansible环境安装部署2.1 安装ansible2.2 查看基本信息2.3 配置远程主机清单 三、…

elementUI el-collapse 自定义折叠面板icon 和 样式 或文字展开收起

: :v-deep{.el-collapse-item__arrow {width: 40px;}.el-icon-arrow-right:before {content: "展开";font-size: 15px;font-family: heiti;color: #2295ff;font-weight: bold;}.el-collapse-item__arrow.is-active {transform: none;}.el-collapse-item__arrow.is-a…

vue中,js获取svg内容并填充到svg图中

js获取svg内容并填充到svg图中 最近遇到一个需求,要求前端通过接口获取svg中的内容,并且填充到svg图中,接下来我就记录一下整个实现过程。 第一步,找到svg图中每一个需要填充的数据的key值,把所有key值交给后端&…

【linux】SourceForge 开源软件开发平台和仓库

在linux上面安装服务和工具。我们经常会下载安装包。今天推荐一个网站。 SourceForge 开源软件开发平台和仓库 ​ 全球最大开源软件开发平台和仓库 SourceForge.net,又称SF.net,是开源软件开发者进行开发管理的集中式场所。 SourceForge.net由VA Softwa…