消息中间件的对比
消息中间件 | ActiveMQ | RabbitMQ | RocketMQ | kafka |
开发语言 | java | erlang | java | scala |
单击吞吐量 | 万级 | 万级 | 10万级 | 10万级 |
时效性 | ms | us | ms | ms |
可用性 | 高(主从架构) | 高(主从架构) | 非常高(主从架构) | 非常高(主从架构) |
消息中间件: activeMQ:java(jms协议),性能一般,吞吐量低。rabbitMQ:erlang(amqp协议),性能好,功能丰富,吞吐量一般。rocketMQ:java,性能好,吞吐量丰富,功能丰富。Kafka: scala,吞吐量最大,功能单一,大数据领域
RocketMQ 是阿里开源的分布式消息中间件,跟其它中间件相比,RocketMQ 的特点是纯JAVA实现,是一套提供了消息生产,存储,消费全过程API的软件系统。
RocketMQ的作用:数据收集、限流削峰、异步解耦
数据收集
分布式系统会产生海量级数据流,如:业务日志、监控数据、用户行为等。针对这些数据流进行实时或批量采集汇总,然后对这些数据流进行大数据分析,这是当前互联网平台的必备技术。通过MQ完成此类数据收集是最好的选择。
限流削峰
MQ可以将系统的超量请求暂存其中,以便系统后期可以慢慢进行处理,从而避免了请求的丢失或系统被压垮。
异步解耦
上游系统对下游系统的调用若为同步调用,则会大大降低系统的吞吐量与并发度,且系统耦合度太高、而异步调用则会解决这些问题。所以两层之间若要实现由同步到异步的转化,一般性做法就是,在这两层间添加一个MQ层。
rocketmq.apache.org
Broker:经纪人(经理人)
Topic主题:消息区分,分类,虚拟结构
Queue:消息队列
Apache RocketMQ 是一款低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。
发送消息
发送同步消息
同步消息发送后会用一个返回值,也就是MQ服务器接收到消息返回的一个确认,这种方式非常安全,但是性能就没那么高,而在MQ集群中,也是要等到所有的从机都复制了消息以后才会返回,这种方式适合重要消息的场景
@Test
void rocketmqProducerTest() throws Exception {
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
//连接namesrv
producer.setNamesrvAddr("192.168.68.133:9876");
//启动
producer.start();
//创建消息
Message message = new Message("Topic","消息".getBytes());
SendResult send = producer.send(message);
System.out.println("发送状态"+send.getSendStatus());
//关闭生产者
producer.shutdown();
}@Test
void rocketmqConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("Topic","*");//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理for (MessageExt messageExt : msgs) {
System.out.println("消费了:"+new String(messageExt.getBody()));
}
System.out.println("消费者上下文"+context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}
发送异步消息
异步消息常用在对响应时间敏感的业务场景,发送端不能容忍长时间等待Broker的响应。发送完后会有一个异步消息通知
@Test
void aysncProducerTest() throws Exception {
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
//连接namesrv
producer.setNamesrvAddr("192.168.68.133:9876");
//启动
producer.start();
//创建消息
Message message = new Message("aysncTopic","异步消息".getBytes());
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功"+sendResult);
}@Override
public void onException(Throwable throwable) {
System.err.println("发送失败"+throwable);
}
});
System.out.println("执行了");
//关闭生产者
//producer.shutdown();
//挂起当前jvm
System.in.read();
}@Test
void aysncConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("aysncTopic","*");//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理for (MessageExt messageExt : msgs) {
System.out.println("消费了:"+new String(messageExt.getBody()));
}
System.out.println("消费者上下文"+context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}
发送单向消息
单向消息发送这种方式不关心发送结果的场景,这种方式吞吐量大,但存在消息丢失的风险。使用案例:日志信息发送
@Test
void OnewayProducerTest()throws Exception{
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("Oneway");
//连接namesrv
producer.setNamesrvAddr("192.168.68.133:9876");
//启动
producer.start();
//创建消息
Message message = new Message("Oneway","单向消息".getBytes());
//发送消息
producer.sendOneway(message);
//关闭生产者
producer.shutdown();
}@Test
void OnewayConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("Oneway","*");//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理for (MessageExt messageExt : msgs) {
System.out.println("消费了:"+new String(messageExt.getBody()));
}
System.out.println("消费者上下文"+context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}
发送延时消息
发送延时消息,顾名思义。场景:比如淘宝商城下单后,并未支付,有30分钟未支付订单状态
@Test
void delayProducerTest()throws Exception{
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("delayGroup");
//连接namesrv
producer.setNamesrvAddr("192.168.68.133:9876");
//启动
producer.start();
//创建消息
Message message = new Message("delay","延迟消息".getBytes());
//设置延时 根据官方延时等级
message.setDelayTimeLevel(2);
//发送消息
producer.sendOneway(message);
//关闭生产者
producer.shutdown();
}@Test
void delayConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("delay","*");//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理for (MessageExt messageExt : msgs) {
System.out.println("消费了:"+new String(messageExt.getBody()));
}
System.out.println("消费者上下文"+context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}
发送批量消息
批量消息:可以一次性发送一组消息
@Test
void delayProducerTest()throws Exception{
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("delayGroup");
//连接namesrv
producer.setNamesrvAddr("192.168.68.133:9876");
//启动
producer.start();
//创建消息
//批量消息
List<Message> messages = Arrays.asList(
new Message("delay","批量消息1".getBytes()),
new Message("delay","批量消息2".getBytes()),
new Message("delay","批量消息3".getBytes())
);
//发送消息
producer.send(messages);
//关闭生产者
producer.shutdown();
}
发送带标签消息
RocketMQ提供消息过滤功能,可根据业务逻辑区分,带有A标签的被A消费,带有B标签的被B消费
@Test
void TagProducerTest()throws Exception{
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("TagGroup");
//连接namesrv
producer.setNamesrvAddr("192.168.68.133:9876");
//启动
producer.start();
//创建消息
//批量消息
Message message1 = new Message("tagTopic", "tagA", "tag标签内容A".getBytes());
Message message2 = new Message("tagTopic", "tagB", "tag标签内容B".getBytes());
//发送消息
producer.send(message1);
producer.send(message2);
//关闭生产者
producer.shutdown();
}@Test
void TagAConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("tagTopic","tagA || tagB");//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理for (MessageExt messageExt : msgs) {
System.out.println("消费了:"+new String(messageExt.getBody()));
}
System.out.println("消费者上下文"+context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}@Test
void TagBConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.68.133:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("tagTopic","tagB");//设置监听器(一直,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
//消费方法
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//业务处理for (MessageExt messageExt : msgs) {
System.out.println("消费了:"+new String(messageExt.getBody()));
}
System.out.println("消费者上下文"+context);
//CONSUME_SUCCESS成功 RECONSUME_LATER失败
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}
发送顺序消息
发送的消息要保证消息是一定有序的,顺序消息,发送到同一个队列
实体类@Data
@AllArgsConstructor
public class MessageM {
private int userID;
private String desc;
}顺序消息,发送到同一个队列private List<MessageM> messageMs = Arrays.asList(
new MessageM(1,"下单"),
new MessageM(1,"付款"),
new MessageM(1,"配送"),
new MessageM(2,"下单"),
new MessageM(2,"付款"),
new MessageM(2,"配送")
);@Test
void orderProducerTest() throws Exception {
//创建生产者
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
//连接namesrv
producer.setNamesrvAddr("192.168.211.131:9876");
//启动
producer.start();messageMs.forEach(messageM -> {
//创建消息
Message message = new Message("orderMsg",messageM.toString().getBytes());
//发送顺序消息,发送到同一个队列
try {
//相同的userID去相同的队列
producer.send(
message,
new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
//选择队列
int hashCode = o.toString().hashCode();
int i = hashCode % list.size();
return list.get(i);
}
},
messageM.getUserID());
} catch (MQClientException e) {
throw new RuntimeException(e);
} catch (RemotingException e) {
throw new RuntimeException(e);
} catch (MQBrokerException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
//关闭生产者
producer.shutdown();
}
@Test
void orderConsumerTest() throws Exception {
//创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerTest");
//连接namesrv
consumer.setNamesrvAddr("192.168.211.131:9876");
//订阅主题 *表示该主题的所有消息
consumer.subscribe("orderMsg","*");//设置监听器(一直,异步回调方式)
//MessageListenerConcurrently 并发模式,多线程
//MessageListenerOrderly 顺序模式,单线程
consumer.registerMessageListener(new MessageListenerOrderly() {
//消费方法
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
System.out.println("当前线程ID"+Thread.currentThread().getId());
return ConsumeOrderlyStatus.SUCCESS;
}
});
//启动
consumer.start();
//挂起当前jvm
System.in.read();
//关闭
//consumer.shutdown();
}
发送带key消息
Message ID是MQ消息生产的唯一标识
@Testvoid KeyProducerTest()throws Exception{//创建生产者DefaultMQProducer producer = new DefaultMQProducer("KeyGroup");//连接namesrvproducer.setNamesrvAddr("192.168.211.131:9876");//启动producer.start();//自身业务key唯一String Key = UUID.randomUUID().toString();System.out.println("key:"+Key);//创建消息Message message = new Message("KeyTopic", "Key",Key, "带Key".getBytes());//发送消息sproducer.send(message);//关闭生产者producer.shutdown();}@Testvoid KeyAConsumerTest() throws Exception {//创建消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("KeyConsumerTest");//连接namesrvconsumer.setNamesrvAddr("192.168.211.131:9876");//订阅主题 *表示该主题的所有消息consumer.subscribe("KeyTopic","Key");//设置监听器(一直,异步回调方式)consumer.registerMessageListener(new MessageListenerConcurrently() {//消费方法@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {//业务处理for (MessageExt messageExt : msgs) {System.out.println("消费了:"+new String(messageExt.getBody()));System.out.println("消费到了key:"+messageExt.getKeys());}System.out.println("消费者上下文"+context);//CONSUME_SUCCESS成功 RECONSUME_LATER失败return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动consumer.start();//挂起当前jvmSystem.in.read();//关闭//consumer.shutdown();}