python毕设选题 - flink大数据淘宝用户行为数据实时分析与可视化

文章目录

  • 0 前言
  • 1、环境准备
    • 1.1 flink 下载相关 jar 包
    • 1.2 生成 kafka 数据
    • 1.3 开发前的三个小 tip
  • 2、flink-sql 客户端编写运行 sql
    • 2.1 创建 kafka 数据源表
    • 2.2 指标统计:每小时成交量
      • 2.2.1 创建 es 结果表, 存放每小时的成交量
      • 2.2.2 执行 sql ,统计每小时的成交量
    • 2.3 指标统计:每10分钟累计独立用户数
      • 2.3.1 创建 es 结果表,存放每10分钟累计独立用户数
      • 2.3.2 创建视图
      • 2.3.3 执行 sql ,统计每10分钟的累计独立用户数
    • 2.4 指标统计:商品类目销量排行
      • 2.4.1 创建商品类目维表
      • 2.4.1 创建 es 结果表,存放商品类目排行表
      • 2.4.2 创建视图
      • 2.4.3 执行 sql , 统计商品类目销量排行
  • 3、最终效果与体验心得
    • 3.1 最终效果
    • 3.2 体验心得
      • 3.2.1 执行
      • 3.2.2 存储
  • 4 最后


0 前言

🔥 这两年开始毕业设计和毕业答辩的要求和难度不断提升,传统的毕设题目缺少创新和亮点,往往达不到毕业答辩的要求,这两年不断有学弟学妹告诉学长自己做的项目系统达不到老师的要求。

为了大家能够顺利以及最少的精力通过毕设,学长分享优质毕业设计项目,今天要分享的是

🚩 flink大数据淘宝用户行为数据实时分析与可视化

🥇学长这里给一个题目综合评分(每项满分5分)

  • 难度系数:3分
  • 工作量:3分
  • 创新点:4分

1、环境准备

1.1 flink 下载相关 jar 包

flink-sql 连接外部系统时,需要依赖特定的 jar 包,所以需要事先把这些 jar 包准备好。说明与下载入口

本项目使用到了以下的 jar 包 ,下载后直接放在了 flink/lib 里面。

需要注意的是 flink-sql 执行时,是转化为 flink-job 提交到集群执行的,所以 flink 集群的每一台机器都要添加以下的 jar 包。

外部版本jar
kafka4.1flink-sql-connector-kafka_2.11-1.10.2.jar
flink-json-1.10.2-sql-jar.jar
elasticsearch7.6flink-sql-connector-elasticsearch7_2.11-1.10.2.jar
mysql5.7flink-jdbc_2.11-1.10.2.jar
mysql-connector-java-8.0.11.jar

1.2 生成 kafka 数据

用户行为数据来源: 阿里云天池公开数据集

网盘:https://pan.baidu.com/s/1wDVQpRV7giIlLJJgRZAInQ 提取码:gja5

商品类目纬度数据来源: category.sql

数据生成器:datagen.py

有了数据文件之后,使用 python 读取文件数据,然后并发写入到 kafka。

修改生成器中的 kafka 地址配置,然后运行 以下命令,开始不断往 kafka 写数据

# 5000 并发
nohup python3 datagen.py 5000 &                  

1.3 开发前的三个小 tip

  • 生成器往 kafka 写数据,会自动创建主题,无需事先创建

  • flink 往 elasticsearch 写数据,会自动创建索引,无需事先创建

  • Kibana 使用索引模式从 Elasticsearch 索引中检索数据,以实现诸如可视化等功能。

使用的逻辑为:创建索引模式 》Discover (发现) 查看索引数据 》visualize(可视化)创建可视化图表》dashboards(仪表板)创建大屏,即汇总多个可视化的图表

2、flink-sql 客户端编写运行 sql

# 进入 flink-sql 客户端, 需要指定刚刚下载的 jar 包目录
./bin/sql-client.sh embedded -l lib

2.1 创建 kafka 数据源表

-- 创建 kafka 表, 读取 kafka 数据
CREATE TABLE user_behavior (user_id BIGINT,item_id BIGINT,category_id BIGINT,behavior STRING,ts TIMESTAMP(3),proctime as PROCTIME(),WATERMARK FOR ts as ts - INTERVAL '5' SECOND  
) WITH ('connector.type' = 'kafka', 'connector.version' = 'universal',  'connector.topic' = 'user_behavior',  'connector.startup-mode' = 'earliest-offset', 'connector.properties.zookeeper.connect' = '172.16.122.24:2181', 'connector.properties.bootstrap.servers' = '172.16.122.17:9092', 'format.type' = 'json'  
);
SELECT * FROM user_behavior;

2.2 指标统计:每小时成交量

2.2.1 创建 es 结果表, 存放每小时的成交量

CREATE TABLE buy_cnt_per_hour (hour_of_day BIGINT,buy_cnt BIGINT
) WITH ('connector.type' = 'elasticsearch', 'connector.version' = '7',  'connector.hosts' = 'http://172.16.122.13:9200',  'connector.index' = 'buy_cnt_per_hour','connector.document-type' = 'user_behavior','connector.bulk-flush.max-actions' = '1','update-mode' = 'append','format.type' = 'json'
);

2.2.2 执行 sql ,统计每小时的成交量

INSERT INTO buy_cnt_per_hour
SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*)
FROM user_behavior
WHERE behavior = 'buy'
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);

2.3 指标统计:每10分钟累计独立用户数

2.3.1 创建 es 结果表,存放每10分钟累计独立用户数

CREATE TABLE cumulative_uv (time_str STRING,uv BIGINT
) WITH ('connector.type' = 'elasticsearch', 'connector.version' = '7',  'connector.hosts' = 'http://172.16.122.13:9200',  'connector.index' = 'cumulative_uv','connector.document-type' = 'user_behavior',    'update-mode' = 'upsert','format.type' = 'json'
);

2.3.2 创建视图

CREATE VIEW uv_per_10min AS
SELECTMAX(SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0') OVER w AS time_str,COUNT(DISTINCT user_id) OVER w AS uv
FROM user_behavior
WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);

2.3.3 执行 sql ,统计每10分钟的累计独立用户数

INSERT INTO cumulative_uv
SELECT time_str, MAX(uv)
FROM uv_per_10min
GROUP BY time_str;

2.4 指标统计:商品类目销量排行

2.4.1 创建商品类目维表

先在 mysql 创建一张商品类目的维表,然后配置 flink 读取 mysql。

CREATE TABLE category_dim (sub_category_id BIGINT,parent_category_name STRING
) WITH ('connector.type' = 'jdbc','connector.url' = 'jdbc:mysql://172.16.122.25:3306/flink','connector.table' = 'category','connector.driver' = 'com.mysql.jdbc.Driver','connector.username' = 'root','connector.password' = 'root','connector.lookup.cache.max-rows' = '5000','connector.lookup.cache.ttl' = '10min'
);

2.4.1 创建 es 结果表,存放商品类目排行表

CREATE TABLE top_category  (category_name  STRING,buy_cnt  BIGINT
) WITH ('connector.type' = 'elasticsearch', 'connector.version' = '7',  'connector.hosts' = 'http://172.16.122.13:9200',  'connector.index' = 'top_category','connector.document-type' = 'user_behavior','update-mode' = 'upsert','format.type' = 'json'
);

2.4.2 创建视图

CREATE VIEW rich_user_behavior AS
SELECT U.user_id, U.item_id, U.behavior, C.parent_category_name as category_name
FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.sub_category_id;

2.4.3 执行 sql , 统计商品类目销量排行

INSERT INTO top_category
SELECT category_name, COUNT(*) buy_cnt
FROM rich_user_behavior
WHERE behavior = 'buy'
GROUP BY category_name;

3、最终效果与体验心得

3.1 最终效果

整个开发过程,只用到了 flink-sql ,无需写 java 或者其它代码,就完成了这样一个实时报表。

image-20201201175438743

3.2 体验心得

3.2.1 执行

  • flink-sql 的 ddl 语句不会触发 flink-job , 同时创建的表、视图仅在会话级别有效。

  • 对于连接表的 insert、select 等操作,则会触发相应的流 job, 并自动提交到 flink 集群,无限地运行下去,直到主动取消或者 job 报错。

  • flink-sql 客户端关闭后,对于已经提交到 flink 集群的 job 不会有任何影响。

本次开发,执行了 3 个 insert , 因此打开 flink 集群面板,可以看到有 3 个无限的流 job 。即使 kafka 数据全部写入完毕,关闭 flink-sql 客户端,这个 3 个 job 都不会停止。
image-20201201175523916

3.2.2 存储

  • flnik 本身不存储业务数据,只作为流批一体的引擎存在,所以主要的用法为读取外部系统的数据,处理后,再写到外部系统。

  • flink 本身的元数据,包括表、函数等,默认情况下只是存放在内存里面,所以仅会话级别有效。但是,似乎可以存储到 Hive Metastore 中,关于这一点就留到以后再实践。

4 最后

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/598317.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

463岛屿周长

题目 给定一个 row x col 的二维网格地图 grid ,其中:grid[i][j] 1 表示陆地, grid[i][j] 0 表示水域。 网格中的格子 水平和垂直 方向相连(对角线方向不相连)。整个网格被水完全包围,但其中恰好有一个…

GreatSQL社区2023全年技术文章总结

GreatSQL社区自成立以来一直致力于为广大的数据库爱好者提供一个交流与学习的平台。在2023年,我们见证了社区的蓬勃发展,见证了众多技术文章的诞生与分享。 此篇总结呈现GreatSQL社区2023年社区技术文章在CSDN发布的全部。这些文章涵盖了GreatSQL、MGR、…

自动驾驶HWP的功能定义

一、功能定义 高速路自动驾驶功能HWP是指在一般畅通高速公路或城市快速路上驾驶员可以放开双手双脚,同时注意力可在较长时间内从驾驶环境中转移,做一些诸如看手机、接电话、看风景等活动,该系统最低工作速度为60kph。 如上两种不同环境和速度…

知识笔记(七十)———tp5中的增删改查(详细)

增 添加多条数据 添加多条数据直接向 Db 类的 insertAll 方法传入需要添加的数据即可 $data [[foo > bar, bar > foo],[foo > bar1, bar > foo1],[foo > bar2, bar > foo2] ]; Db::name(user)->insertAll($data); 助手函数写法 // 添加单条数据 db(…

力扣labuladong一刷day53天LFU 算法

力扣labuladong一刷day53天LFU 算法 一、460. LFU 缓存 题目链接&#xff1a;https://leetcode.cn/problems/lfu-cache/description/ class LFUCache {HashMap<Integer, Integer> ktv;HashMap<Integer, Integer> ktf;HashMap<Integer, LinkedHashSet<Inte…

力扣题:字符串变换-1.5

力扣题-1.5 [力扣刷题攻略] Re&#xff1a;从零开始的力扣刷题生活 力扣题1&#xff1a;482. 密钥格式化 解题思想&#xff1a;首先先将破折号去除,并将所有字母转换为大写,然后计算第一组的长度,进行结果字符串的拼接,如果第一组的长度为0,则需要删除开头的’-符号 class S…

互联网加竞赛 基于CNN实现谣言检测 - python 深度学习 机器学习

文章目录 1 前言1.1 背景 2 数据集3 实现过程4 CNN网络实现5 模型训练部分6 模型评估7 预测结果8 最后 1 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 基于CNN实现谣言检测 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c;学长非常推荐&am…

力扣-135.分发糖果

135.分发糖果 n 个孩子站成一排。给你一个整数数组 ratings 表示每个孩子的评分。 你需要按照以下要求&#xff0c;给这些孩子分发糖果&#xff1a; 每个孩子至少分配到 1 个糖果。 相邻两个孩子评分更高的孩子会获得更多的糖果。 请你给每个孩子分发糖果&#xff0c;计算并返…

BUUCTF crypto做题记录(5)新手向

一、传统知识古典密码 加上一个数&#xff0c;就有点移位加密的味道&#xff0c;很有可能就是凯撒加密 辛卯28&#xff0c;癸巳30&#xff0c;丙戌23&#xff0c;辛未8&#xff0c;庚辰17&#xff0c;癸酉10&#xff0c;己卯16&#xff0c;癸巳30 加1之后&#xff0c;28变29&a…

详解编码与调制

编码与调制是现代通信领域的重要概念。在信息传输过程中&#xff0c;编码和调制起着至关重要的作用&#xff0c;它们帮助将数字信号转化为模拟信号&#xff0c;从而实现高效、可靠的数据传输。本文将从编码和调制的基本概念、常见的编码和调制技术以及其在通信领域的应用等方面…

微商城怎么弄才能开通呢?

​微商城的开通&#xff0c;对于许多商家来说&#xff0c;是进入移动电商领域的重要一步。它不仅能帮助你扩大销售渠道&#xff0c;还能让你更好地管理和服务你的客户。那么&#xff0c;微商城怎么弄才能开通呢&#xff1f; 1、注册微信公众号&#xff1a;首先&#xff0c;你需…

麒麟KYLINOS操作系统上扩容系统盘

原文链接&#xff1a;麒麟KYLINOS操作系统上扩容系统盘 hello&#xff0c;大家好啊&#xff01;继之前我们讨论了如何在统信UOS上扩容数据盘之后&#xff0c;今天我要给大家带来的是在麒麟KYLINOS操作系统上扩容系统盘与数据盘的方法。随着数据的不断增长&#xff0c;系统盘或数…

【开题报告】基于SSM的动物保护知识科普平台的设计与实现

1.研究背景 动物保护是一个全球性的议题&#xff0c;涉及到生态环境、物种多样性和可持续发展等重要领域。随着人们环保意识的提高和社会对动物福利的关注度的增加&#xff0c;动物保护知识科普的重要性日益凸显。 在过去的几十年里&#xff0c;动物保护组织和机构开展了大量…

微软 Power Platform 使用Power Automate发送邮件以Dataverse作为数据源的附件File Column

微软Power Platform使用Power Automate发送邮件添加Power Apps以Dataverse作为数据源的附件File Column方式 目录 微软Power Platform使用Power Automate发送邮件添加Power Apps以Dataverse作为数据源的附件File Column方式1、需求背景介绍2、附件列File Column介绍3、如何在Po…

亚马逊多店铺运营:如何有效降低账号关联风险?

在亚马逊平台进行多店铺运营时&#xff0c;账号关联风险是卖家需要特别关注的问题。本文将介绍一些有效的方法&#xff0c;以帮助卖家降低账号关联风险&#xff0c;保护多店铺运营的稳定和持续性。 一、避免使用相同的IP地址和设备 亚马逊通过IP地址和设备指纹等信息来判断账…

分布式之任务调度Elastic-Job学习二

4 Spring 集成与分片详解 ejob-springboot 工程 4.1 pom 依赖 <properties><elastic-job.version>2.1.5</elastic-job.version> </properties> <dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-…

安全cdn有哪些优势

1. 免备案&#xff1a;在中国大陆地区&#xff0c;进行网站建设需要先进行备案手续&#xff0c;而安全cdn可以避免这一繁琐的步骤&#xff0c;节省时间和精力。 2. 精品线路&#xff1a;安全cdn使用的是覆盖范围更广、速度更快的香港CN2 GIA优化线路。 3. 高速稳定&#xff1a…

邦芒攻略:让HR看得上的简历五大要素

简历是求职的敲门砖。只有通过简历筛选&#xff0c;HR给你面试通知&#xff0c;你才有机会面试&#xff0c;距离你的心仪的公司和职位又近了一步。 1、简历上的求职目标一定要简洁明确。简历上的内容一定要能与所应聘的岗位所呼应&#xff0c;无 关紧要的信息最好不要出现&…

Xpath的问题:为什么在DOM中确定存在(可见)的元素,用//表达式匹配不到(附解决办法)

今天遇到一个很有意思的问题&#xff0c;我的爬取的目标页面上有时会出现一个弹窗&#xff0c;它挡住我点击其它按钮了&#xff0c;我想找到它的关闭按钮&#xff0c;自动点击一下关闭掉&#xff0c;本来是很简单的事情&#xff0c;但偏偏出问题了&#xff0c;DOM中看到的html是…

在vue3中使用Cesium保姆篇

1.首先新建一个vue项目 Vue.js - 渐进式 JavaScript 框架 | Vue.js 可以直接到管网中查看命令通过npm来创建一个vue3的项目 然后通过命令下载1.99的版本的cesium和plugin npm i cesium1.99 vite-plugin-cesium 下载完了以后 2.引入cesium 首先找到vue的vite.config.js …