RabbitMQ工作模式
1.路由模式
创建交换机 , 连接队列 (生产者)
public class MyTestExDirect {@Testpublic void bbb() throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();//连接mqconnectionFactory.setUsername("账号");connectionFactory.setPassword("密码");connectionFactory.setHost("ip地址");connectionFactory.setPort(端口号);connectionFactory.setVirtualHost("/aaa");//建立连接Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//创建交换机channel.exchangeDeclare("ex_direct", BuiltinExchangeType.DIRECT,false);//创建队列/*** String queue, 队列的名称* boolean durable, 持久化* boolean exclusive, 是否独占* boolean autoDelete, 受否自动删除* Map<String, Object> arguments 参数*/channel.queueDeclare("mydirect1",false,false,false,null);channel.queueDeclare("mydirect2",false,false,false,null);//绑定交换机和队列 设置routingkeychannel.queueBind("mydirect1","ex_direct","error");channel.queueBind("mydirect2","ex_direct","test");channel.queueBind("mydirect2","ex_direct","test2");//交换机 routingkey 根据routingkey在队列上发布消息channel.basicPublish("ex_direct","error",null,"路由模式测试".getBytes());}
}

启动测试
交换机创建成功

队列创建成功 , 与交换机连接成功

通过routingkey "error" 将消息发送到 mydirect1

创建消费者
public class ConsumerAppDirect
{public static void main( String[] args ) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();//连接mqconnectionFactory.setUsername("账号"); connectionFactory.setPassword("密码"); connectionFactory.setHost("ip地址");connectionFactory.setPort(端口号); connectionFactory.setVirtualHost("/aaa");//建立连接Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("mq:-----aaa"+s);}};channel.basicConsume("mydirect1",true,consumer);}
}

开启监控

2.Topics 主题模式
Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#:匹配一个或多个词
*:匹配不多不少恰好1个词 test.* test.insert
举例:
item.#:能够匹配item.insert.abc 或者 item.insert
item.*:只能匹配item.insert

创建交换机和生产者
public class MyTestExTopics {@Testpublic void ccc() throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();//连接mqconnectionFactory.setUsername("账号");connectionFactory.setPassword("密码"); connectionFactory.setHost("ip地址");connectionFactory.setPort(端口号);connectionFactory.setVirtualHost("/aaa");//建立连接Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();//创建交换机channel.exchangeDeclare("ex_topics", BuiltinExchangeType.TOPIC,false);//创建队列/*** String queue, 队列的名称* boolean durable, 持久化* boolean exclusive, 是否独占* boolean autoDelete, 受否自动删除* Map<String, Object> arguments 参数*/channel.queueDeclare("mytopics1",false,false,false,null);channel.queueDeclare("mytopics2",false,false,false,null);//绑定交换机和队列 设置routingkeychannel.queueBind("mytopics1","ex_topics","test.#");channel.queueBind("mytopics2","ex_topics","*.aaa");channel.queueBind("mytopics2","ex_topics","test.*");//交换机 此处的routingkey应该是具体的值 根据routingkey在队列上发布消息channel.basicPublish("ex_topics","test.aaa",null,"TOPIC模式测试".getBytes());}
}

测试



发布消息成功
消费者监听参考路由模式 , 只需要修改队列就行
SpringBoot整合RabbitMQ
1.搭建项目
添加依赖
<!--2. rabbitmq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>

添加配置文件

2.创建工作模式(主题模式)
1)创建交换机和队列
package com.example.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class TopicMqConfig {@Value("${mq.exchange.name}")private String EXCHANGENAME;@Value("${mq.queue.name}")private String QUEUENAME1;@Value("${mq.queue.name}")private String QUEUENAME2;//创建交换机@Bean("ex1")public Exchange getExchange(){Exchange exchange = ExchangeBuilder.topicExchange(EXCHANGENAME).durable(false).build();return exchange;}//创建队列@Bean("queue1")public Queue getQueue1(){Queue queue1 = QueueBuilder.nonDurable(QUEUENAME1).build();return queue1;}@Bean("queue2")public Queue getQueue2(){Queue queue2 = QueueBuilder.nonDurable(QUEUENAME1).build();return queue2;}//绑定交换机和队列@Bean("binding1")public Binding bindingQueueToExchange1(@Qualifier("ex1") Exchange exchange,@Qualifier("queue1") Queue queue){Binding binding1 = BindingBuilder.bind(queue).to(exchange).with("*.*").noargs();return binding1;}@Bean("binding2")public Binding bindingQueueToExchange2(@Qualifier("ex1") Exchange exchange,@Qualifier("queue2") Queue queue){Binding binding2 = BindingBuilder.bind(queue).to(exchange).with("test.*").noargs();return binding2;}
}

2)创建生产者

测试



3)创建消费者

创建配置文件

创建测试类 监听队列
package com.example.message;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class ConsumerMessage {@RabbitListener(queues = "test_queue2")public void xxx(Message message){byte[] body = message.getBody();String s = new String(body);System.out.println(s);}
}

测试


MQ高级特性,消息的可靠性传递
1.确认模式
开启确认模式 修改配置

创建测试类
@SpringBootTest
public class MqTtst {@Value("${mq.exchange.name}")private String EXCHANGENAME;@Resourceprivate RabbitTemplate rabbitTemplate;@Testvoid sendMsg(){rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {if (b){System.out.println("发送消息成功");}else {System.out.println("发送消息失败,原因:"+s);}}});rabbitTemplate.convertAndSend(EXCHANGENAME,"test.topic","测试springBoot");}
}

启动测试

2.消息回退
当交换机接收到消息 , 但队列收不到消息时 , 使用回退
修改配置

测试
@Test
void sendMsgReturn(){// 消息回退rabbitTemplate.setMandatory(true);//rabbitTemplate.setReturnsCallback(returnedMessage -> System.out.println("消息回退,回退的消息是:"+new String(returnedMessage.getMessage().getBody())));rabbitTemplate.convertAndSend(EXCHANGENAME,"test.topic","测试springBoot");
}

3.Consumer Ack
三种确认方式
自动确认:acknowledge="none" 。不管处理成功与否,业务处理异常也不管
(当消费者意担接收到消息之后,消费者就会给broker一个回执,证明已经接收到消息 了,不管消息到底是否成功)
手动确认:acknowledge="manual" 。可以解决业务异常的情况
(收到消息之后不会立马确认收到消息,当业务处理没有问题的时候手动的调用代码的方 式来进行处理,如果业务失败了,就可以进行额外的操作)
根据异常情况确认:acknowledge="auto",(这种方式使用麻烦,不作讲解)
其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。
1)自动确认

2)手动确认
修改配置 开启手动签收
3)创建测试
@Component
public class ShouDingQianShouMeaasge implements ChannelAwareMessageListener {@Override@RabbitListener(queues = "test_queue2")public void onMessage(Message message, Channel channel) throws Exception {Thread.sleep(2000);byte[] body = message.getBody();String s = new String(body);System.out.println(s);long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println(1/0);channel.basicAck(deliveryTag,true);}catch (Exception e){System.out.println("拒绝签收");channel.basicNack(deliveryTag,true,true);}}
}

启动测试
有异常拒绝签收


无异常签收成功

