java8集合框架(四)-Map的实现类(CurrentSkipListMap)

java8集合框架(四)-Map的实现类(CurrentSkipListMap)

前篇提到LinkedHashMap的有序指的是按访问/插入的顺序遍历。如果需求是按照key的大小进行有序遍历,那么我们有两个秘密武器,分别是单线程的TreeMap以及本篇要分析的线程安全的CurrentSkipListMap

Note:本篇源码分析基于jdk1.8版本

CurrentSkipListMap

CurrentSkipListMap不接受nullkey和nullvalue。

数据结构

CurrentSkipListMap的数据结构是跳表,是平衡二叉树的一种替代方案。它非常简单,不需要左旋右旋等操作来保持树的平衡性,只需要熟悉对链表的常规操作即可。ConcurrentSkipListMap的数据结构包含3个类型的节点。1.数据节点Node,2.索引节点Index,3头索引节点HeadIndex。底层数据节点链从小到大排列。我想没有哪个图会比源码注释中的更清晰的表示。

注:最左侧一列是HeadIndex类型节点,记录所在链的level。
最下面一行(base-level)都是Node类型节点,记录key-value映射关系。
base-level第一个节点是一个Object,所在链的头标志,也是其他HeadIndex类型节点的Node阈值。
其他节点均为Index类型节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
//数据节点链的头
private static final Object BASE_HEADER = new Object();
//最高层HeadIndex节点指针
private transient volatile HeadIndex<K,V> head;
//数据排序的比较器
final Comparator<? super K> comparator;
//Node类型节点定义,存放映射关系
static final class Node<K,V> {
final K key;
volatile Object value;
volatile Node<K,V> next;
Node(K key, Object value, Node<K,V> next) {...}
//创建一个marker节点
Node(Node<K,V> next) { ... }
//CAS无锁更新
boolean casValue(Object cmp, Object val) {...}
boolean casNext(Node<K,V> cmp, Node<K,V> val) {...}
boolean isMarker() {
return value == this;
}
//是否是数据节点的头
boolean isBaseHeader() {return value == BASE_HEADER;}
//为该节点添加一个删除的标志,它的next指向一个Marker节点
boolean appendMarker(Node<K,V> f) {
return casNext(f, new Node<K,V>(f));
}
void helpDelete(Node<K,V> b, Node<K,V> f) {
if (f == next && this == b.next) {
if (f == null || f.value != f) // not already marked
casNext(f, new Node<K,V>(f));
else
b.casNext(this, f.next);
}
}
V getValidValue() {
Object v = value;
if (v == this || v == BASE_HEADER)
return null;
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
...
}
//索引类型节点定义
static class Index<K,V> {
//索引指向的数据节点
final Node<K,V> node;
//下一层索引节点
final Index<K,V> down;
//当前层的下一索引
volatile Index<K,V> right;
Index(Node<K,V> node, Index<K,V> down, Index<K,V> right) {...}
//CAS无锁替换下一索引
final boolean casRight(Index<K,V> cmp, Index<K,V> val) {...}
//判断该索引是否已被删除
final boolean indexesDeletedNode() {
return node.value == null;
}
//替换当前节点的right为newSucc
final boolean link(Index<K,V> succ, Index<K,V> newSucc) {...}
//替换当前节点的right为right的right
final boolean unlink(Index<K,V> succ) {
return node.value != null && casRight(succ, succ.right);
}
...
}
//头索引类型节点定义,在Index类型基础上添加了level阈
static final class HeadIndex<K,V> extends Index<K,V> {
final int level;
HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
super(node, down, right);
this.level = level;
}
}

关键操作-初始化

1
2
3
4
5
6
7
8
9
//初始化
private void initialize() {
keySet = null;
entrySet = null;
values = null;
descendingMap = null;
//第一层HeadIndex
head = new HeadIndex<K,V>(new Node<K,V>(null, BASE_HEADER, null),null, null, 1);
}

说明:初始化后跳表中只有两个元素:一个level1的头指针和baselevel的头节点。

关键操作-put

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
private V doPut(K key, V value, boolean onlyIfAbsent) {
Node<K,V> z; // added node
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
//b是传入key的先行Node节点,n是先行Node节点的next,f是next的后继节点
//这里满足 key<n.key
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
if (n != null) {
Object v; int c;
Node<K,V> f = n.next;
//b.next被其他线程改变
if (n != b.next) break;
//next的Node节点已经被其他线程删除
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
//先行Node节点已经被删除
if (b.value == null || v == n) // b is deleted
break;
//不满足key<n.key
if ((c = cpr(cmp, key, n.key)) > 0) {
b = n;
n = f;
continue;
}
//查找到key对应的Node,更新
if (c == 0) {
if (onlyIfAbsent || n.casValue(v, value)) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
break; // restart if lost race to replace value
}
// else c < 0; fall through
}
//构造Node节点,该节点的next字段指向next数据节点
z = new Node<K,V>(key, value, n);
//先行节点的next字段指向新节点
if (!b.casNext(n, z))
break; // restart if lost race to append to b
break outer;
}
}
//取随机数
int rnd = ThreadLocalRandom.nextSecondarySeed();
//只有随机数的头尾bit均为0才会考虑给该Node节点添加索引
if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
int level = 1, max;
//计算添加索引的层级。即从第二位开始往左连续1的个数
while (((rnd >>>= 1) & 1) != 0)
++level;
//idx始终指向新索引节点的最高层级
Index<K,V> idx = null;
HeadIndex<K,V> h = head;
//level不超过现最高层级,则叠加索引节点
if (level <= (max = h.level)) {
for (int i = 1; i <= level; ++i)
idx = new Index<K,V>(z, idx, null);
}
//否则,无论上面计算的level是多少,将现最高层级+1,叠加索引节点
else { // try to grow by one level
level = max + 1;
@SuppressWarnings("unchecked")Index<K,V>[] idxs =
(Index<K,V>[])new Index<?,?>[level+1];
for (int i = 1; i <= level; ++i)
idxs[i] = idx = new Index<K,V>(z, idx, null);
for (;;) {
h = head;
//现最高层级
int oldLevel = h.level;
if (level <= oldLevel) // lost race to add level
break;
HeadIndex<K,V> newh = h;
Node<K,V> oldbase = h.node;
//给HeadIndex叠加升级。升级的部份,headindex的right直接指向idxs[j]
for (int j = oldLevel+1; j <= level; ++j)
newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
//置换头索引节点
if (casHead(h, newh)) {
h = newh;
idx = idxs[level = oldLevel];
break;
}
}
}
// find insertion points and splice in
//insertionLevel代表的是idx需要插入的层,最开始需要处理插入的层肯定为原最高层,因为最新高层都是直接将idx设置为right
splice: for (int insertionLevel = level;;) {
int j = h.level;
//q先行索引 r后继索引 t需要插入的新索引
for (Index<K,V> q = h, r = q.right, t = idx;;) {
if (q == null || t == null)
break splice;
//寻找q.node.key<key<r.node.key的位置,
//idx要插入在q和r之间
if (r != null) {
Node<K,V> n = r.node;
// compare before deletion check avoids needing recheck
int c = cpr(cmp, key, n.key);
if (n.value == null) {
if (!q.unlink(r))
break;
r = q.right;
continue;
}
//key大于r.node.key,右移
if (c > 0) {
q = r;
r = r.right;
continue;
}
}
if (j == insertionLevel) {
//该层的索引插入工作完成
if (!q.link(r, t))
break; // restart
if (t.node.value == null) {
findNode(key);
break splice;
}
//到达baselevel,整个跳表的插入工作结束
if (--insertionLevel == 0)
break splice;
}
//开始下一层的插入工作,j设置为j-1,
//t设置为t.down,q设置为q.down
if (--j >= insertionLevel && j < level)
t = t.down;
q = q.down;
r = q.right;
}
}
}
return null;
}
//查找传入key的先行baselevel节点,同时也帮忙清除无效索引
//该节点的next.key>key
private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
if (key == null)
throw new NullPointerException(); // don't postpone errors
for (;;) {
//从head开始遍历
for (Index<K,V> q = head, r = q.right, d;;) {
if (r != null) {
Node<K,V> n = r.node;
K k = n.key;
//清除无效索引
if (n.value == null) {
if (!q.unlink(r))
break; // restart
r = q.right; // reread r
continue;
}
//传入的key"大于"下一索引节点的key,继续循环
if (cpr(cmp, key, k) > 0) {
q = r;
r = r.right;
continue;
}
}
//已经到baselevel,返回数据节点。唯一出口!
if ((d = q.down) == null)
return q.node;
//下一层跳表的遍历
q = d;
r = d.right;
}
}
}

说明:在插入一个新元素(key-value)的时候,处理步骤如下

  1. 利用索引,查找元素的先行Node节点b,先行节点的next节点n,满足key<n.key
  2. 如果b.key == key,更新之,结束
  3. 否则插入在bn之间
  4. 生成随机数,随机数的首尾bit均为1则为新元素添加索引。
  5. 计算添加索引的层数,如果超过现最高层数,现最高层+1。
  6. 叠加生成索引节点。从最高层依次往下将索引节点嵌入索引链表。

图片引用自http://blog.csdn.net/guangcigeyun/article/details/8278349

关键操作-get

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private V doGet(Object key) {
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
Object v; int c;
//先行节点的next都为空了,肯定找不到了
if (n == null)
break outer;
Node<K,V> f = n.next;
//例行检查
if (n != b.next) // inconsistent read
break;
//v=n.value
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
if (b.value == null || v == n) // b is deleted
break;
//key相等,返回v
if ((c = cpr(cmp, key, n.key)) == 0) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
if (c < 0)
break outer;
//继续遍历数据节点链
b = n;
n = f;
}
}
return null;
}

说明:弄明白怎样put元素,那么查找元素就很容易理解了。

图片引用自http://blog.csdn.net/guangcigeyun/article/details/8278349

关键操作-remove

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
final V doRemove(Object key, Object value) {
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
Object v; int c;
if (n == null)
break outer;
Node<K,V> f = n.next;
if (n != b.next) // inconsistent read
break;
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
if (b.value == null || v == n) // b is deleted
break;
if ((c = cpr(cmp, key, n.key)) < 0)
break outer;
if (c > 0) {
b = n;
n = f;
continue;
}
if (value != null && !value.equals(v))
break outer;
//设置该节点的value为null
if (!n.casValue(v, null))
break;
//为该节点添加删除的标志并且将b的next设置为f
if (!n.appendMarker(f) || !b.casNext(n, f))
findNode(key);
else {
//清除索引
findPredecessor(key, cmp); // clean index
if (head.right == null)
//降低层级
tryReduceLevel();
}
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
}
return null;
}
//降低索引层。条件是从最高层的headindex往下3层right均为null,降低一层
private void tryReduceLevel() {
HeadIndex<K,V> h = head;
HeadIndex<K,V> d;
HeadIndex<K,V> e;
if (h.level > 3 &&
(d = (HeadIndex<K,V>)h.down) != null &&
(e = (HeadIndex<K,V>)d.down) != null &&
e.right == null &&
d.right == null &&
h.right == null &&
casHead(h, d) && // try to set
h.right != null) // recheck
casHead(d, h); // try to backout
}

说明:删除节点经历以下步骤

  • 先找到该节点,将该节点的value设置为null
  • 将该节点的next设置为marker类型节点,即断开了与后继节点的连接
  • 将先行节点的next设置为该节点原next节点
  • 清除索引
  • 尝试降低索引层高

图片引用自http://blog.csdn.net/guangcigeyun/article/details/8278349

思考

思考这样的一个问题:ConcurrentSkipListMap到底是怎样保证线程安全的呢?
通篇没有看见一个锁。我们以put操作为例,尝试从保证线程安全的角度分析一下。
首先针对可能出现的外部环境变化做了相关判断:例如n != b.next(v = n.value) == nullb.value == null || v == n等。如果需要更新n.casValue(v, value)使用了CAS,不成功肯定是外部环境变化了,重新循环,在下一次循环过程中处理了变化的外部环境。在插入的时候b.casNext(n, z)也同样使用了CAS。
到这里,我不禁想到ConcurrentHashMap的链是否也可以不使用lock呢?经过思考,答案是肯定的,因为(锁=死循环+CAS)本来就是一种替代方案。那么ConcurrentHashMap的作者为什么不完全消灭其中的锁呢?我的猜想是

  1. 出现hash碰撞的情况还是比较少的,简单的使用synchronized方便,代码可读性也高。
  2. 在1的基础上,优化后的synchronized的性能也不是太差

总结

CurrentSkipListMap是线程安全的有序的Map实现.它的核心数据结构是跳表。跳表是非常简单的数据结构,能避免平衡树的旋转操作。说到平衡树,下一篇我们就要分析单线程版本TreeMap,它的核心数据结构就是红黑树。届时你可能会明白为什么这里强调了很多次跳表的简单性。

参考

作者: wuzhaoyang(John)
出处: http://wuzhaoyang.me/
因为作者水平有限,无法保证每句话都是对的,但能保证不复制粘贴,每句话经过推敲。希望能表达自己对于技术的态度,做一名优秀的软件工程师。