使用多线程对 NIO
连接进行优化。
引入自写的方法类。
下载 JAVA 文件
netty
包
1 2 3 4 5
| <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.86.Final</version> </dependency>
|
当任务来临会通过 boss
分配给 worker
。
客户端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| package com.redisc;
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SocketChannel; import java.nio.charset.Charset;
public class Client { public static void main(String[] args) throws IOException { SocketChannel sc = SocketChannel.open(); sc.connect(new InetSocketAddress("localhost", 8888)); sc.write(Charset.defaultCharset().encode("hello")); System.in.read(); } }
|
第 13
行打断点,并 debug
执行。
简单的多线程
这次只实现一个 worker
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
| package com.redisc;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator;
import static com.redisc.ByteBufferUtil.debugAll;
@Slf4j(topic = "c.MultiThreadServer") public class MultiThreadServer {
public static void main(String[] args) throws IOException { Thread.currentThread().setName("boss"); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); Selector boss = Selector.open(); SelectionKey bossKey = ssc.register(boss, 0, null); bossKey.interestOps(SelectionKey.OP_ACCEPT); ssc.bind(new InetSocketAddress(8888));
Worker worker = new Worker("worker-0"); worker.reginster(); while (true) { boss.select(); Iterator<SelectionKey> iter = boss.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); if (key.isAcceptable()) { SocketChannel sc = ssc.accept(); sc.configureBlocking(false); log.debug("connected ... {}", sc.getRemoteAddress()); log.debug("before reginster ... {},", sc.getRemoteAddress()); sc.register(worker.worker, SelectionKey.OP_READ, null); log.debug("after reginster ... {},", sc.getRemoteAddress()); } } } }
static class Worker implements Runnable { private Thread thread; private Selector worker; private String name; private volatile boolean start = false;
public Worker(String name) { this.name = name; }
public void reginster() throws IOException { if (!start) { thread = new Thread(this); thread.start(); worker = Selector.open(); } }
@Override public void run() { while (true) { try { worker.select(); Iterator<SelectionKey> iter = worker.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); if (key.isReadable()) { ByteBuffer buffer = ByteBuffer.allocate(16); SocketChannel channel = (SocketChannel) key.channel(); log.debug("read ... {}", channel.getRemoteAddress()); channel.read(buffer); buffer.flip(); debugAll(buffer); } } } catch (IOException e) { e.printStackTrace(); } } } } }
|
开启 server
,然后开启 client
,输出为
1 2
| 13:04:59.067 [boss] DEBUG c.MultiThreadServer - connected ... /127.0.0.1:51397 13:04:59.071 [boss] DEBUG c.MultiThreadServer - before reginster ... /127.0.0.1:51397,
|
发现只输出了 before
,没有输出 after
。
这是因为,当执行
worker.reginster();
后,就立马进入了 worker
的循环,其中一句话
worker.select();
造成了阻塞,而住线程中 sc.register(worker.worker, SelectionKey.OP_READ, null);
的 worker.worker
和 worker 都关联同一个 Selector
,所以,进行不下去了。
稍微的改进
如果将 worker.reginster();
放在主线程的循环中,比如
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
| package com.redisc;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator;
import static com.redisc.ByteBufferUtil.debugAll;
@Slf4j(topic = "c.MultiThreadServer") public class MultiThreadServer {
public static void main(String[] args) throws IOException { Thread.currentThread().setName("boss"); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); Selector boss = Selector.open(); SelectionKey bossKey = ssc.register(boss, 0, null); bossKey.interestOps(SelectionKey.OP_ACCEPT); ssc.bind(new InetSocketAddress(8888));
Worker worker = new Worker("worker-0"); while (true) { boss.select(); Iterator<SelectionKey> iter = boss.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); if (key.isAcceptable()) { SocketChannel sc = ssc.accept(); sc.configureBlocking(false); log.debug("connected ... {}", sc.getRemoteAddress()); log.debug("before reginster ... {},", sc.getRemoteAddress()); worker.reginster(); sc.register(worker.worker, SelectionKey.OP_READ, null); log.debug("after reginster ... {},", sc.getRemoteAddress()); } } } }
static class Worker implements Runnable { private Thread thread; private Selector worker; private String name; private volatile boolean start = false;
public Worker(String name) { this.name = name; }
public void reginster() throws IOException { if (!start) { thread = new Thread(this); thread.start(); worker = Selector.open(); } }
@Override public void run() { while (true) { try { worker.select(); Iterator<SelectionKey> iter = worker.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); if (key.isReadable()) { ByteBuffer buffer = ByteBuffer.allocate(16); SocketChannel channel = (SocketChannel) key.channel(); log.debug("read ... {}", channel.getRemoteAddress()); channel.read(buffer); buffer.flip(); debugAll(buffer); } } } catch (IOException e) { e.printStackTrace(); } } } } }
|
执行顺序
1 2
| worker.reginster(); sc.register(worker.worker, SelectionKey.OP_READ, null);
|
因为指令重排和 CPU
调度问题,依然有可能造成上面的阻塞情况。但是,也有可能运行成功,输出
1 2 3 4 5 6 7 8 9 10 11 12
| 13:20:56.721 [boss] DEBUG c.MultiThreadServer - connected ... /127.0.0.1:51798 13:20:56.725 [boss] DEBUG c.MultiThreadServer - before reginster ... /127.0.0.1:51798, 13:20:56.726 [boss] DEBUG c.MultiThreadServer - after reginster ... /127.0.0.1:51798, 13:20:56.726 [Thread-0] DEBUG c.MultiThreadServer - read ... /127.0.0.1:51798 13:20:56.743 [Thread-0] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default logging framework +--------+-------------------- all ------------------------+----------------+ position: [0], limit: [5] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 68 65 6c 6c 6f 00 00 00 00 00 00 00 00 00 00 00 |hello...........| +--------+-------------------------------------------------+----------------+
|
但是,这时重新再来一个连接,依然会出现阻塞情况。
解决方法
模仿 netty
。
上面两个出现阻塞的原因,是分属于不同的线程,但是,我们可以将其放在一个线程中。
sc.register(worker.worker, SelectionKey.OP_READ, null);
放在 worker
中,代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
| package com.redisc;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator;
import static com.redisc.ByteBufferUtil.debugAll;
@Slf4j(topic = "c.MultiThreadServer") public class MultiThreadServer {
public static void main(String[] args) throws IOException { Thread.currentThread().setName("boss"); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); Selector boss = Selector.open(); SelectionKey bossKey = ssc.register(boss, 0, null); bossKey.interestOps(SelectionKey.OP_ACCEPT); ssc.bind(new InetSocketAddress(8888));
Worker worker = new Worker("worker-0"); while (true) { boss.select(); Iterator<SelectionKey> iter = boss.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); if (key.isAcceptable()) { SocketChannel sc = ssc.accept(); sc.configureBlocking(false); log.debug("connected ... {}", sc.getRemoteAddress()); log.debug("before reginster ... {},", sc.getRemoteAddress()); worker.reginster(sc); log.debug("after reginster ... {},", sc.getRemoteAddress()); } } } }
static class Worker implements Runnable { private Thread thread; private Selector worker; private String name; private volatile boolean start = false;
public Worker(String name) { this.name = name; }
public void reginster(SocketChannel sc) throws IOException { if (!start) { thread = new Thread(this); thread.start(); worker = Selector.open(); } sc.register(worker, SelectionKey.OP_READ, null); }
@Override public void run() { while (true) { try { worker.select(); Iterator<SelectionKey> iter = worker.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); if (key.isReadable()) { ByteBuffer buffer = ByteBuffer.allocate(16); SocketChannel channel = (SocketChannel) key.channel(); log.debug("read ... {}", channel.getRemoteAddress()); channel.read(buffer); buffer.flip(); debugAll(buffer); } } } catch (IOException e) { e.printStackTrace(); } } } } }
|
但是,这样其实是不行的。因为
1 2 3 4 5 6 7 8 9
| public void reginster(SocketChannel sc) throws IOException { if (!start) { thread = new Thread(this); thread.start(); worker = Selector.open(); } sc.register(worker, SelectionKey.OP_READ, null); }
|
这段代码其实还是主线程执行,并没有放在同一个线程。
所以,这里使用队列将其放在同一个线程中。
代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
| package com.redisc;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; import java.util.concurrent.ConcurrentLinkedQueue;
import static com.redisc.ByteBufferUtil.debugAll;
@Slf4j(topic = "c.MultiThreadServer") public class MultiThreadServer {
public static void main(String[] args) throws IOException { Thread.currentThread().setName("boss"); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); Selector boss = Selector.open(); SelectionKey bossKey = ssc.register(boss, 0, null); bossKey.interestOps(SelectionKey.OP_ACCEPT); ssc.bind(new InetSocketAddress(8888));
Worker worker = new Worker("worker-0"); while (true) { boss.select(); Iterator<SelectionKey> iter = boss.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); if (key.isAcceptable()) { SocketChannel sc = ssc.accept(); sc.configureBlocking(false); log.debug("connected ... {}", sc.getRemoteAddress()); log.debug("before reginster ... {},", sc.getRemoteAddress()); worker.reginster(sc); log.debug("after reginster ... {},", sc.getRemoteAddress()); } } } }
static class Worker implements Runnable { private Thread thread; private Selector worker; private String name; private volatile boolean start = false; private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
public Worker(String name) { this.name = name; }
public void reginster(SocketChannel sc) throws IOException { if (!start) { thread = new Thread(this); thread.start(); worker = Selector.open(); } queue.add(() -> { try { sc.register(worker, SelectionKey.OP_READ, null); } catch (ClosedChannelException e) { e.printStackTrace(); } }); worker.wakeup(); }
@Override public void run() { while (true) { try { worker.select(); Runnable task = queue.poll(); if (task != null) { task.run(); } Iterator<SelectionKey> iter = worker.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); if (key.isReadable()) { ByteBuffer buffer = ByteBuffer.allocate(16); SocketChannel channel = (SocketChannel) key.channel(); log.debug("read ... {}", channel.getRemoteAddress()); channel.read(buffer); buffer.flip(); debugAll(buffer); } } } catch (IOException e) { e.printStackTrace(); } } } } }
|
执行顺序
- 连接过来
- 执行
- 开启新线程
- 向队列注册事件,但不执行
worker
中 worker.select(); // 只有事件出现时,才能继续执行
阻塞
worker.wakeup(); // 唤醒 select 方法
唤醒 worker ,程序继续向下执行
- 执行注册读事件
- 等待读事件出现
这里唯一的点就是,看上面的顺序
worker.wakeup();
的执行一定是在 worker.select();
之后吗?
答案是不一定,可能是在前面。但是 wakeup
的机制如下
如果一个线程在调用select()或select(long)方法时被阻塞,调用wakeup()会使线程立即从阻塞中唤醒;如果调用wakeup()期间没有select操作,下次调用select相关操作会立即返回,不会执行poll(),包括调用selectNow()。
在Select期间,多次调用wakeup()与调用一次效果是一样的。
这个和 java | park && unpark 有点像。
具体原理请参考 Java NIO wakeup实现原理
多个 worker
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
| package com.redisc;
import lombok.extern.slf4j.Slf4j; import org.checkerframework.checker.units.qual.A;
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger;
import static com.redisc.ByteBufferUtil.debugAll;
@Slf4j(topic = "c.MultiThreadServer") public class MultiThreadServer {
public static void main(String[] args) throws IOException { Thread.currentThread().setName("boss"); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); Selector boss = Selector.open(); SelectionKey bossKey = ssc.register(boss, 0, null); bossKey.interestOps(SelectionKey.OP_ACCEPT); ssc.bind(new InetSocketAddress(8888));
Worker[] workers = new Worker[2]; for (int i = 0; i < workers.length; i++) { workers[i] = new Worker("worker-" + i); }
AtomicInteger index = new AtomicInteger(); while (true) { boss.select(); Iterator<SelectionKey> iter = boss.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); if (key.isAcceptable()) { SocketChannel sc = ssc.accept(); sc.configureBlocking(false); log.debug("connected ... {}", sc.getRemoteAddress()); log.debug("before reginster ... {},", sc.getRemoteAddress()); workers[index.getAndIncrement() % workers.length].reginster(sc); log.debug("after reginster ... {},", sc.getRemoteAddress()); } } } }
static class Worker implements Runnable { private Thread thread; private Selector worker; private String name; private volatile boolean start = false; private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
public Worker(String name) { this.name = name; }
public void reginster(SocketChannel sc) throws IOException { if (!start) { thread = new Thread(this); thread.start(); worker = Selector.open(); } queue.add(() -> { try { sc.register(worker, SelectionKey.OP_READ, null); } catch (ClosedChannelException e) { e.printStackTrace(); } }); worker.wakeup(); }
@Override public void run() { while (true) { try { worker.select(); Runnable task = queue.poll(); if (task != null) { task.run(); } Iterator<SelectionKey> iter = worker.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); if (key.isReadable()) { ByteBuffer buffer = ByteBuffer.allocate(16); SocketChannel channel = (SocketChannel) key.channel(); log.debug("read ... {}", channel.getRemoteAddress()); channel.read(buffer); buffer.flip(); debugAll(buffer); } } } catch (IOException e) { e.printStackTrace(); } } } } }
|