SynchronousQueue 详解

为了更好地理解 SynchronousQueue 的工作原理及其内部的数据结构和操作流程,我将详细提供每个 Mermaid 图的文字描述,并确保图中的每个步骤都清晰明了。以下是 4.2 到 4.7 部分的补充内容:

4. 必要流程的 Mermaid 图

4.1 类图

classDiagram
    class SynchronousQueue {
        -Transferer transferer
        +void put(E e)
        +E take()
        +boolean offer(E e)
        +E poll()
        +SynchronousQueue()
        +SynchronousQueue(boolean fair)
    }

    abstract class Transferer {
        +E transfer(E e, boolean timed, long nanos): E
    }

    class TransferStack {
        -SNode head
        +TransferStack(): void
        +E transfer(E e, boolean timed, long nanos): E
    }

    class SNode {
        -SNode next
        -SNode match
        -Thread waiter
        -Object item
        -int mode
    }

    class TransferQueue {
        -QNode head
        -QNode tail
        +TransferQueue(): void
        +E transfer(E e, boolean timed, long nanos): E
    }

    class QNode {
        -QNode next
        -Object item
        -Thread waiter
        -boolean isData
    }

    SynchronousQueue <|-- TransferStack
    SynchronousQueue <|-- TransferQueue

详细文字描述
这是一个展示了 SynchronousQueue 类及其内部类之间的关系的 UML 图。SynchronousQueue 内部持有一个抽象引用 Transferer,具体实现由构造函数中的参数决定:若为公平模式,则使用 TransferQueue;否则使用 TransferStack。TransferStack 和 TransferQueue 都实现了 transfer(E, boolean, long) 方法处理生产者和消费者的逻辑。

4.2 非公平模式 TransferStack 结构图

graph LR
    A[head(SNode)] --> B(mode=DATA)
    B(mode=DATA) --> C(item=E1)
    B(mode=DATA) --> D(match=null)
    B(mode=DATA) --> E(waiter=Thread2)

    F[null] --> G(SNode)
    G(SNode) --> H(mode=REQUEST)
    G(SNode) --> I(item=null)
    G(SNode) --> J(match=null)
    G(SNode) --> K(waiter=Thread1)

详细文字描述
该图展示了非公平模式下的栈结构(LIFO)。head 指向栈顶节点,为 DATA 类型的生产者节点。这个节点携带数据 E1 并等待消费者匹配。下一个链表节点是另一个 SNode 节点,它是一个 REQUEST 类型的消费者节点,等待从生产者处获取数据,并且该节点由 Thread1 线程阻塞。LIFO 特性使得后到达线程优先被匹配,提高了吞吐量。

4.3 公平模式 TransferQueue 结构图

graph LR
    A[head(QNode)] --> B(isData=true)
    B(isData=true) --> C(item=E1)
    B(isData=true) --> D(waiter=Thread1)

    E(QNode) --> F(isData=false)
    F(isData=false) --> G(item=null)
    F(isData=false) --> H(waiter=Thread2)

    I(QNode) --> J(isData=true)
    J(isData=true) --> K(item=E2)
    J(isData=true) --> L(waiter=Thread3)

    M[null] --> N(tail)

详细文字描述
该图展示了公平模式下的队列结构(FIFO)。head 指向第一个有效节点,为生产者类型节点,携带数据 E1 并被 Thread1 线程阻塞。第二个链表节点是消费者类型节点,等待获取数据,并由 Thread2 线程阻塞。第三个节点也是生产者类型节点,携带数据 E2 并被 Thread3 线程阻塞。

4.4 非公平模式 transfer 流程图(栈)

graph LR
    A[开始 transfer] --> B(读取栈顶 h)

    subgraph "判断条件"
        C{栈空 或 h.mode == mode?}

        D{h 是否处于 FULFILLING?}

        E{匹配成功?}

        F{清理失败节点?} 
    end

    A --> B
    B -->|是| C
    B -->|否| G(返回 null)

    C -->|是| H(创建节点 s 并压栈)
    C -->|否| I(CAS 压栈成功?)

    H --> J(CAS 压栈成功?)
    J -->|是| K(自旋/阻塞等待匹配)
    J -->|否| G

    K --> L(匹配成功?)
    L -->|是| M(返回元素)
    L -->|否| N(清理失败节点)

    C -->|否| D
    D -->|是| O(设为 FULFILLING 模式)
    D -->|否| G

    O --> P(尝试匹配栈顶 h)
    P --> Q(匹配成功?)
    Q -->|是| R(弹出两个节点)
    Q -->|否| S(清理失败节点)

    R --> T(返回元素)

详细文字描述
该流程图展示了非公平模式 TransferStack.transfer() 的执行路径。

  1. 模式相同或栈空 :读取栈顶 h,如果栈为空或栈顶模式与当前线程模式一致,创建节点并压栈(CAS)。压栈成功后自旋/阻塞等待匹配。
  2. 模式互补且 h 不是 FULFILLING :将自己设为 FULFILLING 模式,并尝试匹配栈顶。匹配成功则弹出两个节点;失败则清理失败节点并重试。
  3. h 处于 FULFILLING :帮助完成匹配(例如推进栈顶指针),加快整体进度。

整个流程通过 CAS 和自旋避免了阻塞开销,提高了性能。

4.5 公平模式 transfer 流程图(队列)

graph LR
    A[开始 transfer] --> B(读取 head)

    subgraph "判断条件"
        C{head 是否有效?}

        D{队尾节点与当前线程模式相同吗?}

        E{匹配成功?}

        F{清理失败节点?} 
    end

    A --> B
    B -->|是| C
    B -->|否| G(返回 null)

    C -->|是| D
    D -->|是| H(创建节点并入队)
    D -->|否| I(CAS 入队成功?)

    H --> J(CAS 入队成功?)
    J -->|是| K(自旋/阻塞等待匹配)
    J -->|否| G

    K --> L(匹配成功?)
    L -->|是| M(返回元素)
    L -->|否| N(清理失败节点)

    C -->|否| E
    E -->|是| O(进行匹配操作)
    E -->|否| G

    O --> P(CAS 弹出队首节点)
    P --> Q(CAS 成功?)
    Q -->|是| R(返回元素)
    Q -->|否| S(清理失败节点)

详细文字描述
该流程图展示了公平模式 TransferQueue.transfer() 的执行路径。

  1. 队列有效且尾部与当前线程模式一致 :创建新节点并入队(CAS)。入队成功后自旋/阻塞等待匹配。
  2. 头节点有效且队首和当前线程互补 :进行匹配操作,弹出队首节点;匹配失败则清理失败节点并重试。

整个流程通过 CAS 和自旋避免了阻塞开销,确保公平性。

4.6 调用栈图

graph LR
    A[SynchronousQueue.put(E e)]
    B[Transferer.transfer(E, boolean, long)]

    subgraph "非公平模式 TransferStack"
        C[读取 head]
        D[创建节点并压栈]
        E[CAS 压栈成功?]
        F[自旋/阻塞等待匹配]
        G[匹配成功?]
        H[清理失败节点]

        A -->|调用| B
        B -->|模式相同或栈空| C
        C --> D
        D --> E
        E -->|是| F
        E -->|否| G

        F --> G
        G -->|是| H
    end

    subgraph "公平模式 TransferQueue"
        I[读取 head]
        J[创建节点并入队]
        K[CAS 入队成功?]
        L[自旋/阻塞等待匹配]
        M[匹配成功?]

        A --> B
        B -->|头节点有效且互补| I
        I --> J
        J --> K
        K -->|是| L

        L --> M
    end

详细文字描述
该图展示了 SynchronousQueue.put(E e) 方法的调用栈,包括向非公平模式和公平模式的 Transferer 调用。每个步骤都通过 CAS 和自旋确保效率。

4.7 锁机制图

graph LR
    A[put()方法]
    B[take()方法]

    subgraph "无锁机制"
        C[CAS 操作]
        D[自旋/阻塞等待匹配]

        A -->|非公平模式 TransferStack| C
        B -->|非公平模式 TransferStack| C

        A -->|公平模式 TransferQueue| D
        B -->|公平模式 TransferQueue| D
    end

    subgraph "可能的锁机制"
        E[同步代码块]

        A -->|锁定 head 节点| E
        B -->|锁定 tail 节点| E
    end

详细文字描述
该图展示了 put() 和 take() 方法在非公平模式和公平模式下的无锁机制,以及可能的同步代码块实现。CAS 操作确保了性能而自旋/阻塞等待匹配则保证了线程间的协调。

这些详细的说明应该有助于全面理解 SynchronousQueue 的内部工作原理和设计思路。如果还需要更深入的信息或有其他问题,请随时告知。

总结与学习指引

9.1 SynchronousQueue 核心特点总结

  • 零容量:SynchronousQueue 不存储任何元素,它是一个纯粹的手递手同步工具。
  • 手递手机制:每个 put 操作必须等待一个 take 操作完成,反之亦然。没有缓冲机制。
  • 无锁设计:基于 CAS 和自旋实现高性能的同步操作。
  • 双模式:支持公平和非公平两种模式,默认为非公平模式(使用 TransferStack),提供更高的吞吐量;而公平模式则使用 TransferQueue。
  • 特殊方法行为
    • size()、peek() 和 drainTo() 方法在 SynchronousQueue 中没有实际意义,永远返回特定值或无效结果。

9.2 使用建议

  1. 线程池任务直接传递

    • 在创建线程池时,使用 SynchronousQueue 可以实现任务的快速传递。例如,在 ThreadPoolExecutor 中设置为工作队列:
      ThreadPoolExecutor executor = new ThreadPoolExecutor(
          corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS,
          new SynchronousQueue<>(false), threadFactory);
  2. 严格的生产者-消费者同步

    • 如果每个生产的任务必须立即被消费,不允许积压,则使用 SynchronousQueue 是合适的。
  3. 根据公平性需求选择模式

    • 默认的非公平模式提供更高的吞吐量和性能;如果需要严格的 FIFO 顺序,可以选择公平模式:
      SynchronousQueue
      <E> queue = new SynchronousQueue<>(true);
  4. 避免依赖特殊方法的行为

    • size()、peek() 和 drainTo() 方法在 SynchronousQueue 中没有实际意义,不要依赖它们来获取队列的状态或数据。
  5. 注意资源风险

    • 在使用 newCachedThreadPool 时,如果生产者提交任务的速度远高于消费者处理速度,线程池可能会不断创建新线程。建议设置合理的最大线程数上限:
      ThreadPoolExecutor executor = new ThreadPoolExecutor(
          corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS,
          new SynchronousQueue<>(false), threadFactory);

9.3 与其他队列的协同使用

在实际系统中,可以结合 SynchronousQueue 和其他有界队列一起使用:

  • 直接 handoff 队列:使用 SynchronousQueue 作为任务传递机制。
  • 降级缓冲队列:当任务无法立即被消费时,将任务降级到一个有界的阻塞队列(如 LinkedBlockingQueue)中进行临时存储。

例如:

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<>(bufferSize), threadFactory);

9.4 学习指引与预告

通过本文的学习,你已经掌握了 SynchronousQueue 的核心原理、源码实现、性能特征及应用场景。其无锁算法的设计和高效同步机制是值得深入研究的。

下一讲预告:我们将介绍 LinkedTransferQueue——它结合了 SynchronousQueue 的手递手传递特性与 LinkedBlockingQueue 的缓冲能力,提供高效的 transfer 方法(阻塞直到元素被消费)以及异步的 put 操作和批量操作。敬请期待!

下一讲:LinkedTransferQueue

在下一篇文章中,我们将深入探讨 LinkedTransferQueue 及其独特的特性,并了解如何在实际应用中使用它来优化任务传递和处理。


以上总结了 SynchronousQueue 的核心特性和应用场景,并提供了一些实用的建议和注意事项。希望这些内容能够帮助你更好地理解和使用这一高性能队列工具。


> 🔗 相关阅读SynchronousQueue