用spring-webmvc包实现AI(Deepseek)事件流(SSE)推送

 前后端:  Spring Boot + Angular

spring-webmvc-5.2.2包

代码片段如下:

控制层:

@GetMapping(value = "/realtime/page/ai/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)@ApiOperation(value = "获取告警记录进行AI分析")public SseEmitter getRealTimeAlarmAi(AlarmRecordQueryParam queryParam) {final DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");IPage<AlarmRecord> page = alarmRecordService.findRealTimeByParam(queryParam);StringBuilder alarmInfo = new StringBuilder();try {// 根据状态设置前缀String prefix = queryParam.getStatus() == 1 ?"最近十条历史告警记录:" : "当前十条实时告警信息:";String emptyMessage = queryParam.getStatus() == 1 ?"暂无历史告警" : "当前无实时告警";if (page.getRecords() != null && !page.getRecords().isEmpty()) {alarmInfo.append(prefix);sseService.buildAlarmContent(page, alarmInfo, timeFormatter);} else {alarmInfo.append(emptyMessage);}sseService.validatePromptLength(alarmInfo, maxPromptLength);} catch (Exception e) {log.error("告警信息处理异常", e);}return sseService.createStreamConnection(alarmInfo.toString(), "告警");}
    @ApiOperation("查询图表数据用AI分析数据详情")@GetMapping("/chart/ai/sse")@OpLog(inputExpression = "开始时间:{#queryParam.startTime},结束时间:{#queryParam.endTime},图表ID:{#queryParam.chartId}",outputExpression = "{#code}")public SseEmitter chartAiSSEData(@Validated ChartDataQueryParam queryParam) throws Exception {String ChartAi = "报表";ChartInstance chart = Optional.ofNullable(chartService.getById(queryParam.getChartId())).orElseThrow(() -> new Exception("找不到:" + queryParam.getChartId() + "的图表定义"));List<ChartDeviceSensor> deviceSensors = ChartInstance.toChartDeviceSensorList(chart);String endTime = DataQueryParam.endTime(queryParam.getEndTime());DataQueryParam dataQueryParam = new DataQueryParam(queryParam.getStartTime(), endTime, deviceSensors);IChartDataService chartDataService = chartDataServiceManager.getInstance(chart.getChartTypeId());List dataList = chartDataService.getChartData(dataQueryParam);List<String> times = dataQueryParam.getDateType().getTimes(dataQueryParam.getStartTime(), dataQueryParam.getEndTime());ChartData chartData = new ChartData<>(chart.getId(), chart.getName(), chart.getChartFormat(), chart.getChartTypeId(), chart.getShowType(), chart.getCategoryId(), times, dataList);// 将 ChartData 转换为压缩字符串String csvData = ChartDataFormatter.formatChartData(chartData);log.info("当前请求字符长度:" + csvData.length());try {if (csvData.length() > maxPromptLength) {OpLogAspect.setCode(400); // 设置错误码throw new IllegalArgumentException("数据长度超过限制,最大允许长度:" + maxPromptLength);}OpLogAspect.setCode(200);return sseService.createStreamConnection(csvData,ChartAi);} catch (IllegalArgumentException e) {OpLogAspect.setCode(400); // 参数错误throw e;} catch (Exception e) {OpLogAspect.setCode(500); // 系统错误throw new RuntimeException("处理请求失败", e);}}

业务层代码:

package com.keydak.project.core.chart.ai.service.impl;import cn.hutool.core.lang.UUID;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.keydak.project.core.alarm.data.vo.AlarmRecord;
import com.keydak.project.core.chart.ai.dto.KeydakAiConfigDTO;
import com.keydak.project.core.chart.ai.exception.BalanceException;
import com.keydak.project.core.chart.ai.service.SSEService;
import com.keydak.repository.core.enums.SystemGlobalConfigEnum;
import com.keydak.repository.core.service.ISystemGlobalConfigService;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import com.fasterxml.jackson.core.type.TypeReference;import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;/*** AI服务实现** @author xyt*/
@Service
@Slf4j
public class SSEServiceImpl implements SSEService {@Autowiredprivate ISystemGlobalConfigService systemGlobalConfigService;private final ObjectMapper objectMapper = new ObjectMapper();private RateLimiter rateLimiter;@PostConstructpublic void init() {try {// 初始化限流器时动态获取配置KeydakAiConfigDTO initialConfig = getConfig();rateLimiter = new RateLimiter(initialConfig.getRateLimit());} catch (Exception e) {throw new RuntimeException("初始化失败,无法获取Keydak AI配置", e);}}// 线程池配置private static final int CORE_POOL_SIZE = 5; // 核心线程数private static final int MAX_POOL_SIZE = 8; // 最大线程数private static final long KEEP_ALIVE_TIME = 30;  // 线程空闲时间private static final int QUEUE_CAPACITY = 30; //队列private final ExecutorService executor = new ThreadPoolExecutor(CORE_POOL_SIZE,MAX_POOL_SIZE,KEEP_ALIVE_TIME,TimeUnit.SECONDS,new LinkedBlockingQueue<>(QUEUE_CAPACITY),new ThreadPoolExecutor.AbortPolicy() // 使用 AbortPolicy 直接拒绝任务而不执行);/*** 刷新限流器配置*/@Overridepublic synchronized void refreshRateLimiter(Integer rate) {try {if (rateLimiter == null) {rateLimiter = new RateLimiter(rate);} else {rateLimiter.updateRate(rate);}log.info("限流器已更新,新速率限制: {}", rate);} catch (Exception e) {log.error("刷新限流器配置失败", e);}}@PreDestroypublic void destroy() {executor.shutdown();try {if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {executor.shutdownNow();}} catch (InterruptedException e) {executor.shutdownNow();Thread.currentThread().interrupt();}}/*** 获取Keydak AI配置信息。** @return 返回Keydak AI配置信息的DTO对象*/private KeydakAiConfigDTO getConfig() throws Exception {KeydakAiConfigDTO config = systemGlobalConfigService.getTag(SystemGlobalConfigEnum.KEYDAK_AI_CONFIG,KeydakAiConfigDTO.class);if (config == null) {throw new Exception("Keydak AI配置不存在");}return config;}@Overridepublic SseEmitter createStreamConnection(String message, String aiType) {SseEmitter emitter = new SseEmitter(120_000L); // 2分钟超时try {KeydakAiConfigDTO config = getConfig();double balance = getBalance();log.info("当前余额: {} 元", balance);log.warn("当前可用令牌数: {}", rateLimiter.tokens.get());if (!rateLimiter.tryAcquire()) {log.warn("请求被限流 | 当前允许的QPS:{}", config.getRateLimit());handleRateLimitError(emitter);return emitter;}} catch (BalanceException e) {handleBalanceError(emitter, e.getMessage());return emitter;} catch (Exception e) {handleBalanceError(emitter, "系统错误: " + e.getMessage());return emitter;}// 保持原有事件监听emitter.onCompletion(() -> log.info("SSE连接完成"));emitter.onTimeout(() -> {log.warn("SSE连接超时");rateLimiter.refill(); // 超时请求返还令牌});emitter.onError(e -> log.error("SSE连接错误", e));// 保持原有线程池处理executor.execute(() -> {try {processSSEStream(message, aiType, emitter);} catch (Exception e) {emitter.completeWithError(e);}});return emitter;}/*** 新增限流错误处理方法** @param emitter 事件发射器* @throws IOException 如果发送失败*/private void handleRateLimitError(SseEmitter emitter) {try {Map<String, Object> error = new LinkedHashMap<>();error.put("error", "rate_limit_exceeded");error.put("message", "请求过于频繁,请稍后再试");error.put("timestamp", System.currentTimeMillis());emitter.send(SseEmitter.event().data(objectMapper.writeValueAsString(error)).name("rate-limit-error").reconnectTime(5000L));emitter.complete();} catch (IOException e) {log.error("发送限流错误失败", e);}}private void handleBalanceError(SseEmitter emitter, String errorMsg) {try {JSONObject error = new JSONObject();error.put("error", "balance_insufficient");error.put("message", errorMsg);emitter.send(SseEmitter.event().data(error.toJSONString()).name("balance-error"));emitter.complete();} catch (Exception e) {log.error("发送余额错误信息失败", e);}}private void processSSEStream(String message, String aiType, SseEmitter emitter) throws Exception {HttpURLConnection connection = null;try {connection = createConnection();String jsonBody = buildRequestBody(message, aiType);log.info("发送AI请求数据: {}", jsonBody); // 记录请求体sendRequestData(connection, jsonBody);validateResponse(connection);try (InputStream inputStream = connection.getInputStream();BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {String line;while ((line = reader.readLine()) != null) {if (Thread.currentThread().isInterrupted()) {throw new InterruptedException("处理被中断");}if (line.startsWith("data: ")) {String jsonData = line.substring(6).trim();log.debug("AI响应数据: {}", jsonData);if ("[DONE]".equals(jsonData)) {log.info("收到流结束标记");sendCompletionEvent(emitter);  // 发送完成事件break;  // 结束循环}try {processStreamData(emitter, jsonData);} catch (Exception e) {log.error("数据处理失败,终止连接", e);emitter.completeWithError(e);break;}}}}} catch (Exception e) {log.error("SSE处理发生异常", e);throw e;} finally {if (connection != null) connection.disconnect();}}private void processStreamData(SseEmitter emitter, String jsonData) throws Exception {try {Map<String, Object> apiResponse = objectMapper.readValue(jsonData,new TypeReference<Map<String, Object>>() {});List<Map<String, Object>> choices = (List<Map<String, Object>>) apiResponse.get("choices");if (choices == null || choices.isEmpty()) return;Map<String, Object> choice = choices.get(0);Map<String, Object> delta = (Map<String, Object>) choice.get("delta");Map<String, Object> chunk = new LinkedHashMap<>();chunk.put("timestamp", System.currentTimeMillis());chunk.put("messageId", UUID.randomUUID().toString());// 处理思考过程if (delta.containsKey("reasoning_content")) {String reasoning = (String) delta.get("reasoning_content");if (reasoning != null && !reasoning.trim().isEmpty()) {chunk.put("type", "reasoning");chunk.put("content", reasoning);sendChunk(emitter, chunk);}}// 处理正式回答if (delta.containsKey("content")) {String content = (String) delta.get("content");if (content != null) {chunk.put("type", "answer");chunk.put("content", content);sendChunk(emitter, chunk);}}} catch (JsonProcessingException e) {log.error("JSON解析失败 | 原始数据: {} | 错误: {}", jsonData, e.getMessage());throw new IOException("Failed to process stream data", e);} catch (ClassCastException e) {log.error("数据结构类型错误 | 原始数据: {} | 错误: {}", jsonData, e.getMessage());throw new IllegalStateException("Invalid data structure", e);} catch (Exception e) {log.error("处理数据块时发生未知错误 | 原始数据: {}", jsonData, e);throw e;}}private void sendChunk(SseEmitter emitter, Map<String, Object> chunk) throws IOException {String chunkJson = objectMapper.writeValueAsString(chunk);log.debug("发送数据块: {}", chunkJson);SseEmitter.SseEventBuilder event = SseEmitter.event().data(chunkJson).id(UUID.randomUUID().toString()).name("ai-message").reconnectTime(5000L);emitter.send(event);}private void sendCompletionEvent(SseEmitter emitter) {try {Map<String, Object> completionEvent = new LinkedHashMap<>();completionEvent.put("event", "done");completionEvent.put("timestamp", System.currentTimeMillis());completionEvent.put("messageId", UUID.randomUUID().toString());String eventJson = objectMapper.writeValueAsString(completionEvent);emitter.send(SseEmitter.event().data(eventJson).id("COMPLETION_EVENT").name("stream-end").reconnectTime(0L));  // 停止重连log.info("已发送流结束事件");} catch (IOException e) {log.error("发送完成事件失败", e);} finally {emitter.complete();log.info("SSE连接已关闭");}}private HttpURLConnection createConnection() throws Exception {KeydakAiConfigDTO config = getConfig();HttpURLConnection connection = (HttpURLConnection) new URL(config.getUrl()).openConnection();connection.setRequestMethod("POST");connection.setDoOutput(true);connection.setRequestProperty("Content-Type", "application/json");connection.setRequestProperty("Authorization", "Bearer " + config.getKey());connection.setRequestProperty("Accept", "text/event-stream");connection.setConnectTimeout(30_000);connection.setReadTimeout(120_000);return connection;}private void sendRequestData(HttpURLConnection connection, String jsonBody) throws Exception {try (OutputStream os = connection.getOutputStream()) {os.write(jsonBody.getBytes(StandardCharsets.UTF_8));os.flush();}}private void validateResponse(HttpURLConnection connection) throws Exception {if (connection.getResponseCode() != 200) {String errorMsg = readErrorStream(connection);throw new RuntimeException("API请求失败: " + connection.getResponseCode() + " - " + errorMsg);}}private String readErrorStream(HttpURLConnection connection) throws IOException {try (BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getErrorStream(), StandardCharsets.UTF_8))) {StringBuilder response = new StringBuilder();String line;while ((line = reader.readLine()) != null) {response.append(line);}return response.toString();}}private String buildRequestBody(String userMessage, String aiType) throws IOException {KeydakAiConfigDTO config = null;try {config = getConfig();} catch (Exception e) {e.printStackTrace();}Map<String, Object> request = new HashMap<>();request.put("model", config.getModelType());request.put("stream", true);List<Map<String, String>> messages = new ArrayList<>();Map<String, String> message = new HashMap<>();message.put("role", "user");if ("报表".equals(aiType)) {//报表提问词message.put("content", buildPrompt(config.getPrompt(), userMessage));} else {//告警提问词message.put("content", buildPrompt(config.getPromptAlarm(), userMessage));}messages.add(message);request.put("messages", messages);return objectMapper.writeValueAsString(request);}private String buildPrompt(String basePrompt, String userMessage) {return String.format("%s\n%s\n", basePrompt, userMessage);}/*** 查询当前余额** @return 当前余额* @throws IOException 如果请求失败*/@Override@SneakyThrowspublic double getBalance() {HttpURLConnection connection = null;try {KeydakAiConfigDTO config = getConfig();URL url = new URL(config.getBalanceUrl());connection = (HttpURLConnection) url.openConnection();connection.setRequestMethod("GET");connection.setRequestProperty("Authorization", "Bearer " + config.getKey());connection.setConnectTimeout(5000);connection.setReadTimeout(5000);int responseCode = connection.getResponseCode();if (responseCode != 200) {String errorBody = readErrorStream(connection); // 复用已有的错误流读取方法throw new IOException("HTTP Error: " + responseCode + " - " + errorBody);}try (BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {StringBuilder response = new StringBuilder();String line;while ((line = reader.readLine()) != null) {response.append(line);}JSONObject jsonObject = JSON.parseObject(response.toString());// 以下解析逻辑保持原样if (!jsonObject.containsKey("is_available") || !jsonObject.containsKey("balance_infos")) {throw new IOException("Invalid balance response format");}JSONArray balanceInfos = jsonObject.getJSONArray("balance_infos");if (jsonObject.getBoolean("is_available") && balanceInfos != null && !balanceInfos.isEmpty()) {JSONObject balanceInfo = balanceInfos.getJSONObject(0);if (!balanceInfo.containsKey("total_balance")) {throw new IOException("Missing total_balance field");}return balanceInfo.getDouble("total_balance");} else {throw new IOException("Balance information is not available");}}} finally {if (connection != null) {connection.disconnect();}}}/*** 限流器实现**/private static class RateLimiter {private volatile int capacity;private final AtomicInteger tokens;private volatile long lastRefillTime;private final Object lock = new Object();RateLimiter(int rate) {this.capacity = rate;this.tokens = new AtomicInteger(rate);this.lastRefillTime = System.currentTimeMillis();}public void refill() {synchronized (lock) {long now = System.currentTimeMillis();long elapsed = now - lastRefillTime;if (elapsed >= 1000) {tokens.set(capacity); // 直接重置为最大容量lastRefillTime = now;}}}public boolean tryAcquire() {synchronized (lock) {refill();if (tokens.get() > 0) {tokens.decrementAndGet();return true;}return false;}}public void updateRate(int newRate) {synchronized (lock) {this.capacity = newRate;tokens.set(Math.min(tokens.get(), newRate));lastRefillTime = System.currentTimeMillis();}}}/*** 告警内容构建方法**/@Overridepublic void buildAlarmContent(IPage<AlarmRecord> page,StringBuilder alarmInfo,DateTimeFormatter formatter) {page.getRecords().forEach(record -> {// 时间格式化(使用首次告警时间)String time = Optional.ofNullable(record.getFirstTime()).map(t -> t.format(formatter)).orElse("时间未知");// 设备名称空值处理String device = StringUtils.defaultString(record.getDeviceName(), "未知设备");// 状态/数值处理逻辑String state = resolveStateValue(record);// 告警描述处理String desc = StringUtils.defaultString(record.getContent(), "未知告警类型");// 按规范格式拼接alarmInfo.append(String.format("%s %s %s %s;", time, device, state, desc));});}/*** 状态值解析方法*/private String resolveStateValue(AlarmRecord record) {if (record.getValue() != null) {return record.getValue().stripTrailingZeros().toPlainString();}return record.getStatus() != null ?(record.getStatus() ? "1" : "0") : "状态未知";}/*** 长度校验方法**/@Overridepublic void validatePromptLength(StringBuilder content, int maxLength) {if (content.length() > maxLength) {throw new IllegalArgumentException("告警数据过长,请缩小查询范围");}}}

前端代码:

<div class="modal-area"><form name="formNg" novalidate><div class="modal-header"><h3 class="modal-title" style="color: #FFFFFF">AI分析</h3></div><div class="modal-body"><div class="form"><!-- 加载状态 - 修改为动态效果 --><div ng-if="connectionStatus === 'connecting'" class="loading"><div class="ai-thinking-container"><span>AI思考中</span><div class="ai-typing-indicator"><div class="typing-dot"></div><div class="typing-dot"></div><div class="typing-dot"></div></div></div></div><!-- 思考过程 --><div class="thinking-panel" ng-if="thinkingContent"><div class="thinking-header"><i class="fa fa-brain"></i> 思考过程<!-- 总用时显示(完成后保留) --><span ng-if="thinkingTime">({{thinkingTime}}秒)</span></div><div class="thinking-content"ng-bind-html="thinkingContent"scroll-to-bottom="thinkingContent"></div></div><!-- 正式回答 --><div class="answer-panel" ng-if="answerContent"><div class="answer-header"><i class="fa fa-comment"></i> 以下是AI的分析</div><div class="answer-content"ng-bind-html="answerContent"scroll-to-bottom="answerContent"></div></div><!-- 错误提示 --><div ng-if="connectionStatus === 'error'" class="alert alert-danger"><i class="fa fa-exclamation-triangle"></i> 连接异常,请尝试重新分析</div></div></div><div class="modal-footer"><button ng-click="retry()"class="btn btn-warning"ng-disabled="connectionStatus === 'connecting'"><i class="fa fa-redo"></i> 重新分析</button><button ng-click="cancel()" class="btn btn-danger"><i class="fa fa-times"></i> 关闭</button></div></form>
</div><style>.thinking-header span {margin-left: 5px;font-size: 0.9em;opacity: 0.8;color: #a0c4ff;}.modal-body {height: 6rem; /* 设置固定高度 */overflow-y: auto; /* 内容超出时显示滚动条 */padding: 15px;background: #1B448A; /* 背景改为蓝色 */border-radius: 4px;font-family: 'Consolas', monospace;color: #FFFFFF;}/* 思考过程样式 */.thinking-panel {margin-bottom: 20px;border-left: 3px solid #4a90e2;padding-left: 15px;}.thinking-header {color: #4a90e2;font-size: 16px;margin-bottom: 10px;}.thinking-content {background: rgba(255, 255, 255, 0.05);padding: 12px;border-radius: 4px;color: #e0e0e0;line-height: 1.6;}/* 正式回答样式 */.answer-panel {margin-top: 25px;border-top: 1px solid #00c85333;padding-top: 15px;}.answer-header {color: #00c853;font-size: 16px;margin-bottom: 10px;}.answer-content {background: rgba(255, 255, 255, 0.05);padding: 12px;border-radius: 4px;color: #ffffff;line-height: 1.6;}/* 图标样式 */.fa-brain {color: #4a90e2;margin-right: 8px;}.fa-comment {color: #00c853;margin-right: 8px;}/* 新的加载动画样式 */.loading {color: #FFF;text-align: left;padding: 15px;font-size: 16px;}.ai-thinking-container {display: flex;align-items: center;gap: 8px;}.ai-typing-indicator {display: flex;align-items: center;gap: 4px;height: 20px;}.typing-dot {width: 8px;height: 8px;background-color: #FFFFFF;border-radius: 50%;opacity: 0.4;animation: typing-animation 1.4s infinite ease-in-out;}.typing-dot:nth-child(1) {animation-delay: 0s;}.typing-dot:nth-child(2) {animation-delay: 0.2s;}.typing-dot:nth-child(3) {animation-delay: 0.4s;}@keyframes typing-animation {0%, 60%, 100% {transform: translateY(0);opacity: 0.4;}30% {transform: translateY(-5px);opacity: 1;}}
</style>
// 报表分析
UI.Controllers.controller("AiTipsCtrl", ["$scope", "$sce", "$uibModalInstance", "parent", "SSEService", "$timeout",function($scope, $sce, $uibModalInstance, parent, SSEService, $timeout) {// 状态管理$scope.connectionStatus = 'connecting'; // connecting | connected | error | completed$scope.thinkingContent = null;$scope.answerContent = null;$scope.thinkingTime = null; // 新增:思考时间变量$scope.startTime = null; // 新增:开始时间戳let eventSource = null;let thinkingBuffer = "";let answerBuffer = "";// 自动滚动指令$scope.scrollToBottom = function() {$timeout(() => {const container = document.querySelector('.modal-body');if (container) {container.scrollTop = container.scrollHeight + 120;}}, 50);};// 内容更新方法function processChunkData(data) {if (data.type === 'reasoning') {// 如果是第一条思考内容,记录开始时间if (!thinkingBuffer && !$scope.startTime) {$scope.startTime = new Date().getTime();}thinkingBuffer += data.content;$scope.thinkingContent = $sce.trustAsHtml(thinkingBuffer.replace(/\n/g, '<br/>').replace(/ {2}/g, ' &nbsp;'));// 更新思考时间updateThinkingTime();}else if (data.type === 'answer') {answerBuffer += data.content;$scope.answerContent = $sce.trustAsHtml(answerBuffer.replace(/\n/g, '<br/>').replace(/ {2}/g, ' &nbsp;'));}$scope.scrollToBottom();}function updateThinkingTime() {if ($scope.startTime) {const currentTime = new Date().getTime();$scope.thinkingTime = ((currentTime - $scope.startTime) / 1000).toFixed(1);}}// 初始化SSE连接function initSSE() {const url = '/data/chart/ai/sse?' + $.param(parent.queryParam);eventSource = new EventSource(url);eventSource.onopen = () => {$scope.$apply(() => {$scope.connectionStatus = 'connected';});};// 处理消息事件eventSource.addEventListener('ai-message', e => {$scope.$apply(() => {try {const data = JSON.parse(e.data);processChunkData(data);} catch (err) {console.error('消息解析错误:', err);$scope.answerContent = $sce.trustAsHtml('<div class="text-danger">数据格式错误</div>');}});});// 处理结束事件eventSource.addEventListener('stream-end', () => {$scope.$apply(() => {$scope.connectionStatus = 'completed';//最终更新一次思考时间updateThinkingTime();safeClose();});});// 错误处理eventSource.onerror = (err) => {$scope.$apply(() => {console.error('SSE连接错误:', err);$scope.connectionStatus = 'error';safeClose();});};}// 安全关闭连接function safeClose() {if (eventSource) {eventSource.close();eventSource = null;}}// 重新尝试$scope.retry = () => {safeClose();thinkingBuffer = "";answerBuffer = "";$scope.thinkingContent = null;$scope.answerContent = null;$scope.thinkingTime = null; //重置思考时间$scope.startTime = null; //重置开始时间$scope.connectionStatus = 'connecting';initSSE();};// 关闭模态框$scope.cancel = () => {safeClose();$uibModalInstance.dismiss();};// 初始化initSSE();// 清理$scope.$on('$destroy', () => {safeClose();});}
]);
// 告警分析
UI.Controllers.controller("AiAlarmTipsCtrl", ["$scope", "$sce", "$uibModalInstance", "parent", "SSEService", "$timeout",function($scope, $sce, $uibModalInstance, parent, SSEService, $timeout) {// 状态管理$scope.connectionStatus = 'connecting'; // connecting | connected | error | completed$scope.thinkingContent = null;$scope.answerContent = null;$scope.thinkingTime = null; // 新增:思考时间变量$scope.startTime = null; // 新增:开始时间戳let eventSource = null;let thinkingBuffer = "";let answerBuffer = "";// 自动滚动指令$scope.scrollToBottom = function() {$timeout(() => {const container = document.querySelector('.modal-body');if (container) {container.scrollTop = container.scrollHeight + 120;}}, 50);};// 内容更新方法function processChunkData(data) {if (data.type === 'reasoning') {// 如果是第一条思考内容,记录开始时间if (!thinkingBuffer && !$scope.startTime) {$scope.startTime = new Date().getTime();}thinkingBuffer += data.content;$scope.thinkingContent = $sce.trustAsHtml(thinkingBuffer.replace(/\n/g, '<br/>').replace(/ {2}/g, ' &nbsp;'));// 更新思考时间updateThinkingTime();}else if (data.type === 'answer') {answerBuffer += data.content;$scope.answerContent = $sce.trustAsHtml(answerBuffer.replace(/\n/g, '<br/>').replace(/ {2}/g, ' &nbsp;'));}$scope.scrollToBottom();}// 更新思考时间function updateThinkingTime() {if ($scope.startTime) {const currentTime = new Date().getTime();$scope.thinkingTime = ((currentTime - $scope.startTime) / 1000).toFixed(1);}}// 初始化SSE连接function initSSE() {const url = '/alarm/record/realtime/page/ai/sse?' + $.param(parent.queryParam);eventSource = new EventSource(url);eventSource.onopen = () => {$scope.$apply(() => {$scope.connectionStatus = 'connected';});};// 处理消息事件eventSource.addEventListener('ai-message', e => {$scope.$apply(() => {try {const data = JSON.parse(e.data);processChunkData(data);} catch (err) {console.error('消息解析错误:', err);$scope.answerContent = $sce.trustAsHtml('<div class="text-danger">数据格式错误</div>');}});});// 处理结束事件eventSource.addEventListener('stream-end', () => {$scope.$apply(() => {$scope.connectionStatus = 'completed';// 最终更新一次思考时间updateThinkingTime();safeClose();});});// 错误处理eventSource.onerror = (err) => {$scope.$apply(() => {console.error('SSE连接错误:', err);$scope.connectionStatus = 'error';safeClose();});};}// 安全关闭连接function safeClose() {if (eventSource) {eventSource.close();eventSource = null;}}// 重新尝试$scope.retry = () => {safeClose();thinkingBuffer = "";answerBuffer = "";$scope.thinkingContent = null;$scope.answerContent = null;$scope.thinkingTime = null; // 重置思考时间$scope.startTime = null; // 重置开始时间$scope.connectionStatus = 'connecting';initSSE();};// 关闭模态框$scope.cancel = () => {safeClose();$uibModalInstance.dismiss();};// 初始化initSSE();// 清理$scope.$on('$destroy', () => {safeClose();});}
]);showAiTips: function (resolve) {this.showDialog("Template/AiTips.html", "AiTipsCtrl", resolve, 600);},showAiAlarmTips: function (resolve) {this.showDialog("Template/AiAlarmTips.html", "AiAlarmTipsCtrl", resolve, 600);}

数据库结构:

INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'balanceUrl', 'https://api.deepseek.com/user/balance', '余额查询');
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'enable', 'true', '启用AI报表助手');
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'key', '', 'API密钥');
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'modelType', 'deepseek-reasoner', 'deepseek模型类型');
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'prompt', '', 'AI提问词');
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'promptAlarm', '', 'AI提问词');
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'rateLimit', '3', '限制每秒多少次请求');
INSERT INTO `system_global_config`(`tag`, `key`, `value`, `description`) VALUES ('keydak_ai_config', 'url', 'https://api.deepseek.com/v1/chat/completions', 'API接口');

实体类(使用AES加密 密钥):

package com.keydak.project.core.chart.ai.dto;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.util.*;/*** AI配置信息** @author xyt*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class KeydakAiConfigDTO {private Boolean enable;/*** API_URL地址**/private String url;/*** 查询余额地址**/private String balanceUrl;/*** API密钥**/private String key;/*** 限流次数**/private Integer rateLimit;/*** AI提问词(报表)**/private String prompt;/*** AI提问词(告警)**/private String promptAlarm;/*** 模型类型**/private String modelType;private static final String SALT = ""; // 16 bytes for AES-128private static final String ALGORITHM = "AES/ECB/PKCS5Padding";public void validate() {List<String> missingFields = new ArrayList<>();if (url == null) missingFields.add("API_URL地址");if (balanceUrl == null) missingFields.add("查询余额地址");if (key == null) missingFields.add("API密钥");if (rateLimit == null) missingFields.add("限流次数");if (prompt == null) missingFields.add("AI提问词");if (!missingFields.isEmpty()) {throw new IllegalStateException("参数不能为空: " + String.join(", ", missingFields));}}/*** 判断密钥是否已经加密*/public boolean isEncryptedKey(String key) {try {// 尝试解密,如果能成功解密则认为已经是加密过的decryptKey(key);return true;} catch (Exception e) {return false;}}/*** 加密密钥**/public String encryptKey(String key) throws Exception {SecretKeySpec secretKey = new SecretKeySpec(SALT.getBytes(StandardCharsets.UTF_8), "AES");Cipher cipher = Cipher.getInstance(ALGORITHM);cipher.init(Cipher.ENCRYPT_MODE, secretKey);byte[] encryptedKey = cipher.doFinal(key.getBytes(StandardCharsets.UTF_8));return Base64.getEncoder().encodeToString(encryptedKey);}/*** 解密密钥*/public String decryptKey(String encryptedKey) throws Exception {SecretKeySpec secretKey = new SecretKeySpec(SALT.getBytes(StandardCharsets.UTF_8), "AES");Cipher cipher = Cipher.getInstance(ALGORITHM);cipher.init(Cipher.DECRYPT_MODE, secretKey);byte[] decryptedKey = cipher.doFinal(Base64.getDecoder().decode(encryptedKey));return new String(decryptedKey, StandardCharsets.UTF_8);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/75805.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于Python的招聘推荐数据可视化分析系统

【Python】基于Python的招聘推荐数据可视化分析系统&#xff08;完整系统源码开发笔记详细部署教程&#xff09;✅ 目录 一、项目简介二、项目界面展示三、项目视频展示 一、项目简介 &#x1f680;&#x1f31f; 基于Python的招聘推荐数据可视化分析系统&#xff01;&#x1…

使用注解开发springMVC

引言 在学习过第一个springMVC项目建造过后&#xff0c;让我们直接进入真实开发中所必需的注解开发&#xff0c; 是何等的简洁高效&#xff01;&#xff01; 注&#xff1a;由于Maven可能存在资源过滤的问题&#xff0c;在maven依赖中加入 <build><resources>&l…

linux专题3-----禁止SSH的密码登录

要在linux系统中禁止密码登录&#xff0c;您可以通过修改 SSH 配置来实现。请按照以下步骤操作(此处以 Ubuntu为例)&#xff1a; 1、SSH 登录到您的服务器&#xff08;或直接在命令行模式下&#xff09;。 2、备份 SSH 配置文件&#xff1a; 在终端中运行以下命令以备份现有的…

基于LangChain和通义(Tongyi)实现NL2SQL的智能检索(无需训练)

在数据驱动的时代,如何高效地从数据库中获取信息成为了一个重要的挑战。自然语言到SQL(NL2SQL)技术提供了一种便捷的解决方案,使用户能够用自然语言查询数据库,而无需深入了解SQL语法。本文将探讨如何利用LangChain和通义(Tongyi)实现NL2SQL的智能检索,具体步骤如下: …

深度学习处理文本(10)

保存自定义层 在编写自定义层时&#xff0c;一定要实现get_config()方法&#xff1a;这样我们可以利用config字典将该层重新实例化&#xff0c;这对保存和加载模型很有用。该方法返回一个Python字典&#xff0c;其中包含用于创建该层的构造函数的参数值。所有Keras层都可以被序…

机器视觉3D中激光偏镜的优点

机器视觉的3D应用中,激光偏镜(如偏振片、波片、偏振分束器等)通过其独特的偏振控制能力,显著提升了系统的测量精度、抗干扰能力和适应性。以下是其核心优点: 1. 提升3D成像精度 抑制环境光干扰:偏振片可滤除非偏振的环境杂光(如日光、室内照明),仅保留激光偏振信号,大…

线程同步的学习与应用

1.多线程并发 1).多线程并发引例 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <assert.h> #include <pthread.h>int wg0; void *fun(void *arg) {for(int i0;i<1000;i){wg;printf("wg%d\n",wg);} } i…

写.NET可以指定运行SUB MAIN吗?调用任意一个里面的类时,如何先执行某段初始化代码?

VB.NET 写.NET可以指定运行SUB MAIN吗?调用任意一个里面的类时,如何先执行某段初始化代码? 分享 1. 在 VB.NET 中指定运行 Sub Main 在 VB.NET 里&#xff0c;你能够指定 Sub Main 作为程序的入口点。下面为你介绍两种实现方式&#xff1a; 方式一&#xff1a;在项目属性…

【AI插件开发】Notepad++ AI插件开发实践(代码篇):从Dock窗口集成到功能菜单实现

一、引言 上篇文章已经在Notepad的插件开发中集成了选中即问AI的功能&#xff0c;这一篇文章将在此基础上进一步集成&#xff0c;支持AI对话窗口以及常见的代码功能菜单&#xff1a; 显示AI的Dock窗口&#xff0c;可以用自然语言向 AI 提问或要求执行任务选中代码后使用&…

关联容器-模板类pair数对

关联容器 关联容器和顺序容器有着根本的不同:关联容器中的元素是按关键字来保存和访问的,而顺序容器中的元素是按它们在容器中的位置来顺序保存和访问的。 关联容器支持高效的关键字查找和访问。 两个主要的关联容器(associative-container),set和map。 set 中每个元素只包…

京东运维面试题及参考答案

目录 OSPF 实现原理是什么? 请描述 TCP 三次握手的过程。 LVS 的原理是什么? 阐述 Nginx 七层负载均衡的原理。 Nginx 与 Apache 有什么区别? 如何查看监听在 8080 端口的是哪个进程(可举例:netstat -tnlp | grep 8080)? OSI 七层模型是什么,请写出各层的协议。 …

输入框输入数字且保持精度

在项目中如果涉及到金额等需要数字输入且保持精度的情况下&#xff0c;由于输入框是可以随意输入文本的&#xff0c;所以一般情况下可能需要监听输入框的change事件&#xff0c;然后通过正则表达式去替换掉不匹配的文本部分。 由于每次文本改变都会被监听&#xff0c;包括替换…

使用 requests 和 BeautifulSoup 解析淘宝商品

以下将详细解释如何通过这两个库来实现按关键字搜索并解析淘宝商品信息。 一、准备工作 1. 安装必要的库 在开始之前&#xff0c;确保已经安装了 requests 和 BeautifulSoup 库。如果尚未安装&#xff0c;可以通过以下命令进行安装&#xff1a; bash pip install requests…

C#调用ACCESS数据库,解决“Microsoft.ACE.OLEDB.12.0”未注册问题

C#调用ACCESS数据库&#xff0c;解决“Microsoft.ACE.OLEDB.12.0”未注册问题 解决方法&#xff1a; 1.将C#采用的平台从AnyCpu改成X64 2.将官网下载的“Microsoft Access 2010 数据库引擎可再发行程序包AccessDatabaseEngine_X64”文件解压 3.安装解压后的文件 点击下载安…

【文献阅读】Vision-Language Models for Vision Tasks: A Survey

发表于2024年2月 TPAMI 摘要 大多数视觉识别研究在深度神经网络&#xff08;DNN&#xff09;训练中严重依赖标注数据&#xff0c;并且通常为每个单一视觉识别任务训练一个DNN&#xff0c;这导致了一种费力且耗时的视觉识别范式。为应对这两个挑战&#xff0c;视觉语言模型&am…

【Kubernetes】StorageClass 的作用是什么?如何实现动态存储供应?

StorageClass 使得用户能够根据不同的存储需求动态地申请和管理存储资源。 StorageClass 定义了如何创建存储资源&#xff0c;并指定了存储供应的配置&#xff0c;例如存储类型、质量、访问模式等。为动态存储供应提供了基础&#xff0c;使得 Kubernetes 可以在用户创建 PVC 时…

Muduo网络库介绍

1.Reactor介绍 1.回调函数 **回调&#xff08;Callback&#xff09;**是一种编程技术&#xff0c;允许将一个函数作为参数传递给另一个函数&#xff0c;并在适当的时候调用该函数 1.工作原理 定义回调函数 注册回调函数 触发回调 2.优点 异步编程 回调函数允许在事件发生时…

Debian编译安装mysql8.0.41源码包 笔记250401

Debian编译安装mysql8.0.41源码包 以下是在Debian系统上通过编译源码安装MySQL 8.0.41的完整步骤&#xff0c;包含依赖管理、编译参数优化和常见问题处理&#xff1a; 准备工作 1. 安装编译依赖 sudo apt update sudo apt install -y \cmake gcc g make libssl-dev …

Git常用问题收集

gitignore 忽略文件夹 不生效 有时候我们接手别人的项目时&#xff0c;发现有的忽略不对想要修改&#xff0c;但发现修改忽略.gitignore后无效。原因是如果某些文件已经被纳入版本管理在.gitignore中忽略路径是不起作用的&#xff0c;这时候需要先清除本地缓存&#xff0c;然后…

编程哲学——TCP可靠传输

TCP TCP可靠传输 TCP的可靠传输表现在 &#xff08;1&#xff09;建立连接时三次握手&#xff0c;四次挥手 有点像是这样对话&#xff1a; ”我们开始对话吧“ ”收到“ ”好的&#xff0c;我收到你收到了“ &#xff08;2&#xff09;数据传输时ACK应答和超时重传 ”我们去吃…