- SSE 是一种单向通信,只允许服务器向客户端发送数据。客户端无法向服务器发送数据。
- SSE 建立在 HTTP 协议之上,使用标准 HTTP 请求和响应。
- SSE 不需要额外的库或协议处理,客户端可以使用浏览器的原生 EventSource API 来接收数据。
- SSE 支持跨域通信,可以通过 CORS(跨域资源共享)机制进行配置。
- SSE 在现代浏览器中也得到广泛支持,但与 WebSocket 相比,它的历史要长一些。
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;/*** 数据管理器用于管理Server-Sent Events (SSE) 的订阅和数据推送。* @author xiaobo*/
@Component
public class DataManager {private final Map<String, List<SseEmitter>> dataEmitters = new HashMap<>();/*** 订阅特定数据类型的SSE连接。** @param dataType 要订阅的数据类型* @param emitter SSE连接*/public void subscribe(String dataType, SseEmitter emitter) {dataEmitters.computeIfAbsent(dataType, k -> new ArrayList<>()).add(emitter);emitter.onCompletion(() -> removeEmitter(dataType, emitter));emitter.onTimeout(() -> removeEmitter(dataType, emitter));}/*** 推送特定数据类型的数据给所有已订阅的连接。** @param dataType 要推送的数据类型* @param data 要推送的数据*/public void pushData(String dataType, String data) {List<SseEmitter> emitters = dataEmitters.getOrDefault(dataType, new ArrayList<>());emitters.forEach(emitter -> {try {emitter.send(SseEmitter.event().data(data, MediaType.TEXT_PLAIN));} catch (IOException e) {removeEmitter(dataType, emitter);}});}private void removeEmitter(String dataType, SseEmitter emitter) {List<SseEmitter> emitters = dataEmitters.get(dataType);if (emitters != null) {emitters.remove(emitter);}}
}
import com.todoitbo.baseSpringbootDasmart.sse.DataManager;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import javax.annotation.Resource; /** * @author xiaobo */
@RestController
@RequestMapping("/environment")
public class EnvironmentController { @Resource private DataManager dataManager; @GetMapping(value = "/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public SseEmitter subscribe() { SseEmitter emitter = new SseEmitter(); dataManager.subscribe("environment", emitter); return emitter; } // 示例:推送环境监测数据给前端 @GetMapping("/push/{testText}") public ResponseEntity<String> pushEnvironmentData(@PathVariable String testText) { dataManager.pushData("environment", testText); return ResponseEntity.ok("Data pushed successfully."); }
}
如果没有数据产生会出现连接超时问题。
默认情况下,EventSource对象会自动重连,以保持连接的持久性。
第一次订阅SSE连接后,即使后端没有数据产生,之后也能接收到数据。这可以通过定期发送心跳数据
@Scheduled(fixedRate = 30000) // 每30秒发送一次心跳数据
public void sendHeartbeat() {dataManager.pushData("heartbeat", "Heartbeat data");
}
<!DOCTYPE html>
<html>
<head><title>SSE Data Receiver</title>
</head>
<body><h1>Real-time Data Display</h1><div id="data-container"></div><script>const dataContainer = document.getElementById('data-container');// 创建一个 EventSource 对象,指定 SSE 服务器端点的 URLconst eventSource = new EventSource('http://127.0.0.1:13024/environment/subscribe'); // 根据你的控制器端点来设置URLeventSource.onopen = function(event) {};// 添加事件处理程序,监听服务器端发送的事件eventSource.onmessage = (event) => {const data = event.data;// 在这里处理从服务器接收到的数据// 可以将数据显示在页面上或进行其他操作const newDataElement = document.createElement('p');newDataElement.textContent = data;dataContainer.appendChild(newDataElement);};eventSource.onerror = (error) => {// 处理连接错误console.error('Error occurred:', error);// 重新建立连接eventSource.close();setTimeout(() => {// 重新建立连接eventSource = new EventSource('/environment/subscribe');}, 1000); // 1秒后重试};</script>
</body>
</html>
精简版后端
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import java.io.IOException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;@RestController
@SpringBootApplication
public class SseApplication {private final CopyOnWriteArrayList<SseEmitter> emitters = new CopyOnWriteArrayList<>();private final AtomicLong counter = new AtomicLong();public static void main(String[] args) {SpringApplication.run(SseApplication.class, args);}@GetMapping("/sse")public SseEmitter handleSse() {SseEmitter emitter = new SseEmitter();emitters.add(emitter);emitter.onCompletion(() -> emitters.remove(emitter));new Thread(() -> {try {for (int i = 0; i < 10; i++) {emitter.send(SseEmitter.event().id(String.valueOf(counter.incrementAndGet())).name("message").data("This is message " + i));Thread.sleep(1000);}emitter.complete();} catch (IOException | InterruptedException e) {emitter.completeWithError(e);}}).start();return emitter;}
}