站点图标

浅谈 IO

2021-02-27折腾记录Java / 浅谈 / IO
本文最后更新于 607 天前,文中所描述的信息可能已发生改变

前言

最近也不知道该学什么,干脆就把之前学的的 IO 相关的东西翻出来写一篇文章吧。

概念

首先我们需要了解 阻塞、非阻塞、同步、异步 这些概念。之前的 浅谈并发:基础 一文上有写了相关的概念,虽然那写的是面向线程的概念,不过在 IO 通讯上也是类似的。所以这里就不再说明了。

同步与异步关注的是消息通信机制阻塞与非阻塞关注的是程序在等待调用结果时的状态

类型举例效率
同步阻塞在咖啡店里柜台前排队等,不能做其他的事最低
同步非阻塞在咖啡店里排队等时一边玩着手机一边不断的检查是否到自己了较低
异步阻塞向咖啡店里的服务员点单,点完后在自己的位置上等待,不能做其他的事
异步非阻塞

向咖啡店里的服务员点单,点完后打开电脑开始工作,直到服务员送上来。

最高

IO 模型

有了上面的一些概念,我们就可以来看看以下五种场景的 IO 模型了。

阻塞 IO(Bloking IO)

8c4152ab ff07 40b0 81cd 21c073884757

非阻塞 IO(Non-Blocking IO)

6862938f 5ce7 45ec a871 21a6eeb11268

IO 复用(IO Multiplexing)

203fc0e1 dcc2 4b02 8ee5 f1206505179f

信号驱动 IO(Signal-driven IO)

4fccc003 4b8c 43f4 92db b74a696995e7

异步 IO(Asynchronous IO)

5d9413f4 4efc 42d1 bc1e 0cef20e177b2

Java 中的 IO

BIO

Java 中的 BIO 是 Java 中最开始提供的一种 IO 模型,对应上面五种中的阻塞 IO。在 JDK 中是在 java.io 包中。使用起来极其简单但是性能并不好。通常采用如下的架构:

16b1c289 a523 4d45 b9c8 41d5a1f6381d

以下是一段样例:


_43
public class BioServer {
_43
_43
public static void main(final String[] args) throws Exception {
_43
// 用一个线程池处理接收到的请求
_43
final ExecutorService executor = ThreadUtil.newExecutor(10);
_43
// 监听 8080 端口
_43
final ServerSocket serverSocket = new ServerSocket(8080);
_43
while (!Thread.interrupted()) {
_43
// 阻塞式接收请求
_43
final Socket socket = serverSocket.accept();
_43
// 每当有新的请求到来,将其放到线程池中处理
_43
executor.submit(
_43
() -> {
_43
try (
_43
final InputStream inputStream = socket.getInputStream();
_43
final OutputStream outputStream = socket.getOutputStream();
_43
) {
_43
// 输入
_43
final BufferedReader reader = IoUtil.getReader(
_43
inputStream,
_43
StandardCharsets.UTF_8
_43
);
_43
while (true) {
_43
final String line = reader.readLine();
_43
if (line == null || "bye".equals(line)) {
_43
break;
_43
}
_43
log.info("Input: {}", line);
_43
IoUtil.writeUtf8(outputStream, false, line);
_43
}
_43
// 输出
_43
_43
socket.shutdownInput();
_43
socket.shutdownOutput();
_43
socket.close();
_43
} catch (final Exception e) {
_43
log.error("Error", e);
_43
}
_43
}
_43
);
_43
}
_43
}
_43
}

NIO

Java NIO 是 Java IO 模型中最重要的 IO 模型,在 JDK 中是在 java.nio 包下,NIO 是 New IO 的简称,一般也称为 Non-block IO 不过它其实对应的是上面五种 IO 模型中的非阻塞 IO 和 IO 复用,其本身是 IO 复用的,但是同时可以切换为阻塞或非阻塞两种模式。

在 NIO 中有三个重要的概念:

  • 缓冲区(Buffer):缓冲区用于存储数据。
  • 通道(Channel):通道类似于以前的 InputStreamOutputStream 用于读写操作,不过与之不同的是,Channel 是双向的。
  • 多路复用器(Selector):顾名思义就是提供 IO 复用的东西,通过注册多个 ChannelSelector 中,当某个 Channel 关心的事件就绪的时候 select() 方法就会返回,线程就可以处理这些事件,事件的例子有如新的连接进来、数据接收等。

三者关系的简单模型:

992324ce 51e2 4e8d b3fa e755da1a21fe

Buffer

Buffer 本质上可以看成一个容器,在 NIO 中对数据的操作都是通过 Buffer 完成的。除了 Boolean 类型没有对应的 Buffer 外其他基本类型都有其对应的 Buffer 类,其中最常用的就是 ByteBuffer 了。Buffer 可以存储于堆中同时也可以存储于直接内存里。

Buffer 虽然可以进行读写操作,不过读写操作并不能同时进行,是半双工的,Buffer 分成了读模式和写模式,其切换的方法如下:

  • 初始状态:写模式
  • 写模式 => 读模式flip() 方法
  • 读模式 => 写模式compact() 或者 clear() 方法
  • 重新读写rewind() 方法

在 Buffer 中有 4 个重要的属性,用于索引和标记:

  • 容量(capacity):用于标记 Buffer 的大小。
  • 位置(position):用于标记 Buffer 当前读或写的位置。
  • 限制(limit):该属性在读模式下等同于 size,标记了有效数据的大小。在写模式下等同于 capacity,标记了可以写入的最大大小。一旦 position 超过了 limit 这抛出异常。
  • 标记(mark):调用 mark() 标记一个位置,之后可以通过 reset() 方法恢复 position 到该位置。

两种模式下的属性位置图:

e7e8dcc9 e60d 414a 913c 91d46c5cb84a

Channel

Channel 是程序读写数据的通道,通过 Channel 我们可以将数据从源读到 Buffer,也可以从 Buffer 写入到源里。它是全双工的。在 Java 中常用的 Channel 有以下几种:

  • FileChannel:用于文件的读写,不能设置为非阻塞模式
  • DatagramChannel:用于 UDP 数据读写。
  • SocketChannel:用于 TCP 数据读写。
  • ServerSocketChannel:监听 TCP 连接请求。当收到请求后返回 SocketChannel。

Selector

Selector 是多路复用器,一般也可以直译为选择器,其功能简单来说就是通过轮询注册在其上的 Channel 是否就绪,如果就绪了就会被选择出来,这样程序就可以对这些就绪的 Channel 进行读写操作了。Selector 在 Linux 上依赖于 epoll,在 Windows 上则是 iocp。Linux 的 epoll 使用的是 Reactor 的模式,在网络编程中常用的是 Reactor 和 Proactor 模式,详细的内容可以参考 Java NIO 浅析

Selector 和 Channel 是通过 SelectionKey 关联起来的,但 Channel 注册到 Selector 中时就能得到一个 SelectionKey,其作用相当于 ID,用于区分不同的 Channel。

Selector 可以监听以下 4 种事件:

  • OP_CONNECT:表示连接建立成功了。
  • OP_ACCEPT:监听是否有新的请求到来。
  • OP_READ:监听是否有新的数据到来。
  • OP_WRITE:监听是否可写。

以下是一段样例:


_106
public class NioServer {
_106
_106
public static void main(final String[] args) throws Exception {
_106
final ExecutorService executor = ThreadUtil.newExecutor(10);
_106
final Poller poller = new Poller();
_106
final Acceptor acceptor = new Acceptor(poller);
_106
// 启动 Poller
_106
executor.execute(poller);
_106
// 启动 Acceptor
_106
executor.execute(acceptor);
_106
}
_106
_106
public static class Acceptor implements Runnable {
_106
_106
private final ServerSocketChannel acceptor;
_106
private final Poller poller;
_106
_106
public Acceptor(final Poller poller) throws IOException {
_106
// 开启 ServerSocketChannel,同时绑定端口
_106
this.acceptor = ServerSocketChannel.open();
_106
this.acceptor.bind(new InetSocketAddress(8080));
_106
this.poller = poller;
_106
}
_106
_106
public void accept() throws IOException {
_106
if (this.acceptor != null && this.acceptor.isOpen()) {
_106
// 阻塞式等等请求到来
_106
final SocketChannel channel = this.acceptor.accept();
_106
this.accepted(channel);
_106
}
_106
}
_106
_106
public void accepted(final SocketChannel channel) throws IOException {
_106
if (channel != null && channel.isOpen()) {
_106
// 将 SocketChannel 设置为非阻塞,注册到 Selector 中
_106
channel.configureBlocking(false);
_106
this.poller.register(channel);
_106
// 唤醒 Selector
_106
this.poller.up();
_106
}
_106
}
_106
_106
@Override
_106
public void run() {
_106
while (!Thread.interrupted()) {
_106
try {
_106
// 循环处理请求
_106
this.accept();
_106
} catch (final Throwable e) {
_106
log.error("Accept error", e);
_106
}
_106
}
_106
}
_106
}
_106
_106
public static class Poller implements Runnable {
_106
_106
private final Selector selector;
_106
_106
public Poller() throws IOException {
_106
this.selector = Selector.open();
_106
}
_106
_106
public void register(final SelectableChannel channel)
_106
throws ClosedChannelException {
_106
channel.register(this.selector, SelectionKey.OP_READ);
_106
}
_106
_106
public void up() {
_106
this.selector.wakeup();
_106
}
_106
_106
@Override
_106
public void run() {
_106
while (!Thread.interrupted()) {
_106
try {
_106
// 轮询
_106
final int selected = this.selector.select();
_106
if (selected == 0) {
_106
continue;
_106
}
_106
// 当查询到的时候就进行处理
_106
final Iterator<SelectionKey> iterator =
_106
this.selector.selectedKeys().iterator();
_106
while (iterator.hasNext()) {
_106
final SelectionKey key = iterator.next();
_106
iterator.remove();
_106
final SocketChannel channel = (SocketChannel) key.channel();
_106
// 输入
_106
final ByteBuffer buffer = ByteBuffer.allocate(1024);
_106
channel.read(buffer);
_106
buffer.flip();
_106
final String input = StrUtil
_106
.str(buffer, StandardCharsets.UTF_8)
_106
.trim();
_106
log.info("Input: {}", input);
_106
// 输出
_106
channel.write(ByteBuffer.wrap(input.getBytes()));
_106
}
_106
} catch (final IOException e) {
_106
log.error("Select error", e);
_106
}
_106
}
_106
}
_106
}
_106
}

这里的样例所使用的架构类似于 Jetty 和 Tomcat,由 Acceptor 负责接收请求,是阻塞的,接受到请求后将 SocketChannel 注册到 Poller 也就是 Selector 中,由 Poller 负责轮询可读和可写状态。本例为了简单一点将 IO 的处理放在了 Poller 中,实际的架构上应放到线程池中进行处理,解放 Poller,使其进行下一轮的轮询。同时本例只使用了一个 Acceptor 和 Poller,实际的架构中一般会设置多个。

具体的架构如下:

9e458394 71c5 41b3 8960 6bd8d76cb425

上面的架构是相对简单的,一般来说会使用更复杂的方式来压榨每一分性能,比如 Jetty 的 EatWhatYouKill,Netty 的 EventLoop 等等。

AIO

说完了 NIO,接下来就是 AIO 了,AIO 一般也称 NIO2,是在 Java7 后新增的异步非阻塞 IO 实现。不再需要 Selector 进行轮询,而是通过监听的方式。简单来说就是不再是我去拿,而是你给我。这个概念也是反应式编程中一个重要的概念(消息驱动)。

以下是一段样例代码:


_95
public class AioServer {
_95
_95
public static void main(final String[] args)
_95
throws IOException, InterruptedException {
_95
CountDownLatch countDownLatch = new CountDownLatch(1);
_95
// 开启 AsynchronousServerSocketChannel
_95
final AsynchronousServerSocketChannel accept = AsynchronousServerSocketChannel.open();
_95
accept.bind(new InetSocketAddress(8080));
_95
// 接受请求
_95
accept.accept(accept, new AcceptHandler());
_95
// 使主线程等等,否则 Channel 会停止,在生成环境一般用线程池不存在这个问题。
_95
countDownLatch.await();
_95
}
_95
_95
public static class AcceptHandler
_95
implements
_95
CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel> {
_95
_95
@Override
_95
public void completed(
_95
AsynchronousSocketChannel channel,
_95
AsynchronousServerSocketChannel attachment
_95
) {
_95
// 需要注意,accept 的处理器只会执行一次,所以为了能连续执行,就需要再次注册
_95
attachment.accept(attachment, this);
_95
// 申请 ByteBuffer,将读任务交给 ResultHandler 处理
_95
ByteBuffer buffer = ByteBuffer.allocate(1024);
_95
channel.read(buffer, buffer, new ResultHandler(channel));
_95
}
_95
_95
@Override
_95
public void failed(
_95
Throwable exc,
_95
AsynchronousServerSocketChannel attachment
_95
) {
_95
log.error("AcceptHandler failed", exc);
_95
}
_95
}
_95
_95
public static class ResultHandler
_95
implements CompletionHandler<Integer, ByteBuffer> {
_95
_95
private final AsynchronousSocketChannel channel;
_95
_95
public ResultHandler(AsynchronousSocketChannel channel) {
_95
this.channel = channel;
_95
}
_95
_95
@Override
_95
public void completed(Integer result, ByteBuffer buffer) {
_95
// 输入
_95
buffer.flip();
_95
final String input = StrUtil
_95
.str(buffer, StandardCharsets.UTF_8)
_95
.trim();
_95
log.info("Input: {}", input);
_95
final ByteBuffer write = ByteBuffer.wrap(input.getBytes());
_95
// 输出
_95
channel.write(
_95
write,
_95
write,
_95
new CompletionHandler<>() {
_95
@Override
_95
public void completed(
_95
Integer result,
_95
ByteBuffer attachment
_95
) {
_95
// 未写完则继续写
_95
if (attachment.hasRemaining()) {
_95
channel.write(attachment, attachment, this);
_95
} else {
_95
// 否则就再次注册处理器,这样就可以连续读了
_95
ByteBuffer buffer = ByteBuffer.allocate(1024);
_95
channel.read(
_95
buffer,
_95
buffer,
_95
new ResultHandler(channel)
_95
);
_95
}
_95
}
_95
_95
@Override
_95
public void failed(Throwable exc, ByteBuffer attachment) {
_95
log.error("ResultHandler failed", exc);
_95
}
_95
}
_95
);
_95
}
_95
_95
@Override
_95
public void failed(Throwable exc, ByteBuffer buffer) {
_95
log.error("ResultHandler failed", exc);
_95
}
_95
}
_95
}

结语

写了好几天了,总算摸完了。主要是最近都在折腾路由器、折腾手机等等。还有这个垃圾 WordPress 的编辑器,越来越卡,刚出那段时间挺好用的,越升级越难用。所以打算换成 Gatsby 了,由于要迁移主题,所以应该没那么快上线 🤣。

浅谈 IO

https://blog.ixk.me/post/talking-about-io
  • 许可协议

    BY-NC-SA

  • 发布于

    2021-02-27

  • 本文作者

    Otstar Lin

转载或引用本文时请遵守许可协议,注明出处、不得用于商业用途!

浅谈 JVM:类加载浅谈并发:synchronized & ReentrantLock