elasticsearch解决同步删除数据库中不存在的数据

        摘要: jdbc-input-plugin 只能实现数据库的追加,对于 elasticsearch 增量写入,但经常jdbc源一端的数据库可能会做数据库删除或者更新操作。这样一来数据库与搜索引擎的数据库就出现了不对称的情况。当然你如果有开发团队可以写程序在删除或者更新的时候同步对搜索引擎操作。如果你没有这个能力,可以尝试下面的方法。

 

 

 

解决MySQL与Elasticsearch 数据不对称问题

jdbc-input-plugin 只能实现数据库的追加,对于 elasticsearch 增量写入,但经常jdbc源一端的数据库可能会做数据库删除或者更新操作。这样一来数据库与搜索引擎的数据库就出现了不对称的情况。

当然你如果有开发团队可以写程序在删除或者更新的时候同步对搜索引擎操作。如果你没有这个能力,可以尝试下面的方法。

这里有一个数据表 article , mtime 字段定义了 ON UPDATE CURRENT_TIMESTAMP 所以每次更新mtime的时间都会变化

mysql> desc article;
+-------------+--------------+------+-----+--------------------------------+-------+
| Field       | Type         | Null | Key | Default                        | Extra |
+-------------+--------------+------+-----+--------------------------------+-------+
| id          | int(11)      | NO   |     | 0                              |       |
| title       | mediumtext   | NO   |     | NULL                           |       |
| description | mediumtext   | YES  |     | NULL                           |       |
| author      | varchar(100) | YES  |     | NULL                           |       |
| source      | varchar(100) | YES  |     | NULL                           |       |
| content     | longtext     | YES  |     | NULL                           |       |
| status      | enum('Y','N')| NO   |     | 'N'                            |       |
| ctime       | timestamp    | NO   |     | CURRENT_TIMESTAMP              |       |
| mtime       | timestamp    | YES  |     | ON UPDATE CURRENT_TIMESTAMP    |       |
+-------------+--------------+------+-----+--------------------------------+-------+
7 rows in set (0.00 sec)| Field       | Type         | Null | Key | Default                        | Extra |
+-------------+--------------+------+-----+--------------------------------+-------+
| id          | int(11)      | NO   |     | 0                              |       |
| title       | mediumtext   | NO   |     | NULL                           |       |
| description | mediumtext   | YES  |     | NULL                           |       |
| author      | varchar(100) | YES  |     | NULL                           |       |
| source      | varchar(100) | YES  |     | NULL                           |       |
| content     | longtext     | YES  |     | NULL                           |       |
| status      | enum('Y','N')| NO   |     | 'N'                            |       |
| ctime       | timestamp    | NO   |     | CURRENT_TIMESTAMP              |       |
| mtime       | timestamp    | YES  |     | ON UPDATE CURRENT_TIMESTAMP    |       |
+-------------+--------------+------+-----+--------------------------------+-------+
7 rows in set (0.00 sec)

logstash 增加 mtime 的查询规则

jdbc {jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"jdbc_driver_class => "com.mysql.jdbc.Driver"jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"jdbc_user => "cms"jdbc_password => "password"schedule => "* * * * *"	#定时cron的表达式,这里是每分钟执行一次statement => "select * from article where mtime > :sql_last_value"use_column_value => truetracking_column => "mtime"tracking_column_type => "timestamp" record_last_run => truelast_run_metadata_path => "/var/tmp/article-mtime.last"}"/usr/share/java/mysql-connector-java.jar"jdbc_driver_class => "com.mysql.jdbc.Driver"jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"jdbc_user => "cms"jdbc_password => "password"schedule => "* * * * *"	#定时cron的表达式,这里是每分钟执行一次statement => "select * from article where mtime > :sql_last_value"use_column_value => truetracking_column => "mtime"tracking_column_type => "timestamp" record_last_run => truelast_run_metadata_path => "/var/tmp/article-mtime.last"}

创建回收站表,这个事用于解决数据库删除,或者禁用 status = 'N' 这种情况的。

CREATE TABLE `elasticsearch_trash` (`id` int(11) NOT NULL,`ctime` timestamp NULL DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 TABLE `elasticsearch_trash` (`id` int(11) NOT NULL,`ctime` timestamp NULL DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

为 article 表创建触发器

CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_UPDATE` BEFORE UPDATE ON `article` FOR EACH ROW
BEGIN-- 此处的逻辑是解决文章状态变为 N 的时候,需要将搜索引擎中对应的数据删除。IF NEW.status = 'N' THENinsert into elasticsearch_trash(id) values(OLD.id);END IF;-- 此处逻辑是修改状态到 Y 的时候,方式elasticsearch_trash仍然存在该文章ID,导致误删除。所以需要删除回收站中得回收记录。IF NEW.status = 'Y' THENdelete from elasticsearch_trash where id = OLD.id;END IF;
ENDCREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_DELETE` BEFORE DELETE ON `article` FOR EACH ROW
BEGIN-- 此处逻辑是文章被删除同事将改文章放入搜索引擎回收站。insert into elasticsearch_trash(id) values(OLD.id);
END DEFINER=`dba`@`%` TRIGGER `article_BEFORE_UPDATE` BEFORE UPDATE ON `article` FOR EACH ROW
BEGIN-- 此处的逻辑是解决文章状态变为 N 的时候,需要将搜索引擎中对应的数据删除。IF NEW.status = 'N' THENinsert into elasticsearch_trash(id) values(OLD.id);END IF;-- 此处逻辑是修改状态到 Y 的时候,方式elasticsearch_trash仍然存在该文章ID,导致误删除。所以需要删除回收站中得回收记录。IF NEW.status = 'Y' THENdelete from elasticsearch_trash where id = OLD.id;END IF;
ENDCREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_DELETE` BEFORE DELETE ON `article` FOR EACH ROW
BEGIN-- 此处逻辑是文章被删除同事将改文章放入搜索引擎回收站。insert into elasticsearch_trash(id) values(OLD.id);
END

接下来我们需要写一个简单地 Shell 每分钟运行一次,从 elasticsearch_trash 数据表中取出数据,然后使用 curl 命令调用 elasticsearch restful 接口,删除被收回的数据。

你还可以开发相关的程序,这里提供一个 Spring boot 定时任务例子。

实体


package cn.netkiller.api.domain.elasticsearch;import java.util.Date;import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;@Entity
@Table
public class ElasticsearchTrash {@Idprivate int id;@Column(columnDefinition = "TIMESTAMP DEFAULT CURRENT_TIMESTAMP")private Date ctime;public int getId() {return id;}public void setId(int id) {this.id = id;}public Date getCtime() {return ctime;}public void setCtime(Date ctime) {this.ctime = ctime;}}

仓库 


package cn.netkiller.api.repository.elasticsearch;import org.springframework.data.repository.CrudRepository;import com.example.api.domain.elasticsearch.ElasticsearchTrash;public interface ElasticsearchTrashRepository extends CrudRepository<ElasticsearchTrash, Integer>{}

定时任务 

package cn.netkiller.api.schedule;import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.rest.RestStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import com.example.api.domain.elasticsearch.ElasticsearchTrash;
import com.example.api.repository.elasticsearch.ElasticsearchTrashRepository;@Component
public class ScheduledTasks {private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);@Autowiredprivate TransportClient client;@Autowiredprivate ElasticsearchTrashRepository alasticsearchTrashRepository;public ScheduledTasks() {}@Scheduled(fixedRate = 1000 * 60) // 60秒运行一次调度任务public void cleanTrash() {for (ElasticsearchTrash elasticsearchTrash : alasticsearchTrashRepository.findAll()) {DeleteResponse response = client.prepareDelete("information", "article", elasticsearchTrash.getId() + "").get();RestStatus status = response.status();logger.info("delete {} {}", elasticsearchTrash.getId(), status.toString());if (status == RestStatus.OK || status == RestStatus.NOT_FOUND) {alasticsearchTrashRepository.delete(elasticsearchTrash);}}}
}

Spring boot 启动主程序。 

package cn.netkiller.api;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;@SpringBootApplication
@EnableScheduling
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}
}

本文转载自:

https://my.oschina.net/neochen/blog/1518679#comment-list

 

本人最近开了一个公众号,会讲一些常用的技术,以及面试题,欢迎关注

扫码关注,每天获取最前沿的互联网知识~

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/413568.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【BZOJ1048】分割矩阵(记忆化搜索,动态规划)

【BZOJ1048】分割矩阵&#xff08;记忆化搜索&#xff0c;动态规划&#xff09; 题面 BZOJ洛谷 题解 一个很简单的\(dp\)&#xff0c;写成记忆化搜索的形式的挺不错的。 #include<iostream> #include<cstdio> #include<cstdlib> #include<cstring> #in…

[css] 使用css实现气泡框的效果

[css] 使用css实现气泡框的效果 <!DOCTYPE html> <html lang"en"><head> <meta charset"UTF-8"> <meta name"viewport" content"widthdevice-width, initial-scale1.0"> <title></title> …

JVM内存原理及高级特性

今天看了一篇文章&#xff0c;对JVM内存机制&#xff0c;讲的比较细致&#xff0c;决定转载过来 1、JVM 体系结构 线程共享内存 可以被所有线程共享的区域&#xff0c;包括堆区、方法区、运行时常量池。 1.1 堆&#xff08;Heap&#xff09; 大多数时候&#xff0c;Java 堆…

[css] 如何使用伪元素实现增大点击热区来增加用户体验?

[css] 如何使用伪元素实现增大点击热区来增加用户体验&#xff1f; .extend-via-pseudo-elem {position: relative; }.extend-via-pseudo-elem::before {content: ;position: absolute;top: -20px;right: -20px;bottom: -20px;left: -20px; }个人简介 我是歌谣&#xff0c;欢…

今天读了JDK1.8源码,知道了并行迭代器Spliterator

在JDK1.8的ArrayList里面偶然看到了这个内部类&#xff0c;同时对比了1.7的版本&#xff0c;发现1.7并没有这后面的东西&#xff0c; 随着好奇心&#xff0c;就搜了一下下&#xff0c;发现很有意思~ 也查了一些资料&#xff0c;如下总结&#xff1a; Spliterator是什么&#…

牛客网挑战赛24 青蛙(BFS)

链接&#xff1a;https://www.nowcoder.com/acm/contest/157/E来源&#xff1a;牛客网 有一只可爱的老青蛙&#xff0c;在路的另一端发现了一个黑的东西&#xff0c;想过去一探究竟。于是便开始踏上了旅途 一直这个小路上有很多的隧道&#xff0c;从隧道的a进入&#xff0c;会从…

[css] 如何使用css显示a链接的url?

[css] 如何使用css显示a链接的url&#xff1f; .some-a-tag:before {content: attr(href); }个人简介 我是歌谣&#xff0c;欢迎和大家一起交流前后端知识。放弃很容易&#xff0c; 但坚持一定很酷。欢迎大家一起讨论 主目录 与歌谣一起通关前端面试题

JAVA手写ArrayList以及LinkedList

手写记录一下~ 顶级接口List public interface List<E> {//返回线性表的大小public int getSize();//判断线性表中是否为空public boolean isEmpty();//判断线性表中是否包含元素oboolean contains(E o);//在线性表中查找元素o&#xff0c;若成功找到&#xff0c;返回其…

[css] css中的url()要不要加引号?说说你的理解

[css] css中的url()要不要加引号&#xff1f;说说你的理解 可以加&#xff0c;也可以不加。这个跟html标签的属性书写可以加引号也可以不加引号是一样的道理&#xff0c;当然如果属性中含有特殊字符比如空格则需要加空格&#xff0c;否则会引起浏览器解析错误。如果想养成良好…

JDK源码学习路线~每天学一点~每天进步一点点

很多java开发的小伙伴都会阅读jdk源码&#xff0c;然而确不知道应该从哪读起。以下为小编整理的通常所需阅读的源码范围。 标题为包名&#xff0c;后面序号为优先级1-4&#xff0c;优先级递减 1、java.lang 1) Object 1 2) String 1 3) AbstractStringBuilder 1 4) StringBuff…

[css] 使用css写一个垂直翻转图片的效果

[css] 使用css写一个垂直翻转图片的效果 transform: rotateX(180deg); /* 垂直镜像翻转 */个人简介 我是歌谣&#xff0c;欢迎和大家一起交流前后端知识。放弃很容易&#xff0c; 但坚持一定很酷。欢迎大家一起讨论 主目录 与歌谣一起通关前端面试题

20.pipe

pipe相当于angular1里面的filter 做一些格式转换啊&#xff0c;或者从一个数组里面选取一个元素等等 只要你愿意可以定义很复杂的内容‘’ 我们先看看 angular2 里面自带的一些pipe 我们去我们的week3 下的problem-list下 我们到html里面 之前是这样的 之后是这样的 我们再写三…

TCP/UDP相关知识

三次握手&#xff1a; 为了方便描述我们将主动发起请求的172.16.50.72:65076 主机称为客户端&#xff0c;将返回数据的主机172.16.17.94:8080称为服务器。 第一次握手: 建立连接。客户端发送连接请求&#xff0c;发送SYN报文&#xff0c;将seq设置为0。然后&#xff0c;…

[css] 请写出font属性的快捷写法

[css] 请写出font属性的快捷写法 p { font:italic bold 12px/20px arial,sans-serif; }个人简介 我是歌谣&#xff0c;欢迎和大家一起交流前后端知识。放弃很容易&#xff0c; 但坚持一定很酷。欢迎大家一起讨论 主目录 与歌谣一起通关前端面试题

Redis内部数据结构-跳跃表

今天学习了跳跃表&#xff0c;记录一下下~ 一、跳跃表简介 跳跃表是一种随机化数据结构&#xff0c;基于并联的链表&#xff0c;其效率可以比拟平衡二叉树&#xff0c;查找、删除、插入等操作都可以在对数期望时间内完成&#xff0c;对比平衡树&#xff0c;跳跃表的实现要简…

[css] 举例说明与打印有关的属性有哪些?

[css] 举例说明与打印有关的属性有哪些&#xff1f; page page-break-before page-break-after page-break-inside个人简介 我是歌谣&#xff0c;欢迎和大家一起交流前后端知识。放弃很容易&#xff0c; 但坚持一定很酷。欢迎大家一起讨论 主目录 与歌谣一起通关前端面试题…

MYSQL索引结构学习笔记

mysql 的数据、索引、DDL 等数据&#xff0c;都是以文件形式存储的&#xff0c; 所以导致每次查询都是一次I/O操作&#xff0c;当I/O操作过大时&#xff0c;会严重影响效率 MYSQL索引结构: mysql使用的是B树来存储索引的&#xff0c;为什么不用其他的呢&#xff1f;二叉树 -&g…

[css] 如何让背景图片固定不随滚动条滚动

[css] 如何让背景图片固定不随滚动条滚动 background-attachment:fixed个人简介 我是歌谣&#xff0c;欢迎和大家一起交流前后端知识。放弃很容易&#xff0c; 但坚持一定很酷。欢迎大家一起讨论 主目录 与歌谣一起通关前端面试题

SKYLINE UVALive - 4108

我一开始没有想到用线段树&#xff0c;是我学得太僵了... 我们要记录每段的最大高度&#xff0c;而且要组织成区间信息&#xff0c;这要用到线段树 怎么用呢&#xff1f; 线段树维护区间最大值&#xff1b;对于每个线段&#xff0c;先二分至其包含区间&#xff0c;如果最大值&g…

Mybatis源码学习笔记

Mybatis核心概念: Configuration : 管理 mysql-config.xml 全局配置关系类 SqlSessionFactory: Session 管理工厂接口 Session: SqlSession 是一个面向用户&#xff08;程序员&#xff09;的接口。SqlSession 中提供了很多操作数据库的方法 Executor : 执行器是一个接口…