线程池


这周看完线程池,然后并发相关的学习就先告一段落。


本周最后要学习的类是 ThreadPoolExecutor,它从 Executor 接口一层层继承下来,因此总共要学习四个类。主要的学习来源是《深度解读 java 线程池设计思想及源码实现》,这篇文章写得很好,但是需要结合源码慢慢阅读。

ThreadPoolExecutor1

要学习的四个线程池类,前三个类的方法截图如下,结构还是比较清晰的:

ThreadPoolExecutor继承关系

下面分别记述吧。


Executor

Executor 是一个接口,只有一个方法,就是执行一个 Runnable。

1
2
3
public interface Executor {
void execute(Runnable command);
}

这是线程池最核心的方法,我们平常使用线程池,在拿到线程池对象之后,也只需要使用这个方法了:

1
2
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
executor.execute(() -> System.out.println("runnable正在执行"));

这个方法要看到最后才能看到实现,具体的实现还是很复杂的,也很精妙。


ExecutorService

ExecutorService 也是一个接口,继承自 Executor 接口,新定义了许多方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public interface ExecutorService extends Executor {

// 关闭线程池,已提交的继续,不接受新的
void shutdown();

// 关闭线程池,已提交的尝试停止,不接受新的
List<Runnable> shutdownNow();

// 线程池已关闭
boolean isShutdown();

// 调用 shutdown / shutdownNow 后,且所有任务结束,返回true
boolean isTerminated();

// 在 shutdown / shutdownNow 后,设置一个超时时间,判断线程池是否在超时时间内关闭(该超时时间不影响关闭,只是设置等待时间)
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

// 提交一个 callable 任务
<T> Future<T> submit(Callable<T> task);

// 提交一个 runnable 任务,并把 result 设置为future的返回值
<T> Future<T> submit(Runnable task, T result);

// 提交一个 runnable 任务,future的返回值将是null
Future<?> submit(Runnable task);

// 执行一堆任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;

// 执行一堆任务,并设置超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;

// 执行一堆任务,只要有一个完成就返回,剩下的继续执行
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;

// 执行一堆任务,只要有一个完成就返回,剩下的继续执行,并设置超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

通常来讲,我们使用线程池时,拿到的对象都是这个接口的实现,因为这个接口中的大部分方法已经足够我们使用了。就比如最常见的 Executors,使用 Executors 工厂方法获取线程池,拿到的对象很多都是 ExecutorService 接口实现。

1
2
3
ExecutorService executorService1 = Executors.newFixedThreadPool(10);
ExecutorService executorService2 = Executors.newSingleThreadExecutor();
ExecutorService executorService3 = Executors.newCachedThreadPool();

简单回顾一下:

  • Executor 类只有一个方法,execute(),执行任务,没有返回。
  • ExecutorService 类新定义了很多方法,其中有 submit(),执行任务,有返回(通过 Future 类获取返回值)。

AbstractExecutorService

AbstractExecutorService 是一个抽象类,继承自 ExecutorService 接口,实现了父类的 submit()invokeAny()invokeAll() 方法。

实现的方法中,其中比较重要的是 submit() 方法,我们刚刚提过,这个方法执行任务,有返回值。意思是说,在执行 submit() 方法方法之后,可以获得一个 Future 对象,这就是线程执行任务的返回值,可以通过 Future 的 get() 方法一直等到任务结束拿回任务结果。Future 的具体使用不在这里详细叙述,在此只关注实现。

AbstractExecutorService 的 submit() 方法有三种,主要的思想都是将【任务】和【返回值】封装成一个 Future 对象,然后返回这个 Future 对象。这里引入了 RunnableFuture 接口,这是一个同时继承 Runnable 和 Future 的接口,即让 Runnable 有了返回值。FutureTask 类实现了这个接口,submit() 方法也主要是通过这个类实现提交任务、返回值的。

下面是 RunnableFuture 接口和它的实现类 FutureTask 的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// runnable 是没有 future 的,这里是组合,让 runnable 也有返回值
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}

public class FutureTask<V> implements RunnableFuture<V> {

private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;

private Callable<V> callable;

public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW;
}

public FutureTask(Callable<V> callable) {
if (callable == null) throw new NullPointerException();
this.callable = callable;
this.state = NEW;
}

...
}

下面直接附上 AbstractExecutorService 的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
public abstract class AbstractExecutorService implements ExecutorService {

/**
* 把 runnable 包装成 futureTask
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}

/**
* 把 callable 包装成 futureTask
*/
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}

/**
* runnable -> runnableFuture -> 提交任务
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

/**
* runnable -> runnableFuture -> 提交任务
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}

/**
* callable -> runnableFuture -> 提交任务
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}

/**
* (后面两个 invokeAny 方法的具体实现)
* 提交一堆任务,一个结束就返回,如果 没有返回/超时 就抛异常
*/
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {

// 任务不能为空
if (tasks == null) throw new NullPointerException();
int ntasks = tasks.size();
if (ntasks == 0) throw new IllegalArgumentException();

// future 列表(用于返回前,取消其他任务)
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);

// 将 this 包装成 ecs,内部有一个阻塞队列,用于记录所有的 future
ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this);

try {
// 定义一个异常,如果本方法没有得到任何结果,可以抛出最后遇到的异常
ExecutionException ee = null;
// 超时时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 任务迭代器
Iterator<? extends Callable<T>> it = tasks.iterator();

// 先提交一个任务,其他任务在后面的for循环里提交(任务数-1,正在执行的任务数+1)
futures.add(ecs.submit(it.next()));
--ntasks;
int active = 1;

for (;;) {

// 获取刚刚提交任务的 future(从上面 ecs 内部的阻塞队列中获取)
Future<T> f = ecs.poll();

// 如果最初提交的任务没有执行完,这里拿不到 future
if (f == null) {
// 未执行任务√ -> 再提交一个任务
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
// 未执行任务× 正在执行任务× -> 循环终止
else if (active == 0)
break;
// 未执行任务× 正在执行任务√ 设置超时√ -> 重新获取 future,超时抛异常
else if (timed) {
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null) throw new TimeoutException();
nanos = deadline - System.nanoTime();
}
// 未执行任务× 正在执行任务√ 设置超时× -> 阻塞等待 future
else
f = ecs.take();
}

// 如果在多次循环之后,拿到了 future
if (f != null) {
--active;
try {
// 返回 future,并把异常包装成 ee(ExecutionException)
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}

// 不是很确定为什么 ee 会是 null
if (ee == null)
ee = new ExecutionException();

// 没有返回任何 future,那么抛出去异常
throw ee;

} finally {
// 方法退出之前,取消其他的任务
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}

/**
* 不设置超时的 doInvokeAny()
*/
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}

/**
* 设置超时时间的 doInvokeAny()
*/
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
}

/**
* 提交一堆任务,全部结束一起返回
*/
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
// 任务集合不能为空
if (tasks == null) throw new NullPointerException();

// future 列表,用于最后全部结束后返回
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());

boolean done = false;
try {
// 将所有的任务包装成 runnableFuture,全部加入 future 列表,全部开始执行
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}

// 逐一等待所有的 future 完成
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
try {
// 主动阻塞等到所有的 future 完成
// 忽略两类异常,但还会抛出 InterruptedException 异常
f.get();
} catch (CancellationException | ExecutionException ignore) {}
}
}
done = true;
return futures;
} finally {
// 如果没完成,撤销还没执行完的任务
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}

/**
* 提交一堆任务,设置超时时间,
* 最终返回所有任务列表,但是不一定执行完,有可能被中断或取消等
*/
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
// 任务不能为空
if (tasks == null) throw new NullPointerException();
// 超时时长
long nanos = unit.toNanos(timeout);
// future 列表,用于最后返回
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());

boolean done = false;
try {
// 把所有任务包装成 runnableFuture
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));

// 截止时间和任务数
final long deadline = System.nanoTime() + nanos;
final int size = futures.size();

// 逐个执行任务,每执行一个,检查一下时间,超时直接返回任务列表(不管有没有执行完)
for (int i = 0; i < size; i++) {
execute((Runnable)futures.get(i));
nanos = deadline - System.nanoTime();
if (nanos <= 0L)
return futures;
}

// 逐个阻塞获取 future,每查一个,检查一下时间,超时直接返回任务列表(不管有没有执行完)
for (int i = 0; i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
if (nanos <= 0L)
return futures;
try {
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
}
nanos = deadline - System.nanoTime();
}
}
done = true;
return futures;
} finally {
// 逐一取消所有任务,如果任务没完成就会被取消
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
}

ThreadPoolExecutor

ThreadPoolExecutor 就是我们今天要学习的线程池,它有很多代码,我们挑着学。

从整体上讲,ThreadPoolExecutor 有两个最重要的设计,一是设定了一个线程池的内部状态数,二是通过内部类 Worker 来执行池中的任务。

内部状态数

ThreadPoolExecutor 内部维护了一个 AtomicInteger(并发安全的 Integer),这个数字是线程池的内部状态数,它是一个 int 数字,长 32 位,其中高 3 位是状态位,低 29 位是线程的数量,具体可以看这部分代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ThreadPoolExecutor extends AbstractExecutorService {

/* ———————————————————— 线程池的内部状态 ———————————————————— */

// 一个并发int,是线程池的内部状态数,其中高3位代表线程池的状态,后29位代表线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 这个数字是29(32-3),是移位用的,可以看下面的五个状态
private static final int COUNT_BITS = Integer.SIZE - 3;
// 以下有五个状态,存储在线程池内部状态数 ctl 的高三位,例如 RUNNING:111 00000000000000000000000000000
// 分别是:【运行】、【接收×继续√】、【接收×继续×】、【所有任务销毁后,执行钩子方法terminated()】、【terminated()方法结束】
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 线程池的线程数容量,最多有 2^29-1 个
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 下面有三个方法,分别是用来:获取状态、获取线程数、获取内部状态数
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

...
}

内部类Worker

ThreadPoolExecutor 有一个内部类 Worker,这个类继承自 AQS,实现了 Runnable 接口,这一继承一实现特别巧妙,这个后面再说。

Worker 类实际上就是一个封装的线程,线程池按顺序执行批量任务,实际上就是内部有一些 Worker 对象,每个 Worker 对象就是一个线程,这些 worker 各自执行任务,执行完一个去任务队列里再拿一个。从这个意义上去理解,线程池其实就是 Worker 池,池中的所有 Worker 按顺序执行丢进池子里的任务。

想要更深地理解 Worker,就要去读 ThreadPoolExecutor 的其他源码了,尤其是构造方法和 execute() 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 一个内部类 Worker,是一个封装过的线程
*/
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

// 真正的执行线程
final Thread thread;

// 创建线程的时候可以同时指定第一个要执行的任务,如果为null,就去任务队列里拿
Runnable firstTask;

// 线程执行完的任务数量
volatile long completedTasks;

// Worker构造方法,传入首个任务(可以是null),创建线程,设置AQS状态
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

// (重写 Runnable 的 run 方法)调用外部的 runWorker() 方法
public void run() {
runWorker(this);
}

// ...
}

我先把剩下的所有代码扔上来吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/* ———————————————————— 线程池的构造方法 ———————————————————— */

/**
* 核心构造方法
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// 参数非法
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
throw new IllegalArgumentException();
// 必须有这几个参数
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();

// 不懂
this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
// 核心线程数
this.corePoolSize = corePoolSize;
// 最大线程数,线程池允许创建的最大线程数
this.maximumPoolSize = maximumPoolSize;
// 任务队列,里面装着所有 runnable
this.workQueue = workQueue;
// 空闲线程存活时间,默认指的是核心线程外的线程,一直没事干就会被回收
this.keepAliveTime = unit.toNanos(keepAliveTime);
// 线程池,用于生成线程
this.threadFactory = threadFactory;
// 拒绝策略,如果线程池满了,来了新任务应该采用什么策略
this.handler = handler;
}

下面是终于提到的 execute() 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
/* ———————————————————— execute 方法 ———————————————————— */

/**
* 提交一个任务
* 1. 如果核心线程数不满,创建 worker 去执行任务
* 2. 如果核心线程数满了,放到 workQueue 等待 worker 空闲下来去执行任务
* 中途失败,采用 handle 的拒绝策略
*/
public void execute(Runnable command) {
// 任务不能为null
if (command == null) throw new NullPointerException();

// 如果核心线程数不满,调用 addWorker() 方法,直接返回
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// 这个方法很长,具体实现见下文
if (addWorker(command, true))
return;
c = ctl.get();
}

// 当代码执行到这里,说明核心线程数超了,或者 addWorker 失败了

// 如果线程池 RUNNING,先把任务加到 workQueue 中
if (isRunning(c) && workQueue.offer(command)) {
// 如果线程池不是 RUNNING,workQueue 删除刚才的任务,采用拒绝策略
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command))
reject(command);
// 如果线程池 RUNNING,为了避免 worker 全部关闭,检查一下 worker 的数量,全部关闭时再创建一个
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果任务加入 workQueue 失败(容量满了),再试着 addWorker
// 此时将创建以 maximumPoolSize 为边界的非核心 worker,再失败就拒绝策略
else if (!addWorker(command, false)) {
reject(command);
}
}

/**
* 添加一个worker
*
* 本方法使用到了线程池的如下属性:
* - ctl(AtomicInteger) 线程池的内部状态数,高三位是线程池状态
* - workQueue 任务队列
* - mainLock 全线程池的公共锁
* - workers(HashSet) 用于存储所有的worker
* - largestPoolSize 用于记录 workers 中的个数的最大值,可以知道线程池的大小曾经达到的最大值
*/
private boolean addWorker(Runnable firstTask, boolean core) {
// 两个入参:firstTask【准备提交给线程的任务,可null】,core【核心线程/最大线程】

retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// 翻译一下:
// if (rs == STOP || rs == TIDYING || rs == TERMINATED 如果线程池处于 STOP,TIDYING,或TERMINATED 这三种状态
// || rs == SHUTDOWN && firstTask != null 如果线程池处于 SHUTDOWN 状态,并且有新任务
// || rs == SHUTDOWN && workQueue.isEmpty()) 如果线程池处于 SHUTDOWN 状态,并且【任务队列】空了
// 那么失败,返回false
// 换种角度思考:
// 如果线程池处于 SHUTDOWN 状态,并且没有 firstTask,并且【任务队列】不为空,那么是可以创建 Worker 的
// 这种情况下是指,线程池关闭了,但是已提交的任务还没执行完毕,那么创建 Worker 继续执行
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
// 线程数超了,不能创建
int wc = workerCountOf(c);
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;

// CAS线程数+1,如果成功,那么就可以退出循环出去创建 Worker 了
if (compareAndIncrementWorkerCount(c))
break retry;

// 如果线程池状态变更,到外层循环
// 如果线程池状态不变,在本层循环
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}

// 到此,检验结束,可以创建 Worker 了

// 【是否已启动worker】、【是否已把worker添加到HashSet中】
boolean workerStarted = false;
boolean workerAdded = false;

Worker w = null;
try {
// 创建一个 worker,并拿到里面的线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 上锁,这是全线程池的锁,避免线程池在执行下面代码时被关闭(关闭也是用这个锁)
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 重新获取线程池的状态
int rs = runStateOf(ctl.get());

// 如果线程池的状态是 RUNNING,或者虽然是 SHUTDOWN,但是没有 firstTask,并且潜在地【任务队列】不为空
// 那么把 worker 装入 HashSet 中
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {

// worker 里的线程不可能已经启动
if (t.isAlive())
throw new IllegalThreadStateException();

// 把 worker 存进 workers(HashSet) 中
workers.add(w);

// 记录 workers 的最大容量
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;

// 【是否已把worker添加到HashSet中】√
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 启动任务
if (workerAdded) {
// 线程启动之后,得到CPU的调度后,会再调用线程的run方法,去执行 runnable,而这里的 runnable 对象是 worker
// 也就是说,在线程启动之后,(未知时间)会调用 runWorker 方法,也就是真正执行任务的方法,往后看
t.start();
workerStarted = true;
}
}
} finally {
// 如果线程没有启动,需要做一些清理工作,如前面 workCount 加了 1,减掉
if (!workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

/**
* worker 开始执行任务
*/
final void runWorker(Worker w) {
// worker 的 【wt:线程】和【task:任务】
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;

try {
// 循环获取任务,直到 workQueue 为空
while (task != null || (task = getTask()) != null) {
w.lock();

// 线程池状态 >= STOP 时线程阻塞,否则线程开启(这么复杂是为了避免 shutdownNow 造成的线程阻塞)
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
&& !wt.isInterrupted())
wt.interrupt();

try {
// 钩子方法,留给需要的子类实现
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 当当当当,执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 钩子方法,留给需要的子类实现
afterExecute(task, thrown);
}
} finally {
// 重置任务为 null,进行下一次循环(获取任务执行...)
task = null;
w.completedTasks++;
w.unlock();
}
}

// 中途没有被中断
completedAbruptly = false;
} finally {
// 执行到这里,要不然是没任务了,要不然是异常了,让后续代码处理
processWorkerExit(w, completedAbruptly);
}
}

懒得写了,列一些点吧:

  • 线程池的构造方法中,会指定很多参数,corePoolSize 指的是核心线程数,实际上也就是核心 Worker 数。当线程池执行 execute(runnable) 方法时,扔进来一个 runnable 任务,会首先判断 Worker 的数量是否达到了核心线程数,如果不到就去创建一个 Worker 对象。核心线程数内的所有 Worker 对象都不会被销毁,执行完任务就去任务队列里面拿,任务队列空了就等。
  • 当核心线程数满了,即有 corePoolSize 个 Worker 对象了,那么加入任务队列 workQueue 中,这是一个 AQS 阻塞队列(线程安全),当有 Worker 执行完任务后,会自己来 workQueue 里拿一个任务执行。
  • 如果核心线程数满了,任务队列也满了,再加进来的任务,会按照最大线程数 maximumPoolSize 的大小继续创建 Worker,比如核心线程数是 10,最大线程数是 20,那么在已经有了 10 个 Worker 对象、并且任务队列也满了的情况下,可以继续创建 Worker 对象(每新加一个任务就新创建一个,直到再新创建 10 个达到最大的 20 个)。核心线程数之外的 Worker 对象,将在没有任务的可执行的情况下被销毁掉。
  • execute(runnable) 方法的主要作用,就是创建 Worker,启动 Worker 中的线程,这个线程在启动之后(调用 start() 方法),将由 CPU 去调度,如果分到时间片,那么将调用起线程的 run() 方法,去执行 runnable 任务。而我们上面提过了,Worker 对象实现了 Runnable 接口,线程执行的那个 Runnable 对象,实际上就是 Worker 对象(这个设计太绝了!),由此实现了让 Worker 执行任务的目的。

就写到这里吧。