1,引用jar包
build.gradle文件添加jar包引用
compile group: 'org.apache.rocketmq', name: 'rocketmq-spring-boot-starter', version: '2.1.1'
2,配置文件
application.properties 配置文件
spring.application.name=app-demp
server.port=8081
###rocketmq###
rocketmq.name-server=192.168.1.107:9876
rocketmq.producer.timeout=10000
3,生成者
MQSender.java - 消息发生接口
import org.apache.rocketmq.client.producer.SendResult;
public interface MQSender{
/**
* 发送消息
*
* @param message 消息信息
* @param topic 主题
* @return 发送结果
*/
SendResult sendMessage(Object message, String topic);
/**
* 发送消息
*
* @param message 消息信息
* @param topic 主题
* @param tags 主题的标签
* @return 发送结果
*/
SendResult sendMessage(Object message, String topic, String tags);
}
RocketMQSender.java - RockemtMQ实现
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Slf4j
@Component
public class RocketMQSender implements MQSender {
@Value("${rocketmq.name-server}")
private String nameServer;
@Value("${rocketmq.producer.timeout}")
private int timeout;
@Value("${spring.application.name}")
private String group;
private DefaultMQProducer producer;
@PostConstruct
public void init() {
producer = new DefaultMQProducer(group); try {
producer.setNamesrvAddr(nameServer); producer.setSendMsgTimeout(timeout); producer.start(); log.info("RocketMQ Producer启动成功! nameServer={}, group={}", nameServer, group);
} catch (MQClientException e) {
log.error("RocketMQ Producer启动失败! nameServer={}, group={} ", nameServer, group, e);
} } @Override
public SendResult sendMessage(Object message, String topic) {
try {
Message msg = new Message(topic, JSON.toJSONBytes(message)); SendResult sendResult = producer.send(msg); log.info("发送MQ成功:sendResult={},message={}", sendResult, message.toString());
return sendResult;
} catch (Exception e) {
log.error("消息发送失败, topic:{}, message:{}", topic, message, e);
} return null;
} @Override
public SendResult sendMessage(Object message, String topic, String tags) {
try {
Message msg = new Message(topic, tags, JSON.toJSONBytes(message)); SendResult sendResult = producer.send(msg); log.info("发送MQ成功:sendResult={},message={}", sendResult, message.toString());
return sendResult;
} catch (Exception e) {
log.error("消息发送失败, topic:{}, tags:{}, message:{}", topic, tags, message, e);
} return null;
}}
OrderProducer.java - 发送者实例
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
public class OrderProducer {
@Resource
private MQSender mqSender;
public void createOrder() {
mqSender.sendMessage("我是注册订单,请尽快处理", "TEMP");
}}
4,消费者
OrderConsumer.java
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "TEMP-GROUP", topic = "TEMP")
public class OrderConsumer implements RocketMQListener {
@Override
public void onMessage(MessageExt messageExt) {
String message = new String(messageExt.getBody());
log.info("收到消息,topic:{}, tag:{}, msgId:{}, body:{}", messageExt.getTopic(), messageExt.getTags(),
messageExt.getMsgId(), message); }}
发送者执行结果
消费者执行结果