DelayQueue 详解

java V getValue() { return value; }

@Override public long getDelay(TimeUnit unit) { long remainingNanos = expireTime - System.nanoTime(); return unit.convert(remainingNanos, TimeUnit.NANOSECONDS); }

// 此实现为线性比较,可以根据实际情况进行优化以提高效率 @Override public int compareTo(Delayed other) { CacheEntry<?, ?> that = (CacheEntry<?, ?>) other; if (expireTime < that.expireTime) return -1; if (expireTime > that.expireTime) return 1; return 0; }

// 2. 缓存类实现 public class ExpiringMap<K, V> extends HashMap<K, CacheEntry<K, V>> {

private final DelayQueue&amp;lt;CacheEntry&amp;lt;K, V&amp;gt;&amp;gt; queue;

public ExpiringMap() {
    // 初始化 DelayQueue,使用缓存条目作为元素类型
    this.queue = new DelayQueue&amp;lt;&amp;gt;();
}

@Override
public synchronized V put(K key, V value) {
    long ttl = 60; // 示例中设置为默认过期时间1分钟
    TimeUnit unit = TimeUnit.SECONDS;

    CacheEntry&amp;lt;K, V&amp;gt; entry = new CacheEntry&amp;lt;&amp;gt;(key, value, ttl, unit);
    super.put(key, entry);
    queue.add(entry); // 将缓存条目添加到延迟队列中

    return value;
}

@Override
public synchronized V get(Object key) {
    CacheEntry&amp;lt;K, V&amp;gt; entry = (CacheEntry&amp;lt;K, V&amp;gt;) super.get(key);
    if (entry == null || System.nanoTime() &amp;gt;= entry.expireTime) {
        // 如果缓存条目已过期或不存在,则返回null
        return null;
    }
    return entry.getValue();
}

@Override
public synchronized void clear() {
    queue.clear();  // 清空 DelayQueue,从而移除所有未处理的延迟任务
    super.clear();  // 清空HashMap缓存条目
}

// 定期检查过期元素并清理
private class ReapingThread extends Thread {
    @Override
    public void run() {
        while (!isInterrupted()) { 
            try {
                CacheEntry&amp;lt;K, V&amp;gt; entry = queue.take();  // 获取一个已经到达的延迟任务(即已过期缓存条目)

                if (entry != null) {
                    super.remove(entry.getKey()); // 移除HashMap中的对应键值对
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

public void startReaping() {
    ReapingThread reaper = new ReapingThread(); 
    reaper.setDaemon(true);  // 设置为守护线程,程序结束时自动退出
    reaper.start();
}

}

// 使用示例 public class Main { public static void main(String[] args) throws InterruptedException {

    ExpiringMap&amp;lt;String, String&amp;gt; cache = new ExpiringMap&amp;lt;&amp;gt;();

    cache.put("key1", "value1"); // 设置缓存条目,自动添加到延迟队列并设置为过期时间

    System.out.println(cache.get("key1"));  // 输出:value1
    Thread.sleep(65000);  // 模拟等待一段时间让缓存条目过期

    System.out.println(cache.get("key1"));  // 输出:null,已过期被自动清理
}

}


该代码示例展示了如何使用 `DelayQueue` 和自定义的缓存条目类来实现一个简单的内存缓存系统。这个缓存在插入项时就设定其过期时间,并在后台线程中定期检查和清除已经过期的数据,以保持高效和整洁的状态。

### 5.2 定时任务调度
```java
import java.util.concurrent.*;
import java.text.SimpleDateFormat;
import java.util.*;

public class ScheduledTaskExample {
    public static void main(String[] args) throws InterruptedException {
        // 创建一个 DelayQueue 用于存储定时任务
        DelayQueue&amp;lt;ScheduledTask&amp;gt; delayQueue = new DelayQueue&amp;lt;&amp;gt;();

        // 模拟添加多个定时任务
        addScheduledTasks(delayQueue);

        // 使用后台线程处理延迟队列中的任务,以避免阻塞主线程
        Executors.newSingleThreadExecutor().execute(() -&amp;gt; {
            while (true) {
                try {
                    ScheduledTask task = delayQueue.take();  // 取出下一个到期的任务
                    System.out.println("Executing scheduled task: " + task);

                    // 执行任务,这里使用打印日志作为示例
                    if ("log".equals(task.getCommand())) {
                        SimpleDateFormat sdf = new SimpleDateFormat("[hh:mm:ss]");
                        String timestamp = sdf.format(new Date());

                        System.out.print(timestamp + " - ");
                        System.out.println("Logging message: Task executed at " 
                                + task.getExecutionTime() + ".");
                    }

                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
    }

    private static void addScheduledTasks(DelayQueue&amp;lt;ScheduledTask&amp;gt; queue) throws InterruptedException {
        // 添加几个延迟时间不同的任务
        queue.put(new ScheduledTask("log", System.currentTimeMillis() + 5000));
        queue.put(new ScheduledTask("log", System.currentTimeMillis() + 10000));
    }
}

// 定义用于存储定时任务的类,并实现 Delayed 接口
class ScheduledTask implements Delayed {
    private final String command; // 任务需要执行的操作(这里假设为命令)
    private final long executionTime; // 执行时间点

    public ScheduledTask(String cmd, long exec) {
        this.command = cmd;
        this.executionTime = exec;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(executionTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        ScheduledTask task = (ScheduledTask)other;
        if (executionTime &amp;lt; task.executionTime) return -1;
        else if (executionTime &amp;gt; task.executionTime) return 1;
        else return 0; // 相等则返回0,保持顺序不变
    }

    public String getCommand() { return command; }
    public long getExecutionTime() { return executionTime; }

    @Override
    public String toString() {
        SimpleDateFormat sdf = new SimpleDateFormat("[hh:mm:ss]");
        String timestamp = sdf.format(new Date(executionTime));

        return "Scheduled Task: Executed at " + timestamp;
    }
}

5.3 消息队列的过期消息处理

import java.util.concurrent.*;
import java.io.Serializable;

public class MessageQueueExample implements Serializable {

    public static void main(String[] args) throws InterruptedException, IOException {
        // 使用延迟队列存储带有过期时间的消息条目
        DelayQueue&amp;lt;ExpiredMessage&amp;gt; delayQueue = new DelayQueue&amp;lt;&amp;gt;();

        // 为消息队列创建一个后台线程,用于定期检查和处理过期的消息
        Executors.newSingleThreadExecutor().execute(() -&amp;gt; {
            while(true) {
                try {
                    ExpiredMessage expiredMsg = delayQueue.take(); // 获取下一个到期的消息

                    System.out.println("Processing expired message: " + expiredMsg);

                    // 在此处执行消息的清理或处理逻辑,如删除、归档等
                    delayQueue.remove(expiredMsg); // 从队列中移除已处理的消息
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });

        // 添加一些过期时间不同的消息到队列
        addMessagesToQueue(delayQueue);
    }

    private static void addMessagesToQueue(DelayQueue&amp;lt;ExpiredMessage&amp;gt; queue) throws InterruptedException, IOException {
        queue.put(new ExpiredMessage("user1", "Welcome to our service!"));
        Thread.sleep(2000); // 模拟时间差,确保消息有不同的过期时间
        queue.put(new ExpiredMessage("admin", "System notice: maintenance starting..."));
    }
}

// 定义带有过期属性的消息类,并实现 Delayed 接口
class ExpiredMessage implements Delayed, Serializable {

    private final String userId;
    private final String messageContent;
    private final long expireTime;

    public ExpiredMessage(String user, String msg) throws IOException {
        this.userId = user;
        this.messageContent = msg;

        // 设置过期时间为当前时间点之后的3秒
        this.expireTime = System.currentTimeMillis() + 3000;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(expireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        ExpiredMessage msg = (ExpiredMessage)other;
        if (expireTime &amp;lt; msg.expireTime) return -1;
        else if (expireTime &amp;gt; msg.expireTime) return 1;
        else return 0; // 相等则返回0,保持顺序不变
    }

    public String getUserId() {
        return userId;
    }

    public String getMessageContent() {
        return messageContent;
    }

    @Override
    public String toString() {
        SimpleDateFormat sdf = new SimpleDateFormat("[hh:mm:ss]");

        // 返回带有过期时间的消息字符串表示形式
        return "ExpiredMessage [User: " + getUserId() 
                + ", Message: '" + getMessageContent()
                + "', Expired at: " + sdf.format(new Date(expireTime)) + "]";
    }
}

这些示例展示了使用 DelayQueue 处理带有延迟属性的任务和消息的几种常见场景,包括缓存过期、定时任务执行以及消息队列中处理已过期的消息。通过这种方式,可以实现高效且灵活的后台任务调度与管理机制。

以上代码实现了基于 java.util.concurrent.DelayQueue 的自定义任务管理系统,适用于需要对数据或任务设置特定时效限制的应用场景,如缓存系统、定时作业执行器以及消息队列等。这些示例展示了如何通过设计自定义类来实现延迟功能,并结合后台线程进行定期检查和处理以保持系统的高效运行。

请注意,在实际应用中还需要根据具体需求考虑更多细节(例如异常处理、资源释放、并发安全等),确保代码的健壮性和可维护性。对于生产环境中的任务调度,还可以使用更加成熟的解决方案如 java.util.concurrent.ScheduledExecutorService 或专门的任务调度框架,以应对更复杂的场景和更高的性能要求。此外,在设计系统时还需考虑数据持久化(如数据库存储)、分布式执行及容错机制等因素,确保系统的可靠性和扩展性。这些示例提供了一个基本的起点,可以根据具体需求进行进一步开发和完善。

总结与学习指引

核心要点回顾:

  1. 无界、延时、优先级DelayQueue 的三大特性。
    • DelayQueue 允许无限数量的任务存放在队列中,同时每个任务都有一个过期时间。
  2. 基于 PriorityQueue 实现维护到期顺序
    • 使用二叉堆结构来确保按剩余延时排序,自动获取最早过期的元素。
  3. 使用单锁保证线程安全
    • 只有单一的 ReentrantLock 来保护队列操作,简化了实现并减少了竞争。
  4. Leader-Follower 模式优化性能
    • 确保只有一个消费者线程等待过期元素,避免多线程同时等待导致的竞争和资源浪费。
  5. 生产者永不阻塞
    • DelayQueueput()offer() 方法永远不会被阻塞,但需要监控队列大小防止内存溢出。

使用建议:

  1. 实现 Delayed 接口时务必注意方法的一致性
    • 确保 getDelay(TimeUnit unit) 返回正确的延迟时间,并且与 compareTo(Delayed other) 方法逻辑保持一致。
  2. 使用 drainTo 批量处理过期元素
    • 减少锁的获取和释放次数,提高批量操作效率。
  3. 监控队列大小防止内存溢出
    • 在生产环境中定期检查队列的 size() 方法,并设置合理的上限或报警机制。
  4. 合理选择队列类型
    • 如果不需要时间延迟功能,则可以考虑使用其他类型的阻塞队列,如 LinkedBlockingQueueArrayBlockingQueue

示例代码

以下是一个简单的示例,展示了如何使用 DelayQueue 来实现缓存过期功能:


import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class CacheEntry implements Delayed {
    private final long expireTime; // 过期时间点

    public CacheEntry(long delay, TimeUnit unit) {
        this.expireTime = System.currentTimeMillis() + unit.toMillis(delay);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(expireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }

    // 缓存操作示例
    public static void main(String[] args) throws InterruptedException {
        DelayQueue&amp;lt;CacheEntry&amp;gt; cache = new DelayQueue&amp;lt;&amp;gt;();

        // 添加缓存条目, 过期时间为 5 秒后
        cache.put(new CacheEntry(5, TimeUnit.SECONDS));

        System.out.println("Adding entries to delay queue...");

        // 检查是否已经过期并移除
        while (!cache.isEmpty()) {
            Delayed entry = cache.take();
            if (entry.getDelay(TimeUnit.MILLISECONDS) &amp;lt;= 0) {
                System.out.println("Entry has expired at " + entry.expireTime);
                cache.remove(entry); // 移除已过期的条目
            }
            Thread.sleep(1000); // 模拟处理逻辑延迟
        }

        System.out.println("All entries processed.");
    }
}
``

### 参考资料

- [Java Concurrency in Practice](https://www.amazon.com/Java-Concurrency-Practice-Brian-Goetz/dp/0321349601)
- [JUC 文档](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html)

通过以上学习和实践,可以更好地理解和应用 DelayQueue` 来解决实际问题,并且在必要时选择合适的替换方案。

---

&amp;gt; 🔗 **相关阅读**:[缓存实现原理](https://manbohub.com/archives/1287)