ConcurrentHashMap-源码分析

产生原因

  • 在多线程环境下,HashMap在扩容时会产生死循环,而线程安全的HashTable,在涉及多线程的操作都采用Synchronized锁住整个数组表,即所有线程在争夺一个资源,效率很低。

1.7 vs 1.8

  • 1.7的 ConcurrentHashMap 采用的是分段锁的设计,底层数据结构是Segment + HashEntry,Segment实现了ReentrantLock,自带锁的功能,每次只锁定对应的Segment,多个线程争夺同个segment时,通过tryLock争夺,锁定的粒度下降了,性能也就提高了不少。
  • 1.8的ConcurrentHashMap 摒弃了分段锁的概念(Segment),沿用了HashMap的思想,基于数组 + 链表 + 红黑树,底层采用的Node + CAS + Synchronized,保证并发的安全性,锁住的是粒度更小的Node。

分析

内部类Node, 实现了Map.Entry,用于存储键值对,节点用 volatile修饰,保证了多线程间的可见性,同时注意 value也用volatile修饰,无法通过setValue设置value变量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;

Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}

public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
public final V setValue(V value) {
throw new UnsupportedOperationException();
}

public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}

/**
* Virtualized support for map.get(); overridden in subclasses.
*/
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}

属性值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Node数组,数组大小会被调整到2的整数次幂,同时用volatile修饰
transient volatile Node<K,V>[] table;


// 基本计数器,CAS更新
private transient volatile long baseCount;

// 表示表初始化或扩容时的标识符
// 主要有一下几种状态:
-1:初始化
-n:有n-1个线程进行扩容操作
0|n:hash尚未初始化,标识下次扩容的大小
private transient volatile int sizeCtl;


private transient volatile int transferIndex;


private transient volatile int cellsBusy;

private transient volatile CounterCell[] counterCells;

取hash值

1
2
3
4
5
6
7
8
9
10
11
12
13
// 与HashMap差不多,前16位与后16位进行异或操作,结果值要 & HASH_BITS,
//这个 HASH_BITS代表的是Integer.MAX_VALUE,目的是消除 hash最高位的负号,因为符号在ConcurrentHashMap有特别的意义,代表不同的节点
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}

static final int HASH_BITS = 0x7fffffff;

// 记录hash值的不同状态(forwarding节点、TreeBin节点)
static final int MOVED = -1; // hash for forwarding nodes
static final int TREEBIN = -2; // hash for roots of trees
static final int RESERVED = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

put()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
public V put(K key, V value) {
return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {

// key值不允许为null,否则抛出异常
if (key == null || value == null) throw new NullPointerException();

// 取hash值
int hash = spread(key.hashCode());
int binCount = 0;

// 循环添加
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;

// table为null,则进行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();

// 通过 hash & length - 1,得到对应的位置,如果位置为空,直接CAS放入
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}

// forward节点处于 MOVE 状态(扩容),当前线程加入到扩容操作中
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);

// 插入节点非null,且不处于扩容状态,synchronized同步添加,判断是否是链表还是树
else {
V oldVal = null;

synchronized (f) {

// 对应的数组下标已经有了元素
if (tabAt(tab, i) == f) {

// fh >= 0,说明是链表,遍历节点,添加值或替换值
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;

// 替换
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}

// 尾节点添加
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}

// 树节点,采用红黑树添加
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {

// 判断链表长度是否到达8,转化为红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}

// 更新ConcurrentHashMap的数量,+1
addCount(1L, binCount);
return null;
}

get(Object key)

对于get(Object key)相对简单,只要匹配对应的key,hash,就可以获取相应的value

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;

// 取得hash值
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {

// key和hash匹配,直接返回对用节点
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}

// eh < 0,非正常节点(扩容或树节点),采用 内部类的ForwardingNode 和 TreeNode的find()
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;

// 正常节点,直接链表递归查找
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

initTable()

初始化table,初始化操作依赖于sizeCtl变量,通过这个变量判断其他线程是否在执行,否则线程自己进行初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;

// 循环创建table,保证一定创建成功
while ((tab = table) == null || tab.length == 0) {

// 根据sizeCtl变量判断其他线程是否在执行初始化,如果是,将自己线程挂起,保证只有一个线程进行初始化
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin

// 只有自己线程在进行初始化,那么CAS将sizeCtl设置为-1,表示当前线程在进行初始化
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}

transfer()

扩容操作主要分为两部分:
(1)创建一个nextTable数组,容量为原来的两倍,单线程完成
(2)将原table数组的元素按规则复制到nextTable,多线程处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
// 创建新数组
private transient volatile Node<K,V>[] nextTable;

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;

// 根据cpu个数找出扩容时的数组跨度大小 16,32,64
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range


// 正式初始化
// 新创建nextTable,容量为原来的两倍
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;

//旧数组开始的转移位置,逆序迁移
transferIndex = n;
}

// 创建扩容连接节点,将nextTable[]放入
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);

// 表示节点是否已经处理过
boolean advance = true;

// 扩容是否完成
boolean finishing = false; // to ensure sweep before committing nextTab

// 使用i表示当前处理的槽位,bound表示需要处理的槽位边界
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;

// 迁移到头了
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// i = 15,bound = 0
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}

// 全部迁移
if (i < 0 || i >= n || i + n >= nextn) {
int sc;

// 扩容完成,将nextTable赋值给table,清空临时表,重新设置sizeCtl
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}

// CAS修改sizeCtl,sc - 1说明新增加扩容线程
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;

// 完成
finishing = advance = true;
i = n; // recheck before commit
}
}

// 遍历的节点为空,放入Forwarding node
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);

// 遍历到的节点为Forwarding node节点,说明被处理过了,直接跳过
else if ((fh = f.hash) == MOVED)
advance = true; // already processed

else {
// 锁住节点
synchronized (f) {
if (tabAt(tab, i) == f) {

// 构造两个节点,在新数组中保存低位(0)和高位(1)的数组节点
Node<K,V> ln, hn;

// fh > 0,Node节点,获取
if (fh >= 0) {

// 获取原标志位,用于比较节点是否位于新数组
int runBit = fh & n;
Node<K,V> lastRun = f;

// 遍历链表判断状态,记录下 runBit 和 lastRun
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}

// 这里为两个链表设置头,一个正序,一个倒序
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}

// 遍历链表,根据标识位构造 ln链表和 hn链表
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}

// 在nextTable的 i 位置插入ln链表
setTabAt(nextTab, i, ln);

// 在nextTable的 i+n 位置插入hn链表
setTabAt(nextTab, i + n, hn);

// 设置forwarding节点,表示已经处理过了
setTabAt(tab, i, fwd);

// 标识advance 为true,i--
advance = true;
}

// 对树节点进行处理
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}