牛客项目(五)-使用kafka实现发送系统通知

kafka入门以及与spring整合

在这里插入图片描述

Message.java

import java.util.Date;public class Message {private int id;private int fromId;private int toId;private String conversationId;private String content;private int status;private Date createTime;public int getId() {return id;}public void setId(int id) {this.id = id;}public int getFromId() {return fromId;}public void setFromId(int fromId) {this.fromId = fromId;}public int getToId() {return toId;}public void setToId(int toId) {this.toId = toId;}public String getConversationId() {return conversationId;}public void setConversationId(String conversationId) {this.conversationId = conversationId;}public String getContent() {return content;}public void setContent(String content) {this.content = content;}public int getStatus() {return status;}public void setStatus(int status) {this.status = status;}public Date getCreateTime() {return createTime;}public void setCreateTime(Date createTime) {this.createTime = createTime;}@Overridepublic String toString() {return "Message{" +"id=" + id +", fromId=" + fromId +", toId=" + toId +", conversationId='" + conversationId + '\'' +", content='" + content + '\'' +", status=" + status +", createTime=" + createTime +'}';}
}

EventConsumer.java

定义事件消费者

import com.alibaba.fastjson.JSONObject;
import edu.npu.newcoder.community.community.entity.DiscussPost;
import edu.npu.newcoder.community.community.entity.Event;
import edu.npu.newcoder.community.community.entity.Message;
import edu.npu.newcoder.community.community.service.DiscussPostService;
import edu.npu.newcoder.community.community.service.ElasticsearchService;
import edu.npu.newcoder.community.community.service.MessageService;
import edu.npu.newcoder.community.community.util.CommunityConstant;
import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;import java.util.Date;
import java.util.HashMap;
import java.util.Map;@Component
public class EventConsumer implements CommunityConstant {
//    private static final Logger logger= LoggerFactory.getLogger(EventConsumer.class);@Autowiredprivate MessageService messageService;@Autowiredprivate DiscussPostService discussPostService;@Autowiredprivate ElasticsearchService elasticsearchService;//加一个监听相关主题的listener@KafkaListener(topics = {TOPIC_COMMENT,TOPIC_LIKE,TOPIC_FOLLOW})public void handleCommentMessage(ConsumerRecord record){if(record == null || record.value()==null){System.out.println("错误发帖");return;}Event event= JSONObject.parseObject(record.value().toString(),Event.class);if(event == null){System.out.println("错误发帖");return;}//发送站内通知Message message = new Message();message.setFromId(SYSTEM_USERID);message.setToId(event.getEntityUserId());message.setConversationId(event.getTopic());message.setCreateTime(new Date());//message的内容Map<String,Object> content=new HashMap<>();content.put("userId",event.getUserId());content.put("entityType",event.getEntityType());content.put("entityId",event.getEntityId());if(!event.getData().isEmpty()){for(Map.Entry<String,Object> entry:event.getData().entrySet()){content.put(entry.getKey(),entry.getValue());}}message.setContent(JSONObject.toJSONString(content));System.out.println(message);messageService.addMessage(message);System.out.println("成功处理事件");}}

Event.java

定义一个事件实体 以方便在消息的发送与处理

import java.util.HashMap;
import java.util.Map;//用于事件驱动的kafka消息队列开发
public class Event {private String topic;//事件触发的人private int userId;//事件发生在哪个实体private int entityType;private int entityId;//实体作者private int entityUserId;//存储额外数据private Map<String,Object> data = new HashMap<>();public String getTopic() {return topic;}public Event setTopic(String topic) {this.topic = topic;return this;}public int getUserId() {return userId;}public Event setUserId(int userId) {this.userId = userId;return this;}public int getEntityType() {return entityType;}public Event setEntityType(int entityType) {this.entityType = entityType;return this;}public int getEntityId() {return entityId;}public Event setEntityId(int entityId) {this.entityId = entityId;return this;}public int getEntityUserId() {return entityUserId;}public Event setEntityUserId(int entityUserId) {this.entityUserId = entityUserId;return this;}public Map<String, Object> getData() {return data;}public Event setData(String key,Object value) {this.data.put(key,value);return this;}}

EventProducer.java

定义事件的生产者

import com.alibaba.fastjson.JSONObject;
import edu.npu.newcoder.community.community.entity.Event;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;@Component
public class EventProducer {
//生产者使用kafkaTemplate发送消息@AutowiredKafkaTemplate kafkaTemplate;//处理事件public void fireEvent(Event event){//将事件发布到指定的主题//将event转换为json数据进行消息发送kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));System.out.println("成功发送"+event.getTopic());}
}

EventConsumer.java

定义事件消费者

import com.alibaba.fastjson.JSONObject;
import edu.npu.newcoder.community.community.entity.DiscussPost;
import edu.npu.newcoder.community.community.entity.Event;
import edu.npu.newcoder.community.community.entity.Message;
import edu.npu.newcoder.community.community.service.DiscussPostService;
import edu.npu.newcoder.community.community.service.ElasticsearchService;
import edu.npu.newcoder.community.community.service.MessageService;
import edu.npu.newcoder.community.community.util.CommunityConstant;
import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;import java.util.Date;
import java.util.HashMap;
import java.util.Map;@Component
public class EventConsumer implements CommunityConstant {
//    private static final Logger logger= LoggerFactory.getLogger(EventConsumer.class);@Autowiredprivate MessageService messageService;@Autowiredprivate DiscussPostService discussPostService;@Autowiredprivate ElasticsearchService elasticsearchService;//加一个监听相关主题的listener@KafkaListener(topics = {TOPIC_COMMENT,TOPIC_LIKE,TOPIC_FOLLOW})public void handleCommentMessage(ConsumerRecord record){if(record == null || record.value()==null){System.out.println("错误发帖");return;}Event event= JSONObject.parseObject(record.value().toString(),Event.class);if(event == null){System.out.println("错误发帖");return;}//发送站内通知Message message = new Message();message.setFromId(SYSTEM_USERID);message.setToId(event.getEntityUserId());message.setConversationId(event.getTopic());message.setCreateTime(new Date());//message的内容Map<String,Object> content=new HashMap<>();content.put("userId",event.getUserId());content.put("entityType",event.getEntityType());content.put("entityId",event.getEntityId());if(!event.getData().isEmpty()){for(Map.Entry<String,Object> entry:event.getData().entrySet()){content.put(entry.getKey(),entry.getValue());}}message.setContent(JSONObject.toJSONString(content));System.out.println(message);messageService.addMessage(message);System.out.println("成功处理事件");}}

在特定的地方触发消息产生

CommentController

 //触发评论事件Event event=new Event().setTopic(TOPIC_COMMENT).setUserId(hostHolder.getUser().getId()).setEntityType(comment.getEntityType()).setEntityId(comment.getEntityId()).setData("postId",discussPostId);if(comment.getEntityType() == ENTITY_TYPE_POST){DiscussPost target=discussPostService.findDiscussPostById(comment.getEntityId());event.setEntityUserId(target.getUserId());}else if(comment.getEntityType()==ENTITY_TYPE_COMMENT){//根据评论的id查询评论Comment target =commentService.findCommentById(comment.getEntityId());event.setEntityUserId(target.getUserId());}eventProducer.fireEvent(event);

LikeController

 //触发点赞事件if(likeStatus ==1){Event event =new Event().setTopic(TOPIC_LIKE).setUserId(hostHolder.getUser().getId()).setEntityType(entityType).setEntityId(entityId).setEntityUserId(entityUserId).setData("postId",postId);eventProducer.fireEvent(event);}

FollowController

 //触发关注事件Event event = new Event().setTopic(TOPIC_FOLLOW).setUserId(hostHolder.getUser().getId()).setEntityType(entityType).setEntityId(entityId).setEntityUserId(entityId);eventProducer.fireEvent(event);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/129312.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python数据分析实战-筛选出DataFrame中指定列都不包含缺失值的记录(附源码和实现效果)

实现功能 筛选出DataFrame中指定列都不包含缺失值的记录 实现代码 import pandas as pd# 创建示例DataFrame data {A: [1, 2, 3, None, 5],B: [1, None, 3, 4, 5],C: [1, 2, 3, 4, 5] } df pd.DataFrame(data)# 筛选出指定列都不包含缺失值的记录 columns_to_check [A, B…

Prometheus+Node_exporter+Grafana实现监控主机

PrometheusNode_exporterGrafana实现监控主机 如果没有安装相关的配置&#xff0c;首先要进行安装配置&#xff0c;环境是基于Linux&#xff0c;虚拟机的相关环境配置在文末给出&#xff0c;现在先讲解PrometheusNode_exporterGrafana的安装和使用。 一.Prometheus安装 虽然…

家用洗地机什么牌子最好?家用洗地机排行榜

对于现在的年轻人来说&#xff0c;打扫家里的卫生一直是非常头疼的问题&#xff0c;上班一天已经很累了&#xff0c;回家还需要花费很长时间吸地、拖地真的很闹心。特别是对于有小孩子的家庭&#xff0c;地面弄上一些油污、饭菜简直就是家常便饭&#xff0c;每次打扫起来非常费…

NFC芯片MS520:非接触式读卡器 IC

MS520 是一款应用于 13.56MHz 非接触式通信中的高集成 度读写卡芯片。它集成了 13.56MHz 下所有类型的被动非接触 式通信方式和协议&#xff0c;支持 ISO14443A 的多层应用。 主要特点 ◼ 高度集成的解调和解码模拟电路 ◼ 采用少量外部器件&#xff0c;即可将输…

X号是否可以接入OKCC

答案当然是可以。 但是有个前提&#xff0c;需要借助X号平台&#xff0c;也就是我们常说的小号平台。 X号只是号码资源&#xff0c;没有载体&#xff0c;要将X号接入OKCC&#xff0c;需要通过小号平台&#xff0c;类似号码池一样的平台&#xff0c;以API的方式接入OKCC。 所以很…

Redis集群

目录 一, 集群及分片算法 1.1 什么是集群 1.2 数据分片算法 1. 哈希求余 2. 一致性哈希算 3. 哈希槽分区算法(Redis使用) 二, 集群的故障处理 2.1 故障判定 2.2 故障迁移 三, 集群扩容 四, 集群缩容 一, 集群及分片算法 1.1 什么是集群 我们在Redis哨兵中学习了,哨…

甘特图组件DHTMLX Gantt用例 - 如何拆分任务和里程碑项目路线图

创建一致且引人注意的视觉样式是任何项目管理应用程序的重要要求&#xff0c;这就是为什么我们会在这个系列中继续探索DHTMLX Gantt图库的自定义。在本文中我们将考虑一个新的甘特图定制场景&#xff0c;DHTMLX Gantt组件如何创建一个项目路线图。 DHTMLX Gantt正式版下载 用…

六氟化硫气体监测装置单位VOL%/LEL%/PPM分别是什么意思?

我们在使用六氟化硫等气体监测装置仪器时&#xff0c;经常看到VOL%、LEL%、PPM等单位&#xff0c;以及仪器中反复性、响应时间、灵敏度等这些词在气体检测仪中代表什么意思呢&#xff1f;今天主要给大家解释气体检测仪一些常见的单位及常用术语的意思。 一、常见单位 &#xff…

上线项目问题——无法加载响应数据

目录 无法加载响应数据解决 无法加载响应数据 上线项目时 改用服务器上的redis和MySQL 出现请求能请求到后端&#xff0c;后端也能正常返回数据&#xff0c;但是在前端页面会显示 以为是跨域问题&#xff0c;但是环境还在本地&#xff0c;排除跨域问题以为是服务器问题&#…

iOS App Store上传项目报错 缺少隐私政策网址(URL)解决方法

​ 一、问题如下图所示&#xff1a; ​ 二、解决办法&#xff1a;使用Google浏览器&#xff08;翻译成中文&#xff09;直接打开该网址 https://www.freeprivacypolicy.com/free-privacy-policy-generator.php 按照要求填写APP信息&#xff0c;最后将生成的网址复制粘贴到隐…

计算两个时间之间连续的日期(java)

背景介绍 给出两个时间&#xff0c;希望算出两者之间连续的日期&#xff0c;比如时间A:2023-10-01 00:00:00 时间B:2023-11-30 23:59:59,期望得到的连续日期为2023-10-01、2023-10-02、… 2023-11-30 Java版代码示例 import java.time.temporal.ChronoUnit; import java.tim…

Proxysql读写分离

Proxysql读写分离 主从配置 # /etc/my.cnf 主节点 [mysqld] log-binmysql-bin server-id1从节点 [mysqld] server-id2 read_only1#初始化以及创建主从复制用户 mysql> alter user rootlocalhost identified with mysql_native_password by Jianren123; Query OK, 0 rows …

C++:关联式容器set的介绍

1、set的介绍 set是按照一定次序存储元素的容器 在set中&#xff0c;元素的value也标识它(value就是key&#xff0c;类型为T)&#xff0c;并且每个value必须是唯一的。 set中的元素不能在容器中修改(元素总是const)&#xff0c;但是可以从容器中插入或删除它们。 在内部&#…

使用Llama index构建多代理 RAG

检索增强生成(RAG)已成为增强大型语言模型(LLM)能力的一种强大技术。通过从知识来源中检索相关信息并将其纳入提示&#xff0c;RAG为LLM提供了有用的上下文&#xff0c;以产生基于事实的输出。 但是现有的单代理RAG系统面临着检索效率低下、高延迟和次优提示的挑战。这些问题在…

讲座分享|《追AI的人》——中国科学技术大学张卫明教授分享《人工智能背景下的数字水印》

本篇博客记录 2023年11月1日 《人工智能背景下的数字水印》 讲座笔记。 先来明确一下水印在信息隐藏中的定位&#xff0c;如下图&#xff1a; 目录 概述AI for Watermark图像传统攻击方式&#xff08;如JPEG压缩&#xff09;跨媒介攻击方式&#xff08;屏摄&#xff09; 文档水…

生成模型常见损失函数Python代码实现+计算原理解析

前言 损失函数无疑是机器学习和深度学习效果验证的核心检验功能&#xff0c;用于评估模型预测值与实际值之间的差异。我们学习机器学习和深度学习或多或少都接触到了损失函数&#xff0c;但是我们缺少细致的对损失函数进行分类&#xff0c;或者系统的学习损失函数在不同的算法…

Docker DeskTop安装与启动(Windows版本)

一、官网下载Docker安装包 Docker官网如下&#xff1a; Docker官网不同操作系统下载页面https://docs.docker.com/desktop/install/windows-install/ 二、安装Docker DeskTop 2.1 双击 Docker Installer.exe 以运行安装程序 2.2 安装操作 默认勾选&#xff0c;具体操作如下…

升级智能监控,真香!

随着社会的发展与进步&#xff0c;传统依赖看的监控已经无法满足大众的需求&#xff0c;不够智能、识别不精准&#xff0c;传统监控的弊端也日益显现&#xff0c;智能监控升级迫在眉睫。 升级智能监控&#xff0c;不仅能够促进公共安全&#xff0c;同时也能促进社会文明的发展…

macOS 安装brew

参考链接&#xff1a; https://mirrors4.tuna.tsinghua.edu.cn/help/homebrew/ https://www.yii666.com/blog/429332.html 安装中科大源的&#xff1a; https://zhuanlan.zhihu.com/p/470873649

深度学习_8_对Softmax回归的理解

回归问题&#xff0c;例如之前做房子价格预测的线性回归问题 而softmax回归是一个分类问题,即给定一个图片&#xff0c;从猫狗两种动物类别中选出最可靠的那种答案&#xff0c;这个是两类分类问题&#xff0c;因为狗和猫是两类 上述多个输出可以这样理解&#xff0c;假设一个图…