前言
最近在写类似于 Tomcat 的 Java Web 容器和 HTTP 服务器,碰到了一些有趣的东西,便打算水水文章,顺便还能加深理解 😎。
线程池
线程池(Thread Pool)是一种基于池化思想管理线程的工具,通过线程池,我们可以做到线程复用,避免频繁创建和销毁线程带来的不必要的开销,同时也避免了线程过多导致操作系统调度困难的问题。
在 JDK 中有两种典型的线程池:
- FixedPool:固定线程数量,当线程池处理不来的时候将待处理的任务放入无限长任务队列中。
- CachedPool:不限线程数量,当线程池处理不来的时候新建临时线程,闲时销毁不活动的线程,任务队列为空。
在《阿里巴巴 Java 开发手册》中有提到我们应该禁止使用这两种线程池,而应该手动 new ThreadPoolExecutor
创建线程池。这是因为当任务很多并且处理不来的时候 FixedPool 会因为任务被积压到任务队列中,撑爆内存,引起 OOM。而 CachedPool 会不断的创建线程来执行任务,这同样会导致撑爆内存,引起 OOM,同时过多的线程切换也会引起严重的性能损失。
大多数情况下,我们需要的是闲时保留一定的线程(核心线程),忙时创建线程。直到达到设定的最大线程数时停止创建。来不及处理的任务放到定长的任务队列中,当任务队列满的时候触发拒绝策略。在线程池闲下来的时候销毁线程,将线程池中的线程数量回收到核心线程数。
然而,JDK 中线程池的工作模式并不是这样的,JDK 中线程池闲时的时候保留一定线程,当核心线程处理不来的时候将任务放到任务队列中,任务队列满的时候才会创建临时线程,此时如果还是处理不来,则触发拒绝策略。这种工作模式也导致了在队列较长的情况下,线程池没有机会创建新的线程,限制了线程池的吞吐性能。
可扩展线程池
为了解决以上的问题,Tomcat 中对 JDK 中的线程池进行了扩展,通过自定义任务队列和增加任务计数器来达到在忙时优先创建临时线程处理任务的作用。
思路
在线程池中增加一个 submittedTaskCount
的任务计数器,记录实际提交到线程池中任务的个数,同时自定义 TaskQueue
任务队列,重写 offer
方法。
- 当
submittedTaskCount
的值小于当前线程池中启动的线程数量时,则将任务直接插入到任务队列中(相当于直接执行该任务)。 - 若大于或等于,则检查当前线程池是否已经到达了最大线程数,如果还未到最大线程数,则返回
false
,制造任务队列已满的假象。 - 此时将任务重新插入线程池,线程池就会创建新的线程来执行任务。
- 若已经达到最大线程数,则将任务放入任务队列,等待执行。
- 若任务队列已经满了,重新插入任务队列的时候依旧会失败,此时就触发拒绝策略。
实现
具体代码请到 Github 查看
public class TaskQueue<R extends Runnable>
extends LinkedBlockingQueue<Runnable> {
// ...
@Override
public boolean offer(final Runnable runnable) {
// 未设置线程池的时候无法获取已提交的数量,抛出异常
if (executor == null) {
throw new RejectedExecutionException(
"The task queue does not have executor!"
);
}
final int currentPoolThreadSize = executor.getPoolSize();
// 已提交的任务数量少于线程池当前启动的线程数量,则直接添加到工作队列中
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
return super.offer(runnable);
}
// 判断当前线程数量是否达到最大线程数量,如果未达到,则返回 false,让线程池优先新建线程
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
return false;
}
// 当当前线程数量达到最大线程数量的时候,此时将任务添加到任务队列中
return super.offer(runnable);
}
public boolean retryOffer(
final Runnable o,
final long timeout,
final TimeUnit unit
)
throws InterruptedException {
if (executor.isShutdown()) {
throw new RejectedExecutionException("Executor is shutdown!");
}
// 重试插入
return super.offer(o, timeout, unit);
}
}
public class TaskQueue<R extends Runnable>
extends LinkedBlockingQueue<Runnable> {
// ...
@Override
public boolean offer(final Runnable runnable) {
// 未设置线程池的时候无法获取已提交的数量,抛出异常
if (executor == null) {
throw new RejectedExecutionException(
"The task queue does not have executor!"
);
}
final int currentPoolThreadSize = executor.getPoolSize();
// 已提交的任务数量少于线程池当前启动的线程数量,则直接添加到工作队列中
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
return super.offer(runnable);
}
// 判断当前线程数量是否达到最大线程数量,如果未达到,则返回 false,让线程池优先新建线程
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
return false;
}
// 当当前线程数量达到最大线程数量的时候,此时将任务添加到任务队列中
return super.offer(runnable);
}
public boolean retryOffer(
final Runnable o,
final long timeout,
final TimeUnit unit
)
throws InterruptedException {
if (executor.isShutdown()) {
throw new RejectedExecutionException("Executor is shutdown!");
}
// 重试插入
return super.offer(o, timeout, unit);
}
}
public class ThreadPoolExecutor
extends java.util.concurrent.ThreadPoolExecutor {
private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
// ...
public int getSubmittedTaskCount() {
return this.submittedTaskCount.get();
}
@Override
protected void afterExecute(final Runnable r, final Throwable t) {
// 完成任务后将提交的数量递减一,代表已经完成一个任务
this.submittedTaskCount.decrementAndGet();
}
@Override
@SuppressWarnings("rawtypes")
public void execute(final Runnable command) {
if (command == null) {
throw new NullPointerException();
}
// 提交任务的时候递增一,代表有新的任务加入队列
submittedTaskCount.incrementAndGet();
try {
// 实际执行任务
super.execute(command);
} catch (final RejectedExecutionException rx) {
// 如果触发拒绝策略,说明有可能是未达到最大线程数,或者工作队列满
final TaskQueue queue = (TaskQueue) super.getQueue();
try {
// 尝试重新插入到工作队列
if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
// 插入失败,说明工作队列实际上满了,触发实际的拒绝策略
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException(
"Queue capacity is full.",
rx
);
}
// else 插入成功,说明工作队列未满,只是未达到最大线程数,线程创建达到要求的时候就会执行
} catch (final InterruptedException x) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} catch (final Throwable t) {
// 出现其他异常,则抛出异常
submittedTaskCount.decrementAndGet();
throw t;
}
}
}
public class ThreadPoolExecutor
extends java.util.concurrent.ThreadPoolExecutor {
private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
// ...
public int getSubmittedTaskCount() {
return this.submittedTaskCount.get();
}
@Override
protected void afterExecute(final Runnable r, final Throwable t) {
// 完成任务后将提交的数量递减一,代表已经完成一个任务
this.submittedTaskCount.decrementAndGet();
}
@Override
@SuppressWarnings("rawtypes")
public void execute(final Runnable command) {
if (command == null) {
throw new NullPointerException();
}
// 提交任务的时候递增一,代表有新的任务加入队列
submittedTaskCount.incrementAndGet();
try {
// 实际执行任务
super.execute(command);
} catch (final RejectedExecutionException rx) {
// 如果触发拒绝策略,说明有可能是未达到最大线程数,或者工作队列满
final TaskQueue queue = (TaskQueue) super.getQueue();
try {
// 尝试重新插入到工作队列
if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
// 插入失败,说明工作队列实际上满了,触发实际的拒绝策略
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException(
"Queue capacity is full.",
rx
);
}
// else 插入成功,说明工作队列未满,只是未达到最大线程数,线程创建达到要求的时候就会执行
} catch (final InterruptedException x) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} catch (final Throwable t) {
// 出现其他异常,则抛出异常
submittedTaskCount.decrementAndGet();
throw t;
}
}
}
过程注释里都写了,这里就不多介绍了。另外,文中的代码并不是完整的,主要是不想文章又臭又长,一堆代码,所以不要直接复制粘贴就运行哦。
结语
最近总算闲下来了,所以最近偶尔会更新下文章,因为现在在写类似 Tomcat 的服务器,所以最近的文章应该都会是偏向这方面的,不废话了,溜了溜了 😂。
浅谈可扩展线程池
https://blog.ixk.me/post/talk-about-scalable-thread-pool许可协议
BY-NC-SA
本文作者
Otstar Lin
发布于
2020/11/17
转载或引用本文时请遵守许可协议,注明出处、不得用于商业用途!