FutureTask 的submit源码
Future代码示例:
package com.liuyuan.disruptor.v1; import java.util.concurrent.*; public class UseFuture implements Callable<String> { private String param; public UseFuture(String param) { this.param = param; } @Override public String call() throws Exception { //模拟执行业务逻辑的耗时 TimeUnit.SECONDS.sleep(3); String result = this.param + " 处理完成!"; return result; } public static void main(String[] args) throws Exception{ String queryStr = "query1"; String queryStr2 = "query2"; FutureTask<String> future1 = new FutureTask<String>(new UseFuture(queryStr)); FutureTask<String> future2 = new FutureTask<String>(new UseFuture(queryStr2)); ExecutorService executorService = Executors.newFixedThreadPool(2); executorService.submit(future1);//异步操作 executorService.submit(future2);//异步操作 System.out.println("执行中..."); TimeUnit.SECONDS.sleep(2);//处理其他相关的任务。 String result1 = future1.get(); String result2 = future2.get(); System.out.println("数据处理完成。。" + result1); System.out.println("数据处理完成。。" + result2); } }
Future实现原理
看到上面示例代码,我们是通过executorService.submit(future1) 来提交线程的,进一步看看里面具体的逻辑。
1、 AbstractExecutorService
中submit()源码:
2、FutureTask
中run()源码:
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
这个是核心代码,首先我们需要知道FutureTask中有一个volatile state全局变量,通过这个值来界定任务是否已经执行完毕。
将上面run方法一点点拆解如下:
先判断state状态,如果不是NEW说明执行完毕,直接return掉。
后面使用CAS操作,判断这个任务是否已经执行,这里FutureTask有个全局的volatile runner字段,这里通过cas将当前线程指定给runner。
这里可以防止callable被执行多次。
接着往下看:
查看set方法具体实现:
继续往下跟,查看finishCompletion方法:
FutureTask中有一个WaiteNode单链表,当执行futureTask.get()方法时,多个线程会将等待的线程的next指向下一个想要get获取结果的线程。
finishCompletion主要就是使用Unsafe.unpark()进行唤醒操作。
3,FutureTask.get() 源码
get() 方法会进行自旋操作等待,直到FutureTask中的state状态大于NORMAL(表示自行完成),然后才会通过FutureTask的outcome获取返回值。
接着往下跟awaitDone方法:
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }
还是老样子,一点点分析:
总结
结合上述分析可得 FutureTask 执行活动图如下:
同时也可以看出,在 FutureTask 中内部维护了一个单向链表 waiters , 在执行 get 的时候会向其中添加节点: