SynchronousQueue 详解
- Java
- 11天前
- 17热度
- 0评论
为了更好地理解 SynchronousQueue 的工作原理及其内部的数据结构和操作流程,我将详细提供每个 Mermaid 图的文字描述,并确保图中的每个步骤都清晰明了。以下是 4.2 到 4.7 部分的补充内容:
4. 必要流程的 Mermaid 图
4.1 类图
classDiagram
class SynchronousQueue {
-Transferer transferer
+void put(E e)
+E take()
+boolean offer(E e)
+E poll()
+SynchronousQueue()
+SynchronousQueue(boolean fair)
}
abstract class Transferer {
+E transfer(E e, boolean timed, long nanos): E
}
class TransferStack {
-SNode head
+TransferStack(): void
+E transfer(E e, boolean timed, long nanos): E
}
class SNode {
-SNode next
-SNode match
-Thread waiter
-Object item
-int mode
}
class TransferQueue {
-QNode head
-QNode tail
+TransferQueue(): void
+E transfer(E e, boolean timed, long nanos): E
}
class QNode {
-QNode next
-Object item
-Thread waiter
-boolean isData
}
SynchronousQueue <|-- TransferStack
SynchronousQueue <|-- TransferQueue详细文字描述 :
这是一个展示了 SynchronousQueue 类及其内部类之间的关系的 UML 图。SynchronousQueue 内部持有一个抽象引用 Transferer,具体实现由构造函数中的参数决定:若为公平模式,则使用 TransferQueue;否则使用 TransferStack。TransferStack 和 TransferQueue 都实现了 transfer(E, boolean, long) 方法处理生产者和消费者的逻辑。
4.2 非公平模式 TransferStack 结构图
graph LR
A[head(SNode)] --> B(mode=DATA)
B(mode=DATA) --> C(item=E1)
B(mode=DATA) --> D(match=null)
B(mode=DATA) --> E(waiter=Thread2)
F[null] --> G(SNode)
G(SNode) --> H(mode=REQUEST)
G(SNode) --> I(item=null)
G(SNode) --> J(match=null)
G(SNode) --> K(waiter=Thread1)详细文字描述 :
该图展示了非公平模式下的栈结构(LIFO)。head 指向栈顶节点,为 DATA 类型的生产者节点。这个节点携带数据 E1 并等待消费者匹配。下一个链表节点是另一个 SNode 节点,它是一个 REQUEST 类型的消费者节点,等待从生产者处获取数据,并且该节点由 Thread1 线程阻塞。LIFO 特性使得后到达线程优先被匹配,提高了吞吐量。
4.3 公平模式 TransferQueue 结构图
graph LR
A[head(QNode)] --> B(isData=true)
B(isData=true) --> C(item=E1)
B(isData=true) --> D(waiter=Thread1)
E(QNode) --> F(isData=false)
F(isData=false) --> G(item=null)
F(isData=false) --> H(waiter=Thread2)
I(QNode) --> J(isData=true)
J(isData=true) --> K(item=E2)
J(isData=true) --> L(waiter=Thread3)
M[null] --> N(tail)详细文字描述 :
该图展示了公平模式下的队列结构(FIFO)。head 指向第一个有效节点,为生产者类型节点,携带数据 E1 并被 Thread1 线程阻塞。第二个链表节点是消费者类型节点,等待获取数据,并由 Thread2 线程阻塞。第三个节点也是生产者类型节点,携带数据 E2 并被 Thread3 线程阻塞。
4.4 非公平模式 transfer 流程图(栈)
graph LR
A[开始 transfer] --> B(读取栈顶 h)
subgraph "判断条件"
C{栈空 或 h.mode == mode?}
D{h 是否处于 FULFILLING?}
E{匹配成功?}
F{清理失败节点?}
end
A --> B
B -->|是| C
B -->|否| G(返回 null)
C -->|是| H(创建节点 s 并压栈)
C -->|否| I(CAS 压栈成功?)
H --> J(CAS 压栈成功?)
J -->|是| K(自旋/阻塞等待匹配)
J -->|否| G
K --> L(匹配成功?)
L -->|是| M(返回元素)
L -->|否| N(清理失败节点)
C -->|否| D
D -->|是| O(设为 FULFILLING 模式)
D -->|否| G
O --> P(尝试匹配栈顶 h)
P --> Q(匹配成功?)
Q -->|是| R(弹出两个节点)
Q -->|否| S(清理失败节点)
R --> T(返回元素)详细文字描述 :
该流程图展示了非公平模式 TransferStack.transfer() 的执行路径。
- 模式相同或栈空 :读取栈顶 h,如果栈为空或栈顶模式与当前线程模式一致,创建节点并压栈(CAS)。压栈成功后自旋/阻塞等待匹配。
- 模式互补且 h 不是 FULFILLING :将自己设为 FULFILLING 模式,并尝试匹配栈顶。匹配成功则弹出两个节点;失败则清理失败节点并重试。
- h 处于 FULFILLING :帮助完成匹配(例如推进栈顶指针),加快整体进度。
整个流程通过 CAS 和自旋避免了阻塞开销,提高了性能。
4.5 公平模式 transfer 流程图(队列)
graph LR
A[开始 transfer] --> B(读取 head)
subgraph "判断条件"
C{head 是否有效?}
D{队尾节点与当前线程模式相同吗?}
E{匹配成功?}
F{清理失败节点?}
end
A --> B
B -->|是| C
B -->|否| G(返回 null)
C -->|是| D
D -->|是| H(创建节点并入队)
D -->|否| I(CAS 入队成功?)
H --> J(CAS 入队成功?)
J -->|是| K(自旋/阻塞等待匹配)
J -->|否| G
K --> L(匹配成功?)
L -->|是| M(返回元素)
L -->|否| N(清理失败节点)
C -->|否| E
E -->|是| O(进行匹配操作)
E -->|否| G
O --> P(CAS 弹出队首节点)
P --> Q(CAS 成功?)
Q -->|是| R(返回元素)
Q -->|否| S(清理失败节点)详细文字描述 :
该流程图展示了公平模式 TransferQueue.transfer() 的执行路径。
- 队列有效且尾部与当前线程模式一致 :创建新节点并入队(CAS)。入队成功后自旋/阻塞等待匹配。
- 头节点有效且队首和当前线程互补 :进行匹配操作,弹出队首节点;匹配失败则清理失败节点并重试。
整个流程通过 CAS 和自旋避免了阻塞开销,确保公平性。
4.6 调用栈图
graph LR
A[SynchronousQueue.put(E e)]
B[Transferer.transfer(E, boolean, long)]
subgraph "非公平模式 TransferStack"
C[读取 head]
D[创建节点并压栈]
E[CAS 压栈成功?]
F[自旋/阻塞等待匹配]
G[匹配成功?]
H[清理失败节点]
A -->|调用| B
B -->|模式相同或栈空| C
C --> D
D --> E
E -->|是| F
E -->|否| G
F --> G
G -->|是| H
end
subgraph "公平模式 TransferQueue"
I[读取 head]
J[创建节点并入队]
K[CAS 入队成功?]
L[自旋/阻塞等待匹配]
M[匹配成功?]
A --> B
B -->|头节点有效且互补| I
I --> J
J --> K
K -->|是| L
L --> M
end详细文字描述 :
该图展示了 SynchronousQueue.put(E e) 方法的调用栈,包括向非公平模式和公平模式的 Transferer 调用。每个步骤都通过 CAS 和自旋确保效率。
4.7 锁机制图
graph LR
A[put()方法]
B[take()方法]
subgraph "无锁机制"
C[CAS 操作]
D[自旋/阻塞等待匹配]
A -->|非公平模式 TransferStack| C
B -->|非公平模式 TransferStack| C
A -->|公平模式 TransferQueue| D
B -->|公平模式 TransferQueue| D
end
subgraph "可能的锁机制"
E[同步代码块]
A -->|锁定 head 节点| E
B -->|锁定 tail 节点| E
end详细文字描述 :
该图展示了 put() 和 take() 方法在非公平模式和公平模式下的无锁机制,以及可能的同步代码块实现。CAS 操作确保了性能而自旋/阻塞等待匹配则保证了线程间的协调。
这些详细的说明应该有助于全面理解 SynchronousQueue 的内部工作原理和设计思路。如果还需要更深入的信息或有其他问题,请随时告知。
总结与学习指引
9.1 SynchronousQueue 核心特点总结
- 零容量:SynchronousQueue 不存储任何元素,它是一个纯粹的手递手同步工具。
- 手递手机制:每个 put 操作必须等待一个 take 操作完成,反之亦然。没有缓冲机制。
- 无锁设计:基于 CAS 和自旋实现高性能的同步操作。
- 双模式:支持公平和非公平两种模式,默认为非公平模式(使用 TransferStack),提供更高的吞吐量;而公平模式则使用 TransferQueue。
- 特殊方法行为:
- size()、peek() 和 drainTo() 方法在 SynchronousQueue 中没有实际意义,永远返回特定值或无效结果。
9.2 使用建议
线程池任务直接传递
- 在创建线程池时,使用 SynchronousQueue 可以实现任务的快速传递。例如,在 ThreadPoolExecutor 中设置为工作队列:
ThreadPoolExecutor executor = new ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, new SynchronousQueue<>(false), threadFactory);
- 在创建线程池时,使用 SynchronousQueue 可以实现任务的快速传递。例如,在 ThreadPoolExecutor 中设置为工作队列:
严格的生产者-消费者同步
- 如果每个生产的任务必须立即被消费,不允许积压,则使用 SynchronousQueue 是合适的。
根据公平性需求选择模式
- 默认的非公平模式提供更高的吞吐量和性能;如果需要严格的 FIFO 顺序,可以选择公平模式:
SynchronousQueue <E> queue = new SynchronousQueue<>(true);
- 默认的非公平模式提供更高的吞吐量和性能;如果需要严格的 FIFO 顺序,可以选择公平模式:
避免依赖特殊方法的行为
- size()、peek() 和 drainTo() 方法在 SynchronousQueue 中没有实际意义,不要依赖它们来获取队列的状态或数据。
注意资源风险
- 在使用 newCachedThreadPool 时,如果生产者提交任务的速度远高于消费者处理速度,线程池可能会不断创建新线程。建议设置合理的最大线程数上限:
ThreadPoolExecutor executor = new ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, new SynchronousQueue<>(false), threadFactory);
- 在使用 newCachedThreadPool 时,如果生产者提交任务的速度远高于消费者处理速度,线程池可能会不断创建新线程。建议设置合理的最大线程数上限:
9.3 与其他队列的协同使用
在实际系统中,可以结合 SynchronousQueue 和其他有界队列一起使用:
- 直接 handoff 队列:使用 SynchronousQueue 作为任务传递机制。
- 降级缓冲队列:当任务无法立即被消费时,将任务降级到一个有界的阻塞队列(如 LinkedBlockingQueue)中进行临时存储。
例如:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(bufferSize), threadFactory);9.4 学习指引与预告
通过本文的学习,你已经掌握了 SynchronousQueue 的核心原理、源码实现、性能特征及应用场景。其无锁算法的设计和高效同步机制是值得深入研究的。
下一讲预告:我们将介绍 LinkedTransferQueue——它结合了 SynchronousQueue 的手递手传递特性与 LinkedBlockingQueue 的缓冲能力,提供高效的 transfer 方法(阻塞直到元素被消费)以及异步的 put 操作和批量操作。敬请期待!
下一讲:LinkedTransferQueue
在下一篇文章中,我们将深入探讨 LinkedTransferQueue 及其独特的特性,并了解如何在实际应用中使用它来优化任务传递和处理。
以上总结了 SynchronousQueue 的核心特性和应用场景,并提供了一些实用的建议和注意事项。希望这些内容能够帮助你更好地理解和使用这一高性能队列工具。
> 🔗 相关阅读:SynchronousQueue