JDK源码分析(12)之 ConcurrentHashMap 详解

本文将主要讲述 jdk1.8 版本 的 concurrenthashmap,其内部结构和很多的哈希优化算法,都是和 jdk1.8 版本的 hashmap是一样的,所以在阅读本文之前,一定要先了解 hashmap,可以参考 hashmap 相关;另外 concurrenthashmap 中同样有红黑树,这部分可以先不看不影响整体结构把握,有兴趣的可以查看 ;

一、concurrenthashmap 结构概述

1. 整体概述

chm 的源码有 6k 多行,包含的内容多,精巧,不容易理解;建议在查看源码的时候,可以首先把握整体结构脉络,对于一些精巧的优化,哈希技巧可以先了解目的就可以了,不用深究;对整体把握比较清楚后,在逐步分析,可以比较快速的看懂;

jdk1.8 版本中的 chm,和 jdk1.7 版本的差别非常大,在查看资料的时候要注意区分,1.7 中主要是使用 segment 分段锁 来解决并发问题的;而在 1.8 中则完全没有这些稍显臃肿的结构,其结构基本和 hashmap 是一样的,都是 数组 + 链表 + 红黑树,如图所示:


其主要区别就在 chm 支持并发:

  • 使用 unsafe 方法操作数组内部元素,保证可见性;(u.getobjectvolatile、u.compareandswapobject、u.putobjectvolatile);
  • 在更新和移动节点的时候,直接锁住对应的哈希桶,锁粒度更小,且动态扩展;
  • 针对扩容慢操作进行优化,
    • 首先扩容过程的中,节点首先移动到过度表 nexttable ,所有节点移动完毕时替换散列表 table
    • 移动时先将散列表定长等分,然后逆序依次领取任务扩容,设置 sizectl 标记正在扩容;
    • 移动完成一个哈希桶或者遇到空桶时,将其标记为 forwardingnode 节点,并指向 nexttable
    • 后有其他线程在操作哈希表时,遇到 forwardingnode 节点,则先帮助扩容(继续领取分段任务),扩容完成后再继续之前的操作;
  • 优化哈希表计数器,采用 longadder、striped64 类似思想;
  • 以及大量的哈希算法优化和状态变量优化;

以上讲的这些不太清楚也没有关系,主要是有一个印象,大致清楚 chm 的实现方向,具体细节后面还会结合源码详细讲解;

2. 类定义和成员变量

public class concurrenthashmap<k,v> 
             extends abstractmap<k,v> implements concurrentmap<k,v>, serializable {
  private static final int maximum_capacity = 1 << 30;       // 最大容量
  private static final int default_capacity = 16;            // 默认初始化容量
  private static final int default_concurrency_level = 16;   // 并发级别,为兼容1.7,实际未用
  private static final float load_factor = 0.75f;       // 固定负载系数,n - (n >>> 2)
  static final int treeify_threshold = 8;               // 链表超过8时,转为红黑树
  static final int untreeify_threshold = 6;             // 红黑树低于6时,转为链表
  static final int min_treeify_capacity = 64;           // 树化最小容量,容量小于64时,先扩容
  private static final int min_transfer_stride = 16;    // 扩容时拆分散列表,最小步长
  private static int resize_stamp_bits = 16;            
  private static final int max_resizers = (1 << (32 - resize_stamp_bits)) - 1;  // 可参与扩容的最大线程
  static final int ncpu = runtime.getruntime().availableprocessors();  // cpu 数
  transient volatile node<k,v>[] table;                // 散列表
  private transient volatile node<k,v>[] nexttable;    // 扩容时的过度表
  private transient volatile int sizectl;              // 最重要的状态变量,下面详讲
  private transient volatile int transferindex;        // 扩容进度指示
  private transient volatile long basecount;              // 计数器,基础基数
  private transient volatile int cellsbusy;               // 计数器,并发标记
  private transient volatile countercell[] countercells;  // 计数器,并发累计
  public concurrenthashmap() { }
  public concurrenthashmap(int initialcapacity) {
    if (initialcapacity < 0)
      throw new illegalargumentexception();
    int cap = ((initialcapacity >= (maximum_capacity >>> 1)) ?
           maximum_capacity :
           tablesizefor(initialcapacity + (initialcapacity >>> 1) + 1));  // 注意这里不是0.75,后面介绍
    this.sizectl = cap;
  public concurrenthashmap(map<? extends k, ? extends v> m) {
    this.sizectl = default_capacity;
    public concurrenthashmap(int initialcapacity, float loadfactor) {
    this(initialcapacity, loadfactor, 1);
  public concurrenthashmap(int initialcapacity, float loadfactor, int concurrencylevel) {
    if (!(loadfactor > 0.0f) || initialcapacity < 0 || concurrencylevel <= 0) 
      throw new illegalargumentexception();
    if (initialcapacity < concurrencylevel)   // use at least as many bins
      initialcapacity = concurrencylevel;   // as estimated threads
    long size = (long)(1.0 + (long)initialcapacity / loadfactor);   // 注意这里的初始化
    int cap = (size >= (long)maximum_capacity) ? maximum_capacity : tablesizefor((int)size);
    this.sizectl = cap;



这里的负载系数,同 hashmap 等其他 map 的系数有明显区别:

  • 通常的系数默认 0.75,可以由构造函数传入,当节点数 size 超过 loadfactor * capacity 时扩容;

  • 而 cmh 的系数则固定 0.75(使用 n - (n >>> 2) 表示),构造函数传入的系数只影响初始化容量,见第5个构造函数;

  • 上面第二个构造函数中,initialcapacity + (initialcapacity >>> 1) + 1),这里居然不是使用的默认0.75,可以看作bug,也可视作优化,见



sizectl 是 chm 中最重要的状态变量,其中包括很多中状态,这里先整体介绍帮助后面源码理解;

  • sizectl = 0 :初始值,还未指定初始容量;

  • sizectl > 0 :

    • table 未初始化,表示初始化容量;
    • table 已初始化,表示扩容阈值(0.75n);
  • sizectl = -1 :表示正在初始化;

  • sizectl < -1 :表示正在扩容,具体结构如图所示:



 * n=64
 * integer.numberofleadingzeros(n)=26
 * resizestamp(64) = 0001 1010 | 1000 0000 0000 0000 = 1000 0000 0001 1010
static final int resizestamp(int n) {
  return integer.numberofleadingzeros(n) | (1 << (resize_stamp_bits - 1));

所以 resizestamp(64) << resize_stamp_shift) + 2 ,表示扩容目标为 64,有一个线程正在扩容;

3. node 节点

static class node<k,v> implements map.entry<k,v> {  // 哈希表普通节点
  final int hash;
  final k key;
  volatile v val;
  volatile node<k,v> next;
  node<k,v> find(int h, object k) {}   // 主要在扩容时,利用多态查询已转移节点

static final class forwardingnode<k,v> extends node<k,v> {  // 标识扩容节点
  final node<k,v>[] nexttable;  // 指向成员变量 concurrenthashmap.nexttable
  forwardingnode(node<k,v>[] tab) {
    super(moved, null, null, null);  // hash = -1,快速确定 forwardingnode 节点
    this.nexttable = tab;
  node<k,v> find(int h, object k) {}

static final class treebin<k,v> extends node<k,v> { // 红黑树根节点
  treebin(treenode<k,v> b) {
    super(treebin, null, null, null);  // hash = -2,快速确定红黑树,
static final class treenode<k,v> extends node<k,v> { } // 红黑树普通节点,其 hash 同 node 普通节点 > 0;

4. 哈希计算

static final int moved     = -1;          // hash for forwarding nodes
static final int treebin   = -2;          // hash for roots of trees
static final int reserved  = -3;          // hash for transient reservations
static final int hash_bits = 0x7fffffff;  // usable bits of normal node hash

// 让高位16位,参与哈希桶定位运算的同时,保证 hash 为正
static final int spread(int h) {
  return (h ^ (h >>> 16)) & hash_bits;


  • tablesizefor : 将容量转为大于n,且最小的2的幂;
  • 除留余数法 :hash % length = hash & (length-1)
  • 扩容后哈希桶定位:(e.hash & oldcap),0 - 位置不变,1 - 原来的位置 + oldcap;

以上这些哈希优化的具体原理,都在之前的博客讲过了,就不在重复了,hashmap 相关

5. 哈希桶可见性

我们都知道一个数组即使声明为 volatile,也只能保证这个数组引用本身的可见性,其内部元素的可见性是无法保证的,如果每次都加锁,则效率必然大大降低,在 chm 中则使用 unsafe 方法来保证:

static final <k,v> node<k,v> tabat(node<k,v>[] tab, int i) {
  return (node<k,v>)u.getobjectvolatile(tab, ((long)i << ashift) + abase);

static final <k,v> boolean castabat(node<k,v>[] tab, int i, node<k,v> c, node<k,v> v) {
  return u.compareandswapobject(tab, ((long)i << ashift) + abase, c, v);

static final <k,v> void settabat(node<k,v>[] tab, int i, node<k,v> v) {
  u.putobjectvolatile(tab, ((long)i << ashift) + abase, v);


1. inittable 方法

private final node<k,v>[] inittable() {
  node<k,v>[] tab; int sc;
  while ((tab = table) == null || tab.length == 0) {
    if ((sc = sizectl) < 0) thread.yield();  // 有其他线程在初始化
    else if (u.compareandswapint(this, sizectl, sc, -1)) {  // 设置状态 -1
      try {
        if ((tab = table) == null || tab.length == 0) {
          int n = (sc > 0) ? sc : default_capacity;  // 注意此时的 sizectl 表示初始容量,完毕后表示扩容阈值
          node<k,v>[] nt = (node<k,v>[])new node<?,?>[n];
          table = tab = nt;
          sc = n - (n >>> 2);  // 同 0.75n
      } finally {
        sizectl = sc;  // 注意这里没有 cas 更新,这就是状态变量的高明了,因为前面设置了 -1,此时这里没有竞争
  return tab;

2. get 方法

get 方法可能看代码不是很长,但是他却能 保证无锁状态下的内存一致性 ,他的每一句代码都要仔细理解,多设想一下如果发生竞争会怎样,如此才能有所得;

public v get(object key) {
  node<k,v>[] tab; node<k,v> e, p; int n, eh; k ek;
  int h = spread(key.hashcode()); // 计算 hash
  if ((tab = table) != null && (n = tab.length) > 0 &&  // 确保 table 已经初始化
    // 确保对应的哈希桶不为空,注意这里是 volatile 语义获取;因为扩容的时候,是完全拷贝,所以只要不为空,则链表必然完整
    (e = tabat(tab, (n - 1) & h)) != null) {
    if ((eh = e.hash) == h) {
      if ((ek = e.key) == key || (ek != null && key.equals(ek)))
        return e.val;
    // hash < 0,则必然在扩容,原来位置的节点可能全部移动到 i + oldcap 位置,所以利用多态到 nexttable 中查找
    else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null;
    while ((e = e.next) != null) { // 遍历链表
      if (e.hash == h &&
        ((ek = e.key) == key || (ek != null && key.equals(ek))))
        return e.val;
  return null;

3. putval 方法

注意 chm 的 key 和 value 都不能为空

final v putval(k key, v value, boolean onlyifabsent) {
  if (key == null || value == null) throw new nullpointerexception();
  int hash = spread(key.hashcode());  // hash 计算
  int bincount = 0;                   // 状态变量,主要表示查找链表节点数,最后判断是否转为红黑树
  for (node<k,v>[] tab = table;;) {
    node<k,v> f; int n, i, fh;
    if (tab == null || (n = tab.length) == 0) tab = inittable();  // 初始化
    else if ((f = tabat(tab, i = (n - 1) & hash)) == null) {      // cas 获取哈希桶
      if (castabat(tab, i, null, new node<k,v>(hash, key, value, null))) // cas 更新,失败时继续循环更新
        break;  // no lock when adding to empty bin
    else if ((fh = f.hash) == moved) tab = helptransfer(tab, f);  // 正在扩容的时候,先帮助扩容
    else {
      v oldval = null;
      synchronized (f) {  // 注意这里只锁定了一个哈希桶,所以比 1.7 中的 segment 分段锁 粒度更低
        if (tabat(tab, i) == f) {  // 确认该哈希桶是否已经移动
          if (fh >= 0) {   // hash >=0 则必然是普通节点,直接遍历链表即可
            bincount = 1;
            for (node<k,v> e = f;; ++bincount) {
              k ek;
              if(e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { // 查找成功
                oldval = e.val;
                if (!onlyifabsent) e.val = value;
              node<k,v> pred = e;
              if ((e = e.next) == null) {  // 查找失败时,直接在末尾添加新节点
                pred.next = new node<k,v>(hash, key, value, null);
          else if (f instanceof treebin) {  // 树根节点
            node<k,v> p;
            bincount = 2;
            if ((p = ((treebin<k,v>)f).puttreeval(hash, key, value)) != null) {  // 红黑树查找
              oldval = p.val;
              if (!onlyifabsent) p.val = value;
      if (bincount != 0) {
        if (bincount >= treeify_threshold) treeifybin(tab, i); // 如果链表长度大于8,转为红黑树
        if (oldval != null)
          return oldval;
  addcount(1l, bincount); // 计数加一,注意这里使用的是计数器,普通的 atomic 变量仍然可能称为性能瓶颈;
  return null;



4. 扩容

扩容操作一直都是比较慢的操作,而 chm 中巧妙的利用任务划分,使得多个线程可能同时参与扩容;另外扩容条件也有两个:

  • 有链表长度超过 8,但是容量小于 64 的时候,发生扩容;
  • 节点数超过阈值的时候,发生扩容;


  • 首先扩容过程的中,节点首先移动到过度表 nexttable ,所有节点移动完毕时替换散列表 table
  • 移动时先将散列表定长等分,然后逆序依次领取任务扩容,设置 sizectl 标记正在扩容;
  • 移动完成一个哈希桶或者遇到空桶时,将其标记为 forwardingnode 节点,并指向 nexttable
  • 后有其他线程在操作哈希表时,遇到 forwardingnode 节点,则先帮助扩容(继续领取分段任务),扩容完成后再继续之前的操作;




private final void transfer(node<k,v>[] tab, node<k,v>[] nexttab) {
  int n = tab.length, stride;
  if ((stride = (ncpu > 1) ? (n >>> 3) / ncpu : n) < min_transfer_stride)
    stride = min_transfer_stride; // 根据 cpu 数量计算任务步长
  if (nexttab == null) {          // 初始化 nexttab
    try {
      node<k,v>[] nt = (node<k,v>[])new node<?,?>[n << 1];  // 扩容一倍
      nexttab = nt;
    } catch (throwable ex) {
      sizectl = integer.max_value; // 发生 oom 时,不再扩容
    nexttable = nexttab;
    transferindex = n;
  int nextn = nexttab.length;
  forwardingnode<k,v> fwd = new forwardingnode<k,v>(nexttab);  // 标记空桶,或已经转移完毕的桶
  boolean advance = true;
  boolean finishing = false; // to ensure sweep before committing nexttab
  for (int i = 0, bound = 0;;) {  // 逆向遍历扩容
    node<k,v> f; int fh;
    while (advance) {  // 向前获取哈希桶
      int nextindex, nextbound;
      if (--i >= bound || finishing)               // 已经取到哈希桶,或已完成时退出
        advance = false;
      else if ((nextindex = transferindex) <= 0) { // 遍历到达头节点,已经没有待迁移的桶,线程准备退出
        i = -1;
        advance = false;
      else if (u.compareandswapint
           (this, transferindex, nextindex,
            nextbound = (nextindex > stride ? nextindex - stride : 0))) {  // 当前任务完成,领取下一批哈希桶
        bound = nextbound;
        i = nextindex - 1;  // 索引指向下一批哈希桶
        advance = false;
    // i < 0  :表示扩容结束,已经没有待移动的哈希桶
    // i >= n :扩容结束,再次检查确认
    // i + n >= nextn : 在使用 nexttable 替换 table 时,有线程进入扩容就会出现
    if (i < 0 || i >= n || i + n >= nextn) { // 完成扩容准备退出
      int sc;
      if (finishing) {  // 两次检查,只有最后一个扩容线程退出时,才更新变量
        nexttable = null;
        table = nexttab;
        sizectl = (n << 1) - (n >>> 1); // 0.75*2*n
      if (u.compareandswapint(this, sizectl, sc = sizectl, sc - 1)) {  // 扩容线程减一
        if ((sc - 2) != resizestamp(n) << resize_stamp_shift) return;  // 不是最后一个线程,直接退出
        finishing = advance = true;   // 最后一个线程,再次检查
        i = n;                        // recheck before commit
    else if ((f = tabat(tab, i)) == null)  // 当前节点为空,直接标记为 forwardingnode,然后继续获取下一个桶
      advance = castabat(tab, i, null, fwd);
    // 之前的线程已经完成该桶的移动,直接跳过,正常情况下自己的任务区间,不会出现 forwardingnode 节点,
    else if ((fh = f.hash) == moved)  // 此处为极端条件下的健壮性检查
      advance = true; // already processed
    // 开始处理链表
    else {
      // 注意在 get 的时候,可以无锁获取,是因为扩容是全拷贝节点,完成后最后在更新哈希桶
      // 而在 put 的时候,是直接将节点加入尾部,获取修改其中的值,此时如果允许 put 操作,最后就会发生脏读,
      // 所以 put 和 transfer,需要竞争同一把锁,也就是对应的哈希桶,以保证内存一致性效果
      synchronized (f) { 
        if (tabat(tab, i) == f) {  // 确认锁定的是同一个桶
          node<k,v> ln, hn;
          if (fh >= 0) {  // 正常节点
            int runbit = fh & n;  // hash & n,判断扩容后的索引
            node<k,v> lastrun = f;
            // 此处找到链表最后扩容后处于同一位置的连续节点,这样最后一节就不用再一次复制了
            for (node<k,v> p = f.next; p != null; p = p.next) {
              int b = p.hash & n;
              if (b != runbit) {
                runbit = b;
                lastrun = p;
            if (runbit == 0) {
              ln = lastrun;
              hn = null;
            else {
              hn = lastrun;
              ln = null;
            // 依次将链表拆分成,lo、hi 两条链表,即位置不变的链表,和位置 + oldcap 的链表
            // 注意最后一节链表没有new,而是直接使用原来的节点
            // 同时链表的顺序也被打乱了,lastrun 到最后为正序,前面一节为逆序
            for (node<k,v> p = f; p != lastrun; p = p.next) {
              int ph = p.hash; k pk = p.key; v pv = p.val;
              if ((ph & n) == 0)
                ln = new node<k,v>(ph, pk, pv, ln);
                hn = new node<k,v>(ph, pk, pv, hn);
            settabat(nexttab, i, ln);      // 插入 lo 链表
            settabat(nexttab, i + n, hn);  // 插入 hi 链表
            settabat(tab, i, fwd);         // 哈希桶移动完成,标记为 forwardingnode 节点
            advance = true;                // 继续获取下一个桶
          else if (f instanceof treebin) { // 拆分红黑树
            treebin<k,v> t = (treebin<k,v>)f;
            treenode<k,v> lo = null, lotail = null; // 为避免最后在反向遍历,先留头结点的引用,
            treenode<k,v> hi = null, hitail = null; // 因为顺序的链表,可以加速红黑树构造
            int lc = 0, hc = 0;  // 同样记录 lo,hi 链表的长度
            for (node<k,v> e = t.first; e != null; e = e.next) {  // 中序遍历红黑树
              int h = e.hash;
              treenode<k,v> p = new treenode<k,v>(h, e.key, e.val, null, null);  // 构造红黑树节点
              if ((h & n) == 0) {
                if ((p.prev = lotail) == null)
                  lo = p;
                  lotail.next = p;
                lotail = p;
              else {
                if ((p.prev = hitail) == null)
                  hi = p;
                  hitail.next = p;
                hitail = p;
            // 判断是否需要将其转化为红黑树,同时如果只有一条链,那么就可以不用在构造
            ln = (lc <= untreeify_threshold) ? untreeify(lo) : (hc != 0) ? new treebin<k,v>(lo) : t;
            hn = (hc <= untreeify_threshold) ? untreeify(hi) : (lc != 0) ? new treebin<k,v>(hi) : t;
            settabat(nexttab, i, ln);
            settabat(nexttab, i + n, hn);
            settabat(tab, i, fwd);
            advance = true;

还有其他相关方法不是很复杂,就不详细讲了,比如 trypresize,helptransfer,addcount

5. 计数器

当获取 map.size 的时候,如果使用 atomic 变量,很容易导致过度竞争,产生性能瓶颈,所以 chm 中使用了,计数器的方式:

public int size() {
  long n = sumcount();
  return ((n < 0l) ? 0 : (n > (long)integer.max_value) ? integer.max_value : (int)n);
private transient volatile countercell[] countercells;  // 计数器

@sun.misc.contended static final class countercell {  // @sun.misc.contended 避免伪缓存
  volatile long value;
  countercell(long x) { value = x; }

final long sumcount() {
  countercell[] as = countercells; countercell a;
  long sum = basecount;
  if (as != null) {
    for (int i = 0; i < as.length; ++i) {  // 累计计数
      if ((a = as[i]) != null)
        sum += a.value;
  return sum;



  • 首先 jdk1.8 的 chm,没有使用 segment 分段锁,而是直接锁定单个哈希桶
  • 对数组中的哈希桶使用 cas 操作,保证其可见性
  • 对扩容是用,任务拆分,多线程同时扩容的方式,加速扩容
  • 对 size 使用计数器思想
  • chm 中对状态变量的应用,使得很多操作都得以无所化进行

