RocketMQ与SpringBoot实际项目中使用

消息生产者

1)添加依赖
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.1.RELEASE</version>
</parent><properties><rocketmq-spring-boot-starter-version>2.0.3</rocketmq-spring-boot-starter-version>
</properties><dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>${rocketmq-spring-boot-starter-version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.6</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
2)配置文件
# application.properties
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
rocketmq.producer.group=my-group
3)测试类
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {MQSpringBootApplication.class})
public class ProducerTest {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void test1(){rocketMQTemplate.convertAndSend("springboot-mq","hello springboot rocketmq");}
}

消息消费者

1)添加依赖

同消息生产者

2)配置文件

同消息生产者

4)消息监听器
@Slf4j
@Component
@RocketMQMessageListener(topic = "springboot-mq",consumerGroup = "springboot-mq-consumer-1")
public class Consumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("Receive message:"+message);}
}

失败补偿机制

1 消息发送方

消息发送方

  • 配置RocketMQ属性值
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
rocketmq.producer.group=orderProducerGroupmq.order.consumer.group.name=order_orderTopic_cancel_group
mq.order.topic=orderTopic
mq.order.tag.confirm=order_confirm
mq.order.tag.cancel=order_cancel
  • 注入模板类和属性值信息
 @Autowiredprivate RocketMQTemplate rocketMQTemplate;@Value("${mq.order.topic}")private String topic;@Value("${mq.order.tag.cancel}")private String cancelTag;
  • 发送下单失败消息
@Override
public Result confirmOrder(TradeOrder order) {//1.校验订单//2.生成预订try {//3.扣减库存//4.扣减优惠券//5.使用余额//6.确认订单} catch (Exception e) {//确认订单失败,发送消息CancelOrderMQ cancelOrderMQ = new CancelOrderMQ();cancelOrderMQ.setOrderId(order.getOrderId());cancelOrderMQ.setCouponId(order.getCouponId());cancelOrderMQ.setGoodsId(order.getGoodsId());cancelOrderMQ.setGoodsNumber(order.getGoodsNumber());cancelOrderMQ.setUserId(order.getUserId());cancelOrderMQ.setUserMoney(order.getMoneyPaid());try {sendMessage(topic, cancelTag, cancelOrderMQ.getOrderId().toString(), JSON.toJSONString(cancelOrderMQ));} catch (Exception e1) {e1.printStackTrace();CastException.cast(ShopCode.SHOP_MQ_SEND_MESSAGE_FAIL);}return new Result(ShopCode.SHOP_FAIL.getSuccess(), ShopCode.SHOP_FAIL.getMessage());}
}
private void sendMessage(String topic, String tags, String keys, String body) throws Exception {//判断Topic是否为空if (StringUtils.isEmpty(topic)) {CastException.cast(ShopCode.SHOP_MQ_TOPIC_IS_EMPTY);}//判断消息内容是否为空if (StringUtils.isEmpty(body)) {CastException.cast(ShopCode.SHOP_MQ_MESSAGE_BODY_IS_EMPTY);}//消息体Message message = new Message(topic, tags, keys, body.getBytes());//发送消息rocketMQTemplate.getProducer().send(message);
}

2 消费接收方

  • 配置RocketMQ属性值
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
mq.order.consumer.group.name=order_orderTopic_cancel_group
mq.order.topic=orderTopic
  • 创建监听类,消费消息
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}", consumerGroup = "${mq.order.consumer.group.name}",messageModel = MessageModel.BROADCASTING)
public class CancelOrderConsumer implements RocketMQListener<MessageExt>{@Overridepublic void onMessage(MessageExt messageExt) {...}
}
2.1)回退库存
  • 消息消费者
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}",consumerGroup = "${mq.order.consumer.group.name}",messageModel = MessageModel.BROADCASTING )
public class CancelMQListener implements RocketMQListener<MessageExt>{@Value("${mq.order.consumer.group.name}")private String groupName;@Autowiredprivate TradeGoodsMapper goodsMapper;@Autowiredprivate TradeMqConsumerLogMapper mqConsumerLogMapper;@Autowiredprivate TradeGoodsNumberLogMapper goodsNumberLogMapper;@Overridepublic void onMessage(MessageExt messageExt) {String msgId=null;String tags=null;String keys=null;String body=null;try {//1. 解析消息内容msgId = messageExt.getMsgId();tags= messageExt.getTags();keys= messageExt.getKeys();body= new String(messageExt.getBody(),"UTF-8");log.info("接受消息成功");//2. 查询消息消费记录TradeMqConsumerLogKey primaryKey = new TradeMqConsumerLogKey();primaryKey.setMsgTag(tags);primaryKey.setMsgKey(keys);primaryKey.setGroupName(groupName);TradeMqConsumerLog mqConsumerLog = mqConsumerLogMapper.selectByPrimaryKey(primaryKey);if(mqConsumerLog!=null){//3. 判断如果消费过...//3.1 获得消息处理状态Integer status = mqConsumerLog.getConsumerStatus();//处理过...返回if(ShopCode.SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode().intValue()==status.intValue()){log.info("消息:"+msgId+",已经处理过");return;}//正在处理...返回if(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode().intValue()==status.intValue()){log.info("消息:"+msgId+",正在处理");return;}//处理失败if(ShopCode.SHOP_MQ_MESSAGE_STATUS_FAIL.getCode().intValue()==status.intValue()){//获得消息处理次数Integer times = mqConsumerLog.getConsumerTimes();if(times>3){log.info("消息:"+msgId+",消息处理超过3次,不能再进行处理了");return;}mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode());//使用数据库乐观锁更新TradeMqConsumerLogExample example = new TradeMqConsumerLogExample();TradeMqConsumerLogExample.Criteria criteria = example.createCriteria();criteria.andMsgTagEqualTo(mqConsumerLog.getMsgTag());criteria.andMsgKeyEqualTo(mqConsumerLog.getMsgKey());criteria.andGroupNameEqualTo(groupName);criteria.andConsumerTimesEqualTo(mqConsumerLog.getConsumerTimes());int r = mqConsumerLogMapper.updateByExampleSelective(mqConsumerLog, example);if(r<=0){//未修改成功,其他线程并发修改log.info("并发修改,稍后处理");}}}else{//4. 判断如果没有消费过...mqConsumerLog = new TradeMqConsumerLog();mqConsumerLog.setMsgTag(tags);mqConsumerLog.setMsgKey(keys);mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_PROCESSING.getCode());mqConsumerLog.setMsgBody(body);mqConsumerLog.setMsgId(msgId);mqConsumerLog.setConsumerTimes(0);//将消息处理信息添加到数据库mqConsumerLogMapper.insert(mqConsumerLog);}//5. 回退库存MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);Long goodsId = mqEntity.getGoodsId();TradeGoods goods = goodsMapper.selectByPrimaryKey(goodsId);goods.setGoodsNumber(goods.getGoodsNumber()+mqEntity.getGoodsNum());goodsMapper.updateByPrimaryKey(goods);//记录库存操作日志TradeGoodsNumberLog goodsNumberLog = new TradeGoodsNumberLog();goodsNumberLog.setOrderId(mqEntity.getOrderId());goodsNumberLog.setGoodsId(goodsId);goodsNumberLog.setGoodsNumber(mqEntity.getGoodsNum());goodsNumberLog.setLogTime(new Date());goodsNumberLogMapper.insert(goodsNumberLog);//6. 将消息的处理状态改为成功mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_SUCCESS.getCode());mqConsumerLog.setConsumerTimestamp(new Date());mqConsumerLogMapper.updateByPrimaryKey(mqConsumerLog);log.info("回退库存成功");} catch (Exception e) {e.printStackTrace();TradeMqConsumerLogKey primaryKey = new TradeMqConsumerLogKey();primaryKey.setMsgTag(tags);primaryKey.setMsgKey(keys);primaryKey.setGroupName(groupName);TradeMqConsumerLog mqConsumerLog = mqConsumerLogMapper.selectByPrimaryKey(primaryKey);if(mqConsumerLog==null){//数据库未有记录mqConsumerLog = new TradeMqConsumerLog();mqConsumerLog.setMsgTag(tags);mqConsumerLog.setMsgKey(keys);mqConsumerLog.setConsumerStatus(ShopCode.SHOP_MQ_MESSAGE_STATUS_FAIL.getCode());mqConsumerLog.setMsgBody(body);mqConsumerLog.setMsgId(msgId);mqConsumerLog.setConsumerTimes(1);mqConsumerLogMapper.insert(mqConsumerLog);}else{mqConsumerLog.setConsumerTimes(mqConsumerLog.getConsumerTimes()+1);mqConsumerLogMapper.updateByPrimaryKeySelective(mqConsumerLog);}}}
}
2.2)回退优惠券
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}",consumerGroup = "${mq.order.consumer.group.name}",messageModel = MessageModel.BROADCASTING )
public class CancelMQListener implements RocketMQListener<MessageExt>{@Autowiredprivate TradeCouponMapper couponMapper;@Overridepublic void onMessage(MessageExt message) {try {//1. 解析消息内容String body = new String(message.getBody(), "UTF-8");MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);log.info("接收到消息");//2. 查询优惠券信息TradeCoupon coupon = couponMapper.selectByPrimaryKey(mqEntity.getCouponId());//3.更改优惠券状态coupon.setUsedTime(null);coupon.setIsUsed(ShopCode.SHOP_COUPON_UNUSED.getCode());coupon.setOrderId(null);couponMapper.updateByPrimaryKey(coupon);log.info("回退优惠券成功");} catch (UnsupportedEncodingException e) {e.printStackTrace();log.error("回退优惠券失败");}}
}
2.3)回退余额
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.order.topic}",consumerGroup = "${mq.order.consumer.group.name}",messageModel = MessageModel.BROADCASTING )
public class CancelMQListener implements RocketMQListener<MessageExt>{@Autowiredprivate IUserService userService;@Overridepublic void onMessage(MessageExt messageExt) {try {//1.解析消息String body = new String(messageExt.getBody(), "UTF-8");MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);log.info("接收到消息");if(mqEntity.getUserMoney()!=null && mqEntity.getUserMoney().compareTo(BigDecimal.ZERO)>0){//2.调用业务层,进行余额修改TradeUserMoneyLog userMoneyLog = new TradeUserMoneyLog();userMoneyLog.setUseMoney(mqEntity.getUserMoney());userMoneyLog.setMoneyLogType(ShopCode.SHOP_USER_MONEY_REFUND.getCode());userMoneyLog.setUserId(mqEntity.getUserId());userMoneyLog.setOrderId(mqEntity.getOrderId());userService.updateMoneyPaid(userMoneyLog);log.info("余额回退成功");}} catch (UnsupportedEncodingException e) {e.printStackTrace();log.error("余额回退失败");}}
}
2.4)取消订单
@Overridepublic void onMessage(MessageExt messageExt) {String body = new String(messageExt.getBody(), "UTF-8");String msgId = messageExt.getMsgId();String tags = messageExt.getTags();String keys = messageExt.getKeys();log.info("CancelOrderProcessor receive message:"+messageExt);CancelOrderMQ cancelOrderMQ = JSON.parseObject(body, CancelOrderMQ.class);TradeOrder order = orderService.findOne(cancelOrderMQ.getOrderId());order.setOrderStatus(ShopCode.SHOP_ORDER_CANCEL.getCode());orderService.changeOrderStatus(order);log.info("订单:["+order.getOrderId()+"]状态设置为取消");return order;}
线程池优化消息发送逻辑
  • 创建线程池对象
@Bean
public ThreadPoolTaskExecutor getThreadPool() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(4);executor.setMaxPoolSize(8);executor.setQueueCapacity(100);executor.setKeepAliveSeconds(60);executor.setThreadNamePrefix("Pool-A");executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.initialize();return executor;}
  • 使用线程池
@Autowired
private ThreadPoolTaskExecutor executorService;public Result callbackPayment(TradePay tradePay) {if (tradePay.getIsPaid().equals(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode())) {tradePay = tradePayMapper.selectByPrimaryKey(tradePay.getPayId());if (tradePay == null) {CastException.cast(ShopCode.SHOP_PAYMENT_NOT_FOUND);}tradePay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());int i = tradePayMapper.updateByPrimaryKeySelective(tradePay);//更新成功代表支付成功if (i == 1) {TradeMqProducerTemp mqProducerTemp = new TradeMqProducerTemp();mqProducerTemp.setId(String.valueOf(idWorker.nextId()));mqProducerTemp.setGroupName("payProducerGroup");mqProducerTemp.setMsgKey(String.valueOf(tradePay.getPayId()));mqProducerTemp.setMsgTag(topic);mqProducerTemp.setMsgBody(JSON.toJSONString(tradePay));mqProducerTemp.setCreateTime(new Date());mqProducerTempMapper.insert(mqProducerTemp);TradePay finalTradePay = tradePay;executorService.submit(new Runnable() {@Overridepublic void run() {try {SendResult sendResult = sendMessage(topic, tag, finalTradePay.getPayId(), JSON.toJSONString(finalTradePay));log.info(JSON.toJSONString(sendResult));if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {mqProducerTempMapper.deleteByPrimaryKey(mqProducerTemp.getId());System.out.println("删除消息表成功");}} catch (Exception e) {e.printStackTrace();}}});} else {CastException.cast(ShopCode.SHOP_PAYMENT_IS_PAID);}}return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/577224.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

data 为什么是一个函数

在前端开发中&#xff0c;特别是在使用Vue.js框架时&#xff0c;"data" 通常是一个函数&#xff0c;而不是一个变量。这是因为在Vue组件中&#xff0c;"data" 是一个函数&#xff0c;用于返回包含组件状态的对象。这样做有以下几个原因&#xff1a; 避免数…

3.架构设计系列:高并发系统的设计目标

架构设计系列文章 架构设计系列&#xff1a;什么是架构设计架构设计系列&#xff1a;几个常用的架构设计原则 一、如何理解高并发&#xff1f; 高并发&#xff0c;往往意味着大的流量&#xff0c;而大流量必然会对系统带来冲击。面对这种大流量的冲击&#xff0c;有的系统抗住…

udp异步方式接收消息

C#实现 //定义结构体 public struct UdpState { public UdpClient u; public IPEndPoint e; } private UdpClient _client; //_client的初始化请参考其他资料 IPEndPoint remoteEP null; //TODO //public static bool mess…

电脑IP地址密码寻找指南:解密如何快速获取IP地址登录密码?

在使用电脑时&#xff0c;你可能会遇到需要查找IP地址登录密码的情况。而对于大部分普通用户来说&#xff0c;这项任务并不容易。那么&#xff0c;IP地址密码到底在哪里找呢&#xff1f;本文将为你详细解析&#xff0c;提供一些方法和技巧&#xff0c;帮助你快速找到电脑的IP地…

内存之-LeakCanary

关于作者&#xff1a;CSDN内容合伙人、技术专家&#xff0c; 从零开始做日活千万级APP。 专注于分享各领域原创系列文章 &#xff0c;擅长java后端、移动开发、人工智能等&#xff0c;希望大家多多支持。 目录 一、导读二、概览三、使用四、原理分析4.1 自动初始化4.1.1 初始化…

人工智能_机器学习074_SVM支持向量机_软间隔与优化目标函数构建_C参数由来_惩罚误差点的惩罚度---人工智能工作笔记0114

然后我们接着上一节再来看一下这里我们说有个 min_faces_per_person = 0 这个可以看到如果我们写上0,就意味着要加载所有的人脸图片,就会花费的时间久对吧 我们可以试试,这里我们 min_faces_per_person = 0 改成0然后 我们等一会加载完了以后,我们用 display(X.shape,faces.sh…

Jenkins安装与设置(插件安装失败,版本问题解决)

早期的使用docker安装jenkins的方法会出现插件无法安装的问题&#xff0c;是由于docker拉取的jenkins版本太低了 jdk安装 Linux系统安装JDK1.8 详细流程 maven安装&#xff1a; centos7下安装Maven 使用docker进行安装jenkins&#xff1a; 先把镜像和容器卸干净 docker ps -a…

LinuxPTP在汽车时钟同步网络中的实践

这里列举一些在汽车以太网中使用LinuxPTP的注意点。 请使用较新版本的LinuxPTP&#xff0c;可以自己编译部署&#xff01; 在使用中&#xff0c;我遇到了某些指令或者配置文件关键字已经废弃&#xff0c;官方网页中的文档和实际工具对不上的问题。 研究发现是目标板中部署的Li…

vue data变量不能以“_”开头,否则会产生很多怪异问题

1、 比如给子组件赋值&#xff0c;子组件无法得到这个值&#xff08;也不是一直无法得到&#xff0c;设置后this.$forceUpdate() 居然可以得到&#xff09;&#xff0c; 更无法watch到 <zizujian :config"_config1"> </zizujian>this._config1 { ...…

python使用easyocr识别文字,准确率超高!

文章目录 一、文档二、使用1、安装2、使用3、下载模型4、配合pyautogui识别屏幕的文字 参考资料 一、文档 https://www.jaided.ai/easyocr/documentation/ 二、使用 1、安装 pip install easyocr2、使用 import easyocr# 创建EasyOCR Reader reader easyocr.Reader([ch_s…

Go 错误处理

Go 错误处理 Go 语言通过内置的错误接口提供了非常简单的错误处理机制。 error类型是一个接口类型&#xff0c;这是它的定义&#xff1a; type error interface {Error() string }我们可以在编码中通过实现 error 接口类型来生成错误信息。 函数通常在最后的返回值中返回错误…

python中的 multiprocessing.Event是什么

multiprocessing.Event 是 Python 中 multiprocessing 模块提供的一种同步原语&#xff0c;用于在多个进程之间传递信号。Event 本质上是一个用于线程/进程通信的信号标志&#xff0c;可以用于在不同进程之间进行事件的同步。 文章目录 创建 Event 对象在一个进程中设置 Event在…

短视频矩阵系统的崛起和影响

近年来&#xff0c;短视频矩阵系统已经成为了社交媒体中的一股新势力。这个新兴的社交媒体形式以其独特的魅力和吸引力&#xff0c;迅速吸引了大量的用户。这个系统简单来说就是将海量短视频整合在一个平台上&#xff0c;使用户可以方便地观看和分享好玩有趣的短视频。 短视频…

观察者模式学习

观察者模式&#xff08;Observer Design Pattern&#xff09;也被称为发布订阅模式&#xff08;Publish-Subscribe Design Pattern&#xff09;。在 GoF 的《设计模式》一书中&#xff0c;它的定义是这样的&#xff1a; Define a one-to-many dependency between objects so th…

50个免费的 AI 工具,提升工作效率(附网址)

上次我们已经介绍了20个精选的提高工作效率的免费AI工具&#xff0c;但如果你觉得这些AI工具还不过瘾的话&#xff0c;想进一步成为职场中最了解AI的人&#xff0c;本文将汇总介绍免费最新的50个AI工具。 DeepSwap DeepSwap 是一个基于 AI 的工具&#xff0c;适用于想要制作令人…

【内存泄漏】内存泄漏及常见的内存泄漏检测工具介绍

内存泄漏介绍 什么是内存泄漏 内存泄漏是指程序分配了一块内存&#xff08;通常是动态分配的堆内存&#xff09;&#xff0c;但在不再需要这块内存的情况下未将其释放。内存泄漏会导致程序浪费系统内存资源&#xff0c;持续的内存泄漏还导致系统内存的逐渐耗尽&#xff0c;最…

android内存管理机制概览

关于作者&#xff1a;CSDN内容合伙人、技术专家&#xff0c; 从零开始做日活千万级APP。 专注于分享各领域原创系列文章 &#xff0c;擅长java后端、移动开发、人工智能等&#xff0c;希望大家多多支持。 目录 一、导读二、概览三、相关概念3.1 垃圾回收3.2 应用内存的分配与回…

插入排序详解(C语言)

前言 插入排序是一种简单直观的排序算法&#xff0c;在小规模数据排序或部分有序的情况下插入排序的表现十分良好&#xff0c;今天我将带大家学习插入排序的使用。let’s go ! ! ! 插入排序 插入排序的基本思想是将待排序的序列分为已排序和未排序两部分。初始时&#xff0c…

商务大厦安装电气火灾监控系统,从源头监控电气火灾

安科瑞电气股份有限公司 上海嘉定 201801 摘要&#xff1a;介绍分析剩余电流式电气火灾监控系统的特点、组成、设计依据、监控原理和实施方案&#xff0c;并结合上海市某大厦工程设计实例探讨该系统在高层建筑中的设置要求和应用方式&#xff0c;供设计人员参考。 关键词&…

蓝桥杯嵌入式LED

1.LED原理图 2.CubeMX的LED的GPIO 3.创建LED.c和.h文件添加到bsp文件 添加bsp文件路径在c/c中 4.LED相关代码