ConcurrentHashMap第1讲——哪些地方做了并发控制

我们知道在多线程环境下,HashMap在初始化桶数组、put桶、插入链表以及树化等阶段都有线程安全问题,在jdk1.5之前我们通常用HashTableCollections.synchronizedMap包装过的HashMap来保证线程安全,不过它们在执行任何操作时都需要锁住整个hash表,这显著的限制了并发性能,所以在jdk1.5我们的并发大神Doug Lea设计一个高性能且线程安全的集合——ConcurrentHashMap。

ps:本文默认是jdk1.8版本,如有别的版本会强调

那么ConcurrentHashMap在哪些地方做了并发控制呢?

一、put操作的并发控制

ConcurrentHashMap的工作原理与HashMap类似,同样也是在jdk1.8前后区别较大,这里只讨论保证线程安全的实现措施:

  • jdk1.7:使用了分段锁技术,将哈希表分成多个段,每个段拥有一个独立的锁,这样可以在多个线程访问哈希表时,只需要锁住需要操作的那个段,而不是整个哈希表,从而提高了并发性能,锁用的是ReentranLock。

  • jdk1.8:对1.7的实现方式进行了改进,使用节点锁的思想,即采用“cas+synchronized”的机制来保证线程安全。在1.8中,如果某个桶/段没有元素,那么使用CAS操作来添加新节点,如果有元素,则使用synchronized锁住当前桶/段,再次尝试put。这样可以避免分段锁机制下的锁粒度太大,再次提高了并发性能。

下面是jdk1.7中分段锁的实现:

static final class Node<K,V> implements Map.Entry<K,V> {final int hash;final K key;volatile V val;volatile Node<K, V> next;
​Node(int hash, K key, V val, Node<K, V> next) {this.hash = hash;this.key = key;this.val = val;this.next = next;}//...
}
static final class Segment<K,V> extends ReentrantLock implements Serializable {private static final long serialVersionUID = 2249069246763182397L;transient volatile HashEntry<K,V>[] table;transient int count;transient int modCount;transient int threshold:final float loadFactor;
}

我们可以看到,每个Segment都是ReentrantLock的实现,每个Segment包含一个HashEntry数组,每个HashEntry则包含一个k-v键值对。

接下来再看下jdk 1.8中“cas+synchronized”机制的代码实现:

final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();//扰动计算,和HashMap一样int hash = spread(key.hashCode());int binCount = 0;//进入循环for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;
​if (tab == null || (n = tab.length) == 0){//也是懒加载,如果table为null或长度为0,则进行初始化tab = initTable();}
​else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//当前索引位置没有元素,则通过CAS操作尝试插入新节点if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break;                   // no lock when adding to empty bin}else if ((fh = f.hash) == MOVED){//说明当前索引位置有元素,且hash值为MOVED,表示正在进行扩容。//帮助迁移数据tab = helpTransfer(tab, f);} else {//ps:到这步则进行链表/红黑树的节点遍历和插入操作V oldVal = null;//加锁,确保只有一个线程操作改桶位置的链表或红黑树synchronized (f) {if (tabAt(tab, i) == f) {if (fh >= 0) {//遍历链表,找到相同的key的节点,更新值或插入新节点binCount = 1;for (Node<K,V> e = f;; ++binCount) {K ek;if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}Node<K,V> pred = e;if ((e = e.next) == null) {//将新节点插入到链表末尾(尾插法)pred.next = new Node<K,V>(hash, key,value, null);break;}}}//遍历红黑树,找到相同的key的节点,更新至或插入新节点else if (f instanceof TreeBin) {Node<K,V> p;binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}if (binCount != 0) {//插入或更新值成功,判断是否需要进行树化操作if (binCount >= TREEIFY_THRESHOLD)treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}}//累加元素个数,传入添加的数量1,以及链表节点个数,用于控制是否执行扩容操作addCount(1L, binCount);return null;
}

从上述代码可以看出,如果某个桶/段为空,那么使用CAS操作来添加新节点,如果第一个节点的hash为MOVED,表示当前段/桶正在进行扩容操作,那么就调用helpTransfer方法来协助扩容;否则,使用synchronized锁住当前段/桶,然后进行节点的添加操作。

二、初始化阶段的并发控制

先来了解一个重要的属性:

/*** 用来控制表初始化和扩容,默认值为0,当在初始化的时候指定了大小,这会将这个大小保存在sizeCtl中,大小为数组的0.75* 当为负的时候,说明表正在初始化或扩张,*0:默认状态,表示数组还没有被初始化。*-1:初始化数组*-(1+n):n:表示活动的扩张线程*sizeCtl>0:记录下一次需要扩容的大小。为3/4数组最大长度,相当于扩容的阈值*/
private transient volatile int sizeCtl;

此处采用CAS操作,如果此时没有线程初始化,则去初始化,否则当前线程让出cpu时间片,等待下一次唤醒,源码如下:

private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;while ((tab = table) == null || tab.length == 0) {//ps:进行初始化操作if ((sc = sizeCtl) < 0){//sizeCtl为负数,说明有线程在初始化//临时放弃cpu,让给优先级比自己高的线程或相同的线程Thread.yield(); // lost initialization race; just spin} else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//尝试把SIZECTL修改为-1(尝试获取锁)//ps:获取锁成功,进行初始化操作try {if ((tab = table) == null || tab.length == 0) {int n = (sc > 0) ? sc : DEFAULT_CAPACITY;@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = tab = nt;//sc=n*3/4sc = n - (n >>> 2);}} finally {//初始化后,sizeCtl长度为数组长度的3/4sizeCtl = sc;}break;}}return tab;
}

三、扩容阶段的并发控制(transfer())

在扩容阶段,ConcurrentHashMap并没有一味的通过CAS或锁区限制多线程,而是通过多线程来加速扩容。

在分析之前,我们需要知道两件事:

  • ConcurrentHashMap通过ForwardingNode来记录当前桶是否被迁移,如果old Table[i] instanceOf ForwardingNode则说明处于i节点的桶已经被移动到newTable中了。它里面有一个nextTable变量,指向的是下一次扩容后的table。

  • transferIndex记录了当前扩容的桶索引,最开始为oldTable.length,它给下一个线程指定了要扩容的节点。

下面是大致的扩容流程:

  • 通过CPU核数为每个线程计算划分任务,每个线程最少的任务是迁移16个桶。

  • 将当前桶扩容的索引transferIndex赋值给当前线程,如果索引<=0,则说明扩容完毕,结束流程,否则

  • 再将当前线程扩容后的索引赋值给transferIndex,比如如果transferIndex原来是32,那么赋值之后transferIndex应该变为16,这样下一个线程就可以从16开始扩容。通过CAS进行设置,不会有并发问题。

  • 之后就可以对真正的扩容流程进行加锁操作了。

源码如下: 

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {//n:数组长度  stride:每个处理多少任务int n = tab.length, stride;//多喝处理n>>>3/核心数个任务,最少处理16个任务if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)stride = MIN_TRANSFER_STRIDE; // subdivide range
​if (nextTab == null) {            // initiating//ps:表示第一个线程进此方法try {//扩容为原数组1倍@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];nextTab = nt;} catch (Throwable ex) {      // try to cope with OOMEsizeCtl = Integer.MAX_VALUE;return;}nextTable = nextTab;transferIndex = n;}int nextn = nextTab.length;//创建一个fwd节点,用于控制并发。当一个节点为空或已被转移后,接设置fwd节点,表示处于move状态ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);//是否继续向前查找boolean advance = true;//在完成之前重新再扫描一遍数组,看看有没完成的没boolean finishing = false; // to ensure sweep before committing nextTabfor (int i = 0, bound = 0;;) {Node<K,V> f; int fh;// 执行数据迁移// 且重新非陪transferIndex的值,用于不停向前推进更新迁移数据while (advance) {int nextIndex, nextBound;if (--i >= bound || finishing)advance = false;else if ((nextIndex = transferIndex) <= 0) {i = -1;advance = false;}else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0))) {bound = nextBound;i = nextIndex - 1;advance = false;}}//数据迁移完成的后置处理。包括重新检查一遍迁移数据以及归还线程。if (i < 0 || i >= n || i + n >= nextn) {int sc;if (finishing) {nextTable = null;table = nextTab;sizeCtl = (n << 1) - (n >>> 1);return;}if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)return;finishing = advance = true;i = n; // recheck before commit}}
​else if ((f = tabAt(tab, i)) == null)//把数组中null的元素的hash值置为MOVED,让循环体处理下一个节点,后续辅助线程发现节点为MOVE则会直接跳过advance = casTabAt(tab, i, null, fwd);else if ((fh = f.hash) == MOVED)//表示已有线程再处理,让循环体处理下一个节点advance = true; // already processedelse {//锁住节点,开始真正的扩容流程synchronized (f) {if (tabAt(tab, i) == f) {Node<K,V> ln, hn;if (fh >= 0) {//ps:说明是node节点int runBit = fh & n;Node<K,V> lastRun = f;for (Node<K,V> p = f.next; p != null; p = p.next) {int b = p.hash & n;if (b != runBit) {runBit = b;lastRun = p;}}if (runBit == 0) {ln = lastRun;hn = null;}else {hn = lastRun;ln = null;}//前面的节点不确定高低位,所以遍历f~LastRun范围的所有节点//分别逆序存入Ln或hn链表中for (Node<K,V> p = f; p != lastRun; p = p.next) {int ph = p.hash; K pk = p.key; V pv = p.val;if ((ph & n) == 0)ln = new Node<K,V>(ph, pk, pv, ln);elsehn = new Node<K,V>(ph, pk, pv, hn);}//存入之前的位置setTabAt(nextTab, i, ln);//存入改变后的位置setTabAt(nextTab, i + n, hn);//设置fwd,这样其它线程执行的时候,会跳过去setTabAt(tab, i, fwd);advance = true;}else if (f instanceof TreeBin) {//ps:红黑树的处理//...省略}}}}}
}

End:希望对大家有所帮助,如果有纰漏或者更好的想法,请您一定不要吝啬你的赐教🙋。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/47350.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

UNIX中sigaction和sigevent有啥区别

sigaction和sigevent在UNIX和类UNIX系统&#xff08;如Linux&#xff09;的信号处理机制中扮演着不同的角色。 sigaction 功能&#xff1a; sigaction是一个用于查询或设置信号处理方式的函数。它允许进程为特定的信号指定一个信号处理函数&#xff0c;以及设置信号处理时的其…

MyPostMan 迭代文档管理、自动化接口闭环测试工具(自动化测试篇)

MyPostMan 是一款类似 PostMan 的接口请求软件&#xff0c;按照 项目&#xff08;微服务&#xff09;、目录来管理我们的接口&#xff0c;基于迭代来管理我们的接口文档&#xff0c;文档可以导出和通过 url 实时分享&#xff0c;按照迭代编写自动化测试用例&#xff0c;在不同环…

kubernetes--Istio(四)

一、可观测性 Istio 为网格内所有的服务通信生成详细的遥测数据。这种遥测技术提供了服务行为的可观测性&#xff0c; 使运维人员能够排查故障、维护和优化应用程序&#xff0c;而不会给服务的开发人员带来任何额外的负担。 通过 Istio&#xff0c;运维人员可以全面了解到受监…

【React】创建React项目:使用 create-react-app 创建 React 应用

在本文中&#xff0c;我们将介绍如何使用 create-react-app 创建一个名为 react-basic 的 React 应用。以下步骤将帮助你快速搭建一个新的 React 项目。 1. 确保已安装 Node.js 和 npm 在开始之前&#xff0c;确保你的系统上已经安装了 Node.js 和 npm&#xff08;Node 包管理…

Burp安全扫描Web应用

一、浏览器设置代理 如下图所示&#xff0c;点击火狐浏览器的“扩展和主题”&#xff0c;搜索“代理”。 如下图所示&#xff0c;选择搜索到的第一个代理&#xff08;选择任何一个都可以&#xff09;。 如上图所示&#xff0c;第一个点击后&#xff0c;进入如下页面&#xff0…

在 Electron 中,主进程和渲染进程之间有多种通信方式

在 Electron 中&#xff0c;主进程和渲染进程之间有多种通信方式。以下列出了其中几种&#xff1a; 1. ipcMain 和 ipcRenderer&#xff1a;使用主进程和渲染进程之间的 Electron 网络协议 (ipc) 模块来发送事件和消息。这是一种双向通信的方式&#xff0c;可以实现消息的传递…

opencv—常用函数学习_“干货“_11

目录 二九、图像累加 将输入图像累加到累加图像中 (accumulate) 将输入图像加权累加到累加图像中 (accumulateWeighted) 将输入图像的平方累加到累加图像中 (accumulateSquare) 将两个输入图像的乘积累加到累加图像中 (accumulateProduct) 解释 三十、随机数与添加噪声 …

【Access、Trunk和Hybrid】

概述 Access类型的端口只能属于1个VLAN&#xff0c;一般用于连接计算机的端口&#xff1b;Trunk类型的端口可以允许多个VLAN通过&#xff0c;可以接收和发送多个VLAN的报文&#xff0c;一般用于交换机之间连接的端口&#xff1b;Hybrid类型的端口可以允许多个VLAN通过&#xf…

自己编写一个谷歌浏览器插件, 模拟某音直播间自动发消息

闲来没事&#xff0c; 做个插件玩一玩&#xff0c;于是一顿学习。 按照浏览器插件规范&#xff0c;一顿代码编写&#xff0c; 搞了一个简单的插件。仅做学习。 可以实现在直播间自动发消息。 定时轮发。 实现原理&#xff1a; 利用谷歌popub.js 发送消息。 在content-script.…

leetcode-49. 字母异位词分组

题目描述 给你一个字符串数组&#xff0c;请你将 字母异位词 组合在一起。可以按任意顺序返回结果列表。 字母异位词 是由重新排列源单词的所有字母得到的一个新单词。 示例 1: 输入: strs ["eat", "tea", "tan", "ate", "n…

g2plot如何实现面积图和折线图的动态切换?

一开始的时候显示的是面积图&#xff1a; 当我点击折线图的时候&#xff0c;要变成折线图&#xff1a; 当我再点击面积图的时候&#xff0c;还要变回面积图&#xff1a; 要实现这个功能&#xff0c;得知道g2plot几个重要的API。 参考文档如下&#xff1a;https://g2plot…

【python】网络通信socket

一、什么是socket socket网络通信是一种基于TCP/IP协议的通信方式&#xff0c;通过套接字&#xff08;Socket&#xff09;实现不同主机间的通信。它基于客户端-服务器架构&#xff0c;客户端和服务器通过Socket进行连接、通信和数据交换。在网络中&#xff0c;进程之间如何通信…

防火墙之双机热备篇

为什么要在防火墙上配置双机热备技术呢&#xff1f; 相信大家都知道&#xff0c;为了提高可靠性&#xff0c;避免单点故障 肯定有聪明的小伙伴会想到那为什么不直接多配置两台防火墙&#xff0c;然后再将他们进行线路冗余&#xff0c;不就完成备份了吗&#xff1f; 答案是不…

JDK、JRE、JVM的区别java的基本数据类型

说一说JDK、JRE、JVM的区别在哪&#xff1f; JDK&#xff1a; Java Delopment kit是java工具包&#xff0c;包含了编译器javac&#xff0c;调试器&#xff08;jdb&#xff09;以及其他用于开发和调试java程序的工具。JDK是开发人员在开发java应用程序时候所需要的的基本工具。…

海外社媒矩阵为何会被关联?如何IP隔离?

在当今的数字时代&#xff0c;社交媒体已经成为人们日常生活中不可或缺的一部分。通过社交媒体&#xff0c;人们可以与朋友互动&#xff0c;分享生活&#xff0c;甚至进行业务推广和营销。然而&#xff0c;社交媒体账号关联问题逐渐受到广泛关注。社交媒体账号为何会关联&#…

问题清除指南|成功解决pipmatplotlib因为ConnectTimeoutError更新失败问题

前言&#xff1a;跑baseline需要升级matplotlib和pip&#xff0c;在此记录一个错误和一个「别致」的解决方案。 北京时间 14:00 左右&#xff0c;在终端环境中运行命令python -m pip install --upgrade pip&#xff0c;报错&#xff1a; 多次尝试&#xff0c;未果。 隔天上午 0…

Elasticsearch 企业级实战 01:Painless 脚本如何调试?

在企业级应用中&#xff0c;Elasticsearch 常常被用来处理复杂的数据查询和操作。 Painless 是 Elasticsearch 的内置脚本语言&#xff0c;虽然强大&#xff0c;但调试起来并不容易。 本文将详细介绍如何在实战中有效调试 Painless 脚本&#xff0c;以提高开发和运维效率。 本文…

2.javaWeb_请求和响应的处理(Request,Response)

2.请求和响应的处理 文章目录 2.请求和响应的处理一、动态资源和静态资源javax.servlet(包) 二、Servlet体系1.简介2.HttpServlet3.Servlet生命周期 三、Request对象1.ServletRequest1)ServletRequest主要功能有&#xff1a;2)ServletRequest类的常用方法: 2.HttpServletReques…

通过SchedulingConfigurer 接口完成动态定时任务

通过SchedulingConfigurer 接口完成动态定时任务 一.背景 在Spring中&#xff0c;除了使用Scheduled注解外&#xff0c;还可以通过实现SchedulingConfigurer接口来创建定时任务。它们之间的主要区别在于灵活性和动态性。Scheduled注解适用于固定周期的任务&#xff0c;一旦任…