文章目录
- Redis
- spring整合redis
- 实现点赞
- 帖子的赞
- 用户的赞
- 关注功能
- 热帖排行
- redis存储验证码、登录凭证、用户信息
- kafka
- 阻塞队列
- kafka![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/d35be55986344b548710985cd8ecbd87.png)
- 触发事件
- 处理事件
- Redis高级
- 网站数据统计
- 实现搜索功能
- 实现的功能
- 1. elasticsearch基本设置
- 2. 实现数据同步
- 3. es搜索并分页、高亮的功能
- 4. 实现搜索功能
- Quarzt
- 任务调度
Redis
redis-cli
select 0
# 加值减值
incr count decr count
# 查询库中所有的key
keys *
# 以test开头的所有的key
keys test*
# key的类型, 名为test:user的key
type test:user
# key是否存在
exists test:user
del test:user# 删除
#过期时间, 10秒过期
expire test:user 10
spring整合redis
实现点赞
帖子的赞
like:entity:entityType:entityId
实体的赞中存的是用户id, 可以更好的适应各种需求–>set(userId)–无序唯一
- 点一次是点赞,再点一次是取消赞
- 先看集合中有没有userId,
operations.opsForSet().isMember(entityLikeKey, userId)
如果存在,就删除。不存在就添加
public void like(int userId,int entityType,int entityId,int entityUserId){// 事务redisTemplate.execute(new SessionCallback() {@Overridepublic Object execute(RedisOperations operations) throws DataAccessException {String entityLikeKey= RedisKeyUtil.getEntityLikeKey(entityType,entityId);// 用户该帖子获得的赞String userLikeKey=RedisKeyUtil.getUserLikeKey(entityUserId);Boolean isMember = operations.opsForSet().isMember(entityLikeKey, userId);operations.multi();//开启事务if(isMember){operations.opsForSet().remove(entityLikeKey,userId);operations.opsForValue().decrement(userLikeKey);}else {operations.opsForSet().add(entityLikeKey,userId);operations.opsForValue().increment(userLikeKey);}return operations.exec();//执行事务}});}
- 查询某实体(帖子)的点赞数量
- 查询用户是否给实体点赞
- 查询某个用户获得的赞
// 查询某实体点赞的数量public long findEntityLikeCount(int entityType,int entityId){String entityLikeKey=RedisKeyUtil.getEntityLikeKey(entityType,entityId);return redisTemplate.opsForSet().size(entityLikeKey);}//查询某人对某实体的点赞状态public int findEntityLikeStatus(int userId,int entityType,int entityId){String entityLikeKey=RedisKeyUtil.getEntityLikeKey(entityType,entityId);return redisTemplate.opsForSet().isMember(entityLikeKey,userId)?1:0;//查询某个用户的赞的数量public int findUserLikeCount(int userId){String userLikeKey = RedisKeyUtil.getUserLikeKey(userId);Integer count = (Integer) redisTemplate.opsForValue().get(userLikeKey);return count==null?0:count.intValue();}}
用户的赞
like:user:userId
关注功能
- 某个用户关注的所有实体
followee:userId:entityType ->Zset(entityId,now)
按时间排序 - 实体的粉丝
follower:entityType:entityId
->Zset(userId,now)
关注取关的功能实现
public void follow(int userId,int entityType,int entityId){redisTemplate.execute(new SessionCallback() {@Overridepublic Object execute(RedisOperations operations) throws DataAccessException {String followeeKey= RedisKeyUtil.getFolloweeKey(userId,entityType);String followerKey= RedisKeyUtil.getFollowerKey(entityId,entityType);operations.multi();operations.opsForZSet().add(followeeKey,entityId,System.currentTimeMillis());operations.opsForZSet().add(followerKey,userId,System.currentTimeMillis());return operations.exec();}});}public void unfollow(int userId,int entityType,int entityId){redisTemplate.execute(new SessionCallback() {@Overridepublic Object execute(RedisOperations operations) throws DataAccessException {String followeeKey= RedisKeyUtil.getFolloweeKey(userId,entityType);String followerKey= RedisKeyUtil.getFollowerKey(entityId,entityType);operations.multi();operations.opsForZSet().remove(followeeKey,entityId);operations.opsForZSet().remove(followerKey,userId);return operations.exec();}});}
- 查询关注实体的数量
- 查询实体粉丝的数量
- 查询当前用户是否已关注这个实体
粉丝列表、关注列表 - 查询某用户关注的人
- 查询某用户的粉丝
//查询某用户关注的人public List<Map<String,Object>> findFollowees(int userId,int offset,int limit){String followeeKey= RedisKeyUtil.getFolloweeKey(userId,ENTITY_TYPE_USER);Set<Integer> targetIds=redisTemplate.opsForZSet().reverseRange(followeeKey,offset,offset+limit-1);if(targetIds==null){return null;}List<Map<String,Object>> list=new ArrayList<>();for(Integer targetId:targetIds){Map<String,Object> map=new HashMap<>();User user = userService.findUserById(targetId);map.put("user",user);Double score=redisTemplate.opsForZSet().score(followeeKey,targetId);map.put("followTime",new Date(score.longValue()));list.add(map);}return list;}//查询某用户的粉丝public List<Map<String,Object>> findFollowers(int userId,int offset,int limit){String followerKey= RedisKeyUtil.getFollowerKey(userId,ENTITY_TYPE_USER);Set<Integer> targetIds=redisTemplate.opsForZSet().reverseRange(followerKey,offset,offset+limit-1);if(targetIds==null){return null;}List<Map<String,Object>> list=new ArrayList<>();for(Integer targetId:targetIds){Map<String,Object> map=new HashMap<>();User user = userService.findUserById(targetId);map.put("user",user);Double score=redisTemplate.opsForZSet().score(followerKey,targetId);map.put("followTime",new Date(score.longValue()));list.add(map);}return list;}
热帖排行
// 帖子分数public static String getPostScoreKey() {return PREFIX_POST + SPLIT + "score";}
- 评论、加精、点赞、收藏时,对帖子算分数
- 将需要重新计算分数的帖子Id存入Redis中
// 保存需要计算帖子分数的idString redisKey = RedisKeyUtil.getPostScoreKey();redisTemplate.opsForSet().add(redisKey, discussPostId);
- 定时刷新分数,五分钟一次
- quartz对帖子分数进行计算
private void refresh(int postId) {DiscussPost post = discussPostService.findDiscussPostById(postId);if(post == null) {logger.error("该帖子不存在:id=" + postId);return;}// 是否精华boolean wonderful = post.getStatus() == 1;// 评论数量int commentCount = post.getCommentCount();// 点赞数量long likeCount = likeService.findEntityLikeCount(ENTITY_TYPE_POST, postId);// 计算权重double w = (wonderful ? 75 : 0) + commentCount * 10 + likeCount * 2;// 分数 = 帖子权重 + 距离天数double score = Math.log10(Math.max(w, 1)) + (post.getCreateTime().getTime()-epoch.getTime()) / (1000 * 3600 * 24);// 更新帖子分数discussPostService.updateScore(postId, score);// 同步搜索的数据post.setScore(score);elasticSearchService.saveDiscussPost(post);}
配置quartz
// 刷新帖子分数任务@Beanpublic JobDetailFactoryBean postScoreRefreshJobDetail(){JobDetailFactoryBean factoryBean = new JobDetailFactoryBean();factoryBean.setJobClass(PostScoreRefreshJob.class);factoryBean.setName("postScoreRefreshJob");factoryBean.setGroup("communityJobJobGroup");factoryBean.setDurability(true); // 持久化factoryBean.setRequestsRecovery(true);return factoryBean;}@Beanpublic SimpleTriggerFactoryBean postScoreRefreshTrigger(JobDetail postScoreRefreshJobDetail){SimpleTriggerFactoryBean factoryBean = new SimpleTriggerFactoryBean();factoryBean.setJobDetail(postScoreRefreshJobDetail);factoryBean.setName("postScoreRefreshTrigger");factoryBean.setGroup("communityTriggerGroup");factoryBean.setRepeatInterval(1000 * 60 * 5); // 频率factoryBean.setJobDataMap(new JobDataMap()); // 存储job的状态return factoryBean;}
redis存储验证码、登录凭证、用户信息
- 存储验证码时,可以给用户生成一个临时的随机字符串,用于标识用户。
- 登录凭证,退出后,将状态改变,再存入
- 缓存用户信息
- 优先从缓存中取值
- 取不到数据时,初始化缓存数据
- 数据变更时,清除缓存数据
//1.优先从缓存中读取数据public User getCache(int userId){String redisKey = RedisKeyUtil.getUserKey(userId);return (User) redisTemplate.opsForValue().get(redisKey);}//2.在缓存中读不到数据时,初始化缓存数据public User initCache(int userId){User user = userMapper.selectById(userId);String redisKey = RedisKeyUtil.getUserKey(userId);redisTemplate.opsForValue().set(redisKey,user,3600, TimeUnit.SECONDS);return user;}//3.数据变更时清除缓存private void clearCache(int userId){String redisKey = RedisKeyUtil.getUserKey(userId);redisTemplate.delete(redisKey);}
kafka
阻塞队列
kafka
数据存到硬盘上,主副本,从副本
触发事件
将消息放入队列中
封装成事件,
- 评论后,发布通知
触发发帖事件
Event event=new Event().setTopic(TOPIC_COMMENT).setUserId(hostHolder.getUser().getId()).setEntityType(comment.getEntityType()).setEntityId(comment.getEntityId()).setData("postId", discussPostId);if(comment.getEntityType()==ENTITY_TYPE_POST){DiscussPost target = discussPostService.findDiscussPostById(comment.getEntityId());event.setEntityUserId(target.getUserId());}else if(comment.getEntityType()==ENTITY_TYPE_COMMENT){Comment target=commentService.findCommentsById(comment.getEntityId());event.setEntityUserId(target.getUserId());}eventProducer.fireEvent(event);
- 点赞后, 发布通知
- 关注后,发布通知
public class EventProducer {@Autowiredprivate KafkaTemplate kafkaTemplate;//处理事件public void fireEvent(Event event){//将事件发布到指定的主题kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));}
}
处理事件
消费事件
//发送站内消息Message message = new Message();message.setFromId(SYSTEM_USER_ID);message.setToId(event.getEntityUserId());message.setConversationId(event.getTopic());message.setCreateTime(new Date());Map<String,Object> content=new HashMap<>();content.put("userId",event.getUserId());content.put("entityType",event.getEntityType());content.put("entityId",event.getEntityId());if(!event.getData().isEmpty()){for (Map.Entry<String ,Object> entry:event.getData().entrySet()) {content.put(entry.getKey(),entry.getValue());}}message.setContent(JSONObject.toJSONString(content));messageService.addMessage(message);
- 封装事件对象
- 开发事件的生产者
- 开发事件的消费者
Redis高级
网站数据统计
- 设定redis键值
用时间来当做rediskey
// 单日uvpublic static String getUVKey(String date){return PREFIX_UV + SPLIT + date;}// 区间uvpublic static String getUVKey(String startDate, String endDate){return PREFIX_UV + SPLIT + startDate + SPLIT + endDate;}// 单日活跃用户public static String getDAUKey(String date){return PREFIX_DAU + SPLIT + date;}// 区间活跃用户public static String getDAUKey(String startDate, String endDate){return PREFIX_DAU + SPLIT + startDate + SPLIT + endDate;}
- service层
// 将指定的IP计入UVpublic void recordUV(String ip){// 访问的日期作为rediskeyString redisKey = RedisKeyUtil.getUVKey(df.format(new Date()));// ip地址作为值存入redisredisTemplate.opsForHyperLogLog().add(redisKey, ip);}// 指定统计日期范围内的UVpublic long calculateUV(Date start, Date end){if(start == null || end == null){throw new IllegalArgumentException("参数不能为空!");}// 整理该日期范围内的keyList<String> keyList = new ArrayList<>();// 实例化,获取当前时间Calendar calendar = Calendar.getInstance();calendar.setTime(start);while(!calendar.getTime().after(end)){String key = RedisKeyUtil.getUVKey(df.format(calendar.getTime()));keyList.add(key);// 日期往后增加一天calendar.add(Calendar.DATE, 1);}// 合并数据String redisKey = RedisKeyUtil.getUVKey(df.format(start), df.format(end));redisTemplate.opsForHyperLogLog().union(redisKey, keyList.toArray());// 返回统计的结果return redisTemplate.opsForHyperLogLog().size(redisKey);}// 将指定用户计入DAUpublic void recordDAU(int userId){String redisKey = RedisKeyUtil.getDAUKey(df.format(new Date()));redisTemplate.opsForValue().setBit(redisKey, userId, true);}// 统计指定日期范围内的DAUpublic long calculateDAU(Date start, Date end){if(start == null || end == null){throw new IllegalArgumentException("参数不能为空!");}// 整理该日期范围内的keyList<byte[]> keyList = new ArrayList<>();// 实例化Calendar calendar = Calendar.getInstance();calendar.setTime(start);while(!calendar.getTime().after(end)){String key = RedisKeyUtil.getDAUKey(df.format(calendar.getTime()));keyList.add(key.getBytes());calendar.add(Calendar.DATE, 1);}// 进行OR运算return (long) redisTemplate.execute(new RedisCallback() {@Overridepublic Object doInRedis(RedisConnection connection) throws DataAccessException {String redisKey = RedisKeyUtil.getDAUKey(df.format(start),df.format(end));connection.bitOp(RedisStringCommands.BitOperation.OR, redisKey.getBytes(),keyList.toArray(new byte[0][0]));return connection.bitCount(redisKey.getBytes());}});}
- 在prehandle中实现调用
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {// 统计UVString ip = request.getRemoteHost();dataService.recordUV(ip);// 统计DAUUser user = hostHolder.getUser();if(user != null){dataService.recordDAU(user.getId());}return true;}
- controller层
实现搜索功能
实现的功能
- 搜索服务:将帖子保存到elasticsearch服务器;从es服务器搜索、删除帖子;
- 发布服务:发布帖子时,将帖子异步提交到es服务器;增加评论时,将帖子异步提交到es服务器;·在消费组件中增加一个方法,消费帖子发布事件;
- 显示结果:在控制器中处理搜索请求,在HTML上显示搜索结果。
1. elasticsearch基本设置
底层是基于netty的
// 管理bean的生命周期的,初始化的方法
// 被它修饰的方法在构造器调用完以后执行@PostConstructpublic void init(){//解决netty启动冲突问题System.setProperty("es.set.netty.runtime.available.processors","false");}
要考虑mysql中的表和es中的索引的对应关系.数据库中的字段和es的字段的对应关系
// 索引的名字,类型,分片,副本
@Document(indexName = "discusspost",type = "doc",shards=6,replicas=3)
public class DiscussPost {@Idprivate int id;// 主键@Field(type = FieldType.Integer)private int userId;// 存储的分词器和搜索的分词器@Field(type = FieldType.Text,analyzer = "ik_max_word",searchAnalyzer = "ik_smart")private String title;
}
设置Repository接口
// 定义处理的实体类是DiscussPost,实体的主键类型Integer
@Repository
public interface DiscussPostRepository extends ElasticsearchRepository<DiscussPost,Integer> {
}
2. 实现数据同步
通过生产者消费者的方式同步数据的变化,将数据保存在es服务器中
触发发帖事件。
//触发发帖事件Event event=new Event().setTopic(TOPIC_PUBLISH).setUserId(user.getId()).setEntityType(ENTITY_TYPE_POST).setEntityId(discussPost.getId());eventProducer.fireEvent(event);
消费发帖事件
Event event=JSONObject.parseObject(record.value().toString(),Event.class);
// 查找新增的帖子,并保存到es
DiscussPost post= discussPostService.findDiscussPostById(event.getEntityId());
elasticSearchService.saveDiscussPost(post);
3. es搜索并分页、高亮的功能
public Page<DiscussPost> searchDiscussPost(String keyword,int current,int limit){SearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(QueryBuilders.multiMatchQuery(keyword, "title", "content")).withSort(SortBuilders.fieldSort("type").order(SortOrder.DESC)).withSort(SortBuilders.fieldSort("score").order(SortOrder.DESC)).withSort(SortBuilders.fieldSort("createTime").order(SortOrder.DESC)).withPageable(PageRequest.of(current, limit)).withHighlightFields(new HighlightBuilder.Field("title").preTags("<em>").postTags("</em>"),new HighlightBuilder.Field("content").preTags("<em>").postTags("</em>")).build();return elasticsearchTemplate.queryForPage(searchQuery, DiscussPost.class, new SearchResultMapper() {@Overridepublic <T> AggregatedPage<T> mapResults(SearchResponse response, Class<T> aClass, Pageable pageable) {SearchHits hits = response.getHits();if (hits.getTotalHits() <= 0) {return null;}List<DiscussPost> list = new ArrayList<>();for (SearchHit hit : hits) {DiscussPost post = new DiscussPost();String id = hit.getSourceAsMap().get("id").toString();post.setId(Integer.valueOf(id));String userId = hit.getSourceAsMap().get("userId").toString();post.setUserId(Integer.valueOf(userId));String title = hit.getSourceAsMap().get("title").toString();post.setTitle(title);String content = hit.getSourceAsMap().get("content").toString();post.setContent(content);String status = hit.getSourceAsMap().get("status").toString();post.setStatus(Integer.valueOf(status));String createTime = hit.getSourceAsMap().get("createTime").toString();post.setCreateTime(new Date(Long.valueOf(createTime)));String commentCount = hit.getSourceAsMap().get("commentCount").toString();post.setCommentCount(Integer.valueOf(commentCount));// 处理高亮显示的结果HighlightField titleField = hit.getHighlightFields().get("title");if (titleField != null) {post.setTitle(titleField.getFragments()[0].toString());}HighlightField contentField = hit.getHighlightFields().get("content");if (contentField != null) {post.setContent(contentField.getFragments()[0].toString());}list.add(post);}return new AggregatedPageImpl(list, pageable,hits.getTotalHits(), response.getAggregations(), response.getScrollId(), hits.getMaxScore());}});
4. 实现搜索功能
org.springframework.data.domain.Page<DiscussPost> searchResult=
elasticSearchService.searchDiscussPost(keyword, page.getCurrent()-1, page.getLimit());
// 聚合数据发送到前端
List<Map<String,Object>> discussPosts=new ArrayList<>();
...
Quarzt
任务调度
存到数据库中
配置