【电影推荐系统】实时推荐

概览

技术方案:

  • 日志采集服务:通过利用Flume-ng对业务平台中用户对于电影的一次评分行为进行采集,实时发送到Kafka集群。
  • 消息缓冲服务:项目采用Kafka作为流式数据的缓存组件,接受来自Flume的数据采集请求。并将数据推送到项目的实时推荐系统部分。
  • 实时推荐服务:项目采用Spark Streaming作为实时推荐系统,通过接收Kafka中缓存的数据,通过设计的推荐算法实现对实时推荐的数据处理,并将结构合并更新到MongoDB数据库。

1. 实现思路

我们应该如何实现?

  1. 首先应该redis安装,这里存储用户的第K次评分(用户评分存入redis中)
  2. 安装zookeeper,安装kafka,都是standlone模式
  3. 测试Kafka与Spark Streaming 联调。Kafka生产一条数据,Spark Streaming 可以消费成功,并根据redis中的数据和MongoDB数据进行推荐,存入MongoDB中
  4. 在业务系统写埋点信息,测试时写入本地文件,之后再远程测试写入云服务器log文件中
  5. flume配置文件书写,kafka创建两个topic,对整个过程进行测试

2 环境准备

1.1 redis 安装

  • redis安装redis安装
  • 密码:123456
  • 存入redis一些数据 lpush uid:1 mid:score
  • redis 教程:教程

1.2 zookeeper单机版安装

  • zookeeper安装:zookeeper安装
  • 版本:3.7.1
  • 遇到的坑:8080端口连接占用,我们需要在zoo.cpg文件中加上
    admin.serverPort=8001重新启动即可。

1.3 kafka单机安装

  • kafka安装:官网下载地址
  • 安装使用的为:127.0.0.1
  • 启动kafka:kafka教程
bin/kafka-server-start.sh config/server.properties
  • 创建一个topic
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic recommender
  • 生产一个消息
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic recommender
  • 消费一个消息
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic recommender --from-beginning

3 测试kafka与spark streaming联调

  • kafka版本:2.2.0
  • spark版本:2.3.0
  • 因此使用spark-streaming-kafka-0-10

image.png

  1. 启动kafka,生产一条信息
  2. 书写程序
// 定义kafka连接参数val kafkaParam = Map("bootstrap.servers" -> "服务器IP:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "recommender","auto.offset.reset" -> "latest")// 通过kafka创建一个DStreamval kafkaStream = KafkaUtils.createDirectStream[String, String]( ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String]( Array(config("kafka.topic")), kafkaParam ))// 把原始数据UID|MID|SCORE|TIMESTAMP 转换成评分流// 1|31|4.5|val ratingStream = kafkaStream.map{msg =>val attr = msg.value().split("\\|")( attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toInt )}
  1. 若是kafka报错,如果你同样也是云服务器,请注意kafka的配置信息(很重要!)

(1)解决方法:修改kafka配置文件,设置为设置listeners为内网ip,设置外网ip

  • 解决方案修改内网ip

(2)重新启动,成功

  • 内网外网分流:内网外网分流
  • kafka入门教程:入门教程
  1. redis报错:开启保护模式了,需要修改conf文件

效果

在kafka生产一个数据,可以在MongoDB中得到推荐的电影结果

4 后端埋点

前端进行评分后,触发click事件,后端进行测试埋点,利用log4j写入本地文件中。

4.1 本地测试

  • log4j配置文件
log4j.rootLogger=INFO, file, stdout# write to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%50t]  %-80c(line:%5L)  :  %m%n# write to file
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.FILE.Append=true
log4j.appender.FILE.Threshold=INFO
log4j.appender.file.File=F:/demoparent/business/src/main/log/agent.txt
log4j.appender.file.MaxFileSize=1024KB
log4j.appender.file.MaxBackupIndex=1
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%50t]  %-80c(line:%6L)  :  %m%n
  • 埋点实现
//埋点日志
import org.apache.log4j.Logger;// 关键代码
Logger log = Logger.getLogger(MovieController.class.getName());
log.info(MOVIE_RATING_PREFIX + ":" + uid +"|"+ mid +"|"+ score +"|"+ System.currentTimeMillis()/1000)

4.2 写入远程测试

  1. Linux安装syslog服务,进行测试
  2. 主机log4j配置文件设置服务器ip
  • log4j配置:写入远程服务器
log4j.appender.syslog=org.apache.log4j.net.SyslogAppender
log4j.appender.syslog.SyslogHost= 服务器IP
log4j.appender.syslog.Threshold=INFO
log4j.appender.syslog.layout=org.apache.log4j.PatternLayout
log4j.appender.syslog.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%20t]  %-130c:(line:%4L)  :   %m%n

5 flume配置

  1. flume对接kafka:flume对接文件
  2. flume设置source和sink,source为文件地址,sink为kafka的log
# log-kafka.properties
agent.sources = exectail
agent.channels = memoryChannel 
agent.sinks = kafkasink 
agent.sources.exectail.type = exec 
agent.sources.exectail.command = tail -f /project/logs/agent.log agent.sources.exectail.interceptors=i1 agent.sources.exectail.interceptors.i1.type=regex_filter agent.sources.exectail.interceptors.i1.regex=.+MOVIE_RATING_PREFIX.+ agent.sources.exectail.channels = memoryChannelagent.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink agent.sinks.kafkasink.kafka.topic = log agent.sinks.kafkasink.kafka.bootstrap.servers = 服务器地址:9092 agent.sinks.kafkasink.kafka.producer.acks = 1 agent.sinks.kafkasink.kafka.flumeBatchSize = 20 agent.sinks.kafkasink.channel = memoryChannel
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000

6 实时推荐

ratingStream.foreachRDD{rdds => rdds.foreach{case (uid, mid, score, timestamp) => {println("rating data coming! >>>>>>>>>>>>>>>>")println(uid+",mid:"+mid)// 1. 从redis里获取当前用户最近的K次评分,保存成Array[(mid, score)]val userRecentlyRatings = getUserRecentlyRating( MAX_USER_RATINGS_NUM, uid, ConnHelper.jedis )println("用户最近的K次评分:"+userRecentlyRatings)// 2. 从相似度矩阵中取出当前电影最相似的N个电影,作为备选列表,Array[mid]val candidateMovies = getTopSimMovies( MAX_SIM_MOVIES_NUM, mid, uid, simMovieMatrixBroadCast.value )println("电影最相似的N个电影:"+candidateMovies)// 3. 对每个备选电影,计算推荐优先级,得到当前用户的实时推荐列表,Array[(mid, score)]val streamRecs = computeMovieScores( candidateMovies, userRecentlyRatings, simMovieMatrixBroadCast.value )println("当前用户的实时推荐列表:"+streamRecs)// 4. 把推荐数据保存到mongodbsaveDataToMongoDB( uid, streamRecs )}}
}
def computeMovieScores(candidateMovies: Array[Int],userRecentlyRatings: Array[(Int, Double)],simMovies: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]): Array[(Int, Double)] ={// 定义一个ArrayBuffer,用于保存每一个备选电影的基础得分val scores = scala.collection.mutable.ArrayBuffer[(Int, Double)]()// 定义一个HashMap,保存每一个备选电影的增强减弱因子val increMap = scala.collection.mutable.HashMap[Int, Int]()val decreMap = scala.collection.mutable.HashMap[Int, Int]()for( candidateMovie <- candidateMovies; userRecentlyRating <- userRecentlyRatings){// 拿到备选电影和最近评分电影的相似度val simScore = getMoviesSimScore( candidateMovie, userRecentlyRating._1, simMovies )if(simScore > 0.7){// 计算备选电影的基础推荐得分scores += ( (candidateMovie, simScore * userRecentlyRating._2) )if( userRecentlyRating._2 > 3 ){increMap(candidateMovie) = increMap.getOrDefault(candidateMovie, 0) + 1} else{decreMap(candidateMovie) = decreMap.getOrDefault(candidateMovie, 0) + 1}}}// 根据备选电影的mid做groupby,根据公式去求最后的推荐评分scores.groupBy(_._1).map{// groupBy之后得到的数据 Map( mid -> ArrayBuffer[(mid, score)] )case (mid, scoreList) =>( mid, scoreList.map(_._2).sum / scoreList.length + log(increMap.getOrDefault(mid, 1)) - log(decreMap.getOrDefault(mid, 1)) )}.toArray.sortWith(_._2>_._2)
}

7 启动顺序

  1. 启动hadoop、spark的容器
  • cd /docker
  • docker-compose up -d
  • docker-compose ps
  1. 启动mongodb和redis服务
  • netstat -lanp | grep "27017"
  • bin/redis-server etc/redis.conf
  1. 启动zookeeper、kafka服务
  • ./zkServer.sh start
  • bin/kafka-server-start.sh config/server.properties
  1. 启动flume服务
  • bin/flume-ng agent -c ./conf/ -f ./conf/log-kafka.properties -n agent

实现效果

前端评分成功后写入日志文件,flume对接log日志文件无问题,kafka对接flume无问题,spark streaming处理收到的一条数据,进行推荐,存入MongoDB中。

image.png

总结

由于时间匆忙,写的有些匆忙,如果有需要前端设计代码和后端的代码可以评论我,我整理整理发到github上。

前端设计部分没有时间去详细做,后续再对前端页面进行美化。本科当时整合了一个管理系统,现在也没有时间做,总之,一周多时间把当时的系统快速复现了下,算是一个复习。

在进行开发时,遇到许多问题,版本问题、服务器内网外网问题、docker容器相关问题、协同过滤算法设计问题,但帮着自己复习了下Vue和SpringBoot。

遇到问题时

  • 遇到问题不应该盲目解决,应该静下心看看报错原因,想想为何报错
  • 版本尤其重要,因此最好在一个project的pom设定版本
  • 使用服务器搭建docker-compose,利用该方法来搭建集群,快速简单,但涉及的端口转发等一些网络知识需要耐下心来看
  • Vue-Cli+Element-ui搭配起来开发简单
  • 写程序时,我们应该提前约定好接口,否则后续会很混乱…

后续

  • 后续将优化下前端页面,设计更多功能
  • 改进推荐算法
  • 增加冷启动方案

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/18425.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度学习:使用全连接神经网络FCN实现MNIST手写数字识别

1 引言 本项目构建了一个全连接神经网络(FCN)&#xff0c;实现对MINST数据集手写数字的识别&#xff0c;没有借助任何深度学习算法库&#xff0c;从原理上理解手写数字识别的全过程&#xff0c;包括反向传播&#xff0c;梯度下降等。 2 全连接神经网络介绍 2.1 什么是全连接…

maven引入本地jar包的简单方式【IDEA】【SpringBoot】

前言 想必点进来看这篇文章的各位&#xff0c;都是已经习惯了Maven从中央仓库或者阿里仓库直接拉取jar包进行使用。我也是&#x1f921;&#x1f921;。 前两天遇到一个工作场景&#xff0c;对接三方平台&#xff0c;结果对方就是提供的一个jar包下载链接&#xff0c;可给我整…

SpringBoot使用MyBatis Plus + 自动更新数据表

1、Mybatis Plus介绍 Mybatis&#xff0c;用过的都知道&#xff0c;这里不介绍&#xff0c;mybatis plus只是在mybatis原来的基础上做了些改进&#xff0c;增强了些功能&#xff0c;增强的功能主要为增加更多常用接口方法调用&#xff0c;减少xml内sql语句编写&#xff0c;也可…

python使用selenium 打开谷歌浏览器闪退, 怎么解决

问题描述&#xff1a; 大家早好、午好、晚好吖 ❤ ~欢迎光临本文章 使用 Selenium 操作 Chrome 浏览器&#xff0c; Chrome 浏览器闪退 问题解决&#xff1a; 可能是以下几个方面出现了问题&#xff1a; 1. Chromedriver 版本与 Chrome 浏览器版本不匹配 你需要确保你正在…

安卓:JzvdStd——网络视频播放器

目录 一、JzvdStd介绍 JzvdStd的特点和功能&#xff1a; JzvdStd常用方法&#xff1a; 二、JzvdStd使用 1、补充知识&#xff1a; 例子&#xff1a; MainActivity &#xff1a; VideoPageAdapter &#xff1a; activity_main&#xff1a; video_page&#xff1a; …

第十次CCF计算机软件能力认证

第一题&#xff1a;分蛋糕 小明今天生日&#xff0c;他有 n 块蛋糕要分给朋友们吃&#xff0c;这 n 块蛋糕&#xff08;编号为 1 到 n&#xff09;的重量分别为 a1,a2,…,an。 小明想分给每个朋友至少重量为 k 的蛋糕。 小明的朋友们已经排好队准备领蛋糕&#xff0c;对于每个朋…

Blazor前后端框架Known-V1.2.9

V1.2.9 Known是基于C#和Blazor开发的前后端分离快速开发框架&#xff0c;开箱即用&#xff0c;跨平台&#xff0c;一处代码&#xff0c;多处运行。 Gitee&#xff1a; https://gitee.com/known/KnownGithub&#xff1a;https://github.com/known/Known 概述 基于C#和Blazor…

UE4 unlua学习笔记

将这三个插件放入Plugins内并重新编译 创建一个BlueprintLibrary&#xff0c;声明一个全局函数 在这里声明路径 点击Create Lua Template 在Content的Script即可生成对应的lua文件打开它&#xff01; 显示以上lua代码 打印Hello Unlua 创建该UI&#xff0c;就会在创建UI的Con…

Flutter-基础Widget

Flutter页面-基础Widget 文章目录 Flutter页面-基础WidgetWidgetStateless WidgetStateful WidgetState生命周期 基础widget文本显示TextRichTextDefaultTextStyle 图片显示FlutterLogoIconImageIamge.assetImage.fileImage.networkImage.memory CircleAvatarFadeInImage 按钮R…

火山引擎DataLeap如何解决SLA治理难题(二):申报签署流程与复盘详解

申报签署流程详解 火山引擎DataLeap SLA保障的前提是先达成SLA协议。在SLA保障平台中&#xff0c;以 申报单签署的形式达成SLA协议。平台核心特点是 优化了SLA达成的流程&#xff0c;先通过 “系统卡点计算”减少待签署任务的数量&#xff0c;再通过 “SLA推荐计算”自动签署部…

【Linux】网络基础

&#x1f34e;作者&#xff1a;阿润菜菜 &#x1f4d6;专栏&#xff1a;Linux系统网络编程 文章目录 一、协议初识和网络协议分层&#xff08;TCP/IP四层模型&#xff09;认识协议TCP/IP五层&#xff08;或四层&#xff09;模型 二、认识MAC地址和IP地址认识MAC地址认识IP地址认…

基于Java的闲置物品管理系统(源码+文档+数据库)

很多在校学生经常因为冲动或者因为图一时的新鲜,购买了很多可能只是偶尔用一下的物品&#xff0c;大量物品将会闲置&#xff0c;因此&#xff0c;构建一个资源共享平台&#xff0c;将会极大满足师院学生的需求,可以将其闲置物品挂在资源共享平台上让有需要的学生浏览&#xff0…

Linux【网络基础】数据链路层IP协议技术补充DNSDHCP

文章目录 一、数据链路层&#xff08;1&#xff09;数据链路层与网络层的关联&#xff08;2&#xff09;局域网通信原理&#xff08;3&#xff09;以太网协议&#xff08;4&#xff09;ARP协议 二、NAT协议三、NAPT协议四、ICMP协议五、DNS六、DHCP 一、数据链路层 &#xff0…

二、JVM-深入运行时数据区

深入运行时数据区 计算机体系结构 JVM的设计实际上遵循了遵循冯诺依曼计算机结构 CPU与内存交互图&#xff1a; 硬件一致性协议&#xff1a; MSI、MESI、MOSI、Synapse、Firely、DragonProtocol 摩尔定律 摩尔定律是由英特尔(Intel)创始人之一戈登摩尔(Gordon Moore)提出来…

配置GIt账号、配置公钥

1.设置账号和邮箱 打开终端输入以下命令&#xff1a; git config --global --unset-all user.name git config --global --unset-all user.email然后输入以下命令来设置新的账号和邮箱&#xff1a; git config --global user.name "your_username" git config --glo…

与“云”共舞,联想凌拓的新科技与新突破

伴随着数字经济的高速发展&#xff0c;IT信息技术在数字中国建设中起到的驱动和支撑作用也愈发凸显。特别是2023年人工智能和ChatGPT在全球的持续火爆&#xff0c;更是为整个IT产业注入了澎湃动力。那么面对日新月异的IT信息技术&#xff0c;再结合疫情之后截然不同的经济环境和…

效率提升丨大学必看校园安全实用技巧

在当今社会&#xff0c;教育是培养人才、传承文明的重要场所。然而&#xff0c;教学楼作为学生、教师和员工活动的核心区域&#xff0c;也存在着潜在的安全隐患&#xff0c;其中最为突出的风险之一是火灾。火灾不仅危及生命财产&#xff0c;还可能给整个学校带来不可估量的损失…

vue3中使用原始标签制作一个拖拽和点击上传组件上传成功后展示

在Vue3中&#xff0c;可以使用<input type"file">标签来实现上传文件的功能&#xff0c;同时可以通过<div>标签来实现拖拽上传的功能。 首先&#xff0c;在template中定义一个包含<input>和<div>标签的组件&#xff1a; <template>&…

【C++】模板学习(二)

模板学习 非类型模板参数模板特化函数模板特化类模板特化全特化偏特化 模板分离编译模板总结 非类型模板参数 模板参数除了类型形参&#xff0c;还可以是非类型的形参。 非类型形参要求用一个常量作为类(函数)模板的一个参数。这个参数必须是整形家族的。浮点数&#xff0c;字…