作用
1. 限流削峰
2. 异步解耦
组成
Producer:消息的发送者,生产者;举例:发件人
Consumer:消息接收者,消费者;举例:收件人
Broker:暂存和传输消息的通道;举例:快递
NameServer:管理Broker;举例:各个快递公司的管理机构 相当于broker的注册中心,保留了broker的信息
Queue:队列,消息存放的位置,一个Broker中可以有多个队列
Topic:主题,消息的分类
快速入门
生产者
1.创建消息生产者producer,并制定生产者组名
2.指定Nameserver地址
3.启动producer
4.创建消息对象,指定主题Topic、Tag和消息体等
5.发送消息
6.关闭生产者producer
消费者
1.创建消费者consumer,制定消费者组名
2.指定Nameserver地址
3.创建监听订阅主题Topic和Tag等
4.处理消息
5.启动消费者consumer
搭建pom依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.2</version>
</dependency>
编写生产者
@Testvoid simpleProducer() throws MQClientException, MQBrokerException, RemotingException, InterruptedException {//创建一个生产者,指定组名DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");//连接namesrvproducer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);//启动producer.start();//创建一个消息Message message = new Message("testTopic", "hello world".getBytes());//发送消息SendResult send = producer.send(message);System.out.println(send.getSendStatus());//关闭生产者producer.shutdown();}
编写消费者
@Testvoid simpleConsumer() throws Exception{//创建一个消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-consumer-group");//连接consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);//订阅一个主题 *表示所有的消息,后期会有消息过滤consumer.subscribe("testTopic", "*");consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {//业务处理System.out.println("我是消费者");System.out.println("消费:"+new String(list.get(0).getBody()));System.out.println("消费上下文"+consumeConcurrentlyContext);//返回值return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});//启动consumer.start();System.in.read();}
消费模式
Push是服务端【MQ】主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。
Pull是客户端需要主动到服务端取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时。
Push模式也是基于pull模式的,只能客户端内部封装了api,一般场景下,上游消息生产量小或者均速的时候,选择push模式。在特殊场景下,例如电商大促,抢优惠券等场景可以选择pull模式