0%

java | nio-selector 写入事件

可写事件。向客户端发送数据。

引入自写的方法类。

下载 JAVA 文件

netty 包

1
2
3
4
5
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.86.Final</version>
</dependency>

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.redisc;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class Client {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8090));

//接收数据
int count = 0;
while (true) {
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
count += sc.read(buffer);
System.out.println(count);
buffer.clear();
}
}
}

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package com.redisc;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;

@Slf4j(topic = "c.Test")
public class Run {

public static void main(String[] args) throws IOException {
// 创建 selector,管理多个 channel
Selector selector = Selector.open();
// 创建服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);//将阻塞模式切换为非阻塞模式

// 建立 selector 和 channel 的联系「注册」
// SelectionKey 事件发生后,通过它知道事件和哪个 channel 事件
SelectionKey sscKey = ssc.register(selector, 0, null);
// key 只关注 accept 事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
log.debug("register key:{}", sscKey);

//绑定端口
ssc.bind(new InetSocketAddress(8090));
while (true) {
// selector 方法,没有事件发生,线程阻塞,有事件发生,线程才会恢复运行
// selector 在事件未处理的时候,不会阻塞
selector.select();
// 处理事件,selectorKeys 内部包含了所有发生事件
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 处理 key 时,要从 selectKeys 集合中删除,否则下次处理就会出问题
iter.remove();
log.debug("key:{}", key);
// 区分事件类型
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);

// 想客户端发送数据
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 3000000; i++) {
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
// 返回值代表实际写入的字节数
// 因为有时候数据太大,不能一次性写入
while (buffer.hasRemaining()) {
// 返回值代表实际写入的字节数
int wirte = sc.write(buffer);
System.out.println(wirte);
}
}

}

}
}

}

使用

  • 开启服务端
  • 开启客户端

服务端输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
14:17:05.106 [main] DEBUG c.Test - register key:sun.nio.ch.SelectionKeyImpl@497470ed
14:17:09.349 [main] DEBUG c.Test - key:sun.nio.ch.SelectionKeyImpl@497470ed
261676
1136400
0
0
1080096
0
0
0
0
0
0
0
0
0
0
521828

之所以有这么多 0,是因为缓冲区已经写满了,所以需要等待。

所以,这里有一个改进的地方,就是缓冲区满了,应该去干别的,而不是一直等待。

改进

如果数据量太大,增加一个可写事件,进行后续触发。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package com.redisc;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;

@Slf4j(topic = "c.Test")
public class Run {

public static void main(String[] args) throws IOException {
// 创建 selector,管理多个 channel
Selector selector = Selector.open();
// 创建服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);//将阻塞模式切换为非阻塞模式

// 建立 selector 和 channel 的联系「注册」
// SelectionKey 事件发生后,通过它知道事件和哪个 channel 事件
SelectionKey sscKey = ssc.register(selector, 0, null);
// key 只关注 accept 事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
log.debug("register key:{}", sscKey);

//绑定端口
ssc.bind(new InetSocketAddress(8090));
while (true) {
// selector 方法,没有事件发生,线程阻塞,有事件发生,线程才会恢复运行
// selector 在事件未处理的时候,不会阻塞
selector.select();
// 处理事件,selectorKeys 内部包含了所有发生事件
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 处理 key 时,要从 selectKeys 集合中删除,否则下次处理就会出问题
iter.remove();
log.debug("key:{}", key);
// 区分事件类型
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
SelectionKey sckey = sc.register(selector, 0, null);

// 想客户端发送数据
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 3000000; i++) {
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
// 返回值代表实际写入的字节数
int wirte = sc.write(buffer);
System.out.println(wirte);

// 判断是否有剩余内容
if (buffer.hasRemaining()) {
// 关注可写事件
// sckey.interestOps() 之前的事件,防止 OP_WRITE 覆盖了之前的事件
// 类似于文件的权限,多少代表读、写、读写
sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);
//将未写完的数据挂到 sckey
sckey.attach(buffer);
}
} else if (key.isWritable()) {
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel sc = (SocketChannel) key.channel();
int write = sc.write(buffer);
System.out.println(write);

// buffer 清理,如果不清理,数据太大的话,一直占据内存
if (!buffer.hasRemaining()) {
key.attach(null);
// 不需要关注可写事件
// 因为只有数据量非常大的时候才会关注这个
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
}
}

}

}
}

}

输出

1
2
3
4
5
6
7
8
9
15:23:52.898 [main] DEBUG c.Test - register key:sun.nio.ch.SelectionKeyImpl@497470ed
15:23:57.351 [main] DEBUG c.Test - key:sun.nio.ch.SelectionKeyImpl@497470ed
261676
15:23:57.425 [main] DEBUG c.Test - key:sun.nio.ch.SelectionKeyImpl@61e4705b
1120068
15:23:57.426 [main] DEBUG c.Test - key:sun.nio.ch.SelectionKeyImpl@61e4705b
1071020
15:23:57.427 [main] DEBUG c.Test - key:sun.nio.ch.SelectionKeyImpl@61e4705b
547236
请我喝杯咖啡吧~