本篇文章讲一讲我们的项目中用到的消息队列。
1.阻塞队列
2.kafka
我的项目为什么要用消息队列?
如果采用消息队列,那么评论、点赞、关注三类不同的事,可以定义三类不同的主题(评论、点赞、关注),发生相应的事件可以将其包装成一条消息放入对应的队列里。那么当前的线程可以继续处理下一条消息,不用处理后续的业务(后续由消费者线程处理)。面向事件驱动编程。
3.发布系统通知
下面讲讲怎么发布系统通知。
在entity包下创建event类:
public class Event {private String topic; //表示事件的主题的字符串private int userId; //表示与事件相关的用户的IDprivate int entityType; //表示与事件相关的实体类型private int entityId; //表示与事件相关的实体的IDprivate int entityUserId; //表示与实体相关的用户的IDprivate Map<String, Object> data = new HashMap<>(); //一个Map对象,用于存储与事件相关的附加数据//省略了很多get()方法和set()方法
}
通过使用这个Event
类,我就可以创建一个事件对象,并设置事件的主题、用户ID、实体类型、实体ID、实体用户ID以及其他相关的附加数据。
接着在在community的event目录下创建EventProducer、EventCnsumer类。
@Component
public class EventProducer {@Autowiredprivate KafkaTemplate kafkaTemplate;// 处理事件public void fireEvent(Event event) {// 将事件发布到指定的主题kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));}}
通过调用fireEvent
方法,我们就可以将一个事件对象发送到Kafka消息队列中的指定主题。这样,消费者可以从该主题订阅事件,并对事件进行相应的处理逻辑。
@Component
public class EventConsumer implements CommunityConstant {private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);@Autowiredprivate MessageService messageService;@KafkaListener(topics = {TOPIC_COMMENT, TOPIC_LIKE, TOPIC_FOLLOW})public void handleCommentMessage(ConsumerRecord record) {if (record == null || record.value() == null) {logger.error("消息的内容为空!");return;}Event event = JSONObject.parseObject(record.value().toString(), Event.class);if (event == null) {logger.error("消息格式错误!");return;}// 发送站内通知Message message = new Message();message.setFromId(SYSTEM_USER_ID);message.setToId(event.getEntityUserId());message.setConversationId(event.getTopic());message.setCreateTime(new Date());Map<String, Object> content = new HashMap<>();content.put("userId", event.getUserId());content.put("entityType", event.getEntityType());content.put("entityId", event.getEntityId());if (!event.getData().isEmpty()) {for (Map.Entry<String, Object> entry : event.getData().entrySet()) {content.put(entry.getKey(), entry.getValue());}}message.setContent(JSONObject.toJSONString(content));messageService.addMessage(message);}
}
在handleCommentMessage
方法中,首先检查接收到的消息是否为空,然后将消息内容解析为Event
对象。如果解析失败,会记录错误并返回。
接下来,根据接收到的事件信息,构建一个站内通知的Message
对象。设置发送者ID为系统用户ID,接收者ID为事件关联的实体用户ID,会话ID为事件主题,创建时间为当前时间。
然后,构建一个存储事件相关信息的content
映射。
最后,将content
转换为JSON字符串,并将其设置为站内通知的内容。
说了一堆乱七八糟的,反正就是这段代码提供了一个事件消费者类,用于订阅和处理从Kafka消息队列中接收到的指定事件主题的消息。它可以根据事件的类型执行相应的处理操作,例如发送站内通知等。
在评论、点赞、关注视图层增加方法。
CommentController。LikeController。Followcontroller。
4.显示系统通知
消息未读使用拦截器实现,因为每个请求都需要记录次数。
@Component
public class MessageInterceptor implements HandlerInterceptor {@Autowiredprivate HostHolder hostHolder;@Autowiredprivate MessageService messageService;@Overridepublic void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {User user = hostHolder.getUser();if (user != null && modelAndView != null) {int letterUnreadCount = messageService.findLetterUnreadCount(user.getId(), null);int noticeUnreadCount = messageService.findNoticeUnreadCount(user.getId(), null);modelAndView.addObject("allUnreadCount", letterUnreadCount + noticeUnreadCount);}}
}
上面的这段代码提供了一个消息拦截器类,用于在请求处理完成后,对返回的ModelAndView
对象进行拦截和处理,以实现消息的统计和展示功能。