一、引入依赖
< dependency> < groupId> com. squareup. okhttp3< / groupId> < artifactId> okhttp< / artifactId> < version> 4.10 .0 < / version>
< / dependency>
< dependency> < groupId> com. squareup. okhttp3< / groupId> < artifactId> okhttp- sse< / artifactId> < version> 4.10 .0 < / version>
< / dependency>
二、创建 SSE 客户端服务类
import okhttp3. * ;
import okhttp3. sse. EventSource ;
import okhttp3. sse. EventSourceListener ;
import okhttp3. sse. EventSources ;
import org. springframework. stereotype. Service ; import java. util. concurrent. TimeUnit ; @Service
public class SseClientService { private final OkHttpClient okHttpClient; private EventSource eventSource; public SseClientService ( ) { this . okHttpClient = new OkHttpClient. Builder ( ) . connectTimeout ( 10 , TimeUnit . SECONDS) . readTimeout ( 0 , TimeUnit . SECONDS) . writeTimeout ( 10 , TimeUnit . SECONDS) . build ( ) ; } public void connectToSseServer ( String url) { Request request = new Request. Builder ( ) . url ( url) . build ( ) ; EventSource. Factory factory = EventSources . createFactory ( okHttpClient) ; this . eventSource = factory. newEventSource ( request, new EventSourceListener ( ) { @Override public void onOpen ( EventSource eventSource, Response response) { System . out. println ( "SSE连接已建立" ) ; } @Override public void onEvent ( EventSource eventSource, String id, String type, String data) { System . out. printf ( "收到事件: id=%s, type=%s, data=%s%n" , id, type, data) ; } @Override public void onClosed ( EventSource eventSource) { System . out. println ( "SSE连接已关闭" ) ; } @Override public void onFailure ( EventSource eventSource, Throwable t, Response response) { System . err. println ( "SSE连接失败: " + t. getMessage ( ) ) ; reconnect ( url) ; } } ) ; } private void reconnect ( String url) { try { Thread . sleep ( 5000 ) ; connectToSseServer ( url) ; } catch ( InterruptedException e) { Thread . currentThread ( ) . interrupt ( ) ; } } public void closeConnection ( ) { if ( eventSource != null ) { eventSource. cancel ( ) ; } okHttpClient. dispatcher ( ) . executorService ( ) . shutdown ( ) ; }
}
三、 创建控制器测试
import org. springframework. web. bind. annotation. GetMapping ;
import org. springframework. web. bind. annotation. RequestMapping ;
import org. springframework. web. bind. annotation. RestController ; @RestController
@RequestMapping ( "/sse-client" )
public class SseClientController { private final SseClientService sseClientService; public SseClientController ( SseClientService sseClientService) { this . sseClientService = sseClientService; } @GetMapping ( "/connect" ) public String connect ( ) { sseClientService. connectToSseServer ( "http://localhost:8080/sse-server/subscribe" ) ; return "SSE客户端已启动" ; } @GetMapping ( "/disconnect" ) public String disconnect ( ) { sseClientService. closeConnection ( ) ; return "SSE客户端已关闭" ; }
}
四、 高级功能实现
1. 自定义事件处理
@Override
public void onEvent ( EventSource eventSource, String id, String type, String data) { switch ( type) { case "message" : handleMessageEvent ( data) ; break ; case "system-alert" : handleSystemAlert ( data) ; break ; default : handleDefaultEvent ( data) ; }
}
2. 添加认证头
public void connectToSseServerWithAuth ( String url, String token) { Request request = new Request. Builder ( ) . url ( url) . header ( "Authorization" , "Bearer " + token) . build ( ) ;
}
3. 心跳检测
@Override
public void onEvent ( EventSource eventSource, String id, String type, String data) { if ( "heartbeat" . equals ( type) ) { System . out. println ( "收到心跳: " + data) ; return ; }
}
五、OkHttpConfig单独配置
import okhttp3. OkHttpClient ;
import org. springframework. context. annotation. Bean ;
import org. springframework. context. annotation. Configuration ; import java. util. concurrent. TimeUnit ; @Configuration
public class OkHttpConfig { @Bean public OkHttpClient okHttpClient ( ) { return new OkHttpClient. Builder ( ) . connectTimeout ( 15 , TimeUnit . SECONDS) . readTimeout ( 0 , TimeUnit . SECONDS) . writeTimeout ( 15 , TimeUnit . SECONDS) . retryOnConnectionFailure ( true ) . build ( ) ; }
}