快上网专注成都网站设计 成都网站制作 成都网站建设
成都网站建设公司服务热线:028-86922220

网站建设知识

十年网站开发经验 + 多家企业客户 + 靠谱的建站团队

量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决

futuretask源码分析(推荐)

FutureTask只实现RunnableFuture接口:

成都创新互联公司网站建设由有经验的网站设计师、开发人员和项目经理组成的专业建站团队,负责网站视觉设计、用户体验优化、交互设计和前端开发等方面的工作,以确保网站外观精美、网站建设、成都网站制作易于使用并且具有良好的响应性。

该接口继承了java.lang.Runnable和Future接口,也就是继承了这两个接口的特性。

1.可以不必直接继承Thread来生成子类,只要实现run方法,且把实例传入到Thread构造函数,Thread就可以执行该实例的run方法了( Thread(Runnable) )。

2.可以让任务独立执行,get获取任务执行结果时,可以阻塞直至执行结果完成。也可以中断执行,判断执行状态等。

FutureTask是一个支持取消行为的异步任务执行器。该类实现了Future接口的方法。

如: 1. 取消任务执行

2. 查询任务是否执行完成

3. 获取任务执行结果(”get“任务必须得执行完成才能获取结果,否则会阻塞直至任务完成)。

注意:一旦任务执行完成,则不能执行取消任务或者重新启动任务。(除非一开始就使用runAndReset模式运行任务)
FutureTask支持执行两种任务, Callable 或者 Runnable的实现类。且可把FutureTask实例交由Executor执行。

源码部分(很简单):

public class FutureTask implements RunnableFuture {
  /*
   * Revision notes: This differs from previous versions of this
   * class that relied on AbstractQueuedSynchronizer, mainly to
   * avoid surprising users about retaining interrupt status during
   * cancellation races. Sync control in the current design relies
   * on a "state" field updated via CAS to track completion, along
   * with a simple Treiber stack to hold waiting threads.
   *
   * Style note: As usual, we bypass overhead of using
   * AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
   */
  /**
   * The run state of this task, initially NEW. The run state
   * transitions to a terminal state only in methods set,
   * setException, and cancel. During completion, state may take on
   * transient values of COMPLETING (while outcome is being set) or
   * INTERRUPTING (only while interrupting the runner to satisfy a
   * cancel(true)). Transitions from these intermediate to final
   * states use cheaper ordered/lazy writes because values are unique
   * and cannot be further modified.
   *
   * Possible state transitions:
   * NEW -> COMPLETING -> NORMAL
   * NEW -> COMPLETING -> EXCEPTIONAL
   * NEW -> CANCELLED
   * NEW -> INTERRUPTING -> INTERRUPTED
   */
  private volatile int state;
  private static final int NEW     = 0;
  private static final int COMPLETING  = 1;
  private static final int NORMAL    = 2;
  private static final int EXCEPTIONAL = 3;
  private static final int CANCELLED  = 4;
  private static final int INTERRUPTING = 5;
  private static final int INTERRUPTED = 6;
  /** The underlying callable; nulled out after running */
  private Callable callable;
  /** 用来存储任务执行结果或者异常对象,根据任务state在get时候选择返回执行结果还是抛出异常 */
  private Object outcome; // non-volatile, protected by state reads/writes
  /** 当前运行Run方法的线程 */
  private volatile Thread runner;
  /** Treiber stack of waiting threads */
  private volatile WaitNode waiters;
  /**
   * Returns result or throws exception for completed task.
   *
   * @param s completed state value
   */
  @SuppressWarnings("unchecked")
  private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
      return (V)x;
    if (s >= CANCELLED)
      throw new CancellationException();
    throw new ExecutionException((Throwable)x);
  }
  /**
   * Creates a {@code FutureTask} that will, upon running, execute the
   * given {@code Callable}.
   *
   * @param callable the callable task
   * @throws NullPointerException if the callable is null
   */
  public FutureTask(Callable callable) {
    if (callable == null)
      throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;    // ensure visibility of callable
  }
  /**
   * Creates a {@code FutureTask} that will, upon running, execute the
   * given {@code Runnable}, and arrange that {@code get} will return the
   * given result on successful completion.
   *
   * @param runnable the runnable task
   * @param result the result to return on successful completion. If
   * you don't need a particular result, consider using
   * constructions of the form:
   * {@code Future<?> f = new FutureTask(runnable, null)}
   * @throws NullPointerException if the runnable is null
   */
  public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;    // ensure visibility of callable
  }
  //判断任务是否已取消(异常中断、取消等)
  public boolean isCancelled() {
    return state >= CANCELLED;
  }
  /**
  判断任务是否已结束(取消、异常、完成、NORMAL都等于结束)
  **
  public boolean isDone() {
    return state != NEW;
  }
  /**
  mayInterruptIfRunning用来决定任务的状态。
          true : 任务状态= INTERRUPTING = 5。如果任务已经运行,则强行中断。如果任务未运行,那么则不会再运行
          false:CANCELLED  = 4。如果任务已经运行,则允许运行完成(但不能通过get获取结果)。如果任务未运行,那么则不会再运行
  **/
  public boolean cancel(boolean mayInterruptIfRunning) {
    if (state != NEW)
      return false;
    if (mayInterruptIfRunning) {
      if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
        return false;
      Thread t = runner;
      if (t != null)
        t.interrupt();
      UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
    }
    else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
      return false;
    finishCompletion();
    return true;
  }
  /**
   * @throws CancellationException {@inheritDoc}
   */
  public V get() throws InterruptedException, ExecutionException {
    int s = state;
    //如果任务未彻底完成,那么则阻塞直至任务完成后唤醒该线程
    if (s <= COMPLETING)
      s = awaitDone(false, 0L);
    return report(s);
  }
  /**
   * @throws CancellationException {@inheritDoc}
   */
  public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
      throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
      (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
      throw new TimeoutException();
    return report(s);
  }
  /**
   * Protected method invoked when this task transitions to state
   * {@code isDone} (whether normally or via cancellation). The
   * default implementation does nothing. Subclasses may override
   * this method to invoke completion callbacks or perform
   * bookkeeping. Note that you can query status inside the
   * implementation of this method to determine whether this task
   * has been cancelled.
   */
  protected void done() { }
  /**
  该方法在FutureTask里只有run方法在任务完成后调用。
  主要保存任务执行结果到成员变量outcome 中,和切换任务执行状态。
  由该方法可以得知:
  COMPLETING : 任务已执行完成(也可能是异常完成),但还未设置结果到成员变量outcome中,也意味着还不能get
  NORMAL  : 任务彻底执行完成
  **/
  protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
      outcome = v;
      UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
      finishCompletion();
    }
  }
  /**
   * Causes this future to report an {@link ExecutionException}
   * with the given throwable as its cause, unless this future has
   * already been set or has been cancelled.
   *
   * 

This method is invoked internally by the {@link #run} method * upon failure of the computation. * * @param t the cause of failure */ protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } } /** 由于实现了Runnable接口的缘故,该方法可由执行线程所调用。 **/ public void run() { //只有当任务状态=new时才被运行继续执行 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable c = callable; if (c != null && state == NEW) { V result; boolean ran; try { //调用Callable的Call方法 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } /** 如果该任务在执行过程中不被取消或者异常结束,那么该方法不记录任务的执行结果,且不修改任务执行状态。 所以该方法可以重复执行N次。不过不能直接调用,因为是protected权限。 **/ protected boolean runAndReset() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; boolean ran = false; int s = state; try { Callable c = callable; if (c != null && s == NEW) { try { c.call(); // don't set result ran = true; } catch (Throwable ex) { setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } return ran && s == NEW; } /** * Ensures that any interrupt from a possible cancel(true) is only * delivered to a task while in run or runAndReset. */ private void handlePossibleCancellationInterrupt(int s) { // It is possible for our interrupter to stall before getting a // chance to interrupt us. Let's spin-wait patiently. if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); // wait out pending interrupt // assert state == INTERRUPTED; // We want to clear any interrupt we may have received from // cancel(true). However, it is permissible to use interrupts // as an independent mechanism for a task to communicate with // its caller, and there is no way to clear only the // cancellation interrupt. // // Thread.interrupted(); } /** * Simple linked list nodes to record waiting threads in a Treiber * stack. See other classes such as Phaser and SynchronousQueue * for more detailed explanation. */ static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } } /** 该方法在任务完成(包括异常完成、取消)后调用。删除所有正在get获取等待的节点且唤醒节点的线程。和调用done方法和置空callable. **/ private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint } /** 阻塞等待任务执行完成(中断、正常完成、超时) **/ private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { /** 这里的if else的顺序也是有讲究的。 1.先判断线程是否中断,中断则从队列中移除(也可能该线程不存在于队列中) 2.判断当前任务是否执行完成,执行完成则不再阻塞,直接返回。 3.如果任务状态=COMPLETING,证明该任务处于已执行完成,正在切换任务执行状态,CPU让出片刻即可 4.q==null,则证明还未创建节点,则创建节点 5.q节点入队 6和7.阻塞 **/ if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } } /** * Tries to unlink a timed-out or interrupted wait node to avoid * accumulating garbage. Internal nodes are simply unspliced * without CAS since it is harmless if they are traversed anyway * by releasers. To avoid effects of unsplicing from already * removed nodes, the list is retraversed in case of an apparent * race. This is slow when there are a lot of nodes, but we don't * expect lists to be long enough to outweigh higher-overhead * schemes. */ private void removeWaiter(WaitNode node) { if (node != null) { node.thread = null; retry: for (;;) { // restart on removeWaiter race for (WaitNode pred = null, q = waiters, s; q != null; q = s) { s = q.next; if (q.thread != null) pred = q; else if (pred != null) { pred.next = s; if (pred.thread == null) // check for race continue retry; } else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry; } break; } } } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long stateOffset; private static final long runnerOffset; private static final long waitersOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = FutureTask.class; stateOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("state")); runnerOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("runner")); waitersOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("waiters")); } catch (Exception e) { throw new Error(e); } } }

总结

以上就是本文关于futuretask源码分析(推荐)的全部内容,希望对大家有所帮助。感兴趣的朋友可以参阅:Java利用future及时获取多线程运行结果、浅谈Java多线程处理中Future的妙用(附源码)、futuretask用法及使用场景介绍等,有什么问题可以随时留言,欢迎大家一起交流讨论。


名称栏目:futuretask源码分析(推荐)
标题URL:http://6mz.cn/article/gepdso.html

其他资讯