智能BI(后端)-- 系统异步化

文章目录

  • 系统问题分析
  • 什么是异步化?
  • 业务流程分析
    • 标准异步化的业务流程
    • 系统业务流程
  • 线程池
    • 为什么需要线程池?
    • 线程池两种实现方式
    • 线程池的参数
    • 线程池的开发
  • 项目异步化改造

系统问题分析

问题场景:调用的服务能力有限,或者接口的处理(或返回)时长较长时,就应该考虑异步化了

什么是异步化?

不用等一件事做完,就可以做另外一件事,等第一件事完成时,可以收到一个通知

业务流程分析

标准异步化的业务流程

  1. 当用户要进行耗时很长的操作时,点击提交后,不需要在界面空等,而是应该把这个任务保存到数据库中记录下来
  2. 用户要执行新任务时:
    a. 任务提交成功:
    ⅰ. 若程序存在空闲线程,可以立即执行此任务
    ⅱ. 若所有线程均繁忙,任务将入队列等待处理
    b. 任务提交失败:比如所有线程都在忙碌且任务队列满了
    ⅰ.选择拒绝此任务,不再执行
    ⅱ.通过查阅数据库记录,发现提交失败的任务,并在程序空闲时将这些任务取出执行
  3. 程序(线程)从任务队列中取出任务依次执行,每完成一项任务,就更新任务状态。
  4. 用户可以查询任务的执行状态,或者在任务执行成功或失败时接收通知(例如:发邮件、系统消息提示或短信),从而优化体验
  5. 对于复杂且包含多个环节的任务,在每个小任务完成时,要在程序(数据库中))记录任务的执行状态(进度)。

系统业务流程

  1. 用户点击智能分析页提交按钮时,先把图表立刻保存到数据库中(作为一个任务)
  2. 用户可以在图表管理查看所有图表(已生成的,生成中的,生成失败的)的信息和状态
  3. 用户可以修改生成失败的图表信息,点击重新生成,以尝试再次创建图表
    在这里插入图片描述

问题分析?

  1. 任务队列的最大容量应该设置为多少
  2. 程序怎么从任务队列中取出任务去执行?这个任务队列的流程怎么实现?怎么保证程序最多同时执行多少个任务?

线程池实现

线程池

为什么需要线程池?

  1. 线程的管理比较复杂
  2. 任务存取比较复杂
  3. 线程池可以帮你轻松管理线程,协调任务的执行过程

线程池两种实现方式

  1. Spring中,可以用ThreadPoolTaskExrcutor配合@Async注解来实现(不推荐)
  2. 在Java中,可以使用JUC并发编程包中的ThreadPoolExecutor来实现非常灵活地自定义线程池

线程池的参数

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {

现状:AI生成能力的并发是只允许4个任务同时去执行,AI能力允许20个任务排队
corePoolSize(核心线程数):正常情况下,我们的系统应该能同时工作的线程数
maximumPoolSize(最大线程数):极限情况下,我们的线程池所拥有的线程
keepAliveTime(空闲线程存活时间):非核心线程在没有任务的情况下,过多久要删除,从而释放无用的线程资源
unit(空闲线程存活时间的单位):分钟,秒
workQueue(工作队列):用于存放给线程执行的任务,存在一个队列的长度(一定要设置)
threadFactory(线程工厂):控制每个线程的生成,线程的属性
RejectedExecutionHandler(拒绝策略):任务队列满的时候,我们采取什么措施

线程池的开发

自定义线程池配置

package com.yupi.springbootinit.config;import org.jetbrains.annotations.NotNull;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;@Configuration
public class ThreadPoolExecutorConfig {@Beanpublic ThreadPoolExecutor threadPoolExecutor(){// 创建一个线程工厂ThreadFactory threadFactory = new ThreadFactory(){// 初始化线程数为 1private int count = 1;// 创建一个新的线程@Override// 每当线程池需要创建新线程时,就会调用newThread方法// @NotNull Runnable r 表示方法参数 r 应该永远不为null,public Thread newThread(@NotNull Runnable r) {Thread thread = new Thread(r);thread.setName("线程" + count ++);return thread;}};// 创建一个新的线程池,线程池核心大小为2,最大线程数为4,// 非核心线程空闲时间为100秒,任务队列为阻塞队列,长度为4,使用自定义的线程工厂创建线ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,4,100, TimeUnit.SECONDS,new ArrayBlockingQueue<>(4),threadFactory);return threadPoolExecutor;}
}

测试controller层(注意线上环境不要暴露出去)

package com.yupi.springbootinit.controller;import cn.hutool.json.JSONUtil;
import io.netty.handler.codec.serialization.ObjectEncoder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Profile;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;/*** 队列测试controller*/
@RestController
@RequestMapping("/queue")
@Slf4j
@Profile({"dev","local"}) // 只在开发环境和本地环境生效
public class QueueController {@Resourceprivate ThreadPoolExecutor threadPoolExecutor;@GetMapping("/add")// 接收一个参数name,然后将任务添加到线程池中public void add(String name){// 使用CompletableFuture运行一个异步任务CompletableFuture.runAsync(()->{log.info("任务执行中:" + name + "执行人:" + Thread.currentThread().getName());try {// 让线程休眠10分钟,模拟长时间运行的任务Thread.sleep(600000);} catch (InterruptedException e) {throw new RuntimeException(e);} 异步任务在threadPoolExecutor中执行},threadPoolExecutor);}@GetMapping("/get")public String  get(){Map<String, Object> map = new HashMap<>();int size = threadPoolExecutor.getQueue().size();map.put("队列长度",size);long taskCount = threadPoolExecutor.getTaskCount();map.put("任务总数",taskCount);long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();map.put("已完成的任务总数",completedTaskCount);int activeCount = threadPoolExecutor.getActiveCount();map.put("正在工作的线程数",activeCount);return JSONUtil.toJsonStr(map);}
}

项目异步化改造

/*** 智能分析(异步)** @param multipartFile* @param genChartByAiRequest* @param request* @return*/
@PostMapping("/gen/async")
public BaseResponse<BiResponse> genChartByAiAsync(@RequestPart("file") MultipartFile multipartFile,GenChartByAiRequest genChartByAiRequest, HttpServletRequest request) {String name = genChartByAiRequest.getName();String goal = genChartByAiRequest.getGoal();String chartType = genChartByAiRequest.getChartType();// 校验ThrowUtils.throwIf(StringUtils.isBlank(goal), ErrorCode.PARAMS_ERROR, "目标为空");ThrowUtils.throwIf(StringUtils.isNotBlank(name) && name.length() > 100, ErrorCode.PARAMS_ERROR, "名称过长");// 校验文件long size = multipartFile.getSize();String originalFilename = multipartFile.getOriginalFilename();// 校验文件大小final long ONE_MB = 1024 * 1024L;ThrowUtils.throwIf(size > ONE_MB, ErrorCode.PARAMS_ERROR, "文件超过 1M");// 校验文件大小缀 aaa.pngString suffix = FileUtil.getSuffix(originalFilename);final List<String> validFileSuffixList = Arrays.asList("xlsx", "xls");ThrowUtils.throwIf(!validFileSuffixList.contains(suffix), ErrorCode.PARAMS_ERROR, "文件后缀非法");User loginUser = userService.getLoginUser(request);// 限流判断,每个用户一个限流器redisLimiterManager.doRateLimit("genChartByAi_" + loginUser.getId());// 指定一个模型id(把id写死,也可以定义成一个常量)long biModelId = 1659171950288818178L;// 分析需求:// 分析网站用户的增长情况// 原始数据:// 日期,用户数// 1号,10// 2号,20// 3号,30// 构造用户输入StringBuilder userInput = new StringBuilder();userInput.append("分析需求:").append("\n");// 拼接分析目标String userGoal = goal;if (StringUtils.isNotBlank(chartType)) {userGoal += ",请使用" + chartType;}userInput.append(userGoal).append("\n");userInput.append("原始数据:").append("\n");// 压缩后的数据String csvData = ExcelUtils.excelToCsv(multipartFile);userInput.append(csvData).append("\n");// 先把图表保存到数据库中Chart chart = new Chart();chart.setName(name);chart.setGoal(goal);chart.setChartData(csvData);chart.setChartType(chartType);// 插入数据库时,还没生成结束,把生成结果都去掉
//        chart.setGenChart(genChart);
//        chart.setGenResult(genResult);// 设置任务状态为排队中chart.setStatus("wait");chart.setUserId(loginUser.getId());boolean saveResult = chartService.save(chart);ThrowUtils.throwIf(!saveResult, ErrorCode.SYSTEM_ERROR, "图表保存失败");// 在最终的返回结果前提交一个任务// todo 建议处理任务队列满了后,抛异常的情况(因为提交任务报错了,前端会返回异常)CompletableFuture.runAsync(() -> {// 先修改图表任务状态为 “执行中”。等执行成功后,修改为 “已完成”、保存执行结果;执行失败后,状态修改为 “失败”,记录任务失败信息。(为了防止同一个任务被多次执行)Chart updateChart = new Chart();updateChart.setId(chart.getId());// 把任务状态改为执行中updateChart.setStatus("running");boolean b = chartService.updateById(updateChart);// 如果提交失败(一般情况下,更新失败可能意味着你的数据库出问题了)if (!b) {handleChartUpdateError(chart.getId(), "更新图表执行中状态失败");return;}// 调用 AIString result = aiManager.doChat(biModelId, userInput.toString());String[] splits = result.split("【【【【【");if (splits.length < 3) {handleChartUpdateError(chart.getId(), "AI 生成错误");return;}String genChart = splits[1].trim();String genResult = splits[2].trim();// 调用AI得到结果之后,再更新一次Chart updateChartResult = new Chart();updateChartResult.setId(chart.getId());updateChartResult.setGenChart(genChart);updateChartResult.setGenResult(genResult);updateChartResult.setStatus("succeed");boolean updateResult = chartService.updateById(updateChartResult);if (!updateResult) {handleChartUpdateError(chart.getId(), "更新图表成功状态失败");}},threadPoolExecutor);BiResponse biResponse = new BiResponse();
//        biResponse.setGenChart(genChart);
//        biResponse.setGenResult(genResult);biResponse.setChartId(chart.getId());return ResultUtils.success(biResponse);
}
// 上面的接口很多用到异常,直接定义一个工具类
private void handleChartUpdateError(long chartId, String execMessage) {Chart updateChartResult = new Chart();updateChartResult.setId(chartId);updateChartResult.setStatus("failed");updateChartResult.setExecMessage(execMessage);boolean updateResult = chartService.updateById(updateChartResult);if (!updateResult) {log.error("更新图表失败状态失败" + chartId + "," + execMessage);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/13177.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

离岸公司+外贸

为什么外贸公司老板都喜欢注册离岸公司呢&#xff1f;怎样利用离岸公司做进出口贸易呢&#xff1f; 今天大家花一分钟时间来了解清楚 第一步就是注册一家离岸公司&#xff0c;将这个离岸公司作为国际外贸的中转站&#xff0c;与国外客户签订单&#xff0c;你从国内工厂采购商…

【文档理解】TextMonkey:一种OCR-Free的用于文档理解的多模态大模型

背景 传统的信息提取&#xff0c;通常是从文本中提取信息&#xff0c;相关技术也比较成熟。然而对于复杂领域&#xff0c;例如图片&#xff0c;文档等形式的数据&#xff0c;想要提取出高质量的、可信的数据难度就比较大了&#xff0c;这种任务也常称为&#xff1a;视觉文档理…

CTF网络安全大赛web题目:just_sqli

这道题目是bugku的web题目 题目的 描  述: KosenCTF{} 原文链接&#xff1a; CTF网络安全大赛web题目&#xff1a;just_sqli - 红客网-网络安全与渗透技术 题目Web源代码&#xff1a; <?php$user NULL; $is_admin 0;if (isset($_GET["source"])) {highlig…

齐护K210系列教程(二十七)_语音识别

语音识别 1.烧录固件和模型2.语音识别程序2.1训练并识别2.2使用本地文件语音识别 3.课程资源联系我们 1.烧录固件和模型 注&#xff1a;本应用只适用于有麦克风功能的型号&#xff1a;AIstart_pro、AIstart_掌机、AIstart_Mini, 其它型号不支持&#xff01; 机器码生成以及模…

linux中远程服务器上传输文件的10个sftp命令示例

目录 1. 如何连接到 SFTP 2. 帮助 3.检查当前工作目录 4. 使用 sftp 列出文件 远程 本地 5. 使用 sftp 上传文件 6. 使用 sftp 上传多个文件 7. 使用 sftp 下载文件 8. 在 sftp 中切换目录 远程 本地 9. 使用 sftp 创建目录 10. 使用 sftp 删除目录 11. 退出 sf…

(001)apidoc 的安装

安装 1.确定 node 和 npm 的匹配版本 node -vv10.14.1# 切换node 版本 nvm list nvm use 20.12.22.安装 apidoc。 npm install -g apidoc3.生成文档&#xff1a; apidoc -i ../ -o document/ -f ".java$"-i &#xff1a;指定扫描路径。-o&#xff1a;输出目录。…

golang并发(同步)多任务高性能执行聚合

taskgroup golang并发执行多任务&#xff0c;并聚合多任务结果。 使用文档、 项目github 使用: go get github.com/mlee-msl/taskgroup 功能特点 并发安全的执行多个任务将多个任务的结果进行聚合通过扇出/扇入模式&#xff0c;结合线程安全channel实现高效协程间通信多任务复…

【Linux:环境变量】

环境变量一般是指在操作系统中用来指定操作系统环境的一些参数 常见的环境变量&#xff1a; PATH 指定可执行程序的搜索路径 系统级的文件&#xff1a;/etc/bashrc 用户级文件&#xff1a;~/.bashrc ~/.bash_profile HOME 指定用户的主要工作目录&#xff08;当前用…

kettle从入门到精通 第六十一课 ETL之kettle 任务调度器,轻松使用xxl-job调用kettle中的job和trans

想真正学习或者提升自己的ETL领域知识的朋友欢迎进群&#xff0c;一起学习&#xff0c;共同进步。若二维码失效&#xff0c;公众号后台加我微信入群&#xff0c;备注kettle。 1、大家都知道kettle设计的job流程文件有个缺点&#xff1a;只能设置简单的定时任务&#xff0c;无法…

DPDK:用rte_wmb()来保序,对ARM和IA而言,RTE_WMB()的实现有何不同

rte_wmb()函数在DPDK中用于实现写入屏障&#xff08;Write Memory Barrier&#xff09;&#xff0c;它的作用是确保在CPU执行写操作之前&#xff0c;所有先前的写操作已经被完全刷新到内存中。这个函数在IA和ARM处理器上的实现有一些不同。 对于Intel Architecture (IA)处理器而…

PHP黑魔法之既是0又是1/switch/$a==0可用.绕过(非数字都可绕过)/PHP://伪协议绕过

1、既是0又是1的情况 $a==1 & $test[$a]=t 时 知识点1)php在处理数字时,如果数字的位数超过 16 位是可以弱等于1的,也就是 var_dump( 9999999999999999999 == 1 );//true 因为当数字位数超过 16 位时,是将该数字转换成了数值为 1 的字符串进行处理 知识点2)在科学…

LabVIEW和usrp连接实现ofdm通信系统 如何实现

1. 硬件准备 USRP设备&#xff1a;选择合适的USRP硬件&#xff08;如USRP B210或N210&#xff09;&#xff0c;并确保其与计算机连接&#xff08;通常通过USB或以太网&#xff09;。天线&#xff1a;根据频段需求选择合适的天线。 2. 软件安装 LabVIEW&#xff1a;安装LabVI…

【Golang】 Golang 的 GORM 库中的 Rows 函数

文章目录 前言一、Rows 函数解释二、代码实现三、总结 前言 在使用 Go 语言进行数据库操作时&#xff0c;GORM&#xff08;Go Object-Relational Mapping&#xff09;库是一个常用的工具。它提供了一种简洁和强大的方式来处理数据库操作。本文将介绍 GORM 库中的 Rows 函数&am…

数据库-索引(高级篇)

文章目录 索引概念&#xff1f;索引演示&#xff1f;索引的优劣&#xff1f;为什么使用索引就快&#xff1f;本篇小结 更多相关内容可查看 索引概念&#xff1f; 索引&#xff08;index&#xff09;是帮助MySQL高效获取数据的数据结构(有序)。在数据之外&#xff0c;数据库系统…

生成完美口型同步的 AI 数字人视频

目录 摘要 关键词 1 前言 1.1 研究背景 1.2 研究意义 2 技术框架 2.1 深度学习框架 2.2 语音识别 2.3 面部动作捕捉和口型同步 2.4 综合项目 3 实现过程 3.1 环境搭建 3.2 代码开发 3.3 整合代码 3.4 部署 3.5 更多细节 4 测试过程 4.1 数据准备 4.2 面部检测…

语法分析-文法

如果对于一部文法中&#xff0c;存在至少一个句子有两个或者两个以上的语法树则该文法是二义性的。 我们可以以上面的例子进行解释&#xff0c;对于第棵个语法树&#xff0c;我们可以看到是先进行了加法运算再进行的乘法运算&#xff0c;因为需要先把EE作为整体运算完后再成为E…

上海亚商投顾:沪指低开低走 两市成交额跌破8000亿

上海亚商投顾前言&#xff1a;无惧大盘涨跌&#xff0c;解密龙虎榜资金&#xff0c;跟踪一线游资和机构资金动向&#xff0c;识别短期热点和强势个股。 一.市场情绪 市场全天震荡走低&#xff0c;三大股指尾盘均跌近1%。地产股逆势走强&#xff0c;光大嘉宝、天地源、云南城投…

幻兽帕鲁Palworld服务器手动+docker部署方法+备份迁移

目录 帕鲁部署官方文档帕鲁手动安装法手动安装steamcmd通过steamcmd安装帕鲁后端 docker容器一键部署幻兽帕鲁绿联云NAS机器部署幻兽帕鲁客户端连接附录1&#xff1a;PalServer.sh的启动项附录2&#xff1a;配置文件游戏存档保存和迁移 关于阿里云计算巢 帕鲁部署官方文档 htt…

学习MySQL(五):窗口函数

窗口函数介绍 窗口函数的引入是为了解决想要既显示聚集前的数据&#xff0c;又要显示聚集后的数据&#xff1b;窗口数对一组值进行操作&#xff0c;不需要使用GROUP BY子句对数据进行分组&#xff0c;能够在同一行中同时返回基础行的列和聚合列。 强调&#xff1a;使用MySQL …

​学者观察 | 从区块链应用创新看长安链发展——CCF区块链专委会荣誉主任斯雪明

导语 2024年1月27日&#xff0c;斯雪明教授在长安链发布三周年庆暨生态年会上发表演讲&#xff0c;认为在区块链发展过程中&#xff0c;不仅需要技术创新&#xff0c;同时需要有价值、有特色、有示范意义的应用创新。斯雪明教授介绍了国内区块链技术与应用发展的现状、趋势与挑…