0%

java | nio-selector read 事件处理

使用 nio-selector

引入自写的方法类。

下载 JAVA 文件

netty 包

1
2
3
4
5
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.86.Final</version>
</dependency>

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
package com.redisc;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;

public class Client {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8090));
System.out.println("waiting...");
}
}

在第 11 行打断点。

服务端

特别注意,read 事件的注册方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package com.redisc;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

import static com.redisc.ByteBufferUtil.debugRead;

@Slf4j(topic = "c.Test")
public class Run {

public static void main(String[] args) throws IOException {
// 创建 selector,管理多个 channel
Selector selector = Selector.open();
// 创建服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);//将阻塞模式切换为非阻塞模式

// 建立 selector 和 channel 的联系「注册」
// SelectionKey 事件发生后,通过它知道事件和哪个 channel 事件
SelectionKey sscKey = ssc.register(selector, 0, null);
// key 只关注 accept 事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
log.debug("register key:{}", sscKey);

//绑定端口
ssc.bind(new InetSocketAddress(8090));
while (true) {
// selector 方法,没有事件发生,线程阻塞,有事件发生,线程才会恢复运行
// selector 在事件未处理的时候,不会阻塞
selector.select();
// 处理事件,selectorKeys 内部包含了所有发生事件
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 处理 key 时,要从 selectKeys 集合中删除,否则下次处理就会出问题
iter.remove();
log.debug("key:{}", key);
// 区分事件类型
if (key.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
SelectionKey scKey = sc.register(selector, 0, null);
scKey.interestOps(SelectionKey.OP_READ);
log.debug("{}", sc);
} else if (key.isReadable()) {
// 读取事件
SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件
ByteBuffer buffer = ByteBuffer.allocate(16);
channel.read(buffer);
buffer.flip();
debugRead(buffer);
}

}

}
}

}

各种情况处理

客户端断开

  • 开启服务端
  • 开启 debug 的客户端
  • 然后关闭客户端
    • MacBook 模式下会进入死循环
      • 关闭的时候会发送一个空数据到 server 端,如果进入死循环可以 cancel 一下
      • Macbook 不会报错
    • Windows 模式下会报错「IOException 远程主机强迫关闭了一个现有连接」
      • Windows 模式下会报错

所以,服务端代码改成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package com.redisc;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

import static com.redisc.ByteBufferUtil.debugRead;

@Slf4j(topic = "c.Test")
public class Run {

public static void main(String[] args) throws IOException {
// 创建 selector,管理多个 channel
Selector selector = Selector.open();
// 创建服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);//将阻塞模式切换为非阻塞模式

// 建立 selector 和 channel 的联系「注册」
// SelectionKey 事件发生后,通过它知道事件和哪个 channel 事件
SelectionKey sscKey = ssc.register(selector, 0, null);
// key 只关注 accept 事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
log.debug("register key:{}", sscKey);

//绑定端口
ssc.bind(new InetSocketAddress(8090));
while (true) {
// selector 方法,没有事件发生,线程阻塞,有事件发生,线程才会恢复运行
// selector 在事件未处理的时候,不会阻塞
selector.select();
// 处理事件,selectorKeys 内部包含了所有发生事件
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 处理 key 时,要从 selectKeys 集合中删除,否则下次处理就会出问题
iter.remove();
log.debug("key:{}", key);
// 区分事件类型
if (key.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
SelectionKey scKey = sc.register(selector, 0, null);
scKey.interestOps(SelectionKey.OP_READ);
log.debug("{}", sc);
} else if (key.isReadable()) {
try {
// 读取事件
SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件
ByteBuffer buffer = ByteBuffer.allocate(16);
int read = channel.read(buffer);
if (read > 0) {
buffer.flip();
debugRead(buffer);
} else {
log.debug("客户端关闭");
key.cancel(); // 因为客户端断开了,因此需要把 key 取消
}

} catch (IOException e) {
e.printStackTrace();
key.cancel(); // 因为客户端断开了,因此需要把 key 取消
}

}

}

}
}

}

所以处理 read 事件改成了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
try {
// 读取事件
SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件
ByteBuffer buffer = ByteBuffer.allocate(16);
int read = channel.read(buffer);
if (read > 0) { // 如果是正常断开事件,read 的方法的返回值是 -1
buffer.flip();
debugRead(buffer);
} else {
log.debug("客户端关闭");
key.cancel(); // 因为客户端断开了,因此需要把 key 取消
}

} catch (IOException e) {
e.printStackTrace();
key.cancel(); // 因为客户端断开了,因此需要把 key 取消
}

read 判断是为了处理 MacBook 上的错误,用异常是为了处理 windows 上的错误,ps: read > 0 应该也能处理 windows 上的客户端断开。

另外,还测试了一种情况,在客户端 11 位置,打断电,然后 sc 变量上执行

1
sc.write(Charset.defaultCharset().encode(""))

这样的话服务端什么都不输出,也不报错,客户端断开后,依然能正确执行。

正常断开

客户端 11 处代码打断店,然后,sc 变量执行。

如果是正常断开代码

1
int read = channel.read(buffer);

这里的 read = -1

1
sc.close()

第二版客户端代码依然完美运行。

消息边界问题

请我喝杯咖啡吧~