JavaWeb_LeadNews_Day11-KafkaStream实现实时计算文章分数

JavaWeb_LeadNews_Day11-KafkaStream实现实时计算文章分数

  • KafkaStream
    • 概述
    • 案例-统计单词个数
    • SpringBoot集成
  • 实时计算文章分值
  • 来源
  • Gitee

KafkaStream

概述

  • Kafka Stream: 提供了对存储与Kafka内的数据进行流式处理和分析的功能
  • 特点:
    • Kafka Stream提供了一个非常简单而轻量的Library, 它可以非常方便地嵌入任意Java应用中, 也可以任意方式打包和部署
    • 除了Kafka外, 无任何外部依赖
    • 通过可容错地state, store实现高效地状态操作(如windowed join和aggregation)
    • 支持基于事件时间地窗口操作, 并且可处理晚到的数据(late arrival of records)
  • 关键概念:
    • 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器.
    • Sink处理器: sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题
  • KStream:
    • 数据结构类似于map, key-value键值对.
    • 一段顺序的, 无限长, 不断更新的数据集.

案例-统计单词个数

  • 依赖
    依赖中有排除部分依赖, 还是一整个放上来好了
    <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><exclusions><exclusion><artifactId>connect-json</artifactId><groupId>org.apache.kafka</groupId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency>
    </dependencies>
    
  • 流式处理
    public class KafkaStreamQuickStart {public static void main(String[] args) {// Kafka的配置信息Properties prop = new Properties();prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.174.133:9092");prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-quickstart");// Stream 构建器StreamsBuilder streamsBuilder = new StreamsBuilder();// 流式计算streamProcessor(streamsBuilder);// 创建KafkaStream对象KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), prop);// 开启流式计算kafkaStreams.start();}/*** 流式计算* 消息的内容: hello kafka* @param streamsBuilder*/private static void streamProcessor(StreamsBuilder streamsBuilder) {// 创建Kstream对象, 同时指定从哪个topic中接收消息KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");/*** 处理消息的value*/stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {@Overridepublic Iterable<String> apply(String value) {String[] valAry = value.split(" ");return Arrays.asList(valAry);}})// 按照value聚合处理.groupBy((key, value)->value)// 时间窗口.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))// 统计单词的个数.count()// 转换为KStream.toStream().map((key, value)->{System.out.println("key:"+key+",value:"+value);return new KeyValue<>(key.key().toString(), value.toString());})// 发送消息.to("itcast-topic-out");}
    }
    
  • 发送消息
    for (int i = 0; i < 5; i++) {ProducerRecord<String, String> kvProducerRecord = new ProducerRecord<>("icast-topic-input", "hello kafka");producer.send(kvProducerRecord);
    }
    
  • 接收消息
    // 订阅主题
    consumer.subscribe(Collections.singleton("itcast-topic-out"));
    

SpringBoot集成

  • 配置
    config.java
    /*** 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数*/
    @Setter
    @Getter
    @Configuration
    @EnableKafkaStreams
    @ConfigurationProperties(prefix="kafka")
    public class KafkaStreamConfig {private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;private String hosts;private String group;@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {Map<String, Object> props = new HashMap<>();props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");props.put(StreamsConfig.RETRIES_CONFIG, 10);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());return new KafkaStreamsConfiguration(props);}
    }
    
    application.yml
    kafka:hosts: 192.168.174.133:9092group: ${spring.application.name}
    
    BeanConfig.java
    @Slf4j
    @Configuration
    public class KafkaStreamHelloListener {@Beanpublic KStream<String, String> KStream(StreamsBuilder streamsBuilder){// 创建Kstream对象, 同时指定从哪个topic中接收消息KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");/*** 处理消息的value*/stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {@Overridepublic Iterable<String> apply(String value) {String[] valAry = value.split(" ");return Arrays.asList(valAry);}})// 按照value聚合处理.groupBy((key, value)->value)// 时间窗口.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))// 统计单词的个数.count()// 转换为KStream.toStream().map((key, value)->{System.out.println("key:"+key+",value:"+value);return new KeyValue<>(key.key().toString(), value.toString());})// 发送消息.to("itcast-topic-out");return stream;}
    }
    

实时计算文章分值

  1. nacos: leadnews-behavior配置kafka生产者
    kafka:bootstrap-servers: 192.168.174.133:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer
    
  2. 修改leadnews-behavior
    like
    public ResponseResult likesBehavior(LikesBehaviorDto dto) {...UpdateArticleMess mess = new UpdateArticleMess();mess.setArticleId(dto.getArticleId());mess.setType(UpdateArticleMess.UpdateArticleType.LIKES);if(dto.getOperation() == 0){...mess.setAdd(1);}else{...mess.setAdd(-1);}// kafka: 发送消息, 数据聚合kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC, JSON.toJSONString(mess));...
    }
    
    view
    public ResponseResult readBehavior(ReadBehaviorDto dto) {...// kafka: 发送消息, 数据聚合UpdateArticleMess mess = new UpdateArticleMess();mess.setArticleId(dto.getArticleId());mess.setType(UpdateArticleMess.UpdateArticleType.VIEWS);mess.setAdd(1);kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC, JSON.toJSONString(mess));...
    }
    
  3. leadnews-article中添加流式聚合处理
    @Slf4j
    @Configuration
    public class HotArticleStreamHandler {@Beanpublic KStream<String, String> KStream(StreamsBuilder streamsBuilder){// 接收消息KStream<String, String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);// 聚合流式处理stream.map((key, value)->{UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);// 重置消息的key和valuereturn new KeyValue<>(mess.getArticleId().toString(), mess.getType().name()+":"+mess.getAdd());})// 根据文章id进行聚合.groupBy((key, value)->key)// 时间窗口.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))// 自行实现聚合计算.aggregate(// 初始方法, 返回值是消息的valuenew Initializer<String>() {@Overridepublic String apply() {return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";}},// 真正的聚合操作, 返回值是消息的valuenew Aggregator<String, String, String>() {@Overridepublic String apply(String key, String value, String aggValue) {if(StringUtils.isBlank(value)){return aggValue;}String[] aggAry = aggValue.split(",");int col=0, com=0, lik=0, vie=0;for (String agg : aggAry) {String[] split = agg.split(":");/*** 获得初始值, 也是时间窗口内计算之后的值*/switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){case COLLECTION:col = Integer.parseInt(split[1]);break;case COMMENT:com = Integer.parseInt(split[1]);break;case LIKES:lik = Integer.parseInt(split[1]);break;case VIEWS:vie = Integer.parseInt(split[1]);break;}}// 累加操作String[] valAry = value.split(":");switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])){case COLLECTION:col += Integer.parseInt(valAry[1]);break;case COMMENT:com += Integer.parseInt(valAry[1]);break;case LIKES:lik += Integer.parseInt(valAry[1]);break;case VIEWS:vie += Integer.parseInt(valAry[1]);break;}String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);System.out.println("文章的id "+key);System.out.println("当前时间窗口内的消息处理结果: "+formatStr);return formatStr;}}, Materialized.as("hot-article-stream-count-001")).toStream().map((key, value)->{return new KeyValue<>(key.key().toString(), formatObj(key.key().toString(), value));})// 发送消息.to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);return stream;}/*** 格式化消息的value数据* @param articleId* @param value* @return*/private String formatObj(String articleId, String value) {ArticleVisitStreamMess mess = new ArticleVisitStreamMess();mess.setArticleId(Long.valueOf(articleId));// COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0String[] valAry = value.split(",");for (String val : valAry) {String[] split = val.split(":");switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){case COLLECTION:mess.setCollect(Integer.parseInt(split[1]));break;case COMMENT:mess.setComment(Integer.parseInt(split[1]));break;case LIKES:mess.setLike(Integer.parseInt(split[1]));break;case VIEWS:mess.setView(Integer.parseInt(split[1]));break;}}log.info("聚合消息处理之后的结果为: {}", JSON.toJSONString(mess));return JSON.toJSONString(mess);}}
  4. leadnews-article添加聚合数据监听器
    @Slf4j
    @Component
    public class ArticleIncrHandlerListener {@Autowiredprivate ApArticleService apArticleService;@KafkaListener(topics = HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC)public void onMessage(String mess){if(StringUtils.isNotBlank(mess)){ArticleVisitStreamMess articleVisitStreamMess = JSON.parseObject(mess, ArticleVisitStreamMess.class);apArticleService.updateScore(articleVisitStreamMess);System.out.println(mess);}}}
    
  5. 替换redis中的热点数据
    /*** 更新 文章分值, 缓存中的热点文章数据* @param mess*/
    @Override
    public void updateScore(ArticleVisitStreamMess mess) {// 1. 更新文章的阅读, 点赞, 收藏, 评论的数量ApArticle apArticle = updateArticleBehavior(mess);// 2. 计算文章的分值Integer score = hotArticleService.computeArticleScore(apArticle);score *= 3;// 3. 替换当前文章对应频道的热点数据replaceDataToRedis(ArticleConstants.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId(), apArticle, score);// 4. 替换推荐对应的热点数据replaceDataToRedis(ArticleConstants.HOT_ARTICLE_FIRST_PAGE + ArticleConstants.DEFAULT_TAG, apArticle, score);
    }/*** 替换数据并且存入到redis中* @param HOT_ARTICLE_FIRST_PAGE* @param apArticle* @param score*/
    private void replaceDataToRedis(String HOT_ARTICLE_FIRST_PAGE, ApArticle apArticle, Integer score) {String articleList = cacheService.get(HOT_ARTICLE_FIRST_PAGE);if(StringUtils.isNotBlank(articleList)){List<HotArticleVo> hotArticleVoList = JSON.parseArray(articleList, HotArticleVo.class);boolean flag = true;// 3.1 文章已是热点文章, 则更新文章分数for (HotArticleVo hotArticleVo : hotArticleVoList) {if(hotArticleVo.getId().equals(apArticle.getId())){hotArticleVo.setScore(score);flag = false;break;}}// 3.2 文章还不是热点文章, 则替换分数最小的文章if(flag){if(hotArticleVoList.size() >= 30){// 热点文章超过30条hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());HotArticleVo lastHot = hotArticleVoList.get(hotArticleVoList.size() - 1);if(lastHot.getScore() < score){hotArticleVoList.remove(lastHot);HotArticleVo hotArticleVo = new HotArticleVo();BeanUtils.copyProperties(apArticle, hotArticleVo);hotArticleVo.setScore(score);hotArticleVoList.add(hotArticleVo);}}else{// 热点文章没超过30条HotArticleVo hotArticleVo = new HotArticleVo();BeanUtils.copyProperties(apArticle, hotArticleVo);hotArticleVo.setScore(score);hotArticleVoList.add(hotArticleVo);}}// 3.3 缓存到redishotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());cacheService.set(HOT_ARTICLE_FIRST_PAGE, JSON.toJSONString(hotArticleVoList));}
    }/*** 更新文章行为数据* @param mess*/
    private ApArticle updateArticleBehavior(ArticleVisitStreamMess mess) {ApArticle apArticle = getById(mess.getArticleId());apArticle.setComment(apArticle.getComment()==null?0:apArticle.getComment()+mess.getComment());apArticle.setCollection(apArticle.getCollection()==null?0:apArticle.getCollection()+mess.getCollect());apArticle.setLikes(apArticle.getLikes()==null?0:apArticle.getLikes()+mess.getLike());apArticle.setViews(apArticle.getViews()==null?0:apArticle.getViews()+mess.getView());updateById(apArticle);return apArticle;
    }
    

来源

黑马程序员. 黑马头条

Gitee

https://gitee.com/yu-ba-ba-ba/leadnews

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/76022.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Pytorch 多卡并行(1)—— 原理简介和 DDP 并行实践

近年来&#xff0c;深度学习模型的规模越来越大&#xff0c;需要处理的数据也越来越多&#xff0c;单卡训练的显存空间和计算效率都越来越难以满足需求。因此&#xff0c;多卡并行训练成为了一个必要的解决方案本文主要介绍使用 Pytorch 的 DistributedDataParallel&#xff08…

合宙Air724UG LuatOS-Air LVGL API控件-表格(Table)

表格&#xff08;Table&#xff09; 示例代码 --创建表格Table1 lvgl.table_create(lvgl.scr_act(),nil)--设置表格为4行5列lvgl.table_set_row_cnt(Table1,4)lvgl.table_set_col_cnt(Table1,5)--给每个单元格赋值lvgl.table_set_cell_value(Table1, 0, 0, "选手")l…

Android之RecyclerView仿ViewPage滑动

文章目录 前言一、效果图二、实现步骤1.xml主布局2.所有用到的drawable资源文件3.xml item布局4.adapter适配器5.javabean实体类6.activity使用 总结 前言 我们都知道ViewPageFragment滑动&#xff0c;但是的需求里面已经有了这玩意&#xff0c;但是在Fragment中还要有类似功能…

基于3D扫描和3D打印的产品逆向工程实战【数字仪表】

逆向工程是一种从物理零件创建数字设计的强大方法&#xff0c;并且可以与 3D 扫描和 3D 打印等技术一起成为原型设计工具包中的宝贵工具。 推荐&#xff1a;用 NSDT编辑器 快速搭建可编程3D场景 3D 扫描仪可以非常快速地测量复杂的物体&#xff0c;并且在涉及现实生活参考时可以…

花生壳内网穿透+Windows系统,如何搭建网站?

1. 准备工作 在百度搜索“Win7下安装ApachePHPMySQL”&#xff0c;根据搜到的教程自行安装WAMP环境。 如果在网页上键入http://127.0.0.1/ 出现以下页面表示您的服务器已经建好&#xff0c;下一步就是关键&#xff0c;如何通过花生壳内网穿透&#xff0c;让外网的用户访问到您…

设计模式 - 责任链

一、前言 ​ 相信大家平时或多或少都间接接触过责任链设计模式&#xff0c;只是可能有些同学自己不知道此处用的是该设计模式&#xff0c;比如说 Java Web 中的 Filter 过滤器&#xff0c;就是非常经典的责任链设计模式的例子。 那么什么是责任链设计模式呢&#xff1f; ​ …

大数据课程L6——网站流量项目的SparkStreaming

文章作者邮箱:yugongshiye@sina.cn 地址:广东惠州 ▲ 本章节目的 ⚪ 了解网站流量项目的SparkStreaming概述; ⚪ 掌握网站流量项目的SparkStreaming实现 Wordcount 底层流程; ⚪ 掌握网站流量项目的SparkStreaming实现历史批次的累积处理; ⚪ 掌握网站流…

快速学会git版本管理——上传gitee仓库

首先在gitee右上角有一个新建仓库 创建之后打开自己想要上传的文件 右键打开 Git Bash Here 接下来会弹出git的窗口 首先先初始化仓库 用git命令 git init 然后用git add . 上传所有文件上传到暂存区(上一篇文章说过add是单个文件&#xff0c;add . 是所有文件) 没有显示错误 …

OpenCV_CUDA_VS编译安装

一、OpenCV 我这里是下载的OpenCV4.5.4&#xff0c;但是不知道到在vs里面build时一直报错&#xff0c;后面换了4.7.0的版本测试&#xff0c;安装成功。 Release OpenCV 4.5.4 opencv/opencv GitHub 这个里面有官方预编译好的OpenCV库&#xff0c;可以直接食用。 扩展包&am…

SQL4 查询结果限制返回行数

描述 题目&#xff1a;现在运营只需要查看前2个用户明细设备ID数据&#xff0c;请你从用户信息表 user_profile 中取出相应结果。 示例&#xff1a; iddevice_idgenderageuniversityprovince12138male21北京大学Beijing23214male复旦大学Shanghai36543female20北京大学Beijin…

设计模式(1) - UML类图

1、前言 从这一节开始&#xff0c;我们将一起学习设计模式。我们的学习目标是什么呢&#xff1f; 了解常用设计模式以及它们的使用场景&#xff1b;分析实际工程中设计模式的使用&#xff0c;揣摩实际意图&#xff0c;了解作者设计思路&#xff1b;尝试运用设计模式迭代、重构…

css transition 指南

css transition 指南 在本文中&#xff0c;我们将深入了解 CSS transition&#xff0c;以及如何使用它们来创建丰富、精美的动画。 基本原理 我们创建动画时通常需要一些动画相关的 CSS。 下面是一个按钮在悬停时移动但没有动画的示例&#xff1a; <button class"…

MySQL下载安装环境变量配置,常用命令

一、下载安装 mysql官网 下载连接 这个是下载图形安装 https://dev.mysql.com/downloads/installer/ 这个是下载免图形安装 https://dev.mysql.com/downloads/mysql/ 担心个别宝宝没有账号&#xff0c;这边也提供一下&#xff0c;方便下载&#xff1a; 账户&#xff1a;1602404…

算法基础-数学知识-容斥原理、博弈论

容斥原理、博弈论 容斥原理890. 能被整除的数&#xff08;二进制状态压缩版本&#xff0c;复杂度多一个Om&#xff09;890. 能被整除的数&#xff08;dfs版本&#xff09; 博弈论无限制nim游戏AcWing 891. Nim游戏AcWing 892. 台阶-Nim游戏&#xff08;待补&#xff09; 集合版…

Linux中防火墙的简单使用方法

目录 前言 ​编辑 一、概念 1、防火墙的分类&#xff1a; 2、防火墙性能 3、硬件防火墙的品牌、软件防火墙的品牌 4、硬件防火墙与软件防火墙比较 二、linux中的防火墙 1、iptables 2.netfilter/iptables功能 3、四表 iptables中表的优先级 4、五链 三、iptables…

数字化转型背景下企业知识管理能力提升路径

近年来&#xff0c;科技不断进步&#xff0c;颠覆性技术&#xff08;例如 5G、云计算、物联网、大数据分析和人工智能等&#xff09;正在重新定义企业如何管理项目和运营效率。知识管理体系亦需要随着科技的进步而改变&#xff0c;以适应新的数字时代环境&#xff0c;并且高效知…

说说MySQL回表查询与覆盖索引

分析&回答 什么是回表查询&#xff1f; 通俗的讲就是&#xff0c;如果索引的列在 select 所需获得的列中&#xff08;因为在 mysql 中索引是根据索引列的值进行排序的&#xff0c;所以索引节点中存在该列中的部分值&#xff09;或者根据一次索引查询就能获得记录就不需要…

从零开始搭建vite4.0-vue3.0项目

目录 前言 项目地址 项目初始化 git初始化 别名配置 解决vscode报错 vue-router安装 pinia安装 环境配置 axios安装 element-plus按需引入 eslint与prettier安装 scss安装 stylelint配置 代码提交规范配置 husky与lint-stage配置 前言 pnpm和npm的命令行完全一…

FastChat

Fast Chat是一个用于训练/部署和评估基于大型语言模型的聊天机器人的开发平台。其核心功能包括&#xff1a; 最先进模型的权重/训练代码和评估代码(例如Vicuna/FastChat-T5)基于分布式多模型的服务系统&#xff0c;具有Web界面和与OpenAI兼容的RESTful API。 安装 pip instal…

在Cisco设备上配置接口速度和双工

默认情况下&#xff0c;思科交换机将自动协商速度和双工设置。将设备&#xff08;交换机、路由器或工作站&#xff09;连接到 Cisco 交换机上的端口时&#xff0c;将发生协商过程&#xff0c;设备将就传输参数达成一致&#xff0c;当今的大多数网络适配器都支持此功能。 在本文…