利用多线程,程序可以更加合理地使用CPU多核心资源,在同一时间完成更多的工作。但是,如果程序频繁地创建线程,由于线程的创建和销毁也需要占用系统资源,因此这样会降低整个程序的性能,利用线程池可以将已创建的线程复用,利用池化技术,就像数据库连接池一样,创建很多个线程,然后反复地使用这些线程,而不对它们进行销毁。
由于线程池可以反复利用已有线程执行多线程操作,所以它一般是有容量限制的,当所有的线程都处于工作状态时,那么新的多线程请求会被阻塞,直到有一个线程空闲出来为止,这里会用到阻塞队列

使用方式
线程池的构造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
|
其中的参数:
- corePoolSize:核心线程池大小,每向线程池提交一个多线程任务都会创建一个新的
核心线程
,无论是否存在其他空闲线程,直到到达核心线程池大小为止,之后会尝试复用线程资源。也可以在一开始就全部初始化好,调用 prestartAllCoreThreads()
即可
- maximumPoolSize:最大线程池大小,当目前线程池中所有的线程都处于运行状态,并且等待队列已满,那么就会直接尝试继续创建新的
非核心线程
运行,但是不能超过最大线程池大小
- keepAliveTime:线程最大空闲时间,当一个
非核心线程
空闲超过一定时间,会自动销毁。
- unit:线程最大空闲时间的时间单位
- workQueue:线程等待队列,当线程池中核心线程数已满时,就会将任务暂时存到等待队列中,直到有线程资源可用为止,这里可以使用我们上一章学到的阻塞队列
- threadFactory:线程创建工厂,我们可以干涉线程池中线程的创建过程,进行自定义。
- handler:拒绝策略,当等待队列和线程池都没有空间了,真的不能再来新的任务时,来了个新的多线程任务,那么只能拒绝了,这时就会根据当前设定的拒绝策略进行处理
整体流程:核心线程 -> 核心线程满 -> 剩余任务进入阻塞队列 -> 阻塞队列满 -> 开启非核心线程执行阻塞队列满后面到的任务 -> 线程数量超过maximumPoolSize
-> 拒绝后续任务
线程池的拒绝策略默认有四个:
AbortPolicy(默认)
:直接抛异常
CallerRunsPolicy
:直接让提交任务的线程运行这个任务,比如在主线程向线程池提交了任务,那么就直接由主线程执行
DiscardOldestPolicy
:丢弃队列中最近的一个任务,替换为当前任务
DiscardPolicy
:直接丢弃新提交的任务
注意:如果任务在运行过程中出现异常了,会导致线程池中的线程被销毁,线程池会自动创建新的线程来替代被销毁的线程
使用示例
public static void main(String[] args) throws InterruptedException { ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2));
for (int i = 0; i < 6; i++) { int finalI = i; executor.execute(() -> { try { System.out.println(Thread.currentThread().getName()+" 开始执行!("+ finalI); TimeUnit.SECONDS.sleep(1); System.out.println(Thread.currentThread().getName()+" 已结束!("+finalI); } catch (InterruptedException e) { e.printStackTrace(); } }); }
TimeUnit.SECONDS.sleep(1); System.out.println("线程池中线程数量:"+executor.getPoolSize()); TimeUnit.SECONDS.sleep(5); System.out.println("线程池中线程数量:"+executor.getPoolSize());
executor.shutdownNow(); }
|
线程池类
newFixedThreadPool(int nThreads)
- 作用:创建一个固定大小的线程池
- 特点:
- 线程池中的线程数量固定,不会动态变化
- 任务队列是无界的(
LinkedBlockingQueue
),如果任务数量超过线程数量,任务会排队等待
- 适用场景:适用于负载比较稳定的服务器,或者需要限制线程数量的场景
newCachedThreadPool()
- 作用:创建一个可缓存的线程池
- 特点:
- 线程池中的线程数量会根据任务数量动态调整,所有的线程都是
非核心线程
- 空闲线程会在60秒后被回收
- 任务队列是同步队列(
SynchronousQueue
),不会存储任务,新任务会立即执行或创建新线程
- 适用场景:适用于执行大量短期异步任务,或者任务执行时间较短的场景
newSingleThreadExecutor()
- 作用:创建一个单线程的线程池
- 特点:
- 线程池中只有一个线程,所有任务按顺序执行
- 任务队列是无界的(
LinkedBlockingQueue
)
- 适用场景:适用于需要保证任务顺序执行的场景
newScheduledThreadPool(int corePoolSize)
- 作用:创建一个支持定时任务的线程池
- 特点:
- 线程池可以执行定时任务或周期性任务
- 任务队列是延迟队列(
DelayedWorkQueue
)
- 适用场景:适用于需要执行定时任务或周期性任务的场景
执行带返回值的任务
一个多线程任务不仅仅可以是void任务,也可以使用Future得到一个任务的返回值,可以通过它来获取任务的结果以及任务当前是否完成:
import java.util.concurrent.*;
public class FutureExample { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Integer> future = executor.submit(() -> { Thread.sleep(1000); return 42; });
System.out.println("任务结果: " + future.get());
executor.shutdown(); } }
|
Future还可以取消任务:
import java.util.concurrent.*;
public class FutureCancelExample { public static void main(String[] args) { ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Integer> future = executor.submit(() -> { Thread.sleep(2000); return 42; });
boolean cancelled = future.cancel(true); System.out.println("任务是否被取消: " + cancelled);
System.out.println("任务是否完成: " + future.isDone()); System.out.println("任务是否被取消: " + future.isCancelled());
executor.shutdown(); } }
|
- 调用
future.cancel(true)
取消任务,并中断任务执行
isDone()
和isCancelled()
方法返回 true
,表示任务已被取消
执行定时任务
JDK5之后,可以使用ScheduledThreadPoolExecutor
来提交定时任务,它继承自ThreadPoolExecutor
,并且所有的构造方法都必须要求最大线程池容量为Integer.MAX_VALUE
,并且都是采用的DelayedWorkQueue
作为等待队列
核心方法
schedule(Runnable command, long delay, TimeUnit unit)
- 作用:提交一个延迟任务,在指定的延迟时间
delay
后执行
- 参数:
command
:要执行的任务
delay
:延迟时间
unit
:时间单位
schedule(Callable<V> callable, long delay, TimeUnit unit)
- 作用:提交一个延迟任务,在指定的延迟时间
delay
后执行,并返回Future
对象
- 参数:
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
- 作用:提交一个周期性任务,在初始延迟时间后开始执行,之后按固定的时间间隔重复执行。
- 参数:
command
:要执行的任务
initialDelay
:初始延迟时间
period
:任务执行的时间间隔
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
- 作用:提交一个周期性任务,在初始延迟时间后开始执行,之后在上一次任务执行完成后,再延迟指定的时间执行下一次任务
- 参数:
command
:要执行的任务
initialDelay
:初始延迟时间
delay
:任务执行完成后的延迟时间
示例
- 延迟任务
import java.util.concurrent.*;
public class ScheduledThreadPoolExecutorExample { public static void main(String[] args) { ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
executor.schedule(() -> { System.out.println("延迟任务执行,线程: " + Thread.currentThread().getName()); }, 3, TimeUnit.SECONDS);
executor.shutdown(); } }
|
- 固定速率周期性任务
import java.util.concurrent.*;
public class ScheduledAtFixedRateExample { public static void main(String[] args) { ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
executor.scheduleAtFixedRate(() -> { System.out.println("周期性任务执行,线程: " + Thread.currentThread().getName()); }, 1, 2, TimeUnit.SECONDS);
try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } executor.shutdown(); } }
|
- 固定延迟周期性任务
import java.util.concurrent.*;
public class ScheduledWithFixedDelayExample { public static void main(String[] args) { ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
executor.scheduleWithFixedDelay(() -> { System.out.println("周期性任务执行,线程: " + Thread.currentThread().getName()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }, 1, 2, TimeUnit.SECONDS);
try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } executor.shutdown(); } }
|
scheduleAtFixedRate
是如果任务执行时间不到period
,就休息到period
,否则立刻执行下一次;scheduleWithFixedDelay
是无论任务执行用了多长时间,两次任务的时间间隔一定是delay
线程池实现原理
在ThreadPoolExecutor
中,ctl
变量是一个AtomicInteger
类型的字段,用于同时表示线程池的状态和线程数量。它是线程池实现中的核心变量,通过位运算来高效地管理线程池的状态和线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
|
execute方法
execute
方法是线程池的入口,用于提交任务到线程池,具体流程:
- 任务提交
- 线程池状态检查
- 线程数量检查
- 如果线程数量小于
corePoolSize
,则创建新的核心线程执行任务
- 任务队列检查
- 如果线程数量已达到
corePoolSize
,则将任务放入任务队列中
- 创建新线程
- 如果任务队列已满且线程数量小于
maximumPoolSize
,则创建新的非核心线程执行任务
- 拒绝策略
- 如果任务队列已满且线程数量已达到
maximumPoolSize
,则执行拒绝策略
private final BlockingQueue<Runnable> workQueue;
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
|
addWorker方法
addWorker
方法用于创建新线程并执行任务。主要逻辑:
- 线程数量检查
- 创建线程
- 启动线程
- 线程池状态检查
- 如果线程池状态不是
RUNNING
,则回滚线程的创建
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c);
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false;
for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } }
boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
|
Worker类
继承自AbstractQueuedSynchronizer
,也就是说,它本身就是一把锁:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { final Thread thread; Runnable firstTask; volatile long completedTasks;
Worker(Runnable firstTask) { setState(-1); this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); }
protected boolean isHeldExclusively() { return getState() != 0; } }
|
runWorker方法
unWorker
方法是线程执行任务的核心逻辑:
- 任务执行
- 线程中断处理
- 任务完成处理
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
|
getTask方法
getTask
方法用于从任务队列中获取任务:
- 线程池状态检查
- 任务队列检查
- 线程超时处理
- 如果线程空闲时间超过
keepAliveTime
,则返回null
private Runnable getTask() { boolean timedOut = false;
for (;;) { int c = ctl.get(); int rs = runStateOf(c);
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; }
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; }
try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
|
shutdown和shutdownNow方法
shutdown会继续将等待队列中的线程执行完成后再关闭线程池:
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); } finally { mainLock.unlock(); } tryTerminate(); }
|
shutdownNow开始后,不仅不允许新的任务到来,也不会再执行等待队列的线程,而且会终止正在执行的线程:
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
|
tryTerminate方法
tryTerminate
方法用于尝试终止线程池:
- 线程池状态检查
- 如果线程池状态不是
SHUTDOWN
或STOP
,则返回
- 线程数量检查
- 状态转换
- 如果线程池状态为
TIDYING
,则调用 terminated()
方法,将状态转换为 TERMINATED
final void tryTerminate() { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; if (workerCountOf(c) != 0) { interruptIdleWorkers(ONLY_ONE); return; }
final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } } }
|