Apache RocketMQ官方文档:https://rocketmq.apache.org/zh/docs/bestPractice/06FAQ/,这里面涵盖了所有的基本知识、各种搭建环境、基础代码测试…还有各种问题总结,很值得自主学习。
1.配置依赖:pom.xml文件
可以只截取maven仓库,找合适的版本:https://mvnrepository.com/search?q=rocketMQ
<!-- rocketmq -->
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.1</version>
</dependency>
2.配置application.yaml
server:port: 8866rocketmq:#nameservice服务器地址(多个以英文逗号隔开)name-server: ip:port#生产者配置producer:#组名group: sentry-producer-group
3.生产者
package com.demo.mq;import com.demo.api.Result;
import com.demo.entity.DeviceParameter;
import com.demo.service.IDeviceParameterService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestBody;import javax.servlet.http.HttpServletRequest;
import java.util.concurrent.TimeUnit;@Slf4j
@Component
@RestController
@RequestMapping(value = "/msgSender")
public class MsgSender {@Autowiredprivate RocketMQTemplate rocketMQTemplate;// 主题private static final String TOPIC = "mq_text_topic";/*** * @param jsonObject,消费的数据* @param req* @return*/@PostMapping(value = "/sendMsg")public Result<?> sendMsg(@RequestBody JSONObject jsonObject,HttpServletRequest req){rocketMQTemplate.asyncSend(TOPIC,jsonObject,new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// 当消息消费成功后,会回调该方法log.info("send successful");}@Overridepublic void onException(Throwable throwable) {// 当消息消费失败后,会回调该方法log.info("send fail; {}", throwable.getMessage());}});return Result.OK("发送参数");}}
4.消费消息
package com.demo.mq;import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;// 注意:消费者消费的主题,需要和生产者设置的主题相同;消费者组,不能重复!!!
@Component
@RocketMQMessageListener(topic = "mq_text_topic", consumerGroup = "${spring.application.name}-group",messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.CONCURRENTLY)
public class MsgConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("-------消费者接收到rocketmq消息:" + message);}
}
欢迎关注微信公众号:小红的成长日记,一起学Java!