Skip to content

线程池相关参数

线程池解决的问题:

  • 减少线程创建销毁的开销
  • 提供了一种限制和管理资源的方式

JDK原生线程池

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
        null :
    AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

七大参数

corePoolSize 和 maximumPoolSize

  • corePoolSize:核心线程数量
  • maximumPoolSize:最大线程数量
    • 提交新任务时,如果正在运行的线程少于 corePoolSize,则会创建一个新线程来处理请求,即使其他工作线程处于空闲状态
    • 如果运行的线程数超过 corePoolSize 但少于 maximumPoolSize,新任务会放到队列里面等待调度
    • 仅当队列已满,并且 maximumPoolSize 大于corePoolSize 时才会创建新线程
    • 队列已满,并且提交任务数超过 maximumPoolSize,任务执行拒绝策略
    • 当线程池中线程数超过 corePoolSize,且超过这部分线程的空闲时间达到 keepAliveTime 时,回收这些线程
    • 如果设置 allowCoreThreadTimeOut(true) 时,线程池中 corePoolSize 范围内的线程空闲时间达到 keepAliveTime 也将回收
    • 通过将 corePoolSize 和 maximumPoolSize 设置为相同,可以创建一个固定大小的线程池
    • 通过将 maximumPoolSize 设置为诸如 Integer.MAX_VALUE 之类的无限值,可以使线程池容纳任意数量的并发任务
    • 通常情况下,核心和最大线程池大小仅在构造时设置,但也可以使用 setCorePoolSize 和 setMaximumPoolSize 在运行时动态更改。
    • ThreadPoolExecutor会根据corePoolSize和maximumPoolSize所设定的界限自动调整线程池大小
    • 按需构建。默认情况下,即使核心线程最初也只有在新任务到达时才会被创建和启动,但可以使用prestartCoreThread或prestartAllCoreThreads 方法在动态情况下覆盖这一行为

keepAliveTime 和 unit

  • keepAliveTime:非核心的空闲线程存活时间
    • 提供了一种在线程池不被积极使用时减少资源消耗的方法
    • 默认情况下,只有在线程数超过corePoolSize时才会应用保持活动策略。但是,只要keepAliveTime值不为零,方法 allowCoreThreadTimeOut(boolean) 可以用来将此超时策略应用于核心线程
  • unit:keepAliveTime 的时间单位

workQueue

  • workQueue:阻塞队列
    • 任何 BlockingQueue 都可以,但是主要分为三种:
    • 有界队列
      • 有界队列(例如 ArrayBlockingQueue)在与有限的 maximumPoolSizes 一起使用时有助于防止资源耗尽
      • 队列大小和最大池大小可以相互权衡:
        • 使用大队列和小池最大程度地减少了CPU使用率、操作系统资源和上下文切换开销,但可能导致人为低的吞吐量。如果任务经常阻塞(例如,如果它们是I/O绑定的),系统可能能够为更多线程安排时间,超过您允许的线程数。
        • 使用小队列通常需要更大的池大小,这使CPU保持繁忙,但可能会遇到无法接受的调度开销,这也会降低吞吐量。
    • 无界队列
      • 使用无界队列(例如没有预定义容量的 LinkedBlockingQueue)会导致在所有corePoolSize线程都忙碌时,新任务在队列中等待。
      • maximumPoolSize的值不会产生任何效果,永远不会创建多于corePoolSize的线程。
      • 当每个任务彼此完全独立,这可能是合适的
      • 当命令的平均到达速度比它们能够被处理的速度更快时,队列长度无限增长
    • 直接交付(同步队列)
      • 将任务直接交给线程处理,默认 SynchronousQueue
      • 需要线程数量是无限的(maximumPoolSizes 无限制),以避免拒绝新提交的任务
      • 避免了在处理可能具有内部依赖关系的请求集时出现死锁
      • 可能导致在命令的平均到达速度比它们能够被处理的速度更快时,线程无限增长

threadFactory

  • threadFactory:线程工厂
    • 新线程是使用ThreadFactory创建的。如果没有指定,将使用 Executors.defaultThreadFactory,它创建的线程都位于同一个ThreadGroup中,具有相同的优先级和非守护线程状态。
    • 通过提供自定义的 ThreadFactory,可以更改线程的名称、线程组、优先级、守护状态等。
    • 如果ThreadFactory在被要求通过newThread创建线程时失败,通过从newThread返回null,执行器将继续运行,但可能无法执行任何任务。线程应具有"modifyThread" RuntimePermission。如果工作线程或其他使用线程池的线程没有此权限,服务可能会降级:配置更改可能无法及时生效,并且关闭的线程池可能会保持一种终止可能但未完成的状态

handler

  • handler:拒绝策略
  • AbortPolicy:默认的拒绝策略,该策略直接抛异常处理。
  • DiscardPolicy:直接抛弃不处理。
  • DiscardOldestPolicy:丢弃队列中最老的任务(在队列头)。
  • CallerRunsPolicy:让调用者处理任务。

【补充】其他框架实现的拒绝策略

  • Dubbo 实现:抛异常之前记录日志,并 dump 线程栈信息,方便定义问题
  • Netty 实现:创建一个新线程来执行任务
  • ActiveMQ 实现:带超时等待尝试(60s)放入队列

JDK 实现的线程池

Executors 类中实现的各种静态工厂方法。但是阿里开发手册其实不建议使用这些。

队列无限长或者线程池无穷大,都会导致 OOM

线程池规约

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,// 线程池大小固定
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>()// 无界队列
                                 );
}

// 重载方法,可指定线程工厂
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}
  • 适用于任务量未知,相对耗时的任务

后续可以更改线程池中线程数量大小

newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,// 线程池大小无限
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>()// 同步队列(直接交付)
                                 );
}
// 也有指定线程工厂的重载方法
  • 创建的全部都是非核心线程(救急线程)
  • 存活时间60s
  • SynchronousQueue 的特点是没有容量,如果没有线程来取任务,任务是放不进去的。所以需要线程池大小是无限的

  • 适用于任务数量密集,但是任务执行时间较短的情况

newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
    // 装饰器模式
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,// 固定只有一个线程
                                0L, TimeUnit.MILLISECONDS,
                                // 无界队列
                                new LinkedBlockingQueue<Runnable>()));
}

使用场景:希望多个任务排队执行

固定的一个线程不会被释放。如果这个线程在执行任务的过程中因为某些原因挂了,线程池会再起一个线程,保证任意时刻都有且仅有一个线程

Spring的线程池类

ThreadPoolTaskExecutor

源码

public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
        implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {

    private final Object poolSizeMonitor = new Object();// 锁要锁的对象

    private int corePoolSize = 1;

    private int maxPoolSize = Integer.MAX_VALUE;// 线程数量无限

    private int keepAliveSeconds = 60;

    private int queueCapacity = Integer.MAX_VALUE;// 无界队列

    private boolean allowCoreThreadTimeOut = false;

    private boolean prestartAllCoreThreads = false;

    @Nullable
    private TaskDecorator taskDecorator;

    @Nullable
    private ThreadPoolExecutor threadPoolExecutor;

    // Runnable decorator to user-level FutureTask, if different
    private final Map<Runnable, Object> decoratedTaskMap =
            new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK);


    // 被 ExecutorConfigurationSupport 中的 initialize() 方法调用
    @Override
    protected ExecutorService initializeExecutor(
        ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

        BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);

        ThreadPoolExecutor executor;
        if (this.taskDecorator != null) {
            // 调用JDK的ThreadPoolExecutor
            executor = new ThreadPoolExecutor(
                this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
                queue, threadFactory, rejectedExecutionHandler) {
                @Override
                public void execute(Runnable command) {
                    Runnable decorated = taskDecorator.decorate(command);
                    if (decorated != command) {
                        decoratedTaskMap.put(decorated, command);
                    }
                    super.execute(decorated);
                }
            };
        }
        else {
            executor = new ThreadPoolExecutor(
                this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
                queue, threadFactory, rejectedExecutionHandler);

        }

        if (this.allowCoreThreadTimeOut) {
            executor.allowCoreThreadTimeOut(true);
        }
        if (this.prestartAllCoreThreads) {
            executor.prestartAllCoreThreads();
        }

        this.threadPoolExecutor = executor;
        return executor;
    }

    protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
        if (queueCapacity > 0) {
            return new LinkedBlockingQueue<>(queueCapacity);// 有界队列使用的链表实现
        }
        else {
            return new SynchronousQueue<>();
        }
    }
}
public abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory
        implements BeanNameAware, InitializingBean, DisposableBean {

    protected final Log logger = LogFactory.getLog(getClass());

    private ThreadFactory threadFactory = this;// CustomizableThreadFactory

    private boolean threadNamePrefixSet = false;

    private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();

    private boolean waitForTasksToCompleteOnShutdown = false;

    private long awaitTerminationMillis = 0;

    @Nullable
    private String beanName;

    @Nullable
    private ExecutorService executor;
}
  • 核心线程数量是 1
  • 最大线程数量无限制
  • 阻塞队列默认是 LinkedBlockingQueue 和 SynchronousQueue
  • 默认的拒绝策略是 AbortPolicy
  • 线程工厂是 CustomizableThreadFactory
  • 非核心线程空闲时间是60秒

使用示例

@Configuration
@EnableAsync // 开启多线程
public class ThreadPoolConfig {

    @Bean("asyncTaskExecutor")
    public Executor asyncServiceExecutor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 返回可用处理器的Java虚拟机的数量
        int processNum = Runtime.getRuntime().availableProcessors();
        int corePoolSize = (int) (processNum / (1 - 0.2));
        int maxPoolSize = (int) (processNum / (1 - 0.5));
        // 核心线程数
        executor.setCorePoolSize(corePoolSize);
        // 最大线程数量
        executor.setMaxPoolSize(maxPoolSize);
        // 队列大小
        executor.setQueueCapacity(maxPoolSize * 1000);
        // 线程优先级
        executor.setThreadPriority(Thread.MAX_PRIORITY);
        // 非守护
        executor.setDaemon(false);
        // 线程活跃时间
        executor.setKeepAliveSeconds(300);
        // 默认线程名称
        executor.setThreadNamePrefix("kaichi-");

        return executor;
    }
}

关闭线程池

shutdown

调用 shutdown 并不会立刻结束线程,只是不再接受新任务,已经提交的任务会按部就班执行完毕;方法不会阻塞调用线程的执行

// ExecutorService 接口
void shutdown();// 抽象方法
// ThreadPoolExecutor 实现

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 修改线程池状态
        advanceRunState(SHUTDOWN);
        // 仅打断空闲线程
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // 尝试终止线程(不在运行状态的可以立刻终结,还在运行的不管)
    tryTerminate();
}

shutdownNow

  • 线程池状态改为 STOP
  • 不再接受新任务
  • 队列中的任务会作为返回结果返回
  • 使用中断(interrupt)的方式打断所有线程
// ThreadPoolExecutor 实现
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        // 打断所有线程
        interruptWorkers();
        // 获取队列中的剩余任务
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}