高并发接口总被打崩?我用 ArrayBlockingQueue + 底层源码深度剖析搞定流控
- Java
- 11天前
- 12热度
- 0评论
高并发接口限流策略:ArrayBlockingQueue 应用与源码剖析
高并发场景下,系统性能瓶颈往往出现在接口处理能力上。为了防止接口被海量请求压垮,需要采取有效的流量控制措施。本文将介绍如何使用有界阻塞队列 ArrayBlockingQueue 来实现简单的限流机制,并深入分析其内部原理。
一、ArrayBlockingQueue 基本概念
构造与特性
public class ArrayBlockingQueue
<E> extends AbstractQueue<E>
implements BlockingQueue
<E>, java.io.Serializable {
// 数据元素数组
final Object[] items;
// 下一个待取出元素索引
int takeIndex;
// 下一个待添加元素索引
int putIndex;
// 数组元素个数
int count;
// ReentrantLock 内部锁
final ReentrantLock lock;
// 消费者条件
private final Condition notEmpty;
// 生产者条件
private final Condition notFull;
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}ArrayBlockingQueue 是一个有界阻塞队列,其核心特性包括固定容量、先进先出(FIFO)以及存取互斥。初始化时需要指定容量大小,没有扩容机制;所有读写操作共享同一把 ReentrantLock 锁,确保同一时间只能执行入队或出队操作。
二、高并发限流应用场景
流控实例
package com.demo;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* 基于阻塞队列的高并发流量控制,限制同时进入系统的请求数,保护系统不被压垮。
*/
public class FlowControl {
private static final int MAX_QUEUE_SIZE = 2; // 最大允许排队请求的数量
private static final BlockingQueue
<String> QUEUE = new ArrayBlockingQueue<>(MAX_QUEUE_SIZE);
public static void main(String[] args) {
System.out.println("系统启动,最大并发排队请求数:" + MAX_QUEUE_SIZE);
new Thread(FlowControl::handleRequest, "Consumer-Thread").start();
for (int i = 1; i <= 5; i++) {
final int requestNo = i;
new Thread(() -> {
try {
QUEUE.put("Request-" + requestNo); // 入队操作,阻塞等待
System.out.println(Thread.currentThread().getName() + " 请求:" + requestNo);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Producer-" + i).start();
}
}
/**
* 消费处理请求(核心业务)
*/
private static void handleRequest() {
while (true) {
try {
String request = QUEUE.take(); // 出队操作,阻塞等待
System.out.println("【处理】" + Thread.currentThread().getName() + " 处理:" + request);
Thread.sleep(500); // 模拟业务逻辑耗时
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
}输出结果分析
系统启动,最大并发排队请求数:2
Producer-1 请求:1
Producer-4 请求:4
【处理】Consumer-Thread 处理:Request-1
Producer-2 请求:2
【处理】Consumer-Thread 处理:Request-4
Producer-3 请求:3
【处理】Consumer-Thread 处理:Request-2
Producer-5 请求:5
【处理】Consumer-Thread 处理:Request-3
【处理】Consumer-Thread 处理:Request-5通过以上实例,可以看到 ArrayBlockingQueue 在高并发场景下能够有效地限制同时进入系统的请求数量。当队列满时,入队操作会被阻塞等待,直到有空位出现才继续执行。这对于保护系统资源和稳定运行至关重要。
三、内部源码实现分析
构造函数详解
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}构造函数首先检查容量参数是否合法,然后创建数据元素数组、ReentrantLock 锁以及条件变量。fair 参数控制锁的公平性策略。
入队与出队操作
public E put(E e) throws InterruptedException {
if (e == null)
throw new NullPointerException();
final int c = capacity;
int i = tail;
for (;;) {
final Object[] items = this.items;
// 检查是否已满,满了就阻塞等待 notFull 条件
if (i == head && ++count >= c) { // count is now -1 mod cap
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (int w; ; ) {
w = tail;
int q = (w + 1 < c ? w + 1 : 0);
if (q != head) // queue not full
break;
else if (Thread.interrupted())
throw new InterruptedException();
else
notFull.await();
}
} finally {
lock.unlock();
}
}
items[i] = e; // 插入元素,可能超过实际容量 c
tail = i = q;
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) { // 阻塞等待直到队列不为空
if (head >= size) // ignore spurious CAS failures
return null;
int h = head; // 获取当前头部位置
int i = exclusiveIndexFor(h); // 索引计算,用于多生产者场景
E x = (E) items[i];
if (x != null) { // 检查元素有效性
items[i] = null;
head = h + 1; // 更新头指针
count--; // 减少队列长度
return x;
}
notEmpty.await(); // 阻塞等待直到队列不为空
}
} finally {
lock.unlock();
}
}入队操作使用 put 方法,当队列已满时会阻塞等待 notFull 条件释放;出队操作使用 take 方法,在队列为空时同样需要阻塞等待直到有元素可用。
总结
ArrayBlockingQueue 是一种非常有效的高并发限流工具。通过控制队列大小和锁机制,可以确保系统在大量请求冲来时不被压垮,并能够稳定运行。本文介绍了其基本构造、应用场景以及内部实现原理,希望对读者有所帮助。
2. ReentrantLock:并发控制
为了保证线程间的互斥访问,ArrayBlockingQueue 使用了 ReentrantLock 来实现同步机制。具体来说,在进行入队和出队操作时会调用锁的 lockInterruptibly() 方法以获取锁。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}- 锁机制:ArrayBlockingQueue 使用 ReentrantLock 保证线程安全。入队和出队操作共用同一把锁,确保了互斥访问。
- 阻塞等待:
- 当队列为空(count=0)时,调用 notEmpty.await() 方法使当前线程进入等待状态,直到有新元素入队后唤醒。
- 队列为满(count=length)时,则调用 notFull.await() 让当前的入队操作线程挂起,等待资源释放。
3. 数据结构
3.1 数组 Object[]
final Object[] items = this.items;- ArrayBlockingQueue 的存储核心是静态数组,用于存放队列中的元素。
3.2 入队和出队操作
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
notFull.signal();
return x;
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}- 入队:在 enqueue 方法中,元素被添加到由 putIndex 指向的位置上。当到达数组末尾时,指针从头重新开始。
- 出队:dequeue 负责从当前的 takeIndex 位置移除一个元素,并将指针向前推进。
4. 双指针设计
4.1 入队(put/offer)
- 新元素被添加到由 putIndex 指向的位置。
- 当数组末尾时,索引回绕至数组的起始位置。
- 元素成功插入后,会唤醒在 notEmpty 上等待的线程。
4.2 出队(take/poll)
- 从由 takeIndex 指向的位置移除元素。
- 当到达末尾时重新回到数组开头。
- 成功出队后唤醒在 notFull 条件变量上等待的线程,准备接收新的入队请求。
5. 固定长度静态数组
- 队列依赖固定大小的静态数组存储数据。
- 数组中的空位由 null 占用,这保证了空间利用率并简化了实现。
- 使用 putIndex 和 takeIndex 确保 FIFO 顺序,并通过循环设计避免浪费存储。
四、总结
优势:
- ArrayBlockingQueue 设计为有界队列,有效防止内存溢出风险。它的锁机制和条件变量组合实现了高效的生产者-消费者模式。
局限性:
- 由于使用固定大小的数组,它不能动态扩展容量以适应不同的需求场景。此外,在高并发情况下独占锁可能会成为性能瓶颈。
欢迎在评论区交流关于阻塞队列的实际应用经验!
> 🔗 相关阅读:高并发接口流控