0%

java | nio-selector read 事件 消息边界

如果,ByteBuffer 的长度小于客户端发送数据长度,就会出现消息边界问题。

引入自写的方法类。

下载 JAVA 文件

netty 包

1
2
3
4
5
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.86.Final</version>
</dependency>

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
package com.redisc;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;

public class Client {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8090));
System.out.println("waiting...");
}
}

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package com.redisc;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;

import static com.redisc.ByteBufferUtil.debugRead;

@Slf4j(topic = "c.Test")
public class Run {

public static void main(String[] args) throws IOException {
// 创建 selector,管理多个 channel
Selector selector = Selector.open();
// 创建服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);//将阻塞模式切换为非阻塞模式

// 建立 selector 和 channel 的联系「注册」
// SelectionKey 事件发生后,通过它知道事件和哪个 channel 事件
SelectionKey sscKey = ssc.register(selector, 0, null);
// key 只关注 accept 事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
log.debug("register key:{}", sscKey);

//绑定端口
ssc.bind(new InetSocketAddress(8090));
while (true) {
// selector 方法,没有事件发生,线程阻塞,有事件发生,线程才会恢复运行
// selector 在事件未处理的时候,不会阻塞
selector.select();
// 处理事件,selectorKeys 内部包含了所有发生事件
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 处理 key 时,要从 selectKeys 集合中删除,否则下次处理就会出问题
iter.remove();
log.debug("key:{}", key);
// 区分事件类型
if (key.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
SelectionKey scKey = sc.register(selector, 0, null);
scKey.interestOps(SelectionKey.OP_READ);
log.debug("{}", sc);
} else if (key.isReadable()) {
try {
// 读取事件
SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件
ByteBuffer buffer = ByteBuffer.allocate(4);
int read = channel.read(buffer);
if (read > 0) {
buffer.flip();
System.out.println(Charset.defaultCharset().decode(buffer));
debugRead(buffer);
} else {
log.debug("客户端关闭");
key.cancel(); // 因为客户端断开了,因此需要把 key 取消
}

} catch (IOException e) {
e.printStackTrace();
key.cancel(); // 因为客户端断开了,因此需要把 key 取消
}

}

}

}
}

}

使用

  • 服务端开启
  • 客户端,第 11 行断点,debug 开启
    • sc 变量执行
    • sc.write(Charset.defaultCharset().encode("中国"))
  • 服务端 乱码

这是因为 jvm 默认为 UTF-8 编码,中文字每个字是 3 字节,所以,4 字节下国字解析不出来。

处理消息边界

  • 消息长度和 ByteBuffer 一样大
    • 客户端和服务端约定同一大小,大小不够的,客户端补齐
      • 所以,可能造成空间的浪费
  • 按分隔符拆分
  • 协议方式
    • 数据由两部分组成,比如前 4 个字节存储数据长度,后面就是数据本身

容量超过

这里举例是根据 「处理消息边界的第二种方式:按分隔符拆分」。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package com.redisc;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;

import static com.redisc.ByteBufferUtil.debugAll;
import static com.redisc.ByteBufferUtil.debugRead;

@Slf4j(topic = "c.Test")
public class Run {

public static void main(String[] args) throws IOException {
// 创建 selector,管理多个 channel
Selector selector = Selector.open();
// 创建服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);//将阻塞模式切换为非阻塞模式

// 建立 selector 和 channel 的联系「注册」
// SelectionKey 事件发生后,通过它知道事件和哪个 channel 事件
SelectionKey sscKey = ssc.register(selector, 0, null);
// key 只关注 accept 事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
log.debug("register key:{}", sscKey);

//绑定端口
ssc.bind(new InetSocketAddress(8090));
while (true) {
// selector 方法,没有事件发生,线程阻塞,有事件发生,线程才会恢复运行
// selector 在事件未处理的时候,不会阻塞
selector.select();
// 处理事件,selectorKeys 内部包含了所有发生事件
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 处理 key 时,要从 selectKeys 集合中删除,否则下次处理就会出问题
iter.remove();
log.debug("key:{}", key);
// 区分事件类型
if (key.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
SelectionKey scKey = sc.register(selector, 0, null);
scKey.interestOps(SelectionKey.OP_READ);
log.debug("{}", sc);
} else if (key.isReadable()) {
try {
// 读取事件
SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件
ByteBuffer buffer = ByteBuffer.allocate(16);
int read = channel.read(buffer);
if (read > 0) {
split(buffer);
} else {
log.debug("客户端关闭");
key.cancel(); // 因为客户端断开了,因此需要把 key 取消
}

} catch (IOException e) {
e.printStackTrace();
key.cancel(); // 因为客户端断开了,因此需要把 key 取消
}

}

}

}
}

private static void split(ByteBuffer source) {
// 读模式
source.flip();
for (int i = 0; i < source.limit(); i++) {
if (source.get(i) == '\n') {
int length = i + 1 - source.position();// 一条消息的长度
ByteBuffer target = ByteBuffer.allocate(length);
// 从 source 读,向 target 写
for (int j = 0; j < length; j++) {
target.put(source.get());
}
debugAll(target);
}
}
// 写模式
source.compact();
}

}
  • 开启服务端
  • 开启 debug11 行断点的客户端
    • sc 变量发送
    • sc.write(Charset.defaultCharset().encode("hello\nworld\n"))

成功。

如过 sc 变量发送

1
sc.write(Charset.defaultCharset().encode("123456789012345qwert\n"))

输出的是

1
2
3
4
5
6
7
+--------+-------------------- all ------------------------+----------------+
position: [5], limit: [5]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 77 65 72 74 0a |wert. |
+--------+-------------------------------------------------+----------------+

前面的 16 字符没有了。

解决方案: 附件

之前的代码

1
SelectionKey scKey = sc.register(selector, 0, null);

代码中,那个 null 就是附件,这个附件可以是一个 bytebuff,每一个 SelectKey 对应自己的附件。

如果数据过大,那么,可以进行附件扩容,直到把所有的数据读取完毕。

修改服务器代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package com.redisc;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;

import static com.redisc.ByteBufferUtil.debugAll;
import static com.redisc.ByteBufferUtil.debugRead;

@Slf4j(topic = "c.Test")
public class Run {

public static void main(String[] args) throws IOException {
// 创建 selector,管理多个 channel
Selector selector = Selector.open();
// 创建服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);//将阻塞模式切换为非阻塞模式

// 建立 selector 和 channel 的联系「注册」
// SelectionKey 事件发生后,通过它知道事件和哪个 channel 事件
SelectionKey sscKey = ssc.register(selector, 0, null);
// key 只关注 accept 事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
log.debug("register key:{}", sscKey);

//绑定端口
ssc.bind(new InetSocketAddress(8090));
while (true) {
// selector 方法,没有事件发生,线程阻塞,有事件发生,线程才会恢复运行
// selector 在事件未处理的时候,不会阻塞
selector.select();
// 处理事件,selectorKeys 内部包含了所有发生事件
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 处理 key 时,要从 selectKeys 集合中删除,否则下次处理就会出问题
iter.remove();
log.debug("key:{}", key);
// 区分事件类型
if (key.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(16);
// 附件
SelectionKey scKey = sc.register(selector, 0, buffer);
scKey.interestOps(SelectionKey.OP_READ);
log.debug("{}", sc);
} else if (key.isReadable()) {
try {
// 读取事件
SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件
// 获取 selectionkey 关联附件
ByteBuffer buffer = (ByteBuffer) key.attachment();
int read = channel.read(buffer);
if (read > 0) {
split(buffer);
// 如果 position 和 limit 相同说明已经执行了 compact 了
// 说明没有遇到 '\n'
if (buffer.position() == buffer.limit()) {
// 进行扩容
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
buffer.flip();
newBuffer.put(buffer);
// 重新绑定 bytebuff
key.attach(newBuffer);
}
} else {
log.debug("客户端关闭");
key.cancel(); // 因为客户端断开了,因此需要把 key 取消
}

} catch (IOException e) {
e.printStackTrace();
key.cancel(); // 因为客户端断开了,因此需要把 key 取消
}

}

}

}
}

private static void split(ByteBuffer source) {
// 读模式
source.flip();
for (int i = 0; i < source.limit(); i++) {
if (source.get(i) == '\n') {
int length = i + 1 - source.position();// 一条消息的长度
ByteBuffer target = ByteBuffer.allocate(length);
// 从 source 读,向 target 写
for (int j = 0; j < length; j++) {
target.put(source.get());
}
debugAll(target);
}
}
// 写模式
source.compact();
}

}

客户端再次执行相同的动作。

输出

1
2
3
4
5
6
7
8
+--------+-------------------- all ------------------------+----------------+
position: [21], limit: [21]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 31 32 33 34 35 36 37 38 39 30 31 32 33 34 35 71 |123456789012345q|
|00000010| 77 65 72 74 0a |wert. |
+--------+-------------------------------------------------+----------------+

bytebuffer 大小分配

  • 每个 channel 都需要记录可能被切分的消息,因为 Bytebuff 是线程不安全的,所以,不能被多个 channel 使用,因为需要为每个 channel 维护一个 ByteBuffer「附件」
  • Bytebuffer 不能太大
    • 一种思路是,Bytebuffer 默认为 4K,不够尽兴扩容,但是,拷贝数据耗费时间
    • 一个思路是,多个数组组成 buffer,一个数组不够,把多出的内容写到数组中,不需要拷贝,但是,数据不连续,解析复杂
请我喝杯咖啡吧~