Elastic Search(ES)Java 入门实操(3)数据同步

基本概念和数据查询代码:

Elastic Search (ES)Java 入门实操(1)下载安装、概念-CSDN博客

Elastic Search(ES)Java 入门实操(2)搜索代码-CSDN博客

想要使用 ES 来查询数据,首先得要 ES 里有数据,但是如果是后来引入的 ES,数据库上万条的数据肯定不能通过手动进行同步,需要使用其他方法进行同步。

数据同步分为全量同步和增量同步。

所谓全量同步,就是引入 ES 时将 MySQL 里的数据全部同步到 ES 里。增量同步就是当数据库的数据发生变化时,将变化的数据同步到 ES 里。

同步方法

定时任务

通过定时任务的方式,每隔一段时间进行同步。比如每一分钟同步一次。

优点:简单,占用资源少,不用引入第三方中间件

缺点:有时间差,数据一致性要求高的场景不适用

全量同步通过实现 CommandLineRunner 接口,在程序启动时执行。

/*** CommandLineRunner 接口,当spring启动时就执行方法*/
@Component
public class FullSycnToEs implements CommandLineRunner {@Resourceprivate ArticleService articleService;@Resourceprivate ArticleEsDao articleEsDao;@Overridepublic void run(String... args) throws Exception {//spring 启动就执行方法进行全量同步//1.从MySQL获取数据List<Article> articleList = articleService.list();if(CollectionUtils.isEmpty(articleList)){return;}//2.将数据转换为DTOList<ArticleEsDto> articleDtoList = articleList.stream().map(ArticleEsDto::toDto).collect(Collectors.toList());//3.将数据同步到ESarticleEsDao.saveAll(articleDtoList);System.out.println("全量同步完成");}
}

增量同步使用 @ Scheduled 定时任务监控更新时间

注意启动类要加上注解 @EnableScheduling

/*** 定时任务执行数据同步*/
@Component
public class InSyncToEs {@Resourceprivate ArticleMapper articleMapper;@Resourceprivate ArticleEsDao articleEsDao;@Scheduled(fixedRate = 100)public void run(){// 定时任务,将数据同步到es,根据更新时间来判断//假定3分钟内,如果更新时间大于3分钟之前的时间,就是更新了,获取这个数据存入到ES 中Date minUpdateTime = new Date(new Date().getTime() - 5* 60*1000L);List<Article> newArticles = articleMapper.getNewArticles(minUpdateTime);//判断是否有数据更新if(CollectionUtils.isEmpty(newArticles)){//没有数据更新System.out.println("没有数据更新");return;}//有数据更新,将数据转换成dto格式List<ArticleEsDto> articleEsDtoList = newArticles.stream().map(ArticleEsDto::toDto).collect(Collectors.toList());//将数据存入到ES中articleEsDao.saveAll(articleEsDtoList);System.out.println("数据同步完成");}
}

双写

写入数据库时同时同步到 ES 中,需要考虑 ES 同步失败了怎么办。

使用事务来保证一致性,如果 ES 同步失败了,可以通过定时任务 + 日志 + 告警进行检测和修复(补偿)

Logstash 数据同步管道

传输和处理数据的管道

下载地址:https://artifacts.elastic.co/downloads/logstash/logstash-7.17.21-windows-x86_64.zip

官方文档:Jdbc input plugin | Logstash Reference [7.17] | Elastic

同样的,需要注意版本,下载解压之后在 config 文件夹创建新的同步文件,建议不同的同步脚本创建不同的文件,不要在同一个文件下配置。

文件配置根据官方文档修改,MySQL jar包使用绝对路径即可,否则可能找不到 jar 包,jar 包可以自行准备,也可以从项目的 maven 仓库获取。

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.input {jdbc {// MySQL jar包路径jdbc_driver_library => "D:\software\logstash-7.17.21\config\mysql-connector-java-8.0.29.jar"// MySQL 驱动jdbc_driver_class => "com.mysql.cj.jdbc.Driver"// MySQL 连接地址jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"//账号密码jdbc_user => "root"jdbc_password => "1234"//动态 SQLstatement => "SELECT * from article where 1=1"parameters => { "favorite_artist" => "Beethoven" }//定时执行,core 表达式schedule => "*/5 * * * * *"}
}output {stdout { codec => rubydebug }
}

 配置好之后在 logstash 目录下执行下面的命令,完成初步从数据库获取数据

.\bin\logstash.bat -f .\config\my-task.conf

 成功获取数据

增量同步配置,使用 updateTime 来进行同步更新的数据。

完整 input 配置如下。

input {jdbc {jdbc_driver_library => "D:\software\logstash-7.17.21\config\mysql-connector-java-8.0.29.jar"jdbc_driver_class => "com.mysql.cj.jdbc.Driver"jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"jdbc_user => "root"jdbc_password => "1234"// 动态查询语句,保证最后一条是最大的statement => "SELECT * from article where updateTime > :sql_last_value and updateTime < now() order by updateTime asc"// 查询参数的 hash,不用更改parameters => { "favorite_artist" => "Beethoven" }// 查询参数的类型,updatetime 是 timestamp 类型的tracking_column_type => "timestamp"// 查询参数tracking_column => "updatetime"// 设置为 true 时,将定义的查询参数值用作动态 SQL 中sql_last_value,false 时:sql_last_value 是上次查询时间use_column_value => true// 时区设置为上海,否则存在 8小时时差jdbc_default_timezone => "Asia/Shanghai"// core 表达式schedule => "*/5 * * * * *"}
}

配置好从 MySQL 获取的数据之后,就可以同步到 ES 中了。同样需要书写配置。

官方文档:Elasticsearch output plugin | Logstash Reference [7.17] | Elastic

output {stdout { codec => rubydebug }elasticsearch {//访问地址,就是本地 ES 端口hosts => "127.0.0.1:9200"// ES 索引index =>"article_1"// 数据 id,从数据库获取document_id => "%{id}"}

最终配置

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.input {jdbc {jdbc_driver_library => "D:\software\logstash-7.17.21\config\mysql-connector-java-8.0.29.jar"jdbc_driver_class => "com.mysql.cj.jdbc.Driver"jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"jdbc_user => "root"jdbc_password => "1234"statement => "SELECT * from article where updateTime > :sql_last_value and updateTime < now() order by updateTime asc "parameters => { "favorite_artist" => "Beethoven" }tracking_column_type => "timestamp"tracking_column => "updatetime"use_column_value => truejdbc_default_timezone => "Asia/Shanghai"schedule => "*/5 * * * * *"}
}
// 筛选
filter{mutate{//重命名rename => {"updatetime" =>"updateTime""createtime" => "createTime""isdetele" => "isDetele"}}
}output {stdout { codec => rubydebug }elasticsearch {hosts => "127.0.0.1:9200"index =>"article_1"document_id => "%{id}"}
}

同步成功! 

logstash 的优点:配置完成后使用比较方便,插件多

                缺点:要多维护组件,一般需要配合其他中间件,比如(kafka)

Canal

下载地址:Releases · alibaba/canal (github.com)

文档:QuickStart · alibaba/canal Wiki (github.com)

实时同步数据,通过监控 MySQL 的 binlog,当数据库发生修改时,会修改 binlog 文件,然后 canal 监听到就可以同步到 ES 中。

对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,在 MySQL 目录下新建一个my.ini,配置如下

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant,直接在查询控制台执行如下命令

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

bin 目录下 startup 启动即可。

然后 Java 需要一个客户端,首先引入依赖

<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.0</version>
</dependency>

客户端代码

import java.net.InetSocketAddress;
import java.util.List;import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;public class SimpleCanalClientExample {public static void main(String args[]) {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), "example", "", "");int batchSize = 1000;int emptyCount = 0;try {connector.connect();connector.subscribe(".*\\..*");connector.rollback();int totalEmptyCount = 120;while (emptyCount < totalEmptyCount) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {emptyCount++;System.out.println("empty count : " + emptyCount);try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {emptyCount = 0;// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println("empty too many times, exit");} finally {connector.disconnect();}
}private static void printEntry(List<Entry> entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}RowChange rowChage = null;try {rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}EventType eventType = rowChage.getEventType();System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (RowData rowData : rowChage.getRowDatasList()) {if (eventType == EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType == EventType.INSERT) {printColumn(rowData.getAfterColumnsList());} else {System.out.println("-------&gt; before");printColumn(rowData.getBeforeColumnsList());System.out.println("-------&gt; after");printColumn(rowData.getAfterColumnsList());}}}
}private static void printColumn(List<Column> columns) {for (Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());}
}}

正在监听,修改数据之后可以把修改前和修改后的数据查询出来,之后只需要将修改后的数据同步到 ES 即可,比如通过 ES 的 save 方法。 

过程出现的问题

1. 在执行命令.\bin\logstash.bat -f .\config\my-task.conf  时报错

只需要更改 bin 目录下的 setup.bat 文件中的双引号去掉即可。 

2. canal 启动 报错 canal 1.1.8版本

不知道什么原因,是和 MySQL 8不兼容还是其他原因,报 druid 错误。

解决方法:简单粗暴,下载 1.1.7 版本,实测有效

3. 找不到 JAVA_HOME

修改变量或者修改启动项

编辑 startup.bat,在文件中添加如下配置:

// 自己的 jdk 路径
set JAVA_HOME=C:\Users\p'b\.jdks\corretto-1.8.0_392
// 覆盖环境变量
set PATH=%JAVA_HOME%\bin;%PATH%

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/24967.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【WEB前端2024】3D智体编程:乔布斯3D纪念馆-第37课-自动切换纹理

【WEB前端2024】3D智体编程&#xff1a;乔布斯3D纪念馆-第37课-自动切换纹理 使用dtns.network德塔世界&#xff08;开源的智体世界引擎&#xff09;&#xff0c;策划和设计《乔布斯超大型的开源3D纪念馆》的系列教程。dtns.network是一款主要由JavaScript编写的智体世界引擎&…

jupyter notebook使用conda环境

pycharm中安装过可以使用的库在jupyter notebook中导入不进来 1 检查pycharm中安装的库的位置 2 检查jupyter notebook中安装的库的位置 3 查看jupyter notebook内核名字 可以看到jupyter notebook中内核名字叫ipykernel 4 安装ipykernel 在pycharm的terminal中 pip instal…

【C语言】动态内存经典笔试题(下卷)

前言 如果说动态内存是C语言给我们的一个工具&#xff0c;那么只有掌握了工具的特点我们才能更好地使用。 紧随上卷&#xff0c;我们再来看看动态内存另外两道经典的笔试题。 &#xff08;建议没看过上卷的朋友可以先看完上卷再回来&#xff1a;【C语言】动态内存经典笔试题…

项目总结报告(Word模板)

2 项目工作成果 2.1 交付给用户的产品 2.2 交付给研发中心的产品 2.2.1 代码部分 2.2.2 文档部分 2.3 需求完成情况与功能及性能符合性统计 2.3.1 需求完成情况统计 2.3.2 功能符合性分析 2.3.3 性能符合性分析 3 项目工作分析 3.1 项目计划与进度实施分析 3.1.1 开发进度 3.1.…

2024年6月8日 (周六) 叶子游戏新闻

万能嗅探: 实测 网页打开 某视频号、某音、某红薯、某站&#xff0c;可以做到无水印的视频和封面下载功能哦&#xff0c;具体玩法大家自行发挥吧。 《丝之歌》粉丝又要失望&#xff1a;大概率不会亮相Xbox发布会即将于后天举行的 Xbox 发布会预计将会有许多令人兴奋的消息。早些…

STM32F103C8T6 HAL库 printf重定向 USART1 DMA方式发送数据

前言&#xff1a; 在上一篇文章里&#xff0c;我采用printf重定向为usart1&#xff0c;但是这样发送&#xff0c;对于MPU的负载比较大&#xff0c;所以本篇文章采用DMA方式&#xff0c;解放MPU资源&#xff0c;去做其他的事情&#xff0c;这里仅做为自己的记录。 正文开始&…

Halcon 双相机标定与拼图(二)

一、概述 这种标定有两种模式&#xff0c;有一个标定板和多个标定板两种 一个标定板 两个相机的重叠区域比较大&#xff0c;那么我们可以把标定板放到那个重叠区域来统一坐标系&#xff0c;如下 这种是只需要一个标定板&#xff0c;这种是推荐的方式 。这种是比较简单的&…

第二十节:带你梳理Vue2:Vue子组件向父组件传参(事件传参)

1. 自定义事件 除了可以处理原生的DOM事件, v-on指令也可以处理组件内部触发的自定义的事件,调用this.$emit()函数就可以触发一个自定义事件 $emit() 触发事件函数接受一个自定义事件的事件名以及其他任何给事件函数传递的参数. 然后就可以在组件上使用v-on来绑定这个自定义事…

启动游戏出现concrt140.dll错误的解决方法

concrt140.dll是一个动态链接库文件&#xff0c;属于Microsoft Visual C 2015 Redistributable组件集的一部分。这个文件是并发运行时库&#xff08;Concurrency Runtime&#xff09;的一部分&#xff0c;对于支持和增强应用程序的多线程与并发执行能力至关重要。它包含了实现并…

003-两台交换机堆叠(IRF)

两台交换机堆叠&#xff08;IRF&#xff09; 概念 IRF (Intelligent Resilient Framework) 是华为公司提出的一种交换机堆叠技术&#xff0c;它允许用户将多台物理交换机设备连接在一起&#xff0c;从而逻辑上形成一个单一的、统一的、大容量的虚拟交换机。IRF堆叠技术通过堆…

【自动部署】4.阿里云ECS服务器 IDEA自动部署项目

如何在IDEA中,自动部署项目到阿里云ECS服务器?今天我们就来实现一键部署功能。 执行maven命令打包免密登录,在IEDA中连接服务器执行stop服务上传jar包到服务器执行start脚本查看运行日志1.安装Alibaba Cloud Toolkit 2.配置host 3.自动化部署后端 右击项目,选择Alibaba CL…

第一篇红队笔记-百靶精讲之W1R3S-john

https://download.vulnhub.com/w1r3s/w1r3s.v1.0.1.zip 主机发现 nmap端口扫描及思路 扫描某个网段 扫描单个ip所有端口 重复扫描单个ip具体端口 udp协议再来一次 漏洞扫描 FTP渗透 尝试匿名登陆 防止文件损坏 识别加密方式-hash-identifier base64 Web目录爆破…

C#操作MySQL从入门到精通(19)——插入数据

前言: 谈到数据库,大家最容易脱口而出的就是增删改查,在本篇文章之前一直都是说的各种查询,本文就是说的增删改查中的增,所谓增也就是增加数据的意思,插入数据就是增加数据。 本文测试使用的数据库如下: 1、插入完整行 所谓插入完整行就是一行所有列的数据都是自己插…

《精通ChatGPT:从入门到大师的Prompt指南》第4章:避免常见错误

第4章&#xff1a;避免常见错误 在使用ChatGPT进行Prompt编写时&#xff0c;常见的错误可能会大大影响生成内容的质量和准确性。本章将详细讨论这些错误&#xff0c;并提供如何避免它们的建议。 4.1 不明确的指令 在使用ChatGPT时&#xff0c;一个常见的问题是指令不够明确。…

理解数仓建模

​​​在数仓建设的过程中&#xff0c;由于未能完全按照规范操作&#xff0c; 从而导致数据仓库建设比较混乱&#xff0c;常见有以下问题&#xff1a; 数仓常见问题 ● 数仓分层不清晰&#xff1a;数仓的分层没有明确的逻辑&#xff0c;难以管理和维护。 ● 数据域划分不明确…

【网络编程开发】10.UNIX套接字域

10.UNIX套接字域 UNIX域套接字是用于在同一台计算机上运行的进程之间进行通信的一种机制。它与传统基于TCP/IP协议栈的套接字不同&#xff0c;UNIX域套接字操作更为高效&#xff0c;因为它避免了网络层的开销&#xff0c;不涉及网络报头、检验和、顺序号等复杂的网络协议处理过…

C#操作MySQL从入门到精通(16)——使用子查询

前言: 我们在查询数据的过程中有时候查询的数据不是从数据库中来的,而是从另一个查询的结果来的,这时候就需要使用子查询,本文使用的测试数据如下: 1、子查询 下面的代码就是先查询地址是安徽和广西的学生年龄,然后获取年龄对应的姓名 private void button__SubQuery…

Spring boot+vue前后端分离

目录 1、前端vue的搭建 2、后端项目的构建 pom文件中引入的jar包 yml文件用来配置连接数据库和端口的设置 application.property进行一些整合 service层 imp层 mapper 实体类 额外写一个类、解决跨域问题 3、测试 1、前端vue的搭建 建立项目的过程略 开启一个建立好…

用自然语言连接信息孤岛

信息孤岛互联互通的困难 尽管已经进入了互联网时代&#xff0c;信息系统中的信息孤岛现象仍然十分地严重&#xff0c;不同部门&#xff0c;不同机器之间难以实现信息的互联互通。存在大量的信息孤岛。 不同信息系统的相互通信依赖通信协议和数据模型的定义&#xff0c;前者决定…

上海安全员C证继续教育题库(附答案)

1.从业人员经过安全教育培训&#xff0c;了解岗位操作规程&#xff0c;但未遵守而造成事故的&#xff0c;行为人应负( )责任&#xff0c;有关负责人应负( )责任。 A.直接 间接 B.直接 领导 C.间接 管理D.直接 管理 2.对生产附着式升降脚手架产品的单位&#xff0c;必须…