【问题一】:JDK 实现的这个线程池优先把任务放入队列暂存起来,而不是创建更多的线程,它比较适用于执行 CPU 密集型的任务,也就是需要执行大量 CPU 运算的任务。所以当前线程数超过核心线程数时,线程池不会增加线程,而是放在队列里等待核心线程空闲下来。
但是,我们平时开发的 Web 系统通常都有大量的 IO 操作,比方说查询数据库、查询缓存等等。任务在执行 IO 操作的时候 CPU就空闲了下来,这时如果增加执行任务的线程数而不是把任务暂存在队列中,就可以在单位时间内执行更多的任务,大大提高了任务执行的吞吐量。所以你看 Tomcat 使用的线程池就不是 JDK原生的线程池,而是做了一些改造,当线程数超过 coreThreadCount 之后会优先创建线程,直到线程数到达 maxThreadCount,这样就比较适合于 Web系统大量 IO操作的场景了,你在实际运用过程中也可以参考借鉴。
【问题二】:线程池中使用的队列的堆积量也是我们需要监控的重要指标,对于实时性要求比较高的任务来说,这个指标尤为关键。
我在实际项目中就曾经遇到过任务丢给线程池之后,长时间都没有被执行的诡异问题。最初,我认为这是代码的 Bug 导致的,后来经过排查发现,是因为线程池的 coreThreadCount 和 maxThreadCount 设置的比较小,导致任务在线程池里面大量的堆积,在调大了这两个参数之后问题就解决了。跳出这个 bug之后,我就把重要线程池的队列任务堆积量,作为一个重要的监控指标放到了系统监控大屏上。
【问题三】:如果你使用线程池请一定记住不要使用无界队列(即没有设置固定大小的队列)。也许你会觉得使用了无界队列后,任务就永远不会被丢弃,只要任务对实时性要求不高,反正早晚有消费完的一天。但是,大量的任务堆积会占用大量的内存空间,一旦内存空间被占满就会频繁地触发 Full GC,造成服务不可用,我之前排查过的一次 GC 引起的宕机,起因就是系统中的一个线程池使用了无界队列。
线程池的改造方案
我们这里直接学习 Tomcat是如何优化线程池的,在我们平时的使用中如果使用 LinkedBlockingQueue的话,默认是使用Integer.MAX_VALUE,即无界队列(这种情况下如果没有配置队列的 capacity的话,队列始终不会满,那么始终无法进入开启新线程到达 maxThreads个数的地步,则此时配置 maxThreads其实是没有意义的)。
而在 Tomcat中使用的是TaskQueue,TaskQueue的队列 capacity为 maxQueueSize,默认也是 Integer.MAX_VALUE。但是,其重写 offer方法,当其线程池大小小于maximumPoolSize的时候,返回false(换句话说:先创建核心线程数,再创建最大线程数,最后放入阻塞队列。对照问题一),即在一定程度改写了队列满的逻辑,修复了使用 LinkedBlockingQueue默认的 capacity为 Integer.MAX_VALUE的时候,maxThreads失效的"bug"。从而可以继续增长线程到 maxThreads,超过之后,继续放入队列。
所以综上,Tomcat的线程池使用了自己扩展的 taskQueue,修改了offer的逻辑,以做到最小的改动实现了线程池的改造。我们看下 ThreadPoolExecutor的 execute方法是怎么写的:ThreadPoolExecutor#execute
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}//这里,如果使用workQueue的offer成功的话,那么就不会创建新的线程,如果失败的话,就会走到else if方法进行创建新的线程if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);
}
TaskQueue
public class TaskQueue extends LinkedBlockingQueue<Runnable> {private ThreadPoolExecutor parent = null;@Overridepublic boolean offer(Runnable o) {//we can't do any checksif (parent==null) return super.offer(o);//we are maxed out on threads, simply queue the objectif (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);//we have idle threads, just add it to the queueif (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);//当其线程池大小小于maximumPoolSize的时候,返回falseif (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;//if we reached here, we need to add it to the queuereturn super.offer(o);}
}
我们从这里可以看到:
【1】如果当前线程数已达到 MaximumPoolSize,那么就放入到队列里去;
【2】如果当前线程池的数量大于正在运行的线程数,说明有空闲的线程,那么就将任务放入到队列中去;
【3】若当其线程池大小小于maximumPoolSize的时候,返回false;