(二十)springboot实战——springboot使用redis的订阅发布机制结合SSE实现站内信的功能

前言

在前面的章节内容中,我们介绍了如何使用springboot项目实现基于redis订阅发布机制实现消息的收发,同时也介绍了基于SSE机制的单通道消息推送案例,本节内容结合redis和sse实现一个常用的实战案例——站内信。实现系统消息的实时推送。

正文

①引入项目的pom依赖,并在application.yml中配置redis连接

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId>
</dependency>

 ②创建一个SSE服务器,用于连接用户和收发消息

package com.yundi.atp.server;import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;@Slf4j
public class SseServer {/*** 存储用户的连接*/public static Map<String, SseEmitterUTF8> sseMap = new HashMap<>();/*** 建立连接** @param username* @throws IOException*/public static SseEmitterUTF8 connect(String username) throws IOException {if (!sseMap.containsKey(username)) {//设置超时时间(和token有效期一致,超时后不再推送消息),0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutExceptionSseEmitterUTF8 sseEmitter = new SseEmitterUTF8(0L);sseEmitter.send(String.format("%s号用户,连接成功!", username));sseEmitter.onCompletion(() -> sseMap.remove(username));sseEmitter.onTimeout(() -> sseMap.remove(username));sseEmitter.onError(throwable -> sseMap.remove(username));sseMap.put(username, sseEmitter);return sseEmitter;} else {SseEmitterUTF8 sseEmitterUTF8 = sseMap.get(username);sseEmitterUTF8.send(String.format("%s,用户连接成功!", username));return sseEmitterUTF8;}}/*** 发送消息** @param message*/public static synchronized void sendMessage(String message) {List<String> removeList = new ArrayList<>();for (Map.Entry<String, SseEmitterUTF8> entry : sseMap.entrySet()) {String username = entry.getKey();try {SseEmitterUTF8 sseEmitterUTF8 = entry.getValue();sseEmitterUTF8.onCompletion(() -> sseMap.remove(username));sseEmitterUTF8.onTimeout(() -> sseMap.remove(username));sseEmitterUTF8.onError(throwable -> sseMap.remove(username));sseEmitterUTF8.send(message);} catch (IOException e) {//发送不成功,将该用户加入移除列表removeList.add(username);}}//移除连接异常的用户removeList.forEach(item -> sseMap.remove(item));}
}

 ③创建一个redis消息的监听器,将监听到的消息通过sse服务推送给连接的用户

package com.yundi.atp.listen;import com.yundi.atp.constant.ChannelConstant;
import com.yundi.atp.server.SseServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;@Slf4j
@Component
public class RedisMessageSubscriber implements MessageListener {@Autowiredprivate RedisMessageListenerContainer redisMessageListenerContainer;/*** 订阅消息:将订阅者添加到指定的频道*/@PostConstructpublic void subscribeToChannel() {//广播消息redisMessageListenerContainer.addMessageListener(this, new ChannelTopic(ChannelConstant.CHANNEL_GLOBAL_NAME));}@Overridepublic void onMessage(Message message, byte[] bytes) {String channel = new String(message.getChannel(), StandardCharsets.UTF_8);String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);log.info("Received message: " + messageBody + " from channel: " + channel);SseServer.sendMessage(messageBody);}
}

 ④创建SseEmitterUTF8并继承SseEmitter,重写extendResponse方法,解决中文消息发送乱码问题

package com.yundi.atp.server;import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.nio.charset.StandardCharsets;public class SseEmitterUTF8 extends SseEmitter {public SseEmitterUTF8(Long timeout) {super(timeout);}@Overrideprotected void extendResponse(ServerHttpResponse outputMessage) {super.extendResponse(outputMessage);HttpHeaders headers = outputMessage.getHeaders();headers.setContentType(new MediaType(MediaType.TEXT_EVENT_STREAM, StandardCharsets.UTF_8));}
}

⑤ 创建redis的配置类,用于初始化redis的容器监听器和工具类

package com.yundi.atp.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.StringRedisSerializer;@Configuration
public class RedisConfig {/*** 初始化一个Redis消息监听容器* @param connectionFactory* @return*/@Beanpublic RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);// 添加其他配置,如线程池大小等return container;}@Beanpublic RedisTemplate<String, String> redisTemplate(RedisConnectionFactory connectionFactory) {RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();redisTemplate.setConnectionFactory(connectionFactory);redisTemplate.setDefaultSerializer(new StringRedisSerializer());return redisTemplate;}
}

⑦ 创建用于站内信发送的频道Channel

package com.yundi.atp.constant;public class ChannelConstant {/*** 广播通道*/public static final String CHANNEL_GLOBAL_NAME = "channel-global";/*** 单播通道*/public static final String CHANNEL_SINGLE_NAME = "channel-single";
}

 ⑧创建一个消息发布接口和一个sse用户消息推送连接接口

package com.yundi.atp.controller;import com.yundi.atp.constant.ChannelConstant;
import com.yundi.atp.server.SseEmitterUTF8;
import com.yundi.atp.server.SseServer;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
import java.io.IOException;@RequestMapping(value = "base")
@RestController
public class BaseController {@Resourceprivate RedisTemplate redisTemplate;/*** 发布广播消息** @param msg*/@GetMapping(value = "/publish/{msg}")public void sendMsg(@PathVariable(value = "msg") String msg) {redisTemplate.convertAndSend(ChannelConstant.CHANNEL_GLOBAL_NAME, msg);}/*** 接收消息** @return* @throws IOException*/@GetMapping(path = "/connect/{username}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitterUTF8 connect(@PathVariable(value = "username") String username) throws IOException {SseEmitterUTF8 connect = SseServer.connect(username);return connect;}
}

 ⑨启动服务,验证站内信功能是否可以正常使用

结语

关于springboot使用redis的订阅发布机制结合SSE实现站内信的功能到这里就结束了,我们下期见。。。。。。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/689294.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

并发编程-基础知识

多线程概念 1 线程 是进程中的一个执行单元&#xff0c;负责当前进程中任务的执行。一个进程在其执行过程中&#xff0c;会产生很多个线程。 2 进程 是指内存中运行的一个应用程序&#xff0c;每个进程都有自己独立的内存空间&#xff1b;进程也是程序的一次执行过程&#xff…

Unity求物体关于平面镜像对称后坐标以及旋转

前言&#xff1a;如题&#xff0c;我在已知一个平面L和物体A&#xff0c;我希望得到镜像后的物体B的位置和旋转。 效果&#xff1a; 推导&#xff1a; 首先我们需要知道物体的对称坐标A&#xff0c;我们现在能已知A坐标以及平面L的法线&#xff0c;如果我们能得到B的坐标&…

Camtasia Studio2024中文汉化版下载安装激活图文教程

Camtasia studio 2024是一款功能强大的屏幕录制和视频编辑软件。它可以帮助用户轻松地记录电脑屏幕上的任何操作&#xff0c;并可以将录制的视频进行编辑和制作成高质量的视频教程、演示文稿、培训课程等。 Camtasia studio 2024具有直观的界面和易于使用的工具&#xff0c;包…

Shiro-04-shiro 详细架构

详细架构 下图显示了Shiro的核心架构概念&#xff0c;并简要概述了每个架构&#xff1a; 下面我们对除了核心组件的部分做一下简单的介绍&#xff1a; Authentication&#xff08;身份验证&#xff09; 身份验证是验证用户身份的过程。 也就是说&#xff0c;当用户通过应用…

Java基于微信小程序的乐室预约小程序,附源码

博主介绍&#xff1a;✌程序员徐师兄、7年大厂程序员经历。全网粉丝12w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;…

Apache Apisix网关系统历史漏洞复现分析

文章目录 前言CVE-2020-13945默认api令牌CVE-2021-45232未授权接口2.1 默认账户密码导致RCE2.2 未授权访问api接口RCE CVE-2022-24112 地址限制绕过CVE-2022-29266 JWT令牌伪造4.1 漏洞源码简析与修复4.2 漏洞环境搭建与复现 总结 前言 Apache APISIX 是一个动态、实时、高性能…

沁恒CH32V30X学习笔记02--GPIO的使用教程及2次封装驱动

gpio 概述 刚复位后,GPIO 口运行在初始状态,这时大多数 IO 口都是运行在浮空输入状态 外部中断 所有的 GPIO 口都可以被配置外部中断输入通道,但一个外部中断输入通道最多只能映射到一个 GPIO 引脚上,且外部中断通道的序号必须和 GPIO 端口的位号一致,比如 PA1(或 PB1、…

Android开机不显示bootloader界面

Turn it off in the following way LINUX\android\bootable\bootloader\edk2\QcomModulePkg\Library\BootLib\MenuKeysDetection.c 试了没有生效 --- a/QcomModulePkg/Library/BootLib/MenuKeysDetection.cb/QcomModulePkg/Library/BootLib/MenuKeysDetection.c-364,7 364,8…

[Flask]SSTI1 buuctf

声明&#xff1a;本篇文章csdn要我一天发两篇所以我来水的 跟ssti注入的详细知识我这里写了 https://blog.csdn.net/weixin_74790320/article/details/136154130 上面链接我复现了vulhub的SSTI&#xff0c;其实本质上是一道题 然后我们就用{{.__class__}}看类的类型&#xf…

css2的三大特性

css的三大特性 一.层叠性概念 二.继承性行高的继承 三. 优先级概念a标签默认蓝色继承注意事项 一.层叠性 概念 二.继承性 行高的继承 可用倍数表示三. 优先级 概念 a标签默认蓝色 继承注意事项 例子

单反sd卡照片突然没有了?原因+解决方案

在使用单反相机拍摄时&#xff0c;SD卡作为存储媒介&#xff0c;承担着存储照片的重要角色。然而&#xff0c;有时候我们会遇到SD卡中的照片突然消失的情况&#xff0c;给拍摄工作带来很大的困扰。本文将深入剖析导致这一问题的原因&#xff0c;并提供相应的解决方案&#xff0…

Visual Studio Code安装Oracle SQL Developer插件

Visual Studio Code&#xff0c;简称VS Code&#xff0c;是最流行的IDE之一。SQL Developer作为面向 Oracle 数据库专业人员的查询、开发和管理工具&#xff0c;现已可作为插件&#xff08;Extension&#xff09;在VS Code中安装。无需安装 Java, .NET, 和Oracle Client 。 数…

【代码整理】Pytorch从0实现图像分类pipeline

文章目录 引言1.数据集读取部分dataloader.py1.1.分类数据集的数据组织形式1.2自定义数据增强/数据预处理类1.3.重写torch.utils.data.Dataset数据集读取类1.4.模块测试样例 2.网络部分mynet.py2.1.自定义分类网络torch.nn.Module2.2.模块测试样例 3.训练/验证/测试模块runner.…

嵌入式第十七天!(文件IO)

文件IO&#xff1a; 标准IO和文件IO的区别&#xff1a; 1. 标准IO是库函数&#xff0c;是对系统调用的封装 2. 文件IO是系统调用&#xff0c;是Linux内核中的函数接口 3. 标准IO是有缓存的 4. 文件IO是没有缓存的 1. 操作步骤&#xff1a; 打开 -> 读/写 -> 关闭 2. 打开…

基于Java SSM框架实现精准扶贫管理系统项目【项目源码】

基于java的SSM框架实现精准扶贫管理系统演示 JSP技术介绍 JSP技术本身是一种脚本语言&#xff0c;但它的功能是十分强大的&#xff0c;因为它可以使用所有的JAVA类。当它与JavaBeans 类进行结合时&#xff0c;它可以使显示逻辑和内容分开&#xff0c;这就极大的方便了用户的需…

⭐北邮复试刷题LCR 012. 寻找数组的中心下标__前缀和思想 (力扣119经典题变种挑战)

LCR 012. 寻找数组的中心下标 给你一个整数数组 nums &#xff0c;请计算数组的 中心下标 。 数组 中心下标 是数组的一个下标&#xff0c;其左侧所有元素相加的和等于右侧所有元素相加的和。 如果中心下标位于数组最左端&#xff0c;那么左侧数之和视为 0 &#xff0c;因为…

数据管理关键技术顶层设计

数据管理关键技术顶层设计 明仔 数据思考笔记 2023-12-27 07:36 广东 数据思考笔记 专注于数据架构&#xff0c;数据中台&#xff0c;数据治理的相关分享&#xff0c;寻求数据与业务的结合点。 13篇原创内容 公众号 数据治理65 数据治理 目录 上一篇企业数据资产管理解…

基于Java SSM框架实现生鲜食品o2o商城系统项目【项目源码+论文说明】

基于java的SSM框架实现生鲜食品o2o商城系统演示 摘要 随着社会的发展&#xff0c;社会的各行各业都在利用信息化时代的优势。计算机的优势和普及使得各种信息系统的开发成为必需。 生鲜食品o2o商城系统&#xff0c;主要的模块包括查看管理员&#xff1b;首页、个人中心、用户…

Tomcat版本号泄露

1.问题描述 Tomcat报错页面泄漏Apache Tomcat/7.0.92相关版本号信息&#xff0c;是攻击者攻击的途径之一。因此实际当中建议去掉版本号信息。 2.测试过程 随便访问一个tomcat不存在的界面 http://127.0.0.1:8080/examples/mytest.jsp 3.解决办法 1.进入到tomcat/lib目录下&a…

预检请求:为跨域请求保驾护航(下)

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…