记一次项目所学(中间件等)-动态提醒功能(RocketMQ)

记一次项目所学(中间件等)–动态提醒功能(RocketMQ)

订阅发布模式与观察者模式

在这里插入图片描述

在这里插入图片描述

RocketMQ:纯java编写的开源消息中间件 高性能低延迟分布式事务

Redis : 高性能缓存工具,数据存储在内存中,读写速度非常快

RocketMQ相关工具类及配置实现

配置类

 
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.1</version></dependency>//redis<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.2.2.RELEASE</version></dependency>

生产者发送消息工具类

public class RocketMQUtil {//同步发送消息public static void syncSendMsg(DefaultMQProducer producer, Message msg) throws Exception{SendResult result = producer.send(msg);System.out.println(result);}//异步发送消息public static void asyncSendMsg(DefaultMQProducer producer, Message msg) throws Exception{producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {Logger logger = LoggerFactory.getLogger(RocketMQUtil.class);logger.info("异步发送消息成功,消息id:" + sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {e.printStackTrace();}});}
}

RocketMQ配置类

@Configuration
public class RocketMQConfig {//  rocketMQ名称服务器的地址@Value("${rocketmq.name.server.address}")private String nameServerAddr;@Autowiredprivate RedisTemplate<String, String> redisTemplate;@Autowiredprivate UserFollowingService userFollowingService;//生产者@Bean("momentsProducer")public DefaultMQProducer momentsProducer() throws Exception{DefaultMQProducer producer = new DefaultMQProducer(UserMomentsConstant.GROUP_MOMENTS);producer.setNamesrvAddr(nameServerAddr);producer.start();return producer;}@Bean("momentsConsumer")//push 为推送,还有拉取等consumerpublic DefaultMQPushConsumer momentsConsumer() throws Exception{DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(UserMomentsConstant.GROUP_MOMENTS);consumer.setNamesrvAddr(nameServerAddr);//订阅    *表示所有内容consumer.subscribe(UserMomentsConstant.TOPIC_MOMENTS, "*");//消费者监听器,监听到后下一步操作//registerMessageListener注册消息监听consumer.registerMessageListener(new MessageListenerConcurrently() {@Override//ConsumeConcurrentlyStatus并发处理//MessageExt消息的扩充,ConsumeConcurrentlyContext为处理的上下文public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){MessageExt msg = msgs.get(0);if(msg == null){return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}//取出的是byte数组类型String bodyStr = new String(msg.getBody());UserMoment userMoment = JSONObject.toJavaObject(JSONObject.parseObject(bodyStr), UserMoment.class);Long userId = userMoment.getUserId();//定位粉丝idList<UserFollowing>fanList = userFollowingService.getUserFans(userId);for(UserFollowing fan : fanList){//发到redis用户到redis拿String key = "subscribed-" + fan.getUserId();//把动态列表拿出来String subscribedListStr = redisTemplate.opsForValue().get(key);List<UserMoment> subscribedList;if(StringUtil.isNullOrEmpty(subscribedListStr)){subscribedList = new ArrayList<>();}else{//转换列表的类subscribedList = JSONArray.parseArray(subscribedListStr, UserMoment.class);}subscribedList.add(userMoment);//把列表再转成字符串放进去redisTemplate.opsForValue().set(key, JSONObject.toJSONString(subscribedList));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();return consumer;}

具体业务逻辑:

@Service
public class UserMomentsService {@Autowiredprivate UserMomentsDao userMomentsDao;@Autowiredprivate ApplicationContext applicationContext;@Autowiredprivate RedisTemplate<String, String> redisTemplate;public void addUserMoments(UserMoment userMoment) throws Exception {userMoment.setCreateTime(new Date());//cruduserMomentsDao.addUserMoments(userMoment);DefaultMQProducer producer = (DefaultMQProducer)applicationContext.getBean("momentsProducer");//主题 以及json的数组消息Message msg = new Message(UserMomentsConstant.TOPIC_MOMENTS, JSONObject.toJSONString(userMoment).getBytes(StandardCharsets.UTF_8));RocketMQUtil.syncSendMsg(producer, msg);}// 查询订阅动态public List<UserMoment> getUserSubscribedMoments(Long userId) {String key = "subscribed-" + userId;//查出来的是String描述的json类型String listStr = redisTemplate.opsForValue().get(key);//返回的是List类型,要把查出来的String封装成一个一个的UserMoment再进List中return JSONArray.parseArray(listStr, UserMoment.class);}
}

PS:消费信息逻辑在配置类的Consumer中已经写好了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/733576.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Meta正打造一个巨型AI模型,旨在为其“整个视频生态系统”提供动力,一位高管透露

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

Swift 入门学习:集合(Collection)类型趣谈-上

概览 集合的概念在任何编程语言中都占有重要的位置&#xff0c;正所谓&#xff1a;“古来聚散地&#xff0c;宿昔长荆棘&#xff1b;游人聚散中&#xff0c;一片湖光里”。把那一片片、一瓣瓣、一粒粒“可耐”的小精灵全部收拢、吸纳的井然有序、条条有理&#xff0c;怎能不让…

tcp流式服务和粘包问题

目录 1.概念 2.流式服务 3.粘包问题 1.概念 套接字是一个全双工的 使用TCP协议通信的双方必须先建立连接,然后才能开始数据的读写,双方都必须为该连接分配必要的内核资源,以管理连接的状态和连接上数据的传输. TCP连接是全双工的,即双方的数据读写可以通过一个连接进行,完成…

【C语言】linux内核ip_local_out函数

一、讲解 这个函数 __ip_local_out 是 Linux 内核网络子系统中的函数&#xff0c;部分与本地出口的 IPv4 数据包发送相关。下面讲解这段代码的每一部分&#xff1a; 1. 函数声明 int __ip_local_out(struct net *net, struct sock *sk, struct sk_buff *skb)&#xff1a; -…

react实战——react旅游网

慕课网react实战 搭建项目问题1.按照官网在index.tsx中引入antd出错&#xff1f;2.typescript中如何使用react-router3.react-router3.1 V63.2 V53.3V6实现私有路由 4.函数式组件接收props参数时定义数据接口&#xff1f;5.使用TypeScript开发react项目&#xff1a;6.要使一个组…

SQLite3中的callback回调函数注意的细节

调用 sqlite3_exec(sqlite3*, const char *sql, sqlite_callback, void *data, char **errmsg)该例程提供了一个执行 SQL 命令的快捷方式&#xff0c; SQL 命令由 sql 参数提供&#xff0c;可以由多个 SQL 命令组成。 在这里&#xff0c; 第一个参数 sqlite3 是打开的数据库对…

代码随想录算法训练营第day41|背包理论基础、416. 分割等和子集

目录 a.背包理论基础——01背包 1.二维数组的01背包表示 2.一维滚动数组表示 b. 416. 分割等和子集 - 力扣&#xff08;LeetCode&#xff09; a.背包理论基础——01背包 背包问题分类&#xff1a; 对于面试的话&#xff0c;其实掌握01背包&#xff0c;和完全背包&#xff…

Excel F4键的作用

目录 一. 单元格相对/绝对引用转换二. 重复上一步操作 一. 单元格相对/绝对引用转换 ⏹ 使用F4键 如下图所示&#xff0c;B1单元格引用了A1单元格的内容。此时是使用相对引用&#xff0c;可以按下键盘上的F4键进行相对引用和绝对引用的转换。 二. 重复上一步操作 ⏹添加或删除…

SSM框架,MyBatis-Plus的学习(下)

条件构造器 使用MyBatis-Plus的条件构造器&#xff0c;可以构建灵活高效的查询条件&#xff0c;可以通过链式调用来组合多个条件。 条件构造器的继承结构 Wrapper &#xff1a; 条件构造抽象类&#xff0c;最顶端父类 AbstractWrapper &#xff1a; 用于查询条件封装&#xf…

首屏性能优化:提升用户体验的秘籍

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

复盘-excel

excel-选列没有用&#xff0c;选小标题才可以 将簇状柱形图放置在一个新表上##### excel: 添加数据模型时&#xff0c;要通过套用表格格式与外部断开连接 透视分析2010年人数未解决(第四套&#xff09; 通过日期显示星期几 判断星期几 因为前面已经通过星期六&#xff0c…

03_Tomcat

文章目录 Tomcat概念自制简易的服务器JavaEE规范Tomcat安装Tomcat启动Tomcat的资源部署直接部署虚拟映射 Tomcat的设置 Tomcat 概念 服务器&#xff1a;两层含义。 软件层面&#xff1a;软件&#xff0c;可以将本地的资源发布到网络中&#xff0c;供网络上面的其他用户来访问…

grafana table合并查询

注&#xff1a;本文基于Grafana v9.2.8编写 1 问题 默认情况下table展示的是一个查询返回的多个field&#xff0c;但是我想要的数据在不同的metric上&#xff0c;比如我需要显示某个pod的读写IO&#xff0c;但是读和写这两个指标存在于两个不同的metirc&#xff0c;需要分别查…

多种方法求解数组排序

&#x1d649;&#x1d65e;&#x1d658;&#x1d65a;!!&#x1f44f;&#x1f3fb;‧✧̣̥̇‧✦&#x1f44f;&#x1f3fb;‧✧̣̥̇‧✦ &#x1f44f;&#x1f3fb;‧✧̣̥̇:Solitary_walk ⸝⋆ ━━━┓ - 个性标签 - &#xff1a;来于“云”的“羽球人”。…

C++——string模拟实现

前言&#xff1a;上篇文章我们对string类及其常用的接口方法的使用进行了分享&#xff0c;这篇文章将着重进行对这些常用的接口方法的内部细节进行分享和模拟实现。 目录 一.基础框架 二.遍历字符串 1.[]运算符重载 2.迭代器 3.范围for 三.常用方法 1.增加 2.删除 3.调…

数据库-第十一章 并发控制【期末复习|考研复习】

前言 总结整理不易&#xff0c;希望大家点赞收藏。 给大家整理了一下数据库系统概论中的重点概念&#xff0c;以供大家期末复习和考研复习的时候使用。 参考资料是王珊老师和萨师煊老师的数据库系统概论(第五版)。 数据库系统概论系列文章传送门&#xff1a; 第一章 绪论 第二/…

数据分析-Pandas数据画箱线图

数据分析-Pandas数据画箱线图 数据分析和处理中&#xff0c;难免会遇到各种数据&#xff0c;那么数据呈现怎样的规律呢&#xff1f;不管金融数据&#xff0c;风控数据&#xff0c;营销数据等等&#xff0c;莫不如此。如何通过图示展示数据的规律&#xff1f; 数据表&#xff…

【重新定义matlab强大系列十七】Matlab深入浅出长短期记忆神经网络LSTM

&#x1f517; 运行环境&#xff1a;Matlab &#x1f6a9; 撰写作者&#xff1a;左手の明天 &#x1f947; 精选专栏&#xff1a;《python》 &#x1f525; 推荐专栏&#xff1a;《算法研究》 #### 防伪水印——左手の明天 #### &#x1f497; 大家好&#x1f917;&#x1f91…

音视频学习笔记——c++多线程(二)

✊✊✊&#x1f308;大家好&#xff01;本篇文章是多线程系列第二篇文章&#x1f607;。首先讲解了利用mutex解决多线程数据共享问题&#xff0c;举例更好理解lock和unlock的使用方法&#xff0c;以及错误操作造成的死锁问题&#xff0c;最后讲解了lock_guard与unique_lock使用…

Django模型层(附带test环境)

Django模型层(附带test环境) 目录 Django模型层(附带test环境)连接数据库Django ORM在models.py中建表允许为空指定默认值数据库迁移命令 开启测试环境建表语句补充(更改默认表名)数据的增加时间数据的时区 多表数据的增加一对多多对多 数据的删除修改数据查询数据查询所有数据…