利用多线程,程序可以更加合理地使用CPU多核心资源,在同一时间完成更多的工作。但是,如果程序频繁地创建线程,由于线程的创建和销毁也需要占用系统资源,因此这样会降低整个程序的性能,利用线程池可以将已创建的线程复用,利用池化技术,就像数据库连接池一样,创建很多个线程,然后反复地使用这些线程,而不对它们进行销毁。

由于线程池可以反复利用已有线程执行多线程操作,所以它一般是有容量限制的,当所有的线程都处于工作状态时,那么新的多线程请求会被阻塞,直到有一个线程空闲出来为止,这里会用到阻塞队列

使用方式

线程池的构造方法:

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

其中的参数:

  • corePoolSize:核心线程池大小,每向线程池提交一个多线程任务都会创建一个新的核心线程,无论是否存在其他空闲线程,直到到达核心线程池大小为止,之后会尝试复用线程资源。也可以在一开始就全部初始化好,调用 prestartAllCoreThreads()即可
  • maximumPoolSize:最大线程池大小,当目前线程池中所有的线程都处于运行状态,并且等待队列已满,那么就会直接尝试继续创建新的非核心线程运行,但是不能超过最大线程池大小
  • keepAliveTime:线程最大空闲时间,当一个非核心线程空闲超过一定时间,会自动销毁。
  • unit:线程最大空闲时间的时间单位
  • workQueue:线程等待队列,当线程池中核心线程数已满时,就会将任务暂时存到等待队列中,直到有线程资源可用为止,这里可以使用我们上一章学到的阻塞队列
  • threadFactory:线程创建工厂,我们可以干涉线程池中线程的创建过程,进行自定义。
  • handler:拒绝策略,当等待队列和线程池都没有空间了,真的不能再来新的任务时,来了个新的多线程任务,那么只能拒绝了,这时就会根据当前设定的拒绝策略进行处理

整体流程:核心线程 -> 核心线程满 -> 剩余任务进入阻塞队列 -> 阻塞队列满 -> 开启非核心线程执行阻塞队列满后面到的任务 -> 线程数量超过maximumPoolSize -> 拒绝后续任务

线程池的拒绝策略默认有四个:

  • AbortPolicy(默认):直接抛异常
  • CallerRunsPolicy:直接让提交任务的线程运行这个任务,比如在主线程向线程池提交了任务,那么就直接由主线程执行
  • DiscardOldestPolicy:丢弃队列中最近的一个任务,替换为当前任务
  • DiscardPolicy:直接丢弃新提交的任务

注意:如果任务在运行过程中出现异常了,会导致线程池中的线程被销毁,线程池会自动创建新的线程来替代被销毁的线程

使用示例

public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor =
new ThreadPoolExecutor(2, 4, //2个核心线程,最大线程数为4个
3, TimeUnit.SECONDS, //最大空闲时间为3秒钟
new ArrayBlockingQueue<>(2)); //这里使用容量为2的ArrayBlockingQueue队列

for (int i = 0; i < 6; i++) { //开始6个任务
int finalI = i;
executor.execute(() -> {
try {
System.out.println(Thread.currentThread().getName()+" 开始执行!("+ finalI);
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName()+" 已结束!("+finalI);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}

TimeUnit.SECONDS.sleep(1); //看看当前线程池中的线程数量
System.out.println("线程池中线程数量:"+executor.getPoolSize());
TimeUnit.SECONDS.sleep(5); //等到超过空闲时间
System.out.println("线程池中线程数量:"+executor.getPoolSize());

executor.shutdownNow(); //使用完线程池记得关闭,不然程序不会结束,它会取消所有等待中的任务以及试图中断正在执行的任务,关闭后,无法再提交任务,一律拒绝
//executor.shutdown(); //同样可以关闭,但是会执行完等待队列中的任务再关闭
}

线程池类

  1. newFixedThreadPool(int nThreads)
  • 作用:创建一个固定大小的线程池
  • 特点:
    • 线程池中的线程数量固定,不会动态变化
    • 任务队列是无界的(LinkedBlockingQueue),如果任务数量超过线程数量,任务会排队等待
  • 适用场景:适用于负载比较稳定的服务器,或者需要限制线程数量的场景
  1. newCachedThreadPool()
  • 作用:创建一个可缓存的线程池
  • 特点:
    • 线程池中的线程数量会根据任务数量动态调整,所有的线程都是非核心线程
    • 空闲线程会在60秒后被回收
    • 任务队列是同步队列(SynchronousQueue),不会存储任务,新任务会立即执行或创建新线程
  • 适用场景:适用于执行大量短期异步任务,或者任务执行时间较短的场景
  1. newSingleThreadExecutor()
  • 作用:创建一个单线程的线程池
  • 特点:
    • 线程池中只有一个线程,所有任务按顺序执行
    • 任务队列是无界的(LinkedBlockingQueue
  • 适用场景:适用于需要保证任务顺序执行的场景
  1. newScheduledThreadPool(int corePoolSize)
  • 作用:创建一个支持定时任务的线程池
  • 特点:
    • 线程池可以执行定时任务或周期性任务
    • 任务队列是延迟队列(DelayedWorkQueue
  • 适用场景:适用于需要执行定时任务或周期性任务的场景

执行带返回值的任务

一个多线程任务不仅仅可以是void任务,也可以使用Future得到一个任务的返回值,可以通过它来获取任务的结果以及任务当前是否完成:

import java.util.concurrent.*;

public class FutureExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建线程池
ExecutorService executor = Executors.newSingleThreadExecutor();

// 提交任务,返回 Future 对象
Future<Integer> future = executor.submit(() -> {
Thread.sleep(1000); // 模拟任务执行
return 42;
});

// 获取任务结果
System.out.println("任务结果: " + future.get());

// 关闭线程池
executor.shutdown();
}
}

Future还可以取消任务:

import java.util.concurrent.*;

public class FutureCancelExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();

Future<Integer> future = executor.submit(() -> {
Thread.sleep(2000); // 模拟任务执行
return 42;
});

// 取消任务
boolean cancelled = future.cancel(true);
System.out.println("任务是否被取消: " + cancelled);

// 检查任务状态
System.out.println("任务是否完成: " + future.isDone());
System.out.println("任务是否被取消: " + future.isCancelled());

executor.shutdown();
}
}
  • 调用future.cancel(true)取消任务,并中断任务执行
  • isDone()isCancelled()方法返回 true,表示任务已被取消

执行定时任务

JDK5之后,可以使用ScheduledThreadPoolExecutor来提交定时任务,它继承自ThreadPoolExecutor,并且所有的构造方法都必须要求最大线程池容量为Integer.MAX_VALUE,并且都是采用的DelayedWorkQueue作为等待队列

核心方法

  1. schedule(Runnable command, long delay, TimeUnit unit)
  • 作用:提交一个延迟任务,在指定的延迟时间delay后执行
  • 参数:
    • command:要执行的任务
    • delay:延迟时间
    • unit:时间单位
  1. schedule(Callable<V> callable, long delay, TimeUnit unit)
  • 作用:提交一个延迟任务,在指定的延迟时间delay后执行,并返回Future对象
  • 参数:
    • callable:要执行的任务
  1. scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
  • 作用:提交一个周期性任务,在初始延迟时间后开始执行,之后按固定的时间间隔重复执行。
  • 参数:
    • command:要执行的任务
    • initialDelay:初始延迟时间
    • period:任务执行的时间间隔
  1. scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
  • 作用:提交一个周期性任务,在初始延迟时间后开始执行,之后在上一次任务执行完成后,再延迟指定的时间执行下一次任务
  • 参数:
    • command:要执行的任务
    • initialDelay:初始延迟时间
    • delay:任务执行完成后的延迟时间

示例

  1. 延迟任务
import java.util.concurrent.*;

public class ScheduledThreadPoolExecutorExample {
public static void main(String[] args) {
// 创建 ScheduledThreadPoolExecutor
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

// 提交延迟任务
executor.schedule(() -> {
System.out.println("延迟任务执行,线程: " + Thread.currentThread().getName());
}, 3, TimeUnit.SECONDS); // 3 秒后执行

// 关闭线程池
executor.shutdown();
}
}
  1. 固定速率周期性任务
import java.util.concurrent.*;

public class ScheduledAtFixedRateExample {
public static void main(String[] args) {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

// 提交周期性任务
executor.scheduleAtFixedRate(() -> {
System.out.println("周期性任务执行,线程: " + Thread.currentThread().getName());
}, 1, 2, TimeUnit.SECONDS); // 1 秒后开始执行,每隔 2 秒执行一次

// 关闭线程池
try {
Thread.sleep(10000); // 主线程休眠 10 秒
} catch (InterruptedException e) {
e.printStackTrace();
}
executor.shutdown();
}
}
  1. 固定延迟周期性任务
import java.util.concurrent.*;

public class ScheduledWithFixedDelayExample {
public static void main(String[] args) {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

// 提交周期性任务
executor.scheduleWithFixedDelay(() -> {
System.out.println("周期性任务执行,线程: " + Thread.currentThread().getName());
try {
Thread.sleep(1000); // 模拟任务执行
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 1, 2, TimeUnit.SECONDS); // 1 秒后开始执行,任务执行完成后延迟 2 秒执行下一次任务

// 关闭线程池
try {
Thread.sleep(10000); // 主线程休眠 10 秒
} catch (InterruptedException e) {
e.printStackTrace();
}
executor.shutdown();
}
}

scheduleAtFixedRate是如果任务执行时间不到period,就休息到period,否则立刻执行下一次;scheduleWithFixedDelay是无论任务执行用了多长时间,两次任务的时间间隔一定是delay

线程池实现原理

ThreadPoolExecutor中,ctl变量是一个AtomicInteger类型的字段,用于同时表示线程池的状态和线程数量。它是线程池实现中的核心变量,通过位运算来高效地管理线程池的状态和线程数量

//这个变量比较关键,用到了原子AtomicInteger,用于同时保存线程池运行状态和线程数量(使用原子类是为了保证原子性)
//它是通过拆分32个bit位来保存数据的,前3位保存状态,后29位保存工作线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; //29位,线程数量位
private static final int CAPACITY = (1 << COUNT_BITS) - 1; //计算得出最大容量(1左移29位,最大容量为2的29次方-1)

// 所有的运行状态,都只占用前3位,不会占用后29位
// 接收新任务,并等待执行队列中的任务
private static final int RUNNING = -1 << COUNT_BITS; //111 | 0000... (后29数量位,下同)
// 不接收新任务,但是依然等待执行队列中的任务
private static final int SHUTDOWN = 0 << COUNT_BITS; //000 | 数量位
// 不接收新任务,也不执行队列中的任务,并且还要中断正在执行中的任务
private static final int STOP = 1 << COUNT_BITS; //001 | 数量位
// 所有的任务都已结束,线程数量为0,即将完全关闭
private static final int TIDYING = 2 << COUNT_BITS; //010 | 数量位
// 完全关闭
private static final int TERMINATED = 3 << COUNT_BITS; //011 | 数量位

// 封装和解析ctl变量的一些方法
private static int runStateOf(int c) { return c & ~CAPACITY; } //对CAPACITY取反就是后29位全部为0,前三位全部为1,接着与c进行与运算,这样就可以只得到前三位的结果了,所以这里是取运行状态
private static int workerCountOf(int c) { return c & CAPACITY; }
//同上,这里是为了得到后29位的结果,所以这里是取线程数量
private static int ctlOf(int rs, int wc) { return rs | wc; }
// 比如上面的RUNNING, 0,进行与运算之后:
// 111 | 0000000000000000000000000

execute方法

execute 方法是线程池的入口,用于提交任务到线程池,具体流程:

  1. 任务提交
    • 将任务提交到线程池中执行
  2. 线程池状态检查
    • 如果线程池状态不是 RUNNING,则拒绝任务
  3. 线程数量检查
    • 如果线程数量小于 corePoolSize,则创建新的核心线程执行任务
  4. 任务队列检查
    • 如果线程数量已达到 corePoolSize,则将任务放入任务队列中
  5. 创建新线程
    • 如果任务队列已满且线程数量小于 maximumPoolSize,则创建新的非核心线程执行任务
  6. 拒绝策略
    • 如果任务队列已满且线程数量已达到 maximumPoolSize,则执行拒绝策略
//指定的阻塞队列
private final BlockingQueue<Runnable> workQueue;

//这里没加锁,所以ctl才会使用原子类
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException(); //如果任务为null,返回空指针
int c = ctl.get(); //获取ctl的值,读取信息
if (workerCountOf(c) < corePoolSize) { //判断工作线程数量是否小于核心线程数
if (addWorker(command, true)) //直接加新的线程执行
return;
c = ctl.get(); //如果线程添加失败(有可能其他线程也在对线程池进行操作),就更新c的值
}
if (isRunning(c) && workQueue.offer(command)) { //继续判断,如果当前线程池是运行状态,那就尝试向阻塞队列中添加一个新的等待任务
int recheck = ctl.get(); //再次获取ctl的值
if (! isRunning(recheck) && remove(command)) //这里是再次确认当前线程池是否关闭,如果添加等待任务后线程池关闭了,那就把刚刚加进去任务的又拿出来
reject(command); //然后直接拒绝当前任务的提交(会根据拒绝策略决定如何进行拒绝操作)
else if (workerCountOf(recheck) == 0) //如果这个时候线程池依然在运行状态,那么就检查一下当前工作线程数是否为0,如果是那就直接添加新线程执行
addWorker(null, false); //添加一个新的非核心线程
}
else if (!addWorker(command, false)) //这种情况要么就是线程池没有运行,要么就是队列满了,按照我们之前的规则,核心线程数已满且队列已满,那么会直接添加新的非核心线程,但是如果已经添加到最大数量,这里肯定会失败
reject(command); //确实装不下了,只能拒绝
}

addWorker方法

addWorker方法用于创建新线程并执行任务。主要逻辑:

  1. 线程数量检查
    • 如果线程数量已达到上限,则返回false
  2. 创建线程
    • 创建新线程,并将其封装为Worker对象
  3. 启动线程
    • 启动线程,开始执行任务
  4. 线程池状态检查
    • 如果线程池状态不是RUNNING,则回滚线程的创建
private boolean addWorker(Runnable firstTask, boolean core) {
//给最外层循环打了个标签,方便跳转操作
retry:
for (;;) { //无限循环,全程没加锁
int c = ctl.get(); //获取ctl值
int rs = runStateOf(c); //解析当前的运行状态

if (rs >= SHUTDOWN && //判断线程池是否不是处于运行状态
! (rs == SHUTDOWN && //如果不是运行状态,判断线程是SHUTDOWN状态并、任务不为null、等待队列不为空,只要有其中一者不满足,直接返回false,添加失败
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) { //内层循环是为了将线程计数增加,然后才可以真正地添加一个新的线程
int wc = workerCountOf(c); //解析当前的工作线程数量
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize)) //判断一下还装得下不,如果装得下,看看是核心线程还是非核心线程,如果是核心线程,不能大于核心线程数的限制,如果是非核心线程,不能大于最大线程数限制
return false;
if (compareAndIncrementWorkerCount(c)) //CAS自增线程计数,如果增加成功,任务完成,直接跳出继续
break retry; //注意这里要直接跳出最外层循环,所以用到了标签(类似于goto语句)
c = ctl.get(); // 如果CAS失败,更新一下c的值
if (runStateOf(c) != rs) //如果CAS失败的原因是因为线程池状态和一开始的不一样了,那么就重新从外层循环再来一次
continue retry; //注意这里要直接从最外层循环继续,所以用到了标签(类似于goto语句)
// 如果是其他原因导致的CAS失败,那只可能是其他线程同时在自增,所以重新再来一次内层循环
}
}

//添加新的工作线程了
boolean workerStarted = false; //工作线程是否已启动
boolean workerAdded = false; //工作线程是否已添加
Worker w = null; //工作线程
try {
w = new Worker(firstTask); //创建新的工作线程,传入提交的任务
final Thread t = w.thread; //拿到工作线程中封装的Thread对象
if (t != null) { //如果线程不为null,安排任务
final ReentrantLock mainLock = this.mainLock; //ReentrantLock加锁,只有一个线程能进入
mainLock.lock();
try {
int rs = runStateOf(ctl.get()); //获取当前线程的运行状态

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) { //只有当前线程池是正在运行状态,或是SHUTDOWN状态且firstTask为空,那么就继续
if (t.isAlive()) // 检查一下线程是否正在运行状态
throw new IllegalThreadStateException(); //如果是就抛出异常
workers.add(w); //直接将新创建的Work丢进workers集合中
int s = workers.size();
if (s > largestPoolSize) //记录线程池运行以来,历史上的最多线程数
largestPoolSize = s;
workerAdded = true; //工作线程已添加
}
} finally {
mainLock.unlock(); //解锁
}
if (workerAdded) {
t.start(); //启动线程
workerStarted = true; //工作线程已启动
}
}
} finally {
if (! workerStarted) //如果线程在上面的启动过程中失败了
addWorkerFailed(w); //将w移出workers并将计数器-1,最后如果线程池是终止状态,会尝试加速终止线程池
}
return workerStarted; //返回是否成功
}

Worker类

继承自AbstractQueuedSynchronizer,也就是说,它本身就是一把锁:

private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
//用来干活的线程
final Thread thread;
//要执行的第一个任务,构造时就确定好了
Runnable firstTask;
//干活数量计数器,也就是这个线程完成了多少个任务
volatile long completedTasks;

Worker(Runnable firstTask) {
setState(-1); // 执行Task之前不让中断,将AQS的state设定为-1
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); //通过预定义或是我们自定义的线程工厂创建线程
}

public void run() {
runWorker(this); //真正开始干活,包括当前活干完了又要等新的活来
}

//0就是没加锁,1就是已加锁
protected boolean isHeldExclusively() {
return getState() != 0;
}
//...
}

runWorker方法

unWorker 方法是线程执行任务的核心逻辑:

  1. 任务执行
    • 从任务队列中获取任务并执行
  2. 线程中断处理
    • 如果线程被中断,则退出执行
  3. 任务完成处理
    • 任务执行完成后,更新线程池状态
final void runWorker(Worker w) {
Thread wt = Thread.currentThread(); //获取当前线程
Runnable task = w.firstTask; //取出要执行的任务
w.firstTask = null; //然后把Worker中的任务设定为null
w.unlock(); // 因为一开始为-1,这里是通过unlock操作将其修改回0,只有state大于等于0才能响应中断
boolean completedAbruptly = true;
try {
//只要任务不为null,或是任务为空但是可以从等待队列中取出任务不为空,那么就开始执行这个任务,注意这里是无限循环,也就是说如果当前没有任务了,那么会在getTask方法中卡住,因为要从阻塞队列中等着取任务
while (task != null || (task = getTask()) != null) {
w.lock(); //对当前Worker加锁,在shutdown时保护此任务的运行

//由于线程池在STOP状态及以上会禁止新线程加入并且中断正在进行的线程
if ((runStateAtLeast(ctl.get(), STOP) || //线程池是STOP及以上的状态,不能开始新任务
(Thread.interrupted() && //线程是否已经被打上中断标记并且线程一定是STOP及以上
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted()) //再次确保线程被没有打上中断标记
wt.interrupt(); //打中断标记
try {
beforeExecute(wt, task); //开始之前的准备工作
Throwable thrown = null;
try {
task.run(); //开始执行任务
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); //执行之后的工作
}
} finally {
task = null; //任务已完成
w.completedTasks++; //任务完成数++
w.unlock(); //解锁
}
}
completedAbruptly = false;
} finally {
//如果能走到这一步,那说明上面的循环肯定是跳出了,也就是说这个Worker可以丢弃了
//所以这里会直接将Worker从workers里删除掉
processWorkerExit(w, completedAbruptly);
}
}

getTask方法

getTask方法用于从任务队列中获取任务:

  1. 线程池状态检查
    • 如果线程池状态不是RUNNING,则返回null
  2. 任务队列检查
    • 从任务队列中获取任务
  3. 线程超时处理
    • 如果线程空闲时间超过keepAliveTime,则返回null
private Runnable getTask() {
boolean timedOut = false;

for (;;) { //无限循环
int c = ctl.get(); //获取ctl
int rs = runStateOf(c); //解析线程池运行状态

if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { //判断是不是没有必要再执行等待队列中的任务了,也就是处于关闭线程池的状态了
decrementWorkerCount(); //直接减少一个工作线程数量
return null; //返回null,runWorker直接结束
}

int wc = workerCountOf(c); //如果线程池运行正常,那就获取当前的工作线程数量

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //如果线程数大于核心线程数或是允许核心线程等待超时,那么就标记为可超时的

//超时或maximumPoolSize在运行期间被修改了,并且线程数大于1或等待队列为空,那也是不能获取到任务的
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c)) //如果CAS减少工作线程成功
return null; //返回null
continue; //否则开下一轮循环
}

try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //如果可超时,那么最多等到超时时间
workQueue.take(); //如果不可超时,那就一直等着拿任务
if (r != null) //成功拿到任务就返回
return r;
timedOut = true; //否则就是超时了,下一轮循环将直接返回null
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

shutdown和shutdownNow方法

shutdown会继续将等待队列中的线程执行完成后再关闭线程池:

public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//判断是否有权限终止
checkShutdownAccess();
//CAS将线程池运行状态改为SHUTDOWN状态
advanceRunState(SHUTDOWN);
//让闲着的线程(比如正在等新的任务)中断,但是并不会影响正在运行的线程
interruptIdleWorkers();
onShutdown(); //给ScheduledThreadPoolExecutor提供的钩子方法,就是等ScheduledThreadPoolExecutor去实现的,当前类没有实现
} finally {
mainLock.unlock();
}
tryTerminate(); //最后尝试终止线程池
}

shutdownNow开始后,不仅不允许新的任务到来,也不会再执行等待队列的线程,而且会终止正在执行的线程:

public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//直接设定为STOP状态了
advanceRunState(STOP);
//中断所有工作线程
interruptWorkers();
//取出仍处于阻塞队列中的线程
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks; //最后返回还没开始的任务
}

tryTerminate方法

tryTerminate方法用于尝试终止线程池:

  1. 线程池状态检查
    • 如果线程池状态不是SHUTDOWNSTOP,则返回
  2. 线程数量检查
    • 如果线程数量不为 0,则中断空闲线程
  3. 状态转换
    • 如果线程池状态为 TIDYING,则调用 terminated() 方法,将状态转换为 TERMINATED
final void tryTerminate() {
for (;;) { //无限循环
int c = ctl.get(); //上来先获取一下ctl值
//只要是正在运行 或 线程池基本关闭 或 处于SHUTDOWN状态且工作队列不为空,还不能关闭线程池,返回
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;

//处于SHUTDOWN状态且等待队列为空 或 STOP状态
if (workerCountOf(c) != 0) { // 如果工作线程数不是0,中断空闲状态下的线程
interruptIdleWorkers(ONLY_ONE); //最多只中断一个空闲线程,然后返回
return;
}

//终止线程池
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { //先CAS将状态设定为TIDYING表示基本终止
try {
terminated(); //终止
} finally {
ctl.set(ctlOf(TERMINATED, 0)); //将状态设定为TERMINATED
//如果有线程调用了awaitTermination方法,会等待当前线程池终止,这里就可以唤醒那些等待线程池终止的线程
termination.signalAll();
}
return;
}
//注意如果CAS失败会直接进下一轮循环重新判断
} finally {
mainLock.unlock();
}
}
}