【微服务】mysql + elasticsearch数据双写设计与实现

目录

一、前言

二、为什么使用mysql+es双写

2.1 单用mysql的问题

2.2 为什么不直接使用es

2.2.1 非关系型表达

2.2.2 不支持事务

2.2.3 多字段将造成性能低下

三、mysql+es双写方案设计要点

3.1 全新设计 VS 中途调整架构

3.2 全表映射 VS 关键字段存储

3.2.1 最大程度发挥es性能

3.2.2 选择mysql还是es作为数据托底

3.3 数据一致性保障

3.3.1 同步双写

3.3.2 异步双写

3.3.3 定期同步

3.3.4 数据订阅

四、mysql+es双写方案数据迁移

4.1 数据迁移整体方案

4.1.1 创建索引

4.1.2 双写改造

4.1.3 数据迁移

4.1.4 搜索服务上线

4.2 数据迁移补充说明

五、方案实施

5.1 前置准备

5.1.1 搭建环境

5.1.2 创建数据表

5.1.3 插入初始化数据

5.1.4 创建一个索引

5.2 搭建springboot工程

5.2.1 引入基础依赖

5.2.2 核心配置文件

5.2.3 es客户端连接配置

5.2.3 mybatis文件

5.2.4 业务实现类

5.2.4 相关测试

5.3 双写业务实现

5.4 数据搜索

5.5 数据迁移

六、写在文末


一、前言

在很多电商网站中,对商品的搜索要求很高,主要体现在页面快速响应搜索结果。这就对服务端接口响应速度提出了很高的要求。而商品数据存储离不开mysql,在高并发场景下,尤其是数据规模达到一定量级,mysql的性能瓶颈一定会出现,为了满足极致的搜索速度,往往需要借助第三方存储,比如nosql数据库,当然主流的搭配还是使用搜索引擎来完成,于是在很多场景下,会选择mysql+elasticsearch来满足这个场景下对搜索的要求。

如下是一个典型的使用mysql+es实现数据双写的应用场景。

二、为什么使用mysql+es双写

2.1 单用mysql的问题

在很多互联网项目中,mysql数据库仍然是主流,毕竟关系型数据库可以处理现实场景中很多复杂的业务模型,但是mysql随着数据规模的增长,一旦单表数据量达到了千万级,性能将下降的很快,于是不得不进行数据库的扩展,这样也带来了架构上的复杂性,综合来说,在类似某宝,某东等这样的电商场景下,单表存储数据带来的问题主要如下:

  • 单表数据承载有限,当数据规模超过千万就要考虑分库或分表,从而给数据库架构设计提出新的挑战;

  • mysql不适合全文检索,经管mysql从某个版本支持了全文检索,但是在实际使用中性能很弱;

  • mysql的模糊匹配无法满足多场景下的复杂的搜索要求,比如电商场景下,多维度任意组合搜索是很常用得,而复杂的搜索将会使得mysql性能急剧下降;

2.2 为什么不直接使用es

到这里也会有人提出疑问,既然es搜索速度如此高效,并且也可以存储数据,直接使用es存储mysql表中的数据不就行了。对于这个问题,主要从下面几点进行考虑,

2.2.1 非关系型表达

使用mysql进行数据库设计的一个好处就是,不同的表之间可以通过某个字段进行关联,关联关系的存在,让现实中复杂的业务模型通过表关联进行实现,而es则不支持不同索引之间的关联搜索。

2.2.2 不支持事务

mysql事务的存在,让数的写入完整性得到保障,而es是不支持事务的,这就导致在往es写数据时,数据的一致性需要通过其他的手段来保障。

2.2.3 多字段将造成性能低下

上面谈到,由于es不支持关联查询,实际业务中,一个页面展现的数据往往来自多张表的关联聚合查询结果,es为了达到与mysql同样的效果,只好尽可能在一个索引中冗余更多的字段,从es存储的角度来说,es是基于字段的,大行超多字段将会大大降低性能,同时也会导致后续数据的维护困难和复杂性。

三、mysql+es双写方案设计要点

在很多开发者看来,使用mysql+es双写的方案,就是把某个高频搜索的表的数据存储一份到es就可以了,这么理解倒也不错,不过还缺少很多深入的考虑。在正式开始设计方案之前,需要重点考虑下面几点,这也将是本文后续探讨的重点,以及在实际开发中需要关注的。

3.1 全新设计 VS 中途调整架构

这是一个很现实的摆在很多架构设计者面前的问题,为什么这么说呢,在很多企业的项目中,经历了从单体架构到微服务的改造,从简单的http调用,webservice调用到使用dubbo等服务治理的技术改造。

如果现在的你正在经历一个全新的项目,那么恭喜你,你可以拥有更多的技术选型空间,但是如果你正则经历项目的服务化改造,这个过程可能比较痛苦,不仅要考虑引入新技术的成本,更要考虑新技术的实现会给未来技术的演进带来何种影响,包括团队学习、维护成本,上线后的运维成本,与其他技术的融合成本等。

回到上面的问题,在使用mysql+es双写方案来说,同样会面临相同的难题,如果是全新的设计,主要考虑的是如何实现mysql与es双写数据的一致性,及如何基于团队成员现有的技术、业务上对双写数据实时性等方面,评估出以最低成本的实现方案即可。

而如果是中途更换设计方案,比如线上的数据规模已经达到千万量级,顶不住客户的压力到了不得不调整架构的阶段来考虑这个问题,这个过程将会拉得很长。此时你考虑的点会更多了,包括:

  • 如何设计es索引?

  • 如何基于现有的代码实现数据双写并且尽可能降低对现有逻辑的侵入性?

  • 如何保障双写数据的一致性?

  • 针对历史数据如何迁移?

  • 如何减少生产上线后的实施成本和运维成本?

  • ...

3.2 全表映射 VS 关键字段存储

使用过mysql的同学应该不陌生,mysql是行式存储数据,而es中,数据则以准json的结构存储,两者之间经管能够通过字段进行对应,但在检索的时候原理是不一样的,如下图所示。

在实际使用mysql+es进行双写方案设计时,很多人直接就认为,将mysql的表字段进行一份全量的拷贝到es的索引中即可,这样从实现上固然没有差别,最终也能达到效果,但这样做真的合理吗?在进行方案设计的时候,从实际经验来说,功能的实现固然重要,但如何做到既能满足功能,又能让设计显得合理才是更需要深入思考的。就这个问题来说,如何才算合理呢?可以从下面几点展开思考。

3.2.1 最大程度发挥es性能

不管是mysql,还是es,不管是hbase还是clickhouse...所有的数据存储介质,都有自己的优势和不足,因此在选择某种存储引擎时一定是利用其优势,同时规避其不足。就es来说,选择它的原因就是因为在海量的数据且复杂的检索场景下,仍然能够保持高性能。

在上文也谈到单纯使用es带来的不足,其中值得注意的一点就是,es是基于字段存储的,对一行数据来说,字段数量越多,当一个待检索的请求发来时,其计算耗费的成本必然越高,这不仅是针对es,甚至mysql等很多关系型数据库,对于单表过多字段的冗余设计也不推荐,所以对es来说,也不建议存储mysql表的所有字段,而是关键的具有重要业务意义的字段数据。

3.2.2 选择mysql还是es作为数据托底

这是一个架构设计中容易被忽略的问题。文章开始谈到,一个基本的业务场景是,主业务数据写入到mysql,同时将数据同步写入es,检索从es获取数据。那么问题来了,实际业务中,究竟以哪个数据为准呢?我们以下面一个简单的同步写入场景的业务逻辑为例来说明相信就能理解了。

@Transactional
public boolean save(){//数据组装try{//写入mysql//写入es}catch(Exception e){//es数据回滚}
}

这是一段同步双写的伪代码,从这段代码不难看出,mysql的写入由事务机制保障,但是es的数据写入与回滚就比较麻烦了,而且这样的实现对业务逻辑的侵入性强,维护性差,但可以发现,我们首要保障的是mysql数据的完整性,因为只有数据成功写入,界面上展示的数据才是正确的。

从这个分析结合实际的业务实现,以一个电商或类似的场景,从产品列表到具体的详情页面为例进行说明,参考下面的流程;

  • 用户浏览列表页;

  • 用户从列表页通过关键字搜索目标数据;

  • 从搜到的结果中选择某个具体的产品;

  • 进入具体产品的页,展示与当前产品完整的数据;

从上面的业务流程分析不难看出,实际要展示某个产品字段数据是非常多的,以某大型电商网站上面展现的某个产品为例,展现在用户面前的商品包括了非常多的数据,这些数据是多个源表经过服务端聚合以后再经过复杂的处理得到的,所以如果将这么多的字段放在es的某个索引中,这明显是不合适的,总结来说,两者搭配使用时可以遵循下面的思路:

  • es存放核心业务表的核心字段,比如产品ID,产品的详情描述,SKU等信息;

  • 列表搜索走es索引,通过es的检索,返回业务主键等关键信息;

  • 将第二步es得到的数据给到mysql的业务表,返回最终的数据给到页面;

从上面的分析来看,在实际业务中,应该酌情考虑是否应该将核心业务表的全量数据存于es,一般建议业务表的核心字段,比如业务主键 + 高频搜索的字段存放es中;

3.3 数据一致性保障

使用双写方案在实际操作中,基于双写方案,如何保障mysql与es的数据一致性是设计与开发过程中需要重点关注的。

我们知道,mysql有事务机制保障数据的一致性,而es没有事务,在上文的伪代码中仅仅是使用了一种非常简单的逻辑来保障,这样是远远不够的。一旦发生了mysql与es数据的不一致,带来的问题是很严重的。关于如何保障数据的一致性,结合实际操作经验,给出下面的几点建议:

3.3.1 同步双写

同步双写是保障数据一致性最简单的方式,也是实际操作中比较简单的操作方式,只需要将数据写到 MySQL 时,同时将数据写到 ES即可,通过mysql自身的事务机制间接保障两者数据一致性,其优缺点如下。

优点:

  • 这种方式简单粗暴,实时写入能做到秒级。

缺点:

  • 业务耦合,代码侵入性强,即在代码中需要写入mysql表的位置都需要加写入es的代码;

  • 性能影响,同步写入两个存储,响应时间变长;

  • 可能存在丢数据的风险;

3.3.2 异步双写

异步双写,即在数据写入mysql的同时,异步写入到es中,具体在实践过程中也有多种方式可以选择,下面提供几种方案。

异步线程

利用异步线程的方式,写入mysql的时候,开启多线程写入es;

内存队列

可以利用Java中提供的内存队列,写入mysql的同时向内存队列,比如BlockingQueue,另有一个线程消费内存队列中的数据写入es;

事件监听

主业务流程写入mysql的同时发布事件,另有一个事件订阅者订阅mysql写入事件,从而做到与主业务逻辑的解耦。

引入消息中间件

也可以考虑引入消息中间件,做到与主业务逻辑的彻底解耦,写入mysql的同时,向消息队列发送消息,另有服务消费者订阅消息消费,异步写入es ;

上述各种方式均可以在实践中使用,需要结合团队的技术储备,以及服务器资源,后续的运维成本等综合考虑。  

3.3.3 定期同步

定期同步适合对搜索场景不那么敏感的业务,在这种场景下,可以考虑每隔一段时间,或每天的某些时间点进行同步,将数据批量从mysql写入到es中。定期同步的优缺点如下。

优点:

  • 实现简单,系统资源占用少;

缺点:

  • 实时性难以保证;

  • 瞬时存储压力较大;

3.3.4 数据订阅

既要提高实时性,又要低入侵, 可以考虑利用 MySQL 的 Binlog 来进行同步。在很多数据同步工具中,都采用了类似的思想,简单来说,订阅mysql的binglog日志,然后通过回放binlog日志变化解析出变化的数据,从而进行数据同步。比如大家熟悉的canal就是很好的利用了这一点。

这种方式可以很好的与核心业务解耦,从而实现异步,总结来说,优点如下:

  • 降低对主业务逻辑的代码侵入性;

  • 数据的实时性好;

缺点:

  • 对第三方组件存在一定的依赖性;

  • 同步很难做到灵活性,很难对同步的数据做进一步的处理,比如同步时那些明显有问题的数据;

四、mysql+es双写方案数据迁移

对于一个全新的系统,结合上面考虑的要点,设计出一个相对完善的方案并落地实施不算难事,但是据个人经验,比较难的是中途引入es来补充和完善mysql的搜索能力上的短板。为什么这么说呢?

试想你的生产系统已经运行了很久了,mysql核心业务表也产生了相当量级的数据了。引入es之后,即便是双写,es中的存储的数据也是从某个时间点开始,搜索出来的数据也只有那个时间点之后的。那么之前的数据怎么办呢?肯定不能扔掉的。这时候就需考虑如何将之前mysql中老数据无损的迁入到es索引中。

这时候可能有人说这也不是什么难事吧,找个业务不繁忙的时间段将mysql中的老数据一次性迁移到es不就解决问题了吗?如果真是这么简单,就不会有那么多的麻烦事了,下面结合实践经验,从迁移的方案和迁移注意事项两方面进行说明。

4.1 数据迁移整体方案

以一个对数据搜索场景不是那么敏感的场景为例进行说明。整体业务流程如下:

 结合上面的流程,完整的数据迁移思路如下:

  • 创建索引;

  • 双写方案V1版生产上线(不包括es搜索),业务数据实现mysql+es双写,考虑使用消息中间件,记录时间点为T1;

  • 在完成数据迁移之前,搜索业务逻辑仍然走mysql,此时es索引中存储的是T1时间点开始之后的mysql数据;

  • 业务低峰期,利用数据同步工具或FlinkCDC等方案第一次完成全量迁移,针对T1之前的;

  • 双写方案V2版生产上线,数据搜索走es;

4.1.1 创建索引

建议自定义创建索引,控制索引中的字段信息,结合上面谈到的要点,es索引存储的字段信息为mysql核心业务表中的核心业务字段,比如业务主键,用于搜索的高频字段信息。

4.1.2 双写改造

稳妥起见,在第一个改造发布的版本中,代码逻辑层面先支持双写,比如通过异步线程将数据写入es,此时es索引中就存储了某个时间点T1之后的数据。

4.1.3 数据迁移

使用数据迁移工具或自己开发一个微服务,在业务低峰期(凌晨2点)完成一次全量数据的迁移,迁移完成后,ES中的数据基本与mysql表数据同步了。

4.1.4 搜索服务上线

上线搜索服务,此时数据的搜索将走es,具体的实现逻辑结合自身的业务场景酌情改造。比如上文谈到的,如果产品的详情页面是多个表的聚合结果,首先需要通过搜索得到核心的业务字段信息,然后代入到后面的逻辑中进行数据的组装。

4.2 数据迁移补充说明

以上结合实际场景给出了一个相对通用的数据迁移方案,在实际操作中,遇到的情况可能比这个更复杂,比如你可能遇到下面的这些情况:

  • 你要迁移的数据表经过了分库分表,即业务表的数据存储在多个库或多张表中,这种情况下如何迁移?;

  • 你要迁移的数据表数据量非常大,而且可以预计每月的增长量为几百万,如何保障保证es的存储容量?如何规划es的后续扩容?;

  • 迁移的数据量巨大,需要很久怎么办?

  • 迁移数据量巨大,迁移过程中发生异常怎么办?

  • ...

五、方案实施

下面通过实际代码演示一下完整的业务流程。

5.1 前置准备

5.1.1 搭建环境

这里假设你已经提前搭建好es、mysql的环境。

es的搭建可以参考文章:es脚本编程使用,mysql可以使用下面的docker命令快速开启mysql服务

docker run -p 3307:3306 --name mysql57 \
-v /usr/local/docker/mysql/data:/var/lib/mysql \
-v /usr/local/docker/mysql/conf:/etc/mysql/conf.d \
-v /usr/local/docker/mysql/log:/var/log/mysql \
-e MYSQL_ROOT_PASSWORD=你的root密码\
-d mysql:5.7

5.1.2 创建数据表

使用下面的sql语句创建一张数据表,其中desc字段会被作为高频字段搜索使用

CREATE TABLE `product` (`id` int(12) NOT NULL,`pro_name` varchar(64) DEFAULT NULL,`pro_no` varchar(32) DEFAULT NULL,`price` int(10) DEFAULT NULL,`category` varchar(32) DEFAULT NULL,`stock` int(32) DEFAULT NULL,`desc` varchar(255) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

5.1.3 插入初始化数据

为上述的表插入一些数据

INSERT INTO `pt_res`.`product`(`id`, `pro_name`, `pro_no`, `price`, `category`, `stock`, `desc`) VALUES (1, '小米14', 'A100', 3999, 'phone', 32, 'xiao mi phone');
INSERT INTO `pt_res`.`product`(`id`, `pro_name`, `pro_no`, `price`, `category`, `stock`, `desc`) VALUES (2, 'Java入门到精通', 'B100', 56, 'book', 12, 'Java technology');
INSERT INTO `pt_res`.`product`(`id`, `pro_name`, `pro_no`, `price`, `category`, `stock`, `desc`) VALUES (3, '精品男鞋', 'X100', 325, 'shoe', 82, 'Man shoe');

5.1.4 创建一个索引

创建一个名为product的索引,并指定desc字段分词,里面的字段与mysql表对应,但不是所有字段;

PUT product
{"mappings": {"properties": {"id":{"type": "long"},"pro_name": {"type": "keyword"},"desc": {"type": "text"}}}
}

测试创建一条数据

PUT /product/_doc/11
{"pro_name":"汪汪队纪念品","desc":"for children play"
}

查询这条数据

GET /product/_doc/11 

到这里,我们的准备工作就完成了,接下来将在代码中完成剩下的操作。

5.2 搭建springboot工程

本工程要做的事情如下:

  • 整合mybatis,与es;
  • 利用mybatis实现增删改查功能;
  • 利用异步线程写入es;
  • 实现mysql历史数据的迁移;

5.2.1 引入基础依赖

    <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>2.1.4</version></dependency><dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch</artifactId><version>7.6.2</version></dependency><dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>7.6.2</version></dependency><!--<dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-client</artifactId><version>7.6.2</version></dependency>--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>${boot-web.version}</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.15</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>${lomok.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency></dependencies>

5.2.2 核心配置文件

主要配置mysql,mybatis以及es相关的连接信息

server:port: 8082spring:datasource:username: rootpassword: rooturl: jdbc:mysql://IP:3307/pt_res?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=falsedriver-class-name : com.mysql.jdbc.Driverelasticsearch:rest:uris: [IP:9200]host: IPport: 9200mybatis:mapper-locations: classpath:mybatis/*.xmltype-aliases-package: com.congge.entity

5.2.3 es客户端连接配置

自定义一个类,自定义一个RestHighLevelClient 的bean,配置es连接信息

import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
@Slf4j
public class EsConfig {@Value("${spring.elasticsearch.host}")private String host;@Value("${spring.elasticsearch.port}")private int port;@Bean(name = "restHighLevelClient")public RestHighLevelClient restHighLevelClient() {return new RestHighLevelClient(RestClient.builder(new HttpHost(host, port, "http")));}}

5.2.3 mybatis文件

在resources目录下创建mybatis目录,在里面编写与mysql操作的文件,这里创建一个操作product表的xml文件

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapperPUBLIC "-//mybatis.org//DTD Mapper 3.0//EN""http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.congge.dao.ProductDao"><resultMap id="BaseResultMap" type="com.congge.entity.Product"><id column="id" property="id" jdbcType="VARCHAR" /><result column="pro_name" property="proName" jdbcType="VARCHAR" /><result column="pro_no" property="proNo" jdbcType="VARCHAR" /><result column="price" property="price" jdbcType="INTEGER" /><result column="category" property="category" jdbcType="VARCHAR" /><result column="stock" property="stock" jdbcType="INTEGER" /><result column="desc" property="desc" jdbcType="VARCHAR" /></resultMap><select id="getAll" resultMap="BaseResultMap">select * from product</select>
</mapper>

注意启动类上面添加dao包的扫描

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
@MapperScan("com.congge.dao")
public class SyncApp {public static void main(String[] args) {SpringApplication.run(SyncApp.class,args);}}

5.2.4 业务实现类

@Service
public class ProductServiceImpl implements ProductService {@Autowiredprivate ProductDao productDao;@Overridepublic List<Product> getAll() {return productDao.getAll();}
}

5.2.4 相关测试

框架整合完毕之后,及时通过单元测试验证是否整合成功,下面给出了一些关于mysql操作以及索引操作的单元测试用例

import com.alibaba.fastjson.JSONObject;
import com.congge.SyncApp;
import com.congge.entity.Product;
import com.congge.entity.es.ProductInfo;
import com.congge.service.ProductService;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.IndicesClient;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.GetIndexResponse;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import java.util.List;
import java.util.Map;//@RunWith(SpringRunner.class)
@SpringBootTest(classes = {SyncApp.class})
public class EsTest {@Autowiredprivate RestHighLevelClient restHighLevelClient;@Autowiredprivate ProductService productService;@Testpublic void testFindAll(){List<Product> all = productService.getAll();System.out.println(all);}@org.junit.jupiter.api.Testvoid contextLoads() {System.out.println(restHighLevelClient);}/*** 判断索引是否存在*/@Testpublic void getIndex() throws Exception {IndicesClient indices = restHighLevelClient.indices();GetIndexRequest student0517 = new GetIndexRequest("product");boolean exists = indices.exists(student0517, RequestOptions.DEFAULT);if(exists){GetIndexResponse indexResponse = indices.get(student0517, RequestOptions.DEFAULT);Map<String, MappingMetaData> mappings = indexResponse.getMappings();System.out.println(mappings);}else{System.out.println("索引不存在");}}@Testpublic void getDocById() throws Exception {GetRequest getRequest = new GetRequest("product").id("11");GetResponse documentFields = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);//集合方式Map<String, Object> source = documentFields.getSource();for (String key : source.keySet()) {System.out.println(source.get(key));}//字符串  -----JSONString sourceAsString = documentFields.getSourceAsString();System.out.println(sourceAsString);//把JSON转换为 stuent//JSON字符串-->JSON对象JSONObject jsonObject = JSONObject.parseObject(sourceAsString);System.out.println(jsonObject);}@Testpublic void getDocByIdV2() throws Exception {SearchRequest searchRequest = new SearchRequest();searchRequest.indices("product");SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();searchRequest.source(searchSourceBuilder.query(QueryBuilders.termQuery("_id", 11)));searchSourceBuilder.size(1);SearchResponse search = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);SearchHits searchHits = search.getHits();for (SearchHit searchHit : searchHits) {Map<String, Object> sourceMap = searchHit.getSourceAsMap();System.out.println(sourceMap);}}@Testpublic void insertDoc() throws Exception {com.fasterxml.jackson.databind.ObjectMapper objectMapper = new com.fasterxml.jackson.databind.ObjectMapper();IndexRequest indexRequest = new IndexRequest("product");ProductInfo pro = new ProductInfo();pro.setId(13);pro.setPro_name("MP3");pro.setDesc("music player");String proData = objectMapper.writeValueAsString(user);indexRequest.source(proData,XContentType.JSON);//插入数据IndexResponse response = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);System.out.println(response.status());System.out.println(response.getResult());}}

5.3 双写业务实现

按照上文的业务实现流程,向mysql表插入一条数据,同时写入一条数据到es

    @Override@Transactionalpublic Object save(Product product) {productDao.save(product);saveEs(product)//CompletableFuture.runAsync(() -> saveEs(product), newCachedThreadPool());return product.getId();}public void saveEs(Product product){com.fasterxml.jackson.databind.ObjectMapper objectMapper = new com.fasterxml.jackson.databind.ObjectMapper();IndexRequest indexRequest = new IndexRequest("product");ProductInfo pro = new ProductInfo();pro.setId(product.getId());pro.setPro_name(product.getProName());pro.setDesc(product.getDesc());String productData = null;try {productData = objectMapper.writeValueAsString(pro);} catch (JsonProcessingException e) {e.printStackTrace();}indexRequest.source(productData,XContentType.JSON);//插入数据IndexResponse response = null;try {response = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);} catch (IOException e) {log.error("save to es error,error : 【{}】",e.getMessage());e.printStackTrace();}System.out.println(response.status());System.out.println(response.getResult());}/*** 带有缓存功能线程池** @return*/public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}

使用单元测试测试一下方法

    @Testpublic void testSave(){Product product = new Product();product.setId(6);product.setProName("可比克薯片");product.setProNo("F003");product.setPrice(7);product.setCategory("food");product.setStock(33);product.setDesc("classics food");Object save = productService.save(product);System.out.println(save);}

跑通之后,检查mysql与es的数据是否正常写入

5.4 数据搜索

我们假设用户输入关键字进行搜索,首先通过es的检索,得到表的基本关键字段,比如id,然后去mysql中查询完整的信息,核心业务实现逻辑如下。

@Overridepublic  List<Product> query(String key) {List<Integer> result = queryFromEs(key);List<Product> queryRes = null;if(!CollectionUtils.isEmpty(result)){queryRes =  productDao.getProductIn(result);}return queryRes;}private List<Integer> queryFromEs(String key) {SearchRequest request = new SearchRequest();request.indices("product");SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();FuzzyQueryBuilder fuzzyQueryBuilder =QueryBuilders.fuzzyQuery("desc", key).fuzziness(Fuzziness.ONE);sourceBuilder.query(fuzzyQueryBuilder);request.source(sourceBuilder);SearchResponse response = null;try {response = restHighLevelClient.search(request, RequestOptions.DEFAULT);} catch (IOException e) {e.printStackTrace();}System.out.println(response.getHits().getHits());System.out.println(response.getHits().getTotalHits());SearchHits hits = response.getHits();List<Integer> ids = new ArrayList<>();for (SearchHit searchHit : hits){Map<String, Object> sourceAsMap = searchHit.getSourceAsMap();System.out.println(sourceAsMap);ids.add(Integer.valueOf(sourceAsMap.get("id").toString()));}return ids;}

编写单元测试用例

    @Testpublic void query(){List<Product> res = productService.query("food");System.out.println(res);}

事实上,实际业务中,从es中查出了id等信息之后,需要通过id字段去mysql中进行多表关联的查询才能聚合结果,但是走es的搜索之后,可以大大提升获取id的性能

5.5 数据迁移

简单起见,这里直接使用定时任务做数据同步,可以考虑凌晨的时候来做这件事,核心迁移方法

public void doSync() {//设置一个时间点的条件作为同步数据的边界List<Product> syncDatas = productDao.getSyncDatas();for(Product product :syncDatas ){ObjectMapper objectMapper = new ObjectMapper();IndexRequest indexRequest = new IndexRequest("product");ProductInfo productInfo = new ProductInfo();productInfo.setId(product.getId());productInfo.setPro_name(product.getProName());productInfo.setDesc(product.getDesc());String proData = null;try {proData = objectMapper.writeValueAsString(productInfo);} catch (JsonProcessingException e) {e.printStackTrace();}indexRequest.source(proData,XContentType.JSON);//插入数据try {IndexResponse response = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);} catch (IOException e) {e.printStackTrace();}}log.info("同步完成");}

最后增加一个定时任务的类,将上述的方法添加进去

import com.congge.service.ProductService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;@Configuration
@EnableScheduling
public class SyncTask {@Autowiredprivate ProductService productService;@Scheduled(cron = "0/2 * * * * ?")private void configureTasks(){System.out.println("开始执行数据同步");productService.doSync();System.out.println("数据同步完成");}}

六、写在文末

本文通过较大的篇幅详细讨论了mysql与es实现双写的设计以及实现过程,当然在实际操作过程中还有很多值得探讨和细节,希望为看到的小伙伴提供一个思路,本篇到此结束,感谢观看。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/132758.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

FPGA高端项目:图像采集+GTP+UDP架构,高速接口以太网视频传输,提供2套工程源码加QT上位机源码和技术支持

目录 1、前言免责声明本项目特点 2、相关方案推荐我这里已有的 GT 高速接口解决方案我这里已有的以太网方案 3、设计思路框架设计框图视频源选择OV5640摄像头配置及采集动态彩条视频数据组包GTP 全网最细解读GTP 基本结构GTP 发送和接收处理流程GTP 的参考时钟GTP 发送接口GTP …

js原型链

什么叫原型链 原型链是js中的核心&#xff0c;原型链将各个属性链接起来&#xff0c;在原型链上面定义&#xff0c;原型链上的其他属性能够使用&#xff0c;原型链就是保证继承 原型链区分 原型链分为显式原型和隐式原型 显式原型&#xff1a;只有函数和构建函数才有显式原型…

spring面试题笔记

SpringBoot 有几种读取配置文件的方式 1.value 必须是bean里才能生效&#xff0c;&#xff0c;final或static无法生效 2ConfigurationProperties注解 ConfigurationProperties是springboot提供读取配置文件的一个注解 注意&#xff1a; 前缀定义了哪些外部属性将绑定到类的字…

AI:62-基于深度学习的人体CT影像肺癌的识别与分类

🚀 本文选自专栏:AI领域专栏 从基础到实践,深入了解算法、案例和最新趋势。无论你是初学者还是经验丰富的数据科学家,通过案例和项目实践,掌握核心概念和实用技能。每篇案例都包含代码实例,详细讲解供大家学习。 📌📌📌在这个漫长的过程,中途遇到了不少问题,但是…

【漏洞复现】Fastjson_1.2.47_rce

感谢互联网提供分享知识与智慧&#xff0c;在法治的社会里&#xff0c;请遵守有关法律法规 文章目录 1.1、漏洞描述1.2、漏洞等级1.3、影响版本1.4、漏洞复现1、基础环境2、漏洞检测3、漏洞验证 1.5、深度利用1、反弹Shell 说明内容漏洞编号漏洞名称Fastjson_1.2.47_远程执行漏…

vue:js中合并对象的方法

目前比较常用的一共有三种 1、使用object.assign() 它可以将一个或多个对象的属性复制到目标对象中&#xff0c;第一个参数就是目标对象&#xff0c;这里举个例子&#xff1a; <template><div>{{data}}</div> </template> <script> export de…

劲升逻辑与安必快、鹏海运于进博会签署合作协议,助力大湾区外贸高质量发展

新中经贸与投资论坛签约现场 中国上海&#xff0c;2023 年 11 月 6 日——第六届进博会期间&#xff0c;由新加坡工商联合总会主办的新中经贸与投资论坛在上海同期举行。跨境贸易数字化领域的领导者劲升逻辑与安必快科技&#xff08;深圳&#xff09;有限公司&#xff08;简称…

Webpack的Tree Shaking。它的作用是什么?

聚沙成塔每天进步一点点 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 欢迎来到前端入门之旅&#xff01;感兴趣的可以订阅本专栏哦&#xff01;这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域的朋友们量身打造的。无论你是完全的新手还是有一些基础的开发…

猫罐头怎么选?5款不踩雷的猫罐头推荐!

在我们的日常生活中&#xff0c;猫罐头是一种常见的宠物食品&#xff0c;但是有很多养猫的铲屎官都不知道应该如何为猫咪挑选一款合适且满意的猫罐头。作为经营一家宠物店7年的店长&#xff0c;我店里的猫猫最多的时候可以达到60多只&#xff0c;这些年来它们也吃过了很多种类的…

【公益案例展】中国水利水电科学研究院——云从科技青海湖湟鱼监测保护项目...

‍ 云从科技公益案例 本项目案例由云从科技投递并参与数据猿与上海大数据联盟联合推出的 #榜样的力量# 《2023中国数据智能产业最具社会责任感企业》榜单/奖项”评选。 大数据产业创新服务媒体 ——聚焦数据 改变商业 1994年青海湖裸鲤被列为国家二级保护动物&#xff0c;2003…

noip模拟赛多校第八场 T4 不要翻墙 (矩阵乘法优化DP, 动态DP)

题目描述 简要题意&#xff1a;太长了&#xff0c;就不总结了&#xff0c;自己看吧。 分析 我们首先考虑 m 1 m 1 m1 的情况&#xff1a; T > 0 T > 0 T>0 时&#xff0c;显然我们可以 O ( n ) O(n) O(n) 的维护一个 前缀积 和 前缀积的逆元&#xff0c…

python基础(Python高级特性(切片、列表生成式)、字符串的正则表达式、函数、模块、Python常用内置函数、错误处理)培训讲义

文章目录 1. Python高级特性&#xff08;切片、列表生成式&#xff09;a) 切片的概念、列表/元组/字符串的切片切片的概念列表切片基本索引简单切片超出有效索引范围缺省 扩展切片step为正数step为负数 b) 列表生成式以及使用列表生成式需要注意的地方概念举例说明1. 生成一个列…

【移远QuecPython】EC800M物联网开发板的内置GNSS定位的恶性BUG(目前没有完全的解决方案)

【移远QuecPython】EC800M物联网开发板的内置GNSS定位的恶性BUG&#xff08;目前没有完全的解决方案&#xff09; GNSS配置如下&#xff1a; 【移远QuecPython】EC800M物联网开发板的内置GNSS定位获取&#xff08;北斗、GPS和GNSS&#xff09; 测试视频&#xff08;包括BUG复…

简单CMake入门

CMake可以生成不同平台下的Makefile&#xff0c;有了CMake不用再写复杂的Makefile 视频教程&#xff1a;CMake 6分钟入门&#xff0c;不用再写复杂的Makefile 先前知识 Makefile简单入门 Cmake特性 CMake是一个用于管理C/C项目的跨平台构建工具。 跨平台&#xff1a;CMake是…

【漏洞复现】Apache Log4j Server 反序列化命令执行漏洞(CVE-2017-5645)

感谢互联网提供分享知识与智慧&#xff0c;在法治的社会里&#xff0c;请遵守有关法律法规 文章目录 1.1、漏洞描述1.2、漏洞等级1.3、影响版本1.4、漏洞复现1、基础环境2、漏洞扫描3、漏洞验证 1.5、深度利用1、反弹Shell 说明内容漏洞编号CVE-2017-5645漏洞名称Log4j Server …

判断字符串是否为json

//营业时间返回数组String businessDate merchantInfoResp.getBusinessDate();Object obj JSON.parse(businessDate);if (obj instanceof JSONArray) {merchantInfoResp.setBusinessDateDesc(JSON.parseArray(JSON.toJSONString(obj), Integer.class));} else {//营业日期判断…

Ubuntu下安装vscode,并解决终端打不开vscode的问题

Visual Studio Code安装 1&#xff0c;使用 apt 安装 Visual Studio Code 在官方的微软 Apt 源仓库中可用。按照下面的步骤进行即可&#xff1a; 以 sudo 用户身份运行下面的命令&#xff0c;更新软件包索引&#xff0c;并且安装依赖软件&#xff1a; sudo apt update sud…

09 基变换

基变换 基本概念坐标转换詹妮弗坐标系→平面直角坐标系平面直角坐标系→詹妮弗坐标系转换对比基本原则 这是关于3Blue1Brown "线性代数的本质"的学习笔记。 基本概念 对于右手平面直角坐标系&#xff0c;一般用 i ⃗ \vec{i} i 和 j ⃗ \vec{j} j ​表示其基向量。…

86.Linux系统下复制进程fork(逻辑地址和物理地址)

目录 fork复制进程 逻辑地址和物理地址 fork复制进程 fork 是一个系统调用&#xff0c;在 Linux/Unix 系统中用于创建一个新的进程&#xff0c;新进程称为子进程。子进程是父进程的副本&#xff0c;它从父进程那里继承了大部分属性和资源&#xff0c;包括代码、数据、打开的文…

VMware 虚拟机安装 CentOS 7

CentOS 7 1. 下载CentOS 7 iso镜像 Index of /centos/7.9.2009/isos/x86_64/ 2. Vmware安装CentOS 7 安装教程&#xff1a; 超详细VMware CentOS7(最小安装)安装教程_虚拟机最小化安装-CSDN博客 【精选】VMware 安装 Centos7 详细过程_vm虚拟机安装centos7_expectation Fu…