ArrayBlockingQueue源码分析
# 阻塞队列简介
# 阻塞队列的历史
Java阻塞队列的历史可以追溯到JDK1.5版本,Java平台增加了java.util.concurrent(即我们常说的JUC包),这其中包含了各种并发流程控制工具、并发容器等。这其中自然也包含了我们这篇文章所讨论的阻塞队列。
为了实现在高并发场景下,多线程之间数据共享的问题,在JDK1.5版本,出现了我们所熟知的ArrayBlockingQueue和LinkedBlockingQueue,一种带有生产者-消费者模式所实现的并发容器。其中,ArrayBlockingQueue是有界队列,即添加的元素达到上限之后,再次添加就会被阻塞或者抛出异常。而LinkedBlockingQueue则由链表构成的队列,正是因为链表的特性,所以LinkedBlockingQueue在添加元素上并不会向ArrayBlockingQueue那样有着较多的约束,所以LinkedBlockingQueue设置队列是否有界是可选的(注意这里的无界并不是指可以添加任务数量的元素,而是说队列的大小默认为Integer.MAX_VALUE,近乎于无限大)。
随着Java的不断发展,JDK后续的几个版本又对阻塞队列进行了不少的更新和完善:
- JDK1.6版本:增加SynchronousQueue,一个不存储元素的阻塞队列。
- JDK1.7版本:增加TransferQueue,一个支持更多操作的阻塞队列。
- JDK1.8版本:增加DelayQueue,一个支持延迟获取元素的阻塞队列。
# 阻塞队列的思想
阻塞队列就是典型的生产者-消费者模型,它可以做到以下几点:
- 当阻塞队列数据为空时,所有的消费者线程都会被阻塞,等待队列非空。
- 当生产者往队列里填充数据后,队列就会通知消费者队列非空,消费者此时就可以进来消费。
- 当阻塞队列因为消费者消费过慢或者生产者存放元素过快导致队列填满时无法容纳新元素时,生产者就会被阻塞,等待队列非满时继续存放元素。
- 当消费者从队列中消费一个元素之后,队列就会通知生产者队列非满,生产者可以继续填充数据了。
总结一下:阻塞队列就说基于非空和非满两个条件实现生产者和消费者之间的交互,尽管这些交互流程和等待通知的机制实现非常复杂,好在Doug Lea的操刀之下已将阻塞队列的细节屏蔽,我们只需调用put、take、offfer、poll等API即可实现多线程之间的生产和消费。
这也使得阻塞队列在多线程开发中有着广泛的运用,最常见的例子无非是我们的线程池,从源码中我们就能看出当核心线程无法及时处理任务时,这些任务都会扔到workQueue中。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}
2
3
4
5
6
7
8
9
# ArrayBlockingQueue常见方法及测试
简单了解了阻塞队列的历史之后,我们就开始重点讨论本篇文章所要介绍的并发容器——ArrayBlockingQueue。为了后续更加深入的了解ArrayBlockingQueue,我们不妨基于下面几个实例了解以下ArrayBlockingQueue的使用。
先看看第一个例子,我们这里会用两个线程分别模拟生产者和消费者,生产者生产完会使用put方法生产10个元素给消费者进行消费,当队列元素达到我们设置的上限5时,put方法就会阻塞。 同理消费者也会通过take方法消费元素,当队列为空时,take方法就会阻塞消费者线程。这里笔者为了保证消费者能够在消费完10个元素后及时退出。便通过倒计时门闩,来控制消费者结束,生产者在这里只会生产10个元素。当消费者将10个元素消费完成之后,按下倒计时门闩,所有线程都会停止。
public class ProducerConsumerExample {
public static void main(String[] args) throws InterruptedException {
// 创建一个大小为 5 的 ArrayBlockingQueue
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
// 创建生产者线程
Thread producer = new Thread(() -> {
try {
for (int i = 1; i <= 10; i++) {
// 向队列中添加元素,如果队列已满则阻塞等待
queue.put(i);
System.out.println("生产者添加元素:" + i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
CountDownLatch countDownLatch = new CountDownLatch(1);
// 创建消费者线程
Thread consumer = new Thread(() -> {
try {
int count = 0;
while (true) {
// 从队列中取出元素,如果队列为空则阻塞等待
int element = queue.take();
System.out.println("消费者取出元素:" + element);
++count;
if (count == 10) {
break;
}
}
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 启动线程
producer.start();
consumer.start();
// 等待线程结束
producer.join();
consumer.join();
countDownLatch.await();
producer.interrupt();
consumer.interrupt();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
代码输出结果如下,可以看到只有生产者往队列中投放元素之后消费者才能消费,这也就意味着当队列中没有数据的时消费者就会阻塞,等待队列非空再继续消费。
生产者添加元素:1
生产者添加元素:2
消费者取出元素:1
消费者取出元素:2
消费者取出元素:3
生产者添加元素:3
生产者添加元素:4
生产者添加元素:5
消费者取出元素:4
生产者添加元素:6
消费者取出元素:5
生产者添加元素:7
生产者添加元素:8
生产者添加元素:9
生产者添加元素:10
消费者取出元素:6
消费者取出元素:7
消费者取出元素:8
消费者取出元素:9
消费者取出元素:10
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
了解了put、take这两个会阻塞的存和取方法之后,我我们再来看看阻塞队列中非阻塞的入队和出队方法offer和poll。
如下所示,我们设置了一个大小为3的阻塞队列,我们会尝试在队列用offer方法存放4个元素,然后再从队列中用poll尝试取4次。
public class OfferPollExample {
public static void main(String[] args) {
// 创建一个大小为 3 的 ArrayBlockingQueue
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
// 向队列中添加元素
System.out.println(queue.offer("A"));
System.out.println(queue.offer("B"));
System.out.println(queue.offer("C"));
// 尝试向队列中添加元素,但队列已满,返回 false
System.out.println(queue.offer("D"));
// 从队列中取出元素
System.out.println(queue.poll());
System.out.println(queue.poll());
System.out.println(queue.poll());
// 尝试从队列中取出元素,但队列已空,返回 null
System.out.println(queue.poll());
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
最终代码的输出结果如下,可以看到因为队列的大小为3的缘故,我们前3次存放到队列的结果为true,第4次存放时,由于队列已满,所以存放结果返回false。这也是为什么我们后续的poll方法只得到了3个元素的值。
true
true
true
false
A
B
C
null
2
3
4
5
6
7
8
了解了阻塞存取和非阻塞存取,我们再来看看阻塞队列的一个比较特殊的操作,某些场景下,我们希望能够一次性将阻塞队列的结果存到列表中再进行批量操作,我们就可以使用阻塞队列的drainTo方法,这个方法会一次性将队列中所有元素存放到列表,如果队列中有元素,且成功存到list中则drainTo会返回本次转移到list中的元素数,反之若队列为空,drainTo则直接返回0。
public class DrainToExample {
public static void main(String[] args) {
// 创建一个大小为 5 的 ArrayBlockingQueue
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
// 向队列中添加元素
queue.add(1);
queue.add(2);
queue.add(3);
queue.add(4);
queue.add(5);
// 创建一个 List,用于存储从队列中取出的元素
List<Integer> list = new ArrayList<>();
// 从队列中取出所有元素,并添加到 List 中
queue.drainTo(list);
// 输出 List 中的元素
System.out.println(list);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
代码输出结果如下
[1, 2, 3, 4, 5]
# ArrayBlockingQueue源码分析
自此我们对阻塞队列的使用有了基本的印象,接下来我们就可以进一步了解一下ArrayBlockingQueue的工作机制了。
# 从类图开始ArrayBlockingQueue的设计
在了解ArrayBlockingQueue的具体细节之前,我们先来看看ArrayBlockingQueue的类图,从图中我们可以看出,ArrayBlockingQueue继承了阻塞队列BlockingQueue这个接口,不难猜出通过继承BlockingQueue这个接口之后,ArrayBlockingQueue就拥有了阻塞队列那些常见的操作行为。
同时ArrayBlockingQueue还继承了AbstractQueue这个抽象类,这个继承了AbstractCollection和Queue的抽象类,从抽象类的特定和语义我们也可以猜出,这个继承关系使得ArrayBlockingQueue拥有了队列的常见操作。

所以我们是否可以得出这样一个结论,通过继承AbstractQueue获得队列所有的操作模板,其实现的入队和出队操作的整体框架。然后ArrayBlockingQueue通过继承BlockingQueue获取到阻塞队列的常见操作并将这些操作实现,填充到AbstractQueue模板方法的细节中,由此ArrayBlockingQueue成为一个完整的阻塞队列。

为了印证这一点,我们到源码中一探究竟。首先我们先来看看AbstractQueue,从类的继承关系我们可以大致得出,它通过AbstractCollection获得了集合的常见操作方法,然后通过Queue接口获得了队列的特性。
public abstract class AbstractQueue<E>
extends AbstractCollection<E>
implements Queue<E> {
//略
}
2
3
4
5
6
对于集合的操作无非是增删改查,所以我们不妨从添加方法入手,从源码中我们可以看到,它实现了AbstractCollection的add方法,其内部逻辑如下:
- 调用继承Queue接口的来的offer方法,如果offer成功则返回true。
- 如果offer失败,即代表当前元素入队失败直接抛异常。
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
2
3
4
5
6
而AbstractQueue中并没有对Queue的offer的实现,很明显这样做的目的是定义好了add的核心逻辑,将offer的细节交由其子类即我们的ArrayBlockingQueue实现。
到此,我们对于抽象类AbstractQueue的分析就结束了,我们继续看看ArrayBlockingQueue中另一个重要的继承接口BlockingQueue。
点开BlockingQueue之后,我们可以看到这个接口同样继承了Queue接口,这就意味着它也具备了队列所拥有的所有行为。同时,它还定义了自己所需要实现的方法。
public interface BlockingQueue<E> extends Queue<E> {
//元素入队成功返回true,反之则会抛出异常IllegalStateException
boolean add(E e);
//元素入队成功返回true,反之返回false
boolean offer(E e);
//元素入队成功则直接返回,如果队列已满元素不可入队则将线程阻塞,因为阻塞期间可能会被打断,所以这里方法签名抛出了InterruptedException
void put(E e) throws InterruptedException;
//和上一个方法一样,只不过队列满时只会阻塞单位为unit,时间为timeout的时长,如果在等待时长内没有入队成功则直接返回false。
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
//从队头取出一个元素,如果队列为空则阻塞等待,因为会阻塞线程的缘故,所以该方法可能会被打断,所以签名定义了InterruptedException
E take() throws InterruptedException;
//取出队头的元素并返回,如果当前队列为空则阻塞等待timeout且单位为unit的时长,如果这个时间段没有元素则直接返回null。
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
//获取队列剩余元素个数
int remainingCapacity();
//删除我们指定的对象,如果成功返回true,反之返回false。
boolean remove(Object o);
//判断队列中是否包含指定元素
public boolean contains(Object o);
//将队列中的元素全部存到指定的集合中
int drainTo(Collection<? super E> c);
//转移maxElements个元素到集合中
int drainTo(Collection<? super E> c, int maxElements);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
所以,通过对接口注释的阅读,我们大抵可以将阻塞队列的API归纳成以下几类,由此我们可以看出阻塞队列对于增删查的操作还是很灵活的。

自此我们了解了BlockingQueue的常见操作后,我们就知道了ArrayBlockingQueue通过继承BlockingQueue的方法并实现后,填充到AbstractQueue的方法上,由此我们便知道了上文中AbstractQueue的add方法的offer方法是哪里是实现的了。
public boolean add(E e) {
//AbstractQueue的offer来自下层的ArrayBlockingQueue从BlockingQueue继承并实现的offer方法
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
2
3
4
5
6
7
# ArrayBlockingQueue构造方法
了解ArrayBlockingQueue的细节前,我们不妨先看看其构造函数,了解一下其初始化过程。从源码中我们可以看出ArrayBlockingQueue有3个构造方法,而最核心的构造方法就是下方这一个。
可以看到这个构造方法会用capacity初始化容量,用fair的值设置锁的公平性,这里面有一个比较核心的成员变量,即notEmpty 和notFull ,它们是实现生产者和消费者有序工作的关键所在,这一点笔者会在后续的源码解析中详细说明,这里我们只需初步了解一下阻塞队列的构造即可。
public ArrayBlockingQueue(int capacity, boolean fair) {
//如果设置的队列大小小于0,则直接抛出IllegalArgumentException
if (capacity <= 0)
throw new IllegalArgumentException();
//初始化一个数组用于存放队列的元素
this.items = new Object[capacity];
//创建阻塞队列流程控制的锁
lock = new ReentrantLock(fair);
//用lock锁创建两个条件控制队列生产和消费
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
2
3
4
5
6
7
8
9
10
11
12
另外两个构造方法都是基于上述的构造方法,默认情况下,我们会使用下面这个构造方法,该构造方法就意味着ArrayBlockingQueue用的是非公平锁,即各个生产者或者消费者线程收到通知后,对于锁的争抢是随机的。
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
2
3
还有一个不怎么常用的构造方法,在初始化容量和锁的非公平性之后,它还提供了一个Collection参数,从源码中不难看出这个构造方法是将外部传入的集合的元素在初始化时直接存放到阻塞队列中。
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
//初始化容量和锁的公平性
this(capacity, fair);
final ReentrantLock lock = this.lock;
//上锁并将c中的元素存放到ArrayBlockingQueue底层的数组中
lock.lock();
try {
int i = 0;
try {
//遍历并添加元素到数组中
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
//记录当前队列容量
count = i;
//更新下一次put或者offer或用add方法添加到队列底层数组的位置
putIndex = (i == capacity) ? 0 : i;
} finally {
//完成遍历后释放锁
lock.unlock();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# put(E e)和take()方法
还记得我们上文中关于生产者和消费者的例子吗?它们就是通过put(E e)和take()方法实现多线程之间队列数据有序的生产和消费的。而这两个方法实现的关键就是在于两个条件对象——非空和非满。 也就是我们从上文的构造方法中就会看到notEmpty和notFull对象,接下来笔者就通过两张图让大家了解一下这两个条件是如何在阻塞队列中运用的。
假设我们的代码消费者先启动,当它发现队列中没有数据,那么非空条件就会将这个线程挂起,即等待条件非空时挂起。然后CPU执行权到达生产者,生产者发现队列中可以存放数据,于是将数据存放进去,通知此时条件非空,此时消费者就会被唤醒到队列中使用take等方法获取值了。

随后的执行中,生产者生产速度远远大于消费者消费速度,于是生产者将队列塞满后再次尝试将数据存入队列,发现队列已满,于是阻塞队列就将当前线程挂起,等待非满。然后消费者拿着CPU执行权进行消费,于是队列可以存放新数据了,发出一个非满的通知,此时挂起的生产者就会等待CPU执行权到来时再次尝试将数据存到队列中。

了解阻塞队列的基于两个条件的交互流程之后,我们不妨看看put和take的源码。从put的源码中,我们可以印证上述所说,可以看到put方法做了以下几件事:
- 上锁,避免获取数据时出现线程安全问题。
- 如果count和值等于数组长度,则说明队列已经满了,本地put操作无法将数据存到ArrayBlockingQueue底层的数组中,调用notFull.await()将当前线程打断存放到AQS队列中,等待条件非满时完成插入。
- 如果count和值不等于数组长度,则说明当前队列还可以存放元素,则调用enqueue方法将元素存放到数组中。
- 完成put操作后释放锁。
public void put(E e) throws InterruptedException {
checkNotNull(e);
//获取锁并上锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//如果count等数组长度则说明队列已满,将当前线程打断等待条件非空
while (count == items.length)
notFull.await();
//如果队列可以存放元素,则调用enqueue将元素入队
enqueue(e);
} finally {
lock.unlock();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
了解了put的整体流程之后,我们继续深入查看一下enqueue方法的实现细节,从源码中可以看到入队操作的逻辑就是在数组中追加一个新元素,整体执行步骤为:
- 获取ArrayBlockingQueue底层的数组items。
- 将元素存到putIndex位置。
- 更新putIndex到下一个位置,如果putIndex等于队列长度,则说明putIndex已经到达数组末尾了,下一次插入则需要0开始。(ArrayBlockingQueue用到了循环队列的思想,即从头到尾循环复用一个数组)
- 更新count的值,表示当前队列长度+1。
- 调用notEmpty.signal()通知队列非空,消费者可以从队列中获取值了。
private void enqueue(E x) {
//获取队列底层的数组
final Object[] items = this.items;
//将putindex位置的值设置为我们传入的x
items[putIndex] = x;
//更新putindex,如果putindex等于数组长度,则更新为0
if (++putIndex == items.length)
putIndex = 0;
//队列长度+1
count++;
//通知队列非空,那些因为获取元素而阻塞的线程可以继续工作了
notEmpty.signal();
}
2
3
4
5
6
7
8
9
10
11
12
13
自此我们了解了put方法的流程,为了更加完整的了解ArrayBlockingQueue关于生产者-消费者模型的设计,我们继续看看阻塞获取队列元素的take方法。
take方法的逻辑和put方法是相反的,它的整体执行步骤为:
- 上锁,避免获取元素时出现线程安全问题。
- 检查队列中的元素的数量,如果为0则说明队列中没有元素,则调用notEmpty.await()打断当前线程,并将其存放到AQS等待队列中等待非空。
- 如果count不为0则说明队列中有元素,可以用take方法获取元素,则调用dequeue方法获取元素。
- 完成元素获取后释放锁。
public E take() throws InterruptedException {
//尝试先上锁再获取元素
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//如果队列中元素个数为0,则将当前线程打断并存入AQS队列中,等待非空条件通知
while (count == 0)
notEmpty.await();
//如果队列不为空则调用dequeue获取元素
return dequeue();
} finally {
//完成操作后释放锁
lock.unlock();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
步入查看dequeue方法详情后,我们发现出队的操作和入队差不多:
- 将takeIndex元素复制给E引用。
- 将数组中takeIndex位置设置为null。
- 更新takeIndex的值,即下一次取元素中自增后的takeIndex获取,如果takeIndex为数组长度则将takeIndex更新为0。
- count减1,意味从队列中移除一个元素。
- 通知那些被阻塞并存放到AQS队列中的线程,当前队列非满,可以继续存放元素了。
private E dequeue() {
//获取阻塞队列底层的数组
final Object[] items = this.items;
@SuppressWarnings("unchecked")
//从队列中获取takeIndex位置的元素
E x = (E) items[takeIndex];
//将takeIndex置空
items[takeIndex] = null;
//takeIndex向后挪动,如果等于数组长度则更新为0
if (++takeIndex == items.length)
takeIndex = 0;
//队列长度减1
count--;
if (itrs != null)
itrs.elementDequeued();
//通知那些被打断的线程当前队列状态非满,可以继续存放元素
notFull.signal();
return x;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
自此我们将阻塞队列的阻塞操作的源码都详细的了解了一遍,我们不妨用一张图来总结一下,两个Condition条件是如何控制ArrayBlockingQueue的存和取的。 我们从消费者开始看起,当消费者从队列中take或者poll等操作取出一个元素之后,就会通知队列非满,此时那些等待非满的生产者就会被唤醒等待获取CPU时间片进行入队操作。 当生产者将元素存到队列中后,就会触发通知队列非空,此时消费者就会被唤醒等待CPU时间片尝试获取元素。如此往复,两个条件对象就构成一个环路,控制着多线程之间的存和取。

# offer(E e)和poll()方法
有了put和take方法的经验,对于add和remove方法阅读起来就会轻松许多。先来看看offer方法,逻辑和put差不多,唯一的区别就是入队失败时不会阻塞当前线程,而是直接返回false。
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
//队列已满直接返回false
if (count == items.length)
return false;
else {
//反之将元素入队并直接返回true
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
poll方法同理,获取元素失败也是直接返回空,并不会阻塞获取元素的线程。
public E poll() {
final ReentrantLock lock = this.lock;
//上锁
lock.lock();
try {
//如果队列为空直接返回null,反之出队返回元素值
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
2
3
4
5
6
7
8
9
10
11
# offer(E e, long timeout, TimeUnit unit)和poll(long timeout, TimeUnit unit)
因为offer和poll非阻塞的属性,所以设计者同样为其提供了带有等待时间的offer和poll,可以看到带有时间的offer方法在队列已满的情况下,会等待用户所传的时间段,如果规定时间内还不能存放元素则直接返回false。
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//队列已满,进入循环
while (count == items.length) {
//时间到了队列还是满的,则直接返回false
if (nanos <= 0)
return false;
//阻塞nanos时间,等待非满
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
同理poll也一样,队列为空则在规定时间内等待,若时间到了还是空的,则直接返回null。
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//队列为空,循环等待,若时间到还是空的,则直接返回null
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# add(E e)和remove()
add方法其实就是对于offer做了一层封装,如下代码所示,可以看到add会调用没有规定时间的offer,如果入队失败则直接抛异常。
public boolean add(E e) {
//调用下方的add
return super.add(e);
}
public boolean add(E e) {
//调用offer如果失败直接抛出异常
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
2
3
4
5
6
7
8
9
10
11
12
13
remove方法同理,调用poll,如果返回null则说明队列没有东西,直接抛出异常。
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
2
3
4
5
6
7
# ArrayBlockingQueue相关面试题
ArrayBlockingQueue是Java中的阻塞队列实现之一,常用于多线程之间的数据共享。以下是一些关于ArrayBlockingQueue常见的面试题:
ArrayBlockingQueue是什么?它的特点是什么?
答:ArrayBlockingQueue是Java中的阻塞队列实现之一,是一个有界阻塞队列。它的特点是具有固定的容量,当队列已满时,使用put方法继续往里面添加元素的线程会被阻塞,直到队列中有空闲的位置;当队列为空时,使用take方法从队列中取元素的线程会被阻塞,直到队列中有新的元素被添加。
ArrayBlockingQueue和ConcurrentLinkedQueue有什么区别?
答:ArrayBlockingQueue是一个有界阻塞队列,而ConcurrentLinkedQueue是一个无界非阻塞队列。ArrayBlockingQueue的容量是固定的,而ConcurrentLinkedQueue可以动态地增加容量。因为后者是无界链表的特性,在高并发的情况下,ConcurrentLinkedQueue的性能可能会更好,因为它在入队和出队操作时不需要进行线程间的等待和唤醒操作。
ArrayBlockingQueue如何处理生产者-消费者模式?
答:ArrayBlockingQueue可以很好地支持生产者-消费者模式。生产者可以往队列中添加元素,而消费者可以从队列中取出元素进行处理。当队列已满时,生产者线程会被阻塞,直到消费者从队列中取出元素并通知此时队列非满,生产者才能继续存放元素。当队列为空时,消费者线程会被阻塞,直到队列中有新的元素被添加,触发非空条件通知,将消费者唤醒,等待CPU时间片分配到消费者线程再继续消费。
ArrayBlockingQueue如何处理异常情况?
答:当队列已满时,如果使用add方法继续往队列中添加元素,会抛出IllegalStateException异常;当队列为空时,如果使用remove从队列中取元素,会抛出NoSuchElementException异常。同时,如果在等待过程中,线程被中断,也会抛出InterruptedException异常。
ArrayBlockingQueue的实现原理是什么?
答:ArrayBlockingQueue的底层实现是一个数组,通过ReentrantLock实现线程安全,通过Condition实现线程间的等待和唤醒操作。当队列已满时,生产者线程会调用notFull.await()方法让生产者进行等待(即put这种入队操作的线程会被阻塞);当队列为空时,消费者线程会调用notEmpty.await()方法等待队列非空时进行消费。直到队列中有新的元素被添加时,生产者线程会调用notEmpty.signal()方法唤醒消费者线程。同理当队列中有元素被取出时,消费者线程会调用notFull.signal()方法唤醒生产者线程此时队列非满可以继续进行入队操作。
# 参考文献
深入理解Java系列 | BlockingQueue用法详解 (opens new window)
深入浅出阻塞队列BlockingQueue及其典型实现ArrayBlockingQueue (opens new window)