高并发接口总被打崩?我用 ArrayBlockingQueue + 底层源码深度剖析搞定流控

高并发接口限流策略:ArrayBlockingQueue 应用与源码剖析

高并发场景下,系统性能瓶颈往往出现在接口处理能力上。为了防止接口被海量请求压垮,需要采取有效的流量控制措施。本文将介绍如何使用有界阻塞队列 ArrayBlockingQueue 来实现简单的限流机制,并深入分析其内部原理。

一、ArrayBlockingQueue 基本概念

构造与特性

public class ArrayBlockingQueue
<E> extends AbstractQueue<E>
        implements BlockingQueue
<E>, java.io.Serializable {
    // 数据元素数组
    final Object[] items;
    // 下一个待取出元素索引
    int takeIndex;
    // 下一个待添加元素索引
    int putIndex;
    // 数组元素个数
    int count;
    // ReentrantLock 内部锁
    final ReentrantLock lock;
    // 消费者条件
    private final Condition notEmpty;
    // 生产者条件
    private final Condition notFull;

    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }

ArrayBlockingQueue 是一个有界阻塞队列,其核心特性包括固定容量、先进先出(FIFO)以及存取互斥。初始化时需要指定容量大小,没有扩容机制;所有读写操作共享同一把 ReentrantLock 锁,确保同一时间只能执行入队或出队操作。

二、高并发限流应用场景

流控实例

package com.demo;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * 基于阻塞队列的高并发流量控制,限制同时进入系统的请求数,保护系统不被压垮。
 */
public class FlowControl {
    private static final int MAX_QUEUE_SIZE = 2; // 最大允许排队请求的数量

    private static final BlockingQueue
<String> QUEUE = new ArrayBlockingQueue<>(MAX_QUEUE_SIZE);

    public static void main(String[] args) {
        System.out.println("系统启动,最大并发排队请求数:" + MAX_QUEUE_SIZE);
        new Thread(FlowControl::handleRequest, "Consumer-Thread").start();

        for (int i = 1; i <= 5; i++) {
            final int requestNo = i;
            new Thread(() -> {
                try {
                    QUEUE.put("Request-" + requestNo); // 入队操作,阻塞等待
                    System.out.println(Thread.currentThread().getName() + " 请求:" + requestNo);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Producer-" + i).start();
        }
    }

    /**
     * 消费处理请求(核心业务)
     */
    private static void handleRequest() {
        while (true) {
            try {
                String request = QUEUE.take(); // 出队操作,阻塞等待
                System.out.println("【处理】" + Thread.currentThread().getName() + " 处理:" + request);
                Thread.sleep(500); // 模拟业务逻辑耗时
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}

输出结果分析

系统启动,最大并发排队请求数:2
Producer-1 请求:1
Producer-4 请求:4
【处理】Consumer-Thread 处理:Request-1
Producer-2 请求:2
【处理】Consumer-Thread 处理:Request-4
Producer-3 请求:3
【处理】Consumer-Thread 处理:Request-2
Producer-5 请求:5
【处理】Consumer-Thread 处理:Request-3
【处理】Consumer-Thread 处理:Request-5

通过以上实例,可以看到 ArrayBlockingQueue 在高并发场景下能够有效地限制同时进入系统的请求数量。当队列满时,入队操作会被阻塞等待,直到有空位出现才继续执行。这对于保护系统资源和稳定运行至关重要。

三、内部源码实现分析

构造函数详解

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}

构造函数首先检查容量参数是否合法,然后创建数据元素数组、ReentrantLock 锁以及条件变量。fair 参数控制锁的公平性策略。

入队与出队操作

public E put(E e) throws InterruptedException {
    if (e == null)
        throw new NullPointerException();
    final int c = capacity;
    int i = tail;
    for (;;) {
        final Object[] items = this.items;
        // 检查是否已满,满了就阻塞等待 notFull 条件
        if (i == head && ++count >= c) { // count is now -1 mod cap
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (int w; ; ) {
                    w = tail;
                    int q = (w + 1 < c ? w + 1 : 0);
                    if (q != head) // queue not full
                        break;
                    else if (Thread.interrupted())
                        throw new InterruptedException();
                    else
                        notFull.await();
                }
            } finally {
                lock.unlock();
            }
        }
        items[i] = e; // 插入元素,可能超过实际容量 c
        tail = i = q;
    }
}

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) { // 阻塞等待直到队列不为空
            if (head >= size) // ignore spurious CAS failures
                return null;

            int h = head;  // 获取当前头部位置
            int i = exclusiveIndexFor(h); // 索引计算,用于多生产者场景
            E x = (E) items[i];
            if (x != null) { // 检查元素有效性
                items[i] = null;
                head = h + 1; // 更新头指针
                count--; // 减少队列长度
                return x;
            }
            notEmpty.await(); // 阻塞等待直到队列不为空
        }
    } finally {
        lock.unlock();
    }
}

入队操作使用 put 方法,当队列已满时会阻塞等待 notFull 条件释放;出队操作使用 take 方法,在队列为空时同样需要阻塞等待直到有元素可用。

总结

ArrayBlockingQueue 是一种非常有效的高并发限流工具。通过控制队列大小和锁机制,可以确保系统在大量请求冲来时不被压垮,并能够稳定运行。本文介绍了其基本构造、应用场景以及内部实现原理,希望对读者有所帮助。

2. ReentrantLock:并发控制

为了保证线程间的互斥访问,ArrayBlockingQueue 使用了 ReentrantLock 来实现同步机制。具体来说,在进行入队和出队操作时会调用锁的 lockInterruptibly() 方法以获取锁。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
  • 锁机制:ArrayBlockingQueue 使用 ReentrantLock 保证线程安全。入队和出队操作共用同一把锁,确保了互斥访问。
  • 阻塞等待
    • 当队列为空(count=0)时,调用 notEmpty.await() 方法使当前线程进入等待状态,直到有新元素入队后唤醒。
    • 队列为满(count=length)时,则调用 notFull.await() 让当前的入队操作线程挂起,等待资源释放。

3. 数据结构

3.1 数组 Object[]

final Object[] items = this.items;
  • ArrayBlockingQueue 的存储核心是静态数组,用于存放队列中的元素。

3.2 入队和出队操作

private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    notFull.signal();
    return x;
}

private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}
  • 入队:在 enqueue 方法中,元素被添加到由 putIndex 指向的位置上。当到达数组末尾时,指针从头重新开始。
  • 出队:dequeue 负责从当前的 takeIndex 位置移除一个元素,并将指针向前推进。

4. 双指针设计

4.1 入队(put/offer)

  • 新元素被添加到由 putIndex 指向的位置。
  • 当数组末尾时,索引回绕至数组的起始位置。
  • 元素成功插入后,会唤醒在 notEmpty 上等待的线程。

4.2 出队(take/poll)

  • 从由 takeIndex 指向的位置移除元素。
  • 当到达末尾时重新回到数组开头。
  • 成功出队后唤醒在 notFull 条件变量上等待的线程,准备接收新的入队请求。

5. 固定长度静态数组

  • 队列依赖固定大小的静态数组存储数据。
  • 数组中的空位由 null 占用,这保证了空间利用率并简化了实现。
  • 使用 putIndex 和 takeIndex 确保 FIFO 顺序,并通过循环设计避免浪费存储。

四、总结

优势

  • ArrayBlockingQueue 设计为有界队列,有效防止内存溢出风险。它的锁机制和条件变量组合实现了高效的生产者-消费者模式。

局限性

  • 由于使用固定大小的数组,它不能动态扩展容量以适应不同的需求场景。此外,在高并发情况下独占锁可能会成为性能瓶颈。

欢迎在评论区交流关于阻塞队列的实际应用经验!


> 🔗 相关阅读高并发接口流控