入门 SpringCloudStream 之 RocketMq 实践全集

本文概览:

  • 组件介绍
  • 工作流程
  • 文本消息+自定义信道
  • 多主题+文本消息+自定义信道
  • 标签过滤+获取头信息
  • 定向的异常处理与全局异常处理
  • 顺序消息
    • 全局顺序消息
    • 局部顺序消息
  • 事务消息

当在选取队列组件的时候,通常要结合实际情况,大数据场景Kafka可能是理想的选择,事务或延迟队列场景可能RocketMQ是较成熟的选择,其他常规业务高性能场景可能RabbitMQ是不错的选择。今天这里为了了解和使用事务和延迟队列的特性,选择研究RocketMQ。

本文实践版本:

  • Spring-Cloud-Stream: 2.2.10-C1
  • Spring-Boot: 2.3.12.RELEASE

1、组件介绍

Producer:生产者,支持分布式集群部署,支持快速产生消息并投递

Consumer:消费者,支持分布式集群部署,支持Push和Pull的模式消费数据,支持集群和广播方式消费数据

NameServer:topic注册中心,支持Broker的动态注册与发现

Broker: 负责消息的存储、投递、查询与服务高可用

其他名词:

  • Topic: 主题,对消息分类
  • Message: 消息体
  • MessageID: 全局唯一标志,系统自动生产
  • Tag: 二级消息类型,区分某个Topic下的消息分类
  • Producer实例: 生产者的一个对象实例
  • Consumer实例: 消费者的一个对象实例
  • Group: 一类producer和consumer
  • Group ID: group标识
  • 队列: 每个Topic会对应一个或者多个队列来存储信息
  • Exactly-Once 语义:一条消息之后能被consumer消费一次,即使重试也不会多次消费。消息队列 RocketMQ 的 Exactly-Once 投递语义适用于“接收消息 -> 处理消息 -> 结果持久化到数据库”的流程,能够保证您的每一条消息消费的最终处理结果写入到您的数据库一次且仅一次,保证消息消费的幂等。
  • 集群消费:同一个groupId下的consumer平均消费,一个消息只被投递到某一个consumer中
  • 广播消费:同一个groupId下的consumer各自消费,一个消息被投递到等多个consumer中
  • 定时消息:指定时间将消息投递给consumer进行消费
  • 延时消息:延后一段时间投递给consumer进行消费
  • 事务消息:分布式事务最终一致性
  • 顺序消息:按照顺序进行发布和消费
  • 全局顺序消息:一种特殊的分区顺序消息,严格遵守先进先出进行发布会和消费
  • 分区顺序消息:一个topic多个分区,通过shardingKey区分分区,同一个分区内遵守先进先出,多分区能够增加并发度提升性能
  • 消息堆积:消费者未能在短时间内消费所有数据
  • 消息过滤:消费者可以根据TAG过滤消息
  • 消息轨迹:从生产者产出,到消费者消费的过程中,各个香干节点的时间、地点等数据汇聚而成的完整链路
  • 重置消费位点:在消息持久化存储的时间范围内,重新设置消费进度,成功设置时间点后,由生产者发送到服务端的消息
  • 死信队列:处理无法正常消费的消息,消息被初次消费失败后,会进行自动重试,重试达到上限依旧失败后,消息会被放入死信队列-Dead Letter Queue,存储死信消息的特殊队列
  • 消息路由:不同地域之间的消息同步

2、工作流程

  • 启动 NameServer
  • 启动 Broker
  • Broker/Producer/Consumer 注册至 NameServer,并彼此获取Topic等信息
  • 发送消息前,创建Topic
  • Producer启动,与NameServer建立长链接。从NameServere获取Broker信息,与Broker建立长链接,向Broker发送消息
  • Consumer启动,与NameServer建立长链接。从NameServere获取Broker信息,与Broker建立长链接,从Broker获取消息

3、文本消息+自定义信道

server:port: 8090spring:cloud:stream:rocketmq:binder:name-server: 127.0.0.1:9876group: rmq-gropbindings:textoutput:destination: text-topiccontentType: text/plaingroup: text-grouptextinput:destination: text-topiccontentType: text/plaingroup: text-group
# 日志级别
logging:level:com.alibaba.cloud.stream.binder.rocketmq: info
public interface SelfSink {public static String TEXT_INPUT="textinput";@Input(TEXT_INPUT)SubscribableChannel textInput();
}@Configuration
@EnableBinding(SelfSink.class)
public class RmqConsumerConfig {}@Service
public class ReceiveService {@StreamListener(value = SelfSink.TEXT_INPUT)public void textInput(String message) {System.out.println("receive content:" + message);}
}@Configuration
@EnableBinding(SelfSource.class)
public class RmqProducerConfig {}public interface SelfSource {public static String TEXT_OUTPUT="textoutput";@Output(TEXT_OUTPUT)MessageChannel textOutput();
}@Service
public class SendService {@Autowiredprivate SelfSource source;public void sendText(String msg){Message<String> message = (Message<String>) MessageBuilder.withPayload(msg).build();source.textOutput().send(message);}
}

4、多主题+文本消息+自定义信道

server:port: 8090spring:cloud:stream:rocketmq:binder:name-server: 127.0.0.1:9876group: rmq-gropbindings:textoutput:destination: text-topiccontentType: text/plaingroup: text-grouptextoutput2:destination: text-topic2contentType: text/plaingroup: text-group2textinput:destination: text-topiccontentType: text/plaingroup: text-grouptextinput2:destination: text-topic2contentType: text/plaingroup: text-group2
# 日志级别
logging:level:com.alibaba.cloud.stream.binder.rocketmq: info
public interface SelfSource {public static String TEXT_OUTPUT="textoutput";public static String TEXT_OUTPUT2="textoutput2";@Output(TEXT_OUTPUT)MessageChannel textOutput();@Output(TEXT_OUTPUT2)MessageChannel textOutput2();
}@Service
public class SendService {@Autowiredprivate SelfSource source;public void sendText(String msg){Message<String> message = (Message<String>) MessageBuilder.withPayload(msg).build();source.textOutput().send(message);}public void sendText2(String msg){Message<String> message = (Message<String>) MessageBuilder.withPayload(msg).build();source.textOutput2().send(message);}
}public interface SelfSink {public static String TEXT_INPUT="textinput";public static String TEXT_INPUT2="textinput2";@Input(TEXT_INPUT)SubscribableChannel textInput();@Input(TEXT_INPUT2)SubscribableChannel textInput2();
}@Service
public class ReceiveService {@StreamListener(value = SelfSink.TEXT_INPUT)public void textInput(String message) {System.out.println("receive group1 content:" + message);}@StreamListener(value = SelfSink.TEXT_INPUT2)public void textInput2(String message) {System.out.println("receive group2 content:" + message);}
}

5、标签过滤+获取头信息

server:port: 8090spring:cloud:stream:rocketmq:binder:name-server: 127.0.0.1:9876group: rmq-groupbindings:textinput3:consumer:subscription: tagfilterbindings:textoutput2:destination: text-topic2contentType: text/plaingroup: text-group2textinput3:destination: text-topic2contentType: text/plaingroup: text-group2
# 日志级别
logging:level:com.alibaba.cloud.stream.binder.rocketmq: info
public interface SelfSource {public static String TEXT_OUTPUT2="textoutput2";@Output(TEXT_OUTPUT2)MessageChannel textOutput2();
}@Service
public class SendService {@Autowiredprivate SelfSource source;public void sendText(String msg) {Message<String> message = (Message<String>) MessageBuilder.withPayload(msg).build();source.textOutput().send(message);}public void sendText2(String msg) {Message<String> message = (Message<String>) MessageBuilder.withPayload(msg).build();source.textOutput2().send(message);}public void sendWithTag(String msg, String tag) {Message<String> message = MessageBuilder.withPayload(msg).setHeader(MessageConst.PROPERTY_TAGS, tag).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build();source.textOutput2().send(message);}
}public interface SelfSink {public static String TEXT_INPUT="textinput";public static String TEXT_INPUT2="textinput2";public static String TEXT_INPUT3="textinput3";@Input(TEXT_INPUT3)SubscribableChannel textInput3();
}@Service
public class ReceiveService {@StreamListener(value = SelfSink.TEXT_INPUT3,condition = "headers['ROCKET_TAGS'] == 'tagfilter'")public void textInput3WithTag(String message, @Headers Map headers, @Header(name = "ROCKET_TAGS")String name) {System.out.println("receive group2 tagfilter content:" + message +", headers="+headers +", name="+name);}
}
send tagfilter group2 index:0
send tagfilter group2 index:1
send tagfilter group2 index:2
send tagfilter group2 index:3
receive group2 tagfilter content:group2-index:0, headers={ROCKET_TAGS=tagfilter, ROCKET_MQ_BORN_TIMESTAMP=1698235938769, ROCKET_MQ_FLAG=0, ROCKET_MQ_MESSAGE_ID=C6120001C9F42C13DA157FEE87CD0000, ROCKET_MQ_TOPIC=text-topic2, ROCKET_MQ_BORN_HOST=172.17.0.1, id=c13ac4ee-f498-a558-c6b7-30ed7563220f, ROCKET_MQ_SYS_FLAG=0, contentType=application/json, ROCKET_MQ_QUEUE_ID=3, timestamp=1698235938796}, name=tagfilter
receive group2 tagfilter content:group2-index:1, headers={ROCKET_TAGS=tagfilter, ROCKET_MQ_BORN_TIMESTAMP=1698235938793, ROCKET_MQ_FLAG=0, ROCKET_MQ_MESSAGE_ID=C6120001C9F42C13DA157FEE87E90002, ROCKET_MQ_TOPIC=text-topic2, ROCKET_MQ_BORN_HOST=172.17.0.1, id=df97a997-8437-c3de-7802-80313be691ed, ROCKET_MQ_SYS_FLAG=0, contentType=application/json, ROCKET_MQ_QUEUE_ID=0, timestamp=1698235938818}, name=tagfilter
send tagfilter group2 index:4
receive group2 tagfilter content:group2-index:4, headers={ROCKET_TAGS=tagfilter, ROCKET_MQ_BORN_TIMESTAMP=1698235938816, ROCKET_MQ_FLAG=0, ROCKET_MQ_MESSAGE_ID=C6120001C9F42C13DA157FEE88000009, ROCKET_MQ_TOPIC=text-topic2, ROCKET_MQ_BORN_HOST=172.17.0.1, id=020522c8-6c29-b4db-c3ca-9987bd108625, ROCKET_MQ_SYS_FLAG=0, contentType=application/json, ROCKET_MQ_QUEUE_ID=3, timestamp=1698235938825}, name=tagfilter
receive group2 tagfilter content:group2-index:3, headers={ROCKET_TAGS=tagfilter, ROCKET_MQ_BORN_TIMESTAMP=1698235938808, ROCKET_MQ_FLAG=0, ROCKET_MQ_MESSAGE_ID=C6120001C9F42C13DA157FEE87F80007, ROCKET_MQ_TOPIC=text-topic2, ROCKET_MQ_BORN_HOST=172.17.0.1, id=aa7d5d49-4178-f621-38ed-70b77bbfffc3, ROCKET_MQ_SYS_FLAG=0, contentType=application/json, ROCKET_MQ_QUEUE_ID=2, timestamp=1698235939676}, name=tagfilter
receive group2 tagfilter content:group2-index:2, headers={ROCKET_TAGS=tagfilter, ROCKET_MQ_BORN_TIMESTAMP=1698235938801, ROCKET_MQ_FLAG=0, ROCKET_MQ_MESSAGE_ID=C6120001C9F42C13DA157FEE87F10005, ROCKET_MQ_TOPIC=text-topic2, ROCKET_MQ_BORN_HOST=172.17.0.1, id=76d60399-c127-adab-e262-59d01f50baab, ROCKET_MQ_SYS_FLAG=0, contentType=application/json, ROCKET_MQ_QUEUE_ID=1, timestamp=1698235939677}, name=tagfilter

6、定向的异常处理与全局异常处理

  @StreamListener(value = SelfSink.TEXT_INPUT)public void textInput(String message) {System.out.println("receive group1 content:" + message);throw new IllegalStateException("异常信息1");}@StreamListener(value = SelfSink.TEXT_INPUT3,condition = "headers['ROCKET_TAGS'] == 'tagfilter'")public void textInput3WithTag(String message, @Headers Map headers, @Header(name = "ROCKET_TAGS")String name) {System.out.println("receive group2 tagfilter content:" + message +", headers="+headers +", name="+name);throw new IllegalArgumentException("异常信息3");}@ServiceActivator(inputChannel = "text-topic.text-group.errors")public void handleError(ErrorMessage errorMessage){Throwable throwable = errorMessage.getPayload();System.out.println("定向异常:"+throwable);Message<?> originalMessage = errorMessage.getOriginalMessage();System.out.println(Objects.nonNull(originalMessage)?originalMessage:"空");}@StreamListener("errorChannel")public void error(Message<?> message){ErrorMessage errorMessage = (ErrorMessage)message;System.out.println("其他异常:"+errorMessage);}
send group1 index:0
receive group1 content:group1-index:0
send tagfilter group2 index:0
receive group2 tagfilter content:group2-index:0, headers={ROCKET_TAGS=tagfilter, ROCKET_MQ_BORN_TIMESTAMP=1698236994357, ROCKET_MQ_FLAG=0, ROCKET_MQ_MESSAGE_ID=C6120001D1122C13DA157FFEA3350003, ROCKET_MQ_TOPIC=text-topic2, ROCKET_MQ_BORN_HOST=172.17.0.1, id=ff93d975-c880-fbf0-0168-6b6e0fab9653, ROCKET_MQ_SYS_FLAG=0, contentType=application/json, ROCKET_MQ_QUEUE_ID=0, timestamp=1698236995233}, name=tagfilter
receive group1 content:group1-index:0
receive group2 tagfilter content:group2-index:0, headers={ROCKET_TAGS=tagfilter, ROCKET_MQ_BORN_TIMESTAMP=1698236994357, ROCKET_MQ_FLAG=0, ROCKET_MQ_MESSAGE_ID=C6120001D1122C13DA157FFEA3350003, ROCKET_MQ_TOPIC=text-topic2, ROCKET_MQ_BORN_HOST=172.17.0.1, id=ff93d975-c880-fbf0-0168-6b6e0fab9653, ROCKET_MQ_SYS_FLAG=0, contentType=application/json, ROCKET_MQ_QUEUE_ID=0, timestamp=1698236995233}, name=tagfilter
receive group1 content:group1-index:0
INFO 53522 --- [ad_text-group_1] o.s.i.h.s.MessagingMethodInvokerHelper   : Overriding default instance of MessageHandlerMethodFactory with provided one.
定向异常:org.springframework.messaging.MessagingException: Exception thrown while invoking ReceiveService#textInput[1 args]; nested exception is java.lang.IllegalStateException: 异常信息1, failedMessage=GenericMessage [payload=byte[14], headers={ROCKET_MQ_BORN_TIMESTAMP=1698236994326, ROCKET_MQ_FLAG=0, ROCKET_MQ_MESSAGE_ID=C6120001D1122C13DA157FFEA3130000, ROCKET_MQ_TOPIC=text-topic, ROCKET_MQ_BORN_HOST=172.17.0.1, id=5672d0c2-04ae-6012-ae06-5cd84c2781fb, ROCKET_MQ_SYS_FLAG=0, contentType=text/plain, ROCKET_MQ_QUEUE_ID=3, timestamp=1698236994353}]
空
receive group2 tagfilter content:group2-index:0, headers={ROCKET_TAGS=tagfilter, ROCKET_MQ_BORN_TIMESTAMP=1698236994357, ROCKET_MQ_FLAG=0, ROCKET_MQ_MESSAGE_ID=C6120001D1122C13DA157FFEA3350003, ROCKET_MQ_TOPIC=text-topic2, ROCKET_MQ_BORN_HOST=172.17.0.1, id=ff93d975-c880-fbf0-0168-6b6e0fab9653, ROCKET_MQ_SYS_FLAG=0, contentType=application/json, ROCKET_MQ_QUEUE_ID=0, timestamp=1698236995233}, name=tagfilter
其他异常:ErrorMessage [payload=org.springframework.messaging.MessagingException: Exception thrown while invoking ReceiveService#textInput3WithTag[3 args]; nested exception is java.lang.IllegalArgumentException: 异常信息3, failedMessage=GenericMessage [payload=byte[14], headers={ROCKET_TAGS=tagfilter, ROCKET_MQ_BORN_TIMESTAMP=1698236994357, ROCKET_MQ_FLAG=0, ROCKET_MQ_MESSAGE_ID=C6120001D1122C13DA157FFEA3350003, ROCKET_MQ_TOPIC=text-topic2, ROCKET_MQ_BORN_HOST=172.17.0.1, id=ff93d975-c880-fbf0-0168-6b6e0fab9653, ROCKET_MQ_SYS_FLAG=0, contentType=application/json, ROCKET_MQ_QUEUE_ID=0, timestamp=1698236995233}], headers={id=266dcc54-fb75-0a90-e456-917b8743ff5b, timestamp=1698236998247}]
ERROR 53522 --- [d_text-group2_1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Exception thrown while invoking ReceiveService#textInput3WithTag[3 args]; nested exception is java.lang.IllegalArgumentException: 异常信息3, failedMessage=GenericMessage [payload=byte[14], headers={ROCKET_TAGS=tagfilter, ROCKET_MQ_BORN_TIMESTAMP=1698236994357, ROCKET_MQ_FLAG=0, ROCKET_MQ_MESSAGE_ID=C6120001D1122C13DA157FFEA3350003, ROCKET_MQ_TOPIC=text-topic2, ROCKET_MQ_BORN_HOST=172.17.0.1, id=ff93d975-c880-fbf0-0168-6b6e0fab9653, ROCKET_MQ_SYS_FLAG=0, contentType=application/json, ROCKET_MQ_QUEUE_ID=0, timestamp=1698236995233}]at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:64)at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:134)at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:69)at org.springframework.cloud.stream.binding.DispatchingStreamListenerMessageHandler.handleRequestMessage(DispatchingStreamListenerMessageHandler.java:96)

获取异常消息体内容:尝试关闭springcloudstream自带的重试机制,能够实现。
上面步骤没有特殊配置,默认遇到异常会进行重新投递,导致System.out.println(Objects.nonNull(originalMessage)?originalMessage:"空");始终为空,无法获取原始信息。
调整如下配置后:

server:port: 8090spring:cloud:stream:rocketmq:binder:name-server: 127.0.0.1:9876group: rmq-groupbindings:textinput3:consumer:subscription: tagfilterbindings:textoutput:destination: text-topiccontentType: text/plaingroup: text-grouptextoutput2:destination: text-topic2contentType: text/plaingroup: text-group2textinput:destination: text-topiccontentType: text/plaingroup: text-groupconsumer:maxAttempts: 1 #--> 默认是3,1表示不重试textinput2:destination: text-topic2contentType: text/plaingroup: text-group2textinput3:destination: text-topic2contentType: text/plaingroup: text-group2
# 日志级别
logging:level:com.alibaba.cloud.stream.binder.rocketmq: info

可以获取原始异常

 @ServiceActivator(inputChannel = "text-topic.text-group.errors")public void handleError(ErrorMessage errorMessage){Throwable throwable = errorMessage.getPayload();System.out.println("定向异常:"+throwable);Message<?> originalMessage = errorMessage.getOriginalMessage();
//        System.out.println(Objects.nonNull(originalMessage)?"处理定向异常原始信息: "+ originalMessage :"无");assert originalMessage != null;System.out.println("处理定向异常原始信息:"+new String((byte[])originalMessage.getPayload()));}
定向异常:org.springframework.messaging.MessagingException: Exception thrown while invoking ReceiveService#textInput[1 args]; nested exception is java.lang.IllegalStateException: 异常信息1, failedMessage=GenericMessage [payload=byte[14], headers={ROCKET_MQ_BORN_TIMESTAMP=1698237547339, ROCKET_MQ_FLAG=0, ROCKET_MQ_MESSAGE_ID=C6120001D46F2C13DA15800713490000, ROCKET_MQ_TOPIC=text-topic, ROCKET_MQ_BORN_HOST=172.17.0.1, id=1d05982a-38f7-5208-076d-7b84a2555bf1, ROCKET_MQ_SYS_FLAG=0, contentType=text/plain, ROCKET_MQ_QUEUE_ID=0, timestamp=1698237547362}]
处理定向异常原始信息:group1-index:0

7、顺序消息

全局顺序消息

server:port: 8090spring:cloud:stream:rocketmq:binder:name-server: 127.0.0.1:9876group: rmq-groupbindings:pojooutput:producer:sendType: Syncgroup: pojo-grouppojoinput:consumer:orderly: true #这个参数的写法和你当前使用的版本息息相关,具体见官方文档bindings:pojooutput:destination: pojo-topiccontentType: application/jsongroup: pojo-grouppojoinput:destination: pojo-topiccontentType: application/jsongroup: pojo-group
# 日志级别
logging:level:com.alibaba.cloud.stream.binder.rocketmq: info
public class MessageDto {private static final long serialVersionUID =1L;private String index;private String title;private String content;public String getTitle() {return title;}public void setTitle(String title) {this.title = title;}public String getContent() {return content;}public void setContent(String content) {this.content = content;}public String getIndex() {return index;}public void setIndex(String index) {this.index = index;}@Overridepublic String toString() {return "MessageDto{" +"index='" + index + '\'' +", title='" + title + '\'' +", content='" + content + '\'' +'}';}
}@Component
public class ProducerRunner implements CommandLineRunner {@Autowiredprivate SendService sendService;@Overridepublic void run(String... args) throws Exception {for (int i = 0; i < 5; i++) {MessageDto messageDto = new MessageDto();messageDto.setIndex("num:" + i);messageDto.setTitle("title");messageDto.setContent("content");sendService.sendPojoOrderly(messageDto);}}
}public interface SelfSource {public static String POJO_OUTPUT="pojooutput";@Output(POJO_OUTPUT)MessageChannel pojoOutput();
}@Service
public class SendService {@Autowiredprivate SelfSource source;public void sendPojoOrderly(MessageDto messageDto){Message<MessageDto> message = MessageBuilder.withPayload(messageDto).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build();System.out.println(message);source.pojoOutput().send(message);}
}public interface SelfSink {public static String POJO_INPUT="pojoinput";@Input(POJO_INPUT)SubscribableChannel pojoInput();
}@Service
public class ReceiveService {@StreamListener(value = SelfSink.POJO_INPUT)public void pojoInout(@Payload MessageDto messageDto){System.out.println("[onMessage][线程编号:{} 消息内容:{}]" + Thread.currentThread().getId()+", "+messageDto.toString());} 
}    

测试效果:正常发送,并且按照顺序消费(不加orderly就是乱序)

GenericMessage [payload=MessageDto{index='num:0', title='title', content='content'}, headers={contentType=application/json, id=00641aae-71d2-3a01-35f4-b3ec42886e9c, timestamp=1698745880467}]
GenericMessage [payload=MessageDto{index='num:1', title='title', content='content'}, headers={contentType=application/json, id=4d4a8cc2-d861-cf76-f0f5-33fdc06b2942, timestamp=1698745880521}]
GenericMessage [payload=MessageDto{index='num:2', title='title', content='content'}, headers={contentType=application/json, id=009ebc60-de06-adda-c44e-82faaae155c7, timestamp=1698745880525}]
GenericMessage [payload=MessageDto{index='num:3', title='title', content='content'}, headers={contentType=application/json, id=cad3af6c-df76-bc54-49c5-61cd27ef5b08, timestamp=1698745880528}]
GenericMessage [payload=MessageDto{index='num:4', title='title', content='content'}, headers={contentType=application/json, id=9beea414-7a78-a2e6-58f8-27729a2c7300, timestamp=1698745880531}]
[onMessage][线程编号:{} 消息内容:{}]119, MessageDto{index='num:2', title='title', content='content'}
[onMessage][线程编号:{} 消息内容:{}]119, MessageDto{index='num:0', title='title', content='content'}
[onMessage][线程编号:{} 消息内容:{}]119, MessageDto{index='num:4', title='title', content='content'}
[onMessage][线程编号:{} 消息内容:{}]119, MessageDto{index='num:1', title='title', content='content'}
[onMessage][线程编号:{} 消息内容:{}]119, MessageDto{index='num:3', title='title', content='content'}

配置顺序消费后,rocketmq内部同一个topic有多个queue,默认4个,每个queue中的数据是有序的,从日志打印的表面上看还是乱的实则已经队列内有序
但是如果想看到依据某一个标准,看到绝对的有序,可以通过分区有序来观测

局部顺序消息

计划分配两个分区,依据index【0,1】字段来分流到不同的队列,从而看出队列内的有序情况

server:port: 8090spring:cloud:stream:bindings:pojooutput: destination: pojo-topic content-type: application/json group: pojo-group producer:partitionCount: 2 #消息生产需要广播的消费者数量。即消息分区的数量partitionKeyExpression: payload.index #分区 key 表达式。该表达式基于 Spring EL,从消息中获得分区 keypojoinput:destination: pojo-topiccontent-type: application/json group: pojo-group consumer:partitioned: true rocketmq:binder:name-server: 127.0.0.1:9876 group: rmq-groupbindings:pojooutput:producer:sendMsgTimeout: 3000 sendType: Sync pojoinput:consumer:enabled: true subscription: myTag||look messageModel: CLUSTERING push:orderly: trueinstance-count: 2 instance-index: 0 
# 日志级别
logging:level:com.alibaba.cloud.stream.binder.rocketmq: info

最终能看到数据被index分区,每一个区里面消费是顺序的

GenericMessage [payload=MessageDto{index='0', title='title0', content='content'}, headers={id=e06aef14-e094-ab5e-32a6-c8ba574eb7cf, contentType=application/json, TAGS=myTag, timestamp=1698810537731}]
GenericMessage [payload=MessageDto{index='0', title='title1', content='content'}, headers={id=bdb2d72e-4502-380a-6324-e49af58e6082, contentType=application/json, TAGS=myTag, timestamp=1698810537790}]
GenericMessage [payload=MessageDto{index='0', title='title2', content='content'}, headers={id=5b2ea555-a3f5-648f-99e5-ac4183fd939c, contentType=application/json, TAGS=myTag, timestamp=1698810537794}]
GenericMessage [payload=MessageDto{index='0', title='title3', content='content'}, headers={id=ba9ff31d-68ef-bcdb-3e86-f92cb88804c2, contentType=application/json, TAGS=myTag, timestamp=1698810537796}]
GenericMessage [payload=MessageDto{index='1', title='title4', content='content'}, headers={id=1c4b47c3-3f65-cf65-fa8e-143d873602f3, contentType=application/json, TAGS=myTag, timestamp=1698810537800}]
GenericMessage [payload=MessageDto{index='1', title='title5', content='content'}, headers={id=030732b8-1792-5899-b704-90462774ee76, contentType=application/json, TAGS=myTag, timestamp=1698810537803}]
GenericMessage [payload=MessageDto{index='1', title='title6', content='content'}, headers={id=2e4cdcc9-1b46-fddc-9df8-1c000d66648c, contentType=application/json, TAGS=myTag, timestamp=1698810537808}]
GenericMessage [payload=MessageDto{index='1', title='title7', content='content'}, headers={id=9f45f277-bab6-9abe-1618-6755ec3715b4, contentType=application/json, TAGS=myTag, timestamp=1698810537811}]
[onMessage][线程编号:{} 消息内容:{}]120, MessageDto{index='1', title='title4', content='content'}
[onMessage][线程编号:{} 消息内容:{}]120, MessageDto{index='1', title='title5', content='content'}
[onMessage][线程编号:{} 消息内容:{}]120, MessageDto{index='1', title='title6', content='content'}
[onMessage][线程编号:{} 消息内容:{}]120, MessageDto{index='1', title='title7', content='content'}
[onMessage][线程编号:{} 消息内容:{}]120, MessageDto{index='0', title='title0', content='content'}
[onMessage][线程编号:{} 消息内容:{}]120, MessageDto{index='0', title='title1', content='content'}
[onMessage][线程编号:{} 消息内容:{}]120, MessageDto{index='0', title='title2', content='content'}
[onMessage][线程编号:{} 消息内容:{}]120, MessageDto{index='0', title='title3', content='content'}

8、事务消息

自定义listener,来处理事务的执行、事务的提交或回滚

server:port: 8090spring:cloud:stream:bindings:transoutput:destination: trans-topiccontent-type: application/jsongroup: trans-grouptransinput:destination: trans-topiccontent-type: application/jsongroup: trans-grouprocketmq:binder:name-server: 127.0.0.1:9876 group: rmq-group bindings:transoutput:producer:producerType: Trans transactionListener: rocketMQTransactionListener group: trans-grouptransinput:consumer:group: trans-group
# 日志级别
logging:level:com.alibaba.cloud.stream.binder.rocketmq: info
public class TransData {private String param1;private String param2;public String getParam1() {return param1;}public void setParam1(String param1) {this.param1 = param1;}public String getParam2() {return param2;}public void setParam2(String param2) {this.param2 = param2;}@Overridepublic String toString() {return "TransData{" +"param1='" + param1 + '\'' +", param2='" + param2 + '\'' +'}';}
}public interface SelfSource {public static String TRANS_OUTPUT="transoutput";@Output(TRANS_OUTPUT)MessageChannel transOutput();
}@Service
public class SendService {@Autowiredprivate SelfSource source;public void sendTrans(String transMessage) {TransData data = new TransData();data.setParam1("data1");data.setParam2("data2");Message<String> message = MessageBuilder.withPayload(transMessage).setHeader("args", JSON.toJSONString(data)).build();source.transOutput().send(message);}
}public interface SelfSink {public static String TRANS_INPUT="transinput";@Input(TRANS_INPUT)SubscribableChannel transInput();
}@Component
public class RocketMQTransactionListener implements TransactionListener {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {System.out.println("=========本地事务开始执行=============");String message = new String(msg.getBody());System.out.println("原始消息体:{}"+message);String tid = msg.getTransactionId();System.out.println("事务消息id:{}"+ tid);//模拟执行本地事务begin=======System.out.println("本地事务执行参数,start......----------------------");TransData args = JSON.parseObject(msg.getProperty("args"), TransData.class);//rollback, commit or unknownSystem.out.println("[executeLocalTransaction][执行本地事务,消息:{} args:{}]"+ msg+"--"+ args.toString());System.out.println("本地事务执行参数,end......------------------------");//模拟执行本地事务end========//TODO 根据本地事务执行结果返回//LocalTransactionState.COMMIT_MESSAGE 二次确认消息,然后消费者可以消费//LocalTransactionState.ROLLBACK_MESSAGE 回滚消息,Broker端会删除半消息//LocalTransactionState.UNKNOW Broker端会进行回查消息return LocalTransactionState.UNKNOW;}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {System.out.println("==========回查接口=========");//rollback, commit or unknownSystem.out.println("[checkLocalTransaction][回查消息:{}]"+ msg);String tid = msg.getTransactionId();System.out.println("[checkLocalTransaction][事务消息id:{}]"+ tid);return LocalTransactionState.COMMIT_MESSAGE;}
}public interface SelfSink {public static String TRANS_INPUT="transinput";@Input(TRANS_INPUT)SubscribableChannel transInput();
}@Service
public class ReceiveService {@StreamListener(value = SelfSink.TRANS_INPUT)public void transInput(String message,@Header(name = "args")String args){System.out.println("receive trans content:" + message +", args="+args);}
}

测试情况:

=========本地事务开始执行=============
原始消息体:{}transdata
事务消息id:{}C6120001B9C9077556FD02F051300000
本地事务执行参数,start......----------------------
[executeLocalTransaction][执行本地事务,消息:{} args:{}]Message{topic='trans-topic', flag=0, properties={args={"param1":"data1","param2":"data2"}, TRAN_MSG=true, id=5283b92f-8e67-c398-a63b-fd90ee7b577e, UNIQ_KEY=C6120001B9C9077556FD02F051300000, WAIT=true, contentType=application/json, PGROUP=trans-group, timestamp=1698817303846}, body=[116, 114, 97, 110, 115, 100, 97, 116, 97], transactionId='C6120001B9C9077556FD02F051300000'}--TransData{param1='data1', param2='data2'}
本地事务执行参数,end......------------------------
=========本地事务开始执行=============
原始消息体:{}transdata
事务消息id:{}C6120001B9C9077556FD02F051440002
本地事务执行参数,start......----------------------
[executeLocalTransaction][执行本地事务,消息:{} args:{}]Message{topic='trans-topic', flag=0, properties={args={"param1":"data1","param2":"data2"}, TRAN_MSG=true, id=0cebbc2e-f6af-b6ab-a013-bc19d3d1b670, UNIQ_KEY=C6120001B9C9077556FD02F051440002, WAIT=true, contentType=application/json, PGROUP=trans-group, timestamp=1698817303875}, body=[116, 114, 97, 110, 115, 100, 97, 116, 97], transactionId='C6120001B9C9077556FD02F051440002'}--TransData{param1='data1', param2='data2'}
本地事务执行参数,end......------------------------
=========本地事务开始执行=============
原始消息体:{}transdata
事务消息id:{}C6120001B9C9077556FD02F051470004
本地事务执行参数,start......----------------------
[executeLocalTransaction][执行本地事务,消息:{} args:{}]Message{topic='trans-topic', flag=0, properties={args={"param1":"data1","param2":"data2"}, TRAN_MSG=true, id=1ba5b5ae-c041-558f-b97e-5eff2093f477, UNIQ_KEY=C6120001B9C9077556FD02F051470004, WAIT=true, contentType=application/json, PGROUP=trans-group, timestamp=1698817303879}, body=[116, 114, 97, 110, 115, 100, 97, 116, 97], transactionId='C6120001B9C9077556FD02F051470004'}--TransData{param1='data1', param2='data2'}
本地事务执行参数,end......------------------------
=========本地事务开始执行=============
原始消息体:{}transdata
事务消息id:{}C6120001B9C9077556FD02F0514D0006
本地事务执行参数,start......----------------------
[executeLocalTransaction][执行本地事务,消息:{} args:{}]Message{topic='trans-topic', flag=0, properties={args={"param1":"data1","param2":"data2"}, TRAN_MSG=true, id=f03f772b-0a49-d85c-4a53-9a2dbaf91c17, UNIQ_KEY=C6120001B9C9077556FD02F0514D0006, WAIT=true, contentType=application/json, PGROUP=trans-group, timestamp=1698817303885}, body=[116, 114, 97, 110, 115, 100, 97, 116, 97], transactionId='C6120001B9C9077556FD02F0514D0006'}--TransData{param1='data1', param2='data2'}
本地事务执行参数,end......------------------------
=========本地事务开始执行=============
原始消息体:{}transdata
事务消息id:{}C6120001B9C9077556FD02F051520008
本地事务执行参数,start......----------------------
[executeLocalTransaction][执行本地事务,消息:{} args:{}]Message{topic='trans-topic', flag=0, properties={args={"param1":"data1","param2":"data2"}, TRAN_MSG=true, id=3bc809a8-2f33-40cb-edaf-9b69c68282c8, UNIQ_KEY=C6120001B9C9077556FD02F051520008, WAIT=true, contentType=application/json, PGROUP=trans-group, timestamp=1698817303890}, body=[116, 114, 97, 110, 115, 100, 97, 116, 97], transactionId='C6120001B9C9077556FD02F051520008'}--TransData{param1='data1', param2='data2'}
本地事务执行参数,end......------------------------
==========回查接口=========
[checkLocalTransaction][回查消息:{}]MessageExt [brokerName=null, queueId=1, storeSize=416, queueOffset=18, sysFlag=0, bornTimestamp=1698817303885, bornHost=/172.17.0.1:60840, storeTimestamp=1698817303907, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000012DDF4, commitLogOffset=1236468, bodyCRC=147427201, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='trans-topic', flag=0, properties={args={"param1":"data1","param2":"data2"}, REAL_TOPIC=trans-topic, TRANSACTION_CHECK_TIMES=1, TRAN_MSG=true, id=f03f772b-0a49-d85c-4a53-9a2dbaf91c17, UNIQ_KEY=C6120001B9C9077556FD02F0514D0006, CLUSTER=DefaultRmqCluster, contentType=application/json, PGROUP=trans-group, WAIT=false, timestamp=1698817303885, REAL_QID=1}, body=[116, 114, 97, 110, 115, 100, 97, 116, 97], transactionId='C6120001B9C9077556FD02F0514D0006'}]
[checkLocalTransaction][事务消息id:{}]C6120001B9C9077556FD02F0514D0006
==========回查接口=========
[checkLocalTransaction][回查消息:{}]MessageExt [brokerName=null, queueId=3, storeSize=416, queueOffset=16, sysFlag=0, bornTimestamp=1698817303876, bornHost=/172.17.0.1:60840, storeTimestamp=1698817303897, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000012DA9C, commitLogOffset=1235612, bodyCRC=147427201, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='trans-topic', flag=0, properties={args={"param1":"data1","param2":"data2"}, REAL_TOPIC=trans-topic, TRANSACTION_CHECK_TIMES=1, TRAN_MSG=true, id=0cebbc2e-f6af-b6ab-a013-bc19d3d1b670, UNIQ_KEY=C6120001B9C9077556FD02F051440002, CLUSTER=DefaultRmqCluster, contentType=application/json, PGROUP=trans-group, WAIT=false, timestamp=1698817303875, REAL_QID=3}, body=[116, 114, 97, 110, 115, 100, 97, 116, 97], transactionId='C6120001B9C9077556FD02F051440002'}]
[checkLocalTransaction][事务消息id:{}]C6120001B9C9077556FD02F051440002
==========回查接口=========
[checkLocalTransaction][回查消息:{}]MessageExt [brokerName=null, queueId=0, storeSize=416, queueOffset=17, sysFlag=0, bornTimestamp=1698817303879, bornHost=/172.17.0.1:60840, storeTimestamp=1698817303902, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000012DC48, commitLogOffset=1236040, bodyCRC=147427201, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='trans-topic', flag=0, properties={args={"param1":"data1","param2":"data2"}, REAL_TOPIC=trans-topic, TRANSACTION_CHECK_TIMES=1, TRAN_MSG=true, id=1ba5b5ae-c041-558f-b97e-5eff2093f477, UNIQ_KEY=C6120001B9C9077556FD02F051470004, CLUSTER=DefaultRmqCluster, contentType=application/json, PGROUP=trans-group, WAIT=false, timestamp=1698817303879, REAL_QID=0}, body=[116, 114, 97, 110, 115, 100, 97, 116, 97], transactionId='C6120001B9C9077556FD02F051470004'}]
[checkLocalTransaction][事务消息id:{}]C6120001B9C9077556FD02F051470004
==========回查接口=========
[checkLocalTransaction][回查消息:{}]MessageExt [brokerName=null, queueId=2, storeSize=416, queueOffset=15, sysFlag=0, bornTimestamp=1698817303860, bornHost=/172.17.0.1:60840, storeTimestamp=1698817303887, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000012D8F0, commitLogOffset=1235184, bodyCRC=147427201, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='trans-topic', flag=0, properties={args={"param1":"data1","param2":"data2"}, REAL_TOPIC=trans-topic, TRANSACTION_CHECK_TIMES=1, TRAN_MSG=true, id=5283b92f-8e67-c398-a63b-fd90ee7b577e, UNIQ_KEY=C6120001B9C9077556FD02F051300000, CLUSTER=DefaultRmqCluster, contentType=application/json, PGROUP=trans-group, WAIT=false, timestamp=1698817303846, REAL_QID=2}, body=[116, 114, 97, 110, 115, 100, 97, 116, 97], transactionId='C6120001B9C9077556FD02F051300000'}]
[checkLocalTransaction][事务消息id:{}]C6120001B9C9077556FD02F051300000
==========回查接口=========
[checkLocalTransaction][回查消息:{}]MessageExt [brokerName=null, queueId=2, storeSize=416, queueOffset=19, sysFlag=0, bornTimestamp=1698817303890, bornHost=/172.17.0.1:60840, storeTimestamp=1698817303912, storeHost=/127.0.0.1:10911, msgId=7F00000100002A9F000000000012DFA0, commitLogOffset=1236896, bodyCRC=147427201, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='trans-topic', flag=0, properties={args={"param1":"data1","param2":"data2"}, REAL_TOPIC=trans-topic, TRANSACTION_CHECK_TIMES=1, TRAN_MSG=true, id=3bc809a8-2f33-40cb-edaf-9b69c68282c8, UNIQ_KEY=C6120001B9C9077556FD02F051520008, CLUSTER=DefaultRmqCluster, contentType=application/json, PGROUP=trans-group, WAIT=false, timestamp=1698817303890, REAL_QID=2}, body=[116, 114, 97, 110, 115, 100, 97, 116, 97], transactionId='C6120001B9C9077556FD02F051520008'}]
[checkLocalTransaction][事务消息id:{}]C6120001B9C9077556FD02F051520008
receive trans content:transdata, args={"param1":"data1","param2":"data2"}
receive trans content:transdata, args={"param1":"data1","param2":"data2"}
receive trans content:transdata, args={"param1":"data1","param2":"data2"}
receive trans content:transdata, args={"param1":"data1","param2":"data2"}
receive trans content:transdata, args={"param1":"data1","param2":"data2"}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/130123.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Xilinx DDR3 MIG系列——项目开发内容介绍及目录

本节目录 一、Xilinx DDR3 MIG项目背景 二、Xilinx DDR3 MIG项目学习路线 三、Xilinx DDR3 MIG项目目录订阅本专栏内容的者,可获取任一一份工程源码。具体获取方式,系列文章更新完成后,关注微信公众号“小灰灰的FPGA”,将自己的订阅信息以及所需要的项目,截图并私信给作者…

计算机组成与结构-计算机体系结构

计算机体系结构 指令系统 Flynn分类法 SISD&#xff08;单指令流单数据流&#xff09; 结构 控制部分&#xff1a;一个处理器&#xff1a;一个主存模块&#xff1a;一个 代表 单处理器系统 SIMD&#xff08;单指令流多数据流&#xff09; 结构 控制部分&#xff1a;一个处理…

PyTorch入门学习(十四):优化器

目录 一、优化器的重要性 二、PyTorch 中的深度学习 三、优化器的选择 一、优化器的重要性 深度学习模型通常包含大量的参数&#xff0c;因此训练过程涉及到优化这些参数以减小损失函数的值。这个过程类似于找到函数的最小值&#xff0c;但由于模型通常非常复杂&#xff0c…

JSONP 跨域访问(1), 简介, 原理, 实验, 缺点

JSONP 跨域访问(1), 简介, 原理, 实验, 缺点 一, JSONP 简介 JSONP&#xff08;JSON with Padding&#xff09;是一种非官方跨域数据交互协议。它允许web页面从不同的域名下加载数据。 由于同源策略&#xff0c;web页面通过XMLHttpRequest调用通常只允许访问与其自身相同域名…

C++ 算法:区间和的个数

涉及知识点 归并排序 题目 给你一个整数数组 nums 以及两个整数 lower 和 upper 。求数组中&#xff0c;值位于范围 [lower, upper] &#xff08;包含 lower 和 upper&#xff09;之内的 区间和的个数 。 区间和 S(i, j) 表示在 nums 中&#xff0c;位置从 i 到 j 的元素之和…

基于单片机设计的电子柜锁

一、前言 随着现代社会的不断发展&#xff0c;电子柜锁的应用越来越广泛。传统的机械柜锁存在一些不便之处&#xff0c;例如钥匙容易丢失、密码容易泄露等问题。设计一款基于单片机的电子柜锁系统成为了一个有趣而有意义的项目。 该电子柜锁系统通过电磁锁作为柜锁的开关&…

【JMeter参数化】上一个接口返回作为下一个接口入参

前言: 实际工作场景当中,比如获取到商品列表,并查看商品详情。如果将商品id写死,就很笨拙。所以我们可以进行参数化动态去更新商品id 目录 【同一个线程组内的】 场景1:接口A仅取一个值,作为接口B的入参 场景:接口A是获取教师列表中某个教室的id,接口B是查看该教师的详…

debounce and throtlle

debounce // 核心&#xff1a;单位时间内触发>1 则只执行最后一次。//excutioner 可以认为是执行器。执行器存在则清空&#xff0c;再赋值新的执行器。function debounce(fn, delay 500) {let excutioner null;return function () {let context this;let args arguments…

建筑能源管理(8)——合同能源管理

1、简介 合同能源管理是20世纪70年代中期在发达国家逐步发展起来的一种节能服务机制在国外简称EPC(Energy Performance Contracting)&#xff0c;在国内广泛地被称为EMC (Energy Management Contracting)&#xff0c;它由专门的节能服务公司(Energy Service Company,ESCO)在为…

STM32F4X SDIO(六) 例程讲解-SD_PowerON

STM32F4X SDIO&#xff08;六&#xff09; 例程讲解-SD_PowerON 例程讲解-SD_PowerONSDIO引脚初始化和时钟初始化SDIO初始化(单线模式)CMD0:GO_IDLE_STATE命令发送程序命令响应程序 CMD8:SEND_IF_CONDCMD8参数命令发送程序命令响应程序 CMD55:APP_CMDCMD55命令参数命令发送命令…

【Acwing170】加成序列(dfs+迭代加深+剪枝)题解和一点感想

本思路来自acwing算法提高课 题目描述 看本文需要准备的知识 1.dfs算法基本思想 2.对剪枝这个词有个简单的认识 迭代加深思想和此题分析 首先&#xff0c;什么是迭代加深呢&#xff1f;当一个问题的解有很大概率出现在递归树很浅的层&#xff0c;但是这个问题的解本身存在…

音视频开发:音频编码原理+采集+编码实战

原理&#xff1a; 消除冗余信息&#xff0c;压缩量最大&#xff0c;也叫有损压缩 剔除人耳听觉范围外的音频信号20Hz以下和20000Hz以上&#xff1b;去除被掩蔽的音频信号&#xff0c;信号的遮蔽可以分为频域遮蔽和时域遮蔽&#xff1b;频域遮蔽效应 屏蔽70分贝以下&#xff0…

汽车标定技术(一):XCP概述

目录 1.汽车标定概述 2.XCP协议由来及版本介绍 3.XCP技术通览 3.1 XCP上下机通信模型 3.2 XCP指令集 3.2.1 XCP帧结构定义 3.2.2 标准指令集 3.2.3 标定指令集 3.2.4 页切换指令集 3.2.5 数据采集指令集 3.2.6 刷写指令集 3.3 ECU描述文件(A2L)概述 3.3.1 标定上位…

C++中何时及如何使用析构函数

C中何时及如何使用析构函数 析构函数不返回任何值&#xff0c;没有返回类型&#xff0c;也没有函数参数。由于没 有函数参数&#xff0c;因此它不能被重载。换言之&#xff0c;一个类可以有多个构造函数&#xff0c;但是只能有一个析构函数。 何时调用析构函数&#xff1a; &…

有方N58 HTTP POST 请求连接 TDengine

串口调试软件&#xff1a;格西调试精灵 第一步先注册网络获取IP地址 建立PPP连接 ATXIIC1\r PPP链路建立成功&#xff0c;查询IP地址 ATXIIC?\r 设置网络APN ATCREG?\r 运行结果&#xff0c;红线处是获…

js:可选链运算符(?.)和空值合并运算符(??)

文档&#xff1a; 可选链运算符&#xff08;?.&#xff09;https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Reference/Operators/Optional_chaining空值合并运算符&#xff08;??&#xff09;https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Referenc…

【算法与数据结构】--算法和数据结构的进阶主题--并行算法和分布式数据结构

一、并行算法 1.1 并行计算概述 并行计算是一种计算方法&#xff0c;旨在通过同时执行多个计算任务来提高计算性能和效率。与传统的串行计算不同&#xff0c;其中每个任务按顺序执行&#xff0c;并行计算允许多个任务同时执行。这种并行性通常通过将计算任务分解为较小的子任…

算法:Java构建二叉树并迭代实现二叉树的前序、中序、后序遍历

先自定义一下二叉树的类&#xff1a; // Definition for a binary tree node. public class TreeNode {int val;TreeNode left;TreeNode right;TreeNode() {}TreeNode(int val) { this.val val; }TreeNode(int val, TreeNode left, TreeNode right) {this.val val;this.left…

MongoDB安装及开发系例全教程

一、系列文章目录 一、MongoDB安装教程—官方原版 二、MongoDB 使用教程(配置、管理、监控)_linux mongodb 监控 三、MongoDB 基于角色的访问控制 四、MongoDB用户管理 五、MongoDB基础知识详解 六、MongoDB—Indexs 七、MongoDB事务详解 八、MongoDB分片教程 九、Mo…