JUC-并发编程-06-HashMap与CurrentHashMap

    技术2025-04-17  10

    本期学习hashMap与currentHashMap一个是我们在单线程的常用的集合框架,但是对于多线程的情况下就不在适用,所以juc下,有个CurrentHashMap。两个比较学习。后面再做个总结。本次学习是基于1.8

    1、HashMap

    1.1 数据结构

    1.1.1 继承关系

    是Map接口的具体实现。

    1.1.2 属性介绍

    //默认初始容量-必须为2的幂 static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; //最大容量,如果某个具有参数的构造函数隐式指定了更高的值,则使用该值。必须是2的幂<=1<<30。 static final int MAXIMUM_CAPACITY = 1 << 30; //在构造函数中未指定时使用的负载系数。默认加载因子。 static final float DEFAULT_LOAD_FACTOR = 0.75f; //当表未膨胀时要共享的空表实例。 static final Entry<?,?>[] EMPTY_TABLE = {}; //该表,根据需要调整大小。长度必须始终为2的幂。 ransient Entry<K,V>[] table = (Entry<K,V>[]) EMPTY_TABLE; //此映射中包含的键-值映射数。大小 transient int size;

    1.2 构造函数

    1.2.1 自定义大小和加载因子

    public HashMap(int initialCapacity, float loadFactor) { if (initialCapacity < 0) throw new IllegalArgumentException("Illegal initial capacity: " + initialCapacity); if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; if (loadFactor <= 0 || Float.isNaN(loadFactor)) throw new IllegalArgumentException("Illegal load factor: " + loadFactor); this.loadFactor = loadFactor; this.threshold = tableSizeFor(initialCapacity); }

    1.2.2 自定义大小

    public HashMap(int initialCapacity) { this(initialCapacity, DEFAULT_LOAD_FACTOR); }

    1.2.3 构建默认HashMap

    public HashMap() { this.loadFactor = DEFAULT_LOAD_FACTOR; // all other fields defaulted }

    1.2.4 构造新的HashMap并继承老的Map的数据

    public HashMap(Map<? extends K, ? extends V> m) { this.loadFactor = DEFAULT_LOAD_FACTOR; putMapEntries(m, false); }

    使用与指定映射相同的映射构造新的HashMap。HashMap是使用默认加载因子(0.75)和足以在指定映射中保存映射的初始容量创建的。

    1.3 方法

    1.3.1 put()

    将指定值与该映射中的指定键相关联。 如果映射先前包含键的映射,则替换旧值。 

    参数介绍如下:

    hash(key)对key进行hash运算得到一个hash值key 键值valueonlyIfAbsent 如果为true,则不要更改现有值如果为false,则表处于创建模式。

    调用putVal(),代码如下:

    final V putVal(int hash, K key, V value, boolean onlyIfAbsent, boolean evict) { Node<K,V>[] tab; Node<K,V> p; int n, i; // 判断数组是否为空,长度是否为0,是则进行扩容数组初始化 if ((tab = table) == null || (n = tab.length) == 0) n = (tab = resize()).length; // 通过hash算法找到数组下标得到数组元素,为空则新建 if ((p = tab[i = (n - 1) & hash]) == null) tab[i] = newNode(hash, key, value, null); else { Node<K,V> e; K k; // 找到数组元素,hash相等同时key相等,则直接覆盖 if (p.hash == hash && ((k = p.key) == key || (key != null && key.equals(k)))) e = p; // 该数组元素在链表长度>8后形成红黑树结构的对象,p为树结构已存在的对象 else if (p instanceof TreeNode) e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value); else { // 该数组元素hash相等,key不等,同时链表长度<8.进行遍历寻找元素,有就覆盖无则新建 for (int binCount = 0; ; ++binCount) { if ((e = p.next) == null) { // 新建链表中数据元素,尾插法 p.next = newNode(hash, key, value, null); if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st // 链表长度>=8 结构转为 红黑树 treeifyBin(tab, hash); break; } if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) break; p = e; } } // 新值覆盖旧值 if (e != null) { // existing mapping for key V oldValue = e.value; // onlyIfAbsent默认false if (!onlyIfAbsent || oldValue == null) e.value = value; afterNodeAccess(e); return oldValue; } } ++modCount; // 判断是否需要扩容 if (++size > threshold) resize(); afterNodeInsertion(evict); return null; }

    如上代码总结分为几个步骤:

    检查数组是否为空,执行resize()扩充;在实例化HashMap时,并不会进行初始化数组。

    通过hash值计算数组索引,获取该索引位的首节点。

    如果首节点为null(没发生碰撞),则创建新的数组元素,直接添加节点到该索引位(bucket)。

    如果首节点不为null(发生碰撞),那么有3种情况

    key和首节点的key相同,覆盖old value(保证key的唯一性);否则执行②或③

    如果首节点是红黑树节点(TreeNode),将键值对添加到红黑树。

    如果首节点是链表,进行遍历寻找元素,有就覆盖无则新建,将键值对添加到链表。添加之后会判断链表长度是否到达TREEIFY_THRESHOLD - 1这个阈值,“尝试”将链表转换成红黑树。

    最后判断当前元素个数是否大于threshold阀值,扩充数组。

    整体流程图如下:

    综上所述,hashMap的数据结构就是有数组+链表+红黑树

    数据结构如下图:

    1.3.2 扩容机制

    final Node<K,V>[] resize() { Node<K,V>[] oldTab = table; int oldCap = (oldTab == null) ? 0 : oldTab.length; int oldThr = threshold; int newCap, newThr = 0; if (oldCap > 0) { // 如果已达到最大容量不在扩容 if (oldCap >= MAXIMUM_CAPACITY) { threshold = Integer.MAX_VALUE; return oldTab; } // 通过位运算扩容到原来的两倍 else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY && oldCap >= DEFAULT_INITIAL_CAPACITY) newThr = oldThr << 1; // double threshold } else if (oldThr > 0) // initial capacity was placed in threshold newCap = oldThr; else { // zero initial threshold signifies using defaults newCap = DEFAULT_INITIAL_CAPACITY; newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY); } if (newThr == 0) { float ft = (float)newCap * loadFactor; newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ? (int)ft : Integer.MAX_VALUE); } // 新的扩容临界值 threshold = newThr; @SuppressWarnings({"rawtypes","unchecked"}) Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap]; table = newTab; if (oldTab != null) { for (int j = 0; j < oldCap; ++j) { Node<K,V> e; if ((e = oldTab[j]) != null) { oldTab[j] = null; // 如果该位置元素没有next节点,将该元素放入新数组 if (e.next == null) newTab[e.hash & (newCap - 1)] = e; else if (e instanceof TreeNode) // 树节点 ((TreeNode<K,V>)e).split(this, newTab, j, oldCap); else { // preserve order // 链表节点。 // lo串的新索引位置与原先相同 Node<K,V> loHead = null, loTail = null; // hi串的新索引位置为[原先位置j+oldCap] Node<K,V> hiHead = null, hiTail = null; Node<K,V> next; do { next = e.next; // 原索引,oldCap是2的n次方,二进制表示只有一个1,其余是0 if ((e.hash & oldCap) == 0) { if (loTail == null) loHead = e; else // 尾插法 loTail.next = e; loTail = e; } // 原索引+oldCap else { if (hiTail == null) hiHead = e; else hiTail.next = e; hiTail = e; } } while ((e = next) != null); // 根据hash判断该bucket上的整个链表的index还是旧数组的index,还是index+oldCap if (loTail != null) { loTail.next = null; newTab[j] = loHead; } if (hiTail != null) { hiTail.next = null; newTab[j + oldCap] = hiHead; } } } } } return newTab; }

    当hashmap中的元素越来越多的时候,碰撞的几率也就越来越高(因为数组的长度是固定的),所以为了提高查询的效率,就要对hashmap的数组进行扩容,数组扩容这个操作也会出现在ArrayList中,所以这是一个通用的操作,很多人对它的性能表示过怀疑,不过想想我们的“均摊”原理,就释然了,而在hashmap数组扩容之后,最消耗性能的点就出现了:原数组中的数据必须重新计算其在新数组中的位置,并放进去,这就是resize。           那么hashmap什么时候进行扩容呢?当hashmap中的元素个数超过数组大小*loadFactor时,就会进行数组扩容,loadFactor的默认值为0.75,也就是说,默认情况下,数组大小为16,那么当hashmap中元素个数超过16*0.75=12的时候,就把数组的大小扩展为2*16=32,即扩大一倍,然后重新计算每个元素在数组中的位置,而这是一个非常消耗性能的操作,所以如果我们已经预知hashmap中元素的个数,那么预设元素的个数能够有效的提高hashmap的性能。比如说,我们有1000个元素new HashMap(1000), 但是理论上来讲new HashMap(1024)更合适,不过上面annegu已经说过,即使是1000,hashmap也自动会将其设置为1024。 但是new HashMap(1024)还不是更合适的,因为0.75*1000 < 1000, 也就是说为了让0.75 * size > 1000, 我们必须这样new HashMap(2048)才最合适,既考虑了&的问题,也避免了resize的问题。

    其余还有为什么阈值=8转红黑树,长度<=6 转链表这些问题。基本都是数据科学家根据概率做出的经验值,同时避免数据结构频繁的转换引起的性能开销。

    1.4 get()方法

    public V get(Object key) { Node<K,V> e; return (e = getNode(hash(key), key)) == null ? null : e.value; } final Node<K,V> getNode(int hash, Object key) { Node<K,V>[] tab; Node<K,V> first, e; int n; K k; if ((tab = table) != null && (n = tab.length) > 0 && (first = tab[(n - 1) & hash]) != null) { if (first.hash == hash && // always check first node ((k = first.key) == key || (key != null && key.equals(k)))) return first; if ((e = first.next) != null) { if (first instanceof TreeNode) return ((TreeNode<K,V>)first).getTreeNode(hash, key); do { if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) return e; } while ((e = e.next) != null); } } return null; }

    在Hashmap1.8中,无论是存元素还是取元素,都是优先判断bucket上第一个元素是否匹配,而在1.7中则是直接遍历查找。

    基本过程如下:

    根据key计算hash;查数组是否为空,为空返回null;根据hash计算bucket位置,如果bucket第一个元素是目标元素,直接返回。否则执行4;如果bucket上元素大于1并且是树结构,则执行树查找。否则执行5;如果是链表结构,则遍历寻找目标;

    1.8中hashmap的确不会因为多线程put导致链表死循环,因为在1.8中使用的是尾插法。

    hash的Map的主要流程就是如此,下面学习currentHashMap

    2、CurrentHashMap

    CurrentHashMap主要就是为了应对hashmap在并发环境下不安全而诞生的,ConcurrentHashMap的设计与实现非常精巧,大量的利用了volatile,final,CAS等lock-free技术来减少锁竞争对于性能的影响。

    1.7不在这里学习了。毕竟现在普遍都在用1.8.这里做个总结:

    JDK1.8版本的ConcurrentHashMap的数据结构已经接近HashMap,相对而言,ConcurrentHashMap只是增加了同步的操作来控制并发,从JDK1.7版本的ReentrantLock+Segment+HashEntry,到JDK1.8版本中synchronized+CAS+HashEntry+红黑树。

    数据结构:取消了Segment分段锁的数据结构,取而代之的是数组+链表+红黑树的结构。保证线程安全机制:JDK1.7采用segment的分段锁机制实现线程安全,其中segment继承自ReentrantLock。JDK1.8采用CAS+Synchronized保证线程安全。锁的粒度:原来是对需要进行数据操作的Segment加锁,现调整为对每个数组元素加锁(Node)。链表转化为红黑树:定位结点的hash算法简化会带来弊端,Hash冲突加剧,因此在链表节点数量大于8时,会将链表转化为红黑树进行存储。查询时间复杂度:从原来的遍历链表O(n),变成遍历红黑树O(logN)。

    2.1 继承关系

    2.2 属性介绍

    //默认最大表容量大小 private static final int MAXIMUM_CAPACITY = 1 << 30; //初始大小 private static final int DEFAULT_CAPACITY = 16; //数组的最值 static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; //默认并发级别 private static final int DEFAULT_CONCURRENCY_LEVEL = 16; //加载因子 private static final float LOAD_FACTOR = 0.75f; //链表长度阀值 static final int TREEIFY_THRESHOLD = 8; //取消阈值 static final int UNTREEIFY_THRESHOLD = 6; static final int MIN_TREEIFY_CAPACITY = 64; private static final int MIN_TRANSFER_STRIDE = 16; private static int RESIZE_STAMP_BITS = 16; private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

    2.3 构造函数

    //创建一个新的空的默认的初始表大小(16)的Map。 public ConcurrentHashMap() { } //创建一个新的空映射,其初始表大小可以容纳指定数量的元素,而无需动态调整大小 public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); this.sizeCtl = cap; } //创建一个与给定Map具有相同映射的新Map public ConcurrentHashMap(Map<? extends K, ? extends V> m) { this.sizeCtl = DEFAULT_CAPACITY; putAll(m); } //创建一个新的Map,自定义大小与加载因子 public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, 1); } //自定义大小,加载因子,并发线程数 public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (initialCapacity < concurrencyLevel) // Use at least as many bins initialCapacity = concurrencyLevel; // as estimated threads long size = (long)(1.0 + (long)initialCapacity / loadFactor); int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size); this.sizeCtl = cap; }

    2.4 操作方法

    2.4.1 put()

    public V put(K key, V value) { return putVal(key, value, false); }

    调用putVal方法

    final V putVal(K key, V value, boolean onlyIfAbsent) { //不允许key和value为空情况,否则直接空指针异常 if (key == null || value == null) throw new NullPointerException(); //对key进行hash取值 int hash = spread(key.hashCode()); //要插入的元素所在桶的元素个数 int binCount = 0; //将table赋给tab开始循环操作,死循环,结合CAS使用(如果CAS失败,则会重新取整个桶进行下面的流程) for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //第一次操作,table不存在,开始初始化 if (tab == null || (n = tab.length) == 0) tab = initTable(); //如果要插入的元素所在的桶还没有元素,则把这个元素插入到这个桶中 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 如果使用CAS插入元素时,发现已经有元素了,则进入下一次循环,重新操作 // 如果使用CAS插入元素成功,则break跳出循环,流程结束 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } // 如果要插入的元素所在的桶的第一个元素的hash是MOVED // 表示正在扩容,则当前线程帮忙一起迁移元素,然后进入下一次循环 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); // 如果这个桶不为空且不在迁移元素,则锁住这个桶(分段锁) // 并查找要插入的元素是否在这个桶中 // 存在,则替换值(onlyIfAbsent=false) // 不存在,则插入到链表结尾或插入树中 else { V oldVal = null; synchronized (f) { // 再次检测第一个元素是否有变化,如果有变化则进入下一次循环,从头来过 if (tabAt(tab, i) == f) { // 如果第一个元素的hash值大于等于0(说明不是在迁移,也不是树) // 那就是桶中的元素使用的是链表方式存储 if (fh >= 0) { // 桶中元素个数赋值为1 binCount = 1; // 遍历整个桶,每次结束binCount加1 for (Node<K,V> e = f;; ++binCount) { K ek; // 如果找到了这个元素,则赋值了新值(onlyIfAbsent=false) 并退出循环 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; //如果到链表尾部还没有找到元素 就把它插入到链表结尾并退出循环 if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } //如果第一个元素是树节点 else if (f instanceof TreeBin) { Node<K,V> p; //桶中元素个数赋值为2 binCount = 2; // 调用红黑树的插入方法插入元素 如果成功插入则返回null 否则返回寻找到的节点 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } // 如果binCount不为0,说明成功插入了元素或者寻找到了元素 if (binCount != 0) { // 如果链表元素个数达到了8,则尝试树化 // 因为上面把元素插入到树中时,binCount只赋值了2,并没有计算整个树中元素的个数 所以不会重复树化 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); // 如果要插入的元素已经存在,则返回旧值 if (oldVal != null) return oldVal; break; } } } // 成功插入元素,元素个数加1(是否要扩容在这个里面) addCount(1L, binCount); return null; }

    整体流程跟HashMap比较类似,大致是以下几步:

    如果桶没有初始化,则初始化桶如果要插入的元素所在的桶还没有元素,则把这个元素插入到这个桶中如果要插入的元素所在的桶的第一个元素的hash是MOVED,表示正在扩容,则当前线程帮忙一起迁移元素

    如果待插入的元素所在的桶不为空且不在迁移元素,则锁住这个桶(分段锁)

    查找要插入的元素是否在这个桶中,存在,则替换值(onlyIfAbsent=false)不存在,则插入到链表结尾或插入树中

    如果元素存在,则返回旧值;

    如果元素不存在,整个Map的元素个数加1,并检查是否需要扩容;

    为什么使用synchronized而不是ReentrantLock?我们在之前的学习的synchronized原理可知,索升级模式已经让synchronized得到极大优化,几乎与ReentrantLock性能相差无几。

    Node中定义的Hash值有哪些?

    // hash for forwarding nodes //MOVED(-1),表示正在扩容,作用在ForwardingNode上; static final int MOVED = -1; // hash for roots of trees //TREEBIN(-2),表示树节点,作用在TreeBin上;(TreeBin是桶中的第一个元素,真正的元 素存储在TreeBin里面的TreeNode上) static final int TREEBIN = -2; //hash for transient reservations //RESERVED(-3),表示保留节点,作用在ReservationNode上,ReservationNode是用在compute()和computeIfAbsent()方法里的; static final int RESERVED = -3; //HASH_BITS,正常元素的hash掩码,与HASH_BITS做&操作后的hash值都是大于0的。 // usable bits of normal node hash static final int HASH_BITS = 0x7fffffff;

    2.4.2 初始化桶数组 initTable()

    第一次放元素时,初始化桶数组,代码如下:

    private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) // 如果sizeCtl<0说明正在初始化或者扩容,让出CPU Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // 如果把sizeCtl原子更新为-1成功,则当前线程进入初始化 // 如果原子更新失败则说明有其它线程先一步进入初始化了,则进入下一次循环 // 如果下一次循环时还没初始化完毕,则sizeCtl<0进入上面if的逻辑让出CPU // 如果下一次循环更新完毕了,则table.length!=0,退出循环 try { // 再次检查table是否为空,防止ABA问题 if ((tab = table) == null || tab.length == 0) { // 如果sc为0则使用默认值16 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 新建数组 @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; // 赋值给table桶数组 table = tab = nt; // 设置sc为数组长度的0.75倍 // n - (n >>> 2) = n - n/4 = 0.75n // 可见这里装载因子和扩容门槛都是写死了的 // 这也正是没有threshold和loadFactor属性的原因 sc = n - (n >>> 2); } } finally { // 把sc赋值给sizeCtl,这时存储的是扩容门槛 sizeCtl = sc; } break; } } return tab; }

    这里提到sizeCtl,代码如下:

    //-1,表示有线程正在进行初始化操作 //-(1 + nThreads),表示有n个线程正在一起扩容 //0,默认值,后续在真正初始化的时候使用默认容量 //> 0,初始化或扩容完成后下一次的扩容门槛 private transient volatile int sizeCtl; 总结如下: 使用CAS锁控制只有一个线程初始化桶数组;sizeCtl在初始化后存储的是扩容门槛;扩容门槛写死的是桶数组大小的0.75倍,桶数组大小即map的容量,也就是最多存储多少个元素。

    2.4.3 addCount()判断是否需要扩容

    private final void addCount(long x, int check) { CounterCell[] as; long b, s; // 这里使用的思想跟LongAdder类是一模一样的(后面会讲) // 把数组的大小存储根据不同的线程存储到不同的段上(也是分段锁的思想) // 并且有一个baseCount,优先更新baseCount,如果失败了再更新不同线程对应的段 // 这样可以保证尽量小的减少冲突 // 先尝试把数量加到baseCount上,如果失败再加到分段的CounterCell上 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; // 如果as为空 // 或者长度为0 // 或者当前线程所在的段为null // 或者在当前线程的段上加数量失败 if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { // 强制增加数量(无论如何数量是一定要加上的,并不是简单地自旋) // 不同线程对应不同的段都更新失败了 // 说明已经发生冲突了,那么就对counterCells进行扩容 // 以减少多个线程hash到同一个段的概率 fullAddCount(x, uncontended); return; } if (check <= 1) return; // 计算元素个数 s = sumCount(); } if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; // 如果元素个数达到了扩容门槛,则进行扩容 // 注意,正常情况下sizeCtl存储的是扩容门槛,即容量的0.75倍 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { // rs是扩容时的一个邮戳标识 int rs = resizeStamp(n); if (sc < 0) { // sc<0说明正在扩容中 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) // 扩容已经完成了,退出循环 // 正常应该只会触发nextTable==null这个条件,其它条件没看出来何时触发 break; // 扩容未完成,则当前线程加入迁移元素中 // 并把扩容线程数加1 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) // 这里是触发扩容的那个线程进入的地方 // sizeCtl的高16位存储着rs这个扩容邮戳 // sizeCtl的低16位存储着扩容线程数加1,即(1+nThreads) // 所以官方说的扩容时sizeCtl的值为 -(1+nThreads)是错误的 // 进入迁移元素 transfer(tab, null); // 重新计算元素个数 s = sumCount(); } } } 元素个数的存储方式类似于LongAdder类,存储在不同的段上,减少不同线程同时更新size时的冲突;计算元素个数时把这些段的值及baseCount相加算出总的元素个数;正常情况下sizeCtl存储着扩容门槛,扩容门槛为容量的0.75倍;扩容时sizeCtl高位存储扩容邮戳(resizeStamp),低位存储扩容线程数加1(1+nThreads);其它线程添加元素后如果发现存在扩容,也会加入的扩容行列中来;

    2.4.4 协助扩容

    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; // 如果桶数组不为空,并且当前桶第一个元素为ForwardingNode类型,并且nextTab不为空 // 说明当前桶已经迁移完毕了,才去帮忙迁移其它桶的元素 // 扩容时会把旧桶的第一个元素置为ForwardingNode,并让其nextTab指向新桶数组 if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { int rs = resizeStamp(tab.length); // sizeCtl<0,说明正在扩容 while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; // 扩容线程数加1 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { // 当前线程帮忙迁移元素 transfer(tab, nextTab); break; } } return nextTab; }

    2.4.5 迁移元素

    扩容时容量变为两倍,并把部分元素迁移到其它桶中。方法transfer()

    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // initiating // 如果nextTab为空,说明还没开始迁移 // 就新建一个新桶数组 try { // 新桶数组是原桶的两倍 @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } // 新桶数组大小 int nextn = nextTab.length; // 新建一个ForwardingNode类型的节点,并把新桶数组存储在里面 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; // 整个while循环就是在算i的值,过程太复杂,不用太关心 // i的值会从n-1依次递减,感兴趣的可以打下断点就知道了 // 其中n是旧桶数组的大小,也就是说i从15开始一直减到1这样去迁移元素 while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { // 如果一次遍历完成了 // 也就是整个map所有桶中的元素都迁移完成了 int sc; if (finishing) { // 如果全部迁移完成了,则替换旧桶数组 // 并设置下一次扩容门槛为新桶数组容量的0.75倍 nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // 当前线程扩容完成,把扩容线程数-1 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) // 扩容完成两边肯定相等 return; // 把finishing设置为true // finishing为true才会走到上面的if条件 finishing = advance = true; // i重新赋值为n // 这样会再重新遍历一次桶数组,看看是不是都迁移完成了 // 也就是第二次遍历都会走到下面的(fh = f.hash) == MOVED这个条件 i = n; // recheck before commit } } else if ((f = tabAt(tab, i)) == null) // 如果桶中无数据,直接放入ForwardingNode标记该桶已迁移 advance = casTabAt(tab, i, null, fwd); else if ((fh = f.hash) == MOVED) // 如果桶中第一个元素的hash值为MOVED // 说明它是ForwardingNode节点 // 也就是该桶已迁移 advance = true; // already processed else { // 锁定该桶并迁移元素 // 【本篇文章由公众号“彤哥读源码”原创】 synchronized (f) { // 再次判断当前桶第一个元素是否有修改 // 也就是可能其它线程先一步迁移了元素 if (tabAt(tab, i) == f) { // 把一个链表分化成两个链表 // 规则是桶中各元素的hash与桶大小n进行与操作 // 等于0的放到低位链表(low)中,不等于0的放到高位链表(high)中 // 其中低位链表迁移到新桶中的位置相对旧桶不变 // 高位链表迁移到新桶中位置正好是其在旧桶的位置加n // 这也正是为什么扩容时容量在变成两倍的原因 Node<K,V> ln, hn; if (fh >= 0) { // 第一个元素的hash值大于等于0 // 说明该桶中元素是以链表形式存储的 // 这里与HashMap迁移算法基本类似 // 唯一不同的是多了一步寻找lastRun // 这里的lastRun是提取出链表后面不用处理再特殊处理的子链表 // 比如所有元素的hash值与桶大小n与操作后的值分别为 0 0 4 4 0 0 0 // 则最后后面三个0对应的元素肯定还是在同一个桶中 // 这时lastRun对应的就是倒数第三个节点 // 至于为啥要这样处理,我也没太搞明白 int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } // 看看最后这几个元素归属于低位链表还是高位链表 if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } // 遍历链表,把hash&n为0的放在低位链表中 // 不为0的放在高位链表中 for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } // 低位链表的位置不变 setTabAt(nextTab, i, ln); // 高位链表的位置是原位置加n setTabAt(nextTab, i + n, hn); // 标记当前桶已迁移 setTabAt(tab, i, fwd); // advance为true,返回上面进行--i操作 advance = true; } else if (f instanceof TreeBin) { // 如果第一个元素是树节点 // 也是一样,分化成两颗树 // 也是根据hash&n为0放在低位树中 // 不为0放在高位树中 TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; // 遍历整颗树,根据hash&n是否为0分化成两颗树 for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } // 如果分化的树中元素个数小于等于6,则退化成链表 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; // 低位树的位置不变 setTabAt(nextTab, i, ln); // 高位树的位置是原位置加n setTabAt(nextTab, i + n, hn); // 标记该桶已迁移 setTabAt(tab, i, fwd); // advance为true,返回上面进行--i操作 advance = true; } } } } } } 新桶数组大小是旧桶数组的两倍;迁移元素先从靠后的桶开始;迁移完成的桶在里面放置一ForwardingNode类型的元素,标记该桶迁移完成;迁移时根据hash&n是否等于0把桶中元素分化成两个链表或树;低位链表(树)存储在原来的位置;高们链表(树)存储在原来的位置加n的位置;迁移元素时会锁住当前桶,也是分段锁的思想;

    2.4.6 删除元素

    删除元素和添加元素思想一样,先找到对应的数组桶,然后锁住,再进行操作

    public V remove(Object key) { // 调用替换节点方法 return replaceNode(key, null, null); } final V replaceNode(Object key, V value, Object cv) { // 计算hash int hash = spread(key.hashCode()); // 自旋 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0 || (f = tabAt(tab, i = (n - 1) & hash)) == null) // 如果目标key所在的桶不存在,跳出循环返回null break; else if ((fh = f.hash) == MOVED) // 如果正在扩容中,协助扩容 tab = helpTransfer(tab, f); else { V oldVal = null; // 标记是否处理过 boolean validated = false; synchronized (f) { // 再次验证当前桶第一个元素是否被修改过 if (tabAt(tab, i) == f) { if (fh >= 0) { // fh>=0表示是链表节点 validated = true; // 遍历链表寻找目标节点 for (Node<K,V> e = f, pred = null;;) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { // 找到了目标节点 V ev = e.val; // 检查目标节点旧value是否等于cv if (cv == null || cv == ev || (ev != null && cv.equals(ev))) { oldVal = ev; if (value != null) // 如果value不为空则替换旧值 e.val = value; else if (pred != null) // 如果前置节点不为空 // 删除当前节点 pred.next = e.next; else // 如果前置节点为空 // 说明是桶中第一个元素,删除之 setTabAt(tab, i, e.next); } break; } pred = e; // 遍历到链表尾部还没找到元素,跳出循环 if ((e = e.next) == null) break; } } else if (f instanceof TreeBin) { // 如果是树节点 validated = true; TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> r, p; // 遍历树找到了目标节点 if ((r = t.root) != null && (p = r.findTreeNode(hash, key, null)) != null) { V pv = p.val; // 检查目标节点旧value是否等于cv if (cv == null || cv == pv || (pv != null && cv.equals(pv))) { oldVal = pv; if (value != null) // 如果value不为空则替换旧值 p.val = value; else if (t.removeTreeNode(p)) // 如果value为空则删除元素 // 如果删除后树的元素个数较少则退化成链表 // t.removeTreeNode(p)这个方法返回true表示删除节点后树的元素个数较少 setTabAt(tab, i, untreeify(t.first)); } } } } } // 如果处理过,不管有没有找到元素都返回 if (validated) { // 如果找到了元素,返回其旧值 if (oldVal != null) { // 如果要替换的值为空,元素个数减1 if (value == null) addCount(-1L, -1); return oldVal; } break; } } } // 没找到元素返回空 return null; } 计算hash如果所在的桶不存在,表示没有找到目标元素,返回如果正在扩容,则协助扩容完成后再进行删除操作;如果是以链表形式存储的,则遍历整个链表查找元素,找到之后再删除;如果是以树形式存储的,则遍历树查找元素,找到之后再删除;如果是以树形式存储的,删除元素之后树较小,则退化成链表;如果确实删除了元素,则整个map元素个数减1,并返回旧值;如果没有删除元素,则返回null;

    2.4.7 获取元素 get方法

    获取元素,根据目标key所在桶的第一个元素的不同采用不同的方式来获取元素,关键点在于find方法的重写。

    public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; // 计算hash int h = spread(key.hashCode()); // 如果元素所在的桶存在且里面有元素 if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { // 如果第一个元素就是要找的元素,直接返回 if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0) // hash小于0,说明是树或者正在扩容 // 使用find寻找元素,find的寻找方式依据Node的不同子类有不同的实现方式 return (p = e.find(h, key)) != null ? p.val : null; // 遍历整个链表寻找元素 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; } hash到元素所在的桶;如果桶中第一个元素就是该找的元素,直接返回;如果是树或者正在迁移元素,则调用各自Node子类的find()方法寻找元素;如果是链表,遍历整个链表寻找元素;获取元素没有加锁

    2.4.7 获取元素 

    获取元素也是采用分段锁的思想,把所有的段加起来得到元素的总和。

    public int size() { // 调用sumCount()计算元素个数 long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); } final long sumCount() { // 计算CounterCell所有段及baseCount的数量之和 CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; } 元素的个数依据不同的线程存在在不同的段里;计算CounterCell所有段及baseCount的数量之和;获取元素个数没有加锁;

    2.5 总结

    ConcurrentHashMap是HashMap的线程安全版本;ConcurrentHashMap采用(数组 + 链表 + 红黑树)的结构存储元素;ConcurrentHashMap相比于同样线程安全的HashTable,效率要高很多;ConcurrentHashMap采用的锁有 synchronized,CAS,自旋锁,分段锁,volatile等;ConcurrentHashMap中没有threshold和loadFactor这两个字段,而是采用sizeCtl来控制;更新操作时如果正在进行扩容,当前线程协助扩容;更新操作会采用synchronized锁住当前桶的第一个元素,这是分段锁的思想;整个扩容过程都是通过CAS控制sizeCtl这个字段来进行的,这很关键;迁移完元素的桶会放置一个ForwardingNode节点,以标识该桶迁移完毕;元素个数的存储也是采用的分段思想,类似于LongAdder的实现;元素个数的更新会把不同的线程hash到不同的段上,减少资源争用;元素个数的更新如果还是出现多个线程同时更新一个段,则会扩容段(CounterCell);获取元素个数是把所有的段(包括baseCount和CounterCell)相加起来得到的;查询操作是不会加锁的,所以ConcurrentHashMap不是强一致性的;ConcurrentHashMap中不能存储key或value为null的元素;
    Processed: 0.013, SQL: 9