依赖
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven module descriptor for the test-rocketmq8009 demo service. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>java_sc_alibaba</artifactId>
        <groupId>jkw.life</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>test-rocketmq8009</artifactId>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- RocketMQ Spring Boot starter -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.3</version>
        </dependency>
        <!-- Spring MVC; no version is declared here, so it is presumably managed by the parent POM (confirm) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>
</project>
application.yml
# Configuration for the test-rocketmq8009 demo service.
server:
  port: 8009
spring:
  application:
    name: test-rocketmq8009
rocketmq:
  # NameServer address
  name-server: 192.168.66.101:9876
  producer:
    # Producer group
    group: my-group1
    # Send-message timeout (milliseconds)
    send-message-timeout: 300000
# Custom properties read by the producer service and the consumer listener
demo:
  rocketmq:
    topic: testtopic
    consumer: test_customer
启动类
package jkw;import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

/**
 * Spring Boot entry point of the RocketMQ demo service.
 */
@Slf4j
@SpringBootApplication
public class Main8009 {

    /**
     * Boots the Spring application and, once {@code SpringApplication.run} returns,
     * writes a startup banner to the log.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        final Class<?> applicationClass = Main8009.class;
        SpringApplication.run(applicationClass, args);
        log.info("************** 服务提供者 8009 启动成功 ************");
    }
}
RocketMQ 与 Spring Boot 3.0 不兼容的解决方案
在 resources 目录下新建 META-INF/spring 文件夹，并在其中创建一个名为 org.springframework.boot.autoconfigure.AutoConfiguration.imports 的文件，文件内容填入：org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
创建topic
# Create the topic used by this demo; the name after -t (testtopic) must match
# demo.rocketmq.topic in application.yml.
# NOTE(review): -n points at localhost:9876 while application.yml uses
# 192.168.66.101:9876 - presumably this is run on the RocketMQ host; confirm.
mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t testtopic
生产者服务
package jkw.service;import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
/**
 * Producer-side service that sends messages to the RocketMQ topic configured
 * under {@code demo.rocketmq.topic}.
 */
@Service
public class MessageProduce {

    /** RocketMQ template injected by Spring; used directly as the producer. */
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /** Destination topic, resolved from the {@code demo.rocketmq.topic} property. */
    @Value("${demo.rocketmq.topic}")
    private String topic;

    /**
     * Sends a message to the configured topic via {@code RocketMQTemplate.syncSend}.
     *
     * @param message the message payload to send
     * @return the {@link SendResult} returned by {@code syncSend}
     */
    public SendResult sendMessage(String message) {
        return rocketMQTemplate.syncSend(topic, message);
    }
}
生产者控制器
package jkw.controller;import jkw.service.MessageProduce;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST controller exposing an endpoint that publishes a message through
 * {@link MessageProduce}.
 */
@RestController
public class RocketmqProduceCon {

    /** Producer service injected by Spring. */
    @Autowired
    private MessageProduce messageProduce;

    /**
     * Handles {@code GET /send} and forwards the message to the producer service.
     *
     * @param message the message text taken from the request,
     *                e.g. {@code /send?message=test}
     * @return the {@link SendResult} returned by {@code MessageProduce.sendMessage}
     */
    @GetMapping("/send")
    public SendResult sendMessage(String message) {
        return messageProduce.sendMessage(message);
    }
}
消费者服务
package jkw.service;import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;/*** 消费者主要使用RocketMQMessageListener接口进行监听配置*/
@Service
@RocketMQMessageListener(//主题topic = "${demo.rocketmq.topic}",//消费组consumerGroup = "${demo.rocketmq.consumer}",// 过滤方式:默认为Tag过滤selectorType = SelectorType.TAG,// 过滤值:默认为全部消费,不过滤【*】selectorExpression = "*",// 消费模式:顺序消费ORDERLY,并发消费CONCURRENTLYconsumeMode = ConsumeMode.ORDERLY,// 消息模式:有集群消费CLUSTERING,广播消费messageModel = MessageModel.CLUSTERING)
public class MessageConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.println(s);}
}
测试
1.发送消息:http://localhost:8009/send?message=test
2. 在 IDEA 的控制台中查看消费者监听到消息后输出的内容