mall整合RabbitMQ实现延迟消息

摘要

本文主要讲解mall整合RabbitMQ实现延迟消息的过程,以发送延迟消息取消超时订单为例。RabbitMQ是一个被广泛使用的开源消息队列。它是轻量级且易于部署的,它能支持多种消息协议。RabbitMQ可以部署在分布式和联合配置中,以满足高规模、高可用性的需求。

项目使用框架介绍

RabbitMQ

RabbitMQ是一个被广泛使用的开源消息队列。它是轻量级且易于部署的,它能支持多种消息协议。RabbitMQ可以部署在分布式和联合配置中,以满足高规模、高可用性的需求。

RabbitMQ的安装和使用

  1. 安装Erlang,下载地址:erlang.org/download/ot…

  1. 安装RabbitMQ,下载地址:dl.bintray.com/rabbitmq/al…

  1. 安装完成后,进入RabbitMQ安装目录下的sbin目录

  1. 在地址栏输入cmd并回车启动命令行,然后输入以下命令启动管理功能:
rabbitmq-plugins enable rabbitmq_management
复制代码

  1. 访问地址查看是否安装成功:http://localhost:15672/

  1. 输入账号密码并登录:guest guest

  2. 创建帐号并设置其角色为管理员:mall mall

  1. 创建一个新的虚拟host为:/mall

  1. 点击mall用户进入用户配置页面

  1. 给mall用户配置该虚拟host的权限

  1. 至此,RabbitMQ的安装和配置完成。

RabbitMQ的消息模型

标志中文名英文名描述
P生产者Producer消息的发送者,可以将消息发送到交换机
C消费者Consumer消息的接收者,从队列中获取消息进行消费
X交换机Exchange接收生产者发送的消息,并根据路由键发送给指定队列
Q队列Queue存储从交换机发来的消息
type交换机类型typedirect表示直接根据路由键(orange/black)发送消息

Lombok

Lombok为Java语言添加了非常有趣的附加功能,你可以不用再为实体类手写getter,setter等方法,通过一个注解即可拥有。

注意:需要安装idea的Lombok插件,并在项目中的pom文件中添加依赖。

业务场景说明

用于解决用户下单以后,订单超时如何取消订单的问题。

  • 用户进行下单操作(会有锁定商品库存、使用优惠券、积分一系列的操作);
  • 生成订单,获取订单的id;
  • 获取到设置的订单超时时间(假设设置的为60分钟不支付取消订单);
  • 按订单超时时间发送一个延迟消息给RabbitMQ,让它在订单超时后触发取消订单的操作;
  • 如果用户没有支付,进行取消订单操作(释放锁定商品库存、返还优惠券、返回积分一系列操作)。

整合RabbitMQ实现延迟消息

在pom.xml中添加相关依赖

<!--消息队列相关依赖-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--lombok依赖-->
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional>
</dependency>
复制代码

修改SpringBoot配置文件

修改application.yml文件,在spring节点下添加Mongodb相关配置。

  rabbitmq:
    host: localhost # rabbitmq的连接地址
    port: 5672 # rabbitmq的连接端口号
    virtual-host: /mall # rabbitmq的虚拟host
    username: mall # rabbitmq的用户名
    password: mall # rabbitmq的密码
    publisher-confirms: true #如果对异步消息需要回调必须设置为true
复制代码

添加消息队列的枚举配置类QueueEnum

用于延迟消息队列及处理取消订单消息队列的常量定义,包括交换机名称、队列名称、路由键名称。

package com.macro.mall.tiny.dto;import lombok.Getter;/*** 消息队列枚举配置* Created by macro on 2018/9/14.*/
@Getter
public enum QueueEnum {/*** 消息通知队列*/QUEUE_ORDER_CANCEL("mall.order.direct", "mall.order.cancel", "mall.order.cancel"),/*** 消息通知ttl队列*/QUEUE_TTL_ORDER_CANCEL("mall.order.direct.ttl", "mall.order.cancel.ttl", "mall.order.cancel.ttl");/*** 交换名称*/private String exchange;/*** 队列名称*/private String name;/*** 路由键*/private String routeKey;QueueEnum(String exchange, String name, String routeKey) {this.exchange = exchange;this.name = name;this.routeKey = routeKey;}
}复制代码

添加RabbitMQ的配置

用于配置交换机、队列及队列与交换机的绑定关系。

package com.macro.mall.tiny.config;import com.macro.mall.tiny.dto.QueueEnum;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 消息队列配置* Created by macro on 2018/9/14.*/
@Configuration
public class RabbitMqConfig {/*** 订单消息实际消费队列所绑定的交换机*/@BeanDirectExchange orderDirect() {return (DirectExchange) ExchangeBuilder.directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange()).durable(true).build();}/*** 订单延迟队列队列所绑定的交换机*/@BeanDirectExchange orderTtlDirect() {return (DirectExchange) ExchangeBuilder.directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange()).durable(true).build();}/*** 订单实际消费队列*/@Beanpublic Queue orderQueue() {return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName());}/*** 订单延迟队列(死信队列)*/@Beanpublic Queue orderTtlQueue() {return QueueBuilder.durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName()).withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange())//到期后转发的交换机.withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey())//到期后转发的路由键.build();}/*** 将订单队列绑定到交换机*/@BeanBinding orderBinding(DirectExchange orderDirect,Queue orderQueue){return BindingBuilder.bind(orderQueue).to(orderDirect).with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey());}/*** 将订单延迟队列绑定到交换机*/@BeanBinding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){return BindingBuilder.bind(orderTtlQueue).to(orderTtlDirect).with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey());}}
复制代码

在RabbitMQ管理页面可以看到以下交换机和队列

交换机及队列说明

  • mall.order.direct(取消订单消息队列所绑定的交换机):绑定的队列为mall.order.cancel,一旦有消息以mall.order.cancel为路由键发过来,会发送到此队列。
  • mall.order.direct.ttl(订单延迟消息队列所绑定的交换机):绑定的队列为mall.order.cancel.ttl,一旦有消息以mall.order.cancel.ttl为路由键发送过来,会转发到此队列,并在此队列保存一定时间,等到超时后会自动将消息发送到mall.order.cancel(取消订单消息消费队列)。

添加延迟消息的发送者CancelOrderSender

用于向订单延迟消息队列(mall.order.cancel.ttl)里发送消息。

package com.macro.mall.tiny.component;import com.macro.mall.tiny.dto.QueueEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** 取消订单消息的发出者* Created by macro on 2018/9/14.*/
@Component
public class CancelOrderSender {private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderSender.class);@Autowiredprivate AmqpTemplate amqpTemplate;public void sendMessage(Long orderId,final long delayTimes){//给延迟队列发送消息amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//给消息设置延迟毫秒值message.getMessageProperties().setExpiration(String.valueOf(delayTimes));return message;}});LOGGER.info("send delay message orderId:{}",orderId);}
}复制代码

添加取消订单消息的接收者CancelOrderReceiver

用于从取消订单的消息队列(mall.order.cancel)里接收消息。

package com.macro.mall.tiny.component;import com.macro.mall.tiny.service.OmsPortalOrderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** 取消订单消息的处理者* Created by macro on 2018/9/14.*/
@Component
@RabbitListener(queues = "mall.order.cancel")
public class CancelOrderReceiver {private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class);@Autowiredprivate OmsPortalOrderService portalOrderService;@RabbitHandlerpublic void handle(Long orderId){LOGGER.info("receive delay message orderId:{}",orderId);portalOrderService.cancelOrder(orderId);}
}复制代码

添加OmsPortalOrderService接口

package com.macro.mall.tiny.service;import com.macro.mall.tiny.common.api.CommonResult;
import com.macro.mall.tiny.dto.OrderParam;
import org.springframework.transaction.annotation.Transactional;/*** 前台订单管理Service* Created by macro on 2018/8/30.*/
public interface OmsPortalOrderService {/*** 根据提交信息生成订单*/@TransactionalCommonResult generateOrder(OrderParam orderParam);/*** 取消单个超时订单*/@Transactionalvoid cancelOrder(Long orderId);
}复制代码

添加OmsPortalOrderService的实现类OmsPortalOrderServiceImpl

package com.macro.mall.tiny.service.impl;import com.macro.mall.tiny.common.api.CommonResult;
import com.macro.mall.tiny.component.CancelOrderSender;
import com.macro.mall.tiny.dto.OrderParam;
import com.macro.mall.tiny.service.OmsPortalOrderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;/*** 前台订单管理Service* Created by macro on 2018/8/30.*/
@Service
public class OmsPortalOrderServiceImpl implements OmsPortalOrderService {private static Logger LOGGER = LoggerFactory.getLogger(OmsPortalOrderServiceImpl.class);@Autowiredprivate CancelOrderSender cancelOrderSender;@Overridepublic CommonResult generateOrder(OrderParam orderParam) {//todo 执行一系类下单操作,具体参考mall项目LOGGER.info("process generateOrder");//下单完成后开启一个延迟消息,用于当用户没有付款时取消订单(orderId应该在下单后生成)sendDelayMessageCancelOrder(11L);return CommonResult.success(null, "下单成功");}@Overridepublic void cancelOrder(Long orderId) {//todo 执行一系类取消订单操作,具体参考mall项目LOGGER.info("process cancelOrder orderId:{}",orderId);}private void sendDelayMessageCancelOrder(Long orderId) {//获取订单超时时间,假设为60分钟long delayTimes = 30 * 1000;//发送延迟消息cancelOrderSender.sendMessage(orderId, delayTimes);}}复制代码

添加OmsPortalOrderController定义接口

package com.macro.mall.tiny.controller;import com.macro.mall.tiny.dto.OrderParam;
import com.macro.mall.tiny.service.OmsPortalOrderService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;/*** 订单管理Controller* Created by macro on 2018/8/30.*/
@Controller
@Api(tags = "OmsPortalOrderController", description = "订单管理")
@RequestMapping("/order")
public class OmsPortalOrderController {@Autowiredprivate OmsPortalOrderService portalOrderService;@ApiOperation("根据购物车信息生成订单")@RequestMapping(value = "/generateOrder", method = RequestMethod.POST)@ResponseBodypublic Object generateOrder(@RequestBody OrderParam orderParam) {return portalOrderService.generateOrder(orderParam);}
}复制代码

进行接口测试

调用下单接口

注意:已经将延迟消息时间设置为30秒

项目源码地址

github.com/macrozheng/…

公众号

mall项目全套学习教程连载中,关注公众号第一时间获取。

转载于:https://juejin.im/post/5cff98986fb9a07ed36ea139

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/387614.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

竞价打板的关键点

竞价打板&#xff0c;主要是速度&#xff0c;其他不重要的&#xff0c;如果为了当天盈利大&#xff0c;失去竞价打板的本质含义&#xff0c;因为竞价可以买到&#xff0c;盘中买不到&#xff0c;才是竞价打板的目的&#xff0c;也就是从竞价打板的角度看&#xff0c;主要是看习…

Java常见的几种内存溢出及解决方法

Java常见的几种内存溢出及解决方法【情况一】&#xff1a;java.lang.OutOfMemoryError:Javaheapspace&#xff1a;这种是java堆内存不够&#xff0c;一个原因是真不够&#xff08;如递归的层数太多等&#xff09;&#xff0c;另一个原因是程序中有死循环&#xff1b;如果是java…

docker操作之mysql容器

1、创建宿主机器的挂载目录 /opt/docker/mysql/conf /opt/docker/mysql/data /opt/docker/mysql/logs 2、创建【xxx.cnf】配置文件&#xff0c;内容如下所示&#xff1a; [mysqld]#服务唯一Idserver-id 1port 3306log-error /var/log/mysql/error.log #只能用IP地址skip_nam…

Windows10系统下wsappx占用CPU资源过高?wsappx是什么?如何关闭wsappx进程?

在Windows10系统开机的时候&#xff0c;wsappx进程占用的CPU资源非常高&#xff0c;导致电脑运行速度缓慢&#xff0c;那么我们如何关闭wsappx进程&#xff0c;让电脑加快运行速度呢&#xff1f;下面就一起来看一下操作的方法吧。 【现象】 1、先来看一下电脑刚开机的时候&…

如何通过Windows Server 2008 R2建立NFS存储

如何通过Windows Server 2008 R2建立NFS存储在我们日常工作的某些实验中&#xff0c;会需要使用存储服务器。而硬件存储成本高&#xff0c;如StarWind之类的iSCSI软存储解决方案需要单独下载服务器端程序&#xff0c;且配置比较繁琐&#xff0c;令很多新手们很是头疼。事实上&a…

python-windows安装相关问题

1.python的环境配置&#xff0c;有些时候是没有配置的&#xff0c;需要在【系统环境】-【path】里添加。 2.安装pip&#xff1a;从官网下载pip包&#xff0c;然后到包目录》python setup.py install 安装 3.安装scrapyd&#xff1a;正常使用pip3 install scrapyd安装不起&…

hdu 1542/1255 Atlantis/覆盖的面积

1542 1255 两道扫描线线段树的入门题。 基本没有什么区别&#xff0c;前者是模板&#xff0c;后者因为是求覆盖次数至少在两次以上的&#xff0c;这个同样是具有并集性质的&#xff0c;所以把cover的判断条件更改一下就可以了qwq。 hdu1542 代码如下&#xff1a; #include<i…

使用了JDK自带的jconsole查看Tomcat运行情况

最近对公司的项目进行JVM调优&#xff0c;使用了JDK自带的jconsole查看Tomcat运行情况&#xff0c;记录下配置以便以后参考&#xff1a;首先&#xff0c;修改Tomcat的bin目录下的catalina.bat文件&#xff0c;在JAVA_OPTS变量中添加下面四行&#xff0c;即可set JAVA_OPTS %JAV…

jvm02

java虚拟机内存管理 每个线程就是一个顺序的执行单元&#xff0c;线程共享区即多个线程共享同一块区域&#xff0c;线程独占区即每个线程都有自己的虚拟机栈&#xff0c;本地方法栈&#xff0c;程序计数器。 程序计数器是一个比较小的内存空间&#xff0c;可以看作是当前线程所…

搭建svn管理平台

安装svn服务器&#xff1a;yum -y install subversion创建svn的目录&#xff1a;mkdir -p /data/svn初始化svn目录&#xff1a;svnadmin create /data/svnconf下的三个目录介绍&#xff1a;authz&#xff1a;控制权限,创建用户。密码在passwd创建 passwd&#xff1a;密码文件&…

Oracle dataguard 正常切换和应急切换

Oracle dataguard 正常切换和应急切换oracle dataguard提供异地容灾方案,能有效的防止单点故障和提供高可用技术,这里介绍dataguard正常主备切换和应急切换&#xff08;应急切换模拟主库出现问题无法还原,备库脱离dataguard接管主库对外提供服务&#xff09;1&#xff09;Oracl…

好程序员web前端分享JS引擎的执行机制

好程序员web前端分享JS引擎的执行机制&#xff0c;请先着重牢记两点&#xff01;JS是单线程语言。JS的EventLoop是JS的执行机制。深入了解JS的执行&#xff0c;就等于深入了解JS里的eventloop。1、灵魂三问&#xff1a;JS为什么是单线程的?为什么需要异步?单线程又是如何实现…

shutil模块、json和pickle模块

shutil模块&#xff1a; 高级的文件、文件夹、压缩包处理模块 json和pickle模块 之前学过eval内置方法可以将一个字符串转化成Python对象&#xff0c;但eval方法是有局限性的&#xff0c;对于普通的数据类型&#xff0c;json.loads、eval都可以使用&#xff0c;但遇到特殊类型的…

每日一问:LayoutParams 你知道多少?

前面的文章中着重讲解了 View 的测量流程。其中我提到了一句非常重要的话&#xff1a;**View 的测量匡高是由父控件的 MeasureSpec 和 View 自身的 LayoutParams 共同决定的。**我们在前面的 每日一问&#xff1a;谈谈对 MeasureSpec 的理解 把 MeasureSpec 的重点进行了讲解&a…

kuangbin专题十六 KMP扩展KMP HDU2594 Simpsons’ Hidden Talents

Homer: Marge, I just figured out a way to discover some of the talents we weren’t aware we had. Marge: Yeah, what is it? Homer: Take me for example. I want to find out if I have a talent in politics, OK? Marge: OK. Homer: So I take some politician’s na…

SNI: 实现多域名虚拟主机的SSL/TLS认证

为什么80%的码农都做不了架构师&#xff1f;>>> 一. 介绍 早期的SSLv2根据经典的公钥基础设施PKI(Public Key Infrastructure)设计&#xff0c;它默认认为&#xff1a;一台服务器&#xff08;或者说一个IP&#xff09;只会提供一个服务&#xff0c;所以在SSL握手时…

echo(),print(),print_r(),var_dump()的区别

echo可以一次输出多个值&#xff0c;多个值之间用逗号分隔。echo是语言结构(language construct)&#xff0c;而并不是真正的函数&#xff0c;因此不能作为表达式的一部分使用。echo是php的内部指令&#xff0c;不是函数&#xff0c;无返回值。 print()&#xff1a;函数print()…

我心目中的牛程序员、我们可以对比看看(人家还是看多年朋友面子上才肯帮忙1周,至少需支付1万元辛苦费)...

为什么80%的码农都做不了架构师&#xff1f;>>> 最近碰到客户整个网站改版的需要&#xff0c;非常短的时间里只有1周时间里&#xff0c;需要把整个B2C网站彻底的进行版面&#xff0c;我自己估算了一下&#xff0c;就是往死里干一天工作48个小时&#xff0c;1周也干…

c#做端口转发程序支持正向连接和反向链接

3389的时候 例子1&#xff1a;连接a机器的3389端口连不上&#xff0c;因为对方防火墙或者网关做了限制&#xff0c;只能访问a机器的个别端口比如80。 例子2&#xff1a;连接a机器的几乎所有端口都连不上&#xff08;对方乃内网或者防火墙网关做了限制&#xff09;&#xff0c…

Spring Boot(十四):spring boot整合shiro-登录认证和权限管理

Spring Boot(十四)&#xff1a;spring boot整合shiro-登录认证和权限管理 使用Spring Boot集成Apache Shiro。安全应该是互联网公司的一道生命线&#xff0c;几乎任何的公司都会涉及到这方面的需求。在Java领域一般有Spring Security、Apache Shiro等安全框架&#xff0c;但是由…