python毕设选题 - flink大数据淘宝用户行为数据实时分析与可视化

文章目录

  • 0 前言
  • 1、环境准备
    • 1.1 flink 下载相关 jar 包
    • 1.2 生成 kafka 数据
    • 1.3 开发前的三个小 tip
  • 2、flink-sql 客户端编写运行 sql
    • 2.1 创建 kafka 数据源表
    • 2.2 指标统计:每小时成交量
      • 2.2.1 创建 es 结果表, 存放每小时的成交量
      • 2.2.2 执行 sql ,统计每小时的成交量
    • 2.3 指标统计:每10分钟累计独立用户数
      • 2.3.1 创建 es 结果表,存放每10分钟累计独立用户数
      • 2.3.2 创建视图
      • 2.3.3 执行 sql ,统计每10分钟的累计独立用户数
    • 2.4 指标统计:商品类目销量排行
      • 2.4.1 创建商品类目维表
      • 2.4.1 创建 es 结果表,存放商品类目排行表
      • 2.4.2 创建视图
      • 2.4.3 执行 sql , 统计商品类目销量排行
  • 3、最终效果与体验心得
    • 3.1 最终效果
    • 3.2 体验心得
      • 3.2.1 执行
      • 3.2.2 存储
  • 4 最后


0 前言

🔥 这两年开始毕业设计和毕业答辩的要求和难度不断提升,传统的毕设题目缺少创新和亮点,往往达不到毕业答辩的要求,这两年不断有学弟学妹告诉学长自己做的项目系统达不到老师的要求。

为了大家能够顺利以及最少的精力通过毕设,学长分享优质毕业设计项目,今天要分享的是

🚩 flink大数据淘宝用户行为数据实时分析与可视化

🥇学长这里给一个题目综合评分(每项满分5分)

  • 难度系数:3分
  • 工作量:3分
  • 创新点:4分

1、环境准备

1.1 flink 下载相关 jar 包

flink-sql 连接外部系统时,需要依赖特定的 jar 包,所以需要事先把这些 jar 包准备好。说明与下载入口

本项目使用到了以下的 jar 包 ,下载后直接放在了 flink/lib 里面。

需要注意的是 flink-sql 执行时,是转化为 flink-job 提交到集群执行的,所以 flink 集群的每一台机器都要添加以下的 jar 包。

外部版本jar
kafka4.1flink-sql-connector-kafka_2.11-1.10.2.jar
flink-json-1.10.2-sql-jar.jar
elasticsearch7.6flink-sql-connector-elasticsearch7_2.11-1.10.2.jar
mysql5.7flink-jdbc_2.11-1.10.2.jar
mysql-connector-java-8.0.11.jar

1.2 生成 kafka 数据

用户行为数据来源: 阿里云天池公开数据集

网盘:https://pan.baidu.com/s/1wDVQpRV7giIlLJJgRZAInQ 提取码:gja5

商品类目纬度数据来源: category.sql

数据生成器:datagen.py

有了数据文件之后,使用 python 读取文件数据,然后并发写入到 kafka。

修改生成器中的 kafka 地址配置,然后运行 以下命令,开始不断往 kafka 写数据

# 5000 并发
nohup python3 datagen.py 5000 &                  

1.3 开发前的三个小 tip

  • 生成器往 kafka 写数据,会自动创建主题,无需事先创建

  • flink 往 elasticsearch 写数据,会自动创建索引,无需事先创建

  • Kibana 使用索引模式从 Elasticsearch 索引中检索数据,以实现诸如可视化等功能。

使用的逻辑为:创建索引模式 》Discover (发现) 查看索引数据 》visualize(可视化)创建可视化图表》dashboards(仪表板)创建大屏,即汇总多个可视化的图表

2、flink-sql 客户端编写运行 sql

# 进入 flink-sql 客户端, 需要指定刚刚下载的 jar 包目录
./bin/sql-client.sh embedded -l lib

2.1 创建 kafka 数据源表

-- 创建 kafka 表, 读取 kafka 数据
CREATE TABLE user_behavior (user_id BIGINT,item_id BIGINT,category_id BIGINT,behavior STRING,ts TIMESTAMP(3),proctime as PROCTIME(),WATERMARK FOR ts as ts - INTERVAL '5' SECOND  
) WITH ('connector.type' = 'kafka', 'connector.version' = 'universal',  'connector.topic' = 'user_behavior',  'connector.startup-mode' = 'earliest-offset', 'connector.properties.zookeeper.connect' = '172.16.122.24:2181', 'connector.properties.bootstrap.servers' = '172.16.122.17:9092', 'format.type' = 'json'  
);
SELECT * FROM user_behavior;

2.2 指标统计:每小时成交量

2.2.1 创建 es 结果表, 存放每小时的成交量

CREATE TABLE buy_cnt_per_hour (hour_of_day BIGINT,buy_cnt BIGINT
) WITH ('connector.type' = 'elasticsearch', 'connector.version' = '7',  'connector.hosts' = 'http://172.16.122.13:9200',  'connector.index' = 'buy_cnt_per_hour','connector.document-type' = 'user_behavior','connector.bulk-flush.max-actions' = '1','update-mode' = 'append','format.type' = 'json'
);

2.2.2 执行 sql ,统计每小时的成交量

INSERT INTO buy_cnt_per_hour
SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*)
FROM user_behavior
WHERE behavior = 'buy'
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);

2.3 指标统计:每10分钟累计独立用户数

2.3.1 创建 es 结果表,存放每10分钟累计独立用户数

CREATE TABLE cumulative_uv (time_str STRING,uv BIGINT
) WITH ('connector.type' = 'elasticsearch', 'connector.version' = '7',  'connector.hosts' = 'http://172.16.122.13:9200',  'connector.index' = 'cumulative_uv','connector.document-type' = 'user_behavior',    'update-mode' = 'upsert','format.type' = 'json'
);

2.3.2 创建视图

CREATE VIEW uv_per_10min AS
SELECTMAX(SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0') OVER w AS time_str,COUNT(DISTINCT user_id) OVER w AS uv
FROM user_behavior
WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);

2.3.3 执行 sql ,统计每10分钟的累计独立用户数

INSERT INTO cumulative_uv
SELECT time_str, MAX(uv)
FROM uv_per_10min
GROUP BY time_str;

2.4 指标统计:商品类目销量排行

2.4.1 创建商品类目维表

先在 mysql 创建一张商品类目的维表,然后配置 flink 读取 mysql。

CREATE TABLE category_dim (sub_category_id BIGINT,parent_category_name STRING
) WITH ('connector.type' = 'jdbc','connector.url' = 'jdbc:mysql://172.16.122.25:3306/flink','connector.table' = 'category','connector.driver' = 'com.mysql.jdbc.Driver','connector.username' = 'root','connector.password' = 'root','connector.lookup.cache.max-rows' = '5000','connector.lookup.cache.ttl' = '10min'
);

2.4.1 创建 es 结果表,存放商品类目排行表

CREATE TABLE top_category  (category_name  STRING,buy_cnt  BIGINT
) WITH ('connector.type' = 'elasticsearch', 'connector.version' = '7',  'connector.hosts' = 'http://172.16.122.13:9200',  'connector.index' = 'top_category','connector.document-type' = 'user_behavior','update-mode' = 'upsert','format.type' = 'json'
);

2.4.2 创建视图

CREATE VIEW rich_user_behavior AS
SELECT U.user_id, U.item_id, U.behavior, C.parent_category_name as category_name
FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.sub_category_id;

2.4.3 执行 sql , 统计商品类目销量排行

INSERT INTO top_category
SELECT category_name, COUNT(*) buy_cnt
FROM rich_user_behavior
WHERE behavior = 'buy'
GROUP BY category_name;

3、最终效果与体验心得

3.1 最终效果

整个开发过程,只用到了 flink-sql ,无需写 java 或者其它代码,就完成了这样一个实时报表。

image-20201201175438743

3.2 体验心得

3.2.1 执行

  • flink-sql 的 ddl 语句不会触发 flink-job , 同时创建的表、视图仅在会话级别有效。

  • 对于连接表的 insert、select 等操作,则会触发相应的流 job, 并自动提交到 flink 集群,无限地运行下去,直到主动取消或者 job 报错。

  • flink-sql 客户端关闭后,对于已经提交到 flink 集群的 job 不会有任何影响。

本次开发,执行了 3 个 insert , 因此打开 flink 集群面板,可以看到有 3 个无限的流 job 。即使 kafka 数据全部写入完毕,关闭 flink-sql 客户端,这个 3 个 job 都不会停止。
image-20201201175523916

3.2.2 存储

  • flnik 本身不存储业务数据,只作为流批一体的引擎存在,所以主要的用法为读取外部系统的数据,处理后,再写到外部系统。

  • flink 本身的元数据,包括表、函数等,默认情况下只是存放在内存里面,所以仅会话级别有效。但是,似乎可以存储到 Hive Metastore 中,关于这一点就留到以后再实践。

4 最后

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/598317.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

463岛屿周长

题目 给定一个 row x col 的二维网格地图 grid ,其中:grid[i][j] 1 表示陆地, grid[i][j] 0 表示水域。 网格中的格子 水平和垂直 方向相连(对角线方向不相连)。整个网格被水完全包围,但其中恰好有一个…

自动驾驶HWP的功能定义

一、功能定义 高速路自动驾驶功能HWP是指在一般畅通高速公路或城市快速路上驾驶员可以放开双手双脚,同时注意力可在较长时间内从驾驶环境中转移,做一些诸如看手机、接电话、看风景等活动,该系统最低工作速度为60kph。 如上两种不同环境和速度…

力扣题:字符串变换-1.5

力扣题-1.5 [力扣刷题攻略] Re:从零开始的力扣刷题生活 力扣题1:482. 密钥格式化 解题思想:首先先将破折号去除,并将所有字母转换为大写,然后计算第一组的长度,进行结果字符串的拼接,如果第一组的长度为0,则需要删除开头的’-符号 class S…

互联网加竞赛 基于CNN实现谣言检测 - python 深度学习 机器学习

文章目录 1 前言1.1 背景 2 数据集3 实现过程4 CNN网络实现5 模型训练部分6 模型评估7 预测结果8 最后 1 前言 🔥 优质竞赛项目系列,今天要分享的是 基于CNN实现谣言检测 该项目较为新颖,适合作为竞赛课题方向,学长非常推荐&am…

BUUCTF crypto做题记录(5)新手向

一、传统知识古典密码 加上一个数,就有点移位加密的味道,很有可能就是凯撒加密 辛卯28,癸巳30,丙戌23,辛未8,庚辰17,癸酉10,己卯16,癸巳30 加1之后,28变29&a…

详解编码与调制

编码与调制是现代通信领域的重要概念。在信息传输过程中,编码和调制起着至关重要的作用,它们帮助将数字信号转化为模拟信号,从而实现高效、可靠的数据传输。本文将从编码和调制的基本概念、常见的编码和调制技术以及其在通信领域的应用等方面…

微商城怎么弄才能开通呢?

​微商城的开通,对于许多商家来说,是进入移动电商领域的重要一步。它不仅能帮助你扩大销售渠道,还能让你更好地管理和服务你的客户。那么,微商城怎么弄才能开通呢? 1、注册微信公众号:首先,你需…

麒麟KYLINOS操作系统上扩容系统盘

原文链接:麒麟KYLINOS操作系统上扩容系统盘 hello,大家好啊!继之前我们讨论了如何在统信UOS上扩容数据盘之后,今天我要给大家带来的是在麒麟KYLINOS操作系统上扩容系统盘与数据盘的方法。随着数据的不断增长,系统盘或数…

微软 Power Platform 使用Power Automate发送邮件以Dataverse作为数据源的附件File Column

微软Power Platform使用Power Automate发送邮件添加Power Apps以Dataverse作为数据源的附件File Column方式 目录 微软Power Platform使用Power Automate发送邮件添加Power Apps以Dataverse作为数据源的附件File Column方式1、需求背景介绍2、附件列File Column介绍3、如何在Po…

分布式之任务调度Elastic-Job学习二

4 Spring 集成与分片详解 ejob-springboot 工程 4.1 pom 依赖 <properties><elastic-job.version>2.1.5</elastic-job.version> </properties> <dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-…

安全cdn有哪些优势

1. 免备案&#xff1a;在中国大陆地区&#xff0c;进行网站建设需要先进行备案手续&#xff0c;而安全cdn可以避免这一繁琐的步骤&#xff0c;节省时间和精力。 2. 精品线路&#xff1a;安全cdn使用的是覆盖范围更广、速度更快的香港CN2 GIA优化线路。 3. 高速稳定&#xff1a…

在vue3中使用Cesium保姆篇

1.首先新建一个vue项目 Vue.js - 渐进式 JavaScript 框架 | Vue.js 可以直接到管网中查看命令通过npm来创建一个vue3的项目 然后通过命令下载1.99的版本的cesium和plugin npm i cesium1.99 vite-plugin-cesium 下载完了以后 2.引入cesium 首先找到vue的vite.config.js …

[LitCTF 2023]这是什么?SQL !注一下 !

[LitCTF 2023]这是什么&#xff1f;SQL &#xff01;注一下 &#xff01; wp 题目描述&#xff1a;为了安全起见多带了几个套罢了o(▽)q 页面内容&#xff08;往下滑&#xff09;&#xff1a; SQL 语句已给出&#xff0c;无非是更换了闭合方式。 先输个 1 试试&#xff1a; …

【unity】基于Obi的绳/杆蓝图、绳杆区别及其创建方法

绳索 是通过使用距离和弯曲约束将粒子连接起来而形成的。由于规则粒子没有方向(只有位置)&#xff0c;因此无法模拟扭转效应(维基百科)&#xff0c;绳子也无法保持其静止形状。然而&#xff0c;与杆不同的是&#xff0c;绳索可以被撕裂/劈开&#xff0c;并且可以在运行时改变其…

轻量对象存储 LighthouseCOS 用户实践征文

产品使用攻略、上云技术实践&#xff0c;有奖征集&#xff0c;多重好礼等您带回家&#xff5e; 存储桶一键挂载轻量应用服务器&#xff0c;简单易用&#xff0c;腾讯云轻量对象存储用户实践征文活动特惠&#xff1a;腾讯云轻量云专场特惠活动。 投稿说明 1. 注册/登录腾讯云账…

【EI会议征稿通知】第三届智能电网与绿色能源国际学术会议(ICSGGE 2024)

第三届智能电网与绿色能源国际学术会议&#xff08;ICSGGE 2024&#xff09; 2024 3rd International Conference on Smart Grid and Green Energy 2024年第三届智能电网与绿色能源国际学术会议&#xff08;ICSGGE 2024&#xff09;将于2024年4月19-21日在中国成都举行。会议…

探索架构之美 | 小米分享架构师的方法论

大家好&#xff0c;我是小米&#xff01;今天我们要聊的话题可是相当精彩——“架构师的方法论”&#xff01;作为一名热爱技术的小伙伴&#xff0c;我深知在软件开发领域&#xff0c;拥有一套科学的方法论是多么的重要。所以&#xff0c;不废话&#xff0c;让我们一起踏上探索…

SpringBoot集成 Websocket 实现服务与客户端进行消息发送和接收

介绍 WebSocket是一种在单个TCP连接上进行全双工通信的协议。WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。 效果 客户端效果 服务端日志 pom依赖 <!-- websocket --> <dependency><groupId>org.springfram…

vue3+Ts+Hook的方式实现商城核心功能sku选择器

前言 Hooks是React等函数式编程框架中非常受欢迎的工具&#xff0c;随着VUE3 Composition API 函数式编程风格的推出&#xff0c;现在也受到越来越多VUE3开发者的青睐&#xff0c;它让开发者的代码具有更高的复用度且更加清晰、易于维护。 本文将通过CRMEB商城商品详情sku选择…

iOS苹果和Android安卓测试APP应用程序的差异

Hello大家好呀&#xff0c;我是咕噜铁蛋&#xff01;我们经常需要关注移动应用程序的测试和优化&#xff0c;以提供更好的用户体验。在移动应用开发领域&#xff0c;iOS和Android是两个主要的操作系统平台。本文铁蛋讲给各位小伙伴们详细介绍在App测试中iOS和Android的差异&…