0%

java | 多线程优化执行模型

使用多线程对 NIO 连接进行优化。

引入自写的方法类。

下载 JAVA 文件

netty

1
2
3
4
5
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.86.Final</version>
</dependency>
  • 主线程
    • boss
  • 执行线程
    • worker

当任务来临会通过 boss 分配给 worker

客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.redisc;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;

/**
 * Minimal test client for the servers in this article: connects to localhost:8888,
 * sends the bytes of "hello" once, then waits on standard input so the connection
 * stays open while the server side is being observed.
 */
public class Client {
    /**
     * Connects, writes one message and blocks until a byte is available on stdin.
     *
     * @param args unused
     * @throws IOException if connecting, writing or reading stdin fails
     */
    public static void main(String[] args) throws IOException {
        // try-with-resources closes the channel on every exit path, including failures.
        try (SocketChannel sc = SocketChannel.open()) {
            sc.connect(new InetSocketAddress("localhost", 8888));
            sc.write(Charset.defaultCharset().encode("hello"));
            // Breakpoint target (line 13 of the original listing): keeps the process and
            // its connection alive until input arrives on stdin.
            System.in.read();
        }
    }
}

在第 13 行(即 System.in.read(); 这一句)打断点,并以 debug 模式执行,让客户端保持连接不退出。

简单的多线程

这次只实现一个 worker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package com.redisc;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

import static com.redisc.ByteBufferUtil.debugAll;

/**
 * First attempt at a boss/worker split: the main thread (renamed "boss") accepts
 * connections on port 8888 and registers each accepted channel for OP_READ on the
 * selector owned by a single Worker, whose own thread loops on select() and reads.
 *
 * NOTE: this is the deliberately flawed version discussed in the surrounding text.
 * The worker thread is started before the accept loop and parks in worker.select();
 * the text reports that the boss thread's sc.register(worker.worker, ...) on that same
 * selector then does not return, so the "after reginster" log line is never printed.
 */
@Slf4j(topic = "c.MultiThreadServer")
public class MultiThreadServer {

// Entry point; runs the accept loop on the calling thread, which is renamed "boss".
public static void main(String[] args) throws IOException {
Thread.currentThread().setName("boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector boss = Selector.open();
// Register with an empty interest set, then switch the key to OP_ACCEPT.
SelectionKey bossKey = ssc.register(boss, 0, null);
bossKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8888));

// Create the worker and initialise it (starts its thread and opens its selector).
Worker worker = new Worker("worker-0");
worker.reginster();
while (true) {
boss.select();
Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// Remove the key from the selected set so it is not handled again next round.
iter.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
log.debug("connected ... {}", sc.getRemoteAddress());
log.debug("before reginster ... {},", sc.getRemoteAddress());
// Associate the channel with the worker's selector. Issued from the boss thread
// while the worker thread is already inside worker.select() on the same selector.
sc.register(worker.worker, SelectionKey.OP_READ, null);
log.debug("after reginster ... {},", sc.getRemoteAddress());
}
}
}
}

// Owns one selector and one thread that services read events for that selector.
static class Worker implements Runnable {
private Thread thread;
private Selector worker;
// Stored by the constructor; not read anywhere else in this class.
private String name;
// Intended as an "already initialised" guard; never set to true in this version.
private volatile boolean start = false;

public Worker(String name) {
this.name = name;
}

// Initialise the thread: start it and open the selector.
// NOTE(review): the thread is started before 'worker' is assigned, so run() may reach
// worker.select() before the selector exists — confirm whether this can happen in practice.
public void reginster() throws IOException {
if (!start) {
thread = new Thread(this);
thread.start();
worker = Selector.open();
}
}


// Worker loop: wait for events, then read up to 16 bytes from each readable channel
// and dump the buffer with debugAll.
@Override
public void run() {
while (true) {
try {
worker.select();
Iterator<SelectionKey> iter = worker.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = (SocketChannel) key.channel();
log.debug("read ... {}", channel.getRemoteAddress());
// The return value of read() is not inspected here.
channel.read(buffer);
// Switch the buffer from writing to reading before dumping it.
buffer.flip();
debugAll(buffer);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}

开启 server,然后开启 client,输出为

1
2
13:04:59.067 [boss] DEBUG c.MultiThreadServer - connected ... /127.0.0.1:51397
13:04:59.071 [boss] DEBUG c.MultiThreadServer - before reginster ... /127.0.0.1:51397,

发现只输出了 before ,没有输出 after

这是因为,当执行

worker.reginster();

后,就立马进入了 worker 的循环,其中一句话

worker.select();

造成了阻塞。而主线程中的 sc.register(worker.worker, SelectionKey.OP_READ, null); 所使用的 worker.worker,与 worker 线程里 worker.select() 所使用的是同一个 Selector:select() 阻塞期间,对同一个 Selector 的 register 也会被阻塞,所以主线程进行不下去了。

稍微的改进

如果将 worker.reginster(); 放在主线程的循环中,比如

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package com.redisc;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

import static com.redisc.ByteBufferUtil.debugAll;

/**
 * Second attempt: identical to the first version except that worker.reginster() is
 * called inside the accept branch, immediately before sc.register(...).
 *
 * NOTE: this is still a demonstration of the problem. The worker thread's select() and
 * the boss thread's register() run on different threads against the same selector, so
 * whether register() completes depends on which thread gets there first (see the
 * surrounding text).
 */
@Slf4j(topic = "c.MultiThreadServer")
public class MultiThreadServer {

// Entry point; runs the accept loop on the calling thread, which is renamed "boss".
public static void main(String[] args) throws IOException {
Thread.currentThread().setName("boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector boss = Selector.open();
// Register with an empty interest set, then switch the key to OP_ACCEPT.
SelectionKey bossKey = ssc.register(boss, 0, null);
bossKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8888));

// Create the worker; it is initialised later, inside the accept branch.
Worker worker = new Worker("worker-0");
while (true) {
boss.select();
Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// Remove the key from the selected set so it is not handled again next round.
iter.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
log.debug("connected ... {}", sc.getRemoteAddress());
log.debug("before reginster ... {},", sc.getRemoteAddress());
// Associate the channel with the worker's selector: start the worker, then register.
// Both calls run on the boss thread and race with the worker thread's select().
worker.reginster();
sc.register(worker.worker, SelectionKey.OP_READ, null);
log.debug("after reginster ... {},", sc.getRemoteAddress());
}
}
}
}

// Owns one selector and one thread that services read events for that selector.
static class Worker implements Runnable {
private Thread thread;
private Selector worker;
// Stored by the constructor; not read anywhere else in this class.
private String name;
// Intended as an "already initialised" guard; never set to true in this version,
// so every call to reginster() starts another thread and opens another selector.
private volatile boolean start = false;

public Worker(String name) {
this.name = name;
}

// Initialise the thread: start it and open the selector.
// NOTE(review): the thread is started before 'worker' is assigned, so run() may reach
// worker.select() before the selector exists — confirm whether this can happen in practice.
public void reginster() throws IOException {
if (!start) {
thread = new Thread(this);
thread.start();
worker = Selector.open();
}
}


// Worker loop: wait for events, then read up to 16 bytes from each readable channel
// and dump the buffer with debugAll.
@Override
public void run() {
while (true) {
try {
worker.select();
Iterator<SelectionKey> iter = worker.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = (SocketChannel) key.channel();
log.debug("read ... {}", channel.getRemoteAddress());
// The return value of read() is not inspected here.
channel.read(buffer);
// Switch the buffer from writing to reading before dumping it.
buffer.flip();
debugAll(buffer);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}

执行顺序

1
2
worker.reginster();
sc.register(worker.worker, SelectionKey.OP_READ, null);

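这两句虽然在主线程中顺序执行,但 worker.reginster() 启动的 worker 线程何时进入 worker.select() 取决于 CPU 调度:如果 worker 线程先进入 select(),主线程的 register 依然会出现上面的阻塞情况;如果主线程先完成 register,则可以运行成功,输出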

1
2
3
4
5
6
7
8
9
10
11
12
13:20:56.721 [boss] DEBUG c.MultiThreadServer - connected ... /127.0.0.1:51798
13:20:56.725 [boss] DEBUG c.MultiThreadServer - before reginster ... /127.0.0.1:51798,
13:20:56.726 [boss] DEBUG c.MultiThreadServer - after reginster ... /127.0.0.1:51798,
13:20:56.726 [Thread-0] DEBUG c.MultiThreadServer - read ... /127.0.0.1:51798
13:20:56.743 [Thread-0] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default logging framework
+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [5]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f 00 00 00 00 00 00 00 00 00 00 00 |hello...........|
+--------+-------------------------------------------------+----------------+

但是,这时重新再来一个连接,依然会出现阻塞情况。

解决方法

模仿 netty

上面两种写法出现阻塞的原因,是 worker.select() 和 sc.register(...) 分属于不同的线程执行。但是,我们可以将它们放在同一个线程中,即把

sc.register(worker.worker, SelectionKey.OP_READ, null);

放在 worker 中,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package com.redisc;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

import static com.redisc.ByteBufferUtil.debugAll;

/**
 * Third attempt: sc.register(...) is moved into Worker.reginster(SocketChannel), so the
 * boss thread only calls worker.reginster(sc).
 *
 * NOTE: as the surrounding text points out, reginster(sc) is still invoked from the boss
 * thread, so the registration is still issued from a different thread than the one that
 * sits in worker.select(). Moving the statement into the Worker class does not change
 * which thread runs it.
 */
@Slf4j(topic = "c.MultiThreadServer")
public class MultiThreadServer {

// Entry point; runs the accept loop on the calling thread, which is renamed "boss".
public static void main(String[] args) throws IOException {
Thread.currentThread().setName("boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector boss = Selector.open();
// Register with an empty interest set, then switch the key to OP_ACCEPT.
SelectionKey bossKey = ssc.register(boss, 0, null);
bossKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8888));

// Create the worker; it is initialised later, inside the accept branch.
Worker worker = new Worker("worker-0");
while (true) {
boss.select();
Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// Remove the key from the selected set so it is not handled again next round.
iter.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
log.debug("connected ... {}", sc.getRemoteAddress());
log.debug("before reginster ... {},", sc.getRemoteAddress());
// Associate the channel with the worker's selector (executed on the boss thread).
worker.reginster(sc);
log.debug("after reginster ... {},", sc.getRemoteAddress());
}
}
}
}

// Owns one selector and one thread that services read events for that selector.
static class Worker implements Runnable {
private Thread thread;
private Selector worker;
// Stored by the constructor; not read anywhere else in this class.
private String name;
// Intended as an "already initialised" guard; never set to true in this version,
// so every call to reginster(sc) starts another thread and opens another selector.
private volatile boolean start = false;

public Worker(String name) {
this.name = name;
}

// Initialise the thread, then register sc for OP_READ on this worker's selector.
// The whole method body runs on the caller's thread, not on the worker thread.
// NOTE(review): the thread is started before 'worker' is assigned, so run() may reach
// worker.select() before the selector exists — confirm whether this can happen in practice.
public void reginster(SocketChannel sc) throws IOException {
if (!start) {
thread = new Thread(this);
thread.start();
worker = Selector.open();
}
sc.register(worker, SelectionKey.OP_READ, null);
}


// Worker loop: wait for events, then read up to 16 bytes from each readable channel
// and dump the buffer with debugAll.
@Override
public void run() {
while (true) {
try {
worker.select();
Iterator<SelectionKey> iter = worker.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = (SocketChannel) key.channel();
log.debug("read ... {}", channel.getRemoteAddress());
// The return value of read() is not inspected here.
channel.read(buffer);
// Switch the buffer from writing to reading before dumping it.
buffer.flip();
debugAll(buffer);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}

但是,这样其实是不行的。因为

1
2
3
4
5
6
7
8
9
// Initialise the thread, then register sc for OP_READ on this worker's selector.
// Every statement here executes on the thread that calls reginster(sc) — the boss
// thread — not on the worker thread started below.
public void reginster(SocketChannel sc) throws IOException {
if (!start) {
thread = new Thread(this);
thread.start();
worker = Selector.open();
}
sc.register(worker, SelectionKey.OP_READ, null);
}

这段代码其实还是主线程执行,并没有放在同一个线程。

所以,这里使用队列将其放在同一个线程中。

代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package com.redisc;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;

import static com.redisc.ByteBufferUtil.debugAll;

@Slf4j(topic = "c.MultiThreadServer")
public class MultiThreadServer {

public static void main(String[] args) throws IOException {
Thread.currentThread().setName("boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector boss = Selector.open();
SelectionKey bossKey = ssc.register(boss, 0, null);
bossKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8888));

// 创建 worker 并初始化
Worker worker = new Worker("worker-0");
while (true) {
boss.select();
Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
log.debug("connected ... {}", sc.getRemoteAddress());
log.debug("before reginster ... {},", sc.getRemoteAddress());
// 关联 selector
worker.reginster(sc);
log.debug("after reginster ... {},", sc.getRemoteAddress());
}
}
}
}

static class Worker implements Runnable {
private Thread thread;
private Selector worker;
private String name;
private volatile boolean start = false;
private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();

public Worker(String name) {
this.name = name;
}

// 初始化线程
public void reginster(SocketChannel sc) throws IOException {
if (!start) {
thread = new Thread(this);
thread.start();
worker = Selector.open();
}
// 向队列添加任务,任务并没有立刻执行
queue.add(() -> {
try {
sc.register(worker, SelectionKey.OP_READ, null);//boss
} catch (ClosedChannelException e) {
e.printStackTrace();
}
});
worker.wakeup(); // 唤醒 select 方法
}


@Override
public void run() {
while (true) {
try {
worker.select(); // 只有事件出现时,才能继续执行
Runnable task = queue.poll();
if (task != null) {
task.run(); // 执行了 sc.register(worker, SelectionKey.OP_READ, null);//boss
}
Iterator<SelectionKey> iter = worker.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = (SocketChannel) key.channel();
log.debug("read ... {}", channel.getRemoteAddress());
channel.read(buffer);
buffer.flip();
debugAll(buffer);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}

执行顺序

  • 连接过来
  • 执行
    • worker.reginster(sc);
  • 开启新线程
  • 向队列注册事件,但不执行
  • worker 线程执行 worker.select(); // 只有事件出现时,才能继续执行 —— 此时阻塞
  • worker.wakeup(); // 唤醒 select 方法 唤醒 worker ,程序继续向下执行
  • 执行注册读事件
  • 等待读事件出现

这里唯一的点就是,看上面的顺序

worker.wakeup(); 的执行一定是在 worker.select(); 之后吗?

答案是不一定,可能是在前面。但是 wakeup 的机制如下

如果一个线程在调用select()或select(long)方法时被阻塞,调用wakeup()会使线程立即从阻塞中唤醒;如果调用wakeup()期间没有select操作,下次调用select相关操作会立即返回,不会执行poll(),包括调用selectNow()。
在Select期间,多次调用wakeup()与调用一次效果是一样的。

这个和 java | park && unpark 有点像。

具体原理请参考 Java NIO wakeup实现原理

多个 worker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package com.redisc;

import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.units.qual.A;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

import static com.redisc.ByteBufferUtil.debugAll;

@Slf4j(topic = "c.MultiThreadServer")
public class MultiThreadServer {

public static void main(String[] args) throws IOException {
Thread.currentThread().setName("boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector boss = Selector.open();
SelectionKey bossKey = ssc.register(boss, 0, null);
bossKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8888));

// 创建 worker 并初始化
Worker[] workers = new Worker[2];
for (int i = 0; i < workers.length; i++) {
workers[i] = new Worker("worker-" + i);
}

AtomicInteger index = new AtomicInteger();
while (true) {
boss.select();
Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
log.debug("connected ... {}", sc.getRemoteAddress());
log.debug("before reginster ... {},", sc.getRemoteAddress());
// 关联 selector
workers[index.getAndIncrement() % workers.length].reginster(sc);
log.debug("after reginster ... {},", sc.getRemoteAddress());
}
}
}
}

static class Worker implements Runnable {
private Thread thread;
private Selector worker;
private String name;
private volatile boolean start = false;
private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();

public Worker(String name) {
this.name = name;
}

// 初始化线程
public void reginster(SocketChannel sc) throws IOException {
if (!start) {
thread = new Thread(this);
thread.start();
worker = Selector.open();
}
// 向队列添加任务,任务并没有立刻执行
queue.add(() -> {
try {
sc.register(worker, SelectionKey.OP_READ, null);//boss
} catch (ClosedChannelException e) {
e.printStackTrace();
}
});
worker.wakeup(); // 唤醒 select 方法
}


@Override
public void run() {
while (true) {
try {
worker.select(); // 只有事件出现时,才能继续执行
Runnable task = queue.poll();
if (task != null) {
task.run(); // 执行了 sc.register(worker, SelectionKey.OP_READ, null);//boss
}
Iterator<SelectionKey> iter = worker.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = (SocketChannel) key.channel();
log.debug("read ... {}", channel.getRemoteAddress());
channel.read(buffer);
buffer.flip();
debugAll(buffer);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
请我喝杯咖啡吧~