微服务day06

MQ入门

同步处理业务:

异步处理:

将任务处理后交给MQ来进行分发处理。



MQ的相关知识

同步调用

同步调用的小结

异步调用

MQ技术选型

RabbitMQ

安装部署

其中包含几个概念:

  • publisher:生产者,也就是发送消息的一方

  • consumer:消费者,也就是消费消息的一方

  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理

  • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。

  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue

上述这些东西都可以在RabbitMQ的管理控制台来管理,下一节我们就一起来学习控制台的使用。

查看详情:点这里icon-default.png?t=O83Ahttps://b11et3un53m.feishu.cn/wiki/OQH4weMbcimUSLkIzD6cCpN0nvc

数据隔离

创建新用户:

创建新的host:

Java客户端

快速入门

在控制台创建消息队列:

导入实例项目后,在发送者和接受者的pom文件中引入依赖:

 <!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

修改两个模块的配置文件:

spring:rabbitmq:host: 192.168.21.101 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码

在发送者的启动类创建一个测试类:

package com.itheima.publisher;import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import static org.junit.jupiter.api.Assertions.*;@SpringBootTest
class mqTest {//引入Rabbit提供的操作类@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test(){//设置要发送的字符串String massage = "hello rabbitmq";//设置要发个那个消息队列String name = "simple.queue";//调用工具类进行发送rabbitTemplate.convertAndSend(name,massage);}}

在接收者创建一个接受的类:

package com.itheima.consumer.mq;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;//将类交给bean容器来进行管理,进行监听
@Component
@Slf4j
public class leatinMq {// 利用RabbitListener来声明要监听的队列信息// 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。// 可以看到方法体中接收的就是消息体的内容@RabbitListener(queues = "simple.queue")public void Leasion(String msg){log.info("接收到消息:{}",msg);}
}

输出结果:

11-10 20:42:27:552  INFO 22620 --- [ntContainer#0-1] com.itheima.consumer.mq.leatinMq         : 接收到消息:hello rabbitmq
11-10 20:42:42:000  INFO 22620 --- [ntContainer#0-1] com.itheima.consumer.mq.leatinMq         : 接收到消息:hello rabbitmq
Work Queues

创建队列:

修改发送方的测试函数,发送50条数据:

    //修改为连续发送50条数据到队列中@Testpublic void test2(){for (int i = 1; i <= 50; i++) {String massage = "hello rabbitmq_"+i;String name = "work.queue";rabbitTemplate.convertAndSend(name,massage);}}

建立两个监听来进行读取:

    @RabbitListener(queues = "work.queue")public void Leasion1(String msg){System.out.println("队列1接收到消息:"+msg+"_"+ LocalTime.now());}@RabbitListener(queues = "work.queue")public void Leasion2(String msg){System.err.println("队列2接收到消息:"+msg+"_"+ LocalTime.now());}

代码运行结果:

队列1接收到消息:hello rabbitmq_1_21:25:12.780
队列1接收到消息:hello rabbitmq_3_21:25:12.780
队列1接收到消息:hello rabbitmq_5_21:25:12.781
队列1接收到消息:hello rabbitmq_7_21:25:12.781
队列1接收到消息:hello rabbitmq_9_21:25:12.781
队列1接收到消息:hello rabbitmq_11_21:25:12.781
队列1接收到消息:hello rabbitmq_13_21:25:12.782
队列1接收到消息:hello rabbitmq_15_21:25:12.782
队列1接收到消息:hello rabbitmq_17_21:25:12.783
队列1接收到消息:hello rabbitmq_19_21:25:12.783
队列1接收到消息:hello rabbitmq_21_21:25:12.783
队列1接收到消息:hello rabbitmq_23_21:25:12.783
队列1接收到消息:hello rabbitmq_25_21:25:12.783
队列1接收到消息:hello rabbitmq_27_21:25:12.783
队列1接收到消息:hello rabbitmq_29_21:25:12.784
队列1接收到消息:hello rabbitmq_31_21:25:12.784
队列1接收到消息:hello rabbitmq_33_21:25:12.785
队列1接收到消息:hello rabbitmq_35_21:25:12.785
队列1接收到消息:hello rabbitmq_37_21:25:12.787
队列1接收到消息:hello rabbitmq_39_21:25:12.787
队列1接收到消息:hello rabbitmq_41_21:25:12.788
队列1接收到消息:hello rabbitmq_43_21:25:12.788
队列1接收到消息:hello rabbitmq_45_21:25:12.789
队列1接收到消息:hello rabbitmq_47_21:25:12.789
队列1接收到消息:hello rabbitmq_49_21:25:12.789
队列2接收到消息:hello rabbitmq_2_21:25:12.780
队列2接收到消息:hello rabbitmq_4_21:25:12.780
队列2接收到消息:hello rabbitmq_6_21:25:12.780
队列2接收到消息:hello rabbitmq_8_21:25:12.781
队列2接收到消息:hello rabbitmq_10_21:25:12.781
队列2接收到消息:hello rabbitmq_12_21:25:12.781
队列2接收到消息:hello rabbitmq_14_21:25:12.781
队列2接收到消息:hello rabbitmq_16_21:25:12.781
队列2接收到消息:hello rabbitmq_18_21:25:12.782
队列2接收到消息:hello rabbitmq_20_21:25:12.783
队列2接收到消息:hello rabbitmq_22_21:25:12.783
队列2接收到消息:hello rabbitmq_24_21:25:12.783
队列2接收到消息:hello rabbitmq_26_21:25:12.783
队列2接收到消息:hello rabbitmq_28_21:25:12.783
队列2接收到消息:hello rabbitmq_30_21:25:12.784
队列2接收到消息:hello rabbitmq_32_21:25:12.784
队列2接收到消息:hello rabbitmq_34_21:25:12.785
队列2接收到消息:hello rabbitmq_36_21:25:12.785
队列2接收到消息:hello rabbitmq_38_21:25:12.785
队列2接收到消息:hello rabbitmq_40_21:25:12.785
队列2接收到消息:hello rabbitmq_42_21:25:12.785
队列2接收到消息:hello rabbitmq_44_21:25:12.788
队列2接收到消息:hello rabbitmq_46_21:25:12.790
队列2接收到消息:hello rabbitmq_48_21:25:12.790
队列2接收到消息:hello rabbitmq_50_21:25:12.790

可以看出这两个监听者是轮流进行监听的。并且不考虑是否有运行速度的区别。

这个是将1监听设置线程休眠25毫秒即每秒中可处理40个,

这个是将1监听设置线程休眠200毫秒即每秒中可处理5个,的运行情况

队列1接收到消息:hello rabbitmq_1_21:31:42.712
队列1接收到消息:hello rabbitmq_3_21:31:42.737
队列1接收到消息:hello rabbitmq_5_21:31:42.762
队列1接收到消息:hello rabbitmq_7_21:31:42.787
队列1接收到消息:hello rabbitmq_9_21:31:42.813
队列1接收到消息:hello rabbitmq_11_21:31:42.838
队列1接收到消息:hello rabbitmq_13_21:31:42.864
队列2接收到消息:hello rabbitmq_2_21:31:42.885
队列1接收到消息:hello rabbitmq_15_21:31:42.890
队列1接收到消息:hello rabbitmq_17_21:31:42.915
队列1接收到消息:hello rabbitmq_19_21:31:42.941
队列1接收到消息:hello rabbitmq_21_21:31:42.967
队列1接收到消息:hello rabbitmq_23_21:31:42.993
队列1接收到消息:hello rabbitmq_25_21:31:43.019
队列1接收到消息:hello rabbitmq_27_21:31:43.045
队列1接收到消息:hello rabbitmq_29_21:31:43.070
队列2接收到消息:hello rabbitmq_4_21:31:43.086
队列1接收到消息:hello rabbitmq_31_21:31:43.097
队列1接收到消息:hello rabbitmq_33_21:31:43.122
队列1接收到消息:hello rabbitmq_35_21:31:43.148
队列1接收到消息:hello rabbitmq_37_21:31:43.173
队列1接收到消息:hello rabbitmq_39_21:31:43.198
队列1接收到消息:hello rabbitmq_41_21:31:43.223
队列1接收到消息:hello rabbitmq_43_21:31:43.249
队列1接收到消息:hello rabbitmq_45_21:31:43.274
队列2接收到消息:hello rabbitmq_6_21:31:43.286
队列1接收到消息:hello rabbitmq_47_21:31:43.300
队列1接收到消息:hello rabbitmq_49_21:31:43.326
队列2接收到消息:hello rabbitmq_8_21:31:43.487
队列2接收到消息:hello rabbitmq_10_21:31:43.687
队列2接收到消息:hello rabbitmq_12_21:31:43.887
队列2接收到消息:hello rabbitmq_14_21:31:44.089
队列2接收到消息:hello rabbitmq_16_21:31:44.289
队列2接收到消息:hello rabbitmq_18_21:31:44.490
队列2接收到消息:hello rabbitmq_20_21:31:44.691
队列2接收到消息:hello rabbitmq_22_21:31:44.891
队列2接收到消息:hello rabbitmq_24_21:31:45.092
队列2接收到消息:hello rabbitmq_26_21:31:45.293
队列2接收到消息:hello rabbitmq_28_21:31:45.495
队列2接收到消息:hello rabbitmq_30_21:31:45.695
队列2接收到消息:hello rabbitmq_32_21:31:45.896
队列2接收到消息:hello rabbitmq_34_21:31:46.098
队列2接收到消息:hello rabbitmq_36_21:31:46.299
队列2接收到消息:hello rabbitmq_38_21:31:46.499
队列2接收到消息:hello rabbitmq_40_21:31:46.699
队列2接收到消息:hello rabbitmq_42_21:31:46.900
队列2接收到消息:hello rabbitmq_44_21:31:47.101
队列2接收到消息:hello rabbitmq_46_21:31:47.303
队列2接收到消息:hello rabbitmq_48_21:31:47.504
队列2接收到消息:hello rabbitmq_50_21:31:47.704

下面将设置条件,能者多劳。

修改后的情况:

队列1接收到消息:hello rabbitmq_1_21:34:50.426
队列1接收到消息:hello rabbitmq_3_21:34:50.454
队列1接收到消息:hello rabbitmq_4_21:34:50.482
队列1接收到消息:hello rabbitmq_5_21:34:50.508
队列1接收到消息:hello rabbitmq_6_21:34:50.534
队列1接收到消息:hello rabbitmq_7_21:34:50.565
队列1接收到消息:hello rabbitmq_8_21:34:50.592
队列2接收到消息:hello rabbitmq_2_21:34:50.599
队列1接收到消息:hello rabbitmq_9_21:34:50.618
队列1接收到消息:hello rabbitmq_11_21:34:50.645
队列1接收到消息:hello rabbitmq_12_21:34:50.672
队列1接收到消息:hello rabbitmq_13_21:34:50.698
队列1接收到消息:hello rabbitmq_14_21:34:50.726
队列1接收到消息:hello rabbitmq_15_21:34:50.752
队列1接收到消息:hello rabbitmq_16_21:34:50.780
队列2接收到消息:hello rabbitmq_10_21:34:50.800
队列1接收到消息:hello rabbitmq_17_21:34:50.807
队列1接收到消息:hello rabbitmq_19_21:34:50.835
队列1接收到消息:hello rabbitmq_20_21:34:50.863
队列1接收到消息:hello rabbitmq_21_21:34:50.890
队列1接收到消息:hello rabbitmq_22_21:34:50.918
队列1接收到消息:hello rabbitmq_23_21:34:50.944
队列1接收到消息:hello rabbitmq_24_21:34:50.972
队列1接收到消息:hello rabbitmq_25_21:34:50.999
队列2接收到消息:hello rabbitmq_18_21:34:51.003
队列1接收到消息:hello rabbitmq_26_21:34:51.028
队列1接收到消息:hello rabbitmq_28_21:34:51.055
队列1接收到消息:hello rabbitmq_29_21:34:51.081
队列1接收到消息:hello rabbitmq_30_21:34:51.108
队列1接收到消息:hello rabbitmq_31_21:34:51.135
队列1接收到消息:hello rabbitmq_32_21:34:51.162
队列1接收到消息:hello rabbitmq_33_21:34:51.188
队列2接收到消息:hello rabbitmq_27_21:34:51.205
队列1接收到消息:hello rabbitmq_34_21:34:51.215
队列1接收到消息:hello rabbitmq_36_21:34:51.242
队列1接收到消息:hello rabbitmq_37_21:34:51.269
队列1接收到消息:hello rabbitmq_38_21:34:51.295
队列1接收到消息:hello rabbitmq_39_21:34:51.322
队列1接收到消息:hello rabbitmq_40_21:34:51.349
队列1接收到消息:hello rabbitmq_41_21:34:51.376
队列1接收到消息:hello rabbitmq_42_21:34:51.403
队列2接收到消息:hello rabbitmq_35_21:34:51.406
队列1接收到消息:hello rabbitmq_43_21:34:51.430
队列1接收到消息:hello rabbitmq_45_21:34:51.456
队列1接收到消息:hello rabbitmq_46_21:34:51.483
队列1接收到消息:hello rabbitmq_47_21:34:51.509
队列1接收到消息:hello rabbitmq_48_21:34:51.536
队列1接收到消息:hello rabbitmq_49_21:34:51.562
队列1接收到消息:hello rabbitmq_50_21:34:51.589
队列2接收到消息:hello rabbitmq_44_21:34:51.608

Fanout交换机

案例:

声明两个消息队列:

创建一个fanout模式的交换机:

将交换机和消息队列关联:

修改消费者的方法:

    @RabbitListener(queues = "fanout.queue1")public void Fanoutlisten1(String msg) throws InterruptedException {System.err.println("消费者1接收到队列fanout.queue1的消息:"+msg+"_"+ LocalTime.now());}@RabbitListener(queues = "fanout.queue2")public void Fanoutlisten2(String msg) throws InterruptedException {System.err.println("消费者2接收到队列fanout.queue2的消息:"+msg+"_"+ LocalTime.now());}

修改发送者的代码,使其发送到 hm.fanout 交换机:

    @Testpublic void testFanout(){String massage = "hello rabbitmq";//修改交换机的名字为hm.fanoutString name = "hm.fanout";//由于是广播,所以发送到交换机,不需要指定路由键,将消息队列名称设置为nullrabbitTemplate.convertAndSend(name,null,massage);}

结果:

消费者2接收到队列fanout.queue2的消息:hello rabbitmq_22:15:58.655
消费者1接收到队列fanout.queue1的消息:hello rabbitmq_22:15:58.655
交换机小结:

Direct队列
案例

创建队列:

创建交换机:

将交换机和队列联系起来:

修改接收者(消费者):

    @RabbitListener(queues = "direct.queue1")public void Directlisten1(String msg) throws InterruptedException {System.err.println("消费者1接收到队列direct.queue1的消息:"+msg+"_"+ LocalTime.now());}@RabbitListener(queues = "direct.queue2")public void Directlisten2(String msg) throws InterruptedException {System.err.println("消费者2接收到队列direct.queue2的消息:"+msg+"_"+ LocalTime.now());}

发送者:

    @Testpublic void testDirect1(){String massage = "红色:震惊男生宿舍后面发现女尸";//修改交换机的名字为hm.fanoutString name = "hm.direct";//修改路由键为redrabbitTemplate.convertAndSend(name,"red",massage);}@Testpublic void testDirect2(){String massage = "蓝色:该女尸竟是硅胶制品";//修改交换机的名字为hm.fanoutString name = "hm.direct";//修改路由键为bluerabbitTemplate.convertAndSend(name,"blue",massage);}

结果:

消费者1接收到队列direct.queue1的消息:红色:震惊男生宿舍后面发现女尸_22:40:27.927
消费者2接收到队列direct.queue2的消息:红色:震惊男生宿舍后面发现女尸_22:40:27.927
消费者1接收到队列direct.queue1的消息:蓝色:该女尸竟是硅胶制品_22:40:37.891
Topic交换机

案例

创建两个消息队列:

创建topic交换机:

关联交换机和消息队列:

修改发送者:

    @Testpublic void testTopic1(){String massage = "今天天气不错啊";//修改交换机的名字为hm.topicString name = "hm.topic";//修改路由键为redrabbitTemplate.convertAndSend(name,"china.news",massage);}@Testpublic void testTopic2(){String massage = "这是一个大新闻啊";//修改交换机的名字为hm.topicString name = "hm.topic";//修改路由键为bluerabbitTemplate.convertAndSend(name,"china.goods",massage);}

修改接收值:

    @RabbitListener(queues = "topic.queue1")public void Topiclisten1(String msg) throws InterruptedException {System.err.println("消费者1接收到队列topic.queue1的消息:"+msg+"_"+ LocalTime.now());}@RabbitListener(queues = "topic.queue2")public void Topiclisten2(String msg) throws InterruptedException {System.err.println("消费者2接收到队列topic.queue2的消息:"+msg+"_"+ LocalTime.now());}

运行结果:

消费者2接收到队列topic.queue2的消息:今天天气不错啊_09:08:04.351
消费者1接收到队列topic.queue1的消息:今天天气不错啊_09:08:04.351
消费者1接收到队列topic.queue1的消息:这是一个大新闻啊_09:08:12.309
Topic小结

声明队列交换机

 注意:       由于消息发送端通常只负责消息的发送,所以在通常情况下都是将消息队列和交换机的创建放在消息的接受端。

在接受端创建fanout交换机和队列:

1、删除已有的fanout交换机和队列。

2、编写代码:

        在消息接受者编写代码,创建一个配置类:

package com.itheima.consumer.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfigrasion {//交给Bean注解来进行处理//创建交换机@Beanpublic FanoutExchange fanoutExchange(){//参数:交换机名称,是否持久化,是否自动删除,持久化默认为开启(持久化就是是否保存到磁盘)
//        return new FanoutExchange("hm.fanout");//使用build来创建交换机,durable(true)即是否持久化return ExchangeBuilder.fanoutExchange("hm.fanout").durable(true).build();}//创建消息队列@Beanpublic Queue fanoutQueue1(){
//        return new Queue("fanout.queue1");//使用build来创建消息队列return QueueBuilder.durable("fanout.queue1").build();}@Beanpublic Queue fanoutQueue2(){
//        return new Queue("fanout.queue1");//使用build来创建消息队列return QueueBuilder.durable("fanout.queue2").build();}// 绑定队列和交换机@Beanpublic Binding bindingfanoutQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@Beanpublic Binding bindingfanoutQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}}

运行该模块就可以创建交换机和消息队列:

由于基于Bean注解的方式,需要每个key都要写一遍比较麻烦。

还提供基于@RabbitListener的声明方式。

使用配置类注解的方式:

package com.itheima.consumer.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class dircetConfigrasion {//交给Bean注解来进行处理//创建交换机@Beanpublic DirectExchange directExchange(){//参数:交换机名称,是否持久化,是否自动删除,持久化默认为开启(持久化就是是否保存到磁盘)
//        return new FanoutExchange("hm.fanout");//使用build来创建交换机,durable(true)即是否持久化return ExchangeBuilder.directExchange("hm.direct").durable(true).build();}//创建消息队列@Beanpublic Queue DirectQueue1(){
//        return new Queue("fanout.queue1");//使用build来创建消息队列return QueueBuilder.durable("direct.queue1").build();}@Beanpublic Queue DirectQueue2(){
//        return new Queue("fanout.queue1");//使用build来创建消息队列return QueueBuilder.durable("direct.queue2").build();}// 绑定队列和交换机@Beanpublic Binding bindingfanoutQueue1red(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}@Beanpublic Binding bindingfanoutQueue1blue(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");}@Beanpublic Binding bindingfanoutQueue2red(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("red");}@Beanpublic Binding bindingfanoutQueue2yellow(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}}

使用注解来创建:

1、注释掉Config注解使上面的配置类失效

2、代码:

    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "hm.direct",type = ExchangeTypes.DIRECT),key = {"red","blue"}))public void Directlisten1redblue(String msg) throws InterruptedException {System.err.println("消费者1接收到队列direct.queue1的消息:"+msg+"_"+ LocalTime.now());}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "hm.direct",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))public void Directlisten2redyellow(String msg) throws InterruptedException {System.err.println("消费者2接收到队列direct.queue2的消息:"+msg+"_"+ LocalTime.now());}

3、结果,运行项目创建成功。

消息转换器
案例:

    @Testpublic void testObgect(){//准备Map数据Map map = new HashMap();map.put("name","jack");map.put("age",21);rabbitTemplate.convertAndSend("obgect.queue",map);}

使用JSON序列化器:

引入依赖:

<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>

在两个模块都添加配置项:

@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}

发送者的配置项添加到启动类中:

package com.itheima.publisher;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;@SpringBootApplication
public class PublisherApplication {public static void main(String[] args) {SpringApplication.run(PublisherApplication.class);}@Beanpublic MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;}
}

接受者代码:

配置类:

package com.itheima.consumer.config;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class JackionConfig {@Beanpublic MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;}
}

接收监听:

    @RabbitListener(queues = "obgect.queue")public void Obgectlisten(Map msg) throws InterruptedException {System.err.println("消费者1接收到队列fanout.queue1的消息:"+msg);}
结果:
消费者1接收到队列fanout.queue1的消息:{name=jack, age=21}

业务改造:

给两个模块都引入依赖引入依赖:

        <!--消息发送--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
<!--        序列化器--><dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version></dependency>

两个模块都设置配置文件

spring:rabbitmq:host: 192.168.21.101 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码

为两个模块设置序列化器:

package com.hmall.trade.config;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class JackionConfigration {@Beanpublic MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;}
}

在接受端设置接受代码:

package com.hmall.trade.listener;import com.hmall.trade.service.IOrderService;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;@Component
@RequiredArgsConstructor
public class Orderlisten {private final IOrderService orderService;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "trade.pay.success.queue", durable = "true"),exchange = @Exchange(name = "pay.topic"),key = "pay.success"))public void listenPaySuccess(Long orderId){orderService.markOrderPaySuccess(orderId);}
}

改造发送端的代码:

        //TODO 5.修改订单状态try {rabbitTemplate.convertAndSend("pay.direct", "pay.success", po.getBizOrderNo());} catch (Exception e) {log.error("支付成功的消息发送失败,支付单id:{}, 交易单id:{}", po.getId(), po.getBizOrderNo(), e);}
package com.hmall.pay.service.impl;import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;import com.hmall.api.client.UserClient;
import com.hmall.api.client.tradeClient;
import com.hmall.common.exception.BizIllegalException;
import com.hmall.common.utils.BeanUtils;
import com.hmall.common.utils.UserContext;
import com.hmall.pay.domain.dto.PayApplyDTO;
import com.hmall.pay.domain.dto.PayOrderFormDTO;
import com.hmall.pay.domain.po.PayOrder;
import com.hmall.pay.enums.PayStatus;
import com.hmall.pay.mapper.PayOrderMapper;
import com.hmall.pay.service.IPayOrderService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import java.time.LocalDateTime;/*** <p>* 支付订单 服务实现类* </p>**/
@Service
@Slf4j
@RequiredArgsConstructor
public class PayOrderServiceImpl extends ServiceImpl<PayOrderMapper, PayOrder> implements IPayOrderService {private final UserClient userClient;private final RabbitTemplate rabbitTemplate;//    private final tradeClient tradeClient;@Overridepublic String applyPayOrder(PayApplyDTO applyDTO) {// 1.幂等性校验PayOrder payOrder = checkIdempotent(applyDTO);// 2.返回结果return payOrder.getId().toString();}@Override@Transactionalpublic void tryPayOrderByBalance(PayOrderFormDTO payOrderDTO) {// 1.查询支付单PayOrder po = getById(payOrderDTO.getId());// 2.判断状态if(!PayStatus.WAIT_BUYER_PAY.equalsValue(po.getStatus())){// 订单不是未支付,状态异常throw new BizIllegalException("交易已支付或关闭!");}// 3.尝试扣减余额userClient.deductMoney(payOrderDTO.getPw(), po.getAmount());// 4.修改支付单状态boolean success = markPayOrderSuccess(payOrderDTO.getId(), LocalDateTime.now());if (!success) {throw new BizIllegalException("交易已支付或关闭!");}//TODO 5.修改订单状态try {rabbitTemplate.convertAndSend("pay.direct", "pay.success", po.getBizOrderNo());} catch (Exception e) {log.error("支付成功的消息发送失败,支付单id:{}, 交易单id:{}", po.getId(), po.getBizOrderNo(), e);}
//        tradeClient.markOrderPaySuccess(po.getBizOrderNo());}public boolean markPayOrderSuccess(Long id, LocalDateTime successTime) {return lambdaUpdate().set(PayOrder::getStatus, PayStatus.TRADE_SUCCESS.getValue()).set(PayOrder::getPaySuccessTime, successTime).eq(PayOrder::getId, id)// 支付状态的乐观锁判断.in(PayOrder::getStatus, PayStatus.NOT_COMMIT.getValue(), PayStatus.WAIT_BUYER_PAY.getValue()).update();}private PayOrder checkIdempotent(PayApplyDTO applyDTO) {// 1.首先查询支付单PayOrder oldOrder = queryByBizOrderNo(applyDTO.getBizOrderNo());// 2.判断是否存在if (oldOrder == null) {// 不存在支付单,说明是第一次,写入新的支付单并返回PayOrder payOrder = buildPayOrder(applyDTO);payOrder.setPayOrderNo(IdWorker.getId());save(payOrder);return payOrder;}// 3.旧单已经存在,判断是否支付成功if (PayStatus.TRADE_SUCCESS.equalsValue(oldOrder.getStatus())) {// 已经支付成功,抛出异常throw new BizIllegalException("订单已经支付!");}// 4.旧单已经存在,判断是否已经关闭if (PayStatus.TRADE_CLOSED.equalsValue(oldOrder.getStatus())) {// 已经关闭,抛出异常throw new BizIllegalException("订单已关闭");}// 5.旧单已经存在,判断支付渠道是否一致if (!StringUtils.equals(oldOrder.getPayChannelCode(), applyDTO.getPayChannelCode())) {// 支付渠道不一致,需要重置数据,然后重新申请支付单PayOrder payOrder = buildPayOrder(applyDTO);payOrder.setId(oldOrder.getId());payOrder.setQrCodeUrl("");updateById(payOrder);payOrder.setPayOrderNo(oldOrder.getPayOrderNo());return payOrder;}// 6.旧单已经存在,且可能是未支付或未提交,且支付渠道一致,直接返回旧数据return oldOrder;}private PayOrder buildPayOrder(PayApplyDTO payApplyDTO) {// 1.数据转换PayOrder payOrder = BeanUtils.toBean(payApplyDTO, PayOrder.class);// 2.初始化数据payOrder.setPayOverTime(LocalDateTime.now().plusMinutes(120L));payOrder.setStatus(PayStatus.WAIT_BUYER_PAY.getValue());payOrder.setBizUserId(UserContext.getUser());return payOrder;}public PayOrder queryByBizOrderNo(Long bizOrderNo) {return lambdaQuery().eq(PayOrder::getBizOrderNo, bizOrderNo).one();}
}

业务改造完毕。

作业:

作业1

将MQ配置抽取到Nacos中管理,微服务中直接使用共享配置。

1、为pay-service模块引入依赖,统一配置管理和读取配置文件的依赖

        <!--统一配置管理--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId></dependency><!--读取bootstrap文件--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bootstrap</artifactId></dependency>

2、在nacos中创建一个共享配置文件

spring:rabbitmq:host: ${hm.mq.host:192.168.21.101}port: ${hm.mq.port:5672} # 端口virtual-host: ${hm.mq.virtual-host:/hmall} # 虚拟主机username: ${hm.mq.username:hmall} # 用户名password: ${hm.mq.password:123} # 密码

3、修改模块中的配置文件

server:port: 8086
feign:okhttp:enabled: true # 开启OKHttp连接池支持sentinel:enabled: true # 开启feign对sentinel的支持
hm:swagger:title: 支付服务接口文档package: com.hmall.pay.controllerdb:database: hm-pay
spring:cloud:sentinel:transport:dashboard: localhost:8090 #访问路径http-method-specify: true # 开启请求方式前缀nacos:server-addr: 192.168.21.101application:name: pay-service

4、添加引导配置文件

bootstrap.yml

spring:application:name: pay-service # 服务名称profiles:active: devcloud:nacos:server-addr: 192.168.21.101 # nacos地址config:file-extension: yaml # 文件后缀名shared-configs: # 共享配置- dataId: shared-jdbc.yaml # 共享mybatis配置- dataId: shared-log.yaml # 共享日志配置- dataId: shared-swagger.yaml # 共享日志配置- dataId: shared-seata.yaml- dataId: shared-mq.yaml

作业二:改造下单功能

改造下单功能,将基于OpenFeign的清理购物车同步调用,改为基于RabbitMQ的异步通知:

  • 定义topic类型交换机,命名为trade.topic

  • 定义消息队列,命名为cart.clear.queue

  • cart.clear.queuetrade.topic绑定,BindingKeyorder.create

  • 下单成功时不再调用清理购物车接口,而是发送一条消息到trade.topic,发送消息的RoutingKeyorder.create,消息内容是下单的具体商品、当前登录用户信息

  • 购物车服务监听cart.clear.queue队列,接收到消息后清理指定用户的购物车中的指定商品

1、在car-service模块添加依赖:

        <!--消息发送--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

2、修改配置文件:

spring:application:name: cart-service # 服务名称profiles:active: devcloud:nacos:server-addr: 192.168.21.101 # nacos地址config:file-extension: yaml # 文件后缀名shared-configs: # 共享配置- dataId: shared-jdbc.yaml # 共享mybatis配置- dataId: shared-log.yaml # 共享日志配置- dataId: shared-swagger.yaml # 共享日志配置- dataId: shared-seata.yaml # 共享日志配置- dataId: shared-mq.yaml # 共享日志配置

3、添加配置类配置序列化器

@Configuration
public class JackionConfigration {@Beanpublic MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;}
}

4、创建监听:

package com.hmall.cart.listener;import com.hmall.cart.service.impl.CartServiceImpl;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Set;@Component
@RequiredArgsConstructor
public class Catlisten {private final CartServiceImpl cartService;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "cart.clear.queue", durable = "true"),exchange = @Exchange(name = "trade.topic"),key = "order.create"))public void listenPaySuccess(Set<Long> orderIds){cartService.removeByItemIds(orderIds);}
}

修改发送端:trade-service

        //TODO 3.清理购物车商品
//        cartService.removeByItemIds(itemIds);
//        cartClient.deleteCartItemByIds(itemIds);rabbitTemplate.convertAndSend("trade.topic","order.create",itemIds);
    @Transactional@GlobalTransactionalpublic Long createOrder(OrderFormDTO orderFormDTO) {// 1.订单数据Order order = new Order();// 1.1.查询商品List<OrderDetailDTO> detailDTOS = orderFormDTO.getDetails();// 1.2.获取商品id和数量的MapMap<Long, Integer> itemNumMap = detailDTOS.stream().collect(Collectors.toMap(OrderDetailDTO::getItemId, OrderDetailDTO::getNum));Set<Long> itemIds = itemNumMap.keySet();// 1.3.查询商品
//        List<ItemDTO> items = itemService.queryItemByIds(itemIds);List<ItemDTO> items = itemClient.queryItemByIds(itemIds);if (items == null || items.size() < itemIds.size()) {throw new BadRequestException("商品不存在");}// 1.4.基于商品价格、购买数量计算商品总价:totalFeeint total = 0;for (ItemDTO item : items) {total += item.getPrice() * itemNumMap.get(item.getId());}order.setTotalFee(total);// 1.5.其它属性order.setPaymentType(orderFormDTO.getPaymentType());order.setUserId(UserContext.getUser());order.setStatus(1);// 1.6.将Order写入数据库order表中save(order);// 2.保存订单详情List<OrderDetail> details = buildDetails(order.getId(), items, itemNumMap);detailService.saveBatch(details);//TODO 3.清理购物车商品
//        cartService.removeByItemIds(itemIds);
//        cartClient.deleteCartItemByIds(itemIds);rabbitTemplate.convertAndSend("trade.topic","order.create",itemIds);// 4.扣减库存try {itemClient.deductStock(detailDTOS);
//            itemService.deductStock(detailDTOS);} catch (Exception e) {throw new RuntimeException("库存不足!");}return order.getId();}

修改完毕。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/59190.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[ Linux 命令基础 2 ] Linux 命令详解-系统管理命令

&#x1f36c; 博主介绍 &#x1f468;‍&#x1f393; 博主介绍&#xff1a;大家好&#xff0c;我是 _PowerShell &#xff0c;很高兴认识大家~ ✨主攻领域&#xff1a;【渗透领域】【数据通信】 【通讯安全】 【web安全】【面试分析】 &#x1f389;点赞➕评论➕收藏 养成习…

Linux:vim命令总结及环境配置

文章目录 前言一、vim的基本概念二、vim模式命令解析1. 命令模式1&#xff09;命令模式到其他模式的转换&#xff1a;2&#xff09;光标定位&#xff1a;3&#xff09;其他命令&#xff1a; 2. 插入模式3. 底行模式4. 替换模式5. 视图模式6. 外部命令 三、vim环境的配置1. 环境…

【在Linux世界中追寻伟大的One Piece】多路转接epoll

目录 1 -> I/O多路转接之poll 1.1 -> poll函数接口 1.2 -> poll的优点 1.3 -> poll的缺点 1.4 -> poll示例 1.4.1 -> 使用poll监控标准输入 2 -> I/O多路转接之epoll 2.1 -> 初识epoll 2.2 -> epoll的相关系统调用 2.2.1 -> epoll_cre…

JS爬虫实战之TikTok_Shop验证码

TikTok_Shop验证码逆向 逆向前准备思路1- 确认接口2- 参数确认3- 获取轨迹参数4- 构建请求5- 结果展示 结语 逆向前准备 首先我们得有TK Shop账号&#xff0c;否则是无法抓取到数据的。拥有账号后&#xff0c;我们直接进入登录。 TikTok Shop 登录页面 思路 逆向步骤一般分为…

自由学习记录(20)

PureMVC 把 LoginView 视图组件赋给 viewComponent&#xff0c;然后用它来监听用户事件&#xff0c;更新显示状态。 command 将请求&#xff08;例如方法调用&#xff09;封装成一个对象&#xff0c;从而使得用户可以通过该对象来调用相应的操作。 Command&#xff08;命令…

数据结构-并查集专题(2)

一、前言 接&#xff08;1&#xff09;完成剩余题目和了解并查集运用求解最小生成树的Kruskal算法 二、专题训练 2.1 题目总览 前四题见&#xff08;1&#xff09; 2.2 1568: 并查集-家谱 思路 首先这个题目的描述就有问题&#xff0c;它说每一组的父子关系由两行组成&…

吾店云介绍 – 中国人的WordPress独立站和商城系统平台

经过多年在WordPress建站领域的摸索和探索&#xff0c;能轻松创建和管理各种类型网站的平台 – 吾店云建站平台诞生了。 应该说这是一个艰苦卓绝的过程&#xff0c;在中国创建一个能轻松创建和使用WordPress网站的平台并不容易&#xff0c;最主要是网络环境和托管软件的限制。…

测试实项中的偶必现难测bug--<pre>标签问题

问题描述: 用户从网上copy的简介信息可能带有<pre>标签,导致安卓上的内容只能一行滑动展示,但是ios有对这个标签做特殊处理: 分析: <pre> 标签是 HTML 中用于表示预格式化文本的标签,它的作用是保留文本中的空格、换行和缩进。它的全称是 preformatted text…

管理 Elasticsearch 变得更容易了,非常容易!

作者&#xff1a;来自 Elastic Ken Exner Elasticsearch 用户&#xff0c;我们听到了你的心声。管理 Elasticsearch 有时会变得很复杂&#xff0c;面临的挑战包括性能调整、问题检测和资源优化。我们一直致力于简化你的体验。今天&#xff0c;我们宣布了自收购 Opster 以来的一…

微波无源器件 OMT1 一种用于倍频程接收机前端的十字转门四脊正交模耦合器(24-51GHz)

摘要&#xff1a; 我们报道了一种用于天文学射电望远镜的毫米波波长接收机的一种十字转门四脊OMT的设计&#xff0c;制造和实测结果。此四脊OMT被直接兼容到一个四脊馈电喇叭来实现可以拓展矩形波导单模带宽的双极化低噪声接收机。使用了24-51GHz的带宽&#xff0c;OMT证实了0.…

如何使用IDEA创建Maven/SSM工程?

鉴于很多学校还在教授SSMJSP&#xff0c;很多同学不会使用IDEA创建Maven工程&#xff0c;这里进行说明 windows下安装jdk并配置环境 添加链接描述Windows下安装Maven并配置环境 首先你要本地安装jdk&#xff0c;Maven并配置基础环境变量&#xff0c;然后对IDEA进行jdk、Mave…

网络安全常见面试题--含答案

本文面试题汇总&#xff1a; 防范常见的 Web 攻击 重要协议分布层 arp协议的工作原理rip协议是什么&#xff1f;rip的工作原理 什么是RARP&#xff1f;工作原理OSPF协议&#xff1f;OSPF的工作原理 TCP与UDP区别总结 什么是三次握手四次挥手&#xff1f; tcp为什么要三次握手&…

C++内存泄漏检查工具——Valgrind(--tool = memcheck)

在写c程序中通常遇到程序崩溃&#xff0c;我们首先想到的是内存问题 如果代码量少看几遍就能看得出来&#xff0c;如果代码量多起来我们就得借助一些工具了比如gdb调试和valgrind中得memcheck来解决内存问题 我用的ubuntu&#xff0c;先安装valgrind sudo apt update sudo a…

库打包工具 rollup

库打包工具 rollup 摘要 **概念&#xff1a;**rollup是一个模块化的打包工具 注&#xff1a;实际应用中&#xff0c;rollup更多是一个库打包工具 与Webpack的区别&#xff1a; 文件处理&#xff1a; rollup 更多专注于 JS 代码&#xff0c;并针对 ES Module 进行打包webpa…

微服务容器化部署实践(FontConfiguration.getVersion)

文章目录 前言一、整体步骤简介二、开始实战1.准备好微服务2.将各个微服务打包为镜像第一种第二种3. 将各个打包好的镜像,通过docker-compose容器编排,运行即可总结前言 docker容器化部署微服务: 将微服务容器化部署到 Docker 容器中是一个常见的做法,可以提高应用的可移…

人工智能(AI)和机器学习(ML)技术学习流程

目录 人工智能(AI)和机器学习(ML)技术 自然语言处理(NLP): Word2Vec: Seq2Seq(Sequence-to-Sequence): Transformer: 范式、架构和自注意力: 多头注意力: 预训练、微调、提示工程和模型压缩: 上下文学习、思维链、全量微调、量化、剪枝: 思维树、思维…

带你读懂什么是AI Agent智能体

一、智能体的定义与特性 定义&#xff1a;智能体是一个使用大语言模型&#xff08;LLM&#xff09;来决定应用程序控制流的系统。然而&#xff0c;智能体的定义并不唯一&#xff0c;不同人有不同的看法。Langchain的创始人Harrison Chase从技术角度给出了定义&#xff0c;但更…

Qt_day3_信号槽

目录 信号槽 1. 概念 2. 函数原型 3. 连接方式 3.1 自带信号 → 自带槽 3.2 自带信号 → 自定义槽 3.3 自定义信号 4. 信号槽传参 5. 对应关系 5.1 一对多 5.2 多对一 信号槽 1. 概念 之前的程序界面只能看&#xff0c;不能交互&#xff0c;信号槽可以让界面进行人机…

《ElementPlus 与 ElementUI 差异集合》Icon 图标 More 差异说明

参考 《element plus 使用 icon 图标(两种方式)》使用 icon 升级 Vue2 升级 Vue3 项目时&#xff0c;遇到命名时的实心与空心点差异&#xff01; ElementUI&#xff1a; 实心是 el-icon-more空心是 el-icon-more-outline ElementPlus&#xff1a; 实心是 el-icon-more-fill…

RWKV 通过几万 token 的 CoT 解决几乎 100% 的数独问题(采用 29M 参数的小模型)

RWKV 做 CoT 特别有优势&#xff0c;因为推理速度和显存占用与上下文无关。即使做几百万 token 的 CoT 也不会变慢或额外消耗显存。 RWKV 社区开发者 Jellyfish042 训练了一个用于解决数独谜题的专用 RWKV 模型 Sudoku-RWKV &#xff0c;项目的训练代码和数据制作脚本均已开源…