牛客项目(五)-使用kafka实现发送系统通知

kafka入门以及与spring整合

在这里插入图片描述

Message.java

import java.util.Date;public class Message {private int id;private int fromId;private int toId;private String conversationId;private String content;private int status;private Date createTime;public int getId() {return id;}public void setId(int id) {this.id = id;}public int getFromId() {return fromId;}public void setFromId(int fromId) {this.fromId = fromId;}public int getToId() {return toId;}public void setToId(int toId) {this.toId = toId;}public String getConversationId() {return conversationId;}public void setConversationId(String conversationId) {this.conversationId = conversationId;}public String getContent() {return content;}public void setContent(String content) {this.content = content;}public int getStatus() {return status;}public void setStatus(int status) {this.status = status;}public Date getCreateTime() {return createTime;}public void setCreateTime(Date createTime) {this.createTime = createTime;}@Overridepublic String toString() {return "Message{" +"id=" + id +", fromId=" + fromId +", toId=" + toId +", conversationId='" + conversationId + '\'' +", content='" + content + '\'' +", status=" + status +", createTime=" + createTime +'}';}
}

EventConsumer.java

定义事件消费者

import com.alibaba.fastjson.JSONObject;
import edu.npu.newcoder.community.community.entity.DiscussPost;
import edu.npu.newcoder.community.community.entity.Event;
import edu.npu.newcoder.community.community.entity.Message;
import edu.npu.newcoder.community.community.service.DiscussPostService;
import edu.npu.newcoder.community.community.service.ElasticsearchService;
import edu.npu.newcoder.community.community.service.MessageService;
import edu.npu.newcoder.community.community.util.CommunityConstant;
import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;import java.util.Date;
import java.util.HashMap;
import java.util.Map;@Component
public class EventConsumer implements CommunityConstant {
//    private static final Logger logger= LoggerFactory.getLogger(EventConsumer.class);@Autowiredprivate MessageService messageService;@Autowiredprivate DiscussPostService discussPostService;@Autowiredprivate ElasticsearchService elasticsearchService;//加一个监听相关主题的listener@KafkaListener(topics = {TOPIC_COMMENT,TOPIC_LIKE,TOPIC_FOLLOW})public void handleCommentMessage(ConsumerRecord record){if(record == null || record.value()==null){System.out.println("错误发帖");return;}Event event= JSONObject.parseObject(record.value().toString(),Event.class);if(event == null){System.out.println("错误发帖");return;}//发送站内通知Message message = new Message();message.setFromId(SYSTEM_USERID);message.setToId(event.getEntityUserId());message.setConversationId(event.getTopic());message.setCreateTime(new Date());//message的内容Map<String,Object> content=new HashMap<>();content.put("userId",event.getUserId());content.put("entityType",event.getEntityType());content.put("entityId",event.getEntityId());if(!event.getData().isEmpty()){for(Map.Entry<String,Object> entry:event.getData().entrySet()){content.put(entry.getKey(),entry.getValue());}}message.setContent(JSONObject.toJSONString(content));System.out.println(message);messageService.addMessage(message);System.out.println("成功处理事件");}}

Event.java

定义一个事件实体 以方便在消息的发送与处理

import java.util.HashMap;
import java.util.Map;//用于事件驱动的kafka消息队列开发
public class Event {private String topic;//事件触发的人private int userId;//事件发生在哪个实体private int entityType;private int entityId;//实体作者private int entityUserId;//存储额外数据private Map<String,Object> data = new HashMap<>();public String getTopic() {return topic;}public Event setTopic(String topic) {this.topic = topic;return this;}public int getUserId() {return userId;}public Event setUserId(int userId) {this.userId = userId;return this;}public int getEntityType() {return entityType;}public Event setEntityType(int entityType) {this.entityType = entityType;return this;}public int getEntityId() {return entityId;}public Event setEntityId(int entityId) {this.entityId = entityId;return this;}public int getEntityUserId() {return entityUserId;}public Event setEntityUserId(int entityUserId) {this.entityUserId = entityUserId;return this;}public Map<String, Object> getData() {return data;}public Event setData(String key,Object value) {this.data.put(key,value);return this;}}

EventProducer.java

定义事件的生产者

import com.alibaba.fastjson.JSONObject;
import edu.npu.newcoder.community.community.entity.Event;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;@Component
public class EventProducer {
//生产者使用kafkaTemplate发送消息@AutowiredKafkaTemplate kafkaTemplate;//处理事件public void fireEvent(Event event){//将事件发布到指定的主题//将event转换为json数据进行消息发送kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));System.out.println("成功发送"+event.getTopic());}
}

EventConsumer.java

定义事件消费者

import com.alibaba.fastjson.JSONObject;
import edu.npu.newcoder.community.community.entity.DiscussPost;
import edu.npu.newcoder.community.community.entity.Event;
import edu.npu.newcoder.community.community.entity.Message;
import edu.npu.newcoder.community.community.service.DiscussPostService;
import edu.npu.newcoder.community.community.service.ElasticsearchService;
import edu.npu.newcoder.community.community.service.MessageService;
import edu.npu.newcoder.community.community.util.CommunityConstant;
import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;import java.util.Date;
import java.util.HashMap;
import java.util.Map;@Component
public class EventConsumer implements CommunityConstant {
//    private static final Logger logger= LoggerFactory.getLogger(EventConsumer.class);@Autowiredprivate MessageService messageService;@Autowiredprivate DiscussPostService discussPostService;@Autowiredprivate ElasticsearchService elasticsearchService;//加一个监听相关主题的listener@KafkaListener(topics = {TOPIC_COMMENT,TOPIC_LIKE,TOPIC_FOLLOW})public void handleCommentMessage(ConsumerRecord record){if(record == null || record.value()==null){System.out.println("错误发帖");return;}Event event= JSONObject.parseObject(record.value().toString(),Event.class);if(event == null){System.out.println("错误发帖");return;}//发送站内通知Message message = new Message();message.setFromId(SYSTEM_USERID);message.setToId(event.getEntityUserId());message.setConversationId(event.getTopic());message.setCreateTime(new Date());//message的内容Map<String,Object> content=new HashMap<>();content.put("userId",event.getUserId());content.put("entityType",event.getEntityType());content.put("entityId",event.getEntityId());if(!event.getData().isEmpty()){for(Map.Entry<String,Object> entry:event.getData().entrySet()){content.put(entry.getKey(),entry.getValue());}}message.setContent(JSONObject.toJSONString(content));System.out.println(message);messageService.addMessage(message);System.out.println("成功处理事件");}}

在特定的地方触发消息产生

CommentController

 //触发评论事件Event event=new Event().setTopic(TOPIC_COMMENT).setUserId(hostHolder.getUser().getId()).setEntityType(comment.getEntityType()).setEntityId(comment.getEntityId()).setData("postId",discussPostId);if(comment.getEntityType() == ENTITY_TYPE_POST){DiscussPost target=discussPostService.findDiscussPostById(comment.getEntityId());event.setEntityUserId(target.getUserId());}else if(comment.getEntityType()==ENTITY_TYPE_COMMENT){//根据评论的id查询评论Comment target =commentService.findCommentById(comment.getEntityId());event.setEntityUserId(target.getUserId());}eventProducer.fireEvent(event);

LikeController

 //触发点赞事件if(likeStatus ==1){Event event =new Event().setTopic(TOPIC_LIKE).setUserId(hostHolder.getUser().getId()).setEntityType(entityType).setEntityId(entityId).setEntityUserId(entityUserId).setData("postId",postId);eventProducer.fireEvent(event);}

FollowController

 //触发关注事件Event event = new Event().setTopic(TOPIC_FOLLOW).setUserId(hostHolder.getUser().getId()).setEntityType(entityType).setEntityId(entityId).setEntityUserId(entityId);eventProducer.fireEvent(event);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/129312.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python数据分析实战-筛选出DataFrame中指定列都不包含缺失值的记录(附源码和实现效果)

实现功能 筛选出DataFrame中指定列都不包含缺失值的记录 实现代码 import pandas as pd# 创建示例DataFrame data {A: [1, 2, 3, None, 5],B: [1, None, 3, 4, 5],C: [1, 2, 3, 4, 5] } df pd.DataFrame(data)# 筛选出指定列都不包含缺失值的记录 columns_to_check [A, B…

Prometheus+Node_exporter+Grafana实现监控主机

PrometheusNode_exporterGrafana实现监控主机 如果没有安装相关的配置&#xff0c;首先要进行安装配置&#xff0c;环境是基于Linux&#xff0c;虚拟机的相关环境配置在文末给出&#xff0c;现在先讲解PrometheusNode_exporterGrafana的安装和使用。 一.Prometheus安装 虽然…

Spring Boot 2.x.x 升级至 Spring Boot 3.x.x

小伙伴们&#xff0c;你们好呀&#xff0c;好久不见&#xff0c;我是老寇&#xff0c;跟我一起升级Spring Boot版本 一、JDK 版本 JDK8 需要升级至 JDK17 二、Spring Boot 版本 Spring Boot 2.x.x 升级至 Spring Boot 3.x.x 三、Java Api 变更 javax 变更成 jakarta 四、…

家用洗地机什么牌子最好?家用洗地机排行榜

对于现在的年轻人来说&#xff0c;打扫家里的卫生一直是非常头疼的问题&#xff0c;上班一天已经很累了&#xff0c;回家还需要花费很长时间吸地、拖地真的很闹心。特别是对于有小孩子的家庭&#xff0c;地面弄上一些油污、饭菜简直就是家常便饭&#xff0c;每次打扫起来非常费…

NFC芯片MS520:非接触式读卡器 IC

MS520 是一款应用于 13.56MHz 非接触式通信中的高集成 度读写卡芯片。它集成了 13.56MHz 下所有类型的被动非接触 式通信方式和协议&#xff0c;支持 ISO14443A 的多层应用。 主要特点 ◼ 高度集成的解调和解码模拟电路 ◼ 采用少量外部器件&#xff0c;即可将输…

X号是否可以接入OKCC

答案当然是可以。 但是有个前提&#xff0c;需要借助X号平台&#xff0c;也就是我们常说的小号平台。 X号只是号码资源&#xff0c;没有载体&#xff0c;要将X号接入OKCC&#xff0c;需要通过小号平台&#xff0c;类似号码池一样的平台&#xff0c;以API的方式接入OKCC。 所以很…

Redis集群

目录 一, 集群及分片算法 1.1 什么是集群 1.2 数据分片算法 1. 哈希求余 2. 一致性哈希算 3. 哈希槽分区算法(Redis使用) 二, 集群的故障处理 2.1 故障判定 2.2 故障迁移 三, 集群扩容 四, 集群缩容 一, 集群及分片算法 1.1 什么是集群 我们在Redis哨兵中学习了,哨…

甘特图组件DHTMLX Gantt用例 - 如何拆分任务和里程碑项目路线图

创建一致且引人注意的视觉样式是任何项目管理应用程序的重要要求&#xff0c;这就是为什么我们会在这个系列中继续探索DHTMLX Gantt图库的自定义。在本文中我们将考虑一个新的甘特图定制场景&#xff0c;DHTMLX Gantt组件如何创建一个项目路线图。 DHTMLX Gantt正式版下载 用…

六氟化硫气体监测装置单位VOL%/LEL%/PPM分别是什么意思?

我们在使用六氟化硫等气体监测装置仪器时&#xff0c;经常看到VOL%、LEL%、PPM等单位&#xff0c;以及仪器中反复性、响应时间、灵敏度等这些词在气体检测仪中代表什么意思呢&#xff1f;今天主要给大家解释气体检测仪一些常见的单位及常用术语的意思。 一、常见单位 &#xff…

上线项目问题——无法加载响应数据

目录 无法加载响应数据解决 无法加载响应数据 上线项目时 改用服务器上的redis和MySQL 出现请求能请求到后端&#xff0c;后端也能正常返回数据&#xff0c;但是在前端页面会显示 以为是跨域问题&#xff0c;但是环境还在本地&#xff0c;排除跨域问题以为是服务器问题&#…

iOS App Store上传项目报错 缺少隐私政策网址(URL)解决方法

​ 一、问题如下图所示&#xff1a; ​ 二、解决办法&#xff1a;使用Google浏览器&#xff08;翻译成中文&#xff09;直接打开该网址 https://www.freeprivacypolicy.com/free-privacy-policy-generator.php 按照要求填写APP信息&#xff0c;最后将生成的网址复制粘贴到隐…

计算两个时间之间连续的日期(java)

背景介绍 给出两个时间&#xff0c;希望算出两者之间连续的日期&#xff0c;比如时间A:2023-10-01 00:00:00 时间B:2023-11-30 23:59:59,期望得到的连续日期为2023-10-01、2023-10-02、… 2023-11-30 Java版代码示例 import java.time.temporal.ChronoUnit; import java.tim…

Proxysql读写分离

Proxysql读写分离 主从配置 # /etc/my.cnf 主节点 [mysqld] log-binmysql-bin server-id1从节点 [mysqld] server-id2 read_only1#初始化以及创建主从复制用户 mysql> alter user rootlocalhost identified with mysql_native_password by Jianren123; Query OK, 0 rows …

arm-none-eabi-gcc下实现printf的两种方式

方式1&#xff0c;移植第三方printf库&#xff1a; 1. 下载地址&#xff1a;https://github.com/mpaland/printf 2. 拷贝其中的printf.c和printf.h到本地&#xff1b; 3. 重新实现 void _putchar(char character) 接口&#xff0c;使用具体串口发送ch数据&#xff0c;如在 u…

C++:关联式容器set的介绍

1、set的介绍 set是按照一定次序存储元素的容器 在set中&#xff0c;元素的value也标识它(value就是key&#xff0c;类型为T)&#xff0c;并且每个value必须是唯一的。 set中的元素不能在容器中修改(元素总是const)&#xff0c;但是可以从容器中插入或删除它们。 在内部&#…

如何在Vue3项目中使用防抖节流技巧

前言 防抖节流是可以说是一种优化组件性能的技巧&#xff0c;可以有效减少组件中的渲染次数和计算量&#xff0c;从而提高组件的响应速度和用户体验。在Vue3中可以使用lodash库中的debounce和throttle函数来分别实现防抖和节流。当然也可以自行设计实现防抖节流函数&#xff0…

ES替换某个索引下的字段的值

ES替换某个索引下的字段的值 前言 在ES的操作中&#xff0c;如果我们已经同步完所有的索引&#xff0c;但是发现同步的时候数据出现了失误&#xff0c;那么在数据量很大的情况下还是要避免重新同步&#xff0c;这个时候就用到了更新替换操作&#xff1a; 操作1 针对于name字…

使用Llama index构建多代理 RAG

检索增强生成(RAG)已成为增强大型语言模型(LLM)能力的一种强大技术。通过从知识来源中检索相关信息并将其纳入提示&#xff0c;RAG为LLM提供了有用的上下文&#xff0c;以产生基于事实的输出。 但是现有的单代理RAG系统面临着检索效率低下、高延迟和次优提示的挑战。这些问题在…

讲座分享|《追AI的人》——中国科学技术大学张卫明教授分享《人工智能背景下的数字水印》

本篇博客记录 2023年11月1日 《人工智能背景下的数字水印》 讲座笔记。 先来明确一下水印在信息隐藏中的定位&#xff0c;如下图&#xff1a; 目录 概述AI for Watermark图像传统攻击方式&#xff08;如JPEG压缩&#xff09;跨媒介攻击方式&#xff08;屏摄&#xff09; 文档水…

自动驾驶常用英文表述

英文单词 英文单词单词意义ego自车&#xff0c;特指自动驾驶汽车自己yield车辆&#xff08;停下来或者减速&#xff09;从而避让其它车辆、行人side pass车辆绕行其它行驶缓慢的车辆、行人nudge车辆横向避让道路上障碍物的动作&#xff0c;与变道和借道行驶不同&#xff0c;避…