探秘线程池 ThreadPoolExecutor 的任务调度过程

线程池是 java 并发包的核心组件之一,为了减少线程创建和销毁所带来的性能开销,在实际项目中都会采用线程池来管理线程的创建、复用,以及消亡等过程。Executors 类提供了多种方法来简化线程池的创建,典型的应用场景如下:

1
2
int nCpu = Runtime.getRuntime().availableProcessors();
ExecutorService es = Executors.newFixedThreadPool(nCpu + 1);

上述示例中通过调用 newFixedThreadPool(int nThreads) 方法,我们创建了一个大小为 CPU 核心数加 1 的线程池。此外,Executors 还提供了newSingleThreadExecutor() 和 newCachedThreadPool() 方法分别创建固定大小为 1 和非固定大小的线程池。这些方法本质上都是对 ThreadPoolExecutor 类的封装,以简化线程池的使用,例如 newCachedThreadPool() 方法的内部实现如下:

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

可以看到 newCachedThreadPool() 封装了一个大小在 [0, Integer.MAX_VALUE] 之间,线程最大存活为时间为 60 秒的 ThreadPoolExecutor,并以 SynchronousQueue 作为工作队列。

所以 ThreadPoolExecutor 可以视为 java 线程池的核心实现类,接下去的篇幅我们一起来探究 ThreadPoolExecutor 的内部实现细节。

一. 基础运行支撑

在 ThreadPoolExecutor 源码的一开始有几个特殊的常量定义和相应的位运算(如下),而这正是整个线程池运行的基础,理解这几个常量的意义是理解整个 ThreadPoolExecutor 的关键所在。

1
2
3
4
5
6
7
8
9
10
11
// control,高 3 位表示线程池的运行状态 runState,低 29 位表示线程池内工作线程数 workerCount
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 29
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 00011111111111111111111111111111
// 线程池的五种运行状态
private static final int RUNNING = -1 << COUNT_BITS; // 11100000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS; // 00000000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS; // 00100000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS; // 01000000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS; // 01100000000000000000000000000000

首先看一下 ctl 常量,这是一个 AtomicInteger 类型,第一眼看到这个变量可能会一头雾水,但是随着深入源码的阅读,逐渐能够理解其命名的灵感来源。作者应该是希望通过该变量来表达控制(control)的意思,因为整个线程池的运行状态和工作线程数量都通过该变量来记录。这是一个 32 位的整型变量,其中 高 3 位用表示线程的运行状态(runState),而 低 29 位则用来记录当前工作线程的数量(workerCount),也就是说按照现有的能力,ThreadPoolExecutor 最多允许创建 229 - 1 个工作线程,约 5 亿多。

1
2
3
4
5
6
// 获取 ctl 中的 runState 值
private static int runStateOf(int c) { return c & ~CAPACITY; } // ~CAPACITY=11100000000000000000000000000000
// 获取 ctl 中的 workerCount 值
private static int workerCountOf(int c) { return c & CAPACITY; }
// 由 runState 和 workerCount 计算得到 ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }

类中定义了上述三个方法分别用来从 ctl 中获取线程池的运行状态 runState、工作线程数 workerCount,以及由 runState 和 workerCount 计算得到 ctl。ThreadPoolExecutor 中针对这三个变量的变量命名有个规律,一般 c 表示 ctl,rs 表示 runState,而 wc 则表示 workerCount,所以在阅读源码时如果遇到相应变量名,不妨联想一下,或许能够茅塞顿开。

image

线程池定义了 5 种运行状态(状态转移关系如上图所示),即 RUNNING、SHUTDOWN、STOP、TIDYING,以及 TERMINATED,各运行态的释义如下:

  • RUNNING:该状态下线程池接收新的任务,并执行工作队列中的任务
  • SHUTDOWN:该状态下线程池不接受新的任务,但是会继续执行工作队列中的任务
  • STOP:该状态下线程池不接受新的任务,不执行工作队列中的任务,同时会中断正在运行中的任务
  • TIDYING:当所有的任务被终止,工作线程数目为 0,线程池进入该状态后会调用 terminated() 钩子方法
  • TERMINATED:当执行完 terminated() 方法后,线程池进入该状态
前置状态 后置状态 转换条件
RUNNING SHUTDOWN 显式调用了 shutdown() 方法,或在线程池的 finalize() 方法中调用了 shutdown() 方法
RUNNING or SHUTDOWN STOP 调用了线程池的 shutdownNow() 方法
SHUTDOWN TIDYING 当工作队列和线程池都为空的时候
STOP TIDYING 当线程池为空的时候
TIDYING TERMINATED 当钩子方法 terminated() 执行完毕的时候

ThreadPoolExecutor 利用 ctl 的高 3 位记录线程池的运行状态,并利用整型数值对每个状态进行标识,所以可以通过对整型数值的比较运算来判定当前的线程状态,ThreadPoolExecutor 提供了如下三个方法以对线程状态进行判定:

1
2
3
4
5
6
7
8
9
10
11
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}

各方法的作用可以通过方法名称直接理解,这里不再多做说明。

二. 基本属性与构造方法定义

ThreadPoolExecutor 中主要定义了如下基本属性,用来控制基本的工作队列和线程池大小,以及线程池的运行,下面逐个来解释说明。

1
2
3
4
5
6
7
8
private final BlockingQueue<Runnable> workQueue;
private final HashSet<Worker> workers = new HashSet<Worker>();
private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler;
private volatile long keepAliveTime;
private volatile boolean allowCoreThreadTimeOut;
private volatile int corePoolSize;
private volatile int maximumPoolSize;
  • workQueue

workQueue 直译为工作队列,用于存放已提交待执行的任务,这是线程池需要具备的一个基础组件。ThreadPoolExecutor 采用阻塞队列 BlockingQueue 作为工作队列的类型,当队列已满时后续提交任务的操作将会被阻塞,当队列为空时,从队列中取任务执行的操作也将被阻塞。

  • workers

工作线程集合,用于记录当前线程池中所有的工作线程,便于线程池对池中的线程数量、线程状态信息等进行管理。

  • threadFactory

线程工厂,用于创建新的线程,默认采用 Executors.defaultThreadFactory(),当然我们也可以在创建线程池时自定义线程工厂。

  • handler

handler 属性用于定义饱和策略,我们往线程池中提交的任务不一定会全部被线程池所接受,当如下两种情况之一出现时,线程池可以基于饱和策略对任务提交操作进行反馈:

  1. 当线程池处于 SHUTDOWN 状态时
  2. 当线程池中的线程都处于运行状态,但阻塞队列已满时,如果此时不能够再创建新的工作线程

Java 为饱和策略定义了 RejectedExecutionHandler 接口,并提供了多种不同的策略实现,包括:AbortPolicy、CallerRunsPolicy、DiscardPolicy,以及 DiscardOldestPolicy,其中 AbortPolicy 是默认的饱和策略。

  • keepAliveTime

线程池中的线程往往具备一定的生命周期,当一个线程长时间处于空闲状态时线程池可以将其消亡,以减少系统资源占用。keepAliveTime 属性定义了一个线程的最大生命周期,以微妙为单位(我们在创建线程池时可以指定最大生命周期的时间单位,但最终都将转换成微秒记录到 keepAliveTime 中)。一般来说线程池都会定义线程数量的下限,当线程数量减少到该下限值时,余下的线程将会一直存活,ThreadPoolExecutor 定义了 allowCoreThreadTimeOut 变量来控制这部分线程是否受 keepAliveTime 变量值所影响,该变量默认为 false,如果 allowCoreThreadTimeOut=true 则任何线程到达生命周期时间时都会死亡。

  • corePoolSize

核心线程数,可以理解为一个线程池所持有的最小线程数,默认情况当线程池空闲时这部分线程也会一直存活,不受 keepAliveTime 时间影响。在一开始提交任务且线程池中持有的线程数量还未达到该变量值时,线程池不会复用已有的空闲线程,而是会直接创建新的线程并执行任务。

  • maximumPoolSize

最大线程数,用于控制线程池中线程数量上限,防止系统运行过程中创建大量的线程,从而浪费系统资源,频繁切换线程上下文。ThreadPoolExecutor 能够持有的线程数量不是无上限的,因为通过 int 类型记录线程池的工作状态,并且利用其中的低 29 位来记录线程数,所以一个线程池最多持有 229 - 1 个线程,该值记录在 CAPACITY 静态常量中,我们可以理解一个线程池的线程数量上限是 min(CAPACITY, maximumPoolSize)。需要注意的一点是,当采用无界队列记录提交的任务时该变量将不起作用,具体原因在后面分析线程创建策略时再进行说明。

继续来看一下 ThreadPoolExecutor 的构造方法定义,ThreadPoolExecutor 提供了多种构造方法的重载版本,但都是对如下构造方法的各种定制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

方法中所有的参数已在上面进行了专门的说明,需要注意的一点就是参数中的 keepAliveTime 是有单位的,但最终还是将其转换成了微秒记录在 keepAliveTime 属性中。此外,构造方法还允许我们自定义工作队列的实现类型、线程工厂,以及饱和策略等等。

三. 任务调度过程

下方的代码块展示了 ThreadPoolExecutor 的基本使用方式,通常我们会调用线程池的 submit 或 execute 方法提交我们的任务,submit 本质上还是对 execute 方法的封装,该方法的实现位于 AbstractExecutorService 中,ThreadPoolExecutor 继承了该抽象类。对于我们提交的任务,线程池会基于当前线程池的负载来决定是否执行饱和策略,如果我们提交的任务被线程池接受,那么何时调度执行该任务则完全由线程池来控制,这其中的运行原理(调度策略)就是接下去我们主要探究的对象。

1
2
3
4
5
6
7
8
9
ThreadPoolExecutor tpe = new ThreadPoolExecutor(1, 5, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
tpe.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
// do something here
return null;
}
});
tpe.shutdown();

AbstractExecutorService 为 submit 方法提供了多种重载版本,这里我们以 submit(Callable<T> task) 方法为例进行说明:

1
2
3
4
5
6
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}

该方法首先通过 FutureTask 对 task 进行了封装,然后交给 execute 方法进行调度,ThreadPoolExecutor 中实现了 execute 方法,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get(); // 获取 ctl
// 1. 当前工作线程数小于 corePoolSize
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2. 当前线程池处于运行态,且可以提交任务
if (isRunning(c) && workQueue.offer(command)) {
// 再次获取 ctl
int recheck = ctl.get();
// 如果当前线程池处于非运行态,移除之前提交的任务
if (! isRunning(recheck) && remove(command))
reject(command); // 执行饱和策略
// 如果当前工作线程数量为 0
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3. 线程池处于非运行态,或者提交任务失败
else if (!addWorker(command, false))
reject(command);
}

上述方法所执行的逻辑可以分成三部分进行分析,每一部分都会调用 addWorker(Runnable firstTask, boolean core) 方法,几次调用的参数设置分别如下:

  1. addWorker(command, true)
  2. addWorker(null, false)
  3. addWorker(command, false)

因此我们先来分析 addWorker 方法,理解了 addWorker 的作用和实现细节,我们再回过头来看 execute 方法会更加清晰一些。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
private boolean addWorker(Runnable firstTask, boolean core) {
// 1. 判断是否允许创建新的线程
retry: // break label
for (;;) {
int c = ctl.get(); // 获取 ctl
int rs = runStateOf(c); // 获取线程池工作状态
/**
* 如果满足下列状态之一,则立即返回 false:
* 1. 当前为除 SHUTDOWN 以外的其它非运行态
* 2. 当前为 SHUTDOWN,但是 firstTask != null
* 3. 当前为 SHUTDOWN,但是工作队列为空
*/
if (rs >= SHUTDOWN
&& ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c); // 获取工作线程数
// 如果当前工作线程数量已经得到实际允许的最大工作线程上限,则直接返回 false
if (wc >= CAPACITY
|| wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 工作线程数加 1,退出多重循环
if (compareAndIncrementWorkerCount(c))
break retry;
// 如果过程中工作线程数已变更,则再次尝试
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 2. 执行创建新的线程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // 加锁
try {
// 再次检测线程池工作状态
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN // 线程池处于运行态
|| (rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w); // 将当前 Worker 对象记录到集合中
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock(); // 释放锁
}
// 启动线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

整个 addWorker 方法可以分成两部分,第一部分判定当前线程池是否允许创建新的线程,如果不能创建则直接返回 false,第二部分则是执行创建并启动新的线程

首先来看 第一部分,作者通过一个忙循环来执行判定的过程,第一次返回 false 的条件如下:

1
2
3
if (rs >= SHUTDOWN
&& ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
return false;

满足这一行语句的所有条件可以概括如下:

  1. 当前为除 SHUTDOWN 以外的其它非运行态
  2. 当前为 SHUTDOWN,但是 firstTask != null
  3. 当前为 SHUTDOWN,但是工作队列为空

回忆一下我们在最开始总结线程池的运行态时,有哪些情况下线程池会继续执行我们提交的任务?实际上可以分为 RUNNING 和 SHUTDOWN 两类运行态,这两类运行态的含义分别如下:

  • RUNNING:该状态下线程池接收新的任务,并执行工作队列中的任务
  • SHUTDOWN:该状态下线程池不接受新的任务,但是会继续执行工作队列中的任务

如果当前线程池的状态是 RUNNING,那么这时候是可以接受新提交的任务,并且可以执行工作队列中堆积的任务,此时是不满足这里列举的 3 种情况的。如果当前是 SHUTDOWN 状态,那么线程池不会再接受新的任务,但是会继续执行工作队列中堆积的任务,所以说如果这个时候我们有提交新任务,那么就满足条件 2,当然是不允许创建新的线程执行的。此外,如果这个时候我们工作队列已没有待执行的任务,即满足条件 3,那么线程池也不会再允许创建新的线程。

第二次返回 false 的条件如下:

1
2
3
if (wc >= CAPACITY
|| wc >= (core ? corePoolSize : maximumPoolSize))
return false;

这里的条件比较容易理解,如果当前线程池持有的线程数量已经达到理论上限 CAPACITY,当然不允许再创建新的线程,因为继续创建就溢出了。除了理论上限以外,线程池一般还存在实际容量上限,当达到该上限时同样不允许创建新的线程。这里的 core 参数是一个 boolean 类型,只有在当前工作线程数量小于 corePoolSize 时才为 true,我们知道对于线程池来说,如果当前工作线程数量小于 corePoolSize 则会直接创建新的工作线程去执行提交的任务,而不会检查是否有空闲的工作线程可以复用,所以在工作线程数量小于 corePoolSize 时是否创建新的线程应该以 corePoolSize 作为上限进行判断,因为过程中工作线程的数量是在变化的,如果走到这里工作线程的数量已经大于 corePoolSize 则不能直接创建新的线程,而是要先检查一下是否有空闲的工作线程可以复用。

再来看一下 第二部分,这一部分涉及到 ThreadPoolExecutor 中定义的一个核心内部类 Worker:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// 省略加锁相关方法
}

Worker 实现了 Runnable 接口,当我们创建一个 Worker 对象时,会以当前 Worker 对象作为一个任务绑定到一个新创建的线程对象上,当我们启动该线程时,实际上调用的是 Worker 中覆盖实现的 run 方法:

1
2
3
public void run() {
runWorker(this);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
final void runWorker(Worker w) {
Thread wt = Thread.currentThread(); // 获取当前线程对象
Runnable task = w.firstTask; // 获取绑定在 Worker 对象中的任务
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 尝试先执行绑定到 Worker 对象上的任务,如果没有则尝试从工作队列中获取待执行的任务
while (task != null || (task = getTask()) != null) {
w.lock(); // 加锁
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP)
|| (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
&& !wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // 模板方法
Throwable thrown = null;
try {
task.run(); // 执行任务
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // 模板方法
}
} finally {
task = null; // 防止任务被重复执行
w.completedTasks++;
w.unlock(); // 释放锁
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

runWorker 方法描述了一个线程不断执行任务的过程,任务可以是我们创建该线程对象时绑定的,也可以是从工作队列中获取的,只要是存在待执行的任务,且当前线程池运行状态允许执行提交的任务,同时线程没有被中断,就可以循环的处理提已交的任务。当一个任务被执行完毕或因异常而退出,那么该任务会被标记为 null,从而防止被重复执行,同时可以让垃圾收集器回收任务对象,方法中在线程执行前后分别提供了模板方法方便扩展。

我们继续来看 addWorker 中的第二部分。当我们创建完 Worker 对象之后,线程并没有马上启动工作,而是会再次检测一下线程池的运行状态确保允许启动当前线程,如果允许执行则会记录当前 worker 对象到 workers 全局集合中,这个集合主要用来让线程池管理池中的线程对象,比如当前线程池的大小、检查各个线程的状态等等。如果一个 worker 对象被记录成功,那么接下去就会启动其绑定的线程对象,开始处理提交的任务。

介绍完了 addWorker 方法的作用和实现, 我们回过头来继续探究最开始的 execute 方法,有了对 Worker 的创建和调度过程的理解再来看 execute 的逻辑会清晰很多。前面我们说了 execute 方法主要分为 3 个步骤,4 种场景执行,先简单概括一下:

  1. 如果线程池工作线程数小于 corePoolSize,则直接创建新的线程并执行提交的任务
  2. 如果线程池工作线程数大于或等于 corePoolSize,且工作队列存在空位,则将提交的任务先记录到队列中
  3. 如果线程池工作线程数大于或等于 corePoolSize,且工作队列不存在空位,但是工作线程数还未达到线程池实际容量上限,则创建新的线程
  4. 如果线程池工作线程数大于或等于 corePoolSize,且工作队列不存在空位,同时工作线程数达到线程池实际容量上限,则触发饱和策略

先来看一下 步骤一,这一步描述第一种场景(源码实现如下)。线程池判断当前工作线程数小于 corePoolSize,则执行 addWorker(command, true),由于第二参数为 true,所以会依据 corePoolSize 判断是否直接创建新的线程并执行提交的任务,因为这中间工作线程数量可能会发生变化,一旦工作线程数达到 corePoolSize,则需要执行其它的策略。

1
2
3
4
5
6
// 1. 当前工作线程数小于 corePoolSize
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}

再来看 步骤二,这一步描述了第二种场景(源码实现如下)。能够执行到这里说明工作线程数量已经大于或等于 corePoolSize,同时线程池处于运行态,此时会尝试先将提交的任务记录到工作队列中,如果过程中线程池的状态发生变更,变为非运行态,则会从工作队列中移除刚刚提交的任务,并触发饱和策略,否则判断当前的工作线程数,如果为 0 则只是增加一个线程而不启动它。

1
2
3
4
5
6
7
8
9
10
11
// 2. 当前线程池处于运行态,且可以提交任务
if (isRunning(c) && workQueue.offer(command)) {
// 再次获取 ctl
int recheck = ctl.get();
// 如果当前线程池处于非运行态,移除之前提交的任务
if (! isRunning(recheck) && remove(command))
reject(command); // 执行饱和策略
// 如果当前工作线程数量为 0
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}

最后来看一下 步骤三,这一步描述了第三和第四两种场景(源码实现如下)。能够运行到这一步需要满足两种情况之一:1.当前线程池处于非运行状态;2.工作队列已满。如果线程池当前处于非运行状态,按照之前对于 addWorker 方法的分析可知,如果提交的任务不为 null,势必触发饱和策略。而如果是因为工作队列已满的原因,则需要依据当前线程池是否还可以创建新的线程来决定是否触发饱和策略。

1
2
3
// 3. 线程池处于非运行态,或者提交任务失败
else if (!addWorker(command, false))
reject(command);

到这里我们已经完成了对线程池之于任务调度过程的分析。总的说来线程池对于任务的接受或者拒绝、以及对于已接受的任务的调度还是比较容易理解的,整个 ThreadPoolExecutor 的实现上,最令人敬佩的是 Doug Lea 大师在线程池状态和工作线程数量记录上的设计,以一个 int 型,通过位运算来实现所有的基础逻辑,简洁、高效,值得借鉴。

上一篇