百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程网 > 正文

十年之重修ConcurrentHashMap原理

yuyutoo 2025-03-26 18:55 16 浏览 0 评论

弱小和无知并不是生存的障碍,傲慢才是。

---- ---- 面试者

总结

ConcurrentHashMap就是一个线程安全的HashMap,基本数据结构和HashMap一样,都是数组+链表/红黑树结构,但是在数据读写时,运用了大量的CAS操作以及synchronized锁,锁的级别是桶,这里与JDK1.7有了进一步优化,在JDK1.7时是基于分段锁,锁的是Segment,一个segment里包括好几个桶,在JDK1.8之后,就直接细化到桶级别了。

可以先了解HashMap:HashMap原理 下一篇:十年之重修synchronized原理

基本属性

// 最大罩杯
private static final int MAXIMUM_CAPACITY = 1 << 30;
// 默认罩杯
private static final int DEFAULT_CAPACITY = 16;
// 最大的数组长度,这里-8预留长度,是针对不同的
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 负载因子
private static final float LOAD_FACTOR = 0.75f;
// 链表转红黑树阈值
static final int TREEIFY_THRESHOLD = 8;
// 红黑树转链表阈值
static final int UNTREEIFY_THRESHOLD = 6;
// 表数组长度未达到64之前,如果链表长度达到了8,先扩容,不进行红黑树转换
static final int MIN_TREEIFY_CAPACITY = 64;
// 表示参与扩容的线程,最小处理的桶数量
private static final int MIN_TRANSFER_STRIDE = 16;
// 存储桶的数组,注意 volatile修饰
transient volatile Node[] table;
// -1:表示初始化中,其他负数表示正在迁移中,正数:表示扩容的临界值
private transient volatile int sizeCtl;
// 节点信息类
static class Node implements Map.Entry {
        // key的hash值
        final int hash;
        // key
        final K key;
        // val值,注意这里使用了volatile,保证线程之间透明
        volatile V val;
        // 下一个节点
        volatile Node next;
}
// 空的构造函数
public ConcurrentHashMap() {
}

put一个元素

为了保证线程安全,put元素时直接开启一个死循环,如果Map为空则先进行初始化默认16容量,如果不为空则则根据(table.length - 1) & hash值进行选桶,如果当前桶是空的直接复制,这里的判定tabAt方法基于volatile逻辑保证拿到最新数据,这里赋值基于cas逻辑,这样 一开始的死循环 + cas 组成乐观锁逻辑,如果桶不为空,且当前桶头结点的hash是-1,则表示当前桶在扩容迁移,于是当前线程则参与加入扩容操作,如果非扩容流程,则对桶的头结点进行synchronized加锁,如果是链表则进行判定存在则覆盖,不存在则进行尾插,如果是树节点则进行红黑树节点操作,最后如果链表长度超过阈值则转成红黑树。最后检测是否需要扩容,如果需要则扩大到2倍,进行数据复制扩容处理。

这里边细节有很多吊炸天的写法,可以详细研究。

		// put方法
		public V put(K key, V value) {
        return putVal(key, value, false);
    }
		// 包含onlyIfAbsent的put方法
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
      	// 计算hash,跟HashMap的还不太一样
        int hash = spread(key.hashCode());
        int binCount = 0;
      	// 开启死循环,保证数据最后更新成功
        for (Node[] tab = table;;) {
            Node f; int n, i, fh; K fk; V fv;
            if (tab == null || (n = tab.length) == 0)
              	// 如果是空map,直接进行初始化
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
              	// 不为空,则进行 (n - 1) & hash 选择桶的位置,通过tabAt(tab, index)获取值
               // tabAt(tab, index) 底层调用getReferenceVolatile获取桶的值,volatile逻辑保证拿到最新的
              // 如果是空的,直接基于casTabAt进行cas赋值
                if (casTabAt(tab, i, null, new Node(hash, key, value)))
                  	// 成功,则直接结束
                    break;                   
            }
          	// 当发生扩容时,如果当前桶正在迁移,则会把hash值改成 -1
            else if ((fh = f.hash) == MOVED)
              	// 如果是在扩容,则加入帮助扩容,略吊还没细看
                tab = helpTransfer(tab, f);
            else if (onlyIfAbsent // check first node without acquiring lock
                     && fh == hash
                     && ((fk = f.key) == key || (fk != null && key.equals(fk)))
                     && (fv = f.val) != null)
                return fv;
            else {
              	// 进入正常的put流程
                V oldVal = null;
                // 对头结点进行加锁
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                          	// key的hash大于等于0
                            binCount = 1;
                            for (Node e = f;; ++binCount) {
                              	// 进行链表遍历
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                  	// 如果存在,则覆盖
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node pred = e;
                                if ((e = e.next) == null) {
                                  	// 如果遍历到最后,直接尾插
                                    pred.next = new Node(hash, key, value);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                          	// 如果是树结构,按照树结构处理
                            Node p;
                            binCount = 2;
                            if ((p = ((TreeBin)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                        else if (f instanceof ReservationNode)
                            throw new IllegalStateException("Recursive update");
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                      	// 如果链表超过阈值长度,则转成红黑树
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
				// 里边包含扩容逻辑
        addCount(1L, binCount);
        return null;
    }
    
    // 初始化方法,保证线程安全
    private final Node[] initTable() {
        Node[] tab; int sc;
      	// 直接死循环,多个线程可能都在初始化
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0 mapcpu thread.yield else if u.compareandsetintthis sizectl sc -1 compareandsetint casintsizectl-1 thread.yield try if tab='table)' tab.length='= 0)' 16 int n='(sc'> 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node[] nt = (Node[])new Node[n];
                      	// 由于table是volatile,非空之后,其他线程就跳出循环了
                        table = tab = nt;
                      	// sc为n的0.75倍
                        sc = n - (n >>> 2);
                    }
                } finally {
                  // sizeCtl 为容量的0.75倍
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }
		// 协助扩容逻辑
    final Node[] helpTransfer(Node[] tab, Node f) {
        Node[] nextTab; int sc;
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode)f).nextTable) != null) {
            // 先判定当前桶,确实处于扩容迁移状态
            // 获取
            int rs = resizeStamp(tab.length);
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0 if sc>>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                  	// 这里的判断逻辑比较多,还没看明白,就是超过迁移的限制条件,直接退出协助迁移了
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                  	// 修改sizeCTL + 1,表示参与迁移的线程+1
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }

扩容

ConcurrentHashMap的扩容是一个支持多线程并发处理的数据迁移操作,方法无状态,任何线程进入该方法,会根据CPU个数拆分迁移桶的任务,一个线程至少处理16个桶,如果要迁移的nextTable是空的,则先进行初始化默认扩容为原有大小的两倍,然后开始进入死循环,每一个线程都在此遍历获取16个桶进行迁移处理,每迁移完一个桶,基于cas操作更新到新的map中,旧的map这个桶的根节点hash会设置成-1,表示已经迁移完了。

在迁移的过程中,由于基于hash & oldTable.length 选择桶,故链表中的节点要么保留在当前桶index位置,要么迁移到index + oldTable.length的位置,同时会先遍历一遍链表,找到lastRun节点(lastRun节点和之后的连续的节点都在一个桶里),这样再遍历迁移进行高低位拆迁,当碰到lastRun节点时,把这个节点迁移,后边的节点就全部迁移了,由于存在lastRun逻辑,故ConcurrentHashMap是头插,这里与HashMap迁移尾插不一样;如果是红黑树,得遍历每一个节点进行高位桶选择,然后进行迁移,同样红黑树迁移之后,如果节点个数小于6了,需要转换成链表。

		// 迁移方法 tab:原来的数组,nextTab:迁移的目标数组
		private final void transfer(Node[] tab, Node[] nextTab) {
        int n = tab.length, stride;
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
          	// 这里基于cpu个数运算,就是每一个线程如果要参与迁移,最少处理16个桶
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        if (nextTab == null) {            // initiating
          	// 如果目标数组为空,则进行初始化
            try {
                @SuppressWarnings("unchecked")
              	// new一个两倍大的数字
                Node[] nt = (Node[])new Node[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
          	// 这里transferIndex表示接下来需要迁移的桶的index,从大到小
            transferIndex = n;
        }
        int nextn = nextTab.length;
      	// 这里构建一个ForwardingNode 继承于Node ,会把hash设置成-1,表示当前桶在迁移
        ForwardingNode fwd = new ForwardingNode(nextTab);
      	// 是否前进继续获取迁移任务
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
        for (int i = 0, bound = 0;;) {
            // 死循环直到迁移完成,
          	// i表示需要迁移的桶index,bound表示一次迁移中最小桶的index,bound = i - stride
          	Node f; int fh;
          	
            while (advance) {
              	// true死循环,直到获取到迁移任务,或是迁移任务已经完成,退出
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                  	// 迁移结束了,或者迁移桶的index达到了当前任务的最小index
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0 transferindexindex0 i='-1;' advance='false;' else if u.compareandswapint this transferindex nextindex nextbound='(nextIndex'> stride ?
                                       nextIndex - stride : 0))) {
                  	// 这里基于cas,去尝试获取迁移任务,一次16个桶,然后设置idnex和最小index
                    bound = nextBound;
                    i = nextIndex - 1;
                  	// 获取到迁移任务,跳出循环
                    advance = false;
                }
            }
            if (i < 0 i>= n || i + n >= nextn) {
              	// 获取迁移任务,进行index边界判定
                int sc;
                if (finishing) {
                  	// 任务结束,
                    nextTable = null;
                  	// 将迁移的nextTab赋值给当前map的table
                    table = nextTab;
                  	// 下次迁移的容量,n*2 - n/2 即0.75倍的新容量大小
                    sizeCtl = (n << 1 - n>>> 1);
                    return;
                }
              	// 基于cas将sizeCtl修改-1
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                      	// 判定是否是最后一个扩容线程,因为第一个扩容线程库容时,会把 SIZECTL =  resizeStamp(n) << RESIZE_STAMP_SHIFT + 2
                      	// 如果不是最后的线程,并且迁移已经结束了,直接退出
                        return;
                    finishing = advance = true;
                  	// 最后一个线程检测所有节点,避免有遗漏的
                    i = n; // recheck before commit
                }
            }
            else if ((f = tabAt(tab, i)) == null)
              	// 如果当前桶是null的,理论上不应该是null的,要么直接结束,要么继续获取迁移任务
                advance = casTabAt(tab, i, null, fwd);
            else if ((fh = f.hash) == MOVED)
              	// 已经在迁移中了,继续获取迁移任务
                advance = true; // already processed
            else {
              	// 对桶的头结点加锁
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node ln, hn;
                        if (fh >= 0) {
                          	// 头结点hash大于0,非迁移状态
                          	// 这里通过fh&n 计算当前节点迁移,是需要桶位不变,还是迁移到高位index + n
                            int runBit = fh & n;
                            Node lastRun = f;
                            for (Node p = f.next; p != null; p = p.next) {
                              	// 这里遍历一遍,找到lastRun,不关心是高位还是地位,
                              	// 只关注从lastRun开始,之后元素的高低位跟lastRun是一样的
                              	// 这样就只需要把lastRun迁移,那么后边的节点就全部迁移了
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                              	// 如果是地位
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                              	// 如果是高位
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node p = f; p != lastRun; p = p.next) {
                              	// 重新开始遍历迁移,到lastRun停止,因为链表结构,lastRun迁移后边的就同步迁移了
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                              	// 这里采用的是头插法,因为lastRun机制
                                if ((ph & n) == 0)
                                    ln = new Node(ph, pk, pv, ln);
                                else
                                    hn = new Node(ph, pk, pv, hn);
                            }
                          	// 基于cas更新桶
                            setTabAt(nextTab, i, ln);
                          	// 高位的桶index = i + n ,n就是之前桶的长度
                            setTabAt(nextTab, i + n, hn);
                          	// 这里把旧的桶设置为fwd,其中hash = -1 ,表示已经迁移完了
                            setTabAt(tab, i, fwd);
                          	// 设置为true,继续获取迁移任务
                            advance = true;
                        }
                        else if (f instanceof TreeBin) {
                            TreeBin t = (TreeBin)f;
                            TreeNode lo = null, loTail = null;
                            TreeNode hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node e = t.first; e != null; e = e.next) {
                              	// 这里同样将红黑树拆成高位和地位两棵树
                                int h = e.hash;
                                TreeNode p = new TreeNode
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                  	// 低位树计数
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                  	// 高位树计数
                                    ++hc;
                                }
                            }
                          	// 当数的树节点低于6时,将数转成链表
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin(hi) : t;
                          	// 更新节点值
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

不积跬步,无以至千里

相关推荐

Python操作Word文档神器:python-docx库从入门到精通

Python操作Word文档神器:python-docx库从入门到精通动动小手,点击关注...

Python 函数调用从入门到精通:超详细定义解析与实战指南 附案例

一、函数基础:定义与调用的核心逻辑定义:函数是将重复或相关的代码块封装成可复用的单元,通过函数名和参数实现特定功能。它是Python模块化编程的基础,能提高代码复用性和可读性。定义语法:...

等这么长时间Python背记手册终于来了,入门到精通(视频400集)

本文毫无套路!真诚分享!前言:无论是学习任何一门语言,基础知识一定要扎实,基础功非常的重要,找一个有丰富编程经验的老师或者师兄带着你会少走很多弯路,你的进步速度也会快很多,无论我们学习的目的是什么,...

图解Python编程:从入门到精通系列教程(附全套速查表)

引言本系列教程展开讲解Python编程语言,Python是一门开源免费、通用型的脚本编程语言,它上手简单,功能强大,它也是互联网最热门的编程语言之一。Python生态丰富,库(模块)极其丰富,这使...

Python入门教程(非常详细)从零基础入门到精通,看完这一篇就够

本书是Python经典实例解析,采用基于实例的方法编写,每个实例都会解决具体的问题和难题。主要内容有:数字、字符串和元组,语句与语法,函数定义,列表、集、字典,用户输入和输出等内置数据结构,类和对象,...

Python函数全解析:从入门到精通,一文搞定!

1.为什么要用函数?函数的作用:封装代码,提高复用性,减少重复,提高可读性。...

Python中的单例模式:从入门到精通

Python中的单例模式:从入门到精通引言单例模式是一种常用的软件设计模式,它保证了一个类只有一个实例,并提供一个全局访问点。这种模式通常用于那些需要频繁创建和销毁的对象,比如日志对象、线程池、缓存等...

【Python王者归来】手把手教你,Python从入门到精通!

用800个程序实例、5万行代码手把手教你,Python从入门到精通!...

Python从零基础入门到精通:一个月就够了

如果想从零基础到入门,能够全职学习(自学),那么一个月足够了。...

Python 从入门到精通:一个月就够了

要知道,一个月是一段很长的时间。如果每天坚持用6-7小时来做一件事,你会有意想不到的收获。作为初学者,第一个月的月目标应该是这样的:熟悉基本概念(变量,条件,列表,循环,函数)练习超过30个编...

Python零基础到精通,这8个入门技巧让你少走弯路,7天速通编程!

Python学习就像玩积木,从最基础的块开始,一步步搭建出复杂的作品。我记得刚开始学Python时也是一头雾水,走了不少弯路。现在回头看,其实掌握几个核心概念,就能快速入门这门编程语言。来聊聊怎么用最...

神仙级python入门教程(非常详细),从0到精通,从看这篇开始!

python入门虽然简单,很多新手依然卡在基础安装阶段,大部分教程对一些基础内容都是一带而过,好多新手朋友,对一些基础知识常常一知半解,需要在网上查询很久。...

Python类从入门到精通,一篇就够!

一、Python类是什么?大家在生活中应该都见过汽车吧,每一辆真实存在、能在路上跑的汽车,都可以看作是一个“对象”。那这些汽车是怎么生产出来的呢?其实,在生产之前,汽车公司都会先设计一个详细的蓝图...

学习Python从入门到精通:30天足够了,这才是python基础的天花板

当年2w买的全套python教程用不着了,现在送给有缘人,不要钱,一个月教你从入门到精通1、本套视频共487集,本套视频共分4季...

30天Python 入门到精通(3天学会python)

以下是一个为期30天的Python入门到精通学习课程,专为零基础新手设计。课程从基础语法开始,逐步深入到面向对象编程、数据处理,最后实现运行简单的大语言模型(如基于HuggingFace...

取消回复欢迎 发表评论: