创建springboot项目省略
项目依赖
注意:当前客户端版本是 5.1.3
,安装的rocketmq服务的版本要与其对应
<properties><java.version>11</java.version><rocketmq-client-java-version>5.1.3</rocketmq-client-java-version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>${rocketmq-client-java-version}</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
创建 JmsConfig
public class JmsConfig {//roketmq 服务地址public static String nameServerAddr = "192.168.2.109:9876";//主题public static String TOPIC = "test_topic";
}
创建生产者 Producer
package com.example.springbootrocketmq.jms;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.stereotype.Component;import javax.annotation.PreDestroy;@Component
public class PayProducer {//生产组private String producerGroup = "test_group";private DefaultMQProducer producer;public PayProducer() {producer = new DefaultMQProducer(producerGroup);//多个NameServer地址 多个地址 ; 号隔开producer.setNamesrvAddr(JmsConfig.nameServerAddr);start();}/*** 开始*/public void start(){try {this.producer.start();} catch (MQClientException e) {e.printStackTrace();}}public DefaultMQProducer getProducer(){return this.producer;}/*** 一般关闭上下文是关闭*/@PreDestroypublic void shutdown(){System.out.println("关闭....");this.producer.shutdown();}
}
创建消费者 Consumer
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Component;import java.io.UnsupportedEncodingException;@Component
public class Consumer {private DefaultMQPushConsumer consumer;private String consumerGroup = "test_consumer_group";public PayConsumer() throws Exception{consumer = new DefaultMQPushConsumer(consumerGroup);consumer.setNamesrvAddr(JmsConfig.nameServerAddr);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);consumer.subscribe(JmsConfig.TOPIC, "*");consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {try {Message msg = msgs.get(0);System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));String topic = msg.getTopic();String body = new String(msg.getBody(), "utf-8");String tags = msg.getTags();String keys = msg.getKeys();System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} catch (UnsupportedEncodingException e) {e.printStackTrace();return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});consumer.start();}
}
配置 TestController
import com.example.springbootrocketmq.jms.JmsConfig;
import com.example.springbootrocketmq.jms.Producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@Slf4j
@RestController
public class TestController{@Autowiredprivate Producer producer;@RequestMapping("/api/v1/test_cb")public Object callback(String text) throws Exception {Message message = new Message(JmsConfig.TOPIC,"taga",("hello rocketmq = "+ text).getBytes());SendResult sendResult = producer.getProducer().send(message);log.info(sendResult.toString());return null;}
}
测试结果