maven导入
<!-- Spring AMQP RabbitMQ support -->
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.2.7.RELEASE</version>
</dependency>
5.2.1 消息的生产者
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.MessagePropertiesBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;import java.nio.charset.StandardCharsets;public class ProducterApplication {public static void main(String[] args) throws Exception {AbstractApplicationContext context = new AnnotationConfigApplicationContext(RabbitConfig.class);RabbitTemplate template = context.getBean(RabbitTemplate.class);//构造消息属性对象MessageProperties msgBuild = MessagePropertiesBuilder.newInstance()//设置消息的类型为文本.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)//消息的编码方式为UTF-8.setContentEncoding(StandardCharsets.UTF_8.name())//自定义消息头信息.setHeader("test.header", "test.value").build();//对象消息进行编码操作Message msg = MessageBuilder.withBody("你好 RabbitMQ!".getBytes(StandardCharsets.UTF_8)).andProperties(msgBuild).build();template.send("ex.anno.fanout", "routing.anno", msg);context.close();}}
RabbitConfig
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;
import java.net.URI;@Configurable
public class RabbitConfig {/*** 连接工厂** @return*/@Beanpublic ConnectionFactory getConnectionFactory() {URI uri = URI.create("amqp://root:123456@node1:5672/%2f");ConnectionFactory factory = new CachingConnectionFactory(uri);return factory;}/*** RabbitTemplate*/@Bean@Autowiredpublic RabbitTemplate rabbitTemplate(ConnectionFactory factory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);return rabbitTemplate;}/*** RabbitAdmin*/@Bean@Autowiredpublic RabbitAdmin rabbitAdmin(ConnectionFactory factory) {RabbitAdmin admin = new RabbitAdmin(factory);return admin;}/*** Queue*/@Beanpublic Queue queue() {Queue queue = QueueBuilder.nonDurable("queue.anno")//是否排外,即是否只有当前这个连接才能看到。//.exclusive()//是否自动删除//.autoDelete().build();return queue;}/*** Exchange*/@Beanpublic Exchange exchange() {Exchange exchange = new FanoutExchange("ex.anno.fanout", false, false, null);return exchange;}/*** Binding*/@Bean@Autowiredpublic Binding binding(Queue queue, Exchange exchange) {//创建一个不指定参数的绑定Binding binding = BindingBuilder.bind(queue).to(exchange).with("routing.anno").noargs();return binding;}
}
提示:
ConnectionFactory有三个实现
CachingConnectionFactory 基于channel的缓存模式 最常用是这个。
LocalizedQueueConnectionFactory 先确定队列所在的节点,再直接连接到该节点;主要用于集群环境下希望直接连接队列所在节点的场景。
SimpleRoutingConnectionFactory 在当前的连接工厂中按查找的KEY获取连接工厂。
运行消息的生产者,查看消息发送信息
[root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
Listing exchanges for vhost / ...
┌────────────────────┬─────────┐
│ name │ type │
├────────────────────┼─────────┤
│ amq.fanout │ fanout │
├────────────────────┼─────────┤
│ ex.anno.fanout │ fanout │
├────────────────────┼─────────┤
│ ex.busi.topic │ topic │
├────────────────────┼─────────┤
│ amq.rabbitmq.trace │ topic │
├────────────────────┼─────────┤
│ amq.headers │ headers │
├────────────────────┼─────────┤
│ amq.topic │ topic │
├────────────────────┼─────────┤
│ amq.direct │ direct │
├────────────────────┼─────────┤
│ ex.direct │ direct │
├────────────────────┼─────────┤
│ │ direct │
├────────────────────┼─────────┤
│ ex.routing │ direct │
├────────────────────┼─────────┤
│ amq.match │ headers │
└────────────────────┴─────────┘
[root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
Listing bindings for vhost /...
┌────────────────┬─────────────┬──────────────────┬──────────────────┬──────────────┬───────────┐
│ source_name │ source_kind │ destination_name │ destination_kind │ routing_key │ arguments │
├────────────────┼─────────────┼──────────────────┼──────────────────┼──────────────┼───────────┤
│ │ exchange │ queue.msg │ queue │ queue.msg │ │
├────────────────┼─────────────┼──────────────────┼──────────────────┼──────────────┼───────────┤
│ │ exchange │ queue.anno │ queue │ queue.anno │ │
├────────────────┼─────────────┼──────────────────┼──────────────────┼──────────────┼───────────┤
│ ex.anno.fanout │ exchange │ queue.anno │ queue │ routing.anno │ │
├────────────────┼─────────────┼──────────────────┼──────────────────┼──────────────┼───────────┤
│ ex.direct │ exchange │ queue.msg │ queue │ routing.q1 │ │
└────────────────┴─────────────┴──────────────────┴──────────────────┴──────────────┴───────────┘
[root@nullnull-os ~]# rabbitmqctl list_queues --formatter pretty_table
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
┌────────────┬──────────┐
│ name │ messages │
├────────────┼──────────┤
│ queue.msg │ 0 │
├────────────┼──────────┤
│ queue.anno │ 1 │
└────────────┴──────────┘
[root@nullnull-os ~]#
通过检查发现,消息已经成功地发送到了队列。
5.2.2 使用拉模式获取消息
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;public class ConsumerGetApplication {public static void main(String[] args) throws Exception {//从指定类加载配制信息AbstractApplicationContext context = new AnnotationConfigApplicationContext(RabbitConfig.class);RabbitTemplate rabbit = context.getBean(RabbitTemplate.class);Message receive = rabbit.receive("queue.anno");String encoding = receive.getMessageProperties().getContentEncoding();System.out.println("消息信息:" + new String(receive.getBody(), encoding));context.close();}}
RabbitConfig的配置
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;
import java.net.URI;@Configurable
public class RabbitConfig {/*** 连接工厂** @return*/@Beanpublic ConnectionFactory getConnectionFactory() {URI uri = URI.create("amqp://root:123456@node1:5672/%2f");ConnectionFactory factory = new CachingConnectionFactory(uri);return factory;}/*** RabbitTemplate*/@Bean@Autowiredpublic RabbitTemplate rabbitTemplate(ConnectionFactory factory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);return rabbitTemplate;}/*** RabbitAdmin*/@Bean@Autowiredpublic RabbitAdmin rabbitAdmin(ConnectionFactory factory) {RabbitAdmin admin = new RabbitAdmin(factory);return admin;}/*** Queue*/@Beanpublic Queue queue() {Queue queue = QueueBuilder.nonDurable("queue.anno")//是否排外,即是否只有当前这个连接才能看到。//.exclusive()//是否自动删除//.autoDelete().build();return queue;}
}
运行主程序,检查控制台的输出。
消息信息:你好 RabbitMQ!
至此使用拉模式,已经成功的获取队列中的数据。
**5.2.3 使用推模式获取数据**
消费者处理的代码
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class MessageListener {/*** com.rabbitmq.client.Channel to get access to the Channel channel对象* org.springframework.amqp.core.Message message对象,可以直接操作原生的AMQP消息* org.springframework.messaging.Message to use the messaging abstraction counterpart** @Payload-annotated 注解方法参数,该参数的值就是消息体。 method arguments including the support of validation* @Header-annotated 注解方法参数,访问指定的消息头字段的值。 method arguments to extract a specific header value, including standard AMQP headers defined by AmqpHeaders* @Headers-annotated 该注解的参数获取该消息的消息头的所有字段,参数集合类型对应的MAP argument that must also be assignable to java.util.Map for getting access to all headers.* MessageHeaders 参数类型,访问所有消息头字段 arguments for getting access to all headers.* MessageHeaderAccessor or AmqpMessageHeaderAccessor 访问所有消息头字段。* <p>* 消息监听*/@RabbitListener(queues = "queue.anno")public void whenMessageCome(Message msg) throws Exception {String encoding = msg.getMessageProperties().getContentEncoding();System.out.println("收到的消息:" + new String(msg.getBody(), encoding));}/**// * 使用payload进行消费// *// * 不可同时存在相同的队列被两个监听// *// * @param data// *///@RabbitListener(queues = "queue.anno")//public void whenMessageConsumer(@Payload String data) {// System.out.println("收到的消息:" + data);//}}
此处存在两种方式,一种是接收Message作为参数,还有一种是使用@Payload接收内容作为参数
配置处理
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.amqp.core.Queue;
import java.net.URI;@EnableRabbit
//@ComponentScan("com.nullnull.learn")
@ComponentScan
@Configurable //xml中也可以使用<rabbit:annotation-driven/> 启用@RabbitListener注解
public class RabbitConfig {@Beanpublic ConnectionFactory connectionFactory() {URI uriInfo = URI.create("amqp://root:123456@node1:5672/%2f");return new CachingConnectionFactory(uriInfo);}@Bean@Autowiredpublic RabbitAdmin rabbitAdmin(ConnectionFactory factory) {return new RabbitAdmin(factory);}@Bean@Autowiredpublic RabbitTemplate rabbitTemplate(ConnectionFactory factory) {return new RabbitTemplate(factory);}@Beanpublic Queue queue() {return QueueBuilder.nonDurable("queue.anno").build();}/*** RabbitListener的容器管理对象* <p>* 使用监听器监听推送过来的消息。在一个应用中可能会有多个监听器。这些监听器是需要一个工厂管理起来的。** @return*/@Bean("rabbitListenerContainerFactory")@Autowiredpublic SimpleRabbitListenerContainerFactory containerFactory(ConnectionFactory connectFactory) {SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();//要管理容器就得有连接containerFactory.setConnectionFactory(connectFactory);containerFactory.setAcknowledgeMode(AcknowledgeMode.AUTO);//containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//containerFactory.setAcknowledgeMode(AcknowledgeMode.NONE);//设置并发的消费者,即可以同时存在10个消费都消费消息。containerFactory.setConcurrentConsumers(10);//设置并发的最大消费者。containerFactory.setMaxConcurrentConsumers(15);//按照批次处理消息消息。containerFactory.setBatchSize(10);return containerFactory;}}
启动类
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

/**
 * Entry point for the push-mode consumer: starts the Spring context so that the
 * {@code @RabbitListener} methods begin receiving messages.
 */
public class ConsumerListenerApplication {

    /**
     * Starts the context built from {@link RabbitConfig}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(RabbitConfig.class);
        // The context is intentionally not closed here because the listeners must keep running.
        // The shutdown hook closes it when the JVM exits, instead of dropping it unclosed.
        context.registerShutdownHook();
    }
}
再启动生产者
对生产者作一点改造,让其发送多条
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.MessagePropertiesBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;
import java.nio.charset.StandardCharsets;

/**
 * Producer entry point (multi-message variant): sends 20 numbered UTF-8 plain-text
 * messages to the {@code ex.anno.fanout} exchange with routing key {@code routing.anno}.
 */
public class ProducterApplication {

    /**
     * Boots the Spring context from {@link RabbitConfig}, sends the messages and shuts down.
     *
     * @param args unused
     * @throws Exception if the context cannot be created or a send fails
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources closes the context even when a send throws part-way through the loop.
        try (AbstractApplicationContext context = new AnnotationConfigApplicationContext(RabbitConfig.class)) {
            RabbitTemplate template = context.getBean(RabbitTemplate.class);

            // One properties object is built up front and reused for every message.
            MessageProperties msgBuild = MessagePropertiesBuilder.newInstance()
                    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
                    .setContentEncoding(StandardCharsets.UTF_8.name())
                    .setHeader("test.header", "test.value")
                    .build();

            for (int i = 0; i < 20; i++) {
                byte[] body = ("你好 RabbitMQ! id :" + i).getBytes(StandardCharsets.UTF_8);
                Message msg = MessageBuilder.withBody(body)
                        .andProperties(msgBuild)
                        .build();
                template.send("ex.anno.fanout", "routing.anno", msg);
            }
        }
    }
}
客户端接收,查看控制台
收到的消息:你好 RabbitMQ! id :4
收到的消息:你好 RabbitMQ! id :9
收到的消息:你好 RabbitMQ! id :8
收到的消息:你好 RabbitMQ! id :7
收到的消息:你好 RabbitMQ! id :6
收到的消息:你好 RabbitMQ! id :2
收到的消息:你好 RabbitMQ! id :3
收到的消息:你好 RabbitMQ! id :5
收到的消息:你好 RabbitMQ! id :14
收到的消息:你好 RabbitMQ! id :17
收到的消息:你好 RabbitMQ! id :1
收到的消息:你好 RabbitMQ! id :0
收到的消息:你好 RabbitMQ! id :13
收到的消息:你好 RabbitMQ! id :15
收到的消息:你好 RabbitMQ! id :12
收到的消息:你好 RabbitMQ! id :16
收到的消息:你好 RabbitMQ! id :18
收到的消息:你好 RabbitMQ! id :19
收到的消息:你好 RabbitMQ! id :11
收到的消息:你好 RabbitMQ! id :10
通过观察发现,此处接收的顺序并非发送的顺序,这是因为批量以及并发的控制在这里起了作用;如果要按顺序接收,去掉批量及并发的设置即可。