为什么要有线程池

线程池能够对线程进行统一分配,调优和监控:

  • 降低资源消耗(线程无限制地创建,然后使用完毕后销毁)
  • 提高响应速度(无须创建线程)
  • 提高线程的可管理性

ThreadPoolExecutor 使用详解

其实 java 线程池的实现原理很简单,说白了就是一个线程集合 workerSet 和一个阻塞队列 workQueue。当用户向线程池提交一个任务(也就是线程)时,线程池会先将任务放入 workQueue 中。workerSet 中的线程会不断的从 workQueue 中获取线程然后执行。当 workQueue 中没有任务的时候,worker 就会阻塞,直到队列中有任务了就取出来继续执行。

Execute 原理

当一个任务提交至线程池之后:

  1. 线程池首先当前运行的线程数量是否少于 corePoolSize。如果是,则创建一个新的工作线程来执行任务。如果都在执行任务,则进入 2。
  2. 判断 BlockingQueue 是否已经满了,倘若还没有满,则将线程放入 BlockingQueue。否则进入 3。
  3. 如果创建一个新的工作线程将使当前运行的线程数量超过 maximumPoolSize ,则交给 RejectedExecutionHandler 来处理任务,否则创建一个新的线程来执行任务。

当 ThreadPoolExecutor 创建新线程时,通过 CAS 来更新线程池的状态 ctl.

参数

1
2
3
4
5
6
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
  • corePoolSize 线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于 corePoolSize。如果当前线程数为 corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的 prestartAllCoreThreads() 方法,线程池会提前创建并启动所有核心线程。
  • workQueue 用来保存等待被执行的任务的阻塞队列. 在 JDK 中提供了如下阻塞队列:
    • ArrayBlockingQueue: 基于数组结构的有界阻塞队列,按 FIFO 排序任务;
    • LinkedBlockingQuene: 基于链表结构的阻塞队列,按 FIFO 排序任务,吞吐量通常要高于 ArrayBlockingQuene;
    • SynchronousQuene: 一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 LinkedBlockingQuene;
    • PriorityBlockingQuene: 具有优先级的无界阻塞队列;

LinkedBlockingQueueArrayBlockingQueue 在插入删除节点性能方面更优,但是二者在 put(), take() 任务的时均需要加锁,SynchronousQueue 使用无锁算法,根据节点的状态判断执行,而不需要用到锁,其核心是 Transfer.transfer().

  • maximumPoolSize 线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于 maximumPoolSize;当阻塞队列是无界队列,maximumPoolSize 则不起作用, 因为无法提交至核心线程池的线程会一直持续地放入 workQueue.
  • keepAliveTime 线程空闲时的存活时间,即当线程没有任务执行时,该线程继续存活的时间;默认情况下,该参数只在线程数大于 corePoolSize 时才有用,超过这个时间的空闲线程将被终止;
  • unit keepAliveTime 的单位。
  • threadFactory 创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名。默认为 DefaultThreadFactory。
  • handler 线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:
    • AbortPolicy:直接抛出异常,默认策略;
    • CallerRunsPolicy:用调用者所在的线程来执行任务;
    • DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
    • DiscardPolicy:直接丢弃任务;

当然也可以根据应用场景实现 RejectedExecutionHandler 接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

三种类型

newFixedThreadPool

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

线程池的线程数量达 corePoolSize 后,即使线程池没有可执行任务时,也不会释放线程。

FixedThreadPool 的工作队列为无界队列 LinkedBlockingQueue (队列容量为 Integer.MAX_VALUE), 这会导致以下问题:

  • 线程池里的线程数量不超过 corePoolSize,这导致了 maximumPoolSize 和 keepAliveTime 将会是个无用参数。
  • 由于使用了无界队列, 所以 FixedThreadPool 永远不会拒绝, 即饱和策略失效。

newSingleThreadExecutor

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

初始化的线程池中只有一个线程,如果该线程异常结束,会重新创建一个新的线程继续执行任务,唯一的线程可以保证所提交任务的顺序执行。

由于使用了无界队列,所以 SingleThreadPool 永远不会拒绝,即饱和策略失效。

newCachedThreadPool

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

线程池的线程数可达到 Integer.MAX_VALUE,即 2147483647,内部使用 SynchronousQueue 作为阻塞队列;和 newFixedThreadPool 创建的线程池不同, newCachedThreadPool 在没有任务执行时,当线程的空闲时间超过 keepAliveTime,会自动释放线程资源,当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销;执行过程与前两种稍微不同:

  1. 主线程调用 SynchronousQueue 的 offer() 方法放入 task,倘若此时线程池中有空闲的线程尝试读取 SynchronousQueue 的 task,即调用了 SynchronousQueue 的 poll(),那么主线程将该 task 交给空闲线程. 否则执行 2。

  2. 当线程池为空或者没有空闲的线程,则创建新的线程执行任务。

  3. 执行完任务的线程倘若在 60s 内仍空闲,则会被终止。因此长时间空闲的 CachedThreadPool 不会持有任何线程资源。

关闭线程池

遍历线程池中的所有线程,然后逐个调用线程的 interrupt 方法来中断线程.

关闭方式 - shutdown

将线程池里的线程状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程。

关闭方式 - shutdownNow

将线程池里的线程状态设置成STOP状态,然后停止所有正在执行或暂停任务的线程。只要调用这两个关闭方法中的任意一个,isShutDown() 返回 true。当所有任务都成功关闭了, isTerminated() 返回true。

ThreadPoolExecutor 源码详解

几个关键属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//这个属性是用来存放 当前运行的 worker 数量以及线程池状态的
//int是 32 位的,这里把 int 的高 3 位拿来充当线程池状态的标志位,后 29 位拿来充当当前运行 worker 的数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//存放任务的阻塞队列
private final BlockingQueue<Runnable> workQueue;
//worker的集合,用set来存放
private final HashSet<Worker> workers = new HashSet<Worker>();
//历史达到的worker数最大值
private int largestPoolSize;
//当队列满了并且worker的数量达到maxSize的时候,执行具体的拒绝策略
private volatile RejectedExecutionHandler handler;
//超出coreSize的worker的生存时间
private volatile long keepAliveTime;
//常驻worker的数量
private volatile int corePoolSize;
//最大worker的数量,一般当workQueue满了才会用到这个参数
private volatile int maximumPoolSize;

内部状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

其中 AtomicInteger 变量 ctl 的功能非常强大: 利用低29位表示线程池中线程数,通过高3位表示线程池的运行状态:

  • RUNNING: -1 << COUNT_BITS,即高3位为111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;

  • SHUTDOWN: 0 << COUNT_BITS,即高3位为000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;

  • STOP : 1 << COUNT_BITS,即高3位为001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;

  • TIDYING : 2 << COUNT_BITS,即高3位为010, 所有的任务都已经终止;

  • TERMINATED: 3 << COUNT_BITS,即高3位为011, terminated() 方法已经执行完成

任务的执行

execute –> addWorker –>runworker (getTask)

线程池的工作线程通过 Woker 类实现,在 ReentrantLock 锁的保证下,把 Woker 实例插入到 HashSet 后,并启动 Woker 中的线程。 从 Woker 类的构造方法实现可以发现: 线程工厂在创建线程 thread 时,将 Woker 实例本身 this 作为参数传入,当执行 start 方法启动线程 thread 时,本质是执行了 Worker 的 runWorker 方法。 firstTask 执行完成之后,通过 getTask 方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask 方法会被阻塞并挂起,不会占用 cpu 资源;

execute 方法

ThreadPoolExecutor.execute(task) 实现了 Executor.execute(task)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
//workerCountOf获取线程池的当前线程数;小于corePoolSize,执行addWorker创建新线程执行command任务
if (addWorker(command, true))
return;
c = ctl.get();
}
// double check: c, recheck
// 线程池处于RUNNING状态,把提交的任务成功放入阻塞队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// recheck and if necessary 回滚到入队操作前,即倘若线程池shutdown状态,就remove(command)
//如果线程池没有RUNNING,成功从阻塞队列中删除任务,执行reject方法处理任务
if (! isRunning(recheck) && remove(command))
reject(command);
//线程池处于running状态,但是没有线程,则创建线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 往线程池中创建新的线程失败,则reject任务
else if (!addWorker(command, false))
reject(command);
}
  • 为什么需要 double check 线程池的状态?

在多线程环境下,线程池的状态时刻在变化,而 ctl.get() 是非原子操作,很有可能刚获取了线程池状态后线程池状态就改变了。判断是否将 command 加入 workque 是线程池之前的状态。倘若没有 double check,万一线程池处于非 running 状态(在多线程环境下很有可能发生),那么 command 永远不会执行。

addWorker 方法

从方法 execute 的实现可以看出: addWorker 主要负责创建新的线程并执行任务,线程池创建新线程执行任务时,需要获取全局锁:

1
private final ReentrantLock mainLock = new ReentrantLock();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
private boolean addWorker(Runnable firstTask, boolean core) {
// CAS更新线程池数量
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 线程池重入锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 线程启动,执行任务(Worker.thread(firstTask).start());
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

Worker 类的 runworker 方法

1
2
3
4
5
6
7
8
9
10
11
12
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); // 创建线程
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// ...
}
  • 继承了 AQS 类,可以方便的实现工作线程的中止操作;
  • 实现了 Runnable 接口,可以将自身作为一个任务在工作线程中执行;
  • 当前提交的任务 firstTask 作为参数传入 Worker 的构造方法;

一些属性还有构造方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
//运行的线程,前面addWorker方法中就是直接通过启动这个线程来启动这个worker
final Thread thread;
//当一个worker刚创建的时候,就先尝试执行这个任务
Runnable firstTask;
//记录完成任务的数量
volatile long completedTasks;

Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//创建一个Thread,将自己设置给他,后面这个thread启动的时候,也就是执行worker的run方法
this.thread = getThreadFactory().newThread(this);
}

runWorker 方法是线程池的核心:

  • 线程启动之后,通过 unlock 方法释放锁,设置 AQS 的 state 为 0,表示运行可中断;
  • Worker 执行 firstTask 或从 workQueue 中获取任务:
    • 进行加锁操作,保证 thread 不被其他线程中断(除非线程池被中断)
    • 检查线程池状态,倘若线程池处于中断状态,当前线程将中断。
    • 执行 beforeExecute
    • 执行任务的 run 方法
    • 执行 afterExecute 方法
    • 解锁操作

通过 getTask 方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask 方法会被阻塞并挂起,不会占用 cpu 资源;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 先执行firstTask,再从workerQueue中取task(getTask())

while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

getTask 方法

下面来看一下 getTask() 方法,这里面涉及到 keepAliveTime 的使用,从这个方法我们可以看出先吃池是怎么让超过 corePoolSize 的那部分 worker 销毁的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

注意这里一段代码是 keepAliveTime 起作用的关键:

1
2
3
4
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();

allowCoreThreadTimeOut 为 false,线程即使空闲也不会被销毁;倘若为 ture,在 keepAliveTime 内仍空闲则会被销毁。

如果线程允许空闲等待而不被销毁 timed == false,workQueue.take 任务: 如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take 方法返回任务,并执行;

如果线程不允许无休止空闲 timed == true, workQueue.poll 任务: 如果在 keepAliveTime 时间内,阻塞队列还是没有任务,则返回 null;

任务的提交

  1. submit 任务,等待线程池 execute。
  2. 执行 FutureTask 类的 get 方法时,会把主线程封装成 WaitNode 节点并保存在 waiters 链表中,并阻塞等待运行结果;
  3. FutureTask 任务执行完成后,通过 UNSAFE 设置 waiters 相应的 waitNode 为 null,并通过 LockSupport 类unpark 方法唤醒主线程;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class Test{
public static void main(String[] args) {

ExecutorService es = Executors.newCachedThreadPool();
Future<String> future = es.submit(new Callable<String>() {
@Override
public String call() throws Exception {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "future result";
}
});
try {
String result = future.get();
System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
}
}
}

在实际业务场景中,Future 和 Callable 基本是成对出现的,Callable 负责产生结果,Future 负责获取结果。

  1. Callable 接口类似于 Runnable,只是 Runnable 没有返回值。
  2. Callable 任务除了返回正常结果之外,如果发生异常,该异常也会被返回,即 Future 可以拿到异步执行任务各种结果;
  3. Future.get 方法会导致主线程阻塞,直到 Callable 任务执行完成;

submit方法

AbstractExecutorService.submit() 实现了 ExecutorService.submit() 可以获取执行完的返回值, 而 ThreadPoolExecutorAbstractExecutorService.submit() 的子类,所以 submit 方法也是 ThreadPoolExecutor 的方法。

1
2
3
4
5
6
// submit()在ExecutorService中的定义
<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Runnable task, T result);

Future<?> submit(Runnable task);
1
2
3
4
5
6
7
8
// submit方法在AbstractExecutorService中的实现
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// 通过submit方法提交的Callable任务会被封装成了一个FutureTask对象。
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

通过 submit 方法提交的 Callable 任务会被封装成了一个 FutureTask 对象。通过 Executor.execute() 方法提交 FutureTask 到线程池中等待被执行,最终执行的是 FutureTask 的run方法;

FutureTask对象

public class FutureTask<V> implements RunnableFuture<V> 可以将 FutureTask 提交至线程池中等待被执行(通过 FutureTask 的 run 方法来执行)

  • 内部状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* The run state of this task, initially NEW. 
* ...
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;

内部状态的修改通过 sun.misc.Unsafe 修改。

  • get 方法
1
2
3
4
5
6
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}

内部通过 awaitDone 方法对主线程进行阻塞,具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}

int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
  1. 如果主线程被中断,则抛出中断异常;
  2. 判断 FutureTask 当前的 state,如果大于 COMPLETING,说明任务已经执行完成,则直接返回;
  3. 如果当前 state 等于 COMPLETING,说明任务已经执行完,这时主线程只需通过 yield 方法让出 cpu 资源,等待 state 变成 NORMAL;
  4. 通过 WaitNode 类封装当前线程,并通过 UNSAFE 添加到 waiters 链表;
  5. 最终通过 LockSupport 的 park 或 parkNanos 挂起线程;

run 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public void run() {
if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

FutureTask.run 方法是在线程池中被执行的,而非主线程

  1. 通过执行 Callable 任务的 call 方法;
  2. 如果 call 执行成功,则通过 set 方法保存结果;
  3. 如果 call 执行有异常,则通过 setException 保存异常;

任务的关闭

shutdown 方法会将线程池的状态设置为 SHUTDOWN,线程池进入这个状态后,就拒绝再接受任务,然后会将剩余的任务全部执行完。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查是否可以关闭线程
checkShutdownAccess();
//设置线程池状态
advanceRunState(SHUTDOWN);
//尝试中断worker
interruptIdleWorkers();
//预留方法,留给子类实现
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}

private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}

private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//遍历所有的worker
for (Worker w : workers) {
Thread t = w.thread;
//先尝试调用w.tryLock(),如果获取到锁,就说明worker是空闲的,就可以直接中断它
//注意的是,worker自己本身实现了AQS同步框架,然后实现的类似锁的功能
//它实现的锁是不可重入的,所以如果worker在执行任务的时候,会先进行加锁,这里tryLock()就会返回false
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}

shutdownNow 做的比较绝,它先将线程池状态设置为 STOP,然后拒绝所有提交的任务。最后中断左右正在运行中的 worker ,然后清空任务队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//检测权限
advanceRunState(STOP);
//中断所有的worker
interruptWorkers();
//清空任务队列
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}

private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//遍历所有worker,然后调用中断方法
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}

评论




博客内容遵循 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 协议

载入天数...载入时分秒... 本站使用 Volantis 作为主题 鲁ICP备-20012065号