PriorityBlockingQueue 详解

详细说明

插入元素(offer)流程图详细说明:

插入元素(offer)流程

详细步骤:

  1. 开始 offer

    • 方法 offer() 开始执行。
  2. 加锁 lock.lock

    • 调用 lock.lock() 获取全局锁,确保线程安全操作。
  3. e == null?

    • 检查插入的元素 e 是否为 null。
    • 若是,则抛出 NullPointerException。
  4. 判断 size >= queue.length

    • 判断当前队列大小 size 是否大于等于数组长度 length,如果需要扩容则进行下一步操作。
  5. 调用 tryGrow 扩容

    • 调用内部方法 tryGrow() 进行扩容。这里的关键设计在于:
      • 释放锁:在 tryGrow() 方法中先调用 lock.unlock() 释放全局锁。
      • CAS 竞争:通过 CAS 操作尝试将 allocationSpinLock 从 0 修改为 1,以获得扩容权限。如果成功,则执行扩容操作。
      • 重新加锁并复制数组:在扩容完成后,调用 lock.lock() 重新获取全局锁,并进行新旧数组的复制。
  6. 释放锁, CAS竞争

    • 在 tryGrow() 方法中释放全局锁后,进行 CAS 操作以获得扩容权限。
  7. 分配新数组并复制

    • 扩容操作包括计算新的容量(根据当前大小动态调整),创建新的数组,并将旧数据复制到新数组。
  8. 重新获取锁

    • 完成扩容后,再次调用 lock.lock() 重新获得全局锁。此时可能需要与等待的其他线程竞争锁。
  9. 执行 siftUp 上浮调整

    • 将新的元素插入数组末尾(索引为 size),然后通过 siftUpComparable 或 siftUpUsingComparator 方法将其上浮至正确位置,以保持堆性质。
  10. size++

    • 更新队列大小 size。
  11. 原 size == 0?

    • 如果插入之前队列为空(即 size == 0),则需要调用 notEmpty.signal() 唤醒等待的消费者线程以进行处理。
  12. notEmpty.signal

    • 调用 notEmpty.signal() 发出信号,唤醒一个在条件变量上等待的线程。
  13. lock.unlock

    • 释放全局锁,并返回结果 true。

取出元素(take)流程图详细说明:

取出元素(take)流程

详细步骤:

  1. 开始 take

    • 方法 take() 开始执行。
  2. lock.lockInterruptibly

    • 调用 lock.lockInterruptible() 获取全局锁,响应线程中断请求。
  3. dequeue 尝试取堆顶

    • 调用内部方法 dequeue() 尝试从数组的第一个元素(即堆顶)获取元素。
  4. result == null?

    • 检查返回的堆顶元素是否为 null。若为 null,则说明队列为空。
  5. notEmpty.await 阻塞等待

    • 如果队列为空,则调用 notEmpty.await() 进入阻塞状态,并释放锁。
  6. 执行 siftDown 下沉调整

    • 重新获取锁后,将数组末尾的元素移到堆顶,然后通过 siftDownComparable 或 siftDownUsingComparator 方法将其下沉至正确位置,以保持堆性质。
  7. size--

    • 更新队列大小 size。
  8. lock.unlock

    • 释放全局锁,并返回取出的元素。

扩容机制(tryGrow)时序图详细说明:

扩容机制(tryGrow)时序

详细步骤:

  1. 线程 A 开始进行 offer

    • 线程 A 尝试插入元素,发现需要扩容。
  2. 已持有锁,释放全局锁 lock.unlock

    • 为了提高并发性,在 tryGrow() 中先调用 lock.unlock(), 释放全局锁。
  3. CAS(allocationSpinLock, 0, 1) 成功分配新数组 newArray

    • 线程 A 尝试通过 CAS 操作将 allocationSpinLock 设置为 1,以获得扩容权限。如果成功,则进行新的数组分配。
  4. allocationSpinLock = 0

    • 如果线程 A 获得了扩容权限,则将其重置为 0,并开始复制旧数组。
  5. lock 重新获取锁

    • 完成新数组的创建和复制后,调用 lock.lock() 重新获得全局锁。
  6. 竞争锁(可能在 ThreadA 释放锁期间获得锁)

    • 在线程 A 调用 lock.unlock 的过程中,其他线程可能尝试获取全局锁。
  7. 发现仍需扩容,tryGrow CAS 失败(allocationSpinLock != 0)

    • 如果其他线程 B 发现需要进行扩容并且 allocationSpinLock 不为 0,则其尝试通过 Thread.yield() 让出 CPU,并重新尝试获取全局锁。
  8. 执行 tryGrow 和复制旧数组到新数组,更新 queue

    • 最终获得全局锁的线程完成分配和复制操作。

这些详细的流程图与时序图帮助我们更好地理解 PriorityBlockingQueue 在高并发环境下的工作原理,并展示了其如何通过精心设计来提高性能。

总结与学习指引

核心特点总结

PriorityBlockingQueue 是一个无界且支持优先级排序的阻塞队列,基于数组二叉堆实现。以下是它的关键设计点和特性:

  1. 单锁 + 条件变量

    • 使用一把 ReentrantLock 保证线程安全。
    • 提供 notEmpty 条件用于消费者在队列为空时进行阻塞等待。
  2. 动态扩容

    • 扩容时主动释放主锁,通过 CAS 自旋锁控制并发扩容,减少锁竞争和内存占用的浪费。
  3. 非对称阻塞

    • 生产者永不阻塞(put() 方法永不因队列满而等待)。
    • 消费者在队列为空时阻塞(take() 方法会阻塞直到有元素可取)。
  4. 基于数组的二叉堆实现

    • 使用数组来存储元素,通过索引进行堆调整操作(上浮和下沉),确保优先级排序。
    • 堆调整的时间复杂度为 O(log n),适合中等规模的数据集。

使用建议

  1. 适用场景

    • 需要按优先级处理任务的系统,例如作业调度、紧急消息优先处理。
    • 适合需要动态分配资源或根据业务权重进行操作的任务队列管理。
  2. 注意事项

    • 监控队列大小:虽然 PriorityBlockingQueue 是无界的,但其容量是有限制的(最大为 Integer.MAX_VALUE - 8)。如果不加监控可能因数据量过大而导致内存溢出。
    • 合理设置初始容量:为了减少扩容频率和提高性能,在实际应用中建议根据业务预估合理设置队列的初始大小。
  3. 性能权衡

    • 当队列中的元素数量非常大时,堆调整的时间复杂度 O(log n) 会成为瓶颈。如果需要高吞吐量处理大量数据,则考虑使用其他类型的数据结构或优化方案。
    • 对于大规模、无优先级需求的场景,可以考虑使用 ArrayBlockingQueue 或 LinkedBlockingQueue 来替代。
  4. 替代方案

    • 如果任务需要延迟执行,建议采用 DelayQueue。
    • 若需实现零容量直接传递特性,则研究 SynchronousQueue 是更好的选择。

总结

通过理解 PriorityBlockingQueue 的核心设计思想及其在不同应用场景下的优劣点,可更好地为具体业务场景挑选合适的数据结构。合理设置初始容量、监控队列大小和注意数据规模的局限性是保障系统稳定运行的关键措施。