ArrayBlockingQueue的用法和原理

ArrayBlockingQueue是基于数组实现的阻塞队列,下面我们看下它的主要用法。

.1 ArrayBlockingQueue的用法

下面是ArrayBlockingQueue的一个简单示例:

public void testArrayBlockingQueue() throws InterruptedException {
  // 创建ArrayBlockingQueue实例,设置队列大小为10
  BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
  boolean r1 = queue.add(1);          // 使用add方法入队元素,如果无空间则抛出异常
  boolean r2 = queue.offer(2);       // 使用offer方法入队元素
  queue.put(3);                      // 使用put方法入队元素;如果无空间则会一直阻塞
  boolean r3 = queue.offer(4, 30, TimeUnit.SECONDS);  // 使用offer方法入队元素;如果无空间则会等待30s

  Integer o1 = queue.remove();        // 使用remove方法出队元素,如果无元素则抛出异常
  Integer o2 = queue.poll();          // 使用poll方法出队元素 
  Integer o3 = queue.take();          // 使用take方法出队元素;如果无元素则一直阻塞
  Integer o4 = queue.poll(10, TimeUnit.SECONDS);       // 使用poll方法出队元素; 如果无空间则等待10s
}

.2 ArrayBlockingQueue的原理

OK,下面我们来看一下ArrayBlockingQueue的实现原理,首先看一下类的定义

(1)类定义

首先我们看到ArrayBlockingQueue的类定义如下,实现了BlockingQueue接口,并继承了抽象队列类AbstractQueue(封装了部分通用方法)。

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
  /** 使用数组存储队列中的元素 */
  final Object[] items;
  /** 下一个出队元素在items数组中的索引 */
  int takeIndex;
  /** 下一个出队元素需要存放在items数组中的索引 */
  int putIndex;
  /** 队列中的元素数量 */
  int count;
  /** 使用在许多教科书中能找到的经典的`双Condition算法`进行并发控制 */
  /** 使用独占锁ReetrantLock */
  final ReentrantLock lock;
  /** 等待出队的条件 */
  private final Condition notEmpty;
  /** 等待入队的条件 */
  private final Condition notFull;
}

在ArrayBlockingQueue中,还定义了队列元素存储以及入队、出队操作的属性。

  • final Object[] items:由于ArrayBlockingQueue是基于数组实现的阻塞队列,所以使用items数组,存储队列中的元素;
  • int takeIndexint putIndex:两个items数组的索引值,分别指向出队元素的索引值以及将要入队元素的索引值;通过这两个索引,可以控制元素从items数组中如何进行出队和入队;
  • int count:当前队列中的元素数量,通过该值实现了队列有界性;

除了上述几个属性,还需要部分属性进行并发控制,在BlockingQueue中使用了双Condition算法进行并发控制,主要通过如下几个变量实现:

  • ReentrantLock lock:这里使用了ReetrantLock作为独占锁,进行并发控制
  • Condition notEmptyCondition notFull:定义了两个阻塞唤醒条件,分别表示等待出队的条件等待入队的条件

(2)构造方法

在ArrayBlockingQueue构造方法中,主要功能时初始化元素数组以及锁和condition条件;可以通过capacity变量指定有界队列的元素数量,以及通过fair指定是否使用公平锁。

/** 指定队列元素数量capacity,并使用非公平锁进行并发控制 */
public ArrayBlockingQueue(int capacity) {
  this(capacity, false);
}

/** 指定队列元素数量capacity,并通过fair变量指定使用公平锁/非公平锁进行并发控制*/
public ArrayBlockingQueue(int capacity, boolean fair) {
  if (capacity <= 0)
    throw new IllegalArgumentException();
  this.items = new Object[capacity]; // 初始化元素数组
  lock = new ReentrantLock(fair); // 初始化锁
  notEmpty = lock.newCondition(); // 初始化出队条件
  notFull =  lock.newCondition(); // 初始化入队条件
}

(3)入队逻辑

上面我们已经了解了类定义、对象属性以及构造方法,下面我们重点看下元素的入队和出队操作。在阅读入队的源码之前,我们先考虑下,如何基于数组实现一个有界队列,并提供入队和出队操作呢?

首先,我们需要两个索引,分别指向出队和入队的元素所在数组中的位置,也就是类定义中的takeIndexputIndex;还需要一个变量记录当前队列中的元素数量count,在出队和入队时根据count判断是否有元素或者是否有空间。

如下图所示,则为一个容量为8的队列数组,在初始状态下,takeIndexputIndex均指向数组的索引0处,且该数组中元素的数量count为0。

数组初始状态

然后,我们尝试入队一个元素A。由于目前数组中元素数量count未超过容量8,所以将元素A放置在数组的putIndex索引处,也就是索引0处;然后,由于putIndex所指向的为下一个入队元素的索引,所以要将putIndex+1,即putIndex = 1。这样就完成了一个元素的入队操作。依次递推,可以继续入队元素B、C、D……

入队第一个元素A

入队第8个元素H时,此时数组中元素数量count=7,且putIndex=7,所以将元素H放置在数组的索引7处;然后对putIndex进行加1操作;但是此时由于putIndex超出了数组的最大索引,所以将putIndex置为0,也就是指向了数组的索引0处。所以在这里,该数组其实是作为一个循环数组使用。

入队第8个元素H

此时队列中的元素数量已经达到了容量限制,当入队第九个元素I时,由于容量限制,无法直接入队成功,则需要进行等待,直到队列中的元素数量小于容量限制时才可以再次入队。

入队第九个元素I

ArrayBlockingQueue中入队逻辑的方法为enqueue,下面是其具体代码:

/**
     * 在当前位置插入元素,并修改索引值,并唤醒非空队列的线程
     * 只有在获取锁的情况才会调用
     */
    private void enqueue(E x) {
        final Object[] items = this.items;
        // 将元素插入到putIndex处
        items[putIndex] = x;
        // 修改putIndex索引
        if (++putIndex == items.length)
            // 如果修改后putIndex超出items数组最大索引,则指向索引0处
            putIndex = 0;
        // 元素数量+1
        count++;
        // 唤醒一个非空队列中的线程
        notEmpty.signal();
    }

(4)出队逻辑

OK,上面我们了解了元素入队的逻辑,然后我们再看下如何实现出队?

首先,当队列处于初始状态时,count=0takeIndex=0,这次数组中没有任何元素,所以无法进行出队,需要进行阻塞等待,直到队列中有元素时才可以进行再次出队。

数组初始状态 数组初始状态

当数组中存在元素时,如下图所示,数组中有4个元素,其中count=4,且takeIndex=0putIndex=4。此时当执行出队时,则将takeIndex=0处的元素A出队,并将数组该索引处置为null;然后将takeIndex修改指向为下一个待出队的元素B,也就是takeIndex=1,并修改元素数量count=3。此时完成了出队操作。

出队第一个元素A

由于该数组为循环数组,当出队元素索引takeIndex超出数组的最大索引时,需要将takeIndex修改为0。

ArrayBlockingQueue中出队逻辑的方法为dequeue,下面是其具体代码:

/**
     * 在当前位置获取一个元素,并修改索引值,并唤醒非满队列的线程
     * 只有在获取锁的情况下才会调用
     */
    private E dequeue() {
        final Object[] items = this.items;
        // 获取当前索引处元素
        E x = (E) items[takeIndex];
        // 将当前索引处置为空
        items[takeIndex] = null;
        // 修改takeIndex索引
        if (++takeIndex == items.length)
            // 如果修改后takeIndex超出items数组最大索引,则指向索引0处
            takeIndex = 0;
        // 元素数量-1
        count--;
        if (itrs != null)
            itrs.elementDequeued();
       // 唤醒一个非满队列中的线程
        notFull.signal();
        return x;
    }

(5)阻塞实现

通过上面的描述,我们了解了基于数组的阻塞队列的入队和出队实现逻辑,但是我们还剩下最后一个疑问,当入队和出队时,如果无法直接进行入队和出队操作,需要进行阻塞等待,那么阻塞是如何实现的呢?在ArrayBlockingQueue中主要是使用独占锁ReentrantLock以及两个条件队列notFullnotEmpty实现的。

我们首先看一下阻塞入队的方法put(E e),下面是其代码:

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // 加锁
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
              // 如果队列已满,线程阻塞,并添加到notFull条件队列中等待唤醒
              notFull.await();
            }
            // 如果队列未满,则调用enqueue方法进行入队操作
            enqueue(e);
        } finally {
            // 释放锁
            lock.unlock();
        }
    }

调用put方法进行阻塞式入队的基本流程为:

  • 首先,在进行入队操作前,使用ReentrantLock进行加锁操作,保证只有一个线程执行入队或出队操作;如果锁被其他线程占用,则等待;
  • 如果加锁成功,则首先判断队列是否满,也就是while(count == items.length);如果队列已满,则调用notFull.await(),将当前线程阻塞,并添加到notFull条件队列中等待唤醒;如果队列不满,则直接调用enqueue方法,进行元素插入;
  • 当前线程添加到notFull条件队列中后,只有当其他线程有出队操作时,会调用notFull.signal()方法唤醒等待的线程;当前线程被唤醒后,还需要再次进行一次队列是否满的判断,如果此时队列不满才可以进行enqueue操作,否则仍然需要再次阻塞等待,这也就是为什么在判断队列是否满时使用while的原因,即避免当前线程被意外唤醒,或者唤醒后被其他线程抢先完成入队操作。
  • 最后,当完成入队操作后,在finally代码块中进行锁释放lock.unlock,完成put入队操作

下面我们再来看下阻塞出队方法take(),代码如下:

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        // 加锁
        lock.lockInterruptibly();
        try {
            while (count == 0)
                // 判断队列是否为空,如果为空则线程阻塞,添加到notEmpty条件队列等待
                notEmpty.await();
            // 队列不为空,进行出队操作
            return dequeue();
        } finally {
            // 释放锁
            lock.unlock();
        }
    }

其实take方法与put方法类似,主要流程也是先加锁,然后循环判断队列是否为空,如果为空则添加到notEmpty条件队列等待,如果不为空则进行出队操作;最后进行锁释放。

(6)指定等待时间的阻塞实现

OK,到这里我们了解了如何进行阻塞的入队和出队操作,在ArrayBlockingQueue中还支持指定等待时间的阻塞式入队和出队操作,分别是offer(e, time, unit)poll(time, unit)方法。这里我们就只要看下offer(e, time, unit)的实现逻辑,代码如下:

public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        checkNotNull(e);
        // 获取剩余等待时间
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        // 加锁
        lock.lockInterruptibly();
        try {
            // 判断队列是否满
            while (count == items.length) {
                if (nanos <= 0)
                    // 入队队列满,等待时间为0,则入队失败,返回false
                    return false;
                // 如果队列满,等待时间大于0,且未到等待时间,则继续等待nanos
                nanos = notFull.awaitNanos(nanos);
            }
            // 队列不满,进行入队操作
            enqueue(e);
            return true;
        } finally {
            // 释放锁
            lock.unlock();
        }
    }

在上面代码中,我们重点看下while循环中判断队列是否满的条件:

  • 当队列满时,则首先判断剩余等待时间是否为0,如果为0表示已经到了等待时间,此时入队失败,直接返回false
  • 当剩余等待时间大于0时,则需要继续等待,即调用nanos = notFull.awaitNanos(nanos),当该线程被唤醒时,awaitNanos会返回剩余的等待时间nanos,根据nanos则可以判断是否已经到等待时间。

在出队方法poll(time, unit)方法中,实现逻辑类似,这里不再赘述,有兴趣的小伙伴可以自行查看源码研究哦。

.3 ArrayBlockingQueue原理总结

到这里我们终于搞明白了ArrayBlockingQueue的实现原理,以及入队和出队的具体逻辑,我们最后来个总结:

  • ArrayBlockingQueue是一个有界阻塞队列,初始化时需要指定容量大小。
  • 在生产者-消费者模型中使用时,如果生产速度和消费速度基本匹配的情况下,使用ArrayBlockingQueue是个不错选择;当如果生产速度远远大于消费速度,则会导致队列填满,大量生产线程被阻塞。
  • 使用独占锁ReentrantLock实现线程安全,入队和出队操作使用同一个锁对象,也就是只能有一个线程可以进行入队或者出队操作;这也就意味着生产者和消费者无法并行操作,在高并发场景下会成为性能瓶颈。