java8集合框架(十一)-Queue的实现类(PriorityBlockingQueue)

java8集合框架(十一)-Queue的实现类(PriorityBlockingQueue)

前篇分析了PriorityQueue,内部实现是数组+最小堆,非线程安全。本篇分析PriorityBlockingQueue,它是前者的线程安全的阻塞的实现版本。

Note:本篇源码分析基于jdk1.8版本

PriorityBlockingQueue

PriorityBlockingQueuePriorityQueue最大的不同有两点:1.线程安全,这是通过锁实现的,2.支持阻塞操作,特性来自于接口BlockingQueue。实现上两个都很相似,都是基于优先堆的无界队列实现。元素的优先顺序也是基于元素实现的Comparable接口或者构造函数中传入的Comparator比较器。对于前种情况,如元素并未实现该接口,会抛出异常ClassCastException。优先级一样的元素出队列顺序是没有保证的。PriorityBlockingQueue不接受null值,线程安全。

数据结构

PriorityBlockingQueue的核心数据结构是数组+堆。默认是最小堆,可通过改变比较器实现最大堆。可以指定数组的初始大小。这里有两把锁,lock是公用锁,用来保证增删改等操作的线程安全性。allocationSpinLock是自旋锁,通过CAS更新,用于扩容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//默认数组初始化大小
private static final int DEFAULT_INITIAL_CAPACITY = 11;
//元素存放的数组容器,实现优先堆
transient Object[] queue;
//元素的个数
private int size = 0;
//比较器,可以通过构造函数传入
private final Comparator<? super E> comparator;
//增删改等操作的锁
private final ReentrantLock lock;
//阻塞的`put`和`take`条件信号
private final Condition notEmpty;
//自旋锁,用于扩容
private transient volatile int allocationSpinLock;

图片引用自http://www.cnblogs.com/yangecnu/archive/2014/03/02/3577568.html

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
//构造函数
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}

说明:PriorityQueue默认大小是11,可以在构造函数中传入comparator。初始化了锁和条件信号。

入队列和出队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
//队尾插入元素,如果队列满无法插入,则抛异常
public boolean add(E e) {
return offer(e);
}
public void put(E e) {
offer(e); // never need to block
}
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
//扩容,因为queue是
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
//condition释放信号
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
//从位置k将x元素上浮(与PriorityQueue完全一致)
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (key.compareTo((T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = key;
}
//从队头删除元素,如果队列空,则抛异常
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
//从队头删除元素,如果队列空,则返回null
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
E result = (E) array[0];
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
//下沉同PriorityQueue完全一致
private static <T> void siftDownComparable(int k, T x, Object[] array,
int n) {
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1; // loop while a non-leaf
while (k < half) {
int child = (k << 1) + 1; // assume left child is least
Object c = array[child];
int right = child + 1;
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
c = array[child = right];
if (key.compareTo((T) c) <= 0)
break;
array[k] = c;
k = child;
}
array[k] = key;
}
}
//队列空会阻塞的删除
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}

说明:仔细对比就能看出offerpoll的实现跟PriorityQueue非常相似,lock保证了线程安全。同时,通过put操作的notEmpty.signal()take操作的notEmpty.await(),控制了线程阻塞,而不用死循环遍历询问。另外,因为是无界队列,add,put均不会出现仅出现在队列满情况下的阻塞或报错。takeremove还是可能出现出现在队列空情况下的阻塞和报错。

扩容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private void tryGrow(Object[] array, int oldCap) {
lock.unlock(); // must release and then re-acquire main lock
//我们可以假象,有很多线程都可以执行到这一步,这样既使在扩容的时候
//也可以保证并发。而自旋锁保证了只有一个线程会完成数组扩容。
Object[] newArray = null;
//自旋锁,CAS更新
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
//要非常注意queue == array的判断,当第一个线程完成扩容后,
//同批次的其他线程再进来,queue的地址已经变化,不会再分配新的
//空间
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
}
if (newArray == null) // back off if another thread is allocating
Thread.yield();
lock.lock();
//除第一个完成扩容的线程,其他线程的条件都是newArray==null.
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}

说明:当数组空间满,进行扩容。如果容量小于64则两倍扩容,否则1.5倍扩容。这里有数组的复制动作。这里使用了CAS更新allocationSpinLock,确保只有一个线程会执行扩容操作。这里要弄明白为什么要先放弃锁在获取锁?这是因为有更好的并发特性,在扩容的过程中不影响其他线程并发执行takepoll操作,甚至是offer操作,通过自旋锁使线程安全的进行扩容工作。

查看队首

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public E element() {
E x = peek();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (size == 0) ? null : (E) queue[0];
} finally {
lock.unlock();
}
}

说明:堆顶是优先级最高的元素。读加锁是因为防止读到正在插入或删除而导致队列元素还在调整的错误数据。

遍历

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
//快照读,在CopyOnWriteArrayList中见过
public Iterator<E> iterator() {
return new Itr(toArray());
}
final class Itr implements Iterator<E> {
final Object[] array; // Array of all elements
int cursor; // index of next element to return
int lastRet; // index of last element, or -1 if no such
Itr(Object[] array) {
lastRet = -1;
this.array = array;
}
public boolean hasNext() {
return cursor < array.length;
}
public E next() {
if (cursor >= array.length)
throw new NoSuchElementException();
lastRet = cursor;
return (E)array[cursor++];
}
public void remove() {
if (lastRet < 0)
throw new IllegalStateException();
removeEQ(array[lastRet]);
lastRet = -1;
}
}
//迭代器中的删除,加锁保护
void removeEQ(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] array = queue;
for (int i = 0, n = size; i < n; i++) {
if (o == array[i]) {
removeAt(i);
break;
}
}
} finally {
lock.unlock();
}
}
private void removeAt(int i) {
Object[] array = queue;
int n = size - 1;
if (n == i) // removed last element
array[i] = null;
else {
E moved = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftDownComparable(i, moved, array, n);
else
siftDownUsingComparator(i, moved, array, n, cmp);
if (array[i] == moved) {
if (cmp == null)
siftUpComparable(i, moved, array);
else
siftUpUsingComparator(i, moved, array, cmp);
}
}
size = n;
}

说明:这里也要关注remove的实现,不像PriorityQueue的迭代器中有ForgotMeNot队列来保存未被访问的元素,这里面没有。原因还是因为这里是快照读,删除的动作实际上还是在queue上删除了元素。就不怕其他操作调用PriorityBlockQueue#poll执行出队列操作吗?我反问一句,它有锁吗?

思考

  • 为什么queue这个字段并没有添加volatile关键字修饰?在扩容的时候while ((n = size) >= (cap = (array = queue).length))这里的queue并没刷新到主内存,读到的都是线程工作区的副本。

这个问题我暂时也没想通,后续研究锁和并发的时候可能会有不一样的理解

总结

PriorityBlockingQueue是一种按照优先级顺序线程安全的阻塞的队列实现。遍历的结果也没有顺序,可以通过Arrays.sort(queue.toArray(),comparator)达到有序遍历的目的。可以在多线程环境下替代PriorityQueue

参考

作者: wuzhaoyang(John)
出处: http://wuzhaoyang.me/
因为作者水平有限,无法保证每句话都是对的,但能保证不复制粘贴,每句话经过推敲。希望能表达自己对于技术的态度,做一名优秀的软件工程师。