(二)丶RabbitMQ的六大核心

一丶什么是MQ

        Message Queue(消息队列)简称MQ,是一种应用程序对应用程序的消息通信机制。在MQ中,消息以队列形式存储,以便于异步传输,在MQ中,发布者(生产者)将消息放入队列,而消费者从队列中读取并处理这些消息,这种设计允许生产者和消费者之间解耦,提高系统的响应速度和吞吐量,MQ常用于解耦系统之间的依赖关系,提高系统的稳定性和可扩展性,MQ还支持消峰,即以稳定的系统资源应对突发的流量冲剂,然而使用MQ也可能带来一些挑战,如:系统可用性降低、系统复杂度提高、以及消息一致性问题等;

二丶常见MQ有哪些

        目前业界有很多的MQ产品,例如RabbitMQ、RocketMQ、Kafka、ZeroMQ、MetaMQ等,也有直接使用Redis充当消息队列的案列,而这些消息队列产品,各有侧重,在实际选型时,需要结合自身需求及MQ的产品特征等,综合考虑;

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是Erlang语言编写,而集群和故障转移是构建在开放电信平台框架上的。所有的主要变成语言均有与代理接口通讯的客户端库。

三丶RabbitMQ六种消息模式

        1.Simple Work Queue(简单工作队列):常见的一对一模式,一条消息由一个消费者进行消费。如果有多个消费者,默认是使用轮询的方式将消息分配消费者

        2.Work Queues(工作队列模式):也叫公屏队列,能者多劳的消息队列模型。队列必须收到来自消费者的手动ACK(消息确认机制)才可以继续往消费者发送消息。

        3.Publish/Subscribe(发布订阅模式):一条消息被多和消费者消费。

        4.Ruoting(路由模式):有选择的接收消息。

        5.Topics(主题模式):通过一定的规则来选择性的接收消息

        6.RPC模式:发布者发布消息,并且通过RPC方式等待结果。

(1)Simple Work Queue(简单工作队列)

        消息生产后将消息放入队列,消息的消费者(consumer)监听消息队列嘛,如果队列中有消息就消费掉,消息被消费后自动从消息队列中删除。(也可能出现异常)

/*** @Description:获取RabbitMQ连接* @Author: xy丶*/
public class RabbitMQConnection {public final static String RABBITMQ_SERVER_HOST = "192.168.0.122";public final static int RABBITMQ_SERVER_PORT = 5672;public final static String VIRTUAL_HOST = "/XY_HOST";public static Connection getConnection() throws IOException, TimeoutException {//1、创建连接ConnectionFactory factory = new ConnectionFactory();//2、设置主机名factory.setHost(RABBITMQ_SERVER_HOST);//3、设置通讯端口,默认是5672,不专门设置也可以factory.setPort(RABBITMQ_SERVER_PORT);//4、设置账号和密码factory.setUsername("admin");factory.setPassword("admin");//4、设置Virtual Hostfactory.setVirtualHost(VIRTUAL_HOST);//5、创建连接return factory.newConnection();}}

       消费者

/*** @Description:简单工作队列模式消费者* @Author: xy丶*/
@Slf4j
public class Consumer {private final static String SIMPLE_WORK_QUEUE= "simple_work_queue";public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {//获取连接Connection connection = RabbitMQConnection.getConnection();//获取通道Channel channel = connection.createChannel();channel.queueDeclare(SIMPLE_WORK_QUEUE, false, false, false, null);QueueingConsumer consumer = new QueueingConsumer(channel);channel.basicConsume(SIMPLE_WORK_QUEUE, true, consumer);QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());log.info("Consumer reception "+message);}
}

生产者

/*** @Description:简单工作队列模式生产者* @Author: xy丶*/
@Slf4j
public class Producer {private final static String SIMPLE_WORK_QUEUE = "simple_work_queue";/*** 简单工作队列模式* @param args* @throws IOException* @throws TimeoutException*/public static void main(String[] args) throws IOException, TimeoutException {//创建连接Connection connection = RabbitMQConnection.getConnection();//创建通道Channel channel = connection.createChannel();// 声明队列 String var1, 是否持久化// boolean var2, 是否排外 只允许该channel访问该队列 为true只能有一个消费者// boolean var3, 是否自动删除// boolean var4, 消费完删除// Map<String, Object> var5 其他属性channel.queueDeclare(SIMPLE_WORK_QUEUE, false, false, false, null);// 消息内容 String var1, 是否持久化// boolean var2, 是否排外 只允许该channel访问该队列 为true只能有一个消费者// boolean var3, 是否自动删除// boolean var4, 消费完删除// Map<String, Object> var5 其他属性String message = "Hello Word!!!";channel.basicPublish("", SIMPLE_WORK_QUEUE,null,message.getBytes());log.info("Producer send "+message);//最后关闭通关和连接channel.close();connection.close();}
       (2).Work Queues(工作队列模式) 创建生产者两个消费者看看效果

        生产者

package com.puwang.MQ.workQueue;import com.puwang.MQ.config.RabbitMQConnection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;/*** @Description:工作队列模式生产者* @Author: xy丶*/
@Slf4j
public class Producer {private final static String QUEUE_WORK = "QUEUE_WORK";public static void main(String[] args) throws Exception {Connection connection = RabbitMQConnection.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_WORK, false, false, false, null);for(int i = 0; i < 20; i++){String message = "娃哈哈" + i;channel.basicPublish("", QUEUE_WORK, null, message.getBytes());System.out.println("send=============="+message);Thread.sleep(i*10);}channel.close();connection.close();}
}
@Slf4j
public class WorkQueueConsumer1 {private final static  String QUEUE_WORK = "QUEUE_WORK";/*** 结果:** 1、一条消息只会被一个消费者接收;** 2、rabbit采用轮询的方式将消息是平均发送给消费者的;** 3、消费者在处理完某条消息后,才会收到下一条消息。* @param args* @throws IOException* @throws TimeoutException*/public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {Connection connection = RabbitMQConnection.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_WORK, false,false, false,null);//同一时刻服务器只会发送一条消息给消费者channel.basicQos(1);QueueingConsumer consumer = new QueueingConsumer(channel);//关于手工确认 待之后有时间研究下channel.basicConsume(QUEUE_WORK, false, consumer);while(true){QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());log.info("[消费者1] Received1 '"+message+"'");Thread.sleep(10);//返回确认状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}
}@Slf4j
public class WorkQueueConsumer2 {private final static  String QUEUE_WORK = "QUEUE_WORK";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {Connection connection = RabbitMQConnection.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_WORK, false,false, false,null);//同一时刻服务器只会发送一条消息给消费者channel.basicQos(1);QueueingConsumer consumer = new QueueingConsumer(channel);//关于手工确认 待之后有时间研究下channel.basicConsume(QUEUE_WORK, false, consumer);while(true){QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());log.info("[消费者2] Received1 '"+message+"'");Thread.sleep(10);//返回确认状态channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}
}
       (3).Publish/Subscribe(发布订阅模式)

        生产者

/*** 订阅模式 生产者* 订阅模式:一个生产者发送的消息会被多个消费者获取。* 消息产生者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费* 相关场景:邮件群发,群聊天,广播(广告)* @Description:发布订阅模式生产者* @Author: xy丶*/
@Slf4j
public class Producer {private final static String PUBLISH_SUBSCRIBE_EXCHANGE = "Publish_subscribe_exchange";/*** 交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费* 相关场景:邮件群发,群聊天,广播(广告)* @param args*/public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取连接Connection connection = RabbitMQConnection.getConnection();//从连接中获取一个通道Channel channel = connection.createChannel();消费者绑定交换机 参数1 队列 参数2 交换机 参数3 routingKeychannel.exchangeDeclare(PUBLISH_SUBSCRIBE_EXCHANGE, "fanout");//发送消息for (int i = 0; i < 10; i++) {String message = "哇哈哈哈!!!"+i;log.info("send message:" + message);//发送消息channel.basicPublish(PUBLISH_SUBSCRIBE_EXCHANGE, "", null, message.getBytes("utf-8"));Thread.sleep(100 * i);}channel.close();connection.close();}
}

消费者

/*** @Description:发布订阅模式消费者* @Author: xy丶*/
@Slf4j
public class PublishSubscribeConsumer1 {//交换机名称private final static String PUBLISH_SUBSCRIBE_EXCHANGE = "Publish_subscribe_exchange";//队列名称private static final String PUBLISH_SUBSCRIBE_QUEUE    = "publish_subscribe_queue";public static void main(String[] args) throws IOException, TimeoutException {//获取连接Connection connection = RabbitMQConnection.getConnection();//从连接中获取一个通道final Channel channel = connection.createChannel();//声明交换机(分发:发布/订阅模式)channel.exchangeDeclare(PUBLISH_SUBSCRIBE_EXCHANGE, "fanout");//声明队列channel.queueDeclare(PUBLISH_SUBSCRIBE_QUEUE, false, false, false, null);//将队列绑定到交换机channel.queueBind(PUBLISH_SUBSCRIBE_QUEUE, PUBLISH_SUBSCRIBE_EXCHANGE, "");//保证一次只分发一个int prefetchCount = 1;channel.basicQos(prefetchCount);//定义消费者DefaultConsumer consumer = new DefaultConsumer(channel) {//当消息到达时执行回调方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "utf-8");log.info("[PublishSubscribeConsumer1] Receive message:" + message);try {//消费者休息2s处理业务Thread.sleep(1000);}catch (InterruptedException e) {e.printStackTrace();}finally {System.out.println("[1] done");//手动应答channel.basicAck(envelope.getDeliveryTag(), false);}}};//设置手动应答boolean autoAck = false;//监听队列channel.basicConsume(PUBLISH_SUBSCRIBE_QUEUE, autoAck, consumer);}
}/*** @Description:发布订阅模式消费者* @Author: xy丶*/
@Slf4j
public class PublishSubscribeConsumer1 {//交换机名称private final static String PUBLISH_SUBSCRIBE_EXCHANGE = "Publish_subscribe_exchange";//队列名称private static final String PUBLISH_SUBSCRIBE_QUEUE    = "publish_subscribe_queue";public static void main(String[] args) throws IOException, TimeoutException {//获取连接Connection connection = RabbitMQConnection.getConnection();//从连接中获取一个通道final Channel channel = connection.createChannel();//声明交换机(分发:发布/订阅模式)channel.exchangeDeclare(PUBLISH_SUBSCRIBE_EXCHANGE, "fanout");//声明队列channel.queueDeclare(PUBLISH_SUBSCRIBE_QUEUE, false, false, false, null);//将队列绑定到交换机channel.queueBind(PUBLISH_SUBSCRIBE_QUEUE, PUBLISH_SUBSCRIBE_EXCHANGE, "");//保证一次只分发一个int prefetchCount = 1;channel.basicQos(prefetchCount);//定义消费者DefaultConsumer consumer = new DefaultConsumer(channel) {//当消息到达时执行回调方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "utf-8");log.info("[PublishSubscribeConsumer1] Receive message:" + message);try {//消费者休息2s处理业务Thread.sleep(1000);}catch (InterruptedException e) {e.printStackTrace();}finally {System.out.println("[1] done");//手动应答channel.basicAck(envelope.getDeliveryTag(), false);}}};//设置手动应答boolean autoAck = false;//监听队列channel.basicConsume(PUBLISH_SUBSCRIBE_QUEUE, autoAck, consumer);}
}/*** @Description:发布订阅模式消费者* @Author: xy丶*/
@Slf4j
public class PublishSubscribeConsumer2 {//交换机名称private final static String PUBLISH_SUBSCRIBE_EXCHANGE = "Publish_subscribe_exchange";//队列名称private static final String PUBLISH_SUBSCRIBE_QUEUE    = "publish_subscribe_queue";public static void main(String[] args) throws IOException, TimeoutException {//获取连接Connection connection = RabbitMQConnection.getConnection();//从连接中获取一个通道final Channel channel = connection.createChannel();//声明交换机(分发:发布/订阅模式)channel.exchangeDeclare(PUBLISH_SUBSCRIBE_EXCHANGE, "fanout");//声明队列channel.queueDeclare(PUBLISH_SUBSCRIBE_QUEUE, false, false, false, null);//将队列绑定到交换机channel.queueBind(PUBLISH_SUBSCRIBE_QUEUE, PUBLISH_SUBSCRIBE_EXCHANGE, "");//保证一次只分发一个int prefetchCount = 1;channel.basicQos(prefetchCount);//定义消费者DefaultConsumer consumer = new DefaultConsumer(channel) {//当消息到达时执行回调方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "utf-8");log.info("[PublishSubscribeConsumer2] Receive message:" + message);try {//消费者休息2s处理业务Thread.sleep(1000);}catch (InterruptedException e) {e.printStackTrace();}finally {System.out.println("[1] done");//手动应答channel.basicAck(envelope.getDeliveryTag(), false);}}};//设置手动应答boolean autoAck = false;//监听队列channel.basicConsume(PUBLISH_SUBSCRIBE_QUEUE, autoAck, consumer);}
}

       (4).Ruoting(路由模式)

消费者

/*** 1)消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息* 2)根据业务功能定义路由字符串* 3)从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;* 客户通知;利用key路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误* @Description:发布订阅模式生产者* @Author: xy丶*/
@Slf4j
public class Producer {//路由交换机名称static final String ROUTING_DIRECT_EXCHANGE = "Routing_direct_exchange";//队列名称1 发送static final String QUEUE_SEND = "queue_send";//队列名称2  接收static final String QUEUE_RECEIVE = "queue_receive";public static void main(String[] args) throws Exception {//创建连接Connection connection = RabbitMQConnection.getConnection();//创建频道Channel channel = connection.createChannel();/*** 声明交换机* 参数1:交换机名称* 参数2:交换机类型,fanout,toppic,direct,headers*/channel.exchangeDeclare(ROUTING_DIRECT_EXCHANGE, "direct");/*** 声明(创建)队列* 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(QUEUE_SEND,true,false,false,null);channel.queueDeclare(QUEUE_RECEIVE,true,false,false,null);/*** 队列绑定交换机* 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接*/channel.queueBind(QUEUE_SEND,ROUTING_DIRECT_EXCHANGE,"send");channel.queueBind(QUEUE_RECEIVE,ROUTING_DIRECT_EXCHANGE,"receive");//发送消息String message = "路由模式:routing key 为 send";/*** 参数1:交换机名称,如果没有指定则使用默认Default Exchage* 参数2:路由key,简单模式可以传递队列名称* 参数3:消息其它属性* 参数4:消息内容*/channel.basicPublish(ROUTING_DIRECT_EXCHANGE,"send",null,message.getBytes());log.info("已发送消息:"+message);//发送消息message = "路由模式:routing key 为 receive";/*** 参数1:交换机名称,如果没有指定则使用默认Default Exchage* 参数2:路由key,简单模式可以传递队列名称* 参数3:消息其它属性* 参数4:消息内容*/channel.basicPublish(ROUTING_DIRECT_EXCHANGE,"receive",null,message.getBytes());log.info("已发送消息:"+message);//关闭资源channel.close();connection.close();}
}

消费者


/***路由消费者*/
@Slf4j
public class RoutingConsumer1 {//路由交换机名称static final String ROUTING_DIRECT_EXCHANGE = "Routing_direct_exchange";//队列名称1 发送static final String QUEUE_SEND = "queue_send";public static void main(String[] args) throws Exception {//创建连接Connection connection = RabbitMQConnection.getConnection();//创建通道(频道)Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(ROUTING_DIRECT_EXCHANGE,"direct");/*** 声明(创建)队列* 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(QUEUE_SEND,true,false,false,null);//队列绑定交换机channel.queueBind(QUEUE_SEND, ROUTING_DIRECT_EXCHANGE,"send");//创建消费这,并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel) {/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,*          消息和重传标志(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keylog.info("路由key为:" + envelope.getRoutingKey());//交换机log.info("交换机为:" + envelope.getExchange());//消息idlog.info("消息id为:" + envelope.getDeliveryTag());//收到的消息log.info("消费者1-接收到的消息为:" + new String(body, "utf8"));}};/*** 监听消息* 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(QUEUE_SEND, true, consumer);}/***路由消费者*/
@Slf4j
public class RoutingConsumer2 {//路由交换机名称static final String ROUTING_DIRECT_EXCHANGE = "Routing_direct_exchange";//队列名称1 发送static final String QUEUE_RECEIVE = "queue_receive";public static void main(String[] args) throws Exception {//创建连接Connection connection = RabbitMQConnection.getConnection();//创建通道(频道)Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(ROUTING_DIRECT_EXCHANGE,"direct");/*** 声明(创建)队列* 参数1:队列名称* 参数2:是否定义持久化队列* 参数3:是否独占本次连接* 参数4:是否在不使用的时候自动删除队列* 参数5:队列其它参数*/channel.queueDeclare(QUEUE_RECEIVE,true,false,false,null);//队列绑定交换机channel.queueBind(QUEUE_RECEIVE, ROUTING_DIRECT_EXCHANGE,"receive");//创建消费这,并设置消息处理DefaultConsumer consumer = new DefaultConsumer(channel) {/*** consumerTag 消息者标签,在channel.basicConsume时候可以指定* envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,*          消息和重传标志(收到消息失败后是否需要重新发送)* properties 属性信息* body 消息*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keylog.info("路由key为:" + envelope.getRoutingKey());//交换机log.info("交换机为:" + envelope.getExchange());//消息idlog.info("消息id为:" + envelope.getDeliveryTag());//收到的消息log.info("消费者2-接收到的消息为:" + new String(body, "utf8"));}};/*** 监听消息* 参数1:队列名称* 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认* 参数3:消息接收到后回调*/channel.basicConsume(QUEUE_RECEIVE, true, consumer);}
}
       (5).Topics(主题模式/路由模式的一种)

生产者

/*** 跟 routing 路由模式类似,只不过路由模式是指定固定的路由键 routingKey,而主题模式是可以模糊匹配路由routingKey,类似于SQL中 = 和 like 的关系* 消息可能匹配多个消费者,但是同一个队列的中的消息不会被重复消费;**要求* Topic 模式消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以 “.” 或者 “#” 分隔开。这些单词可以是任意单词,这个单词列表最多不能超过 255 个字节。* 分隔符* “*(星号)”:可以代替一个单词* “#(井号)”:可以替代零个或多个单词* @Description:主题模式* @Author: xy丶*/
@Slf4j
public class TopicProducer {public static final String TOPIC_EXCHANGE = "topic_exchange";public static final String TOPIC_QUEUE_ONE = "topic_queue_one";public static final String TOPIC_QUEUE_TWO = "topic_queue_two";public static void main(String[] args) throws Exception {//声明用作全局变量的队列变量和交换价变量//创建连接Connection connection = RabbitMQConnection.getConnection();//创建信道Channel channel = connection.createChannel();//声明队列channel.queueDeclare(TOPIC_QUEUE_ONE,true,false,false,null);channel.queueDeclare(TOPIC_QUEUE_TWO,true,false,false,null);//声明交换机channel.exchangeDeclare(TOPIC_EXCHANGE, "topic",true);//绑定队列channel.queueBind(TOPIC_QUEUE_ONE,TOPIC_EXCHANGE,"*.orange.*");channel.queueBind(TOPIC_QUEUE_TWO,TOPIC_EXCHANGE,"*.*.rabbit");channel.queueBind(TOPIC_QUEUE_TWO,TOPIC_EXCHANGE,"lazy.#");//发生消息for (int i = 0; i <10 ; i++) {String msg="goodnight!My love world===>"+i;channel.basicPublish(TOPIC_EXCHANGE,"ag.we.rabbit",null,msg.getBytes());}}
}

消费者

@Slf4j
public class TopicCustomer1 {public static final String TOPIC_QUEUE_ONE="topic_queue_one";public static void main(String[] args) throws Exception{//创建连接Connection connection = RabbitMQConnection.getConnection();//创建信道Channel channel = connection.createChannel();DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {log.info("TopicCustomer1=====>:"+new String(body));}};channel.basicConsume(TOPIC_QUEUE_ONE,true,consumer);}
}@Slf4j
public class TopicCustomer2 {public static final String TOPIC_QUEUE_TWO="topic_queue_two";public static void main(String[] args) throws Exception{//创建连接Connection connection = RabbitMQConnection.getConnection();//创建信道Channel channel = connection.createChannel();DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {log.info("TopicCustomer22=====>:"+new String(body));}};channel.basicConsume(TOPIC_QUEUE_TWO,true,consumer);}
}@Slf4j
public class TopicCustomer3 {public static final String TOPIC_QUEUE_TWO = "topic_queue_two";public static void main(String[] args) throws Exception{//创建连接Connection connection = RabbitMQConnection.getConnection();//创建信道Channel channel = connection.createChannel();DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {log.info("TopicCustomer2=====>:"+new String(body));}};channel.basicConsume(TOPIC_QUEUE_TWO,true,consumer);}
}

       (6).RPC模式

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/747652.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode 面试经典150题 80.删除有序数组中的重复项II

题目&#xff1a; 给你一个有序数组 nums &#xff0c;请你 原地 删除重复出现的元素&#xff0c;使得出现次数超过两次的元素只出现两次 &#xff0c;返回删除后数组的新长度。 不要使用额外的数组空间&#xff0c;你必须在 原地 修改输入数组 并在使用 O(1) 额外空间的条件…

百度paddleocr GPU版部署

显卡&#xff1a;NVIDIA GeForce RTX 4070&#xff0c;Nvidia驱动程序版本&#xff1a;537.13 Nvidia驱动程序能支持的最高cuda版本&#xff1a;12.2.138 Python&#xff1a;python3.10.11。试过python3.12&#xff0c;安装paddleocr失败&#xff0c;找不到相关模块。 飞桨版本…

Linux从0到1——Linux第一个小程序:进度条

Linux从0到1——Linux第一个小程序&#xff1a;进度条 1. 输出缓冲区2. 回车和换行的本质3. 实现进度条3.1 简单原理版本3.2 实际工程版本 1. 输出缓冲区 1. 小实验&#xff1a; 编写一个test.c文件&#xff0c;&#xff1a; #include <stdio.h> #include <unistd.h…

老电脑装什么系统流畅

对于一些老旧电脑来说&#xff0c;重装系统是提升电脑性能的最佳选择。那么&#xff0c;老电脑装什么系统流畅呢&#xff1f;推荐Windows 7系统&#xff0c;它对硬件的需求相对较低。配置较低的电脑运行Windows 7可以更好地利用系统资源&#xff0c;提高电脑的运行速度和响应能…

c语言实现https客户端 源码+详细注释(OpenSSL下载,visual studio编译器环境配置)

OpenSSL的下载和环境配置 请参考&#xff1a;openssl下载安装教程 步骤&#xff1a;官网下载->安装到选定目录->配置环境变量->打开命令窗口检查是否安装成功 注意&#xff1a; 打开命令窗口&#xff08;快捷键winr,在弹出窗口内输入cmd按回车&#xff09;&#xff0…

ChatGPT解决hmm...something seems to have gone wrong.

ChatGPT解决hmm…something seems to have gone wrong. 这里是官方社区的一种workaround办法。仅仅只是mark一下。 我这边遇到的现象是&#xff0c;ChatGPT 3.5是正常的&#xff0c;但是使用ChatGPT 4就会频繁的出现这样的输出。而且恶心的是&#xff0c;即使是这种输出&…

(三)丶RabbitMQ的四种类型交换机

前言&#xff1a;四大交换机工作原理及实战应用 1.交换机的概念 交换机可以理解成具有路由表的路由程序&#xff0c;仅此而已。每个消息都有一个称为路由键&#xff08;routing key&#xff09;的属性&#xff0c;就是一个简单的字符串。最新版本的RabbitMQ有四种交换机类型&a…

云计算2主从数据库

设置主从数据库的目的是将数据库1和数据库2分别建在两个虚拟机上&#xff0c;并实现数据互通访问 首先准备两个虚拟机&#xff0c;这里示例ip分别为&#xff1a; 192.168.200.10&#xff1b;192.168.200.20 修改主机名&#xff0c;一个是mysql1&#xff0c;一个是mysql2&#x…

vscode 运行 java 项目之解决“Build failed, do you want to continue”的问题

Visual Studio Code运行 java 起来似乎比 IDEA 更轻量、比 eclipse 更友好&#xff0c;是不可多得的现代编译法宝。 安装好官方推荐的 java 扩展包后&#xff0c;就可以运行 java 代码了。功能 比 code runner 强&#xff0c;支持 gradle、maven、普通java项目&#xff0c;运行…

[云原生] Prometheus之部署 Alertmanager 发送告警

一、Alertmanager 发送告警的介绍 Prometheus 对指标的收集、存储与告警能力分属于 Prometheus Server 和 AlertManager 两个独立的组件&#xff0c;前者仅负责定义告警规则生成告警通知&#xff0c; 具体的告警操作则由后者完成。 Alertmanager 负责处理由 Prometheus Serve…

力扣L12--- 125验证回文串(java版)-2024年3月15日

1.题目 2.知识点 注1&#xff1a;在 Java 中&#xff0c;toString() 方法用于将对象转换为字符串表示形式。对于数组对象&#xff0c;toString() 方法将返回数组的字符串表示形式&#xff0c;其中包含数组中每个元素的字符串表示形式&#xff0c;以逗号分隔&#xff0c;并且包…

Python基础入门 --- 1-2.字面量

文章目录 Python基础入门第一章&#xff1a;1.1 第一个python程序 第二章 &#xff1a;2.1 字面量2.2 常用的值类型2.3 字符串2.3.1 三种定义方式2.3.2 引号嵌套2.3.3 字符串拼接2.3.4 字符串格式化2.3.5 格式化的精度控制数字精度控制&#xff1a; 2.3.6 字符串格式化方式22.3…

CentOS7 部署 k8s

准备两台虚拟机192.168.152.129192.168.152.130更改主机名192.168.152.129&#xff1a;hostnamectl set-hostname k8s-masterhostnamectl192.168.152.130&#xff1a;hostnamectl set-hostname k8s-node1hostnamectl master节点配置 1.配置hosts 在两台节点上执行vim /etc/h…

金蝶云星空,怎么做BI数据可视化分析?

金蝶云星空是一个流程管理方面的软件&#xff0c;如果想要做BI数据可视化分析&#xff0c;还就需要一套BI方案&#xff0c;即一套奥威BI软件金蝶云星空BI方案。 奥威BI软件&#xff0c;负责提供平台和技术&#xff1b;金蝶云星空BI方案&#xff0c;则提供标准化的数据分析模型…

「HarmonyOS」下拉刷新组件使用详情

前言&#xff1a;在客户端开发过程中&#xff0c;经常会出现下拉刷新的功能&#xff0c;用于重新加载数据和加载更多数据&#xff0c;通过查找相关资料&#xff0c;查找到一个refresh下拉刷新的第三方库&#xff0c;今天主要介绍一下其中比较常用的RefreshLayout形式下拉刷新使…

K8S日志收集方案-EFK部署

EFK架构工作流程 部署说明 ECK (Elastic Cloud on Kubernetes)&#xff1a;2.7 Kubernetes&#xff1a;1.23.0 文件准备 crds.yaml 下载地址&#xff1a;https://download.elastic.co/downloads/eck/2.7.0/crds.yaml operator.yaml 下载地址&#xff1a;https://download.e…

性能出众的一区新算法|星鸦优化算法NOA原理及代码实现(Matlab)

文章来源于我的个人公众号&#xff1a;KAU的云实验台&#xff0c;主要更新智能优化算法的原理、应用、改进 CEC2005中的测试 本文KAU将介绍一个2023年发表在1区期刊KBS上的优化算法——星鸦优化算法(Nutcracker Optimization Algorithm&#xff0c;NOA)[1] 该算法由Mohamed …

一直被模仿,从未被超越

德国威步以及卓越的创新能力&#xff0c;成为业内不断被模仿的对象。德国威步自1989年创立35年以来&#xff0c;一直坚定地在软件保护及授权管理领域努力耕耘并不断创新和改进&#xff0c;拥抱互联网及软件开发技术的革新&#xff0c;完美融入并发展&#xff0c;虽然被其他竞争…

Github 2024-03-15 开源项目日报 Top10

根据Github Trendings的统计,今日(2024-03-15统计)共有10个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量TypeScript项目3Python项目3非开发语言项目2PHP项目1C#项目1Rust项目1《Hello 算法》:动画图解、一键运行的数据结构与算法教程 创建周期:476…

通过Rothko罗斯科绘画学习CSS盒子

本文章属于学习笔记&#xff0c;在https://www.freecodecamp.org/chinese/learn/2022/responsive-web-design/中练习 1、使用 padding 简写属性来增加两个元素之间的空间到。 .canvas {} .frame { padding:50px; }2、overflow 设置为 hidden - 将画布更改回其原始尺寸。overfl…