1. 什么是BlockingQueue?
阻塞队列 = 阻塞 + 队列。
- 队列:一种先进先出的数据结构,支持尾部添加、首部移除或查看等基础操作。
- 阻塞:除了队列提供的基本操作之外,还提供了支持阻塞式插入和移除的方式。
下面这些对BlockingQueue的介绍基本翻译自JavaDoc,非常详细。
- 阻塞队列的顶级接口是
java.util.concurrent.BlockingQueue
,它继承了Queue,Queue又继承自Collection接口。- BlockingQueue 对插入操作、移除操作、获取元素操作提供了四种不同的方法用于不同的场景中使用:1、抛出异常;2、返回特殊值(null 或 true/false,取决于具体的操作);3、阻塞等待此操作,直到这个操作成功;4、阻塞等待此操作,直到成功或者超时指定时间,第二节会有详细介绍。
- BlockingQueue不接受null的插入,否则将抛出空指针异常,因为poll失败了会返回null,如果允许插入null值,就无法判断poll是否成功了。
- BlockingQueue可能是有界的,如果在插入的时候发现队列满了,将会阻塞,而无界队列则有
Integer.MAX_VALUE
大的容量,并不是真的无界。- BlockingQueue通常用来作为生产者-消费者的队列的,但是它也支持Collection接口提供的方法,比如使用remove(x)来删除一个元素,但是这类操作并不是很高效,因此尽量在少数情况下使用,如:当一条入队的消息需要被取消的时候。
- BlockingQueue的实现都是线程安全的,所有队列的操作或使用内置锁或是其他形式的并发控制来保证原子。但是一些批量操作如:
addAll
,containsAll
,retainAll
和removeAll
不一定是原子的。如 addAll(c) 有可能在添加了一些元素后中途抛出异常,此时 BlockingQueue 中已经添加了部分元素。- BlockingQueue不支持类似close或shutdown等关闭操作。
BlockingQueue
其实就是阻塞队列,是基于阻塞机制实现的线程安全的队列。而阻塞机制的实现是通过在入队和出队时加锁的方式避免并发操作。
BlockingQueue
不同于普通的Queue
的区别主要是:
- 通过在入队和出队时进行加锁,保证了队列线程安全
- 支持阻塞的入队和出队方法:当队列满时,会阻塞入队的线程,直到队列不满;当队列为空时,会阻塞出队的线程,直到队列中有元素。
BlockingQueue
常用于生产者-消费者模型中,往队列里添加元素的是生产者,从队列中获取元素的是消费者;通常情况下生产者和消费者都是由多个线程组成;下图所示则为一个最常见的生产者-消费者模型,生产者和消费者之间通过队列平衡两者的的处理能力、进行解耦等。
2. BlockingQueue接口定义
BlockingQueue
继承了Queue
接口,在Queue接口基础上,又提供了若干其他方法,其定义源码如下:
public interface BlockingQueue<E> extends Queue<E> {
/**
* 入队一个元素,如果有空间则直接插入,并返回true;
* 如果没有空间则抛出IllegalStateException
*/
boolean add(E e);
/**
* 入队一个元素,如果有空间则直接插入,并返回true;
* 如果没有空间返回false
*/
boolean offer(E e);
/**
* 入队一个元素,如果有空间则直接插入,如果没有空间则一直阻塞等待
*/
void put(E e) throws InterruptedException;
/**
* 入队一个元素,如果有空间则直接插入,并返回true;
* 如果没有空间则等待timeout时间,插入失败则返回false
*/
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
/**
* 出队一个元素,如果存在则直接出队,如果没有空间则一直阻塞等待
*/
E take() throws InterruptedException;
/**
* 出队一个元素,如果存在则直接出队,如果没有空间则等待timeout时间,无元素则返回null
*/
E poll(long timeout, TimeUnit unit) throws InterruptedException;
/**
* 返回该队列剩余的容量(如果没有限制则返回Integer.MAX_VALUE)
*/
int remainingCapacity();
/**
* 如果元素o在队列中存在,则从队列中删除
*/
boolean remove(Object o);
/**
* 判断队列中是否存在元素o
*/
public boolean contains(Object o);
/**
* 将队列中的所有元素出队,并添加到给定的集合c中,返回出队的元素数量
*/
int drainTo(Collection<? super E> c);
/**
* 将队列中的元素出队,限制数量maxElements个,并添加到给定的集合c中,返回出队的元素数量
*/
int drainTo(Collection<? super E> c, int maxElements);
}
BlockingQueue
主要提供了四类方法,如下表所示:
方法 | 抛出异常 | 返回特定值 | 阻塞 | 阻塞特定时间 |
---|---|---|---|---|
入队 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
出队 | remove() | poll() | take() | poll(time, unit) |
获取队首元素 | element() | peek() | 不支持 | 不支持 |
除了抛出异常和返回特定值方法与Queue接口定义相同外,BlockingQueue还提供了两类阻塞方法:一种是当队列没有空间/元素时一直阻塞,直到有空间/有元素;另一种是在特定的时间尝试入队/出队,等待时间可以自定义。
在本文开始我们了解到,BlockingQueue是线程安全的队列,所以提供的方法也都是线程安全的;那么下面我们就继续看下BlockingQueue的实现类,以及如何实现线程安全和阻塞。
3. BlockingQueue实现类及原理
3.1 主要实现类
BlockingQueue接口主要由5个实现类,分别如下表所示。
实现类 | 功能 |
---|---|
ArrayBlockingQueue | 基于数组的阻塞队列,使用数组存储数据,并需要指定其长度,所以是一个有界队列 |
LinkedBlockingQueue | 基于链表的阻塞队列,使用链表存储数据,默认是一个无界队列;也可以通过构造方法中的capacity 设置最大元素数量,所以也可以作为有界队列 |
SynchronousQueue | 一种没有缓冲的队列,生产者产生的数据直接会被消费者获取并且立刻消费 |
PriorityBlockingQueue | 基于优先级别的阻塞队列,底层基于数组实现,是一个无界队列 |
DelayQueue | 延迟队列,其中的元素只有到了其指定的延迟时间,才能够从队列中出队 |
其中在日常开发中用的比较多的是ArrayBlockingQueue
和LinkedBlockingQueue
,本文也将主要介绍这两个实现类的原理。