
问题
- DelayQueue是阻塞队列吗?
 
- DelayQueue的实现方式?
 
- DelayQueue主要用于什么场景?
 
简介
DelayQueue是java并发包下的延时阻塞队列,常用于实现定时任务。
实战另见 DelayQueue
继承体系

从继承体系可以看到,DelayQueue实现了BlockingQueue,所以它是一个阻塞队列。
另外,DelayQueue还组合了一个叫做Delayed的接口,DelayQueue中存储的所有元素必须实现Delayed接口。
那么,Delayed是什么呢?
1 2 3
   | public interface Delayed extends Comparable<Delayed> {     long getDelay(TimeUnit unit); }
  | 
 
Delayed是一个继承自Comparable的接口,并且定义了一个getDelay()方法,用于表示还有多少时间到期,到期了应返回小于等于0的数值。
源码分析
主要属性
1 2 3 4 5 6 7 8
   |  private final transient ReentrantLock lock = new ReentrantLock();
  private final PriorityQueue<E> q = new PriorityQueue<E>();
  private Thread leader = null;
  private final Condition available = lock.newCondition();
 
  | 
 
从属性我们可以知道,延时队列主要使用优先级队列来实现,并辅以重入锁和条件来控制并发安全。
因为优先级队列是无界的,所以这里只需要一个条件就可以了。
还记得优先级队列吗?点击链接直达 PriorityQueue内部原理
主要构造方法
1 2 3 4 5
   | public DelayQueue() {}
  public DelayQueue(Collection<? extends E> c) {     this.addAll(c); }
  | 
 
构造方法比较简单,一个默认构造方法,一个初始化添加集合c中所有元素的构造方法。
入队
因为DelayQueue是阻塞队列,且优先级队列是无界的,所以入队不会阻塞不会超时,因此它的四个入队方法是一样的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
   | public boolean add(E e) {     return offer(e); }
  public void put(E e) {     offer(e); }
  public boolean offer(E e, long timeout, TimeUnit unit) {     return offer(e); }
  public boolean offer(E e) {     final ReentrantLock lock = this.lock;     lock.lock();     try {         q.offer(e);         if (q.peek() == e) {             leader = null;             available.signal();         }         return true;     } finally {         lock.unlock();     } }
  | 
 
入队方法比较简单:
(1)加锁;
(2)添加元素到优先级队列中;
(3)如果添加的元素是堆顶元素,就把leader置为空,并唤醒等待在条件available上的线程;
(4)解锁;
出队
因为DelayQueue是阻塞队列,所以它的出队有四个不同的方法,有抛出异常的,有阻塞的,有不阻塞的,有超时的。
我们这里主要分析两个,poll()和take()方法。
1 2 3 4 5 6 7 8 9 10 11 12 13
   | public E poll() {     final ReentrantLock lock = this.lock;     lock.lock();     try {         E first = q.peek();         if (first == null || first.getDelay(NANOSECONDS) > 0)             return null;         else             return q.poll();     } finally {         lock.unlock();     } }
  | 
 
poll()方法比较简单:
- 加锁;
 
- 检查第一个元素,如果为空或者还没到期,就返回null;
 
- 如果第一个元素到期了就调用poll()弹出第一个元素;
 
- 解锁。
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
   | public E take() throws InterruptedException {     final ReentrantLock lock = this.lock;     lock.lockInterruptibly();     try {         for (;;) {                          E first = q.peek();                          if (first == null)                 available.await();             else {                                  long delay = first.getDelay(NANOSECONDS);                                  if (delay <= 0)                     return q.poll();                                                                                                      first = null;                                   if (leader != null)                     available.await();                 else {                                          Thread thisThread = Thread.currentThread();                     leader = thisThread;                     try {                                                                                                                                                                                                        available.awaitNanos(delay);                     } finally {                                                  if (leader == thisThread)                             leader = null;                     }                 }             }         }     } finally {                  if (leader == null && q.peek() != null)                          available.signal();                  lock.unlock();     } }
  | 
 
take()方法稍微要复杂一些:
加锁;
 
判断堆顶元素是否为空,为空的话直接阻塞等待;
 
判断堆顶元素是否到期,到期了直接poll()出元素;
 
没到期,再判断前面是否有其它线程在等待,有则直接等待;
 
前面没有其它线程在等待,则把自己当作第一个线程等待delay时间后唤醒,再尝试获取元素;
 
获取到元素之后再唤醒下一个等待的线程;
 
解锁;
 
使用方法
说了那么多,是不是还是不知道怎么用呢?那怎么能行,请看下面的案例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
   | public class DelayQueueTest {     public static void main(String[] args) {         DelayQueue<Message> queue = new DelayQueue<>();
          long now = System.currentTimeMillis();
                   new Thread(()->{             while (true) {                 try {                                          System.out.println(queue.take().deadline - now);                 } catch (InterruptedException e) {                     e.printStackTrace();                 }             }         }).start();
                   queue.add(new Message(now + 5000));         queue.add(new Message(now + 8000));         queue.add(new Message(now + 2000));         queue.add(new Message(now + 1000));         queue.add(new Message(now + 7000));     } }
  class Message implements Delayed {     long deadline;
      public Message(long deadline) {         this.deadline = deadline;     }
      @Override     public long getDelay(TimeUnit unit) {         return deadline - System.currentTimeMillis();     }
      @Override     public int compareTo(Delayed o) {         return (int) (getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));     }
      @Override     public String toString() {         return String.valueOf(deadline);     } }
  | 
 
是不是很简单,越早到期的元素越先出队。
总结
- DelayQueue是阻塞队列;
 
- DelayQueue内部存储结构使用优先级队列;
 
- DelayQueue使用重入锁和条件来控制并发安全;
 
- DelayQueue常用于定时任务;
 
彩蛋
java中的线程池实现定时任务是直接用的DelayQueue吗?
当然不是,ScheduledThreadPoolExecutor中使用的是它自己定义的内部类DelayedWorkQueue,其实里面的实现逻辑基本都是一样的,只不过DelayedWorkQueue里面没有使用现在的PriorityQueue,而是使用数组又实现了一遍优先级队列,本质上没有什么区别。