记一次项目所学(中间件等)–动态提醒功能(RocketMQ)
订阅发布模式与观察者模式
RocketMQ:纯java编写的开源消息中间件 高性能低延迟分布式事务
Redis : 高性能缓存工具,数据存储在内存中,读写速度非常快
RocketMQ相关工具类及配置实现
配置类
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.1</version></dependency>//redis<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.2.2.RELEASE</version></dependency>
生产者发送消息工具类
public class RocketMQUtil {//同步发送消息public static void syncSendMsg(DefaultMQProducer producer, Message msg) throws Exception{SendResult result = producer.send(msg);System.out.println(result);}//异步发送消息public static void asyncSendMsg(DefaultMQProducer producer, Message msg) throws Exception{producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {Logger logger = LoggerFactory.getLogger(RocketMQUtil.class);logger.info("异步发送消息成功,消息id:" + sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {e.printStackTrace();}});}
}
RocketMQ配置类
@Configuration
public class RocketMQConfig {// rocketMQ名称服务器的地址@Value("${rocketmq.name.server.address}")private String nameServerAddr;@Autowiredprivate RedisTemplate<String, String> redisTemplate;@Autowiredprivate UserFollowingService userFollowingService;//生产者@Bean("momentsProducer")public DefaultMQProducer momentsProducer() throws Exception{DefaultMQProducer producer = new DefaultMQProducer(UserMomentsConstant.GROUP_MOMENTS);producer.setNamesrvAddr(nameServerAddr);producer.start();return producer;}@Bean("momentsConsumer")//push 为推送,还有拉取等consumerpublic DefaultMQPushConsumer momentsConsumer() throws Exception{DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(UserMomentsConstant.GROUP_MOMENTS);consumer.setNamesrvAddr(nameServerAddr);//订阅 *表示所有内容consumer.subscribe(UserMomentsConstant.TOPIC_MOMENTS, "*");//消费者监听器,监听到后下一步操作//registerMessageListener注册消息监听consumer.registerMessageListener(new MessageListenerConcurrently() {@Override//ConsumeConcurrentlyStatus并发处理//MessageExt消息的扩充,ConsumeConcurrentlyContext为处理的上下文public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){MessageExt msg = msgs.get(0);if(msg == null){return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}//取出的是byte数组类型String bodyStr = new String(msg.getBody());UserMoment userMoment = JSONObject.toJavaObject(JSONObject.parseObject(bodyStr), UserMoment.class);Long userId = userMoment.getUserId();//定位粉丝idList<UserFollowing>fanList = userFollowingService.getUserFans(userId);for(UserFollowing fan : fanList){//发到redis用户到redis拿String key = "subscribed-" + fan.getUserId();//把动态列表拿出来String subscribedListStr = redisTemplate.opsForValue().get(key);List<UserMoment> subscribedList;if(StringUtil.isNullOrEmpty(subscribedListStr)){subscribedList = new ArrayList<>();}else{//转换列表的类subscribedList = JSONArray.parseArray(subscribedListStr, UserMoment.class);}subscribedList.add(userMoment);//把列表再转成字符串放进去redisTemplate.opsForValue().set(key, JSONObject.toJSONString(subscribedList));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();return consumer;}
具体业务逻辑:
@Service
public class UserMomentsService {@Autowiredprivate UserMomentsDao userMomentsDao;@Autowiredprivate ApplicationContext applicationContext;@Autowiredprivate RedisTemplate<String, String> redisTemplate;public void addUserMoments(UserMoment userMoment) throws Exception {userMoment.setCreateTime(new Date());//cruduserMomentsDao.addUserMoments(userMoment);DefaultMQProducer producer = (DefaultMQProducer)applicationContext.getBean("momentsProducer");//主题 以及json的数组消息Message msg = new Message(UserMomentsConstant.TOPIC_MOMENTS, JSONObject.toJSONString(userMoment).getBytes(StandardCharsets.UTF_8));RocketMQUtil.syncSendMsg(producer, msg);}// 查询订阅动态public List<UserMoment> getUserSubscribedMoments(Long userId) {String key = "subscribed-" + userId;//查出来的是String描述的json类型String listStr = redisTemplate.opsForValue().get(key);//返回的是List类型,要把查出来的String封装成一个一个的UserMoment再进List中return JSONArray.parseArray(listStr, UserMoment.class);}
}
PS:消费信息逻辑在配置类的Consumer中已经写好了