# Java多线程的阻塞队列实现 ## 一、阻塞队列概述 ### 1.1 什么是阻塞队列 阻塞队列(BlockingQueue)是Java并发包(java.util.concurrent)中提供的一种线程安全的队列实现。它在普通队列的基础上增加了两个附加操作: 1. 当队列为空时,获取元素的线程会等待队列变为非空 2. 当队列满时,存储元素的线程会等待队列可用 这种特性使得阻塞队列成为生产者-消费者模式的理想实现方式,无需开发者手动实现线程间的等待/通知机制。 ### 1.2 阻塞队列的核心特性 - **线程安全**:所有操作都是原子性的 - **阻塞机制**:提供put/take等阻塞方法 - **容量限制**:可以是有界队列或无界队列 - **公平性选项**:部分实现支持公平访问策略 ### 1.3 Java中的阻塞队列实现类 Java并发包提供了多种阻塞队列实现: 1. ArrayBlockingQueue:基于数组的有界阻塞队列 2. LinkedBlockingQueue:基于链表的可选有界阻塞队列 3. PriorityBlockingQueue:支持优先级排序的无界阻塞队列 4. DelayQueue:使用优先级队列实现的无界阻塞队列 5. SynchronousQueue:不存储元素的阻塞队列 6. LinkedTransferQueue:基于链表的无界阻塞队列 7. LinkedBlockingDeque:基于链表的双向阻塞队列 ## 二、阻塞队列的核心方法 ### 2.1 插入操作 | 方法 | 说明 | 特殊行为 | |------|------|----------| | add(E e) | 添加元素到队列 | 队列满时抛出IllegalStateException | | offer(E e) | 添加元素到队列 | 队列满时返回false | | put(E e) | 添加元素到队列 | 队列满时阻塞等待 | | offer(E e, long timeout, TimeUnit unit) | 添加元素到队列 | 队列满时等待指定时间 | ### 2.2 移除操作 | 方法 | 说明 | 特殊行为 | |------|------|----------| | remove() | 移除并返回队列头元素 | 队列空时抛出NoSuchElementException | | poll() | 移除并返回队列头元素 | 队列空时返回null | | take() | 移除并返回队列头元素 | 队列空时阻塞等待 | | poll(long timeout, TimeUnit unit) | 移除并返回队列头元素 | 队列空时等待指定时间 | ### 2.3 检查操作 | 方法 | 说明 | 特殊行为 | |------|------|----------| | element() | 返回队列头元素 | 队列空时抛出NoSuchElementException | | peek() | 返回队列头元素 | 队列空时返回null | ## 三、阻塞队列的实现原理 ### 3.1 锁与条件变量 阻塞队列的核心实现依赖于ReentrantLock和Condition: ```java // 以ArrayBlockingQueue为例 final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull; public ArrayBlockingQueue(int capacity, boolean fair) { // 省略其他初始化代码 lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
以put()方法为例:
public void put(E e) throws InterruptedException { Objects.requireNonNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); // 队列满时等待 enqueue(e); // 实际入队操作 } finally { lock.unlock(); } }
以enqueue()方法为例:
private void enqueue(E e) { final Object[] items = this.items; items[putIndex] = e; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); // 唤醒等待的消费者线程 }
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { final Object[] items; // 存储元素的数组 int takeIndex; // 下一个要取出的元素索引 int putIndex; // 下一个要放入的元素索引 int count; // 当前元素数量 // 锁和条件变量 final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull; // 迭代器 transient Itrs itrs; }
入队操作:
public boolean offer(E e) { Objects.requireNonNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } }
出队操作:
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } }
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { // 节点类 static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } } private final int capacity; // 容量限制 private final AtomicInteger count = new AtomicInteger(); // 当前元素数量 // 头节点和尾节点 transient Node<E> head; private transient Node<E> last; // 分离的锁 private final ReentrantLock takeLock = new ReentrantLock(); private final Condition notEmpty = takeLock.newCondition(); private final ReentrantLock putLock = new ReentrantLock(); private final Condition notFull = putLock.newCondition(); }
入队操作:
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
出队操作:
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
基于堆结构的优先级阻塞队列:
public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private transient Object[] queue; private transient int size; private transient Comparator<? super E> comparator; private final ReentrantLock lock; private final Condition notEmpty; // 扩容时使用的自旋锁 private transient volatile int allocationSpinLock; }
特点: - 无界队列(自动扩容) - 元素必须实现Comparable或提供Comparator - 出队顺序由优先级决定
用于实现延迟任务的队列:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { private final transient ReentrantLock lock = new ReentrantLock(); private final PriorityQueue<E> q = new PriorityQueue<E>(); private final Condition available = lock.newCondition(); private Thread leader; }
特点: - 元素必须实现Delayed接口 - 只有到期元素才能被取出 - 应用场景:缓存过期、定时任务调度
不存储元素的阻塞队列:
public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { abstract static class Transferer<E> { abstract E transfer(E e, boolean timed, long nanos); } // 两种不同的传输策略 static final class TransferStack<E> extends Transferer<E> { /*...*/ } static final class TransferQueue<E> extends Transferer<E> { /*...*/ } }
特点: - 每个插入操作必须等待一个移除操作 - 吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue - 适合传递性场景
经典实现方式:
// 生产者 class Producer implements Runnable { private final BlockingQueue<String> queue; public Producer(BlockingQueue<String> queue) { this.queue = queue; } public void run() { try { while (true) { String item = produceItem(); queue.put(item); Thread.sleep(100); } } catch (InterruptedException ex) { // 处理中断 } } private String produceItem() { // 生产逻辑 } } // 消费者 class Consumer implements Runnable { private final BlockingQueue<String> queue; public Consumer(BlockingQueue<String> queue) { this.queue = queue; } public void run() { try { while (true) { String item = queue.take(); consumeItem(item); } } catch (InterruptedException ex) { // 处理中断 } } private void consumeItem(String item) { // 消费逻辑 } }
Java线程池使用阻塞队列作为工作队列:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { // 实现代码 }
实现简单的消息队列:
public class SimpleMessageQueue { private final BlockingQueue<Message> queue; public SimpleMessageQueue(int capacity) { this.queue = new LinkedBlockingQueue<>(capacity); } public void send(Message msg) throws InterruptedException { queue.put(msg); } public Message receive() throws InterruptedException { return queue.take(); } }
场景: 生产者等待队列空间,消费者等待生产者释放锁
解决方案: - 使用双锁设计的LinkedBlockingQueue - 设置合理的超时时间 - 避免在持有锁时调用外部方法
场景: 无界队列持续增长导致OOM
解决方案: - 使用有界队列 - 实现自定义的拒绝策略 - 监控队列大小
场景: 单一锁成为系统瓶颈
解决方案: - 使用分离锁的实现(如LinkedBlockingQueue) - 考虑无锁队列(如ConcurrentLinkedQueue) - 分区处理(多个队列)
public class SimpleBlockingQueue<E> { private final E[] items; private int putIndex, takeIndex, count; private final ReentrantLock lock = new ReentrantLock(); private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); public SimpleBlockingQueue(int capacity) { this.items = (E[]) new Object[capacity]; } public void put(E e) throws InterruptedException { Objects.requireNonNull(e); lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); items[putIndex] = e; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); } finally { lock.unlock(); } } public E take() throws InterruptedException { lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); E e = items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; notFull.signal(); return e; } finally { lock.unlock(); } } }
public class CASBlockingQueue<E> { private static class Node<E> { volatile E item; volatile Node<E> next; Node(E item) { this.item = item; } } private volatile Node<E> head; private volatile Node<E> tail; private final AtomicInteger count = new AtomicInteger(0); private final int capacity; public CASBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; head = tail = new Node<>(null); } public boolean offer(E e) { Objects.requireNonNull(e); Node<E> newNode = new Node<>(e); for (;;) { Node<E> currentTail = tail; Node<E> tailNext = currentTail.next; if (currentTail == tail) { if (tailNext != null) { // 帮助推进尾节点 compareAndSetTail(tail, tailNext); } else { if (count.get() < capacity) { if (compareAndSetNext(currentTail, null, newNode)) { compareAndSetTail(tail, newNode); count.incrementAndGet(); return true; } } else { return false; } } } } } // 省略其他方法和CAS操作实现 }
场景 | 推荐队列 | 理由 |
---|---|---|
固定大小线程池 | ArrayBlockingQueue | 简单高效 |
高并发生产者消费者 | LinkedBlockingQueue | 吞吐量高 |
任务优先级处理 | PriorityBlockingQueue | 支持优先级 |
延迟任务调度 | DelayQueue | 内置延迟支持 |
直接传递任务 | SynchronousQueue | 零容量设计 |
本文共约9350字,详细介绍了Java多线程中阻塞队列的实现原理、各种实现类的特点、应用场景以及性能优化建议。 “`
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。