分布式事务控制方案
本篇文章给出一种要求高可用性(AP思想)的分布式事务控制方案
上篇回顾:点我查看
- 分布式事务控制方案
- 1、前景回顾
- 2、数据库和缓存的操作
- 3、分布式文件系统
- 1)页面静态化
- 2)远程调用
- 3)调用接口方法
- 4、分布式搜索系统
- 1)部署ES
- 1. 准备工作
- 2. 添加文档
- 3. 搜索文档
- 2)远程调用
- 3)调用接口方法
- 5、总结
1、前景回顾
在上一节中我们完成了分布式事务控制方案的流程设计,使用本地消息表+任务调度的方式实现分布式事务控制方案,并完成了消息表的操作,流程控制抽象类的实现,和任务调度的准备工作。
那么我们这一节会对四个小任务进行实现。
这四个小任务分别是将课程发布信息同步到数据库、缓存、分布式文件系统和分布式搜索系统。
//课程发布任务处理@Overridepublic boolean execute(MqMessage mqMessage) {//获取消息相关的业务信息String businessKey1 = mqMessage.getBusinessKey1();long courseId = Integer.parseInt(businessKey1);//课程数据库saveCourseToDB(mqMessage, courseId);//课程缓存saveCourseCache(mqMessage, courseId);//课程静态化generateCourseHtml(mqMessage, courseId);//课程索引saveCourseIndex(mqMessage, courseId);return true;}
下面对这四个方法进行实现。
2、数据库和缓存的操作
对于MySQL和Redis的操作代码比较简单,不涉及微服务之间的调用,代码如下
对数据库MySQL的操作包括插入课程发布表、删除课程预发布表、更新消息表,需要开启@Transactional
@Transactional通过注解的方式开始事务,需要注意使用方式,避免注解失效
@Transactional失效的场景有以下几点:
- 方法没有被public修饰
- 方法捕获异常,没有抛出
- 数据库不支持事务,(我们这里使用MySQL是Innodb引擎,是支持事务的)
@Transactional@Override//将课程信息插入到课程发布表中public void saveCourseToMQ(MqMessage mqMessage, long courseId) {//消息idLong id = mqMessage.getId();//消息处理的serviceMqMessageService mqMessageService = this.getMqMessageService();//消息幂等性处理int stageOne = mqMessageService.getStageOne(id);if(stageOne == 1){return ;}//查询课程预发布表CoursePublishPre coursePublishPre = coursePublishPreMapper.selectById(courseId);if (coursePublishPre == null) {EduVideoException.cast("请先提交课程审核,审核通过才可以发布");}//本机构只允许提交本机构的课程if (!coursePublishPre.getCompanyId().equals(companyId)) {EduVideoException.cast("不允许提交其它机构的课程。");}//课程审核状态String auditStatus = coursePublishPre.getStatus();//审核通过方可发布if (!"202004".equals(auditStatus)) {EduVideoException.cast("操作失败,课程审核通过方可发布。");}//保存课程发布信息到课程发布表中,并更新课程基本信息表saveCoursePublish(courseId);//删除课程预发布表对应记录coursePublishPreMapper.deleteById(courseId);//保存第一阶段状态mqMessageService.completedStageOne(id);}
这里对于缓存的建立采用加锁,插入数据的方式。
//将课程信息缓存至redis@Overridepublic void saveCourseCache(MqMessage mqMessage, long courseId) {//消息idLong id = mqMessage.getId();//消息处理的serviceMqMessageService mqMessageService = this.getMqMessageService();//消息幂等性处理int stageTwo = mqMessageService.getStageTwo(id);if(stageTwo == 0){// 分布式锁RLock lock = redissonClient.getLock("coursequerylock:" + courseId);// 获取锁lock.lock();try {System.out.println("从数据库查询...");//从数据库查询CoursePublishDto coursePublishDto = CoursePublish(courseId);redisTemplate.opsForValue().set("course:" + courseId, JSON.toJSONString(coursePreviewDto), 300 + new Random().nextInt(100), TimeUnit.SECONDS);} catch (Exception e) {e.printStackTrace();return ;}finally {//释放锁lock.unlock();}}//保存第一阶段状态mqMessageService.completedStageTwo(id);}
而分布式文件系统和搜索系统都涉及到微服务之间的调用,需要更详细的论述
3、分布式文件系统
调用分布式文件系统的接口实现数据同步,需要先实现页面静态化,然后远程调用插入文件系统的接口
1)页面静态化
我们需要把发布的课程的浏览页生成静态页面,并保存到分布式文件系统中。
页面静态化使用FreeMarker中间件实现,这是一个模板引擎,简单易懂,功能强大。
FreeMarker的依赖、配置和ftl模版的创建不在这里赘述,直接给代码,FreeMarker的使用需要加载模版文件并准备数据,把数据插入到模版中,就能得到一个String类型的变量,这个变量通过输入流,输出流的copy,写入一个临时文件中,在后续会通过远程调用分布式文件系统的保存api插入到文件系统中。
@Overridepublic File generateCourseHtml(Long courseId) {//静态化文件File htmlFile = null;try {//配置freemarkerConfiguration configuration = new Configuration(Configuration.getVersion());//加载模板//选指定模板路径,classpath下templates下//得到classpath路径String classpath = this.getClass().getResource("/").getPath();configuration.setDirectoryForTemplateLoading(new File(classpath + "/templates/"));//设置字符编码configuration.setDefaultEncoding("utf-8");//指定模板文件名称Template template = configuration.getTemplate("course_template.ftl");//准备数据CoursePreviewDto coursePreviewInfo = this.getCoursePreviewInfo(courseId);Map<String, Object> map = new HashMap<>();map.put("model", coursePreviewInfo);//静态化//参数1:模板,参数2:数据模型String content = FreeMarkerTemplateUtils.processTemplateIntoString(template, map);// System.out.println(content);//将静态化内容输出到文件中InputStream inputStream = IOUtils.toInputStream(content);//创建静态化文件htmlFile = File.createTempFile("course", ".html");log.debug("课程静态化,生成静态文件:{}", htmlFile.getAbsolutePath());//输出流FileOutputStream outputStream = new FileOutputStream(htmlFile);IOUtils.copy(inputStream, outputStream);} catch (Exception e) {log.error("课程静态化异常:{}", e.toString());EduVideoException.cast("课程静态化异常");}return htmlFile;}
2)远程调用
课程发布的微服务和分布式文件系统的微服务是两个微服务,他们之间数据传递需要通过接口来实现,远程调用使用Feign来实现。
Feign是远程调用中间件,内部集成了hystrix和Ribbon,方便进行熔断降级的设置。
Feign通过实现HTTP协议来实现远程调用,我们需要在课程发布的微服务中创建分布式文件系统中对应方法的接口,这个接口的类中通过设置@FeignClient的url,以及在方法的@RequestMapping中设置value,两个值的拼接结果就是远程接口的HTTP地址。
我们的项目通过网关Gateway统一管理所有的微服务,所以在FeignClient中写入微服务的id,在RequestMapping中写入接口具体的路径。
我们还需要实现熔断降级,提高系统服务的高可用性,避免由于下游服务的异常导致整个系统的崩溃
熔断通过在配置文件中设置Hystrix的熔断时间来完成
降级通过创建Ribbon的熔断时间和降级方法来完成
/*** @author zkp15* @version 1.0* @description OpenFeign接口* @date 2024/3/22 16:49*/
@FeignClient(value = "media-api", configuration = MultipartSupportConfig.class,fallbackFactory = MediaServiceClientFallbackFactory.class)
public interface MediaServiceClient {@RequestMapping(value = "/media/upload/coursefile", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)String uploadFile(@RequestPart("filedata") MultipartFile upload,@RequestParam(value = "folder", required = false) String folder,@RequestParam(value = "objectName", required = false) String objectName);
}
降级方法
/*** @author zkp15* @version 1.0* @description 熔断降级的方法* @date 2023/3/22 17:57*/
@Component
public class MediaServiceClientFallbackFactory implements FallbackFactory<MediaServiceClient> {@Overridepublic MediaServiceClient create(Throwable throwable) {return new MediaServiceClient(){@Overridepublic String uploadFile(MultipartFile upload, String folder, String objectName) {//降级方法log.debug("调用媒资管理服务上传文件时发生熔断,异常信息:{}",throwable.toString(),throwable);return null;}};}
}
还有一个问题需要注意,就是页面静态化的结果是File文件,而Feign远程调用传递的是一个MultipartFile文件,所以,我们还需要对文件格式进行转化,通过CommonsMultipartFile类实现转化,他接受一个输出流,所以还需要先把File转化为输入流,并设置远程调用的请求头的格式为multipart/form-data
public static MultipartFile getMultipartFile(File file) {FileItem item = new DiskFileItemFactory().createItem("file", MediaType.MULTIPART_FORM_DATA_VALUE, true, file.getName());try (FileInputStream inputStream = new FileInputStream(file);OutputStream outputStream = item.getOutputStream();) {IOUtils.copy(inputStream, outputStream);} catch (Exception e) {e.printStackTrace();}return new CommonsMultipartFile(item);}
3)调用接口方法
//保存课程索引信息public void saveCourseIndex(MqMessage mqMessage, long courseId){log.debug("保存课程索引信息,课程id:{}",courseId);//消息idLong id = mqMessage.getId();//消息处理的serviceMqMessageService mqMessageService = this.getMqMessageService();//消息幂等性处理int stageTwo = mqMessageService.getStageTwo(id);if(stageTwo > 0){log.debug("课程索引已处理直接返回,课程id:{}",courseId);return ;}Boolean result = saveCourseIndexToES(courseId);if(result){//保存第一阶段状态mqMessageService.completedStageTwo(id);}}
4、分布式搜索系统
ES,Elasticsearch,是ELK三件套之一,是一个分布式的、开源的搜索分析引擎,支持各种数据类型,包括文本、数字、地理、结构化、非结构化。
我们使用ES和Kibana进行分布式搜索引擎的开发。
1)部署ES
1. 准备工作
我们在docker中部署ES和Kibana,有需要的同学也可以进行其他部署方式,进入Kibana开发页面,通过DSL直接创建课程发布的索引,当然我们也可以在项目中创建索引,但是索引的创立只用一次。
创建一个微服务,添加ES的依赖和配置文件,注册到Nacos,接受gateway管理,添加配置类,创建RestHighLevelClient客户端,之后我们通过这个客户端进行索引和文档CRUD
@Configuration
public class ElasticsearchConfig {@Value("${elasticsearch.hostlist}")private String hostlist;@Beanpublic RestHighLevelClient restHighLevelClient(){//解析hostlist配置信息String[] split = hostlist.split(",");//创建HttpHost数组,其中存放es主机和端口的配置信息HttpHost[] httpHostArray = new HttpHost[split.length];for(int i=0;i<split.length;i++){String item = split[i];httpHostArray[i] = new HttpHost(item.split(":")[0], Integer.parseInt(item.split(":")[1]), "http");}//创建RestHighLevelClient客户端return new RestHighLevelClient(RestClient.builder(httpHostArray));}}
在微服务中根据字段的mapping创建PO类,其中日期的需要注意格式,如下所示,创建service服务
@JSONField(format="yyyy-MM-dd HH:mm:ss")@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")private LocalDateTime createDate;
在完成PO和service的创建后,在controller中实现建立索引和检索索引的接口。
2. 添加文档
我们通过RestHighLevelClient客户端实现文档的添加,client需要传入一个IndexRequest对象,在这个对象中添加文档内容和文档id,得到一个IndexResponse对象,通过比较其中字段得知是否插入成功。
public Boolean addCourseIndex(String indexName, String id, Object object) {String jsonString = JSON.toJSONString(object);IndexRequest indexRequest = new IndexRequest(indexName).id(id);//指定索引文档内容indexRequest.source(jsonString, XContentType.JSON);//索引响应对象IndexResponse indexResponse = null;try {indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);} catch (IOException e) {log.error("添加索引出错:{}", e.getMessage());EduVideoException.cast("添加索引出错");}String name = indexResponse.getResult().name();return name.equalsIgnoreCase("created") || name.equalsIgnoreCase("updated");}
3. 搜索文档
对于搜索文档,我们的业务是根据课程的分类、难易程度、关键字进行检索,如下面在线课程平台的搜索页面所示。
通过分析发现,一级目录二级目录和难易程度都是通过比较匹配,而关键字需要通过全文检索来匹配,将关键字与文档的名称name和内容描述description进行匹配,返回的结果分页显示,并且对关键字匹配到的名称进行高亮显示
首先描述课程检索方法的流程,我们通过SearchRequest对象进行查询,绑定要插入的索引Index,传入一个SearchSourceBuilder对象,在这个对象中进行条件查询、分页、高亮设置等等。client客户端会返回一个Response对象,得到结果集SearchHit[],这是一个数组对象,遍历取出courseIndex对象,这是我们自己定义的结果体,将其中的name字段替换为hit中的高亮字段,并返回。
方法中需要传入两个参数,分页的参数和查询的参数
@Overridepublic SearchPageResultDto<CourseIndex> queryCoursePubIndex(PageParams pageParams, SearchCourseParamDto courseSearchParam) {......}
创建SearchSourceBuilder 查询对象
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();//source源字段过虑String[] sourceFieldsArray = sourceFields.split(",");searchSourceBuilder.fetchSource(sourceFieldsArray, new String[]{});if(courseSearchParam==null){courseSearchParam = new SearchCourseParamDto();}
从传入的courseSearchParam中分别取出关键字,一级二级目录,课程难度
关键字的搜索方式为全文检索,使用MultiMatchQueryBuilder来对课程的名称name和描述description进行匹配,并设置匹配占比70%
后三个字段的的搜索方式为匹配过滤,
//关键字if(StringUtils.isNotEmpty(courseSearchParam.getKeywords())){//匹配关键字MultiMatchQueryBuilder multiMatchQueryBuilder = QueryBuilders.multiMatchQuery(courseSearchParam.getKeywords(), "name", "description");//设置匹配占比multiMatchQueryBuilder.minimumShouldMatch("70%");//提升另个字段的Boost值multiMatchQueryBuilder.field("name",10);boolQueryBuilder.must(multiMatchQueryBuilder);}//过虑if(StringUtils.isNotEmpty(courseSearchParam.getMt())){boolQueryBuilder.filter(QueryBuilders.termQuery("mtName",courseSearchParam.getMt()));}if(StringUtils.isNotEmpty(courseSearchParam.getSt())){boolQueryBuilder.filter(QueryBuilders.termQuery("stName",courseSearchParam.getSt()));}if(StringUtils.isNotEmpty(courseSearchParam.getGrade())){boolQueryBuilder.filter(QueryBuilders.termQuery("grade",courseSearchParam.getGrade()));}
分页需要手动计算起始位置和查询条数
//分页Long pageNo = pageParams.getPageNo();Long pageSize = pageParams.getPageSize();int start = (int) ((pageNo-1)*pageSize);searchSourceBuilder.from(start);searchSourceBuilder.size(Math.toIntExact(pageSize));
创建SearchSourceBuilder对象,传入搜索条件和分页,并设置高亮字段
高亮字段将关键字对文档字段的name进行匹配,结果通过前置后置HTML尖括号的形式进行高亮设置
//设置索引SearchRequest searchRequest = new SearchRequest(courseIndexStore);//布尔查询searchSourceBuilder.query(boolQueryBuilder);//高亮设置HighlightBuilder highlightBuilder = new HighlightBuilder();highlightBuilder.preTags("<font class='eslight'>");highlightBuilder.postTags("</font>");//设置高亮字段highlightBuilder.fields().add(new HighlightBuilder.Field("name"));searchSourceBuilder.highlighter(highlightBuilder);//请求搜索searchRequest.source(searchSourceBuilder);
通过客户端client进行条件查询,得到Response对象,从中得到结果集SearchHit[],一个数组对象,遍历hit,从中取出courseIndex对象,将其中的name字段替换为hit中的高亮字段这个属性值,最终得到真正的查询结果集
try {searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);} catch (IOException e) {e.printStackTrace();log.error("课程搜索异常:{}",e.getMessage());return new SearchPageResultDto<CourseIndex>(new ArrayList(),0,0,0);}//结果集处理SearchHits hits = searchResponse.getHits();SearchHit[] searchHits = hits.getHits();//记录总数TotalHits totalHits = hits.getTotalHits();//数据列表List<CourseIndex> list = new ArrayList<>();for (SearchHit hit : searchHits) {String sourceAsString = hit.getSourceAsString();CourseIndex courseIndex = JSON.parseObject(sourceAsString, CourseIndex.class);//取出sourceMap<String, Object> sourceAsMap = hit.getSourceAsMap();//课程idLong id = courseIndex.getId();//取出名称String name = courseIndex.getName();//取出高亮字段内容Map<String, HighlightField> highlightFields = hit.getHighlightFields();if(highlightFields!=null){HighlightField nameField = highlightFields.get("name");if(nameField!=null){Text[] fragments = nameField.getFragments();StringBuffer stringBuffer = new StringBuffer();for (Text str : fragments) {stringBuffer.append(str.string());}name = stringBuffer.toString();}}courseIndex.setId(id);courseIndex.setName(name);list.add(courseIndex);}SearchPageResultDto<CourseIndex> pageResult = new SearchPageResultDto<>(list, totalHits.value,pageNo,pageSize);return pageResult;
2)远程调用
与分布式文件系统的远程调用类似,使用Feign进行微服务之间的接口的调用,注意传参格式,开启熔断降级策略,下面直接给出代码实现
远程调用接口
/*** @author zkp15* @version 1.0* @description 远程调用搜索服务接口* @date 2023/6/22 21:37*/
@FeignClient(value = "search",fallbackFactory = SearchServiceClientFallbackFactory.class)
public interface SearchServiceClient {@PostMapping("/search/index/course")public Boolean add(@RequestBody CourseIndex courseIndex);
}
降级策略
/*** @author zkp15* @version 1.0* @description 远程调用搜索异常阻塞降级* @date 2023/6/22 21:38*/
@Slf4j
@Component
public class SearchServiceClientFallbackFactory implements FallbackFactory<SearchServiceClient> {@Overridepublic SearchServiceClient create(Throwable throwable) {return new SearchServiceClient() {@Overridepublic Boolean add(CourseIndex courseIndex) {throwable.printStackTrace();log.debug("调用搜索发生熔断走降级方法,熔断异常:", throwable.getMessage());return false;}};}
}
3)调用接口方法
//生成课程静态化页面并上传至文件系统public void generateCourseHtml(MqMessage mqMessage,long courseId){log.debug("开始进行课程静态化,课程id:{}",courseId);//消息idLong id = mqMessage.getId();//消息处理的serviceMqMessageService mqMessageService = this.getMqMessageService();//消息幂等性处理int stageOne = mqMessageService.getStageOne(id);if(stageOne == 1){log.debug("课程静态化已处理直接返回,课程id:{}",courseId);return ;}//生成静态化页面File file = coursePublishService.generateCourseHtml(courseId);//上传静态化页面if(file!=null){coursePublishService.uploadCourseHtml(courseId,file);}//保存第一阶段状态mqMessageService.completedStageOne(id);}
5、总结
本文在上一篇文章的基础上,对四个任务进行了实现。
四个任务在之前时都需要进行数据库状态的校验,确保任务执行的幂等性
- 数据库MySQL,插入课程发布表,删除课程预发布表
- 缓存Redis,插入kv值,建立缓存
- 分布式文件系统MinIO,将课程页面静态化处理,并上传到文件系统
- 分布式搜索系统ES,建立课程发布的索引,并插入文档,建立约束Mapping,实现关键字全文检索、目录和难度匹配
至此,这个分布式事务AP控制方案全部完成,从业务背景分析,技术选型,方案设计,消息表设计,任务调度,到数据库,缓存,文件系统,搜索系统的实现,可谓是硕果累累,愿与读者共勉,做强做大,再创辉煌。