先说 BUG 处理:如果请求时遇到如下异步支持相关的报错:Async support must be enabled on a servlet and for all filters involved in async request processing. This is done in Java code using the Servlet API or by adding "<async-supported>true</async-supported>" to servlet and filter declarations in web.xml. 说明处理该请求的 servlet 或 filter 没有开启异步支持,解决办法如下。
Spring Boot:在 @WebFilter 注解中加入 urlPatterns = { "/*" }, asyncSupported = true
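Spring MVC:在 web.xml 中处理(filter-mapping 加上 ASYNC dispatcher,servlet 和 filter 声明中加上 <async-supported>true</async-supported>)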
<?xml version="1.0" encoding="UTF-8"?>
<!-- Excerpt of web.xml showing how to let a filter take part in async request processing. -->
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://java.sun.com/xml/ns/javaee"
         xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
         version="3.0">

    <!-- The servlet and filter declarations (not shown in this excerpt) must also contain
         <async-supported>true</async-supported>, as the error message requests. -->

    <filter-mapping>
        <filter-name>shiroFilter</filter-name>
        <url-pattern>/*</url-pattern>
        <dispatcher>REQUEST</dispatcher>
        <!-- Apply the filter to async dispatches as well as to the initial request. -->
        <dispatcher>ASYNC</dispatcher>
    </filter-mapping>

    <!-- Remaining declarations omitted. -->
</web-app>
- demo1,服务器间隔一定时间推送内容
1.接口方法
@GetMapping(path = "/sse/{userId}",produces = MediaType.TEXT_EVENT_STREAM_VALUE )public Flux<ServerSentEvent<String>> sse(@PathVariable String userId) {// 每两秒推送一次return Flux.interval(Duration.ofSeconds(2)).map(seq->Tuples.of(seq, LocalDateTime.now())).log()//序号和时间.map(data-> ServerSentEvent.<String>builder().id(userId).data(data.getT1().toString()).build());//推送内容}
2.前端代码
<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org">
<head>
    <meta charset="UTF-8"/>
    <title>服务器推送事件</title>
</head>
<body>
<div>
    <div id="data"></div>
    <div id="result"></div>
    <br/>
</div>
<script th:inline="javascript">
    // Server-sent events demo.

    /**
     * Writes plain text into an element. Server data is rendered as text, never as
     * markup, so a pushed payload cannot inject HTML into the page.
     */
    function setText(element, text) {
        if ("textContent" in element) {
            element.textContent = text;
        } else {
            // Legacy IE has no textContent.
            element.innerText = text;
        }
    }

    /**
     * Fallback for browsers without EventSource: requests `url` with XHR and, on every
     * progress notification, renders the newly received part of the response into the
     * element with id `targetId`.
     */
    function streamInto(url, targetId) {
        var xhr;
        if (window.XMLHttpRequest) {
            // Supported by IE7+, Firefox, Chrome, Opera and Safari.
            xhr = new XMLHttpRequest();
        } else {
            // IE5/IE6 do not support XMLHttpRequest; use ActiveXObject instead.
            xhr = new ActiveXObject("Microsoft.XMLHTTP");
        }
        console.log(xhr);

        // Length of the response already rendered; the next read starts from here.
        // Kept in a local variable because expando properties cannot be set on ActiveXObject.
        var seenBytes = 0;

        xhr.open('GET', url);
        xhr.onreadystatechange = function () {
            console.log("s响应状态:" + xhr.readyState);
            // readyState 3 = part of the response received, 4 = response complete.
            if (xhr.readyState > 2) {
                // responseText is used because `response` does not exist on the ActiveX fallback.
                var chunk = xhr.responseText.substring(seenBytes);
                chunk = chunk.replace(/\n/g, "#");
                // Drop the trailing separator left by the final newline.
                chunk = chunk.substring(0, chunk.length - 1);
                var data = chunk.split("#");
                console.log("获取到的数据:" + data);
                setText(document.getElementById(targetId), data.join(","));
                seenBytes = xhr.responseText.length;
            }
        };
        // Send the request only after the handler is attached.
        xhr.send(null);
    }

    if (typeof (EventSource) !== "undefined") {
        var source1 = new EventSource("http://localhost:9000/api/admin/test/sse/1");
        // Append every received message to the "data" element.
        source1.onmessage = function (evt) {
            var target = document.getElementById("data");
            var previous = "textContent" in target ? target.textContent : target.innerText;
            setText(target, previous + "股票行情:" + evt.data);
        };
    } else {
        // Note: Internet Explorer does not support EventSource.
        document.getElementById("result").innerHTML = "抱歉,你的浏览器不支持 server-sent 事件...";
        streamInto('/sse/countDown', "result");
        streamInto('/sse/retrieve', "data");
    }
</script>
</body>
</html>
- demo2:订阅服务器消息,服务器通过 send 推送完消息后关闭订阅(服务端 emitter.complete(),前端 eventSource.close())
1.接口方法以及工具类
@GetMapping(path = "/sse/sub",produces = MediaType.TEXT_EVENT_STREAM_VALUE )
public SseEmitter subscribe(@RequestParam String questionId,HttpServletResponse response) {// 简单异步发消息 ====//questionId 订阅id,id对应了sse对象new Thread(() -> {try {Thread.sleep(1000);for (int i = 0; i < 10; i++) {Thread.sleep(500);SSEUtils.pubMsg(questionId, questionId + " - kingtao come " + i);}} catch (Exception e) {e.printStackTrace();} finally {// 消息发送完关闭订阅SSEUtils.closeSub(questionId);}}).start();// =================return SSEUtils.addSub(questionId);}
工具类
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;public class SSEUtils {// timeoutprivate static Long DEFAULT_TIME_OUT = 2*60*1000L;// 订阅表private static Map<String, SseEmitter> subscribeMap = new ConcurrentHashMap<>();/** 添加订阅 */public static SseEmitter addSub(String questionId) {if (null == questionId || "".equals(questionId)) {return null;}SseEmitter emitter = subscribeMap.get(questionId);if (null == emitter) {emitter = new SseEmitter(DEFAULT_TIME_OUT);subscribeMap.put(questionId, emitter);}return emitter;}/** 发消息 */public static void pubMsg(String questionId, String msg) {SseEmitter emitter = subscribeMap.get(questionId);if (null != emitter) {try {// 更规范的消息结构看源码emitter.send(SseEmitter.event().data(msg));} catch (Exception e) {// e.printStackTrace();}}}/*** 关闭订阅 * @param questionId*/public static void closeSub(String questionId) {SseEmitter emitter = subscribeMap.get(questionId);if (null != emitter) {try {emitter.complete();subscribeMap.remove(questionId);} catch (Exception e) {e.printStackTrace();}}}
}
2.前端代码
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>sse</title>
</head>
<body>
<div>
    <label>问题id</label>
    <input type="text" id="questionId">
    <button onclick="subscribe()">订阅</button>
    <hr>
    <label>F12-console控制台查看消息</label>
</div>
<script>
    /**
     * Opens an EventSource for the question id typed into the input and logs every
     * received message to the console.
     */
    function subscribe() {
        let questionId = document.getElementById('questionId').value;
        // Encode the id so characters such as "&", "#" or spaces cannot break the query string.
        let url = 'http://localhost:9000/api/admin/test/sse/sub?questionId=' + encodeURIComponent(questionId);
        let eventSource = new EventSource(url);

        eventSource.onmessage = function (e) {
            console.log(e.data);
        };
        eventSource.onopen = function (e) {
            console.log(e, 1);
            // todo
        };
        eventSource.onerror = function (e) {
            // todo
            console.log(e, 2);
            // Close explicitly; otherwise EventSource reconnects automatically after the
            // server ends the stream.
            eventSource.close();
        };
    }
</script>
</body>
</html>