java8集合框架(十三)-Queue的实现类(LinkedBlockingQueue)

java8集合框架(十三)-Queue的实现类(LinkedBlockingQueue)

本篇主要分析LinkedBlockingQueue的源码,它是基于链表的线程安全的阻塞版本的Queue的实现。跟它关系比较近的有LinkedListConcurrentLinkedQueue,后两者在前面的文章已经分析过源码。

Note:本篇源码分析基于jdk1.8版本

LinkedBlockingQueue

数据结构

LinkedBlockingQueue的核心数据结构是链表+双可重入锁。计数使用AtomicInteger原子类。为了更好的性能,将读锁和写锁分开。构造函数可以设置容量,默认容量是Integer.MAX_VALUE。线程安全,不接受null值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//节点定义
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
//容量
private final int capacity;
//原子计数器
private final AtomicInteger count = new AtomicInteger();
//头节点,不保存数据信息
transient Node<E> head;
//队尾,保存最后一个节点数据信息
private transient Node<E> last;
/** take, poll使用的锁 */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** put, offer使用的锁 */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

初始化

1
2
3
4
5
6
7
8
9
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}

说明:LinkedBlockingQueue的默认容量是Integer.MAX_VALUE,也可以在构造函数中传入指定容量。

入队列和出队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
//c设置为-1,作为入队失败的标记
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//队列满,阻塞
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
//这里要注意,返回的是原值。类似于i++;
c = count.getAndIncrement();
//队列未满,发未满通知
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
//队列空,发空通知
if (c == 0)
signalNotEmpty();
}
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//队列空,阻塞
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
//当前元素个数大于0,发未空通知
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//队列满,发满通知
if (c == capacity)
signalNotFull();
return x;
}

说明:这里要注意count.getAndIncrement();返回的都是原值。入队列使用putLock,出队列使用takeLock。使用了两个条件notFullnotEmpty来控制队列在不同条件下的阻塞状态。

查看队首

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}

说明:实现很简单,无需过多说明

计数

1
2
3
public int size() {
return count.get();
}

说明:O(1)的时间复杂度。

其他

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public boolean remove(Object o) {
if (o == null) return false;
fullyLock();
try {
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}
public boolean contains(Object o) {
if (o == null) return false;
fullyLock();
try {
for (Node<E> p = head.next; p != null; p = p.next)
if (o.equals(p.item))
return true;
return false;
} finally {
fullyUnlock();
}
}
public Object[] toArray() {
fullyLock();
try {
int size = count.get();
Object[] a = new Object[size];
int k = 0;
for (Node<E> p = head.next; p != null; p = p.next)
a[k++] = p.item;
return a;
} finally {
fullyUnlock();
}
}
void fullyLock() {
//先锁写锁
putLock.lock();
//后锁读锁
takeLock.lock();
}
void fullyUnlock() {
//先解读锁
takeLock.unlock();
//后解写锁
putLock.unlock();
}

说明:删除某一个对象,判断是否包含对象,将链表转化成数组,这些操作都先调用fullyLock(),停止了puttake操作。而fullyLockfullyUnlock的锁解顺序也是有讲究的。

思考

为什么要使用两把锁?

双锁减小了锁的粒度,提高了并发性能。为避免出现死锁情况,fullyUnlock对先上锁的对象后解锁。

总结

LinkedBlockingQueue可以用来实现简单的”生产者-消费者”模型。

作者: wuzhaoyang(John)
出处: http://wuzhaoyang.me/
因为作者水平有限,无法保证每句话都是对的,但能保证不复制粘贴,每句话经过推敲。希望能表达自己对于技术的态度,做一名优秀的软件工程师。