线程池
1. 线程池的使用
1.1 为什么使用线程池
创建/销毁线程伴随着系统开销,过于频繁的创建/销毁线程,会很大程度上影响处理效率。
我们使用网上经常可以看到的例子:
假设一个服务器完成一项任务所需时间为:T1
创建线程时间,T2
在线程中执行任务的时间,T3
销毁线程时间。
如果:T1 + T3
远大于T2
,则可以采用线程池,以提高服务器性能。
线程并发数量过多,抢占系统资源从而导致阻塞。
要知道线程能共享系统资源,如果同时执行的线程过多,就有可能导致系统资源不足而产生阻塞的情况。
运用线程池能有效的控制线程最大并发数,避免以上的问题。
对线程进行一些简单的管理。 比如:延时执行、定时循环执行的策略等。
其实这里运用的是线程池本身的高级特性,运用线程池都能进行很好的实现。虽然不是用线程池,本身也可以通过其他方式实现这些管理。
1.2 线程池的技术概括
一个线程池包括以下四个基本组成部分:
- 线程池管理器(ThreadPool): 用于创建并管理线程池,包括创建线程池,销毁线程池,添加新任务;
- 工作线程(PoolWorker): 线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
- 任务接口(Task): 每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
- 任务队列(taskQueue): 用于存放没有处理的任务。提供一种缓冲机制。
线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。
同时,线程池可以做到,使线程达到复用的目的, 执行完一个任务之后,并不被销毁,而是可以继续执行其他的任务。
1.3 JDK中提供的线程池技术
1.3.1 ThreadPoolExecutor
创建一个ThreadPoolExecutor
首先我们可以看一下ThreadPoolExecutor
的构造方法。
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
初始化一个线程池,需要不同的参数去初始化,
int corePoolSize
-> 该线程池中核心线程数最大值
核心线程:
线程池新建线程的时候,如果当前线程总数小于corePoolSize
,则新建的是核心线程,如果超过corePoolSize
,则新建的是非核心线程。
核心线程默认情况下会一直存活在线程池中,即使这个核心线程啥也不干(闲置状态)。
如果指定ThreadPoolExecutor
的allowCoreThreadTimeOut
这个属性为true
,那么核心线程如果不干活(闲置状态)的话,超过一定时间(时长下面参数决定),就会被销毁掉。
int maximumPoolSize
-> 该线程池中线程总数最大值
线程总数 = 核心线程数 + 非核心线程数。核心线程在上面解释过了,这里说下非核心线程: 不是核心线程的线程。
long keepAliveTime
-> 该线程池中非核心线程闲置超时时长
一个非核心线程,如果不干活(闲置状态)的时长超过这个参数所设定的时长,就会被销毁掉。
如果设置allowCoreThreadTimeOut = true
,则会作用于核心线程。
TimeUnit unit
-> keepAliveTime的单位
keepAliveTime
的单位,TimeUnit
是一个枚举类型,其包括:
NANOSECONDS
,MICROSECONDS
,MILLISECONDS
,SECONDS
,MINUTES
,HOURS
,DAYS
。
BlockingQueue<Runnable> workQueue
-> 该线程池中的任务队列
该线程池中的任务队列:维护着等待执行的Runnable
对象
当所有的核心线程都在干活时,新添加的任务会被添加到这个队列中等待处理,如果队列满了,则新建非核心线程执行任务
排队有三种通用策略:
直接提交 工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
无界队列 使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
有界队列 当使用有限的 maximumPoolSizes时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。
常用的workQueue
类型:
SynchronousQueue
:这个队列接收到任务的时候,会直接提交给线程处理,而不保留它,如果所有线程都在工作怎么办?那就新建一个线程来处理这个任务!所以为了保证不出现<线程数达到了maximumPoolSize
而不能新建线程>的错误,使用这个类型队列的时候,maximumPoolSize
一般指定成Integer.MAX_VALUE
,即无限大。一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
LinkedBlockingQueue
:这个队列接收到任务的时候,如果当前线程数小于核心线程数,则新建线程(核心线程)处理任务;如果当前线程数等于核心线程数,则进入队列等待。由于这个队列没有最大值限制,即所有超过核心线程数的任务都将被添加到队列中,这也就导致了maximumPoolSize
的设定失效,因为总线程数永远不会超过corePoolSize
。一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
ArrayBlockingQueue
:可以限定队列的长度,接收到任务的时候,如果没有达到corePoolSize
的值,则新建线程(核心线程)执行任务,如果达到了,则入队等候,如果队列已满,则新建线程(非核心线程)执行任务,又如果总线程数到了maximumPoolSize
,并且队列也满了,则发生错误。是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
DelayQueue
:队列内元素必须实现Delayed
接口,这就意味着你传进去的任务必须先实现Delayed
接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务。一个具有优先级得无限阻塞队列。
注意点: 我们在使用线程池的时候,有一些需要注意:
- SynchronousQueue通常要求maximumPoolSize是无界的
- 使用无界队列,不会触发maximumPoolSize。所以可能会触发内存泄漏。
ThreadFactory threadFactory
-> 创建线程的方式
默认使用Executors内部类DefaultThreadFactory,可以通过实现ThreadFactory接口,写自己的Factory,通过线程工厂给每个创建出来的线程设置更有意义的名字,Debug和定位问题时非常又帮助;
RejectedExecutionHandler handler
-> 饱和策略
当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。
当拒绝处理任务时的策略,有以下四种取值:
ThreadPoolExecutor.AbortPolicy;//丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy;//也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy;//丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy;//由调用线程处理该任务
//当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。
向ThreadPoolExecutor添加任务
通过ThreadPoolExecutor.execute(Runnable command)
或者AbstractExecutorService.<T> Future<T> submit(Callable<T> task)
方法即可向线程池内添加一个任务。
execute()
方法实际上是Executor
中声明的方法,在ThreadPoolExecutor
进行了具体的实现,这个方法是ThreadPoolExecutor
的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。
submit()
方法是在ExecutorService
中声明的方法,在AbstractExecutorService
就已经有了具体的实现,在ThreadPoolExecutor
中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()
方法不同,它能够返回任务执行的结果,去看submit()
方法的实现,会发现它实际上还是调用的execute()
方法,只不过它利用了Future
来获取任务执行结果。
ThreadPoolExecutor的策略
上面介绍参数的时候其实已经说到了ThreadPoolExecutor
执行的策略,这里给总结一下,当一个任务被添加进线程池时:
- 线程数量未达到
corePoolSize
,则新建一个线程(核心线程)执行任务 - 线程数量达到了
corePools
,则将任务移入队列等待 - 队列已满,新建线程(非核心线程)执行任务
- 队列已满,总线程数又达到了
maximumPoolSize
,就会由RejectedExecutionHandler
抛出异常
1.3.2 常见四种线程池的创建方式。
public class WorkerThread implements Runnable {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("thread:" + Thread.currentThread().getName() + "is finished.");
}
}
newSingleThreadExecutor() 创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
public class NewSingleThreadExecutorLearn {
public static void main(String[] args) {
//newFixedThreadPool
ExecutorService service = Executors.newSingleThreadExecutor();
for (int i = 0 ;i< 20;i++) {
Thread thread = new Thread(new WorkerThread(), "thread:"+i);
service.execute(thread);
}
service.shutdown();
}
}
output:
thread:pool-1-thread-1is finished.
thread:pool-1-thread-1is finished.
thread:pool-1-thread-1is finished.
thread:pool-1-thread-1is finished.
...
newFixedThreadPool() 创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
public class FixedThreadExecutorLearn {
public static void main(String[] args) {
//newFixedThreadPool
ExecutorService service = Executors.newFixedThreadPool(3);
for (int i = 0 ;i< 2000;i++) {
Thread thread = new Thread(new WorkerThread(), "thread:"+i);
service.execute(thread);
}
service.shutdown();
}
}
output:
thread:pool-1-thread-3is finished.
thread:pool-1-thread-1is finished.
thread:pool-1-thread-2is finished.
thread:pool-1-thread-2is finished.
thread:pool-1-thread-1is finished.
thread:pool-1-thread-3is finished.
thread:pool-1-thread-1is finished.
thread:pool-1-thread-3is finished.
thread:pool-1-thread-2is finished.
...
newCachedThreadPool 创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,
那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM
)能够创建的最大线程大小。
public class NewCachedThreadPoolLearn {
public static void main(String[] args) {
//newFixedThreadPool
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0 ;i< 20;i++) {
Thread thread = new Thread(new WorkerThread(), "thread:"+i);
service.execute(thread);
}
service.shutdown();
}
}
output:
thread:pool-1-thread-1is finished.
thread:pool-1-thread-3is finished.
thread:pool-1-thread-11is finished.
thread:pool-1-thread-10is finished.
thread:pool-1-thread-6is finished.
thread:pool-1-thread-7is finished.
thread:pool-1-thread-4is finished.
thread:pool-1-thread-8is finished.
thread:pool-1-thread-9is finished.
thread:pool-1-thread-5is finished.
thread:pool-1-thread-2is finished.
thread:pool-1-thread-12is finished.
thread:pool-1-thread-19is finished.
thread:pool-1-thread-20is finished.
...
2. 源码分析
2.1 变量讲述
2.1.1 ctl
它记录了当前线程池的运行状态和线程池内的线程数;一个变量是怎么记录两个值的呢?它是一个AtomicInteger
类型,有32个字节,这个32个字节中,高3位用来标识线程池的运行状态,低29位用来标识线程池内当前存在的线程数;
//利用低29位表示线程池中线程数,通过高3位表示线程池的运行状态:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
通过注释,本身就说明了很多。
线程池状态
线程池有5种状态,这五种状态由五个静态常量标识,每种状态的值的大小
//32-3 = 29 ,低位29位存储线程池中线程数
private static final int COUNT_BITS = Integer.SIZE - 3;
//线程池最多可以有536870911个线程,一般绝对创建不到这么大
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//RUNNING线程池能接受新任务(只有running状态才会接收新任务),并且可以运行队列中的任务
//-1的二进制为32个1,移位后为:11100000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
//SHUTDOWN不再接受新任务,但仍可以执行队列中的任务
//0的二进制为32个0,移位后还是全0
private static final int SHUTDOWN = 0 << COUNT_BITS;
//STOP不再接受新任务,不再执行队列中的任务,而且要中断正在处理的任务
//1的二进制为前面31个0,最后一个1,移位后为:00100000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
//TIDYING所有任务均已终止,workerCount的值为0,转到TIDYING状态的线程即将要执行terminated()钩子方法.
//2的二进制为01000000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
//TERMINATED terminated()方法执行结束.
//3移位后01100000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
需要知道的是:
- 只有RUNNING状态下才会接收新任务;
- 只有RUNNING状态和SHUTDOWN状态才会执行任务队列中的任务;
- 其它状态都不会接收新任务,不会执行任务队列中的任务;
线程池之间的状态的转换如下:
RUNNING -> SHUTDOWN
:调用了shutdown
方法,线程池实现了finalize
方法,在里面调用了shutdown
方法,因此shutdown
可能是在finalize
中被隐式调用的(RUNNING or SHUTDOWN) -> STOP
: 调用了shutdownNow
方法SHUTDOWN -> TIDYING
: 当队列和线程池均为空的时候STOP -> TIDYING
: 当线程池为空的时候TIDYING -> TERMINATED
: 处于TIDYING
状态后最终会进入TERMINATED
状态
与ctl相关的三个方法:
/获取线程池的状态,也就是将ctl低29位都置为0后的值
private static int runStateOf(int c) { return c & ~CAPACITY; }
//获取线程池中线程数,也就是ctl低29位的值
private static int workerCountOf(int c) { return c & CAPACITY; }
//设置ctl的值,rs为线程池状态,wc为线程数;
private static int ctlOf(int rs, int wc) { return rs | wc; }
2.1.2 workers
/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
* 用来存储线程池中的线程,线程都被封装成了Worker对象
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
2.2 方法讲述
2.2.1 添加任务execute方法
//添加新任务
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//获取当前线程池的ctl值. --> AtomicInteger ctl
int c = ctl.get();
//如果当前线程数小于核心线程数,这时候任务不会进入任务队列,会创建新的工作线程直接执行任务;
if (workerCountOf(c) < corePoolSize) {
//添加新的工作线程执行任务,addWorker方法后面分析
if (addWorker(command, true))
return;
//addWorker操作返回false,说明添加新的工作线程失败,则获取当前线程池状态;
//线程池数量小于corePoolSize情况下,创建新的工作线程失败,是因为线程池的状态发生了改变,已经处于非Running状态,或shutdown状态且任务队列为空
c = ctl.get();
}
//以下两种情况继续执行后面代码
//1.前面的判断中,线程池中线程数小于核心线程数,并且创建新的工作线程失败;
//2.前面的判断中,线程池中线程数大于等于核心线程数
//线程池处于RUNNING状态,说明线程池中线程已经>=corePoolSize,这时候要将任务放入队列中,等待执行;
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//再次检查线程池的状态,如果线程池状态变了,非RUNNING状态下不会接收新的任务,需要将任务移除,成功从队列中删除任务,则执行reject方法处理任务;
if (! isRunning(recheck) && remove(command))
reject(command);
//如果线程池的状态没有改变,且池中无线程
else if (workerCountOf(recheck) == 0)
// 两种情况进入以该分支
//1.线程池处于RUNNING状态,线程池中没有线程了,因为有新任务进入队列所以要创建工作线程(这时候新任务已经在队列中,所以下面创建worker线程时第一个参数,要执行的任务为null,只是创建一个新的工作线程并启动它,让它自己去队列中取任务执行)
//2.线程池处于非RUNNING状态但是任务移除失败,导致任务队列中仍然有任务,但是线程池中的线程数为0,则创建新的工作线程,处理队列中的任务;
addWorker(null, false);
}
// 两种情况执行下面分支:
// 1.非RUNNING状态拒绝新的任务,并且无法创建新的线程,则拒绝任务
// 2.线程池处于RUNNING状态,线程池线程数量已经大于等于coresize,任务就需要放入队列,如果任务入队失败,说明队列满了,则创建新的线程,创建成功则新线程继续执行任务,如果创建失败说明线程池中线程数已经超过maximumPoolSize,则拒绝任务
else if (!addWorker(command, false))
reject(command);
}
2.2.2 添加任务addWorker方法
往线程池中添加工作线程,线程会被封装成Worker
对象,放入到works
线程池中,可以先看下一小节“内部类Worker
”的实现后再看这个方法,也可以先不用管Worker
类,先看addWorker
的实现过程。
它的执行过程如下:
- 增加线程时,先判断当前线程池的状态允不允许创建新的线程,如果允许再判断线程池有没有达到 限制,如果条件都满足,才继续执行。
- 先增加线程数计数
ctl
,增加计数成功后,才会去创建线程。 - 创建线程是通过
work
对象来创建的,创建成功后,将work
对象放入到works
线程池中(就是一个hashSet
)。 - 添加完成后,更新
largestPoolSize
值(线程池中创建过的线程最大数量),最后启动线程,如果参数firstTask
不为null
,则执行第一个要执行的任务,然后循环去任务队列中取任务来执行。
private boolean addWorker(Runnable firstTask, boolean core) {
//以下for循环,增加线程数计数,ctl,只增加计数,不增加线程,只有增加计数成功,才会增加线程
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//这个代码块的判断,如果是STOP,TIDYING和TERMINATED这三种状态,都会返回false。(这几种状态不会接收新任务,也不再执行队列中的任务,中断当前执行的任务)
//如果是SHUTDOWN,firstTask不为空(SHUTDOWN状态下,不会接收新任务)或 者workQueue是空(队列里面都没有任务了,也就不需要线程了),返回false。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//只有满足以下两种条件才会继续创建worker线程对象
//1.RUNNING状态,
//2.shutdown状态,且firstTask为null(因为shutdown状态下不再接收新任务),队列不是空(shutdown状态下需要继续处理队列中的任务)
// 通过自旋的方式增加线程池线程数
for (;;) {
int wc = workerCountOf(c);
//1.如果线程数大于最大可创建的线程数CAPACITY,直接返回false;
//2.判断当前是要根据corePoolSize,还是maximumPoolSize进行创建线程(corePoolSize是基本线程池大小,未达到corePoolSize前按照corePollSize来限制线程池大小,达到corePoolSize后,并且任务队列也满了,才会按照maximumPoolSize限制线程池大小)
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))//将WorkerCount通过CAS操作增加1,成功的话直接跳出两层循环;
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)//否则则判断当前线程池的状态,如果现在获取到的状态与进入自旋的状态不一致的话,那么则通过continue retry重新进行状态的判断
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//以下代码块是创建Worker线程对象,并启动
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); //创建一个新的Worker对象
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); //获取线程池的重入锁后,
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// RUNNING状态 || SHUTDONW状态下,没有新的任务,只是处理任务队列中剩余的任务;
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//如果线程是活动状态,直接抛出异常,因为线程刚创建,还没有执行start方法,一定不会是活动状态;
if (t.isAlive())
throw new IllegalThreadStateException();
// 将新启动的线程添加到线程池中
workers.add(w);
// 更新largestPoolSize的值,largestPoolSize成员变量保存线程池中创建过的线程最大数量
int s = workers.size();
//将线程池中创建过的线程最大数量,设置给largestPoolSize,可以通过getLargestPoolSize()方法获取,注意这个方法只能在 ThreadPoolExecutor中调用,Executer,ExecuterService,AbstractExecutorService中都是没有这个方法的
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 启动新添加的线程,这个线程首先执行firstTask,然后不停的从队列中取任务执行
// 当等待keepAlieTime还没有任务执行则该线程结束。见runWoker和getTask方法的代码。
if (workerAdded) {
t.start();
workerStarted = true;
}
}
}finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
}
2.3 内部类Worker
它是ThreadPoolExecutor的一个内部类:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
...
}
由它的定义可以知它实现了Runnable
接口,是一个线程,还继承了AQS
类,实现了加锁机制。
它利用AQS框架实现了一个简单的非重入的互斥锁, 实现互斥锁主要目的是为了中断的时候判断线程是在空闲还是运行,它的state
只有三个值 ,初始状态为不可加锁状态-1,无锁状态为0,加锁状态为1,可以看shutdown
、shutdownNow
、runWorker
方法来分析它锁的作用。
2.3.1 Worker的构造方法
构造方法里面要重点关注一下getThreadFactory()
这个方法
//参数为Worker线程运行后第一个要执行的任务
Worker(Runnable firstTask) {
//设置ASQ的state为-1 设置worker处于不可加锁的状态,看后面的tryAcquire方法,只有state为0时才允许加锁,worker线程运行以后才会把state置为0
setState(-1);
//设置第一个运行的任务
this.firstTask = firstTask;
//创建线程,将this自己传入进去;getThreadFactory()见后面详解
this.thread = getThreadFactory().newThread(this);
}
2.3.2 Worker的成员变量
//被封装的线程,就是它自己;
final Thread thread;
//传入的它要执行的第一个任务,如果firstTask为空就从任务队列中取任务执行
Runnable firstTask;
//记录执行完成的任务数量,如果执行任务过程中出现异常,仍然会计数;
volatile long completedTasks
2.3.3 worker线程的加锁解锁
worker
的加锁解锁机制是基于AQS
框架的,要完全弄明白它的加锁解锁机制请看AQS
框架的实现,在这里只是简单介绍一下:
//尝试加锁方法,将状态从0设置为1;如果不是0则加锁失败,在worker线程没有启动前是-1状态,无法加锁
//该方法重写了父类AQS的同名方法
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//尝试释放锁的方法,直接将state置为0
//该方法重写了父类AQS的同名方法
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
//注意:tryAcquire与tryRelease是重写了AQS父类的方法,且不可以直接调用,它们被以下方法调用实现加锁解锁操作
//加锁:acquire法是它父类AQS类的方法,会调用tryAcquire方法加锁
public void lock() { acquire(1); }
//尝试加锁
public boolean tryLock() { return tryAcquire(1); }
//解锁:release方法是它父类AQS类的方法,会调用tryRelease方法
public void unlock() { release(1); }
//返回锁状态
public boolean isLocked() { return isHeldExclusively(); }
2.3.4 worker线程的加锁解锁
看完了Worker
线程的创建,再来看看Worker
线程的运行,Worker
的run
方法中会调用runWorker
方法来获循环取任务并执行。
final void runWorker(Worker w) {
//当前线程
Thread wt = Thread.currentThread();
//获取当前Worker线程创建时,指定的第一个要执行的任务,也可以不指定任务,那么它自己就会去任务队列中取任务;
Runnable task = w.firstTask;
w.firstTask = null;
// 在构造方法里面将state设置为了-1,执行该方法就将state置为了0,这样就可以加锁了,-1状态下是无法加锁的,看Worker类的tryAcquire方法
w.unlock();
//该变量代表任务执行是否发生异常,默认值为true发生了异常,后面会用到这个变量
boolean completedAbruptly = true;
try {
//如果创建worker时传入了第一个任务,则执行第一个任务,否则 从任务队列中获取任务getTask(),getTask()后面分析;
while (task != null || (task = getTask()) != null) {
//线程加锁
w.lock();
/**
* 先判断线程池状态是否允许继续执行任务:
* 1.如果是stop<tidying<terminated(这种状态是不接受任务,且不执行任务的),并且线程是非中断状态
* 2.shutingdown,runing ,处于中断状态(并复位中断标志),如果这个时候其它线程执行了shutdownNow方法,shutdownNow方法会把状态设置为STOP
*
* 这个时候则中断线程
**/
if ((
runStateAtLeast(ctl.get(), STOP) ||
(
Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)
)
)
&&
!wt.isInterrupted())
wt.interrupt();
/**
*开始执行任务
*/
try {
//任务执行前要做的处理:这个方法是空的,什么都不做,一般会通过继承ThreadPoolExecute类后重写该方法实现自己的功能;传入参数为当前线程与要执行的任务
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//任务执行后要做的处理:这个方法也是空的,什么都不做,一般会通过继承ThreadPoolExecute类后重写该方法实现自己的功能;参数为当前任务和执行任务时抛出的异常
afterExecute(task, thrown);
}
} finally {
task = null;
//增加完成任务计数
w.completedTasks++;
w.unlock();
}
}
/**
*退出while循环,线程结束;
**/
//判断task.run()方法是否抛出了异常,如果没有则设置它为false,如果发生了异常,前面会直接抛出,中断方法继续执行,就不会执行下面这句;
completedAbruptly = false;
} finally {
/**
* 线程退出后的处理
*/
processWorkerExit(w, completedAbruptly);
}
}
加锁(w.lock()
)的核心目的
确保线程中断状态的正确性
- 场景:当线程池处于
STOP
状态(正在关闭)时,需要中断所有工作线程。 - 问题:如果不加锁,可能在检查线程池状态和实际中断线程之间发生竞态条件(如
shutdownNow
的并发调用)。 - 解决方案:通过加锁保证以下操作的原子性:
if ((线程池已STOP || 线程被中断且线程池已STOP) && 工作线程未被中断) { wt.interrupt(); // 安全中断 }
锁防止其他线程(如调用 shutdownNow
的线程)在此期间修改线程池状态或工作线程的中断状态。
保护任务执行的生命周期
- 钩子方法:
beforeExecute
和afterExecute
是线程池的可扩展方法,用户可能重写它们。 - 风险:如果这些方法被并发调用(如多个任务同时执行),可能导致用户自定义逻辑的线程安全问题。
- 锁的作用:确保一个
Worker
在同一时刻只执行一个任务,避免钩子方法的并发冲突。
解锁(w.unlock()
)的作用
允许其他线程操作Worker
- 任务窃取(Work Stealing):某些线程池实现(如
ForkJoinPool
)允许其他线程从Worker
的任务队列中窃取任务。 - 锁释放后:其他线程可以安全地访问该
Worker
的状态(如中断它或分配新任务)。 维护completedTasks
的准确性 - 统计计数:
w.completedTasks++
需要原子性,锁保证该计数器不会被并发修改。
为什么必须加锁?
- Worker 本身是一个
AQS
(AbstractQueuedSynchronizer)实现Worker
继承自AQS
,其锁机制用于控制线程的独占性(类似ReentrantLock
)。- 加锁后,当前工作线程独占
Worker
,防止其他线程干扰其任务执行或状态变更。
- 避免与线程池全局控制的冲突
- 例如,
shutdownNow
会遍历所有Worker
并尝试中断线程,锁能保证中断操作的同步性。
需要注意的是,线程如果执行任务过程中,业务代码抛出了异常,那么会将抛出的异常catch
以后抛出,如果是Throwable
类型的异常,则会封装成Error
抛出,最后此线程退出,但是退出之前会将任务完成数照样+1,然后会在控制台上打印Error
或者是RuntimeException
异常,这些异常不会被我们捕获,异常信息只会在控制台打出,不会再我们的log
日志中打出。
所以我们一定要自己去捕获并处理我们的异常,而不能抛出不管。
2.3.5 worker线程从任务队列里面获取任务getTask
这是个for
循环:
- 先判断线程池状态是否允许取任务,不允许直接将线程数量减1 ,直接返回
null
; - 若线程池状态允许取任务,则判断当前线程是否超时 ,若线程超时则将线程池数量减1,直接返回
null
; - 若没有超时,则去任务队列取任务,取到的话返回任务,若超时则设置超时状态,继续循环,在下次循环中处理超时状态
private Runnable getTask() {
// 如果判断当前线程池状态需要启用超时操作,那么任务队列取任务时使用的是带有超时的workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)方法,如果超时,则会将timeOut 变量设置为true,在下次执行for循环时根据timeOut来执行超时操作;
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/**
* 以下分支在stop、tidying、terminated状态,或者在SHUTDOWN状态且任务队列为空时 退出当前线程
*
* 判断线程池状态是否允许继续获取任务:
* RUNNING<shutdown<stop<tidying<terminated;
* rs >= SHUTDOWN,包含两部分判断操作
*1.如果是rs > SHUTDOWN,即状态为stop、tidying、terminated;这时不再处理队列中的任务,直接返回null
*2.如果是rs = SHUTDOWN ,rs>=STOP不成立,这时还需要处理队列中的任务除非队列为空,没有任务要处理,则返回null
*/
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//自旋锁将ctl减1(也就是将线程池中的线程数减1)
decrementWorkerCount();
return null;
}
/**
* 在RUNNING状态 或 shutdown状态且任务队列不为空时继续往下执行执行
*/
/**
* 以下做线程超时控制:
* 启用超时控制需要满足至少一个条件
* 1.allowCoreThreadTimeOut为true代表核心线程数可以做超时控制;
* 2.如果当前线程数>corePoolSize核心线程数,也可以做超时控制;
* 在以上前提下,再判断当前线程是否需要销毁:
* 1.如果当前线程数大于maximumPoolSize,这肯定是不允许的,需要销毁当前线程;
* 2.如果当前线程上次执行循环时,取任务操作超时,任务队列是空,需要销毁当前线程;
*/
//获取线程池中线程数量
int wc = workerCountOf(c);
// timed变量用于判断是否需要进行超时控制。
// allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时;
// wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
// 对于超过核心线程数量的这些线程,需要进行超时控制;
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/*
* 超时销毁线程需要先满足以下两个条件之一
* 1. wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法;
* 2. timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次循环当前线程从任务队列中获取任务发生了超时,没有取到任务;
* 满足上面两个条件之一的情况下,接下来判断,如果线程数量大于1,或者线程队列是空的,那么尝试将workerCount减1,减1成功则返回null,退出当前线程; 如果减1失败,则返回继续执行循环操作,重试。
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//尝试将线程池线程数量减一
if (compareAndDecrementWorkerCount(c))
return null;
//如果将线程池数量减一不成功则循环重试
continue;
}
/**
* 如果没有超时,则继续去任务队列取任务执行;
*取任务操作
*/
try {
//根据timed(是否启用超时控制)来判断执行poll操作还是执行take()操作还是执行有时间限制的poll操作,并返回获取到的任务;
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
//如果poll操作等待超时,没有取到任务;则将timeOut设置为true;
timedOut = true;
} catch (InterruptedException retry) {
//如果是因为线程中断导致没有取到任务;则设置timedOut=false继续执行循环,取任务
timedOut = false;
}
}
}
2.3.6 Worker线程的退出processWorkerExit
- 如果是处理任务发生异常导致的退出,则以自旋锁的方式将线程数减1;
- 将当前
worker
执行完成的任务数,累加到completedTaskCount
上; - 将当前线程移出线程池;
- 尝试终止线程池;
判断是否要新建worker
线程;
- 如果是
RUNNING
或SHUTDOWN
状态,且worker
是异常结束,会直接执行AddWorker
操作; - 如果是
RUNNING
或SHUTDOWN
状态,且worker
是没有任务可做结束的,且allowCoreThreadTimeOut=false
,且当前线程池中的线程数小于corePoolSize
,则会创建addWorker
线程; - 判断是否要添加一个新的线程:线程池是
RUNNING
或SHUTDOWN
状态,worker
线程如果是异常结束的,则直接添加一个新线程;如果当前线程池中的线程数小于最小线程数,也会创建一个新线程;
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果任务运行异常导致则completedAbruptly=true,则将线程池worker线程数减1,如果是没有获取到任务导致的completedAbruptly=false,则会在getTask()方法里面将线程数减1;
if (completedAbruptly)
//自旋锁将ctl减1(也就是将线程池中的线程数减1)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//退出前,将本线程已完成的任务数量,添加到已经完成任务的总数中;
completedTaskCount += w.completedTasks;
//线程队列中移除当前线程
workers.remove(w);
} finally {
mainLock.unlock();
}
//尝试停止线程池
tryTerminate();
/**
*判断是否要增加新的线程
*如果满足以下条件则新增线程:
* 一、当线程池是RUNNING或SHUTDOWN状态,且worker是异常结束,那么会直接addWorker;
* 二、当线程池是RUNNING或SHUTDOWN状态,且worker是没有任务可做结束的;
* 1.如果allowCoreThreadTimeOut=true,则判断等待队列不为空 ,且当前线程数是否小于1;
* 2.如果allowCoreThreadTimeOut=false,则判断当前线程数是否小于小于corePoolSize;
* 如果小于,则会创建addWorker线程;
**/
int c = ctl.get();
//当线程池是RUNNING或SHUTDOWN状态,
if (runStateLessThan(c, STOP)) {
//如果非异常状况completedAbruptly=false,也就是没有获取到可执行的任务,则获取线程池允许的最小线程数,如果allowCoreThreadTimeOut为true说明允许核心线程超时,则最小线程数为0,否则最小线程数为corePoolSize;
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//如果allowCoreThreadTimeOut=true,且任务队列有任务要执行,则将最最小线程数设置为1
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//如果当前线程数大于等于最小线程数,则直接返回
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//以下两种情况会添加一个新的线程
//1.worker是异常结束;
//2.如果是非异常结束,且任务队列里面还有任务,
addWorker(null, false);
}
}
2.4 线程池的关闭
线程池的关闭有两个方法shutdown()
与shutdownNow()
。
shutdown
会将线程池状态设置为SHUTDOWN
状态,然后中断所有空闲线程,然后执行tryTerminate()
方法tryTerminate
这个方法很重要,会在后面分析,来尝试终止线程池。
shutdownNow
会将线程池状态设置为STOP
状态,然后中断所有线程(不管有没有执行任务都设置为中断状态),然后执行tryTerminate()
方法,来尝试终止线程池;
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 线程池状态设为SHUTDOWN,如果已经是shutdown<stop<tidying<terminated,也就是非RUNING状态则直接返回
advanceRunState(SHUTDOWN);
// 中断空闲的没有执行任务的线程
interruptIdleWorkers();
onShutdown(); //空方法,子类覆盖实现
} finally {
mainLock.unlock();
}
tryTerminate();
}
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// STOP状态:不再接受新任务且不再执行队列中的任务。
advanceRunState(STOP);
// 中断所有线程,无论空闲还是在执行任务
interruptWorkers();
// 将任务队列清空,并返回队列中还没有被执行的任务。
tasks = drainQueue();
}finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
这两个方法可以直接调用,来关闭线程池;shutdown
方法还会在线程池被垃圾回收时调用,因为ThreadPoolExecuter
重写了finalize
方法。
protected void finalize() {
shutdown();
}
关于finalize
方法说明:
垃圾回收时,如果判断对象不可达,且覆盖了finalize
方法,则会将对象放入到F-Queue
队列 ,有一个名为Finalizer
的守护线程执行finalize
方法,它的优先级为8,做最后的清理工作,执行finalize
方法完毕后,GC
会再次判断该对象是否可达,若不可达,则进行回收,否则,对象复活。
注意:网上很多人说 ,Finalizer
线程的优先级低,个人认为这是不对的,Finalizer
线程在jdk1.8
的优先级是8,比我们创建线程默认优先级5要高,之前其它版本的jdk我记得导出的线程栈信息里面优先级是5,忘记是哪个版本的jdk
了,即使是5优先级也不比自建的线程默认优先级低,总之我没见过优先级低于5的Finalizer线程;
这个线程会不停的循环等待java.lang.ref.Finalizer.ReferenceQueue
中的新增对象。一旦Finalizer
线程发现队列中出现了新的对象,它会弹出该对象,调用它的finalize()
方法,将该引用从Finalizer
类中移除,因此下次GC
再执行的时候,这个Finalizer
实例以及它引用的那个对象就可以回垃圾回收掉了。
大多数时候,Finalizer
线程能够赶在下次GC
带来更多的Finalizer
对象前清空这个队列,但是当它的处理速度没法赶上新对象创建的速度,对象创建的速度要比Finalizer
线程调用finalize()
结束它们的速度要快,这导致最后堆中所有可用的空间都被耗尽了;
当我们大量线程频繁创建重写了finalizer
方法的对象的情况下,高并发情况下,它可能会导致你内存的溢出;虽然Finalizer
线程优先级高,但是毕竟它只有一个线程;最典型的例子就是数据库连接池,proxool
,对要释放资源的操作加了锁,并在finalized
方法中调用该加锁方法,在高并发情况下,锁竞争严重,finalized
竞争到锁的几率减少,finalized
无法立即释放资源,越来越多的对象finalized()
方法无法被执行,资源无法被回收,最终导致导致oom
;所以覆盖finalized
方法,执行一定要快,不能有锁竞争的操作,否则在高并发下死的很惨;
(proxool
使用了cglib
,它用WrappedConnection
代理实际的Conneciton
。在运行WrappedConnection
的方法时,包括其finalize
方法,都会调用Conneciton.isClosed()
方法去判断是否真的需要执行某些操作。不幸的是JDBC
中的这个方法是同步的,锁是连接对象本身。于是, Finalizer
线程回收刚执行过的WrappedConnection
对象时就总会与还在使用Connection
的各个工作线程争用锁。)
2.5 线程池中线程的中断
线程池的中断也有两个方法
interruptIdleWorkers
中断没有执行任务的线程
interruptWorkers
中断所有线程,不管线程有没有执行任务
//中断空闲线程,没有执行任务的线程会被中断,onlyOne参数用来标识是否只中断一个线程;
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//遍历所有的Worker线程
for (Worker w : workers) {
Thread t = w.thread;
//如果线程没有被中断,w.tryLock()会调用tryAcquire()方法尝试加锁,加锁成功后会中断线程
//为什么要w.tryLock(),因为在runWorker()方法的while循环执行任务之前会加锁,如果已经被加锁说明线程正在执行任务,不能被中断;
if (!t.isInterrupted() && w.tryLock()) {
try {
//中断线程
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
//如果 onlyOne为true, for循环只执行一次就退出
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
/****
* 中断所有正在运行的线程,注意,这里与interruptIdelWorkers()方法不同的是,没有使用worker的AQS锁
*/
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
2.6 尝试终止线程池tryTerminate
该方法会在很多地方调用,如添加worker
线程失败的addWorkerFailed()
方法,worker
线程跳出执行任务的while
循环退出时的processWorkerExit()
方法,关闭线程池的shutdown()
和shutdownNow()
方法,从任务队列移除任务的remove()
方法。
该方法的作用是检测当前线程池的状态是否可以将线程池终止,如果可以终止则尝试着去终止线程,否则直接返回。
STOP->TIDYING
与SHUTDOWN->TIDYING
状态的转换,就是在该方法中实现的,最终执行terminated()
方法后会把线程状态设置为TERMINATED
的状态。
尝试终止线程池执行过程。
2.6.1 重点内容先判断线程池的状态是否允许被终止
以下状态不可被终止
- 如果线程池的状态是
RUNNING
(不可终止) 或者是TIDYING
(该状态一定执行过了tryTerminate
方法,正在执行或即将执行terminated()
方法,所以不需要重复执行), 或者是TERMINATED
(该状态已经执行完成terminated()
钩子方法,已经是被终止状态了), 以上三种状态直接返回。 - 如果线程池状态是
SHUTDOWN
,而且任务队列不是空的(该状态需要继续处理任务队列中的任务,不可被终止),也直接返回。
以下两种状态线程池可以被终止
- 如果线程池状态是
SHUTDOWN
,而且任务队列是空的(shutdown
状态下,任务队列为空,可以被终止),向下进行。 - 如果线程池状态是
STOP
(该状态下,不接收新任务,不执行任务队列中的任务,并中断正在执行中的线程,可以被终止),向下进行。
2.6.2 线程池状态可以被终止,如果线程池中仍然有线程,则尝试中断线程池中的线程
则尝试中断一个线程然后返回,被中断的这个线程执行完成退出后,又会调用tryTerminate()
方法,中断其它线程,直到线程池中的线程数为0,则继续往下执行。
2.6.3 如果线程池中的线程为0,则将状态设置为TIDYING
,设置成功后执行terminated()
方法,最后将线程状态设置为TERMINATED
final void tryTerminate() {
for (;;) {
int c = ctl.get();
//先判断是否满足终止线程池的条件
//1.如果线程池的状态是RUNNING(不可终止)或者是TIDYING(该状态的线程池即将要执行或正在执行terminated()钩子方法),TERMINATED(该状态已经执行完成terminated()钩子方法),直接返回。
//2.如果线程池状态是SHUTDOWN,而且任务队列不是空的(该状态需要继续处理任务队列中的任务,不可被终止),也直接返回。
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//以下状态才会继续执行:
//1.如果线程池状态是SHUTDOWN,而且任务队列是空的(shutdown状态下,任务队列为空,可以被终止),向下进行。
//2.如果线程池状态是STOP(该状态下,不接收新任务,不执行任务队列中的任务,并中断正在执行中的线程,可以被终止),向下进行。
// workerCount不为0则还不能停止线程池,而且这时线程都处于空闲等待的状态
// 需要中断让线程“醒”过来,醒过来的线程才能继续处理shutdown的信号。
if (workerCountOf(c) != 0) { // Eligible to terminate
// runWoker方法中w.unlock就是为了可以被中断,getTask方法也处理了中断。
// ONLY_ONE:这里只需要中断1个线程去处理shutdown信号就可以了。
interruptIdleWorkers(ONLY_ONE);
return;
}
//满足以下两个条件才会继续执行
//1.线程池状态是STOP且 工作线程池中的线程wc是0
//2.线程池状态是SHUTDOWN而且工作线程池wc(pool)和任务队列(queue)都是空的
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//进入TIDYING状态,线程池的状态被原子操作ctl.compareAndSet(c, ctlOf(TIDYING, 0)将状态设置为TIDYING,(因为tryTerminate方法会在多处调用,存在竞争)
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
//进入TERMINATED状态
//进一步在terminated结束之后的finally块中通过ctl.set(ctlOf(TERMINATED, 0))设置为TERMINATED。
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll(); //最后执行termination.signalAll(),会唤醒awaitTermination方法中由于执行termination.awaitNanos(nanos)操作进入等待状态的线程
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
2.7 拒绝策略
以下两种情况会执行拒绝任务操作:
- 如果当前线程池状态为非
RUNNING
装状态 - 当队列满了,
workder
线程数到了最大值,而且没有空闲的worker
线程执行任务。
有内置的以下四种拒绝策略:
AbortPolicy
抛出异常RejectedExecutionException
(默认策略)
CallerRunsPolicy
当前生产者线程执行 (如果线程池被关闭了,以后任务就都要由生产者线程自己去执行了)
DiscardOldestPolicy
将队列中最后一个任务出队,将新的任务入队 (直接丢掉一个旧的,接收一个新的,场景少吧)
DiscardPolicy
什么都不做,相当于忽略当前任务(估计没人愿意这样做)
当然我们也可以通过实现RejectedExecutionHandler
类的rejectedExecution
方法来实现我们自己的拒绝策略。
public interface RejectedExecutionHandler {
/**
* Method that may be invoked by a {@link ThreadPoolExecutor} when
* {@link ThreadPoolExecutor#execute execute} cannot accept a
* task. This may occur when no more threads or queue slots are
* available because their bounds would be exceeded, or upon
* shutdown of the Executor.
*
* <p>In the absence of other alternatives, the method may throw
* an unchecked {@link RejectedExecutionException}, which will be
* propagated to the caller of {@code execute}.
*
* @param r the runnable task requested to be executed
* @param executor the executor attempting to execute this task
* @throws RejectedExecutionException if there is no remedy
*/
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
2.8 线程状态
线程池提供了一些方法监视线程池的状态,如下所示:
ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(10));
// 当前线程池中的工作线程数;也就是返回成员变量private final HashSet<Worker> workers = new HashSet<Worker>()的大小
pool.getPoolSize();
// 队列中的任务; 也就是返回成员变量 private final BlockingQueue<Runnable> workQueue;的大小
pool.getQueue().size();
// 线程正在执行的任务; 遍历workers,返回加锁的worker数量(加锁,说明这个线程正在执行任务)
pool.getActiveCount();
// 已经执行完成的任务; ThreadPoolExecutor.completedTaskCount+每个Worker.completedTasks ,线程池记录的完成任务数量和每个worker线程记录的完成的任务的数量;
pool.getCompletedTaskCount();
// 全部的任务数,队列任务+正在执行+已经执行完成
pool.getTaskCount();
// 核心线程数;
pool.getCorePoolSize();
// 最大线程数;
pool.getMaximumPoolSize();
// 线程池中曾经最大的线程数量;
pool.getLargestPoolSize();
// 线程超时时间
pool.getKeepAliveTime(TimeUnit.SECONDS);
// 是否允许coreThread超时;
pool.allowsCoreThreadTimeOut();
注意任务执行失败也会计数,完成的任务数包含实行失败的任务。
2.9 一个线程池实例管理类
自己写了一个管理类,还不完善,先放这里:
/**
* 连接池管理类
*
* @author zqz
*
*/
public class ZQZThreadPool extends ThreadPoolExecutor {
public static final AtomicBoolean lock = new AtomicBoolean();
public static final Map<String, ThreadPoolExecutor> poolManager = new ConcurrentHashMap<String, ThreadPoolExecutor>();
public static final ThreadFactory defaultThreadFactor = new DefaultThreadFactory();
/**
* 获取连接池实例
*
* @param poolName
* 自定义连接池名称前缀 ,更好的区分不同的连接池;
* @param corePoolSize
* 核心线程数
* @param maximumPoolSize
* 最大线程数
* @param keepAliveTime
* 空闲线程存活时间,单位是秒
* @param workQueue
* 任务队列
* @return
*/
public static ThreadPoolExecutor getInstance(String poolName, int corePoolSize, int maximumPoolSize,
long keepAliveTime, BlockingQueue<Runnable> workQueue) {
while (!lock.compareAndSet(false, true))
;
ThreadPoolExecutor pool = poolManager.get(poolName);
try {
if (pool == null) {
pool = new ZQZThreadPool(poolName, corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS,
workQueue, defaultThreadFactor);
poolManager.put(poolName, pool);
return pool;
} else {
return poolManager.get(poolName);
}
} finally {
lock.compareAndSet(true, false);
}
}
/**
* 私有构造方法
* @param poolName
* @param corePoolSize
* @param maximumPoolSize
* @param keepAliveTime
* @param unit
* @param workQueue
* @param threadFactory
*/
private ZQZThreadPool(String poolName, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
if (!poolManager.containsKey(poolName)) {
poolManager.put(poolName, this);
}
}
/**
* 返回指定线程池状态
* @param name
* @return
*/
public static PoolInfo monitor(String name) {
ThreadPoolExecutor pool = poolManager.get(name);
if (pool == null)
return null;
PoolInfo poolInfo = new PoolInfo();
// 当前线程池中的工作线程数;
poolInfo.setPoolSize(pool.getPoolSize());
// 队列中的任务;
poolInfo.setQueueSize(pool.getQueue().size());
// 线程正在执行的任务;
poolInfo.setActiveCount(pool.getActiveCount());
// 已经执行完成的任务;
poolInfo.setCompletedTaskCount(pool.getCompletedTaskCount());
// 是否允许coreThread超时;
poolInfo.setAllowsCoreThreadTimeOut(pool.allowsCoreThreadTimeOut());
// 核心线程数;
poolInfo.setCorePoolSize(pool.getCorePoolSize());
// 最大线程数;
poolInfo.setMaximumPoolSize(pool.getMaximumPoolSize());
// 线程池中曾经最大的线程数量;
poolInfo.setLargestPoolSize(pool.getLargestPoolSize());
// 线程超时时间
poolInfo.setKeepAliveTime(pool.getKeepAliveTime(TimeUnit.SECONDS));
// 全部的任务数,队列任务+正在执行+已经执行完成
pool.getTaskCount();
return poolInfo;
}
/**
* The default thread factory
*/
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
private int stackSize = 0;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = "zqz-pool-" + poolNumber.getAndIncrement() + "-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), stackSize);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
/**
* 测试
* @param args
*/
public static void main(String[] args) {
String poolName = "test";
ThreadPoolExecutor pool = ZQZThreadPool.getInstance(poolName, 1, 10, 0, new ArrayBlockingQueue<Runnable>(10));
pool.execute(new Runnable() {
public void run() {
System.out.println("test");
}
});
PoolInfo poolInfo = ZQZThreadPool.monitor(poolName);
System.out.println(poolInfo.getPoolSize());
}
}
自带的各种坑
1. Executors.newFixedThreadPool(10)
new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
初始化一个指定线程数的线程池,其中corePoolSize == maximumPoolSize
,使用LinkedBlockingQuene
作为阻塞队列,超时时间为0,当线程池没有可执行任务时,也不会释放线程。
因为队列LinkedBlockingQueue
大小为默认的Integer.MAX_VALUE
,可以无限的往里面添加任务,直到内存溢出。
2. Executors.newCachedThreadPool()
new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
初始化一个可以缓存线程的线程池,默认超时时间60s,线程池的最小线程数时0,但是最大线程数为Integer.MAX_VALUE
,即2147483647,内部使用SynchronousQueue
作为阻塞队列。
因为线程池的最大值了Integer.MAX_VALUE
,会导致无限创建线程;所以,使用该线程池时,一定要注意控制并发的任务数,如果短时有大量任务要执行,就会创建大量的线程,导致严重的性能问题(线程上下文切换带来的开销),线程创建占用堆外内存,如果任务对象也不小,它就会使堆外内存和堆内内存其中的一个先耗尽,导致OOM
。
3. Executors.newSingleThreadExecutor()
new FinalizableDelegatedExecutorService(
new ThreadPoolExecutor(1, 1,0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()
)
);
同newFixedThreadPool
线程池一样,队列用的是LinkedBlockingQueue
,队列大小为默认的Integer.MAX_VALUE
,可以无限的往里面添加任务,直到内存溢出。