es数据频繁的更新_es之文档更新过程中并发冲突问题

1:乐观锁控制

ES是分布式的,也是异步并发的,我们的复制请求是并行发送的;这就意味着请求到达目的地的顺序是不可控制的,是乱序的;

如果是乱序的方式,很有可能出现这样的一个问题,新version的文档被旧version的文档覆盖掉—-数据丢失,或者直接抛异常;

TransportClient client = null;

@Before

public void testConn(){

try {

Settings settings = Settings.builder()

.put("cluster.name", "cluster_es").build();

client = new PreBuiltTransportClient(settings)

.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("hadoop01"), 9300));

System.out.println("========连接成功=============");

} catch (UnknownHostException e) {

e.printStackTrace();

}

}

/**

* upsert

* */

@Test

public void upsertDocument2() throws InterruptedException {

ExecutorService executorService = Executors.newFixedThreadPool(3);

for (int i = 0; i < 10; i++){

executorService.execute(new Thread1());

}

Thread.sleep(10000);

executorService.shutdown();

}

class Thread1 implements Runnable {

public void run() {

System.out.println("*************" + Thread.currentThread().getName() + " *************");

// 设置查询条件, 查找不到则添加

IndexRequest indexRequest = null;

try {

indexRequest = new IndexRequest("website", "blog", "1")

.source(XContentFactory.jsonBuilder()

.startObject()

.field("id", "1")

.endObject());

// 设置更新, 查找到更新下面的设置

UpdateRequest upsert = new UpdateRequest("website", "blog", "1")

.doc(XContentFactory.jsonBuilder()

.startObject()

.field("process_id", Thread.currentThread().getId())

.endObject())

.upsert(indexRequest);

client.update(upsert).get();

} catch (Exception e) {

e.printStackTrace();

}

}

}

@After

public void close(){

client.close();

}

所以在分布式异步并发场景中,需要一种方式:新版本的文档不会被旧版本的文档覆盖——【乐观锁】

Elasticsearch使用这个 _version 号来确保变更以正确顺序得到执行。如果旧版本的文档在新版本之后到达,它可以被简单的忽略。

我们可以利用 _version 号来确保 应用中相互冲突的变更不会导致数据丢失。我们通过指定想要修改文档的 version 号来达到这个目的。 如果该版本不是当前版本号,我们的请求将会失败。

新建一个文档,这个时候我们可以看到新文档的版本号_version=1:

PUT /website/blog/1/_create

{

"title" : "this is title" ,

"txt" : "just do it"

}

现在尝试通过重建文档索引来保存修改数据:

请求成功,并且响应体告诉我们 _version 已经递增到 2

PUT /website/blog/1?version=1

{

"title" : "this is test" ,

"txt" : "just do it"

}

然而,如果我们重新运行相同的索引请求,仍然指定 version=1 , Elasticsearch 返回 409 ConflictHTTP 响应码,和一个如下所示的响应体:

以上通过version的控制,可以让es在并行情况下操作而不出现丢失数据的现象,这种乐观锁的操作是比较常用的;

2:通过外部系统进行版本控制

上面我们讲到的是基于version进行版本的控制。在分布式环境下,只要version不同,那么修改就会报错;

而通过外部系统进行控制:version_type=external,只有当你提供的version比es中的_version大的时候,才能完成修改

_versionversion_type=external

只有_versioin相同,才会执行修改

只有当你提供的version比es中的_version大的时候,才能完成修改

例如,要创建一个新的具有外部版本号 5 的博客文章,我们可以按以下方法进行:

PUT /website/blog/2?version=5&version_type=external

{

"title": "My first external blog entry",

"text":  "Starting to get the hang of this..."

}

现在我们更新这个文档,指定一个新的 version 号是 10 :

PUT /website/blog/2?version=10&version_type=external

{

"title": "My first external blog entry",

"text":  "This is a piece of cake..."

}

version_type=external能够修改的条件就是:提供的版本号必须比_version大

如果此时插入版本号比现在的_version小的,就会报错:

3:重复提交retry_on_conflict

elasticsearch设计的目的就是多用户的海量数据操作;

那么可能存在这样场景:A进程接收到请求尝试去检索(retrieve)和重建索引(reindex)某个文档C,B进程也接收到请求检索(retrieve)和重建索引(reindex)文档C;

那么这个时候就会出现:其中一个进程提前修改了文档C,然后另一个进程在做检索的时候,因为_version改变了,所以匹配不到文档C,操作就会失败,然后数据丢失

这就是在并发操作的时候经常出现的现象;

解决:

对于多用户的更新操作,文档被修改了并不要紧,如果出现了匹配不到的现象,我们只要重新在操作一遍就可以了;所以需要使用关键字retry_on_conflict(默认0)

POST /website/pageviews/1/_update?retry_on_conflict=5

{

"script" : "ctx._source.views+=1",

"upsert": {

"views": 0

}

}

retry_on_conflict=5 代表如果出现失败,最大可以重复五次的update操作

5.7.6:悲观锁控制【无用】

类似传统数据库————mysql,在处理并发的时候,为了防止出现冲突的问题,就会使用悲观锁;

这种方法被关系型数据库广泛使用,它假定有变更冲突可能发生,因此阻塞访问资源以防止冲突。

一个典型的例子是读取一行数据之前先将其锁住,确保只有放置锁的线程能够对这行数据进行修改(想想java中的synchronize)。

5.7.6.1:全局锁(无用)

只允许一个线程进行执行更新操作,这样能够避免并发性问题,在es中,全局锁是将一份文档是否存在作为依据

获取一个全局锁:

PUT website/blog/1/_create

{}

这样就上锁了,然后使用java的多线程做测试,在里面修改数据

TransportClient client = null;

@Before

public void testConn(){

try {

Settings settings = Settings.builder()

.put("cluster.name", "cluster").build();

client = new PreBuiltTransportClient(settings)

.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("hadoop01"), 9300));

System.out.println("========连接成功=============");

} catch (UnknownHostException e) {

e.printStackTrace();

}

}

/**

* upsert

* */

@Test

public void upsertDocument2() throws InterruptedException {

ExecutorService executorService = Executors.newFixedThreadPool(1);//线程数为1是全局锁

for (int i = 0; i < 10; i++){

executorService.execute(new Thread1());

}

Thread.sleep(10000);

executorService.shutdown();

}

class Thread1 implements Runnable {

public void run() {

System.out.println("*************" + Thread.currentThread().getName() + " *************");

// 设置查询条件, 查找不到则添加

IndexRequest indexRequest = null;

try {

indexRequest = new IndexRequest("website", "blog", "1")

.source(XContentFactory.jsonBuilder()

.startObject()

.field("id", "1")

.endObject());

// 设置更新, 查找到更新下面的设置

UpdateRequest upsert = new UpdateRequest("website", "blog", "1")

.doc(XContentFactory.jsonBuilder()

.startObject()

.field("process_id", Thread.currentThread().getId())

.endObject())

.upsert(indexRequest);

client.update(upsert).get();

} catch (Exception e) {

e.printStackTrace();

}

}

}

@After

public void close(){

client.close();

}

如果另一个进行想同时在创建一个website/blog/1 就会抛异常

释放全局锁:

全局锁必须通过删除来释放:

DELETE website/blog/1

优点:操作非常简单,非常容易使用,成本低缺点:你直接就把整个index给上锁了,这个时候对index中所有的doc的操作,都会被block住,导致整个系统的并发能力很低

5.7.6.2:document文档锁(无用)

这种锁比全局锁的粒度小,因为全局锁是锁定整个index,那么文档所就是针对单个文档完成锁定

上锁的方式依赖groovy脚本:/config/scripts

vim documentLock.groovy 【脚本需要上传到所有节点】

if ( ctx._source.process_id != process_id ) { assert false }; ctx.op = 'noop';

脚本的意思:

如果当前传入的process_id和设定的process_id不一致,就抛异常assert false

如果一致的,返回'noop'

插入一个文档:

PUT website/blog/1

{

"id" : 1,

"process_id" : 234

}

对当前文档上文档锁:

POST /website/blog/1/_update

{

"upsert": { "process_id": 234 },

"script": {

"lang": "groovy",

"file": "documentLock",

"params": {

"process_id": 234

}

}

}

注意,当前设定的"process_id": 234,如果此时换一个"process_id" : 123,那么就会抛异常:assert false

比如:

POST /website/blog/1/_update

{

"upsert": { "process_id": 123 },

"script": {

"lang": "groovy",

"file": "documentLock",

"params": {

"process_id": 123

}

}

}

注意:如果传入的是"process_id": 234,传入正确参数,直接返回ctx.op = 'noop'

POST /website/blog/1/_update

{

"upsert": { "process_id": 234 },

"script": {

"lang": "groovy",

"file": "documentLock",

"params": {

"process_id": 234

}

}

}

如何释放悲观锁 , 删除对应的process_id数据即可:

DELETE website/blog/1

{

"query": {

"term": {

"process_id": 234

}

}

}

文档级锁可以实现细粒度的访问控制,但是当文档数量达到百分甚至上千万的时候,这种方式开销是比较昂贵的

5.7.6.3:共享锁和排它锁(无用)

共享锁:数据是共享的,多个线程可以获取同一个数据的共享锁,然后对这个数据执行读操作 排它锁:只能有一个线程获取排它锁,然后执行更新操作

在config/scripts下 vim gongxiang_paita.groovy

if (ctx._source.lock_type == 'exclusive') {

assert false

} else {

ctx._source.lock_count++

}

脚本意思:

如果其他线程共享:ctx._source.lock_count++

POST /website/blog/1/_update

{

"upsert": {

"lock_type":  "shared",

"lock_count": 1

},

"script": {

"lang": "groovy",

"file": "gongxiang_paita"

}

}

如果其他线程添加排他锁'exclusive',那么抛异常:

(1):将共享share标记修改成排他exclusive标记

POST /website/blog/1/_update

{

"doc" : {

"lock_type": "exclusive"

}

}

(2):修改成排他标记后,在尝试共享修改操作,报错

POST /website/blog/1/_update

{

"upsert": {

"lock_type":  "shared",

"lock_count": 1

},

"script": {

"lang": "groovy",

"file": "gongxiang_paita"

}

}

如何释放锁:

Vim unlock.groovy

if (ctx._source.lock_type == "shared") {ctx._source.lock_count --};

if (ctx._source.lock_count == 0) { ctx.op = 'delete' };

脚本意思:

ctx._source.lock_type == "shared" 则lock_count—

当lock_count == 0,那么删除/website/blog/1

(1):GET website/blog/1 查看一下,当前是共享锁还是排它锁;

(2): 如果是排他锁,需要修改会共享锁

POST /website/blog/1/_update

{

"doc" : {

"lock_type": "shared"

}

}

(3):释放共享锁

POST /website/blog/1/_update

{

"upsert": {

"lock_type":  "shared",

"lock_count": 1

},

"script": {

"lang": "groovy",

"file": "unlock"

}

}

这样就释放了共享锁;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/344596.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【渝粤教育】广东开放大学 网络编程技术 形成性考核 (57)

选择题 题目&#xff1a;在JavaScript中&#xff0c;运行以下代码&#xff1a; var flagtrue; document .write(typeof(flag)); 值是( )。 题目&#xff1a;下面的代码( )能在页面中弹出提示框&#xff0c;并且输入框中默认无任何内容 题目&#xff1a;在JavaScript中&#xff…

【渝粤教育】广东开放大学 金融学 形成性考核 (37)

选择题 题目&#xff1a;金融体系中居于联接宏微观的纽带和运作核心地位的是&#xff08; &#xff09; 题目&#xff1a;对居民盈余与赤字的管理选择&#xff0c;说法正确的是&#xff08; &#xff09; 题目&#xff1a;从形式上看&#xff0c;收入可分为&#xff08; &a…

面试题目_总结面试中 promise 相关题目的套路

Promise 作为当下主流的异步解决方案&#xff0c;在工作中和面试中常常出现&#xff0c;尤其是在面试中&#xff0c;会弄个场景让你手写代码&#xff0c;这里给大家介绍五道比较有代表性的题目&#xff0c;以便熟悉一些套路。promise 简单介绍先简单介绍下 PromisePromise 对象…

javafx 自定义控件_JavaFX自定义控件– Nest Thermostat第1部分

javafx 自定义控件几周前&#xff0c;由于Hendrik Ebbers的出色文章 &#xff0c;我决定花一些时间观看有关JavaFX的JavaOne讨论。 我不得不说我已经学到了很多东西&#xff0c;只是看这些视频&#xff08;即使我还没有完成&#xff09;&#xff01; Gerrit的“使用力&#xf…

【渝粤教育】电大中专Office办公软件 (4)作业 题库

1.以下软件不属于系统软件的是&#xff08; &#xff09;。 A.Visual Studio 2019 B.MySQL 5.7 C.Windows 10 D.Office 2016 错误 正确答案&#xff1a;左边查询 学生答案&#xff1a;未作答 2.学习好Office办公软件这门课程&#xff0c;应该&#xff08; &#xff09;。 A.重视…

【渝粤教育】电大中专中药学基础作业 题库

试卷答案 1.首创按药物自然属性进行分类的本草著作是&#xff08;&#xff09;。 A.《神农本草经》 B.《本草经集注》 C.《本草纲目》 D.《新修本草》 E.《本草拾遗》 正确 正确答案&#xff1a;左边查询 学生答案&#xff1a;B 2.载药数最多的本草著作是&#xff08;&#xff…

不等号属于不等式吗_考研专业课备考时,仅仅多刷几遍目标院校的期末考试题就够吗?...

考研专业课备考时&#xff0c;仅仅多刷几遍目标院校的期末考试题就够吗&#xff1f;也许这要看各专业情况&#xff0c;部分专业的考研题和本科生的期末考试题难度类似&#xff0c;比如说人文社科类的专业&#xff0c;这也是我的猜测情况。大部分专业的专业课题目难度&#xff0…

使用Spring Boot和H2可以完全工作的原型

我们确实在弹簧上使用了很多h2&#xff0c;特别是对于单元测试。 但是&#xff0c;我们可能希望拥有一个功能齐全的原型来显示数据&#xff0c;而不是进行单元测试。 H2是最理想的选择&#xff0c;它在spring上运行良好&#xff0c;与大多数数据库都具有良好的语法兼容性&…

【渝粤教育】电大中专品牌管理与推广 (2)作业 题库

1通常&#xff0c;对品牌的排他专有性的保护手段主要是注册商标、申请专利、授权经营&#xff0c;等等。该说法&#xff08;&#xff09; A正确 B错误 正确 正确答案&#xff1a;左边查询 学生答案&#xff1a;A 2品牌服务是以服务而不是以产品为主要特征的品牌&#xff0c;如商…

【渝粤教育】电大中专学前教育学作业 题库

1学前教育的孕育阶段的时间定位于&#xff08;&#xff09; A21世纪 B16世纪以前 C远古时期 D18世纪 错误 正确答案&#xff1a;左边查询 学生答案&#xff1a;A 2我国封建社会第一部完整的家庭教科书《颜氏家训》出自&#xff08;&#xff09; A陶行知 B昆体良 C颜之推 D朱熹 …

饿了吗商品列表_仅仅一字之差,饿了么起诉饿了吗

饿了么与“饿了吗”&#xff0c;仅仅一字之差&#xff0c;相信不少人乍看会以为是一家。但近日公开的一则判决书显示&#xff0c;因为太近似&#xff0c;二者曾对簿公堂。饿了吗公司以败诉收场&#xff0c;被判处立即变更其企业名称&#xff0c;变更后企业名称中不得含有与“饿…

【渝粤教育】电大中专市场营销管理 (2)作业 题库

1企业的内部环境不包括&#xff08;&#xff09; A企业的生产能力 B财务能力 C社会文化环境 D企业在公众中的形象 错误 正确答案&#xff1a;左边查询 学生答案&#xff1a;A 2企业营销战略规划不包括哪一步骤&#xff08;&#xff09; A确定企业的任务与目标 B选择合宜的市场机…

pandas 遍历并修改_Pandas循环提速7万多倍!Python数据分析攻略

乾明 编译整理 量子位 报道 | 公众号 QbitAI用Python和Pandas进行数据分析&#xff0c;很快就会用到循环。但在这其中&#xff0c;就算是较小的DataFrame&#xff0c;使用标准循环也比较耗时。遇到较大的DataFrame时&#xff0c;需要的时间会更长&#xff0c;会让人更加头疼。现…

spring消息队列_AmazonSQS和Spring用于消息传递队列

spring消息队列下一篇文章将演示如何将Spring JMS模板和DLMC与AmazonSQS API一起使用&#xff0c;以放置消息队列。 我为什么要使用Amazon SQS&#xff1f; 易于配置 跨平台支持 从您的自我冗余&#xff0c;连带和扩展方面的烦恼中赚钱。 为什么我不使用Amazon SQS&#x…

教程:用Java创建和验证JWT

“我喜欢编写身份验证和授权代码。” 〜从来没有Java开发人员。 厌倦了一次又一次地建立相同的登录屏幕&#xff1f; 尝试使用Okta API进行托管身份验证&#xff0c;授权和多因素身份验证。 Java对JWT&#xff08;JSON Web令牌&#xff09;的支持过去需要进行大量工作&#xf…

python用递归法写斐波那契_python实现斐波那契数列: 递归+备忘录法+动态规划实现...

1.为什么备忘录法和动态规划法&#xff1a;斐波那契是很多人入门递归思想的第一课&#xff0c;所以很多人都会最简单的一种递归写法&#xff0c;但是其实递归的过程&#xff0c;他的时间复杂度非常高&#xff0c;达到了O(2的n次方)这样的一个指数级别。先看最简单的&#xff1a…

【渝粤教育】电大中专电商运营实操 (2)_1作业 题库

1.电子商务最重要的是&#xff08;&#xff09; A.商务 B.网站 C.货物 D.信息技术 正确 正确答案&#xff1a;左边查询 学生答案&#xff1a;A 2.目前菜鸟网络依赖大数据和云计算已实现了哪些功能&#xff08;&#xff09; A.自动化仓库 B.智能发货 C.物流云加速 D.以上都正确 …

q7goodies事例_Java 8 Friday Goodies:java.io终于成功了!

q7goodies事例在Data Geekery &#xff0c;我们喜欢Java。 而且&#xff0c;由于我们真的很喜欢jOOQ的流畅的API和查询DSL &#xff0c;我们对Java 8将为我们的生态系统带来什么感到非常兴奋。 我们已经写了一些关于Java 8好东西的博客 &#xff0c;现在我们觉得是时候开始一个…

python人脸识别环境搭建_Win10:Python3.6安装face_recognition人脸识别库

face_recognition简介face_recognition是Python的一个开源人脸识别库&#xff0c;支持Python 3.3和Python 2.7。引用官网介绍&#xff1a;Recognize and manipulate faces from Python or from the command line with the worlds simplest face recognition library.安装配置我…

【渝粤教育】电大中专电子商务网站建设与维护 (11)作业 题库

1.目前&#xff0c;阿里巴巴集团旗下主要交易市场不包括哪个&#xff08; &#xff09; A.中国批发交易平台 B.全球批发交易平台 C.中国交易市场 D.国际交易市场 错误 正确答案&#xff1a;左边查询 学生答案&#xff1a;未作答 2.阿里巴巴是于1999年创立的&#xff08; &#…