十年网站开发经验 + 多家企业客户 + 靠谱的建站团队
量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决
本文将主要结合源码对 JDK 中的阻塞队列进行分析,并比较其各自的特点;
成都创新互联主营白银网站建设的网络公司,主营网站建设方案,app开发定制,白银h5微信小程序开发搭建,白银网站营销推广欢迎白银等地区企业咨询说到阻塞队列想到的第一个应用场景可能就是生产者消费者模式了,如图所示;
根据上图所示,明显在入队和出队的时候,会发生竞争;所以一种很自然的想法就是使用锁,而在 JDK 中也的确是通过锁来实现的;所以 BlockingQueue
的源码其实可以当成锁的应用示例来查看;同时 JDK 也为我们提供了多种不同功能的队列:
ArrayBlockingQueue :基于数组的有界队列;
LinkedBlockingQueue :基于链表的×××队列(可以设置容量);
PriorityBlockingQueue :基于二叉堆的×××优先级队列;
DelayQueue :基于 PriorityBlockingQueue 的×××延迟队列;
SynchronousQueue :无容量的阻塞队列(Executors.newCachedThreadPool() 中使用的队列);
LinkedTransferQueue :基于链表的×××队列;
接下来我们就对最常用的 ArrayBlockingQueue
和 LinkedBlockingQueue
进行分析;
public class ArrayBlockingQueueextends AbstractQueue implements BlockingQueue , java.io.Serializable { final Object[] items; // 容器数组 int takeIndex; // 出队索引 int putIndex; // 入队索引 int count; // 排队个数 final ReentrantLock lock; // 全局锁 private final Condition notEmpty; // 出队条件队列 private final Condition notFull; // 入队条件队列 ... }
ArrayBlockingQueue
的结构如图所示:
如图所示,
ArrayBlockingQueue
的数组其实是一个逻辑上的环状结构,在添加、取出数据的时候,并没有像 ArrayList
一样发生数组元素的移动(当然除了 removeAt(final int removeIndex)
);
并且由 takeIndex
和 putIndex
指示读写位置;
在读写的时候还有两个读写条件队列;
下面我们就读写操作,对源码简单分析:
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) // 当队列已满的时候放入 putCondition 条件队列 notFull.await(); enqueue(e); // 入队 } finally { lock.unlock(); } }
private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; // 插入队列 if (++putIndex == items.length) putIndex = 0; // 指针走一圈的时候复位 count++; notEmpty.signal(); // 唤醒 takeCondition 条件队列中等待的线程}
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) // 当队列为空的时候,放入 takeCondition 条件 notEmpty.await(); return dequeue(); // 出队 } finally { lock.unlock(); } }
private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; // 取出元素 items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); // 取出元素后,队列空出一位,所以唤醒 putCondition 中的线程 return x; }
public class LinkedBlockingQueueextends AbstractQueue implements BlockingQueue , java.io.Serializable { private final int capacity; // 默认 Integer.MAX_VALUE private final AtomicInteger count = new AtomicInteger(); // 容量 transient Node head; // 头结点 head.item == null private transient Node last; // 尾节点 last.next == null private final ReentrantLock takeLock = new ReentrantLock(); // 出队锁 private final Condition notEmpty = takeLock.newCondition(); // 出队条件 private final ReentrantLock putLock = new ReentrantLock(); // 入队锁 private final Condition notFull = putLock.newCondition(); // 入队条件 static class Node { E item; Node next; Node(E x) { item = x; } } }
LinkedBlockingQueue
的结构如图所示:
如图所示,
LinkedBlockingQueue
其实就是一个简单的单向链表,其中头部元素的数据为空,尾部元素的 next 为空;
因为读写都有竞争,所以在头部和尾部分别有一把锁;同时还有对应的两个条件队列;
下面我们就读写操作,对源码简单分析:
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) return false; // 如果队列已满,直接返回失败 int c = -1; Nodenode = new Node (e); // 将数据封装为节点 final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() < capacity) { enqueue(node); // 入队 c = count.getAndIncrement(); if (c + 1 < capacity) // 如果队列未满,则继续唤醒 putCondition 条件队列 notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) // 如果添加之前的容量为0,说明在出队的时候有竞争,则唤醒 takeCondition signalNotEmpty(); // 因为是两把锁,所以在唤醒 takeCondition的时候,还需要获取 takeLock return c >= 0; }
private void enqueue(Nodenode) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; // 连接节点,并设置尾节点}
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { // 如果队列为空,则加入 takeCondition 条件队列 notEmpty.await(); } x = dequeue(); // 出队 c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); // 如果队列还有剩余,则继续唤醒 takeCondition 条件队列 } finally { takeLock.unlock(); } if (c == capacity) // 如果取之前队列是满的,说明入队的时候有竞争,则唤醒 putCondition signalNotFull(); // 同样注意是两把锁 return x; }
private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Nodeh = head; Node first = h.next; h.next = h; // help GC // 将next引用指向自己,则该节点不可达,在下一次GC的时候回收 head = first; E x = first.item; first.item = null; return x; }
根据以上的讲解,我们可以逐步分析出一些不同,以及在不同场景队列的选择:
结构不同
ABQ:基于数组,有界,一把锁;
LBQ:基于链表,×××,两把锁;
内存分配
ABQ:队列空间预先初始化,受堆空间影响小,稳定性高;
LBQ:队列空间动态变化,受对空间影响大,稳定性差;
入队、出队效率
ABQ:数据直接赋值,移除;队列空间重复使用,效率高;
LBQ:数据需要包装为节点;需开辟新空间,效率低;
竞争方面
ABQ:出入队共用一把锁,相互影响;竞争严重时效率低;
LBQ:出入队分用两把锁,互不影响;竞争严重时效率影响小;
所以在这里并不能简单的给出详细的数据,证明哪个队列更适合什么场景,最好是结合实际使用场景分析。
另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。