kafka入门以及与spring整合
Message.java
import java.util.Date;public class Message {private int id;private int fromId;private int toId;private String conversationId;private String content;private int status;private Date createTime;public int getId() {return id;}public void setId(int id) {this.id = id;}public int getFromId() {return fromId;}public void setFromId(int fromId) {this.fromId = fromId;}public int getToId() {return toId;}public void setToId(int toId) {this.toId = toId;}public String getConversationId() {return conversationId;}public void setConversationId(String conversationId) {this.conversationId = conversationId;}public String getContent() {return content;}public void setContent(String content) {this.content = content;}public int getStatus() {return status;}public void setStatus(int status) {this.status = status;}public Date getCreateTime() {return createTime;}public void setCreateTime(Date createTime) {this.createTime = createTime;}@Overridepublic String toString() {return "Message{" +"id=" + id +", fromId=" + fromId +", toId=" + toId +", conversationId='" + conversationId + '\'' +", content='" + content + '\'' +", status=" + status +", createTime=" + createTime +'}';}
}
EventConsumer.java
定义事件消费者
import com.alibaba.fastjson.JSONObject;
import edu.npu.newcoder.community.community.entity.DiscussPost;
import edu.npu.newcoder.community.community.entity.Event;
import edu.npu.newcoder.community.community.entity.Message;
import edu.npu.newcoder.community.community.service.DiscussPostService;
import edu.npu.newcoder.community.community.service.ElasticsearchService;
import edu.npu.newcoder.community.community.service.MessageService;
import edu.npu.newcoder.community.community.util.CommunityConstant;
import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;import java.util.Date;
import java.util.HashMap;
import java.util.Map;@Component
public class EventConsumer implements CommunityConstant {
// private static final Logger logger= LoggerFactory.getLogger(EventConsumer.class);@Autowiredprivate MessageService messageService;@Autowiredprivate DiscussPostService discussPostService;@Autowiredprivate ElasticsearchService elasticsearchService;//加一个监听相关主题的listener@KafkaListener(topics = {TOPIC_COMMENT,TOPIC_LIKE,TOPIC_FOLLOW})public void handleCommentMessage(ConsumerRecord record){if(record == null || record.value()==null){System.out.println("错误发帖");return;}Event event= JSONObject.parseObject(record.value().toString(),Event.class);if(event == null){System.out.println("错误发帖");return;}//发送站内通知Message message = new Message();message.setFromId(SYSTEM_USERID);message.setToId(event.getEntityUserId());message.setConversationId(event.getTopic());message.setCreateTime(new Date());//message的内容Map<String,Object> content=new HashMap<>();content.put("userId",event.getUserId());content.put("entityType",event.getEntityType());content.put("entityId",event.getEntityId());if(!event.getData().isEmpty()){for(Map.Entry<String,Object> entry:event.getData().entrySet()){content.put(entry.getKey(),entry.getValue());}}message.setContent(JSONObject.toJSONString(content));System.out.println(message);messageService.addMessage(message);System.out.println("成功处理事件");}}
Event.java
定义一个事件实体 以方便在消息的发送与处理
import java.util.HashMap;
import java.util.Map;//用于事件驱动的kafka消息队列开发
public class Event {private String topic;//事件触发的人private int userId;//事件发生在哪个实体private int entityType;private int entityId;//实体作者private int entityUserId;//存储额外数据private Map<String,Object> data = new HashMap<>();public String getTopic() {return topic;}public Event setTopic(String topic) {this.topic = topic;return this;}public int getUserId() {return userId;}public Event setUserId(int userId) {this.userId = userId;return this;}public int getEntityType() {return entityType;}public Event setEntityType(int entityType) {this.entityType = entityType;return this;}public int getEntityId() {return entityId;}public Event setEntityId(int entityId) {this.entityId = entityId;return this;}public int getEntityUserId() {return entityUserId;}public Event setEntityUserId(int entityUserId) {this.entityUserId = entityUserId;return this;}public Map<String, Object> getData() {return data;}public Event setData(String key,Object value) {this.data.put(key,value);return this;}}
EventProducer.java
定义事件的生产者
import com.alibaba.fastjson.JSONObject;
import edu.npu.newcoder.community.community.entity.Event;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;@Component
public class EventProducer {
//生产者使用kafkaTemplate发送消息@AutowiredKafkaTemplate kafkaTemplate;//处理事件public void fireEvent(Event event){//将事件发布到指定的主题//将event转换为json数据进行消息发送kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));System.out.println("成功发送"+event.getTopic());}
}
EventConsumer.java
定义事件消费者
import com.alibaba.fastjson.JSONObject;
import edu.npu.newcoder.community.community.entity.DiscussPost;
import edu.npu.newcoder.community.community.entity.Event;
import edu.npu.newcoder.community.community.entity.Message;
import edu.npu.newcoder.community.community.service.DiscussPostService;
import edu.npu.newcoder.community.community.service.ElasticsearchService;
import edu.npu.newcoder.community.community.service.MessageService;
import edu.npu.newcoder.community.community.util.CommunityConstant;
import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;import java.util.Date;
import java.util.HashMap;
import java.util.Map;@Component
public class EventConsumer implements CommunityConstant {
// private static final Logger logger= LoggerFactory.getLogger(EventConsumer.class);@Autowiredprivate MessageService messageService;@Autowiredprivate DiscussPostService discussPostService;@Autowiredprivate ElasticsearchService elasticsearchService;//加一个监听相关主题的listener@KafkaListener(topics = {TOPIC_COMMENT,TOPIC_LIKE,TOPIC_FOLLOW})public void handleCommentMessage(ConsumerRecord record){if(record == null || record.value()==null){System.out.println("错误发帖");return;}Event event= JSONObject.parseObject(record.value().toString(),Event.class);if(event == null){System.out.println("错误发帖");return;}//发送站内通知Message message = new Message();message.setFromId(SYSTEM_USERID);message.setToId(event.getEntityUserId());message.setConversationId(event.getTopic());message.setCreateTime(new Date());//message的内容Map<String,Object> content=new HashMap<>();content.put("userId",event.getUserId());content.put("entityType",event.getEntityType());content.put("entityId",event.getEntityId());if(!event.getData().isEmpty()){for(Map.Entry<String,Object> entry:event.getData().entrySet()){content.put(entry.getKey(),entry.getValue());}}message.setContent(JSONObject.toJSONString(content));System.out.println(message);messageService.addMessage(message);System.out.println("成功处理事件");}}
在特定的地方触发消息产生
CommentController
//触发评论事件Event event=new Event().setTopic(TOPIC_COMMENT).setUserId(hostHolder.getUser().getId()).setEntityType(comment.getEntityType()).setEntityId(comment.getEntityId()).setData("postId",discussPostId);if(comment.getEntityType() == ENTITY_TYPE_POST){DiscussPost target=discussPostService.findDiscussPostById(comment.getEntityId());event.setEntityUserId(target.getUserId());}else if(comment.getEntityType()==ENTITY_TYPE_COMMENT){//根据评论的id查询评论Comment target =commentService.findCommentById(comment.getEntityId());event.setEntityUserId(target.getUserId());}eventProducer.fireEvent(event);
LikeController
//触发点赞事件if(likeStatus ==1){Event event =new Event().setTopic(TOPIC_LIKE).setUserId(hostHolder.getUser().getId()).setEntityType(entityType).setEntityId(entityId).setEntityUserId(entityUserId).setData("postId",postId);eventProducer.fireEvent(event);}
FollowController
//触发关注事件Event event = new Event().setTopic(TOPIC_FOLLOW).setUserId(hostHolder.getUser().getId()).setEntityType(entityType).setEntityId(entityId).setEntityUserId(entityId);eventProducer.fireEvent(event);