ConcurrentHashMap源码分析

    技术2023-07-12  94

    ConcurrentHashMap源码分析

    put方法

    final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode());//获取哈希值 int binCount = 0; //记录链表长度 //死循环 for (Node<K, V>[] tab = table; ; ) { Node<K, V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) //第一次添加元素时,初始化table数组 tab = initTable(); //添加到一个空的哈希桶时,通过CAS算法无锁插入数据 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K, V>(hash, key, value, null))) break; // no lock when adding to empty bin } //hash桶(tab[i])是fwd节点,表示正在扩容 else if ((fh = f.hash) == MOVED) //扩容 tab = helpTransfer(tab, f); else { V oldVal = null; //锁住哈希桶的头结点 synchronized (f) { if (tabAt(tab, i) == f) { // 再次判断当前数据 //哈希值>=0说明是Node链表 if (fh >= 0) { binCount = 1; for (Node<K, V> e = f; ; ++binCount) { K ek; //新增元素的哈希值与头结点相同,则将值val替换为新值 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K, V> pred = e; //遍历链表,向尾结点插入数据 // 如果key不同 直接在链表后面新加 if ((e = e.next) == null) { pred.next = new Node<K, V>(hash, key, value, null); break; } } } //红黑树 如果节点是红黑树 存放在红黑树中 else if (f instanceof TreeBin) { Node<K, V> p; binCount = 2; if ((p = ((TreeBin<K, V>) f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { //链表长度>=8时,转换成红黑树 TREEIFY_THRESHOLD = 8 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } //哈希桶存储的元素个数+1 addCount(1L, binCount); return null; }

    初始化容器

    private final Node<K, V>[] initTable() { Node<K, V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { // sizeCtl 数组初始化或者扩容的控制位标识 -1 标识正在初始化 -N标识线有现成正在扩容 if ((sc = sizeCtl) < 0) // 判断其他线程是否在操作数组,如果其他线程抢占了操作 让出CPU时间片 Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // 通过cas操作将sizeCtl设置成-1 标识当前线程正在初始化 try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 没有初始化 数组 sc = -1,默认给数组一个长度 16 @SuppressWarnings("unchecked") Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n]; // 创建一个长度为16的数组 table = tab = nt; // 将数组赋值给table sc = n - (n >>> 2); // 计算下次扩容的大小,就是0.75 用右移来计算 >>> 2 表示右移两位 10000 -> 100 16->4 所以下次扩容成都为16 - 4 = 12 } } finally { sizeCtl = sc; // 设置sizeCtl为sc } break; } }

    addCount :添加计数

    private final void addCount(long x, int check) { CounterCell[] as; long b, s; if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { // 对baseCount直接+1 如果失败就进方法 CounterCell a; long v; int m; boolean uncontended = true; // 如果counterCells为null 进行初始化 // 如果counterCells的随机元素为空 对counterCells元素值进行增加计数 // 如果counterCells的随机元素不为空 ,尝试通过 cas对改随机元素值进行+1操作 如果失败进入fullAddCount方法 if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) // 如果 binCount 长度不超过1个 return; // 如果counterCells的随机元素不为空 ,尝试通过 cas对改随机元素值进行+1操作 如果成功 s = sumCount(); // binCount值加上counterCells所有元素值 } if (check >= 0) { Node<K, V>[] tab, nt; int n, sc; // 如果map元素个数(s)不小于 map扩容数(sizeCtl) 并且容器不为空 容器长度小于1 << 30 while (s >= (long) (sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); if (sc < 0) { // RESIZE_STAMP_SHIFT = 16 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } } }

    fullAddCount:多个线程同时操作map的时候 计数通过baseCount+counterCells的值来计算

    baseCount 单个线程操作时直接+1 在多个线程同时操作的时候 使用CAS方法把baseCount+1会出错,所用会使用counterCells来进行辅助计数

    counterCells:初始化是一个长度为2的数组,然后在随机一个元素设置值为1 表示计数增加1 ,

    在多个线程同时操作map的时候 会随机取出counterCells一个元素,如果取出的元素为null 就使用CAS方式将值更改为1,如果随机取出的元素值不为空 ,通过CAS修改,如果失败,表示被别的线程修改了,就需要对counterCells进行扩容2倍。

    private final void fullAddCount(long x, boolean wasUncontended) { int h; if ((h = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); // force initialization h = ThreadLocalRandom.getProbe(); wasUncontended = true; } boolean collide = false; // True if last slot nonempty for (; ; ) { CounterCell[] as; CounterCell a; int n; long v; // counterCells 已经有值之后 if ((as = counterCells) != null && (n = as.length) > 0) { if ((a = as[(n - 1) & h]) == null) { // 随机取counterCells其中一个元素 如果为null if (cellsBusy == 0) { // 就是一个标志位,如果不是0就是被其他线程在操作 // Try to attach new Cell CounterCell r = new CounterCell(x); // Optimistic create 1 定义一个值为1 if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { // cellsBusy 标准位设置成1 表示被占用 boolean created = false; try { // Recheck under lock CounterCell[] rs; int m, j; // 如果counterCells不为null,但是选取的随机元素为null,将这个随机元素值设置成1 if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty 如果counterCells为null,或者选取的随机元素不为null,继续下面的操作 } } collide = false; //collide设置成false } else if (!wasUncontended) // CAS already known to fail 如果cas操作失败 wasUncontended = true; // Continue after rehash else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) // 用cas给随机的那个counterCells元素设置 值就是本身的值+1 如果设置成功调处自旋 break; else if (counterCells != as || n >= NCPU) collide = false; // At max size or stale else if (!collide) collide = true; // 这儿是counterCells不为空 而且选择的随机元素值也不为null,同时通过cas去将随机的元素值设置+1失败(失败原因可能是别的线程已经将这个数据值改变) // 所以将counterCells进行扩容 else if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { // cellsBusy 标准位设置成1 表示被占用 try { if (counterCells == as) {// Expand table unless stale CounterCell[] rs = new CounterCell[n << 1]; // 扩容2倍 for (int i = 0; i < n; ++i) rs[i] = as[i]; counterCells = rs; } } finally { cellsBusy = 0; // 标记正在操作的cellsBusy 设置为0 表示没有线程操作 } collide = false; continue; // Retry with expanded table } h = ThreadLocalRandom.advanceProbe(h); // 重新获取扩容之后的随机数 // 最新数据过来 满足条件 对cellsBusy进行赋值为0 counterCells初始化长度为2的数组 其中一个随机元素设置为1 } else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean init = false; try { // Initialize table if (counterCells == as) { CounterCell[] rs = new CounterCell[2]; // 初始化长度为2的数组 rs[h & 1] = new CounterCell(x); // 随机设置一个元素值为1 x=1 counterCells = rs; // 将初始化的数组设置给counterCells init = true; } } finally { cellsBusy = 0; // cellsBusy设值为0 } if (init) // 初始化之后跳出自旋 break; } else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) // 直接对counterCells进行+1 (在这之前只是对counterCells进行扩容,没有设置值的时候) break; // Fall back on using base } }
    Processed: 0.011, SQL: 9