热文章数据查询
分布式任务调度xxl-job
概述
环境搭建
docker化部署
# Start a detached MySQL 5.7 container named "mysql57", publishing port 3306 and
# mounting the config, log and data directories from /opt/mysql on the host.
# NOTE: every continuation backslash must be preceded by a space; otherwise the
# next line is glued onto the previous argument (e.g. "root-d").
docker run -p 3306:3306 --name mysql57 \
-v /opt/mysql/conf:/etc/mysql \
-v /opt/mysql/logs:/var/log/mysql \
-v /opt/mysql/data:/var/lib/mysql \
-e MYSQL_ROOT_PASSWORD=root \
-d mysql:5.7
# Pull the xxl-job admin (scheduling center) image, version 2.3.0.
docker pull xuxueli/xxl-job-admin:2.3.0
# Start the admin console detached, publishing container port 8080 on host port 8888.
# PARAMS carries the Spring datasource overrides; the JDBC option is "useUnicode"
# (not "Unicode"), paired with characterEncoding=UTF-8.
docker run -e PARAMS="--spring.datasource.url=jdbc:mysql://192.168.200.130:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8 \
--spring.datasource.username=root \
--spring.datasource.password=root" \
-p 8888:8080 -v /tmp:/data/applogs \
--name xxl-job-admin --restart=always -d xuxueli/xxl-job-admin:2.3.0
访问http://127.0.0.1:8888/xxl-job-admin/成功登录。
入门案例
<dependencies>
    <!-- Spring Boot web starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- xxl-job core library, version 2.3.0 -->
    <dependency>
        <groupId>com.xuxueli</groupId>
        <artifactId>xxl-job-core</artifactId>
        <version>2.3.0</version>
    </dependency>
</dependencies>
# HTTP port of the sample application
server:
  port: 8881
xxl:
  job:
    admin:
      # Address of the xxl-job admin console
      addresses: http://192.168.200.130:8888/xxl-job-admin
    executor:
      # Executor app name and the port passed to the executor bean
      appname: xxl-job-executor-sample
      port: 9999
执行器的appname必须和调度中心里配置的执行器AppName保持一致
package com.heima.xxljob.config;

import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that registers the xxl-job executor bean.
 *
 * <p>The admin address, executor app name and executor port are injected from the
 * {@code xxl.job.*} configuration properties.
 *
 * @author xuxueli 2017-04-28
 */
@Configuration
public class XxlJobConfig {

    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

    /** Injected from {@code xxl.job.admin.addresses}. */
    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    /** Injected from {@code xxl.job.executor.appname}. */
    @Value("${xxl.job.executor.appname}")
    private String appname;

    /** Injected from {@code xxl.job.executor.port}. */
    @Value("${xxl.job.executor.port}")
    private int port;

    /**
     * Builds the executor and applies the injected admin address, app name and port.
     *
     * @return the configured {@link XxlJobSpringExecutor}
     */
    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");

        XxlJobSpringExecutor executor = new XxlJobSpringExecutor();
        executor.setAdminAddresses(adminAddresses);
        executor.setAppname(appname);
        executor.setPort(port);
        return executor;
    }
}
任务代码,重要注解:@XxlJob("demoJobHandler")
这里的demoJobHandler要和调度中心里创建任务时填写的JobHandler保持一致
package com.heima.xxljob.job;

import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.stereotype.Component;

/**
 * Minimal xxl-job example: a Spring component exposing a single job handler.
 */
@Component
public class HelloJob {

    /**
     * Job handler registered under the name {@code demoJobHandler}.
     * Prints a fixed message to standard output each time it runs.
     */
    @XxlJob("demoJobHandler")
    public void helloJob() {
        final String message = "简单任务执行了。。。。";
        System.out.println(message);
    }
}
在调度中心启动之后,就会开始执行了
任务详解
以下是执行器的属性说明:
属性名称 | 说明 |
---|---|
AppName | 是每个执行器集群的唯一标示AppName, 执行器会周期性以AppName为对象进行自动注册。可通过该配置自动发现注册成功的执行器, 供任务调度时使用; |
名称 | 执行器的名称, 因为AppName限制字母数字等组成,可读性不强, 名称为了提高执行器的可读性; |
排序 | 执行器的排序, 系统中需要执行器的地方,如任务新增, 将会按照该排序读取可用的执行器列表; |
注册方式 | 调度中心获取执行器地址的方式; |
机器地址 | 注册方式为"手动录入"时有效,支持人工维护执行器的地址信息; |
集群下任务路由策略
轮询案例
修改yml配置文件
# Both ports are placeholders with defaults, so each instance can be started
# with its own "port" / "executor.port" values.
server:
  port: ${port:8881}
xxl:
  job:
    admin:
      # Address of the xxl-job admin console
      addresses: http://192.168.200.130:8888/xxl-job-admin
    executor:
      appname: xxl-job-executor-sample
      port: ${executor.port:9999}
3.启动多个微服务
每个微服务轮询的去执行任务
分片广播案例
轮询是每次只调度集群中的一个执行器;分片广播则是一次调度同时触发集群中的所有执行器(本例为两个),这是两者的主要区别。需要注意的是,上面这个案例代码中每个执行器实际上还是各自循环了两万次,只是其中一半的数据属于另一个分片,所以当前执行器对这部分没有执行输出。
热文章计算—定时计算
实现步骤
频道列表远程接口准备
计算完成新热数据后,需要给每个频道缓存一份数据,所以需要查询所有频道信息
① 在heima-leadnews-feign-api定义远程接口
import com.heima.model.common.dtos.ResponseResult;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;

/**
 * Feign client for the {@code leadnews-wemedia} service.
 */
@FeignClient("leadnews-wemedia")
public interface IWemediaClient {

    /**
     * Issues {@code GET /api/v1/channel/list} against the wemedia service.
     *
     * @return the response wrapper returned by the remote endpoint
     */
    @GetMapping("/api/v1/channel/list")
    ResponseResult getChannels();
}
② heima-leadnews-wemedia端提供接口
import com.heima.apis.wemedia.IWemediaClient;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.wemedia.service.WmChannelService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class WemediaClient implements IWemediaClient {@Autowiredprivate WmChannelService wmChannelService;@GetMapping("/api/v1/channel/list")@Overridepublic ResponseResult getChannels() {return wmChannelService.findAll();}
}
在ApArticleMapper.xml新增方法
<!--
  Selects ap_article rows left-joined with ap_article_config, keeping only rows whose
  config is neither deleted (is_delete != 1) nor taken down (is_down != 1).
  When dayParam is supplied, only articles with publish_time >= dayParam are returned.
-->
<select id="findArticleListByLast5days" resultMap="resultMap">
    SELECT
        aa.*
    FROM
        `ap_article` aa
    LEFT JOIN ap_article_config aac ON aa.id = aac.article_id
    <where>
        and aac.is_delete != 1
        and aac.is_down != 1
        <if test="dayParam != null">
            and aa.publish_time <![CDATA[>=]]> #{dayParam}
        </if>
    </where>
</select>
修改ApArticleMapper类
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.heima.model.article.dtos.ArticleHomeDto;
import com.heima.model.article.pojos.ApArticle;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

import java.util.Date;
import java.util.List;

/**
 * MyBatis mapper for {@link ApArticle}.
 */
@Mapper
public interface ApArticleMapper extends BaseMapper<ApArticle> {

    /**
     * Loads a list of articles.
     *
     * @param dto  the query parameters
     * @param type 1 = load more, 2 = load latest
     * @return the matching articles
     */
    List<ApArticle> loadArticleList(ArticleHomeDto dto, Short type);

    /**
     * Runs the {@code findArticleListByLast5days} mapped statement.
     *
     * @param dayParam bound as {@code dayParam}; when non-null the statement keeps only
     *                 articles whose publish time is on or after this instant
     * @return the matching articles
     */
    List<ApArticle> findArticleListByLast5days(@Param("dayParam") Date dayParam);
}
热文章业务层
定义业务层接口
package com.heima.article.service;

/**
 * Service contract for computing hot articles.
 */
public interface HotArticleService {

    /**
     * Computes the hot articles.
     */
    void computeHotArticle();
}
修改ArticleConstants
/**
 * Constants shared by the article features.
 */
public class ArticleConstants {

    // ---- Article list load types ----

    /** Load type 1: load more. */
    public static final Short LOADTYPE_LOAD_MORE = 1;

    /** Load type 2: load latest. */
    public static final Short LOADTYPE_LOAD_NEW = 2;

    /** Default tag value, used when no specific channel is selected. */
    public static final String DEFAULT_TAG = "__all__";

    /** Topic name for article-to-ES synchronisation (presumably a message topic; confirm against its users). */
    public static final String ARTICLE_ES_SYNC_TOPIC = "article.es.sync.topic";

    // ---- Hot article score weights ----

    /** Score multiplier applied to an article's like count. */
    public static final Integer HOT_ARTICLE_LIKE_WEIGHT = 3;

    /** Score multiplier applied to an article's comment count. */
    public static final Integer HOT_ARTICLE_COMMENT_WEIGHT = 5;

    /** Score multiplier applied to an article's collection count. */
    public static final Integer HOT_ARTICLE_COLLECTION_WEIGHT = 8;

    /** Cache key prefix for hot article lists; a channel id or {@link #DEFAULT_TAG} is appended. */
    public static final String HOT_ARTICLE_FIRST_PAGE = "hot_article_first_page_";
}
创建一个vo接收计算分值后的对象
import com.heima.model.article.pojos.ApArticle;
import lombok.Data;

/**
 * An {@link ApArticle} extended with its computed hotness score.
 */
@Data
public class HotArticleVo extends ApArticle {

    /** Computed score of the article. */
    private Integer score;
}
业务层实现类
package com.heima.article.service.impl;import com.alibaba.fastjson.JSON;
import com.heima.apis.wemedia.IWemediaClient;
import com.heima.article.mapper.ApArticleMapper;
import com.heima.article.service.HotArticleService;
import com.heima.common.constants.ArticleConstants;
import com.heima.common.redis.CacheService;
import com.heima.model.article.pojos.ApArticle;
import com.heima.model.article.vos.HotArticleVo;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.wemedia.pojos.WmChannel;
import lombok.extern.slf4j.Slf4j;
import org.joda.time.DateTime;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;@Service
@Slf4j
@Transactional
public class HotArticleServiceImpl implements HotArticleService {@Autowiredprivate ApArticleMapper apArticleMapper;/*** 计算热点文章*/@Overridepublic void computeHotArticle() {//1.查询前5天的文章数据Date dateParam = DateTime.now().minusDays(50).toDate();List<ApArticle> apArticleList = apArticleMapper.findArticleListByLast5days(dateParam);//2.计算文章的分值List<HotArticleVo> hotArticleVoList = computeHotArticle(apArticleList);//3.为每个频道缓存30条分值较高的文章cacheTagToRedis(hotArticleVoList);}@Autowiredprivate IWemediaClient wemediaClient;@Autowiredprivate CacheService cacheService;/*** 为每个频道缓存30条分值较高的文章* @param hotArticleVoList*/private void cacheTagToRedis(List<HotArticleVo> hotArticleVoList) {//每个频道缓存30条分值较高的文章ResponseResult responseResult = wemediaClient.getChannels();if(responseResult.getCode().equals(200)){String channelJson = JSON.toJSONString(responseResult.getData());List<WmChannel> wmChannels = JSON.parseArray(channelJson, WmChannel.class);//检索出每个频道的文章if(wmChannels != null && wmChannels.size() > 0){for (WmChannel wmChannel : wmChannels) {List<HotArticleVo> hotArticleVos = hotArticleVoList.stream().filter(x -> x.getChannelId().equals(wmChannel.getId())).collect(Collectors.toList());//给文章进行排序,取30条分值较高的文章存入redis key:频道id value:30条分值较高的文章sortAndCache(hotArticleVos, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + wmChannel.getId());}}}//设置推荐数据//给文章进行排序,取30条分值较高的文章存入redis key:频道id value:30条分值较高的文章sortAndCache(hotArticleVoList, ArticleConstants.HOT_ARTICLE_FIRST_PAGE+ArticleConstants.DEFAULT_TAG);}/*** 排序并且缓存数据* @param hotArticleVos* @param key*/private void sortAndCache(List<HotArticleVo> hotArticleVos, String key) {hotArticleVos = hotArticleVos.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());if (hotArticleVos.size() > 30) {hotArticleVos = hotArticleVos.subList(0, 30);}cacheService.set(key, JSON.toJSONString(hotArticleVos));}/*** 计算文章分值* @param apArticleList* @return*/private List<HotArticleVo> computeHotArticle(List<ApArticle> apArticleList) 
{List<HotArticleVo> hotArticleVoList = new ArrayList<>();if(apArticleList != null && apArticleList.size() > 0){for (ApArticle apArticle : apArticleList) {HotArticleVo hot = new HotArticleVo();BeanUtils.copyProperties(apArticle,hot);Integer score = computeScore(apArticle);hot.setScore(score);hotArticleVoList.add(hot);}}return hotArticleVoList;}/*** 计算文章的具体分值* @param apArticle* @return*/private Integer computeScore(ApArticle apArticle) {Integer scere = 0;if(apArticle.getLikes() != null){scere += apArticle.getLikes() * ArticleConstants.HOT_ARTICLE_LIKE_WEIGHT;}if(apArticle.getViews() != null){scere += apArticle.getViews();}if(apArticle.getComment() != null){scere += apArticle.getComment() * ArticleConstants.HOT_ARTICLE_COMMENT_WEIGHT;}if(apArticle.getCollection() != null){scere += apArticle.getCollection() * ArticleConstants.HOT_ARTICLE_COLLECTION_WEIGHT;}return scere;}
}
在ArticleApplication的引导类中添加以下注解
@EnableFeignClients(basePackages = "com.heima.apis")
定时计算
①:在heima-leadnews-article中的pom文件中新增依赖
<!-- xxl-job core library, version 2.3.0 -->
<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
    <version>2.3.0</version>
</dependency>
② 在xxl-job-admin中新建执行器和任务
新建执行器:leadnews-hot-article-executor
新建任务:路由策略为轮询,Cron表达式:0 0 2 * * ?
③ leadnews-article中集成xxl-job
XxlJobConfig
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that registers the xxl-job executor bean.
 *
 * <p>The admin address, executor app name and executor port are injected from the
 * {@code xxl.job.*} configuration properties.
 *
 * @author xuxueli 2017-04-28
 */
@Configuration
public class XxlJobConfig {

    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

    /** Injected from {@code xxl.job.admin.addresses}. */
    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    /** Injected from {@code xxl.job.executor.appname}. */
    @Value("${xxl.job.executor.appname}")
    private String appname;

    /** Injected from {@code xxl.job.executor.port}. */
    @Value("${xxl.job.executor.port}")
    private int port;

    /**
     * Builds the executor and applies the injected admin address, app name and port.
     *
     * @return the configured {@link XxlJobSpringExecutor}
     */
    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");

        XxlJobSpringExecutor executor = new XxlJobSpringExecutor();
        executor.setAdminAddresses(adminAddresses);
        executor.setAppname(appname);
        executor.setPort(port);
        return executor;
    }
}
在nacos配置新增配置
xxl:
  job:
    admin:
      # Address of the xxl-job admin console
      addresses: http://192.168.200.130:8888/xxl-job-admin
    executor:
      # Executor app name and the port passed to the executor bean
      appname: leadnews-hot-article-executor
      port: 9999
④:在article微服务中新建任务类
import com.heima.article.service.HotArticleService;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * xxl-job task that triggers the hot article computation.
 */
@Component
@Slf4j
public class ComputeHotArticleJob {

    private static final String START_MESSAGE = "热文章分值计算调度任务开始执行...";
    private static final String END_MESSAGE = "热文章分值计算调度任务结束...";

    @Autowired
    private HotArticleService hotArticleService;

    /**
     * Job handler registered under the name {@code computeHotArticleJob}.
     * Logs the start, runs {@code HotArticleService.computeHotArticle()}, then logs the end.
     */
    @XxlJob("computeHotArticleJob")
    public void handle() {
        log.info(START_MESSAGE);
        hotArticleService.computeHotArticle();
        log.info(END_MESSAGE);
    }
}
查询文章接口改造
在ApArticleService中新增方法
/**
 * Loads the article list.
 *
 * @param dto       the query parameters
 * @param type      1 = load more, 2 = load latest
 * @param firstPage {@code true} for a home-page request, {@code false} otherwise
 * @return the article list wrapped in a response result
 */
public ResponseResult load2(ArticleHomeDto dto, Short type, boolean firstPage);
实现方法
/**
 * Loads the article list, answering home-page requests from the hot article cache
 * when a cached list exists.
 *
 * @param dto       the query parameters; its tag selects the cache entry
 * @param type      1 = load more, 2 = load latest
 * @param firstPage {@code true} for a home-page request, {@code false} otherwise
 * @return the cached hot articles when {@code firstPage} is true and the cache entry is
 *         not blank; otherwise the result of {@code load(type, dto)}
 */
@Override
public ResponseResult load2(ArticleHomeDto dto, Short type, boolean firstPage) {
    if (firstPage) {
        String cacheKey = ArticleConstants.HOT_ARTICLE_FIRST_PAGE + dto.getTag();
        String cachedJson = cacheService.get(cacheKey);
        if (StringUtils.isNotBlank(cachedJson)) {
            return ResponseResult.okResult(JSON.parseArray(cachedJson, HotArticleVo.class));
        }
    }
    // Not the home page, or nothing cached for this tag: fall back to the regular query.
    return load(type, dto);
}
修改控制器
/**
 * Loads the home page.
 *
 * <p>Delegates to {@code load2} with the home-page flag set; the earlier version called
 * {@code apArticleService.load(dto, ArticleConstants.LOADTYPE_LOAD_MORE)} directly.
 *
 * @param dto the query parameters from the request body
 * @return the article list wrapped in a response result
 */
@PostMapping("/load")
public ResponseResult load(@RequestBody ArticleHomeDto dto) {
    final boolean firstPage = true;
    return apArticleService.load2(dto, ArticleConstants.LOADTYPE_LOAD_MORE, firstPage);
}
测试成功