简单记录一下使用netty方式实现SSE的服务端功能
目录
- 简要说明
- 基于Netty
- 功能需求
- 后端代码
- 1. 创建一个SpringBoot 应用
- 2. 创建服务端功能
- 3. 创建前端功能
- 4. 测试SSE
- 封装为组件
简要说明
Server-Sent Events (SSE) 是一种用于在客户端和服务器之间建立单向通信的技术。
它允许服务器主动向客户端推送实时更新,而不需要客户端不断地请求数据。
Server-Sent Events (SSE) 的流行可以追溯到 HTML5 的引入,
最大特点:
- 前端JS原生支持
- 只接受服务端数据,单向通讯
- 原生支持断开重连
他和我们现在经常接触的 websocket,mqtt,类rabbitmq 有说明区别,
同样是客户端服务端的数据访问,同样用于取代客户端轮询访问方式,他们有审美不一样或者说使用场景是什么,下面表格简要说明一下:
技术 | SSE (Server-Sent Events) | WebSocket | MQTT | 类RabbitMQ |
---|---|---|---|---|
类型 | 单向通信 | 双向通信 | 发布/订阅模式 | 消息队列 |
协议 | 基于 HTTP | 独立于 HTTP | 轻量级消息传递协议 | 支持多种协议(如 AMQP) |
使用场景 | 实时更新(如新闻/股票信息推送) | 实时双向通信(如聊天) | 物联网设备通信,例如硬件设备主动上报给服务端信号信息 | 可靠消息传递和任务队列,用于服务端系统之间通讯 |
优点 | - 简单易用 | - 低延迟 | - 轻量级 | - 强大的消息路由功能 |
- 自动重连 | - 支持双向通信 | - 支持 QoS 级别 | - 提供持久化消息存储 | |
- 支持文本数据推送 | - 支持二进制数据传输 | - 发布/订阅解耦 | - 支持多种消息模式 | |
缺点 | - 仅支持单向通信 | - 实现相对复杂 | - 需要 MQTT 代理 | - 设置和管理相对复杂 |
- 不支持二进制数据 | - 需要额外的安全措施 | - 对简单实时应用复杂 | - 可能需要更多资源 |
基于Netty
关于java版本的SSE服务端实现,网上大多举例不正确或者说并没有实现SSE的技术特性 (例如网上举例说 创建一个 servlet,你会发现基本上是http轮询,因为一次service请求后,IO通讯就断开了,前端只会不断重连请求)。
Netty 强大健壮的异步IO通讯框架。
功能需求
- 在SpringBoot项目中创建SSE服务端功能
- 基于Netty框架
- 前端样例可自由断开或连接
- 支持携带Get请求的参数
- 前端样例支持断开重连,连接状态展示
后端代码
1. 创建一个SpringBoot 应用
这里测试的SpringBoot 版本是 2.6.14
使用的JDK版本是 17
2. 创建服务端功能
引入netty Maven依赖
<!-- https://mvnrepository.com/artifact/io.netty/netty-all --><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.113.Final</version></dependency>
创建 SSE 服务端的EventLoopGroup,假设绑定端口 8849
import com.middol.yfagv.model.oms.properties.SseProperties;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;/*** SSE服务 server sent events** @author admin*/
@Slf4j
@Service
public class SseServer {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();private boolean started = false;@PostConstructpublic void init() {log.debug("SSE服务初始化完毕");}public void shutdown() {log.debug("SSE服务 Shutting down server...");// 优雅关闭 workerGroupif (!workerGroup.isShutdown()) {workerGroup.shutdownGracefully(5, 10, TimeUnit.SECONDS);}// 优雅关闭 bossGroupif (!bossGroup.isShutdown()) {bossGroup.shutdownGracefully(5, 10, TimeUnit.SECONDS);}log.debug("SSE服务 Server shut down gracefully.");}@Asyncpublic void start() throws Exception {if (started) {return;}try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new HttpServerCodec());ch.pipeline().addLast(new HttpObjectAggregator( 1024 * 1024));ch.pipeline().addLast(new ChunkedWriteHandler());ch.pipeline().addLast(new SseHandler());}}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);// 绑定端口并同步ChannelFuture f = b.bind(8849).sync();log.debug("SSE服务启动完成,绑定端口:{}", 8849);started = true;// 添加关闭钩子Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));// 等待服务器通道关闭f.channel().closeFuture().sync();} finally {shutdown();}}
}
创建处理前端请求的 ChannelHandler,这里我们假设只处理url 是 /events的前端请求。
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;/*** SSE处理器** @author admin*/
@Slf4j
@ChannelHandler.Sharable
public class SseHandler extends SimpleChannelInboundHandler<FullHttpRequest> {private static final String PREFIX = "events";// 定义一个 AttributeKey 用于存储 ScheduledFutureprivate static final AttributeKey<ScheduledFuture<?>> SCHEDULED_FUTURE_KEY = AttributeKey.valueOf("scheduledFuture");@Overridepublic void channelActive(io.netty.channel.ChannelHandlerContext ctx) throws Exception {// 获取远程地址String remoteAddress = ctx.channel().remoteAddress().toString();log.debug(">>>>>>>>>>>>>>>>>>>>>>>>SseHandler: channelActive, remoteAddress={}", remoteAddress);super.channelActive(ctx);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {// 获取远程地址String remoteAddress = ctx.channel().remoteAddress().toString();log.debug(">>>>>>>>>>>>>>>>>>>>>>>>SseHandler: channelInactive, remoteAddress={}", remoteAddress);// 从 ChannelHandlerContext 中获取定时任务并取消ScheduledFuture<?> scheduledFuture = ctx.channel().attr(SCHEDULED_FUTURE_KEY).get();if (scheduledFuture != null) {// 显式取消定时任务scheduledFuture.cancel(false);}super.channelInactive(ctx);}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {if (request.method() == HttpMethod.OPTIONS) {// 处理预检请求FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, "*");response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_METHODS, "GET, OPTIONS");response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS, "Content-Type");ctx.writeAndFlush(response);return;}if (HttpUtil.is100ContinueExpected(request)) {send100Continue(ctx);}// 检查请求的 URI 是否以指定的前缀开始String uri = request.uri();// 解析 GET 参数QueryStringDecoder queryStringDecoder = new QueryStringDecoder(uri);Map<String, List<String>> parameters = queryStringDecoder.parameters();if (parameters != null && !parameters.isEmpty()) {log.debug(">>>>>>>>>>>>>>>>>>>>>>>>SseHandler: parameters={}", parameters);}if (!uri.startsWith("/" + PREFIX)) {ctx.writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND));return;}// 设置 CORS 头HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/event-stream");response.headers().set(HttpHeaderNames.CACHE_CONTROL, "no-cache");response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);// CORS 头response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, "*"); // 允许所有域response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_METHODS, "GET, OPTIONS"); // 允许的请求方法response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS, "Content-Type"); // 允许的请求头ctx.write(response);// 发送初始 SSE 事件sendSseEvent(ctx, "Connected to SSE server");// 定期发送 SSE 事件long initialDelay = 0L;long period = 5L;ctx.executor().scheduleAtFixedRate(() -> sendSseEvent(ctx, "CurrentTimeMillis: " + System.currentTimeMillis()),initialDelay, period, java.util.concurrent.TimeUnit.SECONDS);// 将定时任务的引用存储在 ChannelHandlerContext 的属性中ctx.channel().attr(SCHEDULED_FUTURE_KEY).set(scheduledFuture);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// 获取远程地址String remoteAddress = ctx.channel().remoteAddress().toString();log.error(">>>>>>>>>>>>>>>>>>>>>>>>SseHandler: exceptionCaught, remoteAddress={}", remoteAddress, cause);// 关闭连接,自动释放相关资源ctx.close();}protected static void send100Continue(ChannelHandlerContext ctx) {FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);ctx.write(response);}protected void sendSseEvent(ChannelHandlerContext ctx, String data) {ByteBuf buffer = ctx.alloc().buffer();buffer.writeBytes(("data: " + data + "\n\n").getBytes(StandardCharsets.UTF_8));ctx.writeAndFlush(new DefaultHttpContent(buffer));}}
以上核心业务方法是 channelRead0, 里面设置了可以跨越,
为什么要跨越?原因是netty服务端绑定的端口和本身 SpringBoot的应用端口不是一样,前端页面可能即要请求SpringBoot的业务接口也需要SSE服务接口。
以上是简单模拟向前端页面推送时间戳信息,每隔5秒一次,如果我要推送Bean方法里面的业务数据该如何做?
最简单方法是修改sendSseEvent里面的业务逻辑,使用SpringUtil获得Bean
protected void sendSseEvent(ChannelHandlerContext ctx, String data) {ByteBuf buffer = ctx.alloc().buffer();buffer.writeBytes(("data: " + JSONObject.toJSONString(SpringUtil.getBean(YourService.class).querySome() + "\n\n")).getBytes(StandardCharsets.UTF_8));ctx.writeAndFlush(new DefaultHttpContent(buffer));}
最后写一个手动启动Netty服务的Controller方法,这个主要用于测试,可以设置SpringBoot启动时自动启动Netty服务。
@Lazy@Resourceprivate SseServer sseServer;@ApiOperation(value = "启动SSE服务")@PostMapping("startSseServer")public ResponseVO<String> startSseServer() {try {sseServer.start();} catch (Exception e) {return ResponseVO.fail(e.getMessage(), DateUtil.now());}return ResponseVO.success(ResponseVO.SUCCESS_MSG, DateUtil.now());}
3. 创建前端功能
前端代码,我直接ChatGPT帮我生成一个用于测试SSE功能的页面:
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>SSE Example</title><script>let eventSource; // 声明 eventSource 变量let isConnected = false; // 连接状态标志function toggleEventSource() {const button = document.getElementById('toggleButton');const inputUrl = document.getElementById('urlInput').value.trim(); // 获取输入框中的 URLif (!inputUrl) {alert('请输入有效的 URL');return;}if (isConnected) {eventSource.close(); // 关闭连接button.innerText = "开启 EventSource"; // 更新按钮文本button.classList.remove('close'); // 移除关闭状态的样式button.classList.add('open'); // 添加开启状态的样式document.getElementById('status').innerText = "Disconnected from SSE server.";document.getElementById('status').style.color = "red";isConnected = false; // 更新连接状态} else {eventSource = new EventSource(inputUrl); // 使用输入框中的 URL 创建新的 EventSource 实例eventSource.onopen = function() {console.log("Connection to server opened.");const status = document.getElementById('status');status.innerText = "Connected to SSE server.";status.style.color = "green";status.style.fontWeight = "bold";button.innerText = "关闭 EventSource"; // 更新按钮文本button.classList.remove('open'); // 移除开启状态的样式button.classList.add('close'); // 添加关闭状态的样式isConnected = true; // 更新连接状态};eventSource.onmessage = function(event) {console.log("Received message: " + event.data);const messagesDiv = document.getElementById('messages');messagesDiv.innerHTML += `<p>${event.data}</p>`;// 检查行数并在超过100行时清空内容const lines = messagesDiv.getElementsByTagName('p').length;if (lines > 100) {messagesDiv.innerHTML = ''; // 清空内容console.log("Messages cleared after exceeding 100 lines.");}};eventSource.onerror = function() {console.error("EventSource failed.");const status = document.getElementById('status');status.innerText = "Exception: Connection to SSE server lost.";status.style.color = "red";status.style.fontWeight = "bold";button.innerText = "开启 EventSource"; // 更新按钮文本button.classList.remove('close'); // 移除关闭状态的样式button.classList.add('open'); // 添加开启状态的样式isConnected = false; // 更新连接状态};}}</script><style>body {font-family: Arial, sans-serif;margin: 20px;padding: 20px;background-color: #f4f4f4;border-radius: 8px;}#toggleButton {padding: 10px 20px; /* 增加内边距 */font-size: 16px; /* 增大字体 */color: white; /* 字体颜色 */border: none; /* 去掉边框 */border-radius: 5px; /* 圆角 */cursor: pointer; /* 鼠标悬停时显示手型 */transition: background-color 0.3s; /* 背景颜色过渡效果 */}#toggleButton.open {background-color: #007bff; /* 开启状态按钮背景颜色 */}#toggleButton.open:hover {background-color: #0056b3; /* 开启状态悬停时的背景颜色 */}#toggleButton.open:active {background-color: #004080; /* 开启状态点击时的背景颜色 */}#toggleButton.close {background-color: #f17b87; /* 关闭状态按钮背景颜色 */}#toggleButton.close:hover {background-color: #d9534f; /* 关闭状态悬停时的背景颜色 */}#toggleButton.close:active {background-color: #c9302c; /* 关闭状态点击时的背景颜色 */}#urlInput {width: 400px; /* 输入框宽度 */padding: 8px; /* 增加内边距 */font-size: 16px; /* 增大字体 */margin-right: 10px; /* 添加与按钮的间距 */}#messages {margin-top: 20px;padding: 10px;background-color: #fff;border: 1px solid #ccc;border-radius: 4px;max-height: 300px;overflow-y: auto;}p {margin: 5px 0;font-size: 18px; /* 增大内容字体大小 */}#status {font-size: 20px; /* 增大状态字体大小 */}</style>
</head>
<body>
<h1>SSE Example</h1>
<label for="urlInput"></label><input type="text" id="urlInput" placeholder="Enter SSE URL" value="http://localhost:8849/events"/> <!-- URL 输入框 -->
<button id="toggleButton" class="open" onclick="toggleEventSource()">开启 EventSource</button>
<br/><br/><br/>
<div id="status">请点击 [开启 EventSource] 按钮开启 EventSource。</div>
<br/>
<div id="messages"></div>
</body>
</html>
4. 测试SSE
启动SpringBoot服务,启动Netty服务
谷歌浏览器输入 http://localhost:8088/test1.html
这里的8088是SpringBoot应用端口,test1.html 是以上创建的页面,放在SpringBoot 的静态资源文件目录下的页面文件。
点击 【开启 EventSource】
点击 关闭 按钮
再次开启,然后关闭后台服务,然后再次开启后台服务测试 SSE自动重连
封装为组件
依据Netty的高性能实现的SSE服务端功能,基本上实现了SSE的所有技术特点,在理想情况下,一台 普通的 32GB 内存的服务器可以支持数几十万个前端连接,基本满足中小企业业务需求量。
使用SSE其实针对中小企业来说最大的优点其实是部署的便捷性,无需引入其他消息队列中间件等服务。
以上是测试样例,以下是封装成 Spring-boot-starter 组件,代码仓库如下:
https://github.com/dwhgygzt/sse-spring-boot-starter
https://gitee.com/banana6/sse-spring-boot-starter
欢迎下载测试交流!