java8集合框架(十二)-Queue的实现类(ConcurrentLinkedQueue)

java8集合框架(十二)-Queue的实现类(ConcurrentLinkedQueue)

十一回准准丈母娘家商量了订婚事宜,所以期间并未有时间更新,话不多说,继续集合类的源码分析之旅。之前我们分析了LinkedList的实现,提到了它可以被看作是Queue的链表实现,按照命名规则,也可以叫做LinkedQueue(注:这是我YY的名字,不存在这样的类)。接下来分析其链表实现的线程安全版本的源码。其实我们分析了很多线程安全的实现类,知道实现方式无非两种:1.Lock,2.CAS。前者实现较为简单,但性能不如后者,后者相反。ConcurrentLinkedQueue的实现应用了CAS无锁算法。

Note:本篇源码分析基于jdk1.8版本

ConcurrentLinkedQueue

ConcurrentLinkedQueue是基于链表实现的无界队列,严格遵守FIFO规则。它不支持null元素,严格地说这是链表实现的线程安全的非阻塞的的版本,因为下篇就会分析其链表实现的线程安全的阻塞版本实现LinkedBlockingQueue

数据结构

毫无疑问,ConcurrentLinkedQueue的核心数据结构是链表,它有两个指针指向链表的“首尾”。但此“首尾”并非严格的首尾节点,下面会讲到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
//第一个节点,不一定是第一个活着的节点。(即node.item!=null,节点未被删除)
//有以下特性
//1.通过head可以遍历全部活着的元素。
//2.head永远不会为null
private transient volatile Node<E> head;
//不一定是最后一个活着的节点
//tail永远不会为null
private transient volatile Node<E> tail;
//节点数据类型定义
private static class Node<E> {
//volatile保证可见性
volatile E item;
volatile Node<E> next;
Node(E item) {
//原子操作,直接操作内存
UNSAFE.putObject(this, itemOffset, item);
}
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
...
}

初始化

1
2
3
4
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}

说明:初始化保证head和tail不为null。

入队列和出队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
//p、t是尾部
for (Node<E> t = tail, p = t;;) {
//q是p的next
Node<E> q = p.next;
//如果p的next是空,那么p一定是最后一个节点
if (q == null) {
// 入队成功
if (p.casNext(null, newNode)) {
//这里要注意,思考一下什么情况下p!=t?
//“第一次”入队肯定是相等的,所以并不会更新tail。
//“第二次”就不相等了,这时更新tail
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
//“第二次”执行入队操作前,先将p设置为q(q=p.next)
//这里还有一个疑问 t!=(t=tail)为何意?
p = (p != t && t != (t = tail)) ? t : q;
}
}
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
//item不为空,说明p节点是第一个“活着的”节点
if (item != null && p.casItem(item, null)) {
//每两次出队更新head指针
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
//出队的时候,如果p的next为空,及时p即将被删除,head指针也不能设置为null
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}

说明:为了提高性能,出入队列并未对每一次的操作调整head/tail,而是每两次操作调整一次。 如何做到“每两次”呢?

  • 在第一次入队不调整tail,那么第二次入队时,p != t成立更新(p指向刚入队的元素,t指向tail)。
  • 在第一次出队调整了head,p != h成立(p指向第一个活着的元素,h指向head),第二次出队列只是将pitem设置为空,所以p == h不会触发更新。

入队列图示:

出队列图示:

图片引自于http://ifeve.com/concurrentlinkedqueue/

查看队首

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public E peek() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null || (q = p.next) == null) {
updateHead(h, p);
return item;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}

说明:查看有可能会更新head节点至第一个活元素。

计数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public int size() {
int count = 0;
for (Node<E> p = first(); p != null; p = succ(p))
if (p.item != null)
// Collection.size() spec says to max out
if (++count == Integer.MAX_VALUE)
break;
return count;
}
Node<E> first() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
boolean hasItem = (p.item != null);
if (hasItem || (q = p.next) == null) {
updateHead(h, p);
return hasItem ? p : null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
final Node<E> succ(Node<E> p) {
Node<E> next = p.next;
return (p == next) ? head : next;
}

说明:先找到第一个活节点,从此节点开始继续寻找后继活节点,并计数,时间复杂度O(n)。

总结

每次分析线程安全的代码都会思考它是怎样保证这一点的,ConcurrentLinkedQueue在入队时,先定位最后一个节点,然后CAS更新其next指针。出队时,先定位第一个活节点,然后CAS更新item域。它是链表实现的无锁非阻塞队列,通过上面的图示很容易理解该算法。本来使用了CAS已经很大程度上优化了锁带来的性能损耗,没想到大师级别的老先生还能通过减少一半的更新head/tail操作来进一步优化,实在佩服!

参考

作者: wuzhaoyang(John)
出处: http://wuzhaoyang.me/
因为作者水平有限,无法保证每句话都是对的,但能保证不复制粘贴,每句话经过推敲。希望能表达自己对于技术的态度,做一名优秀的软件工程师。