You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

142 lines
16 KiB

This file contains invisible Unicode characters!

This file contains invisible Unicode characters that may be processed differently from what appears below. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to reveal hidden characters.

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

[Java线程池实现原理及其在美团业务中的实践 - 美团技术团队 (meituan.com)](https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html)
[动态线程池)](https://dynamictp.cn/)
# 一、线程池
## 1.1 什么是线程池
线程池Thread Pool是一种基于池化思想管理线程的工具经常出现在多线程服务器中如MySQL。
线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。
而本文描述线程池是JDK中提供的ThreadPoolExecutor类。
当然,使用线程池可以带来一系列好处:
- **降低资源消耗**:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
- **提高响应速度**:任务到达时,无需等待线程创建即可立即执行。
- **提高线程的可管理性**:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
- **提供更多更强大的功能**线程池具备可拓展性允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor就允许任务延期执行或定期执行。
## 1.2 线程池解决的问题是什么
线程池解决的核心问题就是资源管理问题。在并发环境下,系统不能够确定在任意时刻中,有多少任务需要执行,有多少资源需要投入。这种不确定性将带来以下若干问题:
1. 频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。
2. 对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险。
3. 系统无法合理管理内部的资源分布,会降低系统的稳定性。
为解决资源分配这个问题线程池采用了“池化”Pooling思想。池化顾名思义是为了最大化收益并最小化风险而将资源统一在一起管理的一种思想。
“池化”思想不仅仅能应用在计算机领域,在金融、设备、人员管理、工作管理等领域也有相关的应用。
在计算机领域中的表现为统一管理IT资源包括服务器、存储、和网络资源等等。通过共享资源使用户在低投入中获益。除去线程池还有其他比较典型的几种使用策略包括
1. 内存池(Memory Pooling):预先申请内存,提升申请内存速度,减少内存碎片。
2. 连接池(Connection Pooling):预先申请数据库连接,提升申请连接的速度,降低系统的开销。
3. 实例池(Object Pooling):循环使用对象,减少资源在初始化和释放时的昂贵损耗。
# 二、线程池核心设计与实现 ThreadPoolExecutor类
Java中的线程池核心实现类是ThreadPoolExecutor我们首先来看一下ThreadPoolExecutor的UML类图
解下ThreadPoolExecutor的继承关系。
![[912883e51327e0c7a9d753d11896326511272.png]]
ThreadPoolExecutor实现的顶层接口是Executor顶层接口Executor提供了一种思想将任务提交和任务执行进行解耦。用户无需关注如何创建线程如何调度线程来执行任务用户只需提供Runnable对象将任务的运行逻辑提交到执行器(Executor)中由Executor框架完成线程的调配和任务的执行部分。ExecutorService接口增加了一些能力1扩充执行任务的能力补充可以为一个或一批异步任务生成Future的方法2提供了管控线程池的方法比如停止线程池的运行。AbstractExecutorService则是上层的抽象类将执行任务的流程串联了起来保证下层的实现只需关注一个执行任务的方法即可。最下层的实现类ThreadPoolExecutor实现最复杂的运行部分ThreadPoolExecutor将会一方面维护自身的生命周期另一方面同时管理线程和任务使两者良好的结合从而执行并行任务。
## 2.1 运行机制
![[912883e51327e0c7a9d753d11896326511272 1.png]]
线程池在内部实际上构建了一个生产者消费者模型将线程和任务两者解耦并不直接关联从而良好的缓冲任务复用线程。线程池的运行主要分成两部分任务管理、线程管理。任务管理部分充当生产者的角色当任务提交后线程池会判断该任务后续的流转1直接申请线程执行该任务2缓冲到队列中等待线程执行3拒绝该任务。线程管理部分是消费者它们被统一维护在线程池内根据任务请求进行线程的分配当线程执行完任务后则会继续获取新的任务去执行最终当线程获取不到任务的时候线程就会被回收。
## 2.2 生命周期管理
线程池运行的状态,并不是用户显式设置的,而是伴随着线程池的运行,由内部来维护。线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量 (workerCount)。在具体实现中,线程池将运行状态(runState)、线程数量 (workerCount)两个关键参数的维护放在了一起,如下代码所示:
```java
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
```
`ctl`这个AtomicInteger类型是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段 它同时包含两部分的信息:线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)高3位保存runState低29位保存workerCount两个变量之间互不干扰。用一个变量去存储两个值可避免在做相关决策时出现不一致的情况不必为了维护两者的一致而占用锁资源。通过阅读线程池源代码也可以发现经常出现要同时判断线程池运行状态和线程数量的情况。线程池也提供了若干方法去供用户获得线程池当前的运行状态、线程个数。这里都使用的是位运算的方式相比于基本运算速度也会快很多。
关于内部封装的获取生命周期状态、获取线程池线程数量的计算方法如以下代码所示:
```java
private static int runStateOf(int c) { return c & ~CAPACITY; } //计算当前运行状态 private static int workerCountOf(int c) { return c & CAPACITY; } //计算当前线程数量 private static int ctlOf(int rs, int wc) { return rs | wc; } //通过状态和线程数生成ctl
```
### 2.2.1 ThreadPoolExecutor的运行状态
| 运行状态 | 状态描述 |
| -------- | ---------------------------------------------------------------------- |
| RUNNING | 能接受新提交的任务,并且也能处理阻塞队列中的任务 |
| SHUTDOWN | 关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务 |
| STOP | 不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程 |
| TIDYING | 所有的任务都已终止了workerCount有效线程数为0 |
| TERMINATED| 在terminated() 方法执行完后进入该状态 |
![[912883e51327e0c7a9d753d11896326511272 2.png]]
## 2.3 任务执行机制
### 2.3.1 任务调度
任务调度是线程池的主要入口,当用户提交了一个任务,接下来这个任务将如何执行都是由这个阶段决定的。了解这部分就相当于了解了线程池的核心运行机制。
首先所有任务的调度都是由execute方法完成的这部分完成的工作是检查现在线程池的运行状态、运行线程数、运行策略决定接下来执行的流程是直接申请线程执行或是缓冲到队列中执行亦或是直接拒绝该任务。其执行过程如下
1. 首先检测线程池运行状态如果不是RUNNING则直接拒绝线程池要保证在RUNNING的状态下执行任务。
2. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务。
3. 如果workerCount >= corePoolSize且线程池内的阻塞队列未满则将任务添加到该阻塞队列中。
4. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
5. 如果workerCount >= maximumPoolSize并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
![[912883e51327e0c7a9d753d11896326511272 3.png]]
### 2.3.2 任务队列
任务缓冲模块是线程池能够管理任务的核心部分。线程池的本质是对任务和线程的管理,而做到这一点最关键的思想就是将任务和线程两者解耦,不让两者直接关联,才可以做后续的分配工作。线程池中是以生产者消费者模式,通过一个阻塞队列来实现的。阻塞队列缓存任务,工作线程从阻塞队列中获取任务。
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
![[912883e51327e0c7a9d753d11896326511272 4.png]]
| 名称 | 描述 | |
| ------------------- | ------------------------------------------------------------------------------------------------ | --- |
| ArrayBlockingQueue | 由数组实现的有界阻塞队列,按照先进先出规则进行排序。支持公平锁和非公平锁 | |
| LinkedBlockingQueue | 由链表组成的有界队列按照先进先出规则进行排序默认长度为Integer.MAX_VALUE,默认创建下有容量危险 | |
### 2.3.3 任务申请
任务的执行有两种可能:一种是任务直接由新创建的线程执行。另一种是线程从任务队列中获取任务然后执行,执行完任务的空闲线程会再次去从队列中申请任务再去执行。第一种情况仅出现在线程初始创建的时候,第二种是线程获取任务绝大多数的情况。
![[912883e51327e0c7a9d753d11896326511272 5.png]]
### 2.3.4 任务拒绝
任务拒绝模块是线程池的保护部分线程池有一个最大的容量当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize时就需要拒绝掉该任务采取任务拒绝策略保护线程池。
拒绝策略是一个接口,其设计如下:
```java
public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }
```
用户可以通过实现这个接口去定制拒绝策略也可以选择JDK提供的四种已有拒绝策略
| 名称 | 描述 |
| -------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------- |
| ThreadPoolExecutor.AbortPolicy | 丢弃并抛出RejectedExecutionException异常。这是线程池的默认策略。如果是比较关键的业务推荐使用在系统不能承载更大的并发量的时候能够及时发现 |
| ThreadPoolExecutor.DiscardPolicy | 丢弃任务,,但是不抛异常。建议是一些无关紧要的业务采用此策略 |
| ThreadPoolExecutor.DiscardOldestPolicy | 丢弃队列最前面的任务,然后重新提交被拒绝策略。需要根据实际业务中是否允许丢弃老任务来权衡 |
| ThreadPoolExecutor.CallerRunsPolicy | 由调用线程(提交任务的线程)处理该任务。这种情况是需要让所有任务都执行完毕,适合大量的计算任务,最终必须要让每个任务都执行完毕 |
## 2.4 Worker
线程池为了掌握线程的状态并维护线程的生命周期设计了线程池内的工作线程Worker。
```java
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ final Thread thread;//Worker持有的线程 Runnable firstTask;//初始化的任务可以为null }
```
Worker这个工作线程实现了Runnable接口并持有一个线程thread一个初始化的任务firstTask。thread是在调用构造方法时通过ThreadFactory来创建的线程可以用来执行任务firstTask用它来保存传入的第一个任务这个任务可以有也可以为null。如果这个值是非空的那么线程就会在启动初期立即执行这个任务也就对应核心线程创建时的情况如果这个值是null那么就需要创建一个线程去执行任务列表workQueue中的任务也就是非核心线程的创建。
Worker执行任务的模型如下图所示
![[912883e51327e0c7a9d753d11896326511272 6.png]]
线程池需要管理线程的生命周期需要在线程长时间不运行的时候进行回收。线程池使用一张Hash表去持有线程的引用这样可以通过添加引用、移除引用这样的操作来控制线程的生命周期。这个时候重要的就是如何判断线程是否在运行。
Worker是通过继承AQS使用AQS来实现独占锁这个功能。没有使用可重入锁ReentrantLock而是使用AQS为的就是实现不可重入的特性去反应线程现在的执行状态。
1.lock方法一旦获取了独占锁表示当前线程正在执行任务中。 2.如果正在执行任务,则不应该中断线程。 3.如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。 4.线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态如果线程是空闲状态则可以安全回收。
![[912883e51327e0c7a9d753d11896326511272 7.png]]
### 2.4.1  Worker线程增加
增加线程是通过线程池中的addWorker方法该方法的功能就是增加一个线程该方法不考虑线程池是在哪个阶段增加的该线程这个分配线程的策略是在上个步骤完成的该步骤仅仅完成增加线程并使它运行最后返回是否成功这个结果。addWorker方法有两个参数firstTask、core。firstTask参数用于指定新增的线程执行的第一个任务该参数可以为空core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSizefalse表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize其执行流程如下图所示
![[912883e51327e0c7a9d753d11896326511272 8.png]]
### 2.4.2 Worker线程执行任务
在Worker类中的run方法调用了runWorker方法来执行任务runWorker方法的执行过程如下
1.while循环不断地通过getTask()方法获取任务。 2.getTask()方法从阻塞队列中取任务。 3.如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态。 4.执行任务。 5.如果getTask结果为null则跳出循环执行processWorkerExit()方法,销毁线程。
![[912883e51327e0c7a9d753d11896326511272 9.png]]