RabbitMQ项目实战(二)

文章目录

  • 项目改造
    • 实现步骤

项目改造

以前把任务提交到线程池,然后在线程池提交中编写处理程序的代码,线程池内排队。
如果程序中断了,任务就没了,就丢了。
改造后的流程:

  1. 把任务提交改为向队列发送消息
  2. 写一个专门接收消息的程序,处理任务
  3. 如果程序中断了,消息未被确认,还会重发
  4. 现在,消息全部集中发送到消息队列,你可以部署多个后端,都从同一个地方取任务,从而实现了分布式负载均衡

实现步骤

1)创建交换机和队列
2)将线程池中的执行代码移到消费者类中
3)根据消费者的需求来确认消息的格式(chartId)
4)将提交线程池改造为发送消息到队列
注意:如果程序中断了,没有ack,也没有nack(服务中断,没有任何响应),那么这条消息会被重新放到消息队列中,从而实现了每个任务都会执行
记得在项目启动前创建队列和交换机

package com.yupi.springbootinit.bimq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** 用于创建测试程序用到的交换机和队列(只用在程序启动前执行一次)*/
public class BiMqInitMain {public static void main(String[] args) {try{ConnectionFactory factory  = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(BiMqConstant.BI_EXCHANGE_NAME,"direct");String queueName = BiMqConstant.BI_QUEUE_NAME;channel.queueDeclare(queueName,true,false,false,null);channel.queueBind(queueName,BiMqConstant.BI_EXCHANGE_NAME,BiMqConstant.BI_ROUTING_KEY);}catch (Exception e){}}
}

生产者:

package com.yupi.springbootinit.bimq;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Component
public class BiMessageProducer {@Resourceprivate RabbitTemplate rabbitTemplate;public void sendMessage(String message){// 使用rabbitTemplate的convertAndSend方法将消息发送到指定的交换机和路由键rabbitTemplate.convertAndSend(BiMqConstant.BI_EXCHANGE_NAME,BiMqConstant.BI_ROUTING_KEY,message);}
}

消费者:

package com.yupi.springbootinit.bimq;import cn.hutool.core.text.StrBuilder;
import com.rabbitmq.client.Channel;
import com.yupi.springbootinit.common.ErrorCode;
import com.yupi.springbootinit.constant.CommonConstant;
import com.yupi.springbootinit.exception.BusinessException;
import com.yupi.springbootinit.manager.AiManager;
import com.yupi.springbootinit.model.entity.Chart;
import com.yupi.springbootinit.service.ChartService;
import com.yupi.springbootinit.utils.ExcelUtils;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import javax.annotation.RegEx;
import javax.annotation.Resource;@Component
@Slf4j
public class BiMessageConsumer {@Resourceprivate ChartService chartService;@Resourceprivate AiManager aiManager;//使用@SneakyThrows注解简化异常处理//使得你可以在不声明抛出异常的方法中抛出受检异常,而无需捕获它们。这在一些特定情况下可能会很有用,但通常不建议频繁使用,因为它可能会破坏代码的可读性和健壮性。@SneakyThrows//使用@RabbitListener注解指定要监听的队列名称为"code_queue",并设置消息的确认机制为手动确认@RabbitListener(queues = {BiMqConstant.BI_QUEUE_NAME},ackMode = "MANUAL")// // 在RabbitMQ中,每条消息都会被分配一个唯一的投递标签,用于标识该消息在通道中的投递状态和顺序。通过使用@Header(AmqpHeaders.DELIVERY_TAG)注解,可以从消息头中提取出该投递标签,并将其赋值给long deliveryTag参数。public void reciveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliverttag){log.info("receive message = {}" + message);if (StringUtils.isBlank(message)) {//拒绝消息channel.basicNack(deliverttag,false,false);throw new BusinessException(ErrorCode.SYSTEM_ERROR,"消息为空");}long chartId = Long.parseLong(message);Chart chart = chartService.getById(chartId);if(chart == null){channel.basicNack(deliverttag,false,false);throw new BusinessException(ErrorCode.NOT_FOUND_ERROR,"图表为空");}Chart updateChart = new Chart();updateChart.setId(chart.getId());updateChart.setStatus("running");boolean b = chartService.updateById(updateChart);if(!b){handleChartUpdateError(chart.getId(),"更新图表执行中状态失败");return;}String result = aiManager.doChat(CommonConstant.BI_MODEL_ID, getUserInput(chart));String[] splits = result.split("【【【【【");if(splits.length < 3){handleChartUpdateError(chart.getId(),"AI生成错误");return;}String genChart = splits[1];String genResult = splits[2];Chart updateChart2 = new Chart();updateChart2.setId(chart.getId());updateChart2.setStatus("succeed");updateChart2.setGenChart(genChart);updateChart2.setGenResult(genResult);boolean b1 = chartService.updateById(updateChart2);if(!b1){handleChartUpdateError(chart.getId(),"更新图表成功状态失败");return;}//确认消息channel.basicAck(deliverttag,false);}/*** 根据chart获取用户的输入* @param chart* @param*/public String getUserInput(Chart chart){String goal = chart.getGoal();String chartType = chart.getChartType();String CSVData = chart.getChartData();StrBuilder userInput = new StrBuilder();userInput.append("分析需求:").append("\n");String userGoal = goal;if(StringUtils.isNotBlank(chartType)){//指定了图表类型,就在目标上拼接请使用,图表类型userGoal += "请使用,"  + chartType;}userInput.append(CSVData).append("\n");return userInput.toString();}public void handleChartUpdateError(long chartId,String execMessage){Chart chart = new Chart();chart.setId(chartId);chart.setStatus("failed");chart.setExecMessage(execMessage);boolean b = chartService.updateById(chart);if(!b){log.error("更新图表失败状态失败" + chartId + "," + execMessage);}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/825852.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Android开发——ListView

activity_main.xml <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas.android.com/apk/res/android"android:layout_height"match_parent"android:layout_width"match_parent"…

SpringBoot使用maven指定依赖包的版本

目录 1. 解决示例2. 完整pom示例 前言&#xff1a;我们在使用A依赖的时候&#xff0c;这个依赖有引入了第三方B依赖&#xff0c;这时候我想指定B依赖的版本号 1. 解决示例 我想将 mysql、logback、tomcat 的版本升级到指定版本 只需在 pom.xml 文件的 properties 里面添加如下…

3D模型格式转换工具HOOPS Exchange:3D CAD数据的快速导入与导出

在当今的工程设计领域中&#xff0c;快速且可靠地处理3D CAD数据是至关重要的。HOOPS Exchange SDK通过提供一组C软件库&#xff0c;为开发团队提供了实现这一目标的有效工具。 什么是HOOPS Exchange&#xff1f; HOOPS Exchange是一组C软件库&#xff0c;旨在为开发团队提供…

已解决 数据库异常 SQLSyntaxErrorException:FUNCTION count does not exist.

今天排查问题看到数据库报错 Caused by: java.sql.SQLSyntaxErrorException: FUNCTION your_table_name(你的数据库名字).count does not exist. Check the Function Name Parsing and Resolution section in the Reference Manual然后查了很久才找到问题 在COUNT和括号之间有…

YOLOv1精读笔记

YOLO系列 摘要1. 将目标检测视为一个回归问题2. 定位准确率不如 SOTA&#xff0c;但背景错误率更低3. 泛化能力强 1.引言1.1 YOLO 速度很快1.2 全局推理 2. Unified Detection2.1 网络设计2.2 训练YOLOv1模型损失函数的选择和其潜在的问题YOLOv1模型如何改进其损失函数来更好地…

反射应用简单案例

day40 反射应用 案例 1.万能数组扩容 设置泛型的copyof仅支持引用数据类型&#xff0c;即任意类型&#xff0c;直接new数组不行&#xff0c;利用反射实现扩容&#xff1b; 打印调用toString也进行编写&#xff0c;利用StringBuffer或者StringBiulder进行字符串拼接 public c…

华为OD-C卷-伐木工[200分]Python3-100%

题目描述 一根X米长的树木,伐木工切割成不同长度的木材后进行交易,交易价格为每根木头长度的乘积。 规定切割后的每根木头长度都为正整数;也可以不切割,直接拿整根树木进行交易。 请问伐木工如何尽量少的切割,才能使收益最大化? 输入描述 木材的长度(X ≤ 50) 输出描…

tomcat部署两个相同端口前端,一个后端(Windows)

tomcat/webapps下 互联网端的包为ROOT和XXX&#xff08;XXX为项目名&#xff09; 一体机端只有一个前端包ABC 把ROOT下的所有文件剪切到XXX下的WEB-INF下classes下创建一个文件夹名叫static 把新的一体机的前端ABC的文件放到ROOT下即可 注意&#xff1a;ABC和static下的ind…

【史上最全】带你全方位了解containerd 的几种插件扩展模式

除了 snapshotter&#xff0c;containerd 的扩展机制你还了解哪些&#xff1f; 本文内容节选自 《containerd 原理剖析与实战》&#xff0c;本书正参加限时优惠内购&#xff0c;限时 69.9 元购买。 进入正题之前先看一下 containerd 的整体架构 1. containerd 架构 图 contain…

List<int[]>[] g = new ArrayList[n];

在Java中&#xff0c;List<int[]>[] g new ArrayList[n]; 这行代码定义了一个数组 g&#xff0c;该数组的每个元素都是一个 ArrayList<int[]> 类型的对象。这里&#xff0c;n 是预期图中顶点的数量&#xff0c;因此 g 数组的长度是 n。 List<int[]>&#x…

UE4_常见动画节点学习_Two Bone IK双骨骼IK

学习资料&#xff0c;仅供参考&#xff01; Two Bone IK 控制器将逆运动&#xff08;IK&#xff09;解算器应用于到如角色四肢等3关节链。 变量&#xff08; HandIKWeight &#xff09;被用于在角色的 hand_l 和 hand_r 控制器上驱动 关节目标位置&#xff08;Joint Target Lo…

Elasticsearch克隆索引

我所使用的Elasticsearch的版本是基于7.17.7。 需求是将某个ES的索引进行克隆。例如我要将索引test_0419_1克隆一份新的索引test_0419_2。步骤如下&#xff1a; 首先将源索引进行修改PUT /test_0419_1/_block/write&#xff0c;即禁止对这个索引进行写数据操作。然后执行克隆…

Navicat连接postgresql时出现‘datlastsysoid does not exist‘报错的问题

连接报错 解决方案 解决方法1&#xff1a;升级navicat 解决方法2&#xff1a;降级pgsql 解决方法3&#xff1a;修改dll 使用3解决 实操演示 1、 打开 Navicat 安装目录&#xff0c;找到libcc.dll文件 2、备份libcc.dll文件&#xff0c;将其复制并粘贴或者修改副本为任何其他名…

java知识点 --- 类

在 Java 中&#xff0c;类&#xff08;Class&#xff09;是一个核心的面向对象编程概念&#xff0c;它扮演着构建软件组件的基本单元的角色。类是定义对象的蓝图或模板&#xff0c;它包含了创建特定类型对象所需的数据&#xff08;称为成员变量或属性&#xff09;和行为&#x…

前摇和后摇

在许多动作游戏和竞技游戏中&#xff0c;特别是那些包含战斗机制的游戏&#xff0c;“前摇”&#xff08;Pre-cast delay&#xff09;和"后摇"&#xff08;Post-cast delay&#xff09;是描述特定动作或技能释放过程中的两个关键阶段。这些概念主要用于增加游戏的战术…

【C++杂项】cin的详细用法

cin详细用法 1. cin简介2. cin的常用读取方法2.1 cin>>的用法2.2 cin.get的用法2.3 cin.getline的用法 3. cin清空输入缓冲区4. 其它方法4.1 getline()读取一行 1. cin简介 cin是C中的标准输入流对象&#xff0c;即istream类的对象。cin主要用于从标准输入读取数据&…

ORACLE中impdp导入多个文件的方法

1、多个文件导入dumpfileexpdp_full_qytcdb_20240412_%U.dmp&#xff0c;%U代表01、02、03这些 impdp sys/123456192.168.1.200:1521/Oracle directoryjy dumpfileexpdp_full_20250412_%U.dmp logfileimpdp_$(date %Y%m%d).log table_exists_actionskip fully parallel8; 2、…

DNS服务器的管理与配置

目录 一、相关知识 域名空间 DNS服务器分类 域名解析过程 资源记录 二、安装DNS服务 安装bind软件包 DNS服务的启动与停止 配置主要名称服务器 主配置文件 从例子学起&#xff1a; &#xff08;1&#xff09;建立主配置文件named.conf &#xff08;2&#xff09;…

Windows10安装Docker Desktop(大妈看了都会)

目录 Windows10安装Docker Desktop&#xff08;大妈看了都会&#xff09; 1.前言 1.1 为什么要在Windows10上安装Docker 1.2 Docker Desktop介绍 2.下载Docker Desktop 3.启用Hyper-V以在 Windows 10上创建虚拟机 4.安装Docker Desktop 5.运行Docker Desktop 6.Docker…

阿里云图片处理之 缩放

文档 : https://help.aliyun.com/zh/oss/user-guide/resize-images-4?spma2c4g.11186623.0.0.61cd2759v4jkhX 需求 : 图片过大, 导致加载过慢, 需对图片进行压缩 <image :src"imgUrl ?x-oss-processimage/resize,h_700,m_lfit"></image>Ps : 题外话…