主要依赖
<!-- Spring Boot parent project -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.2.RELEASE</version>
</parent>
<!-- Spring MVC: provides the REST controller and SseEmitter used by the server side -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- OkHttp HTTP client used by the requesting side -->
<dependency>
    <groupId>com.squareup.okhttp3</groupId>
    <artifactId>okhttp</artifactId>
    <version>4.2.0</version>
</dependency>
<!-- OkHttp Server-Sent Events support (EventSource / EventSourceListener) -->
<dependency>
    <groupId>com.squareup.okhttp3</groupId>
    <artifactId>okhttp-sse</artifactId>
    <version>4.2.0</version>
</dependency>
服务
package cn;import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.io.IOException;
import java.util.Map;@RestController
public class SseController {@PostMapping(value = "/sse", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})public SseEmitter handleSse(@RequestBody Map<String, Object> data) {System.out.println(data);SseEmitter emitter = new SseEmitter();// 模拟一个长时间运行的任务,每秒发送一个事件new Thread(() -> {for (int i = 0; i < 5; i++) {System.out.println("开始响应 " + i);try {emitter.send(System.currentTimeMillis());} catch (IOException e) {// 如果客户端断开连接,我们需要关闭emitteremitter.completeWithError(e);}}emitter.complete();}).start();return emitter;}
}
请求
package cn.demo;import cn.hutool.json.JSONUtil;
import okhttp3.*;
import okhttp3.sse.EventSource;
import okhttp3.sse.EventSourceListener;
import okhttp3.sse.EventSources;import java.util.Arrays;
import java.util.HashMap;public class Test_1 {public static void main(String[] args) throws Exception {OkHttpClient client = new OkHttpClient.Builder().build();EventSource.Factory factory = EventSources.createFactory(client);// 请求体HashMap<String, Object> map = new HashMap<>();map.put("prompt", "哈喽,你好");map.put("history", Arrays.asList());map.put("temperature", 0.9);map.put("top_p", 0.7);map.put("max_new_tokens", 4096);String json = JSONUtil.toJsonStr(map);RequestBody body = RequestBody.create(MediaType.parse("application/json; charset=utf-8"), json);// 请求对象Request request = new Request.Builder().url("http://localhost:8080/sse").post(body).build();// 自定义监听器EventSourceListener eventSourceListener = new EventSourceListener() {@Overridepublic void onOpen(EventSource eventSource, Response response) {System.out.println("开启");super.onOpen(eventSource, response);}@Overridepublic void onEvent(EventSource eventSource, String id, String type, String data) {// 接受消息 dataSystem.out.println("数据data "+data);super.onEvent(eventSource, id, type, data);}@Overridepublic void onClosed(EventSource eventSource) {System.out.println("关闭");//super.onClosed(eventSource);}@Overridepublic void onFailure(EventSource eventSource, Throwable t, Response response) {System.out.println("失败");super.onFailure(eventSource, t, response);}};// 创建事件EventSource eventSource = factory.newEventSource(request, eventSourceListener);//client.dispatcher().executorService().shutdown();}}
说明
请求连接没有立即关闭
请求完成后,OkHttp 并不会立即释放资源:它的调度器线程和连接池中的连接会在后台保留一段时间,空闲后才会自动释放。
这里的关闭不是必须的,如果你的应用需要立即关闭连接,释放资源,可以使用最后一行注释的代码。
EventSource eventSource = factory.newEventSource(request, eventSourceListener);
这行代码我在 Windows 10 上循环执行了 50000 次,并没有观察到明显的资源占用,看起来确实没有必要手动关闭。
参考官网 https://square.github.io/okhttp/5.x/okhttp/okhttp3/-ok-http-client/index.html