DelayQueue 详解
- Java
- 11天前
- 15热度
- 0评论
java V getValue() { return value; }
@Override public long getDelay(TimeUnit unit) { long remainingNanos = expireTime - System.nanoTime(); return unit.convert(remainingNanos, TimeUnit.NANOSECONDS); }
// 此实现为线性比较,可以根据实际情况进行优化以提高效率 @Override public int compareTo(Delayed other) { CacheEntry<?, ?> that = (CacheEntry<?, ?>) other; if (expireTime < that.expireTime) return -1; if (expireTime > that.expireTime) return 1; return 0; }
// 2. 缓存类实现 public class ExpiringMap<K, V> extends HashMap<K, CacheEntry<K, V>> {
private final DelayQueue&lt;CacheEntry&lt;K, V&gt;&gt; queue;
public ExpiringMap() {
// 初始化 DelayQueue,使用缓存条目作为元素类型
this.queue = new DelayQueue&lt;&gt;();
}
@Override
public synchronized V put(K key, V value) {
long ttl = 60; // 示例中设置为默认过期时间1分钟
TimeUnit unit = TimeUnit.SECONDS;
CacheEntry&lt;K, V&gt; entry = new CacheEntry&lt;&gt;(key, value, ttl, unit);
super.put(key, entry);
queue.add(entry); // 将缓存条目添加到延迟队列中
return value;
}
@Override
public synchronized V get(Object key) {
CacheEntry&lt;K, V&gt; entry = (CacheEntry&lt;K, V&gt;) super.get(key);
if (entry == null || System.nanoTime() &gt;= entry.expireTime) {
// 如果缓存条目已过期或不存在,则返回null
return null;
}
return entry.getValue();
}
@Override
public synchronized void clear() {
queue.clear(); // 清空 DelayQueue,从而移除所有未处理的延迟任务
super.clear(); // 清空HashMap缓存条目
}
// 定期检查过期元素并清理
private class ReapingThread extends Thread {
@Override
public void run() {
while (!isInterrupted()) {
try {
CacheEntry&lt;K, V&gt; entry = queue.take(); // 获取一个已经到达的延迟任务(即已过期缓存条目)
if (entry != null) {
super.remove(entry.getKey()); // 移除HashMap中的对应键值对
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
public void startReaping() {
ReapingThread reaper = new ReapingThread();
reaper.setDaemon(true); // 设置为守护线程,程序结束时自动退出
reaper.start();
}}
// 使用示例 public class Main { public static void main(String[] args) throws InterruptedException {
ExpiringMap&lt;String, String&gt; cache = new ExpiringMap&lt;&gt;();
cache.put("key1", "value1"); // 设置缓存条目,自动添加到延迟队列并设置为过期时间
System.out.println(cache.get("key1")); // 输出:value1
Thread.sleep(65000); // 模拟等待一段时间让缓存条目过期
System.out.println(cache.get("key1")); // 输出:null,已过期被自动清理
}}
该代码示例展示了如何使用 `DelayQueue` 和自定义的缓存条目类来实现一个简单的内存缓存系统。这个缓存在插入项时就设定其过期时间,并在后台线程中定期检查和清除已经过期的数据,以保持高效和整洁的状态。
### 5.2 定时任务调度
```java
import java.util.concurrent.*;
import java.text.SimpleDateFormat;
import java.util.*;
public class ScheduledTaskExample {
public static void main(String[] args) throws InterruptedException {
// 创建一个 DelayQueue 用于存储定时任务
DelayQueue&lt;ScheduledTask&gt; delayQueue = new DelayQueue&lt;&gt;();
// 模拟添加多个定时任务
addScheduledTasks(delayQueue);
// 使用后台线程处理延迟队列中的任务,以避免阻塞主线程
Executors.newSingleThreadExecutor().execute(() -&gt; {
while (true) {
try {
ScheduledTask task = delayQueue.take(); // 取出下一个到期的任务
System.out.println("Executing scheduled task: " + task);
// 执行任务,这里使用打印日志作为示例
if ("log".equals(task.getCommand())) {
SimpleDateFormat sdf = new SimpleDateFormat("[hh:mm:ss]");
String timestamp = sdf.format(new Date());
System.out.print(timestamp + " - ");
System.out.println("Logging message: Task executed at "
+ task.getExecutionTime() + ".");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
}
private static void addScheduledTasks(DelayQueue&lt;ScheduledTask&gt; queue) throws InterruptedException {
// 添加几个延迟时间不同的任务
queue.put(new ScheduledTask("log", System.currentTimeMillis() + 5000));
queue.put(new ScheduledTask("log", System.currentTimeMillis() + 10000));
}
}
// 定义用于存储定时任务的类,并实现 Delayed 接口
class ScheduledTask implements Delayed {
private final String command; // 任务需要执行的操作(这里假设为命令)
private final long executionTime; // 执行时间点
public ScheduledTask(String cmd, long exec) {
this.command = cmd;
this.executionTime = exec;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(executionTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed other) {
ScheduledTask task = (ScheduledTask)other;
if (executionTime &lt; task.executionTime) return -1;
else if (executionTime &gt; task.executionTime) return 1;
else return 0; // 相等则返回0,保持顺序不变
}
public String getCommand() { return command; }
public long getExecutionTime() { return executionTime; }
@Override
public String toString() {
SimpleDateFormat sdf = new SimpleDateFormat("[hh:mm:ss]");
String timestamp = sdf.format(new Date(executionTime));
return "Scheduled Task: Executed at " + timestamp;
}
}5.3 消息队列的过期消息处理
import java.util.concurrent.*;
import java.io.Serializable;
public class MessageQueueExample implements Serializable {
public static void main(String[] args) throws InterruptedException, IOException {
// 使用延迟队列存储带有过期时间的消息条目
DelayQueue&lt;ExpiredMessage&gt; delayQueue = new DelayQueue&lt;&gt;();
// 为消息队列创建一个后台线程,用于定期检查和处理过期的消息
Executors.newSingleThreadExecutor().execute(() -&gt; {
while(true) {
try {
ExpiredMessage expiredMsg = delayQueue.take(); // 获取下一个到期的消息
System.out.println("Processing expired message: " + expiredMsg);
// 在此处执行消息的清理或处理逻辑,如删除、归档等
delayQueue.remove(expiredMsg); // 从队列中移除已处理的消息
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
// 添加一些过期时间不同的消息到队列
addMessagesToQueue(delayQueue);
}
private static void addMessagesToQueue(DelayQueue&lt;ExpiredMessage&gt; queue) throws InterruptedException, IOException {
queue.put(new ExpiredMessage("user1", "Welcome to our service!"));
Thread.sleep(2000); // 模拟时间差,确保消息有不同的过期时间
queue.put(new ExpiredMessage("admin", "System notice: maintenance starting..."));
}
}
// 定义带有过期属性的消息类,并实现 Delayed 接口
class ExpiredMessage implements Delayed, Serializable {
private final String userId;
private final String messageContent;
private final long expireTime;
public ExpiredMessage(String user, String msg) throws IOException {
this.userId = user;
this.messageContent = msg;
// 设置过期时间为当前时间点之后的3秒
this.expireTime = System.currentTimeMillis() + 3000;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(expireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed other) {
ExpiredMessage msg = (ExpiredMessage)other;
if (expireTime &lt; msg.expireTime) return -1;
else if (expireTime &gt; msg.expireTime) return 1;
else return 0; // 相等则返回0,保持顺序不变
}
public String getUserId() {
return userId;
}
public String getMessageContent() {
return messageContent;
}
@Override
public String toString() {
SimpleDateFormat sdf = new SimpleDateFormat("[hh:mm:ss]");
// 返回带有过期时间的消息字符串表示形式
return "ExpiredMessage [User: " + getUserId()
+ ", Message: '" + getMessageContent()
+ "', Expired at: " + sdf.format(new Date(expireTime)) + "]";
}
}这些示例展示了使用 DelayQueue 处理带有延迟属性的任务和消息的几种常见场景,包括缓存过期、定时任务执行以及消息队列中处理已过期的消息。通过这种方式,可以实现高效且灵活的后台任务调度与管理机制。
以上代码实现了基于 java.util.concurrent.DelayQueue 的自定义任务管理系统,适用于需要对数据或任务设置特定时效限制的应用场景,如缓存系统、定时作业执行器以及消息队列等。这些示例展示了如何通过设计自定义类来实现延迟功能,并结合后台线程进行定期检查和处理以保持系统的高效运行。
请注意,在实际应用中还需要根据具体需求考虑更多细节(例如异常处理、资源释放、并发安全等),确保代码的健壮性和可维护性。对于生产环境中的任务调度,还可以使用更加成熟的解决方案如 java.util.concurrent.ScheduledExecutorService 或专门的任务调度框架,以应对更复杂的场景和更高的性能要求。此外,在设计系统时还需考虑数据持久化(如数据库存储)、分布式执行及容错机制等因素,确保系统的可靠性和扩展性。这些示例提供了一个基本的起点,可以根据具体需求进行进一步开发和完善。
总结与学习指引
核心要点回顾:
- 无界、延时、优先级 是
DelayQueue的三大特性。DelayQueue允许无限数量的任务存放在队列中,同时每个任务都有一个过期时间。
- 基于
PriorityQueue实现维护到期顺序 :- 使用二叉堆结构来确保按剩余延时排序,自动获取最早过期的元素。
- 使用单锁保证线程安全 :
- 只有单一的
ReentrantLock来保护队列操作,简化了实现并减少了竞争。
- 只有单一的
- Leader-Follower 模式优化性能 :
- 确保只有一个消费者线程等待过期元素,避免多线程同时等待导致的竞争和资源浪费。
- 生产者永不阻塞 :
DelayQueue的put()和offer()方法永远不会被阻塞,但需要监控队列大小防止内存溢出。
使用建议:
- 实现
Delayed接口时务必注意方法的一致性 :- 确保
getDelay(TimeUnit unit)返回正确的延迟时间,并且与compareTo(Delayed other)方法逻辑保持一致。
- 确保
- 使用
drainTo批量处理过期元素 :- 减少锁的获取和释放次数,提高批量操作效率。
- 监控队列大小防止内存溢出 :
- 在生产环境中定期检查队列的
size()方法,并设置合理的上限或报警机制。
- 在生产环境中定期检查队列的
- 合理选择队列类型 :
- 如果不需要时间延迟功能,则可以考虑使用其他类型的阻塞队列,如
LinkedBlockingQueue或ArrayBlockingQueue。
- 如果不需要时间延迟功能,则可以考虑使用其他类型的阻塞队列,如
示例代码
以下是一个简单的示例,展示了如何使用 DelayQueue 来实现缓存过期功能:
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class CacheEntry implements Delayed {
private final long expireTime; // 过期时间点
public CacheEntry(long delay, TimeUnit unit) {
this.expireTime = System.currentTimeMillis() + unit.toMillis(delay);
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(expireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
// 缓存操作示例
public static void main(String[] args) throws InterruptedException {
DelayQueue&lt;CacheEntry&gt; cache = new DelayQueue&lt;&gt;();
// 添加缓存条目, 过期时间为 5 秒后
cache.put(new CacheEntry(5, TimeUnit.SECONDS));
System.out.println("Adding entries to delay queue...");
// 检查是否已经过期并移除
while (!cache.isEmpty()) {
Delayed entry = cache.take();
if (entry.getDelay(TimeUnit.MILLISECONDS) &lt;= 0) {
System.out.println("Entry has expired at " + entry.expireTime);
cache.remove(entry); // 移除已过期的条目
}
Thread.sleep(1000); // 模拟处理逻辑延迟
}
System.out.println("All entries processed.");
}
}
``
### 参考资料
- [Java Concurrency in Practice](https://www.amazon.com/Java-Concurrency-Practice-Brian-Goetz/dp/0321349601)
- [JUC 文档](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html)
通过以上学习和实践,可以更好地理解和应用 DelayQueue` 来解决实际问题,并且在必要时选择合适的替换方案。
---
&gt; 🔗 **相关阅读**:[缓存实现原理](https://manbohub.com/archives/1287)