RocketMQ—RocketMQ集成SpringBoot
新建生产者的boot项目和消费者的boot项目,pom文件重点如下:
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.25</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins>
</build>
02-boot-producer和03-boot-consumer分别对应生产者和消费者。
生产者
生产者yml文件如下:
rocketmq:name-server: 地址:端口producer:group: boot-producer-group
同步发送消息
生产者同步发送消息的代码如下:
@SpringBootTest
class Rocketmq02BootProducerApplicationTests {//注入rocketMQTemplate@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void producer(){rocketMQTemplate.syncSend("bootTestTopic","这是boot的一个消息");}}
运行完毕看面板如下:
发送异步消息
// 异步
rocketMQTemplate.asyncSend("bootAsyncTestTopic", "我是boot的一个异步消息", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("成功");}@Overridepublic void onException(Throwable throwable) {System.out.println("失败" + throwable.getMessage());}
});
发送单向消息
rocketMQTemplate.sendOneWay("bootOnewayTopic", "单向消息");
延迟消息
// 延迟消息
Message<String> msg = MessageBuilder.withPayload("我是一个延迟消息").build();
rocketMQTemplate.syncSend("bootMsTopic", msg, 3000, 3); //第三个参数表示连接消息队列的超时时间,第四个参数表示延时等级
顺序消息
MSGModel类如下
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MsgModel {private String orderSn;private Integer userId;private String desc; // 下单 短信 物流}
发送顺序消息的生产者如下:
//发送者放 需要将一组消息 都发在同一个队列中去 消费者 需要单线程消费
List<MsgModel> msgModels = Arrays.asList(new MsgModel("qwer", 1, "下单"),new MsgModel("qwer", 1, "短信"),new MsgModel("qwer", 1, "物流"),new MsgModel("zxcv", 2, "下单"),new MsgModel("zxcv", 2, "短信"),new MsgModel("zxcv", 2, "物流")
);
msgModels.forEach(msgModel -> {// 发送 一般都是以json的方式进行处理rocketMQTemplate.syncSendOrderly("bootOrderlyTopic", JSON.toJSONString(msgModel), msgModel.getOrderSn());//第二个参数表示消息内容 第三个参数表示hashKey
});
带标签的消息
@Test
void tagKeyTest() throws Exception {rocketMQTemplate.syncSend("bootTagTopic:tagA", "我是一个带tag的消息");
}
带key的消息
@Test
void tagKeyTest() throws Exception {// key是写带在消息头的Message<String> message = MessageBuilder.withPayload("我是一个带key的消息").setHeader(RocketMQHeaders.KEYS, "key-id-1").build();rocketMQTemplate.syncSend("bootKeyTopic", message);}
消费者
yml配置文件如下:
server:port: 8890
rocketmq:name-server: 地址:端口
简单消费者
消费者代码如下
@Component
@RocketMQMessageListener(topic = "bootTestTopic",consumerGroup = "boot-test-consumer-group")
public class ASimpleMsgListener implements RocketMQListener<MessageExt> {//如果泛型指定固定类型,消息体就是我们的参数//MessageExt 是消息所有内容,可以拿到所有内容/*** 这个方法就是消费消息的方法* 只要没有报错,就签收了* 如果报错了,就是拒收,就会重试* @param message 是消息内容*/@Overridepublic void onMessage(MessageExt message) {System.out.println(new String(message.getBody()));}
}
顺序消息的消费者
@Component
@RocketMQMessageListener(topic = "bootOrderlyTopic",consumerGroup = "boot-orderly-consumer-group",consumeMode = ConsumeMode.ORDERLY, // 顺序消费模式 单线程maxReconsumeTimes = 5 // 消费重试的次数
)
public class BOrderlyMsgListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {MsgModel msgModel = JSON.parseObject(new String(message.getBody()), MsgModel.class);System.out.println(msgModel);}
}
带tag的消费者
@Component
@RocketMQMessageListener(topic = "bootTagTopic",consumerGroup = "boot-tag-consumer-group",selectorType = SelectorType.TAG,// tag过滤模式selectorExpression = "tagA || tagB"
// selectorType = SelectorType.SQL92,// sql92过滤模式 这种一般不用,这种默认没有开启,需要在sql92 //需要在broker.conf配置文件中开启enbalePropertyFilter=true
// selectorExpression = "a in (3,5,7)" // broker.conf中开启enbalePropertyFilter=true
)
public class CTagMsgListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {System.out.println(new String(message.getBody()));}
}