SpringBoot学习小结之RocketMQ

文章目录

  • 前言
  • 一、架构设计
    • 1.1 架构图
    • 1.2 消息
    • 1.3 工作流程
  • 二、部署
    • 2.1 单机
    • 2.2 集群
  • 三、Springboot Producter
    • 3.1 准备
    • 3.2 pom依赖、yml 配置
    • 3.3 普通消息
    • 3.4 顺序、批量、延迟消息
    • 3.5 事务消息
  • 四、Springboot Consumer
    • 4.1 配置
    • 4.2 普通Push消费
    • 4.3 回复
    • 4.4 集群和广播
    • 4.5 并发和顺序
    • 4.6 消息过滤
    • 4.7 重试和死信
    • 4.8 设置消费组负载均衡策略
    • 4.9 设置offset
    • 4.10 Pull 消息
  • 五、总结
    • 5.1 优点
    • 5.2 缺点
  • 参考

前言

在当今互联网时代,随着数据规模和业务复杂度的不断增长,分布式消息中间件作为实现系统解耦、异步通信和削峰填谷的重要工具,扮演着越来越关键的角色。而在众多的消息中间件中,Apache RocketMQ 以其出色的性能、高可用性和可扩展性,成为了许多企业构建分布式系统的首选之一。

RocketMQ 是一种开源分布式消息队列系统, 由阿里巴巴集团开发并在2012年开源,现已成为 Apache 软件基金会的顶级项目之一。它具备高吞吐量、低延迟、高可靠性和强大的水平扩展能力等特性,被广泛应用于互联网、金融、电商、物联网等各个领域。

本文将带您深入了解 RocketMQ,探索其架构设计、部署以及在 Springboot 中使用,帮助您更好地理解和应用这一强大的消息中间件,提升系统的性能和可靠性,实现业务的快速发展和创新。

一、架构设计

1.1 架构图

RocketMQ 的组成部分

组件功能特点
Name Server负责维护集群中所有 Broker 的路由信息和消息队列的状态信息各个 NameServer 相互独立,没有信息转发
Broker存储消息,接收来自生产者的消息并将其提供给消费者也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息
Producer消息的生产者,负责将消息发送到 Broker通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递过程支持快速失败和重试
Consumer消息的消费者,从 Broker 中获取消息进行处理支持推(push)和拉(pull)两种模式对消息进行消费,支持集群方式和广播方式的消费,提供实时消息订阅机制,满足大多数用户的需求

消费模式可以大致分为两种,一种是推Push,一种是拉Pull。

  • Push 是服务端主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。
  • Pull 是客户端需要主动到服务端取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时。

1.2 消息

一些和消息相关的核心概念

名词含义
Message消息系统所传输信息的物理载体,生产和消费数据的最小单位。一条消息必须有一个主题(Topic)。消息 body 默认最大 4M。
Topic一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题。
Tag为消息设置的标志,用于同一 Topic 下区分不同类型的消息,目前只支持每个消息设置一个, 一般在Topic后加 :Tag名。
ProducerGroup一组 Producer 的集合,这些 Producer 共同实现了某个业务逻辑,通常是发送相同类型或相关类型的消息到同一个 Topic。
ConsumerGroup一组 Consumer 的集合,ConsumerGroup下的消费者主要有两种负载均衡模式,即广播模式和集群模式。在集群模式下,同一个 ConsumerGroup 中的 Consumer 实例是负载均衡消费,在广播模式下,同一个 ConsumerGroup 中每个 Consumer 实例都需要处理全部的消息。
Message Queue一个 Topic 下的物理存储单元,用于存储发送到该 Topic 的消息。每个 Topic 可以有多个消息队列,消息会根据一定的规则被分配到这些消息队列中。
Offset消费者在消息队列中消费消息的位置信息。

消息发送返回的状态

状态含义
SEND_OK消息发送成功,要注意的是消息发送成功也不意味着它是可靠的,要确保不会丢失任何消息,还应启用同步 Master 服务器或同步刷盘,即 SYNC_MASTER 或 SYNC_FLUSH。
FLUSH_DISK_TIMEOUT消息发送成功但是服务器刷盘超时,此时消息已经进入服务器队列(内存),只有服务器宕机,消息才会丢失,消息存储配置参数中可以设置刷盘方式和同步刷盘时间长度,如果 Broker 服务器设置了刷盘方式为同步刷盘,即 FlushDiskType=SYNC_FLUSH(默认为异步刷盘方式),当 Broker 服务器未在同步刷盘时间内(默认为5s)完成刷盘,则将返回该状态——刷盘超时。
FLUSH_SLAVE_TIMEOUT消息发送成功,但是服务器同步到 Slave 时超时,此时消息已经进入服务器队列,只有服务器宕机,消息才会丢失,如果 Broker 服务器的角色是同步 Master,即 SYNC_MASTER(默认是异步 Master 即 ASYNC_MASTER),并且从 Broker 服务器未在同步刷盘时间(默认为5秒)内完成与主服务器的同步,则将返回该状态——数据同步到 Slave 服务器超时。
SLAVE_NOT_AVAILABLE消息发送成功,但是此时 Slave 不可用,如果 Broker 服务器的角色是同步 Master,即 SYNC_MASTER(默认是异步 Master 服务器即 ASYNC_MASTER),但没有配置 slave Broker 服务器,则将返回该状态——无 Slave 服务器可用。

Group 和 Cluster 区别

  • Group 和 Cluster 是两个不同的概念,前者是逻辑上,后者是物理上。
  • 以Producer举例,一个 Producer Cluster 可以包含多个 Producer Group,而一个 Producer Group 只属于一个 Producer Cluster。换句话说,一个生产者集群可以包含多个逻辑上不同的生产者组,每个生产者组都有其特定的 ProducerGroup 标识。

1.3 工作流程

  1. 启动 NameServer

    NameServer 启动后监听端口,等待 Broker、Producer、Consumer 连接,相当于一个路由控制中心。

  2. 启动 Broker

    与所有 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker 信息以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic跟 Broker 的映射关系。

  3. 创建 Topic

    创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 Topic , 但不建议。

  4. 生产者发送消息

    启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在于哪些 Broker 上,轮询从队列列表中选择一个队列,然后与队列所在的 Broker建立长连接从而向 Broker 发消息。

  5. 消费者接受消息

    跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker上,然后直接跟 Broker 建立连接通道,然后开始消费消息。

二、部署

2.1 单机

mqnamesrv
mqbroker -n localhost:9876

mqnamesrv 默认端口9876,可通过添加配置文件 namesrv.conf 修改

listenPort = 9878
mqnamesrv -c ../conf/namesrv.conf
mqbroker -n localhost:9878

broker 默认端口10911,修改 broker 端口号, 在配置文件 broker.cnf 添加

listenPort = 11087

并在启动时指定配置文件

mqbroker -n localhost:9877 -c ../conf/broker.conf

2.2 集群

6台机器,配置两个 NameServer、4个 Broker(2主2从)

mqnamesrv -n 192.168.1.1:9876
mqnamesrv -n 192.168.1.2:9876mqbroker -n 192.168.1.1:9876;192.161.2:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties
mqbroker -n 192.168.1.1:9876;192.161.2:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties
mqbroker -n 192.168.1.1:9876;192.161.2:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.propertie
mqbroker -n 192.168.1.1:9876;192.161.2:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties

同步双写配置,每个 Master 配置一个 Slave,有多对 Master-Slave ,HA 采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:

  • 优点:数据与服务都无单点故障,Master 宕机情况下,消息无延迟,服务可用性与数据可用性都非常高。
  • 缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。

三、Springboot Producter

3.1 准备

mqnamesrv -c ../conf/namesrv.conf
mqbroker -n localhost:9878 -c ../conf/broker.confmqadmin updateTopic -b 127.0.0.1:11087 -t demo-topic

3.2 pom依赖、yml 配置

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version>
</dependency>
rocketmq:name-server: localhost:9878producer:group: test-producter
demo:rocketmq:topic: demo-topic
spring:application:name: producter

3.3 普通消息

Apache RocketMQ 可用于以三种方式发送消息:同步、异步和单向传输。前两种消息类型是可靠的,因为无论它们是否成功发送都有响应。

使用场景:

  • 同步:一些对消息可靠性要求较高的场景,如订单支付、账单通知等。
  • 异步:一些链路耗时较长,对响应时间较为敏感的业务的场景,如视频上传后通知启动转码服务,转码完成后通知推送转码结果 等。
  • 单向:一些不需要关心消息发送结果,只需简单地发送消息而不关心是否成功的场景,如日志记录等。
@Autowired
private RocketMQTemplate rocketMQTemplate;@Value("${demo.rocketmq.topic}")
private String topic;@Test
void test1_sync() {SendResult sendResult = rocketMQTemplate.syncSend(topic, "同步发送信息");log.info("同步发送结果:{}", sendResult);rocketMQTemplate.convertAndSend(topic, "simple-send-topic simple hello");
}@Test
void test2_async() {rocketMQTemplate.asyncSend(topic, "异步发送信息", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("异步发送结果:{}", sendResult);}@Overridepublic void onException(Throwable e) {log.info("异步发送异常: {}", e.toString());}});
}@Test
void test3_oneway() {rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload("one way message").build());rocketMQTemplate.convertAndSend(topic, "simple-send-topic simple hello");log.info("发送 oneway message");
}@Test
void test4_tags() {Message message = new Message(topic, "hello tags message".getBytes());message.setTags("tagA");try {rocketMQTemplate.getProducer().send(message);} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {log.error("syncSend failed. message:{} ", message);throw new RuntimeException(e);}log.info("发送 tag message");
}

3.4 顺序、批量、延迟消息

  • 顺序消息是一种在消息发送和消费过程中要求严格按照特定顺序进行处理的消息。在 RocketMQ 中,顺序消息遵循先进先出(FIFO)的原则,确保按照消息发布的顺序进行消费

    在 RocketMQ 中,支持分区顺序消息,通过对消息进行分区,确保同一个分区键的消息会被分配到同一个队列中,并按照顺序进行消费。

    RocketMQ 的消息顺序性包括生产顺序性和消费顺序性。为了实现消息的顺序性,需要同时满足生产者和消费者两方面的条件:

    • 生产顺序性:确保单个生产者串行地发送消息,并按序存储和持久化。要满足生产顺序性,需要保证消息的发送是单一生产者、串行发送的。
    • 消费顺序性:消费者按照消息的顺序进行处理。RocketMQ 通过设置相同的分区键,将消息发送至同一队列中,从而保证消费顺序性。

    顺序消息适用于需要严格保持事件顺序的场景,如有序事件处理、撮合交易、数据实时增量同步等。例如,在订单处理场景中,需要确保订单生成、付款和发货等操作按照顺序执行,顺序消息能够满足这种需求。

  • 批量消息是在对吞吐率有一定要求的情况下,RocketMQ 可以将一些消息聚成一批以后进行发送,可以增加吞吐率,并减少 API 和网络调用次数。

  • 延迟消息发送是指消息发送到 RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到 Consumer 进行消费。

    在分布式定时调度触发、任务超时处理等场景,需要实现精准、可靠的延时事件触发。使用 RocketMQ 的延时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。

    投递等级(delay level)延迟时间投递等级(delay level)延迟时间
    11s106min
    25s117min
    310s128min
    430s139min
    51min1410min
    62min1520min
    73min1630min
    84min171h
    95min182h
@Test
void test5_order() {for (int q = 0; q < 4; q++) {// send to 4 queuesList<org.springframework.messaging.Message> msgs = new ArrayList<>();for (int i = 0; i < 10; i++) {int msgIndex = q * 10 + i;String msg = String.format("Hello RocketMQ Batch Msg#%d to queue: %d", msgIndex, q);msgs.add(MessageBuilder.withPayload(msg).setHeader(RocketMQHeaders.KEYS, "KEY_" + msgIndex).build());}SendResult sr = rocketMQTemplate.syncSendOrderly(topic, msgs, q + "", 60000);System.out.println("--- Batch messages orderly to queue :" + sr.getMessageQueue().getQueueId() + " send result :" + sr);}
}@Test
void test6_batch() {List<org.springframework.messaging.Message> msgs = new ArrayList<>();for (int i = 0; i < 10; i++) {msgs.add(MessageBuilder.withPayload("Hello RocketMQ Batch Msg#" + i).setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build());}SendResult sr = rocketMQTemplate.syncSend(topic, msgs, 60000);log.info("--- Batch messages send result : {}",  sr);
}@Test
void test7_lazy() {int totalMessagesToSend = 100;for (int i = 0; i < totalMessagesToSend; i++) {Message message = new Message(topic, ("Hello scheduled message " + i).getBytes());// This message will be delivered to consumer 10 seconds later.message.setDelayTimeLevel(3);try {rocketMQTemplate.getProducer().send(message);} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {log.error("error {}", e.toString());throw new RuntimeException(e);}}log.info("final send result");
}
}

3.5 事务消息

  1. 生产者将半事务消息发送至 RocketMQ Broker

  2. RocketMQ Broker 将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息暂不能投递,为半事务消息。

  3. 生产者开始执行本地事务逻辑。

  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:

    • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
    • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。

  6. 需要注意的是,服务端仅仅会按照参数尝试指定次数,超过次数后事务会强制回滚,因此未决事务的回查时效性非常关键,需要按照业务的实际风险来设置

  7. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

  8. 生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

@Test
void test8_transaction() {for (int i = 0; i < 10; i++) {try {org.springframework.messaging.Message msg = MessageBuilder.withPayload("rocketmq transactional message " + i).setHeader(RocketMQHeaders.TRANSACTION_ID, "KEY_" + i).build();SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(null,topic, msg, null);log.info("------RocketMQTemplate send Transactional msg body = {}  , sendResult= {}",msg.getPayload(), sendResult.getSendStatus());Thread.sleep(10);} catch (Exception e) {e.printStackTrace();}}
}@RocketMQTransactionListener
class TransactionListenerImpl implements RocketMQLocalTransactionListener {private AtomicInteger transactionIndex = new AtomicInteger(0);private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(org.springframework.messaging.Message msg, Object arg) {String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);log.info("#### executeLocalTransaction is executed, msgTransactionId={}", transId);int value = transactionIndex.getAndIncrement();int status = value % 3;localTrans.put(transId, status);if (status == 0) {// Return local transaction with success(commit), in this case,// this message will not be checked in checkLocalTransaction()log.info("    # COMMIT # Simulating msg {} related local transaction exec succeeded! ###", msg.getPayload());return RocketMQLocalTransactionState.COMMIT;}if (status == 1) {// Return local transaction with failure(rollback) , in this case,// this message will not be checked in checkLocalTransaction()log.info("    # ROLLBACK # Simulating {} related local transaction exec failed! ", msg.getPayload());return RocketMQLocalTransactionState.ROLLBACK;}log.info("    # UNKNOW # Simulating %s related local transaction exec UNKNOWN!");return RocketMQLocalTransactionState.UNKNOWN;}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(org.springframework.messaging.Message msg) {String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT;Integer status = localTrans.get(transId);if (null != status) {switch (status) {case 0:retState = RocketMQLocalTransactionState.COMMIT;break;case 1:retState = RocketMQLocalTransactionState.ROLLBACK;break;case 2:retState = RocketMQLocalTransactionState.UNKNOWN;break;}}log.info("------ !!! checkLocalTransaction is executed once," +" msgTransactionId={}, TransactionState=%s status={} {}",transId, retState, status);return retState;}}

四、Springboot Consumer

4.1 配置

rocketmq:name-server: localhost:9878consumer:topic: demo-topicgroup: consumer-group demo:rocketmq:group: broadcast-groupconsumer:tag: tagA
spring:application:name: consumer

4.2 普通Push消费

@Slf4j
@Component
@RocketMQMessageListener(topic = "${rocketmq.consumer.topic}",consumerGroup = "${rocketmq.consumer.group}"
)
public class MessageListener implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("接收消息:{}", message);}
}

4.3 回复

@RocketMQMessageListener(topic = "${rocketmq.consumer.topic}", consumerGroup = "${demo.rocketmq.bytesRequestConsumer}")
public class ConsumerWithReplyBytes implements RocketMQReplyListener<MessageExt, byte[]> {@Overridepublic byte[] onMessage(MessageExt message) {System.out.printf("------- ConsumerWithReplyBytes received: %s \n", message);return "reply message content".getBytes();}
}

4.4 集群和广播

默认集群

@Slf4j
@Component
@RocketMQMessageListener(topic = "${demo.rocketmq.topic}",consumerGroup = "${demo.rocketmq.consumer.group}",messageModel = MessageModel.BROADCASTING
)
public class MessageListener implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("广播接收消息:{}", message);}
}

4.5 并发和顺序

默认并发

@Slf4j
@Component
@RocketMQMessageListener(topic = "${demo.rocketmq.topic}",consumerGroup = "${demo.rocketmq.consumer.group}",consumeMode = ConsumeMode.ORDERLY
)
public class OrderMessageListener implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("顺序接收消息:{}", message);}
}

4.6 消息过滤

消息过滤分为 Tag 过滤和 SQL 过滤,默认Tag过滤

在 SQL 语法中,Tag 的属性值为 TAGS,开启属性过滤首先要在 Broker 端设置配置enablePropertyFilter=true

@Slf4j
@Component
@RocketMQMessageListener(topic = "${rocketmq.consumer.topic}",consumerGroup = "${rocketmq.consumer.group}",selectorExpression = "${demo.rocketmq.consumer.tag}"
)
public class FilterMessageListener implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("过滤Tag接收消息:{}", message);}
}
@Slf4j
@Component
@RocketMQMessageListener(topic = "${rocketmq.consumer.topic}",consumerGroup = "${rocketmq.consumer.group}",selectorExpression = "TAGS is not null and TAGS in ('tagA', 'tagB')",selectorType = SelectorType.SQL92
)
public class SQLFilterMessageListener implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("SQL过滤Tag接收消息:{}", message);}
}

4.7 重试和死信

默认重试次数为 -1,即 异步为16,顺序为Interger.MAXVALUE

异步重试时间间隔

第几次重试与上次重试的间隔时间第几次重试与上次重试的间隔时间
110s97min
230s108min
31min119min
42min1210min
53min1320min
64min1430min
75min151h
86min162h
@Slf4j
@Component
@RocketMQMessageListener(topic = "${rocketmq.consumer.topic}",consumerGroup = "${rocketmq.consumer.group}",maxReconsumeTimes = 3
)
public class RetryMessageListener implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("接收消息:{}", message);}
}

当一条消息在初次消费时失败,RocketMQ会自动进行消息重试。若达到最大重试次数后仍然失败,则表明该消息在正常情况下无法被正确消费。

此时,该消息并非立即丢弃,而是会被发送到特殊队列,称为死信队列(Dead-Letter Queue),而这类消息则被称为死信消息(Dead-Letter Message)。

死信队列是死信Topic下唯一的单独队列,而死信Topic的名称通常为%DLQ%ConsumerGroupName,其中 ConsumerGroupName 为对应消费者组的名称。

通过RocketMQ Admin工具或 RocketMQ Dashboard,可以查询到这些死信消息的信息,但它们不会再被消费。

4.8 设置消费组负载均衡策略

例如一个 Topic 有8个队列,一个消费组中有3个消费者,那这三个消费者各自去消费哪些队列。RocketMQ 默认提供了如下负载均衡算法:

  • AllocateMessageQueueAveragely:平均连续分配算法。
  • AllocateMessageQueueAveragelyByCircle:平均轮流分配算法。
  • AllocateMachineRoomNearby:机房内优先就近分配。
  • AllocateMessageQueueByConfig:手动指定,这个通常需要配合配置中心,在消费者启动时,首先先创建 AllocateMessageQueueByConfig 对象,然后根据配置中心的配置,再根据当前的队列信息,进行分配,即该方法不具备队列的自动负载,在 Broker 端进行队列扩容时,无法自动感知,需要手动变更配置。
  • AllocateMessageQueueByMachineRoom:消费指定机房中的队列,该分配算法首先需要调用该策略的 setConsumeridcs(Set<String> consumerIdCs) 方法,用于设置需要消费的机房,将刷选出来的消息按平均连续分配算法进行队列负载。
  • AllocateMessageQueueConsistentHash 一致性 Hash 算法。
@Slf4j
@Component
@RocketMQMessageListener(topic = "${rocketmq.consumer.topic}",consumerGroup = "${demo.rocketmq.rebalancegroup}"
)
public class RebalanceMessageListener implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {@Overridepublic void onMessage(String message) {log.info("rebalance: {}", message);}@Overridepublic void prepareStart(DefaultMQPushConsumer consumer) {consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragelyByCircle());}
}

4.9 设置offset

消费者将从上次消费的位置开始消费消息

@Slf4j
@Component
@RocketMQMessageListener(topic = "${rocketmq.consumer.topic}",consumerGroup = "${rocketmq.consumer.group}")
public class PushMessageListener implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {@Overridepublic void onMessage(String message) {log.info("push {}", message);}@Overridepublic void prepareStart(DefaultMQPushConsumer consumer) {consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);}
}

4.10 Pull 消息

// 配置
rocketMQTemplate.getConsumer().subscribe(topic, "*");
rocketMQTemplate.getConsumer().setPullBatchSize(20);
List<MessageExt> messageExts = rocketMQTemplate.getConsumer().poll(1000);
log.info("poll 拉取消息:{}", messageExts);// 类似
List<String> receive = rocketMQTemplate.receive(String.class, 1000);
log.info("poll 拉取消息:{}", receive);

五、总结

5.1 优点

  1. 稳定性高:RocketMQ 在阿里巴巴内部被广泛应用,经过多年的生产环境验证,稳定性高。
  2. 高并发:支持高并发的消息处理,可以满足大量的消息生产和消费需求。
  3. 适应性广:支持多种消息协议,如 JMS、OpenMessaging 等,并且可以很容易地与不同的系统进行集成。
  4. 高可用性:RocketMQ 支持主从和分布式部署,可以保证在任何节点宕机的情况下服务仍然可用。
  5. 高可靠性:提供了三种级别的消息传递保证,并且支持事务消息,可以保证消息的可靠传递。
  6. 支持集群:提供了完善的集群机制,可以实现高可用和负载均衡。

5.2 缺点

  1. 学习曲线较陡峭:RocketMQ 的配置和使用较为复杂,需要一定时间来学习。
  2. 运维要求较高:RocketMQ 的运维工作较为复杂,需要有专业的团队来维护。
  3. 不适合大数据处理: 相对于一些专注于大数据处理的消息中间件,如Kafka,RocketMQ在大数据处理方面的性能可能不如人意。

参考

  1. https://rocketmq.apache.org/zh/docs/4.x/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/16448.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

在Windows中安装Redis

一、下载Redis github链接&#xff1a;https://github.com/redis-windows/redis-windows/releases 二、安装 解压后点击start.bat文件即可启动服务 新开一个cmd窗口进入安装了Redis的文件夹输入redis-cli.exe -h 127.0.0.1 -p 6379连接Redis&#xff0c;见如下结果便是成功&…

sql-labs靶场环境搭建(手把手保姆级教学)

文章目录 一、sql-labs靶场简介&#xff1a;二、搭建过程1、资源下载2、配置文件&#xff1b;3、访问网站4、创建数据库 三、使用PhpStudy2018原因 一、sql-labs靶场简介&#xff1a; SQL-Labs 是一个实践环境&#xff0c;旨在用于数据库和 SQL&#xff08;结构化查询语言&…

某大型制造集团企业信息化建设总体规划设计方案(67页PPT)

方案介绍&#xff1a; 随着信息技术的飞速发展&#xff0c;企业信息化建设已成为提高管理效率、增强企业竞争力的重要手段。某大型制造集团为应对市场变化、提升管理水平、优化资源配置&#xff0c;决定进行全面深入的信息化建设。本方案旨在构建一个集生产、管理、销售、物流…

【DevOps】Jenkins + Dockerfile自动部署Maven(SpringBoot)项目

环境 docker_host192.168.0.1jenkins_host192.168.0.2 jenkins_host构建完成后把jar发布到docker_host&#xff0c;再通过dockerfile自动构建镜像&#xff0c;运行镜像 1 Jenkins安装 AWS EC2安装Jenkins&#xff1a;AWS EC2 JDK11 Jenkins-CSDN博客 AWS EC2上Docker安装…

T113调试7寸RGB屏

文章目录 软硬件介绍软件板卡屏幕 调试修改内核设备树修改U-Boot设备树 测试添加启动logo其它问题总结 软硬件介绍 软件 基于Tina5.0 SDK。 板卡 韦东山的T113工业板&#xff1a; 屏幕 韦东山的7寸RGB电容触摸屏&#xff1a; 调试 修改内核设备树 打开内核设备树<…

代码随想录算法训练营第四天| 24. 两两交换链表中的节点、19.删除链表的倒数第N个节点 、 面试题 02.07. 链表相交、142.环形链表II

24. 两两交换链表中的节点 题目链接&#xff1a; 24. 两两交换链表中的节点 文档讲解&#xff1a;代码随想录 状态&#xff1a;没做出来&#xff0c;没有正确更新头节点&#xff0c;因为head和cur共享引用&#xff0c;会随着cur的移动&#xff0c;丢失之前存放的节点 错误代码&…

51单片机-实机演示(按键)

书接上回。http://t.csdnimg.cn/4wSSW 目录 一.按下灭&#xff0c;松开亮 二.两个按键控制两个灯 三.点一下灯开&#xff0c;在按一下关 四。优化按键消抖 1.加入bit变量 一.按下灭&#xff0c;松开亮 代码 #include <reg52.h> //此文件中定义了单片机的一些特…

自定义函数python:深入解析与实操

新书上架~&#x1f447;全国包邮奥~ python实用小工具开发教程http://pythontoolsteach.com/3 欢迎关注我&#x1f446;&#xff0c;收藏下次不迷路┗|&#xff40;O′|┛ 嗷~~ 目录 一、引言&#xff1a;函数的命名与规范 二、函数命名&#xff1a;遵循规范&#xff0c;易于…

利用阅读APP3.0目录展示要查看的内容01

喜欢读电子书的小伙伴往往会遇到一个问题&#xff0c;就是想要看书中某些内容&#xff0c;但是不知道具体章节&#xff0c;所以就用查找功能来查&#xff0c;但是呢查找功能查出来展示的结果并不直观。 比如想要阅读:青竹蜂云剑&#xff0c;大衍决&#xff0c;南宫婉&#xff0…

265 基于matlab的粒子群优化分数阶灰色预测模型

基于matlab的粒子群优化分数阶灰色预测模型&#xff0c;以误差结果为目标进行预测&#xff0c;输出多个预测结果。并输出迭代曲线。程序已调通&#xff0c;可直接运行。 265 分数阶灰色预测 粒子群优化算法 - 小红书 (xiaohongshu.com)

二叉树——经典练习题

目录 前言&#xff1a; 一、单值二叉树 题目描述&#xff1a; 思路分析&#xff1a; 代码实现&#xff1a; 二、二叉树最大深度 题目描述&#xff1a; 思路分析&#xff1a; 代码实现&#xff1a; 三、检查两颗树是否相同 题目描述&#xff1a; 思路分析&#xff1a; 代…

mac电脑用n切换node版本

一、安装 node版本管理工具 “n” sudo npm install -g n二、检查安装成功&#xff1a; n --version三、查看依赖包的所有版本号 比如: npm view webpack versions --json npm view 依赖包名 versions --json四、安装你需要的版本的node sudo n <node版本号> // 例如…

【C语言项目实战】使用单链表实现通讯录

&#x1f493; 博客主页&#xff1a;倔强的石头的CSDN主页 &#x1f4dd;Gitee主页&#xff1a;倔强的石头的gitee主页 期待您的关注 ​ 目录 一、引言 二、单链表的基本概念 三、通讯录项目的需求分析 四、通讯录的数据结构 五、通讯录的接口 1.通讯录初始化 / 导入外部…

2010-2024年别克维修手册和电路图线路接线图资料更新

经过整理&#xff0c;2010-2024年别克汽车全系列已经更新至汽修帮手资料库内&#xff0c;覆盖市面上99%车型&#xff0c;包括维修手册、电路图、新车特征、车身钣金维修数据、全车拆装、扭力、发动机大修、发动机正时、保养、电路图、针脚定义、模块传感器、保险丝盒图解对照表…

Qt 界面上控件自适应窗体大小 - 随窗体缩放

Qt 界面上控件自适应窗体大小 - 随窗体缩放 引言一、在Qt Designer上设置二、参数详解三、参考链接 引言 添加布局&#xff0c;设置控件的minimumSize、maximumSize和sizePolicy可以使其跟随窗体进行自适应缩放 - 如上图所示。 一、在Qt Designer上设置 在代码中设置效果一致…

HTML.

HTML:超文本标记语言&#xff08;Hyper Text Markup Language&#xff09; 超文本&#xff1a;不同于普通文本&#xff0c;可以定义图片&#xff0c;音频&#xff0c;视频等内容 标记语言&#xff1a;由标签构成的语言 HTML标签都是预定义好的HTML代码直接在浏览器中运行&#…

JVM之【运行时数据区】

JVM简图 运行时数据区简图 一、程序计数器&#xff08;Program Counter Register&#xff09; 1.程序计数器是什么&#xff1f; 程序计数器是JVM内存模型中的一部分&#xff0c;它可以看作是一个指针&#xff0c;指向当前线程所执行的字节码指令的地址。每个线程在执行过程中…

深度神经网络——什么是生成式人工智能?

1.引言 生成式人工智能最近引起了很大的关注。 该术语用于指依赖无监督或半监督学习算法来创建新的数字图像、视频、音频和文本的任何类型的人工智能系统。 麻省理工学院表示&#xff0c;生成式人工智能是过去十年人工智能领域最有前途的进展之一。 通过生成式人工智能&#…

AI智能体|手把手教你使用扣子Coze图像流的文生图功能

大家好&#xff0c;我是无界生长。 AI智能体&#xff5c;手把手教你使用扣子Coze图像流的文生图功能本文详细介绍了Coze平台的\x26quot;图像流\x26quot;功能中的\x26quot;文生图\x26quot;节点&#xff0c;包括创建图像流、编排文生图节点、节点参数配置&#xff0c;并通过案例…

Layui设置table表格中时间的显示格式

1、问题概述? 【数据库中的时间格式】 【Layui中table表格默认的显示格式】 默认的格式中会显示时间的毫秒单位,但是这个毫秒有时候是不需要的。 总结:这个时候我们就需要定义table表格中的时间显示格式。 2、解决办法? 【解决后时间的显示格式】 【解决办法1:通过字符…