线程池相关参数¶
线程池解决的问题:
- 减少线程创建销毁的开销
- 提供了一种限制和管理资源的方式
JDK原生线程池¶
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
七大参数
corePoolSize 和 maximumPoolSize¶
- corePoolSize:核心线程数量
- maximumPoolSize:最大线程数量
- 提交新任务时,如果正在运行的线程少于 corePoolSize,则会创建一个新线程来处理请求,即使其他工作线程处于空闲状态
- 如果运行的线程数超过 corePoolSize 但少于 maximumPoolSize,新任务会放到队列里面等待调度
- 仅当队列已满,并且 maximumPoolSize 大于corePoolSize 时才会创建新线程
- 队列已满,并且提交任务数超过 maximumPoolSize,任务执行拒绝策略
- 当线程池中线程数超过 corePoolSize,且超过这部分线程的空闲时间达到 keepAliveTime 时,回收这些线程
- 如果设置 allowCoreThreadTimeOut(true) 时,线程池中 corePoolSize 范围内的线程空闲时间达到 keepAliveTime 也将回收
- 通过将 corePoolSize 和 maximumPoolSize 设置为相同,可以创建一个固定大小的线程池。
- 通过将 maximumPoolSize 设置为诸如
Integer.MAX_VALUE
之类的无限值,可以使线程池容纳任意数量的并发任务。 - 通常情况下,核心和最大线程池大小仅在构造时设置,但也可以使用 setCorePoolSize 和 setMaximumPoolSize 在运行时动态更改。
- ThreadPoolExecutor会根据corePoolSize和maximumPoolSize所设定的界限自动调整线程池大小
- 按需构建。默认情况下,即使核心线程最初也只有在新任务到达时才会被创建和启动,但可以使用prestartCoreThread或prestartAllCoreThreads 方法在动态情况下覆盖这一行为
keepAliveTime 和 unit¶
- keepAliveTime:非核心的空闲线程存活时间
- 提供了一种在线程池不被积极使用时减少资源消耗的方法
- 默认情况下,只有在线程数超过corePoolSize时才会应用保持活动策略。但是,只要keepAliveTime值不为零,方法
allowCoreThreadTimeOut(boolean)
可以用来将此超时策略应用于核心线程
- unit:keepAliveTime 的时间单位
workQueue¶
- workQueue:阻塞队列
- 任何 BlockingQueue 都可以,但是主要分为三种:
- 有界队列
- 有界队列(例如 ArrayBlockingQueue)在与有限的 maximumPoolSizes 一起使用时有助于防止资源耗尽
- 队列大小和最大池大小可以相互权衡:
- 使用大队列和小池最大程度地减少了CPU使用率、操作系统资源和上下文切换开销,但可能导致人为低的吞吐量。如果任务经常阻塞(例如,如果它们是I/O绑定的),系统可能能够为更多线程安排时间,超过您允许的线程数。
- 使用小队列通常需要更大的池大小,这使CPU保持繁忙,但可能会遇到无法接受的调度开销,这也会降低吞吐量。
- 无界队列
- 使用无界队列(例如没有预定义容量的 LinkedBlockingQueue)会导致在所有corePoolSize线程都忙碌时,新任务在队列中等待。
- maximumPoolSize的值不会产生任何效果,永远不会创建多于corePoolSize的线程。
- 当每个任务彼此完全独立,这可能是合适的
- 当命令的平均到达速度比它们能够被处理的速度更快时,队列长度无限增长
- 直接交付(同步队列)
- 将任务直接交给线程处理,默认 SynchronousQueue
- 需要线程数量是无限的(maximumPoolSizes 无限制),以避免拒绝新提交的任务
- 避免了在处理可能具有内部依赖关系的请求集时出现死锁
- 可能导致在命令的平均到达速度比它们能够被处理的速度更快时,线程无限增长
threadFactory¶
- threadFactory:线程工厂
- 新线程是使用ThreadFactory创建的。如果没有指定,将使用
Executors.defaultThreadFactory
,它创建的线程都位于同一个ThreadGroup中,具有相同的优先级和非守护线程状态。 - 通过提供自定义的 ThreadFactory,可以更改线程的名称、线程组、优先级、守护状态等。
- 如果ThreadFactory在被要求通过newThread创建线程时失败,通过从newThread返回null,执行器将继续运行,但可能无法执行任何任务。线程应具有"modifyThread" RuntimePermission。如果工作线程或其他使用线程池的线程没有此权限,服务可能会降级:配置更改可能无法及时生效,并且关闭的线程池可能会保持一种终止可能但未完成的状态
- 新线程是使用ThreadFactory创建的。如果没有指定,将使用
handler¶
- handler:拒绝策略
- AbortPolicy:默认的拒绝策略,该策略直接抛异常处理。
- DiscardPolicy:直接抛弃不处理。
- DiscardOldestPolicy:丢弃队列中最老的任务(在队列头)。
- CallerRunsPolicy:让调用者处理任务。
【补充】其他框架实现的拒绝策略
- Dubbo 实现:抛异常之前记录日志,并 dump 线程栈信息,方便定义问题
- Netty 实现:创建一个新线程来执行任务
- ActiveMQ 实现:带超时等待尝试(60s)放入队列
JDK 实现的线程池¶
Executors
类中实现的各种静态工厂方法。但是阿里开发手册其实不建议使用这些。
队列无限长或者线程池无穷大,都会导致 OOM
newFixedThreadPool¶
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,// 线程池大小固定
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()// 无界队列
);
}
// 重载方法,可指定线程工厂
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
- 适用于任务量未知,相对耗时的任务
后续可以更改线程池中线程数量大小
newCachedThreadPool¶
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,// 线程池大小无限
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>()// 同步队列(直接交付)
);
}
// 也有指定线程工厂的重载方法
- 创建的全部都是非核心线程(救急线程)
- 存活时间60s
-
SynchronousQueue 的特点是没有容量,如果没有线程来取任务,任务是放不进去的。所以需要线程池大小是无限的
-
适用于任务数量密集,但是任务执行时间较短的情况
newSingleThreadExecutor¶
public static ExecutorService newSingleThreadExecutor() {
// 装饰器模式
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,// 固定只有一个线程
0L, TimeUnit.MILLISECONDS,
// 无界队列
new LinkedBlockingQueue<Runnable>()));
}
使用场景:希望多个任务排队执行
固定的一个线程不会被释放。如果这个线程在执行任务的过程中因为某些原因挂了,线程池会再起一个线程,保证任意时刻都有且仅有一个线程
Spring的线程池类¶
ThreadPoolTaskExecutor
类
源码¶
public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {
private final Object poolSizeMonitor = new Object();// 锁要锁的对象
private int corePoolSize = 1;
private int maxPoolSize = Integer.MAX_VALUE;// 线程数量无限
private int keepAliveSeconds = 60;
private int queueCapacity = Integer.MAX_VALUE;// 无界队列
private boolean allowCoreThreadTimeOut = false;
private boolean prestartAllCoreThreads = false;
@Nullable
private TaskDecorator taskDecorator;
@Nullable
private ThreadPoolExecutor threadPoolExecutor;
// Runnable decorator to user-level FutureTask, if different
private final Map<Runnable, Object> decoratedTaskMap =
new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK);
// 被 ExecutorConfigurationSupport 中的 initialize() 方法调用
@Override
protected ExecutorService initializeExecutor(
ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
ThreadPoolExecutor executor;
if (this.taskDecorator != null) {
// 调用JDK的ThreadPoolExecutor
executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler) {
@Override
public void execute(Runnable command) {
Runnable decorated = taskDecorator.decorate(command);
if (decorated != command) {
decoratedTaskMap.put(decorated, command);
}
super.execute(decorated);
}
};
}
else {
executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler);
}
if (this.allowCoreThreadTimeOut) {
executor.allowCoreThreadTimeOut(true);
}
if (this.prestartAllCoreThreads) {
executor.prestartAllCoreThreads();
}
this.threadPoolExecutor = executor;
return executor;
}
protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
if (queueCapacity > 0) {
return new LinkedBlockingQueue<>(queueCapacity);// 有界队列使用的链表实现
}
else {
return new SynchronousQueue<>();
}
}
}
public abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory
implements BeanNameAware, InitializingBean, DisposableBean {
protected final Log logger = LogFactory.getLog(getClass());
private ThreadFactory threadFactory = this;// CustomizableThreadFactory
private boolean threadNamePrefixSet = false;
private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
private boolean waitForTasksToCompleteOnShutdown = false;
private long awaitTerminationMillis = 0;
@Nullable
private String beanName;
@Nullable
private ExecutorService executor;
}
- 核心线程数量是 1
- 最大线程数量无限制
- 阻塞队列默认是 LinkedBlockingQueue 和 SynchronousQueue
- 默认的拒绝策略是 AbortPolicy
- 线程工厂是 CustomizableThreadFactory
- 非核心线程空闲时间是60秒
使用示例¶
@Configuration
@EnableAsync // 开启多线程
public class ThreadPoolConfig {
@Bean("asyncTaskExecutor")
public Executor asyncServiceExecutor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 返回可用处理器的Java虚拟机的数量
int processNum = Runtime.getRuntime().availableProcessors();
int corePoolSize = (int) (processNum / (1 - 0.2));
int maxPoolSize = (int) (processNum / (1 - 0.5));
// 核心线程数
executor.setCorePoolSize(corePoolSize);
// 最大线程数量
executor.setMaxPoolSize(maxPoolSize);
// 队列大小
executor.setQueueCapacity(maxPoolSize * 1000);
// 线程优先级
executor.setThreadPriority(Thread.MAX_PRIORITY);
// 非守护
executor.setDaemon(false);
// 线程活跃时间
executor.setKeepAliveSeconds(300);
// 默认线程名称
executor.setThreadNamePrefix("kaichi-");
return executor;
}
}
关闭线程池¶
shutdown¶
调用 shutdown 并不会立刻结束线程,只是不再接受新任务,已经提交的任务会按部就班执行完毕;方法不会阻塞调用线程的执行
// ExecutorService 接口
void shutdown();// 抽象方法
// ThreadPoolExecutor 实现
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 修改线程池状态
advanceRunState(SHUTDOWN);
// 仅打断空闲线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试终止线程(不在运行状态的可以立刻终结,还在运行的不管)
tryTerminate();
}
shutdownNow¶
- 线程池状态改为 STOP
- 不再接受新任务
- 队列中的任务会作为返回结果返回
- 使用中断(interrupt)的方式打断所有线程
// ThreadPoolExecutor 实现
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
// 打断所有线程
interruptWorkers();
// 获取队列中的剩余任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}