图标
创作项目友邻自述归档留言

浅谈 EatWhatYouKill

前言

既然说了 Tomcat 的线程池,那么这次就说说 Jetty 的任务执行策略吧。其实是 XK-Server(类似于 Tomcat 的 Java Web 容器和 HTTP 服务器)结合了二者的一些小特性,代码一两周前就写好了,这次就来水下文章。

什么是 EatWhatYouKill?

对于常规的 IO 操作,我们通常使用以下有几种处理方式:

ProduceConsume:将 IO 的生产和消费统一由一个线程来完成,不断循环生产和消费的过程。很明显,这种模式有个很严重的问题,后面的 IO 事件要等待前面的 IO 事件完成,这样的效率明显很差,越后面的 IO 事件需要等待的事件就越长。

ProduceExecuteConsume:采用一个线程作为生产者,负责收集 IO 数据流,然后将数据推送至队列中,让线程池中的另一线程来处理数据。这样消费的过程就不会影响生产,但是由于使用到了不同的线程,需要额外消耗线程切换的性能,同时当使用另外一个线程的时候有很大概率是调度到 CPU 的另外一个核心,这样就无法再利用到寄存器中的热缓存,需要再次缓存这些数据。

ExecuteProduceConsume:这种模式比较特殊,该模式和 ProduceConsume 类似,都是生产消费都在一个线程里完成,这样就可以充分的利用热缓存。但与其不同的是,该模式可能会新建一个新线程以继续生产和执行任务,线程不应该生产自己不想消费的任务,而应该把生产和消费职责都转移到另一个进程中执行。虽然这个可以很好的利用热缓存,但是同时也带来一个问题:一旦 IO 的消费时间过长,就会导致大量的阻塞。

看完上面的执行模式,你应该对 IO 的生产和消费调度有了一些理解,那么 EatWhatYouKill 又是什么呢?

EatWhatYouKill 是 Jetty 对 ExecuteProduceConsume 模式的改进,在线程池空闲的时候则采用 ExecuteProduceConsume 模式。如果线程池处于繁忙的情况下就切换成 ProduceExecuteConsume 模式。这么做的原因是当线程池处于繁忙的情况下,如果还是用 EPC 模式,那么就没有空闲的线程来接受连接(生产),这样会导致连接器(Connector)拒绝新的请求。这时候 Jetty 做了一个优化,一旦发现线程池处于繁忙的状态,Jetty 就不会在轮询线程里消费任务,而是采用 PEC 模式,将消费任务放到线程池中,由线程池进行调度。这样轮询线程就有办法继续接受新连接,等到线程池不繁忙了,Jetty 就会再次切回 EPC 模式,充分利用热缓存。

实现

完整的代码可以到 Github 查看。

了解了原理,那么就进入分析的阶段了,本文中的代码参考了 Jetty,对 Jetty 的 EWYK 进行简化,同时适配可扩展线程池。

首先我们需要知道,每个任务其实就是一个 Runnable,但直接调用 Runnable 的 run 方法的时候就相当于直接在本线程中执行,对应 EPC 模式。将 Runnable 放到线程池中执行则相当于在另外一个线程执行,对应 PEC 模式。

这样我们就可以定义以下两个方法,用于对应两种模式的执行方式:

private void executeTask(final Runnable task) {
    log.info("Run task in pool");
    this.executor.execute(task);
}

private void runTask(final Runnable task) {
    log.info("Run task in loop");
    task.run();
}
private void executeTask(final Runnable task) {
    log.info("Run task in pool");
    this.executor.execute(task);
}

private void runTask(final Runnable task) {
    log.info("Run task in loop");
    task.run();
}

既然有了执行的方式(消费),那么就需要有提供任务的方式(生产),此时我们可以定义一个接口来描述任务提供者:

interface Producer {
    /**
     * 生产任务
     *
     * @return 任务
     */
    Runnable produce();
}
interface Producer {
    /**
     * 生产任务
     *
     * @return 任务
     */
    Runnable produce();
}

这样我们就有了生产和消费了,接下来就是执行策略了。

首先先定义下状态和模式:

private enum State {
    /**
     * 空闲
     */
    IDLE,
    /**
     * 生产中
     */
    PRODUCING,
    /**
     * 再次生产中
     */
    REPRODUCING,
}

private enum Mode {
    /**
     * 生产-消费分离
     */
    PRODUCE_EXECUTE_CONSUME,
    /**
     * 生产-消费不分离
     */
    EXECUTE_PRODUCE_CONSUME,
}
private enum State {
    /**
     * 空闲
     */
    IDLE,
    /**
     * 生产中
     */
    PRODUCING,
    /**
     * 再次生产中
     */
    REPRODUCING,
}

private enum Mode {
    /**
     * 生产-消费分离
     */
    PRODUCE_EXECUTE_CONSUME,
    /**
     * 生产-消费不分离
     */
    EXECUTE_PRODUCE_CONSUME,
}

然后就是实际的执行方法:

@Override
public void run() {
    this.execute();
}

@Override
public void execute() {
    try (final AutoLock l = lock.lock()) {
        switch (this.state) {
            // 如果是空闲状态,则进入生产状态
            case IDLE:
                this.state = State.PRODUCING;
                break;
            // 如果是生产状态,则进入再生产状态
            case PRODUCING:
                this.state = State.REPRODUCING;
                return;
            default:
                return;
        }
    }

    // 处于运行中则轮询的执行任务
    while (this.isRunning()) {
        try {
            // 当任务被提交到其他线程执行的时候,或者当前的任务执行完毕了,则返回 true,继续执行下一个任务
            if (this.doProduce()) {
                continue;
            }
            return;
        } catch (final Throwable th) {
            log.warn("Unable to produce", th);
        }
    }
}

private boolean doProduce() {
    final Runnable task = this.produceTask();
    if (task == null) {
        // 没任务了,则进入下一个状态
        try (final AutoLock l = this.lock.lock()) {
            switch (this.state) {
                case PRODUCING:
                    this.state = State.IDLE;
                    return false;
                case REPRODUCING:
                    this.state = State.PRODUCING;
                    return true;
                default:
                    throw new IllegalStateException("State: " + this.state);
            }
        }
    }
    final Mode mode;

    try (final AutoLock l = this.lock.lock()) {
        // 尝试到线程池中执行,如果不繁忙则执行成功,进入 EPC 模式,否则进入 PEC 模式
        if (this.executor.tryExecute(this)) {
            this.state = State.IDLE;
            mode = Mode.EXECUTE_PRODUCE_CONSUME;
        } else {
            log.info("Use PEC mode");
            mode = Mode.PRODUCE_EXECUTE_CONSUME;
        }
    }

    // 按对应的模式使用不同的执行方式
    switch (mode) {
        case PRODUCE_EXECUTE_CONSUME:
            this.executeTask(task);
            return true;
        case EXECUTE_PRODUCE_CONSUME:
            this.runTask(task);
            try (final AutoLock l = this.lock.lock()) {
                if (this.state == State.IDLE) {
                    this.state = State.PRODUCING;
                    return true;
                }
            }
            return false;
        default:
            throw new IllegalStateException("State: " + this.state);
    }
}
@Override
public void run() {
    this.execute();
}

@Override
public void execute() {
    try (final AutoLock l = lock.lock()) {
        switch (this.state) {
            // 如果是空闲状态,则进入生产状态
            case IDLE:
                this.state = State.PRODUCING;
                break;
            // 如果是生产状态,则进入再生产状态
            case PRODUCING:
                this.state = State.REPRODUCING;
                return;
            default:
                return;
        }
    }

    // 处于运行中则轮询的执行任务
    while (this.isRunning()) {
        try {
            // 当任务被提交到其他线程执行的时候,或者当前的任务执行完毕了,则返回 true,继续执行下一个任务
            if (this.doProduce()) {
                continue;
            }
            return;
        } catch (final Throwable th) {
            log.warn("Unable to produce", th);
        }
    }
}

private boolean doProduce() {
    final Runnable task = this.produceTask();
    if (task == null) {
        // 没任务了,则进入下一个状态
        try (final AutoLock l = this.lock.lock()) {
            switch (this.state) {
                case PRODUCING:
                    this.state = State.IDLE;
                    return false;
                case REPRODUCING:
                    this.state = State.PRODUCING;
                    return true;
                default:
                    throw new IllegalStateException("State: " + this.state);
            }
        }
    }
    final Mode mode;

    try (final AutoLock l = this.lock.lock()) {
        // 尝试到线程池中执行,如果不繁忙则执行成功,进入 EPC 模式,否则进入 PEC 模式
        if (this.executor.tryExecute(this)) {
            this.state = State.IDLE;
            mode = Mode.EXECUTE_PRODUCE_CONSUME;
        } else {
            log.info("Use PEC mode");
            mode = Mode.PRODUCE_EXECUTE_CONSUME;
        }
    }

    // 按对应的模式使用不同的执行方式
    switch (mode) {
        case PRODUCE_EXECUTE_CONSUME:
            this.executeTask(task);
            return true;
        case EXECUTE_PRODUCE_CONSUME:
            this.runTask(task);
            try (final AutoLock l = this.lock.lock()) {
                if (this.state == State.IDLE) {
                    this.state = State.PRODUCING;
                    return true;
                }
            }
            return false;
        default:
            throw new IllegalStateException("State: " + this.state);
    }
}

结语

写的挺简单的,这些代码写了有一段时间了,流程忘得差不多了,所以代码就没说明的很详细。溜了溜了 😂。

浅谈 EatWhatYouKill

https://blog.ixk.me/post/talk-about-eatwhatyoukill
  • 许可协议

    BY-NC-SA

  • 本文作者

    Otstar Lin

  • 发布于

    2020/11/19

转载或引用本文时请遵守许可协议,注明出处、不得用于商业用途!

HTTP 系列文章浅谈可扩展线程池