FutureTask获取线程返回值原理、源码分析

先看一段FutureTask获取线程返回值简单应用的代码,以下基于jdk8进行源码分析。

package com.lanhuigu.demo.createthread;

import java.util.concurrent.Callable;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.FutureTask;

/**

 * 实现Callable接口,获取线程执行返回值

 * @author yihonglei

 * @date 2018/9/12 16:43

 */

public class MyCallable implements Callable<String> {

    /**

     * 实现Callable中的call方法

     * @author yihonglei

     * @date 2018/9/12 17:01

     */

    public String call() throws Exception {

        return "Test Callable";

    }

    public static void main(String[] args) {

        /** 根据MyCallable创建FutureTask对象 */

        FutureTask<String> futureTask = new FutureTask<>(new MyCallable());

        try {

            /** 启动线程 */

            new Thread(futureTask).start();

            /** 获取线程执行返回值 */

            String s = futureTask.get();

            /** 打印返回值 */

            System.out.println(s);

        } catch (InterruptedException e) {

            e.printStackTrace();

        } catch (ExecutionException e) {

            e.printStackTrace();

        }

    }

}

程序运行结果:

FutureTask获取线程返回值原理、源码分析

成功拿到了线程执行的返回值。

以下从源码角度分析拿到返回值的全过程,首先需要简单了解下Callable和FutureTask的结构。

Callable是一个函数式接口,源码如下:

package java.util.concurrent;

@FunctionalInterface

public interface Callable<V> {

    V call() throws Exception;

}

该接口有一个call方法,返回任意类型值。

FutureTask实现RunnableFuture接口,而RunnableFuture继承了Runnable, Future<V>,源码如下:

package java.util.concurrent;

import java.util.concurrent.locks.LockSupport;

public class FutureTask<V> implements RunnableFuture<V> {

 ......

}

package java.util.concurrent;

public interface RunnableFuture<V> extends Runnable, Future<V> {

    /**

     * Sets this Future to the result of its computation

     * unless it has been cancelled.

     */

    void run();

}

所以FutureTask具有Runnable和Future功能,因此,在上面的Demo中,以下代码具有Runable特性:

/** 根据MyCallable创建FutureTask对象 */

FutureTask<String> futureTask = new FutureTask<>(new MyCallable());

创建线程对象,通过start()方法启动线程:

/** 启动线程 */

new Thread(futureTask).start();

start()方法源码如下:

public synchronized void start() {

    if (threadStatus != 0)

        throw new IllegalThreadStateException();

    group.add(this);

    boolean started = false;

    try {

        start0();

        started = true;

    } finally {

        try {

            if (!started) {

                group.threadStartFailed(this);

            }

        } catch (Throwable ignore) {

            /* do nothing. If start0 threw a Throwable then

            it will be passed up the call stack */

        }

    }

}

// 本地方法

private native void start0();

start()方法最后会调用本地方法,由JVM通知操作系统,创建线程,最后线程通过JVM访问到Runnable中的run()方法。

而FutureTask实现了Runnable的run()方法,看下FutureTask中的run()方法源码:

public void run() {

    if (state != NEW ||

        !UNSAFE.compareAndSwapObject(this, runnerOffset,

                                     null, Thread.currentThread()))

        return;

    try {

        /** 

          这里的callable就是我们创建FutureTask的时候传进来的MyCallable对象,

          该对象实现了Callable接口的call()方法。

        */

        Callable<V> c = callable;

        if (c != null && state == NEW) {

            V result;

            boolean ran;

            try {

                /** 

                  调用Callable的call方法,即调用实现类MyCallable的call()方法,

                  执行完会拿到MyCallable的call()方法的返回值“Test Callable”。

                */

                result = c.call();

                ran = true;

            } catch (Throwable ex) {

                result = null;

                ran = false;

                setException(ex);

            }

            if (ran)

                /** 将返回值传入到set方法中,这里是能获取线程执行返回值的关键 */

                set(result);

        }

    } finally {

        // runner must be non-null until state is settled to

        // prevent concurrent calls to run()

        runner = null;

        // state must be re-read after nulling runner to prevent

        // leaked interrupts

        int s = state;

        if (s >= INTERRUPTING)

            handlePossibleCancellationInterrupt(s);

    }

}

从run()方法源码可以知道,MyCallabel执行call()方法的返回值被传入到了一个set()方法中,能拿到线程返回值最关键的

就是这个FutureTask的set()方法源码:

protected void set(V v) {

    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {

        /** 

          将MyCallable执行call()方法的返回值传进来赋值给了outcome,

          这个outcome是FutureTask的一个成员变量。

          该变量用于存储线程执行返回值或异常堆栈,通过对应的get()方法获取值。

          private Object outcome;

        */

        outcome = v;

        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state

        finishCompletion();

    }

}

到这里,得出一个结论就是,MyCallable执行的call()方法结果通过FutureTask的set()方法存到了成员变量outcome中,

通过我们熟悉的get方法就可以获取到outcome对应赋的值。

在Demo中获取返回值的代码:

/** 获取线程执行返回值 */

String s = futureTask.get();

FutureTask中get()方法的源码:

public V get() throws InterruptedException, ExecutionException {

    int s = state;

    if (s <= COMPLETING)

        s = awaitDone(false, 0L);

    /** 调用report方法 */

    return report(s);

}

private V report(int s) throws ExecutionException {

    /** outcome赋值给Object x */

    Object x = outcome;

    if (s == NORMAL)

       /** 返回outcome的值,也就是线程执行run()方法时通过set()方法放进去的MyCallable的call()执行的返回值 */

       return (V)x;

    if (s >= CANCELLED)

        throw new CancellationException();

    throw new ExecutionException((Throwable)x);

}

get()方法调用report()方法,report()方法会将outcome赋值并返回,set方法成功拿到返回的outcome,

也就是MyCallable()的call()方法执行结果。

到这里,我们大概理解了FutureTask.get()能拿到线程执行返回值的本质原理,也就基于FutureTask的成员变量

outcome进行的set赋值和get取值的过程。

下面简单总结一下过程:

1)FutureTask通过MyCallable创建;

2)new Thread()创建线程,通过start()方法启动线程;

3)执行FutureTask中的run()方法;

4)run()方法中调用了MyCallable中的call()方法,拿到返回值set到FutureTask的outcome成员变量中;

5)FutureTask通过get方法获取outcome对象值,从而成功拿到线程执行的返回值;

其实,本质上就是一个基于FutureTask成员变量outcome进行的set和get的过程,饶了一圈而已。

--------------------- 

原文:https://blog.csdn.net/yhl_jxy/article/details/82664829