elasticsearch解决同步删除数据库中不存在的数据

        摘要: jdbc-input-plugin 只能实现数据库的追加,对于 elasticsearch 增量写入,但经常jdbc源一端的数据库可能会做数据库删除或者更新操作。这样一来数据库与搜索引擎的数据库就出现了不对称的情况。当然你如果有开发团队可以写程序在删除或者更新的时候同步对搜索引擎操作。如果你没有这个能力,可以尝试下面的方法。

 

 

 

解决MySQL与Elasticsearch 数据不对称问题

jdbc-input-plugin 只能实现数据库的追加,对于 elasticsearch 增量写入,但经常jdbc源一端的数据库可能会做数据库删除或者更新操作。这样一来数据库与搜索引擎的数据库就出现了不对称的情况。

当然你如果有开发团队可以写程序在删除或者更新的时候同步对搜索引擎操作。如果你没有这个能力,可以尝试下面的方法。

这里有一个数据表 article , mtime 字段定义了 ON UPDATE CURRENT_TIMESTAMP 所以每次更新mtime的时间都会变化

mysql> desc article;
+-------------+--------------+------+-----+--------------------------------+-------+
| Field       | Type         | Null | Key | Default                        | Extra |
+-------------+--------------+------+-----+--------------------------------+-------+
| id          | int(11)      | NO   |     | 0                              |       |
| title       | mediumtext   | NO   |     | NULL                           |       |
| description | mediumtext   | YES  |     | NULL                           |       |
| author      | varchar(100) | YES  |     | NULL                           |       |
| source      | varchar(100) | YES  |     | NULL                           |       |
| content     | longtext     | YES  |     | NULL                           |       |
| status      | enum('Y','N')| NO   |     | 'N'                            |       |
| ctime       | timestamp    | NO   |     | CURRENT_TIMESTAMP              |       |
| mtime       | timestamp    | YES  |     | ON UPDATE CURRENT_TIMESTAMP    |       |
+-------------+--------------+------+-----+--------------------------------+-------+
7 rows in set (0.00 sec)| Field       | Type         | Null | Key | Default                        | Extra |
+-------------+--------------+------+-----+--------------------------------+-------+
| id          | int(11)      | NO   |     | 0                              |       |
| title       | mediumtext   | NO   |     | NULL                           |       |
| description | mediumtext   | YES  |     | NULL                           |       |
| author      | varchar(100) | YES  |     | NULL                           |       |
| source      | varchar(100) | YES  |     | NULL                           |       |
| content     | longtext     | YES  |     | NULL                           |       |
| status      | enum('Y','N')| NO   |     | 'N'                            |       |
| ctime       | timestamp    | NO   |     | CURRENT_TIMESTAMP              |       |
| mtime       | timestamp    | YES  |     | ON UPDATE CURRENT_TIMESTAMP    |       |
+-------------+--------------+------+-----+--------------------------------+-------+
7 rows in set (0.00 sec)

logstash 增加 mtime 的查询规则

jdbc {jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"jdbc_driver_class => "com.mysql.jdbc.Driver"jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"jdbc_user => "cms"jdbc_password => "password"schedule => "* * * * *"	#定时cron的表达式,这里是每分钟执行一次statement => "select * from article where mtime > :sql_last_value"use_column_value => truetracking_column => "mtime"tracking_column_type => "timestamp" record_last_run => truelast_run_metadata_path => "/var/tmp/article-mtime.last"}"/usr/share/java/mysql-connector-java.jar"jdbc_driver_class => "com.mysql.jdbc.Driver"jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"jdbc_user => "cms"jdbc_password => "password"schedule => "* * * * *"	#定时cron的表达式,这里是每分钟执行一次statement => "select * from article where mtime > :sql_last_value"use_column_value => truetracking_column => "mtime"tracking_column_type => "timestamp" record_last_run => truelast_run_metadata_path => "/var/tmp/article-mtime.last"}

创建回收站表,这个事用于解决数据库删除,或者禁用 status = 'N' 这种情况的。

CREATE TABLE `elasticsearch_trash` (`id` int(11) NOT NULL,`ctime` timestamp NULL DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 TABLE `elasticsearch_trash` (`id` int(11) NOT NULL,`ctime` timestamp NULL DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

为 article 表创建触发器

CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_UPDATE` BEFORE UPDATE ON `article` FOR EACH ROW
BEGIN-- 此处的逻辑是解决文章状态变为 N 的时候,需要将搜索引擎中对应的数据删除。IF NEW.status = 'N' THENinsert into elasticsearch_trash(id) values(OLD.id);END IF;-- 此处逻辑是修改状态到 Y 的时候,方式elasticsearch_trash仍然存在该文章ID,导致误删除。所以需要删除回收站中得回收记录。IF NEW.status = 'Y' THENdelete from elasticsearch_trash where id = OLD.id;END IF;
ENDCREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_DELETE` BEFORE DELETE ON `article` FOR EACH ROW
BEGIN-- 此处逻辑是文章被删除同事将改文章放入搜索引擎回收站。insert into elasticsearch_trash(id) values(OLD.id);
END DEFINER=`dba`@`%` TRIGGER `article_BEFORE_UPDATE` BEFORE UPDATE ON `article` FOR EACH ROW
BEGIN-- 此处的逻辑是解决文章状态变为 N 的时候,需要将搜索引擎中对应的数据删除。IF NEW.status = 'N' THENinsert into elasticsearch_trash(id) values(OLD.id);END IF;-- 此处逻辑是修改状态到 Y 的时候,方式elasticsearch_trash仍然存在该文章ID,导致误删除。所以需要删除回收站中得回收记录。IF NEW.status = 'Y' THENdelete from elasticsearch_trash where id = OLD.id;END IF;
ENDCREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_DELETE` BEFORE DELETE ON `article` FOR EACH ROW
BEGIN-- 此处逻辑是文章被删除同事将改文章放入搜索引擎回收站。insert into elasticsearch_trash(id) values(OLD.id);
END

接下来我们需要写一个简单地 Shell 每分钟运行一次,从 elasticsearch_trash 数据表中取出数据,然后使用 curl 命令调用 elasticsearch restful 接口,删除被收回的数据。

你还可以开发相关的程序,这里提供一个 Spring boot 定时任务例子。

实体


package cn.netkiller.api.domain.elasticsearch;import java.util.Date;import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;@Entity
@Table
public class ElasticsearchTrash {@Idprivate int id;@Column(columnDefinition = "TIMESTAMP DEFAULT CURRENT_TIMESTAMP")private Date ctime;public int getId() {return id;}public void setId(int id) {this.id = id;}public Date getCtime() {return ctime;}public void setCtime(Date ctime) {this.ctime = ctime;}}

仓库 


package cn.netkiller.api.repository.elasticsearch;import org.springframework.data.repository.CrudRepository;import com.example.api.domain.elasticsearch.ElasticsearchTrash;public interface ElasticsearchTrashRepository extends CrudRepository<ElasticsearchTrash, Integer>{}

定时任务 

package cn.netkiller.api.schedule;import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.rest.RestStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import com.example.api.domain.elasticsearch.ElasticsearchTrash;
import com.example.api.repository.elasticsearch.ElasticsearchTrashRepository;@Component
public class ScheduledTasks {private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);@Autowiredprivate TransportClient client;@Autowiredprivate ElasticsearchTrashRepository alasticsearchTrashRepository;public ScheduledTasks() {}@Scheduled(fixedRate = 1000 * 60) // 60秒运行一次调度任务public void cleanTrash() {for (ElasticsearchTrash elasticsearchTrash : alasticsearchTrashRepository.findAll()) {DeleteResponse response = client.prepareDelete("information", "article", elasticsearchTrash.getId() + "").get();RestStatus status = response.status();logger.info("delete {} {}", elasticsearchTrash.getId(), status.toString());if (status == RestStatus.OK || status == RestStatus.NOT_FOUND) {alasticsearchTrashRepository.delete(elasticsearchTrash);}}}
}

Spring boot 启动主程序。 

package cn.netkiller.api;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;@SpringBootApplication
@EnableScheduling
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}
}

本文转载自:

https://my.oschina.net/neochen/blog/1518679#comment-list

 

本人最近开了一个公众号,会讲一些常用的技术,以及面试题,欢迎关注

扫码关注,每天获取最前沿的互联网知识~

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/413568.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

今天读了JDK1.8源码,知道了并行迭代器Spliterator

在JDK1.8的ArrayList里面偶然看到了这个内部类&#xff0c;同时对比了1.7的版本&#xff0c;发现1.7并没有这后面的东西&#xff0c; 随着好奇心&#xff0c;就搜了一下下&#xff0c;发现很有意思~ 也查了一些资料&#xff0c;如下总结&#xff1a; Spliterator是什么&#…

牛客网挑战赛24 青蛙(BFS)

链接&#xff1a;https://www.nowcoder.com/acm/contest/157/E来源&#xff1a;牛客网 有一只可爱的老青蛙&#xff0c;在路的另一端发现了一个黑的东西&#xff0c;想过去一探究竟。于是便开始踏上了旅途 一直这个小路上有很多的隧道&#xff0c;从隧道的a进入&#xff0c;会从…

20.pipe

pipe相当于angular1里面的filter 做一些格式转换啊&#xff0c;或者从一个数组里面选取一个元素等等 只要你愿意可以定义很复杂的内容‘’ 我们先看看 angular2 里面自带的一些pipe 我们去我们的week3 下的problem-list下 我们到html里面 之前是这样的 之后是这样的 我们再写三…

Redis内部数据结构-跳跃表

今天学习了跳跃表&#xff0c;记录一下下~ 一、跳跃表简介 跳跃表是一种随机化数据结构&#xff0c;基于并联的链表&#xff0c;其效率可以比拟平衡二叉树&#xff0c;查找、删除、插入等操作都可以在对数期望时间内完成&#xff0c;对比平衡树&#xff0c;跳跃表的实现要简…

Mybatis源码学习笔记

Mybatis核心概念: Configuration : 管理 mysql-config.xml 全局配置关系类 SqlSessionFactory: Session 管理工厂接口 Session: SqlSession 是一个面向用户&#xff08;程序员&#xff09;的接口。SqlSession 中提供了很多操作数据库的方法 Executor : 执行器是一个接口…

JQData数据提取及MySQL简单操作——基于Python

JQData平台真的挺不错&#xff0c;平台数据可以免费使用一年&#xff0c;满足绝大多数人需求&#xff0c;具体账号获取请自行百度哟~ 因需要高频数据而Wind也只给近三年&#xff0c;再要还得购&#xff0c;&#xff0c;机缘遇到这一平台&#xff0c;获得了账号试用很不错&#…

JVM模型学习笔记

JVM由三个主要的子系统构成 1. 运行时数据区&#xff08;内存结构&#xff09;: 运行时数据区也是JVM的核心部分 内存数据区又分&#xff1a;堆、java栈、本地方法栈、程序计数器、方法区 1.1 本地方法栈(线程私有)&#xff1a; 登记native方法&#xff0c;在Execution Eng…

tomcat 设置虚拟路径的4种方法

通常使用方法1或者方法2 方法1 &#xff08;添加配置文件&#xff09;&#xff1a;推荐使用&#xff0c;不需要重启服务器 在Tomcat根目录下的/conf/Catalina/localhost/ 路径下新建一个filename.xml&#xff0c;并在该xml中编写语句 即可创建虚拟站点&#xff0c;虚拟站点名为…

sharding-sphere按月动态分表

公司有个记录表&#xff0c;每天有几百万的数据&#xff0c;所以我决定按月把他分下表。 用spring整合的。 首先&#xff0c;sharding-sphere不支持自动创建表&#xff0c;所以我提前创建了两年的表&#xff0c;命名规则 logicTableName _2019_06 以下是官方文档上面的分片…

大厂Java岗面试心得记录

最近裸辞&#xff0c;面了几家大厂&#xff0c;offer率高达100% 哈哈&#xff0c;然后发现选公司也是一件难事。 废话不多说&#xff0c;分享一下&#xff0c;我遇到的面试题&#xff0c;大概有以下这些&#xff1a; JVM: 1.JVM有哪些区域&#xff1f; 2.堆和栈分别说说内部东…

原生Js 两种方法实现页面关键字高亮显示

方法一 依靠正则表达式修改 1.获取obj的html 2.统一替换html标签 3.替换要修改的关键字 4.再把html标签修改回去 不足就是如果查找的关键字跟替换的标签一样就有冲突了 <!DOCTYPE HTML> <html lang"en"> <meta http-equiv"Content-Type" co…

SpringBoot启动报错java.nio.charset.MalformedInputException: Input length = 2解决方案

最近新搭的一个项目&#xff0c;启动时&#xff0c;会报MalformedInputException这个异常&#xff0c; 百度了很久&#xff0c;网上说的千篇一律&#xff0c; 有的说&#xff0c;把yml复制到txt再复制回来 有的说&#xff0c;设置eclipse的utf-8环境 这些根本没有根治这个问…

IDEA主题设置与eclipse代码风格一致

习惯了用eclipse的你&#xff0c;是不是转到用idea特别不习惯&#xff0c;没有关系&#xff0c; 我们可以把idea的代码风格改成eclipse的&#xff0c;快捷键也换成eclipse的。 下载这个jar包&#xff0c;导入settings文件即可 下载地址 : https://download.csdn.net/download…

浅谈分布式锁

概述 为了防止分布式系统中的多个进程之间相互干扰&#xff0c;我们需要一种分布式协调技术来对这些进程进行调度。而这个分布式协调技术的核心就是来实现这个分布式锁。 为什么要使用分布式锁 成员变量 A 存在 JVM1、JVM2、JVM3 三个 JVM 内存中 成员变量 A 同时都会在 JVM …

[css] 说说sroll-snap-type属性的运用场景有哪些?相关联的属性还有哪些?

[css] 说说sroll-snap-type属性的运用场景有哪些&#xff1f;相关联的属性还有哪些&#xff1f; 使用 sroll-snap-type 优化滚动 根据 CSS Scroll Snap Module Level 1 规范&#xff0c;CSS 新增了一批能够控制滚动的属性&#xff0c;让滚动能够在仅仅通过 CSS 的控制下&#…

简单的一个月之设计实现内容1

需求:简单的新闻管理系统,实现简单的增删改查功能 1.数据库表的建立 ID非空,数据类型看着给 2.写实体(entity)News.java 要与数据库中的字段相对应,(id,optimistic我没写,问题不大)1 package com.pay.boss.entity; //封装好的entity包,直接引用2 3 import java.util.Date; …

分享一个有趣的网站“让我帮你百度一下“

如何解决同事的弱智问题&#xff0c;分享一个有趣的网站 日常工作中&#xff0c;总有些人会问你一些弱智的问题 你只需要三步就可以完美解决: 1.打开这个链接: 让我帮你百度一下 2.输入他的问题、点回车 3.复制结果链接甩到他的脸上_ hahahhahahaha 这样就解决了一切烦恼&a…

SQL Server2014 SP2新增的数据库克隆功能

SQL Server2014 SP2新增的数据库克隆功能 原文:SQL Server2014 SP2新增的数据库克隆功能SQL Server2014 SP2新增的数据库克隆功能 创建测试库 --创建测试数据库 create database testtestuse testtest go --创建表 create table testtest(id int ,name varchar(20)) --插入数据…

工作328:uni-两个页面对象传递

getDetailList(record){console.log(record)uni.navigateTo({url:../formdaliyList/formdaliyList?recordencodeURIComponent(JSON.stringify(record))})},onLoad(e){/* JSON.parse() */let obj JSON.parse(decodeURIComponent(e.record));console.log(obj)},

vue-js 特殊变量$event常识

背景 如果我们要阻止默认事件&#xff0c;在 chrome 等浏览器中&#xff0c;我们可能要写一个&#xff1a; event.preventDefault(); 而在 IE 中&#xff0c;我们则需要写&#xff1a; event.returnValue false; jquery &#xff0c;跨浏览器的实现&#xff0c;我们统一只…