硬核详解FutureTask设计与实现
@[toc]
# 写在文章开头
最近看到一篇比较不错的FutureTask实践,于是对FutureTask源码进行了研究,而本文将从实践和源码两个角度分析FutureTask的设计与实现思路,希望对你有帮助。
我是 SharkChili ,Java 开发者,Java Guide 开源项目维护者。欢迎关注我的公众号:写代码的SharkChili,也欢迎您了解我的开源项目 mini-redis:https://github.com/shark-ctrl/mini-redis (opens new window)。
为方便与读者交流,现已创建读者群。关注上方公众号获取我的联系方式,添加时备注加群即可加入。
# 详解FutureTask
# FutureTask使用示例
我们的批量线程会进行尝试创建一些任务执行,同时我们希望每个任务只有有一个线程去执行,其他线程如果拿到这个任务准备执行时发现这个任务已经在执行,则等待这个任务的返回结果。
如下图,假设我们的第一个线程提交task-1至清单成功后,这个线程就会执行该任务,而线程2同样想提交这个任务发现该任务已存在,则直接等待清单中记录的这个任务的结果返回。而线程3则也是因为第一次提交任务,所以提交清单成功并执行。

我们完全可以通过FutureTask做到这一点,先来说说任务清单,它用于存储正在执行的任务和任务名,为了保证多线程并发操作安全,笔者直接采用ConcurrentHashMap:
//保存执行的任务清单
private static ConcurrentHashMap<String, Future<String>> taskCache = new ConcurrentHashMap<>();
2
每一个线程执行任务则都是通过executionTask方法,该逻辑一旦检查任务不存在则创建,然后通过乐观锁方式保证任务不存在时才能提交,完成这些操作后通过get等待结果返回。
需要注意的是,这里笔者为了保证逻辑可用做了一点小小的计数处理,每当一个任务通过run执行时,笔者会用原子类自增一下,以此判断多线程执行该方法时有没有重复执行任务的情况出现。
private static AtomicInteger counter = new AtomicInteger(0);
public static String executionTask(String taskName) {
while (true){//执行到任务成功
if (!taskCache.containsKey(taskName)) {//如果任务不存在则创建任务
Callable<String> callable = () -> taskName;
FutureTask<String> futureTask = new FutureTask<>(callable);
//双重锁校验避免任务重复提交
Future<String> future = taskCache.putIfAbsent(taskName, futureTask);
if (future == null) {//如果返回空则说明这个任务之前没有提交过,当前线程直接执行
futureTask.run();
//累加执行的任务数
counter.incrementAndGet();
}
}
//任务非空或提交任务完成的线程在这里等待任务返回
try {
return taskCache.get(taskName).get();
} catch (Exception e) {
//如果保存则从清单中移除
taskCache.remove(taskName);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
对应的测试代码如下,笔者提交10w个线程执行10个任务,从计数器的输出结果来看,执行了10个任务,没有问题:
public static void main(String[] args) throws Exception {
//创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(10_000);
for (int i = 0; i < 10_000; i++) {
//通过取模运算保证10w个线程只会创建 名为10以内的任务
int finalI = i % 10;
//提交任务
executorService.submit(() -> FutureTaskExample.executionTask(String.valueOf(finalI)));
}
//等待结束
executorService.shutdown();
while (!executorService.isTerminated()) {
}
//查看计数
System.out.println(counter.get());//10
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# FutureTask在闭锁下的哲学
本质上FutureTask也可以是一种闭锁,即在FutureTask对应线程未完成运算前,FutureTask这个闭锁就像一个大门一样不允许所有线程通过,只有FutureTask完成运算进入完成状态后,其它线程才能通过。
例如我们希望通过FutureTask执行一些耗时的运算,此时就可以:
- 通过
FutureTask提交任务 - 异步任务运算期间,执行一些其它任务
- 通过get阻塞等待FutureTask结果返回
- FutureTask任务返回线程通过,打印输出结果
所以FutureTask也是一个高效的异步工具,我们可以将一些耗时的操作提前启动,着手其它耗时操作等待完成后拿结果:

对应的我们也给出上述实现的代码示例:
public class Task {
//休眠完成后,返回一个随机数
private final FutureTask<Integer> futureTask = new FutureTask<>(() -> {
ThreadUtil.sleep(1000);
return RandomUtil.randomInt();
});
private final Thread thread = new Thread(futureTask);
//启动任务执行
public void start() {
thread.start();
}
//阻塞获取结果
public int get() throws ExecutionException, InterruptedException {
return futureTask.get();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
使用实例如下,即通过FutureTask执行异步运算后,通过get执行闭锁控制异步流程:
Task task = new Task();
long begin = System.currentTimeMillis();
task.start();
//futureTask异步运行期间,执行一些业务逻辑
System.out.println("do something...");
//阻塞等待futureTask完成,即利用get方法实现一个闭锁的操作
int result = task.get();
long end = System.currentTimeMillis();
Console.log("result: {},cost:{}ms", result, end - begin);
2
3
4
5
6
7
8
9
10
11
12
输出结果如下,可以看到整体来说利用了异步运算期间执行一些其它操作,同时还使用get保证整体流程顺序正常:
do something...
result: -1158881871,cost:1009ms
2
# FutureTask状态机的扭转
在正式进行源码介绍之前,笔者先简单介绍一下FutureTask执行状态的扭转,FutureTask在创建时是全新的任务,此时它的状态就是NEW,一旦调用run之后就会开始运行,此时就会出现4个分支:
- 当任务正确执行完成之后,先将状态设置为完成中
COMPLETING,然后将执行结果存入outcome变量中,完成后再将状态设置为正常结束,即NORMAL。 - 一旦任务执行出现异常,
FutureTask则同样将任务设置为完成中,将结果设置为null之后,调整状态为执行出错EXCEPTIONAL。 - 当任务在执行过程中,需要进行打断操作时,
FutureTask会将状态设置为打断中INTERRUPTING,一旦线程正常被打断,任务就会被设置为终态INTERRUPTED。 FutureTask同样支持不打断当前执行线程,这一点笔者会在后文中说明,这种情况则直接将线程设置为CANCELLED。
一旦状态按照预期调整为终态后,FutureTask就会唤醒那些等待任务执行完成的线程:

这4种状态扭转,我们也可以通过阅读FutureTask上关于状态变量的注释了解:
/*
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# FutureTask如何执行
FutureTask在创建之初时的初始态为NEW,也就是整数变量0,一旦某个线程执行这个任务,JDK8版本会通过原子操作将记录运行线程的地址(这个位置的偏移量用runnerOffset记录)设置为当前线程,一旦操作成功则执行该task封装的Callable任务,如果运行成功则将result设置为运行后的结果,并将ran标志为true,并将任务状态设置为NORMAL:

反之如果运行失败,则将ran设置为false,result设置为空,并将错误信息设置到stateOffset标志位上了,将任务运行状态设置为终态EXCEPTIONAL,然后唤醒其他需要处理这个任务的线程:

对此我们给出FutureTask的底层实现,可以看到FutureTask只有状态为NEW且通过CAS操作将runner设置为自己的线程才能执行任务,而后续的线程如果看到state不为new则只能获取结果,而不能执行,这就FutureTask避免重复运行的核心设计所在。
进行乐观锁上锁拿到执行权之后,就会基于CAS上锁的线程进行任务调用,对应的结果扭转就如上文所说,这里读者可以参考笔者注释自行阅读:
public void run() {
//1. 为NEW 说明是第一次运行,则可以通过CAS操作获取执行权
//2. 不为NEW,直接返回
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
//如果任务不为空且状态为NEW则调用call运行
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//运行成功则记录结果到result 并将ran 设置为true
result = c.call();
ran = true;
} catch (Throwable ex) {
//如果报错则result设置为空,并将ran设置为false
result = null;
ran = false;
//并将任务状态设置为错误
setException(ex);
}
//如果运行成功则将结果存到outcome中
if (ran)
set(result);
}
} finally {
//......
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
这里我们直接查看获取结果成功后的状态设置方法set方法,可以看到其内部先通过cas将status即状态字段设置为COMPLETING,完成结果设置之后,再通过putOrderedInt将状态设置为终态NORMAL,这么做的原因是为什么呢?
protected void set(V v) {
//先CAS设置为完成中
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//设置结果完成后,才能设置状态为正常结束
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
2
3
4
5
6
7
8
9
多线程执行FutureTask情况下,大部分判断都需要依赖于COMPLETING这个中间态(利用get和COMPLETING方法),所以这个状态的可见性要求相对高一些,所以在进行结果设置之前,先通过CAS的方式进行更新status字段状态,这种操作是需要将storeLoad屏障,虽然性能表现差一些,但可以保证可见性和有序性,所以先通过这个操作保证其他线程对于这个中态状态可见保证并发操作一致性。

然后完成任务处理结果设置,此时在逻辑上我们可以认定这个任务是处理完成了,因为大部分的逻辑判断都是依赖于COMPLETING,对于终态(NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTED)的可见性要求不高,所以FutureTask的设计者直接采用putOrderedInt这种操作保证写入不会被重排序,但不会立即刷到一致性内存行上,所以在性能表现上会出色一些。
上述对于正确处理结果的设置,可以在set这个方法的源码上得以印证,读者可以结合上文笔者所说和注释自行了解这段逻辑:
protected void set(V v) {
//通过cas操作volatile变量status完成中态设置,又保证的多核心可见性
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//设置处理结果到outcome上,后续其他线程get都是从这个outcome变量上获取
outcome = v;
//因为终态可见性要求不高,所以通过putOrderedInt设置终态,保证写入有序性
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
//唤醒其他线程处理当前任务
finishCompletion();
}
}
2
3
4
5
6
7
8
9
10
11
# 多线程如何获取FutureTask执行结果
其他线程需要从get方法获取结果时,其内部本质就是调用awaitDone等待完成,假设我们用putOrderedInt写入,且状态对于当前线程不可见,那么这个线程要做的也仅仅是yield让出处理器的使用权,相比之下volatile写这种需要增加内存屏障写入的开销,这种内存消耗无论从概率还是消耗上,都是划得来的。
最后完成上述操作,通过finishCompletion通知其他的等待线程可以开始处理FutureTask了。

对此我们也给出get的源码和注释,读者可结合上文感知一下逻辑:
public V get() throws InterruptedException, ExecutionException {
int s = state;
//状态在COMPLETING及其之前调用awaitDone等待完成
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
//......
//如果状态处于中态完成中,则让出线程对于处理器的使用权
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
//......
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
我们给出状态设置为终态之后,finishCompletion的逻辑,比较简单,可以看到它仅是获取当前等待节点的后一个线程的WaitNode ,通过unpark将其唤醒,然后获取其后继节点继续进行唤醒操作:
private void finishCompletion() {
// assert state > COMPLETING;
//获取后继节点
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
//将该等待节点直接唤醒
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
//指向已唤醒节点的后继节点
WaitNode next = q.next;
//若为空直接退出
if (next == null)
break;
//将后继指针设置为空,辅助gc,并让q指向后续节点,继续进行唤醒操作
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
//......
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# FutureTask的异常处理
有了上文设置成功的逻辑解说,相信读者对于setException的逻辑处理也就比较熟悉了,这里笔者就不再展开,读者可自行查看代码和注释:
protected void setException(Throwable t) {
//原子操作设置为完成中,保证可见性
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
//结果设置为null
outcome = t;
//通过高性能有序写putOrderedInt设置终态为错误态
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
//唤醒其他线程
finishCompletion();
}
}
2
3
4
5
6
7
8
9
10
11
# FutureTask如何取消
任务取消操作有两种情况,如果我们传参为true,则会通过原子操作将状态设置为打断中,再尝试打断正在执行的线程,然后将状态设置为已打断这个终态INTERRUPTED,再唤醒其他线程。
若传参设置为false,则直接原子操作设置为已取消,然后直接唤醒其他线程。
public boolean cancel(boolean mayInterruptIfRunning) {
//如果状态不为new则进行状态原子设置操作
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
//如果入参为true则尝试打断线程
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
//完成打断设置状态为INTERRUPTED
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
//唤醒其他线程
finishCompletion();
}
return true;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# FutureTask如何避免任务重复执行
我们假设当前任务为初始化NEW,这意味所有任务都可以尝试进行上乐观锁获取执行劝,如下所示只有设置成功的线程才可以进入后续的执行,而其他线程则直接返回:
public void run() {
//只有状态为NEW的情况下,当前执行的线程才会通过CAS获取乐观锁,之后获取乐观锁成功的才能执行任务,其他的线程会直接返回
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
//......
//执行任务逻辑
}
2
3
4
5
6
7
8
9
# JDK8版本FutureTask的变化
这一点,笔者参阅了1.6版本的FutureTask实现,其内部是通过AQS来维护需要执行任务的线程及其状态,而JDK8版本则专门为FutureTask创建了一个state字段,多线程之间通过CAS操作进行维护。
我们先来说明一下早期版本的设计再说明原因,在较早的版本FutureTask内部线程竞争关系和任务状态都采用AQS进行维护,假设当前任务被取消,则执行这个操作线程会通过原子操作将AQS队列的state字段更新为CANCELLED。

同理执行任务时,也是先通过AQS的原子操作将状态设置为RUNNING,执行完成之后将操作结果原子修改为RAN,并将结果记录到FutureTask的reuslt中:

对此我们给出老版本的FutureTask的run方法,逻辑如上文所说,笔者这里就不多做赘述:
public void run() {
sync.innerRun();
}
void innerRun() {
//原子操作将状态设置为RUNNING
if (!compareAndSetState(0, RUNNING))
return;
try {
//获取当前线程,检查当前状态还是RUNNING则用innerSet当前执行执行callable,然后将结果设置到result中,并将状态修改为RAN
runner = Thread.currentThread();
if (getState() == RUNNING) // recheck after setting thread
innerSet(callable.call());
else
releaseShared(0); // cancel
} catch (Throwable ex) {
//......
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
我们来看看cancel方法,可以看到只要不是RAN这种终态,就可以尝试打断线程:
public boolean cancel(boolean mayInterruptIfRunning) {
return sync.innerCancel(mayInterruptIfRunning);
}
boolean innerCancel(boolean mayInterruptIfRunning) {
for (; ; ) {
//获取状
int s = getState();
//如果是RAN或者canceled则直接返回
if (ranOrCancelled(s))
return false;
//将状态设置为CANCELLED
if (compareAndSetState(s, CANCELLED))
break;
}
// 按照mayInterruptIfRunning的布尔值决定是否打断线程
if (mayInterruptIfRunning) {
Thread r = runner;
if (r != null)
r.interrupt();
}
releaseShared(0);
done();
return true;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
有意思的来了,我们试想这样一个场景:
- 假设我们执行上述
run方法的任务处于RUNNING状态,此时当前线程1就可以调用interrupt打断这个线程 - 此时线程2看到当前任务处于Running(源码说明不是ran或者canceled即可打断)直接将其打断
- 又假设我们这个线程跑去执行别的
task任务 - 所以线程1就可能在执行别的任务期间被打断,进而出现幻觉死:

于是就有了JDK7之后版本,FutureTask专门使用一个volatile的state维护任务的状态,并且在打断前设置一个中态INTERRUPTING 即打断中,执行任务的线程1在运行完成将结果存入outcome之后,看到打断中这个中态就会循环调用yield让出执行权,直到执行cancel操作的线程完成,由此保证了打断操作永远被限制在当前FutureTask生命周期以内。

对应我们给出cancel操作的源码,可以看到它是先设置为打断中INTERRUPTING 然后在进行打断再设置打断完成INTERRUPTED的终态:
public boolean cancel(boolean mayInterruptIfRunning) {
//原子操作设置状态为打断中
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
//尝试打断线程,完成后设置为INTERRUPTED
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
假设上一步的线程的打断操作还未完成,这里可以直接理解为执行interrupt打断之前的代码段,而执行我们的run的线程已经运行完成并将结果设置到outcome时,如下所示会执行finally 语句块的handlePossibleCancellationInterrupt,它在看到打断中的操作后会循环调用Thread.yield()让当前线程让出CPU执行权,直到其他线程的cancel操作完成:
public void run() {
//.....
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//执行逻辑运算
result = c.call();
ran = true;
} catch (Throwable ex) {
//......
}
//将结果设置到outcome中
if (ran)
set(result);
}
} finally {
//......
//读取到打断中的状态,调用handlePossibleCancellationInterrupt让线程yield出去,保证打断操作限定在当前线程内
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
private void handlePossibleCancellationInterrupt(int s) {
//循环yield直到cacnel操作完成
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
关于这个问题我们也可以参考源码上的注释:
/*
* Revision notes: This differs from previous versions of this
* class that relied on AbstractQueuedSynchronizer, mainly to
* avoid surprising users about retaining interrupt status during
* cancellation races. Sync control in the current design relies
* on a "state" field updated via CAS to track completion, along
* with a simple Treiber stack to hold waiting threads.
*
* Style note: As usual, we bypass overhead of using
* AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
*/
2
3
4
5
6
7
8
9
10
11
# 小结
自此我们将FutureTask的设计与实现都分析完成了,希望对你有帮助。
我是 SharkChili ,Java 开发者,Java Guide 开源项目维护者。欢迎关注我的公众号:写代码的SharkChili,也欢迎您了解我的开源项目 mini-redis:https://github.com/shark-ctrl/mini-redis (opens new window)。
为方便与读者交流,现已创建读者群。关注上方公众号获取我的联系方式,添加时备注加群即可加入。
# 参考
FutureTask中的putOrderedInt怎么理解?:https://www.zhihu.com/question/394805625/answer/2381547268 (opens new window)
Java四种内存屏障详解,LoadLoad、LoadStore、StoreLoad、StoreStore :https://www.cnblogs.com/yfeil/p/18125208 (opens new window)
【细谈Java并发】谈谈FutureTask:https://www.jianshu.com/p/32dee2e483b8 (opens new window)
《Java并发编程的艺术》
《Java并发编程实战》