Linux-IO阻塞模型

Linux-IO阻塞模型

通常,对于进程间的通信有两对常见的概念:同步和异步、阻塞和非阻塞,但实际上对于通信过程中的发送进程和接收进程而言,这两对概念是同义词,只有站在一个完整的进程间通信流程的角度,才会具有不同性质。

1. 同步异步及阻塞非阻塞

从《操作系统概念》中节选出有关进程通信的部分:

3.4.2.2 Synchronization

Communication between processes takes place through calls to send() and receive() primitives. There are different design options for implementing each primitive. Message passing may be either blocking or nonblocking – also known as synchronous and asynchronous. (Throughout this text, you will encounter the concepts of synchronous and asynchronous behavior in relation to various operating-system algorithms.)

  • Blocking send. The sending process is blocked until the message is received by the receiving process or by the mailbox.
  • Nonblocking send. The sending process sends the message and resumes operation.
  • Blocking receive. The receiver blocks until a message is available.
  • Nonblocking receive. The receiver retrieves either a valid message or a null.

Different combinations of send() and receive() are possible. When both send() and receive() are blocking, we have a rendezvous between the sender and the receiver. The solution to the producer-consumer problem becomes trivial when we use blocking send() and receive() statements. The producer merely invokes the blocking send() call and waits until the message is delivered to either the receiver or the mailbox. Likewise, when the consumer invokes receive(), it blocks until a message is available. This is illustrated in Figures 3.15 and 3.16.

大致含义为:

进程间通信通过 send() 发送和 receive() 接收两种操作完成,这两种操作可能存在多种实现方案。消息在进程间传递时有可能是 阻塞非阻塞 的,也被称为 同步异步

  • Blocking Send 阻塞发送:发送方发送消息后将一直阻塞,直到接收方收到消息。
  • Nonblocking Send 非阻塞发送:发送方将消息发出后就处理其他任务,不论消息是否被接收方收到。
  • Blocking Receive 阻塞接收:接收方调用 receive() 后将一直阻塞,直到收到一条可用消息。
  • Nonblocking Receive 非阻塞接收:接收方调用 receive() 后将立即获得一条可用消息,或是一个空值,而不会阻塞。

因此进程间通信时,站在进程的角度是对「发送方 / 接收方」与「阻塞 / 非阻塞」的自由组合,而「阻塞 / 非阻塞」与「同步 / 异步」是同义词。


2. 进程切换

当 Linux 从一个进程切换到另一个进程时通常包含一下几个步骤:

  • 对于运行中的进程,中断(Interrupt)系统调用(System Call) 都可以将 CPU 的控制转移到内核。
    • 中断(Interrupt):CPU 有一个中断信号位,CPU 在每个时钟周期的末尾都会检测中断信号位是否有中断信号到达,如果有则会根据中断的优先级决定是否要暂停当前执行的指令,转而去执行处理中断的指令。
    • 时钟中断(Clock Interrupt):一个硬件时钟每隔一段时间就会向 CPU 发送一个中断信号,CPU 在响应这个中断时就会去执行操作系统内核的指令,继而将 CPU 的控制转移给了内核,由内核决定下一个要被执行的指令。
    • 系统调用(System Call):操作系统提供给应用程序的接口。用户态通过 System Call 来完成那些需要内核才能执行的操作,例如硬盘、网络接口设备的 IO 等。
  • 内核会将原进程在 CPU 中的上下文(程序计数器、寄存器等)信息保存在分配给该进程的内存区域 PCB 中。
  • 内核从另一个进程的 PCB 中取出其上下文信息,将 CPU 的控制转移给新进程,开始执行新进程的指令。

当 CPU 在进程间切换时,由于需要在用户态和内核态之间切换,并且需要对进程的 PCB 做读写,所以会带来一定的开销,对于运行着 Unix 系统的现代 PC 而言,进程切换通常至少需要花费 300 us。

因此,当进程间需要通信时,CPU 需要一定的时间做数据的 IO 和进程切换,并且由于进程间内存默认不可共享,所以往往还伴随着数据的拷贝,这就导致发送进程和接收进程在这段等待时间内可以有不同的策略,也即是否阻塞。


3. 进程间通信的模型

在实际的进程间通信案例中,阻塞通信会使得进程的执行是顺序的,开发人员可以预知进程的执行情况。而非阻塞通信则可以提高 CPU 的利用率,换言之也就是提高性能。但在实际的进程间通信过程中,通常会考虑一个完整的进程间通信流程,此时「阻塞 / 非阻塞」更多是指通信过程中数据的流动,才会与「同步 / 异步」搭配定义。

但是需要明确,「同步」一定意味着「阻塞」,反之「阻塞」也一定意味着「同步」,「异步」一定意味着「非阻塞」,但「非阻塞」不一定「异步」,根据进程的执行策略而有所区别。

3.1 同步阻塞BIO

发送进程 A 向接收进程 B 发送消息,随后 A 挂起,等待内核将消息发送给 B,处理完后内核重新唤醒 A,A 再将返回的数据从内核拷贝到进程内。

  • A 在发送数据后挂起等待 B 接收数据,这个过程是阻塞的。
  • B 成功接收数据后,A 继续挂起等待 B 处理数据完成,这个过程也是阻塞的。
  • B 处理完成返回数据后,内核唤醒 A,A 仍然需要等待从内核将数据拷贝回用户进程,这个过程也是阻塞的。

由于整个过程中,A 都处于阻塞状态,直到被唤醒后执行吓一条指令,所以从整个通信过程来看也是同步的。

3.2 同步非阻塞NIO

发送进程 A 向接收进程 B 发送消息,随后 A 立即执行其他指令,但每个一段时间就会主动发起请求,查询 B 是否成功接收数据,以及是否处理完毕返回数据,如果有返回数据,则继续执行原来的指令,并将数据从内核拷贝到进程内。

  • A 发送数据后立即执行其他指令,不论 B 是否成功接收数据,这个过程是非阻塞的。
  • A 周期性请求内核,查询 B 是否成功接收数据,以及是否在执行完毕后返回了数据,这个过程是非阻塞的。
  • B 成功接收数据后,将状态同步给内核。B 处理完成返回后,将数据拷贝到内核空间,A 查询到后,再将返回的数据从内核拷贝到 A 进程中,这个过程是阻塞的。

由于 B 返回数据后,A 需要从内核拷贝数据,这个过程仍然是阻塞的,所以从整个通信过程来看仍然是同步的。

3.3 同步信号驱动

发送进程 A 向接收进程 B 发送消息,随后 A 立即执行其他指令,直到 B 处理完毕并返回数据后,由内核通知进程 A,然后 A 再从内核将数据拷贝到进程内。

  • A 发送数据后立即执行其他指令,不论 B 是否成功接收,这个过程是非阻塞的。
  • B 处理完并返回数据后,内核通知 A,这个过程是非阻塞的。
  • A 从内核中拷贝数据到进程内,这个过程是阻塞的。

虽然信号驱动模型不再需要 A 频繁主动查询 B 的数据返回状态,但由于最终仍然需要将数据从内核拷贝到 A 进程,这个过程依然是阻塞的,所以从整个通信过程来看仍然是同步的。

3.4 同步IO多路复用

多个发送进程向同一个管道注册,由管道完成和内核的交互。当一个进程通过 select 函数从管道获取数据时,如果所有被监听的 IO 的数据都没有准备好,则调用 select 的当前进程会进入阻塞状态,直到「任意一个」被监听的 IO 数据已准备好并返回,然后通知对应的监听进程,监听进程再从内核中将数据拷贝到进程内。

由于 IO 多路复用在调用 select 时会进入阻塞状态,并且监听进程在收到数据已准备好的通知后,仍需从内核拷贝数据到进程内,因此从整个通信过程来看仍然是同步的。

3.5 异步非阻塞AIO

进程 A 发起一个 aio_read 指令,将自己的「描述符」、「缓冲区指针」、「缓冲区大小」、「回调函数 / 回调信号」信息发送给内核,内核 aio_read 指令会立即返回,进程 A 即可执行其他指令。当 A 指令所需的数据准备好后,再由内核负责把数据拷贝到 A 的用户内存,然后再通过回调信号或基于线程的回调的方式,通知进程 A,此时 A 已经可以直接操作所需数据而不需要拷贝,是真正的异步 IO。


4. Java中的IO

4.1 BIO

BIO 面向「流」StreamStream 只能单项传输,Java 中对应的就是输入流 InputStream 和输出流 OutputStream。最普通的 Socket 和 ServerSocket 就基于 BIO,例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class SocketBIO {

public static void start() {
// 启动 Server
new Thread(new Runnable() {
@Override
public void run() {
try {
startService();
} catch (IOException ignored) {}
}
}).start();

// 启动 Client
new Thread(new Runnable() {
@Override
public void run() {
try {
startClient();
} catch (IOException ignored) {}
}
}).start();
}

private static void startService() throws IOException {
// 绑定 8888 端口,最大同时连接数为 5,超出的请求会被拒绝。
ServerSocket serverSocket = new ServerSocket(8888, 5);
while (true) {
// 接受一个连接,accept 是阻塞的
try (Socket client = serverSocket.accept()) {
// 从客户端读取数据
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(client.getInputStream()));
int readLength = bufferedReader.read();
bufferedReader.close();
// 向客户端发送数据
BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(client.getOutputStream()));
bufferedWriter.write("Output Content");
bufferedWriter.close();
}
}
}

private static void startClient() throws IOException {
while (true) {
// 连接本地 8888 端口。
try (Socket client = new Socket("localhost", 8888)) {
// 从服务端读取数据
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(client.getInputStream()));
int readLength = bufferedReader.read();
bufferedReader.close();
// 向服务端写入数据
BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(client.getOutputStream()));
bufferedWriter.write("Output Content");
bufferedWriter.close();
}
}
}
}

虽然上例中 Socket 和 ServerSocket 都用了 BufferedWriterBufferedReader,但 Socket 本身提供的是输入流和输出流 client#getXXXStream,只是业务上再封装了一层 Buffer,所以 BIO 本质上是「流式传输」。

4.2 NIO

NIO 面向「通道」ChannelChannel 支持双向传输,Java 中常见的 Channel 有:

  • FileChannel
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel

Channel 只是一种双向通信模型,因此通常搭配 Buffer 一起使用,由 Buffer 充当实际数据的载体,Java 中常见的 Buffer 有:

  • ByteBuffer
  • CharBuffer
  • ShortBuffer
  • IntBuffer
  • FloatBuffer
  • LongBuffer
  • DoubleBuffer

SocketChannel 和 ServerSocketChannel 就基于 NIO:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public class SocketNIO {
public static void start() {
// 开启 Server
new Thread(() -> {
try {
startService();
} catch (IOException ignored) {}
}).start();

// 开启 Client
try {
startClient();
} catch (IOException ignored) {}
}

private static void startService() throws IOException {
// 开启通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 绑定本地 8888 端口,最大同时连接数 5,超出后连接将被拒绝。
serverSocketChannel.socket().bind(new InetSocketAddress(8888), 5);
// 设置 accept 时为非阻塞
serverSocketChannel.configureBlocking(false);
while (true) {
// 接受一个客户端连接
SocketChannel clientChannel = serverSocketChannel.accept();
if (clientChannel == null) {
// 由于 serverSocketChannel.configureBlocking(false) 使得 accept 变为非阻塞,
// 因此需要对 accept 返回的 SocketChannel 判空,为空说明没有客户端连接,可以执行其他指令。
continue;
}
// 用 ByteBuffer 缓冲读取字节数据,缓冲区大小设置为 48 字节,
// Buffer 默认的起始指针 start 和当前指针 current 指向 0 下标,终止指针 limit 指向末位,
// 表示接下来的操作从 start 开始,当 current 指向 limit 时说明 Buffer 存满了。
ByteBuffer byteBuffer = ByteBuffer.allocate(48);
// 从通道中读取字节数据并写入缓冲区,此时对 Buffer 的操作是「写入」,
// 每次都向 Buffer 的 current 指向的位写入一个字节,并把 current 指向下一位。
int readLength = clientChannel.read(byteBuffer);
while (readLength > 0) {
// 如果原本 Buffer 用于「写入」,则需要「读取」之前需要先将 Buffer 翻转,
// 使得新 start 指向原 current,新 limit 指向原 start,
// 表示接下来的操作从 Buffer 原先的 current 位置开始反向进行。
byteBuffer.flip();
while (byteBuffer.hasRemaining()) {
System.out.println("Read: " + byteBuffer.get());
}
// 重置 Buffer 的 start 和 current 到 0 下标,limit 到末位,
// 相当于标记这个 Buffer 又可以从头重新操作。
byteBuffer.clear();
// 继续从通道中读取数据,直到通道中没有数据为止。
readLength = clientChannel.read(byteBuffer);
}
}
}

private static void startClient() throws IOException {
// 开启通道
SocketChannel clientChannel = SocketChannel.open();
// 连接到本地 8888 端口。
clientChannel.connect(new InetSocketAddress("localhost", 8888));
// 设置缓冲区大小 48 字节。
ByteBuffer byteBuffer = ByteBuffer.allocate(48);
// 向 Buffer 写入数据
byteBuffer.put("Output Content".getBytes());
// 为了读取 Buffer 中的数据并通过 Socket 发送,需要先翻转 Buffer
byteBuffer.flip();
while (byteBuffer.hasRemaining()) {
clientChannel.write(byteBuffer);
}
clientChannel.close();
}
}

参考文献