可写事件。向客户端发送数据。
引入自写的方法类。
下载 JAVA 文件
netty 包
1 2 3 4 5
| <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.86.Final</version> </dependency>
|
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| package com.redisc;
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel;
/**
 * Test client: connects to the server on localhost:8090 and prints the running
 * total of bytes received after every read.
 */
public class Client {

    /** Capacity of the single receive buffer that is reused for every read (1 MiB). */
    private static final int BUFFER_SIZE = 1024 * 1024;

    /**
     * Connects, then reads until the server closes the connection.
     *
     * @param args unused
     * @throws IOException if connecting or reading fails
     */
    public static void main(String[] args) throws IOException {
        // try-with-resources guarantees the socket is closed on every exit path.
        try (SocketChannel sc = SocketChannel.open()) {
            sc.connect(new InetSocketAddress("localhost", 8090));

            // Allocate once and reuse; the received content itself is never inspected.
            ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
            int count = 0;
            while (true) {
                int read = sc.read(buffer);
                if (read == -1) {
                    // End of stream: the peer closed the connection. Without this check
                    // the loop would spin forever and subtract 1 from the total each time.
                    break;
                }
                count += read;
                System.out.println(count);
                // Reset position/limit so the next read can fill the whole buffer again.
                buffer.clear();
            }
        }
    }
}
|
服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| package com.redisc;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Iterator;
/**
 * Selector-based server that, for every accepted connection, pushes a 3,000,000
 * character payload to the client by retrying {@code write} in a tight loop
 * until the whole buffer has been sent.
 */
@Slf4j(topic = "c.Test")
public class Run {

    /** Number of 'a' characters sent to each accepted client. */
    private static final int PAYLOAD_LENGTH = 3000000;

    /**
     * Opens a non-blocking server socket on port 8090 and serves accept events forever.
     *
     * @param args unused
     * @throws IOException if any channel or selector operation fails
     */
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);

        // Register with an empty interest set first, then switch on OP_ACCEPT.
        SelectionKey acceptKey = serverChannel.register(selector, 0, null);
        acceptKey.interestOps(SelectionKey.OP_ACCEPT);
        log.debug("register key:{}", acceptKey);

        serverChannel.bind(new InetSocketAddress(8090));
        for (;;) {
            selector.select();
            Iterator<SelectionKey> ready = selector.selectedKeys().iterator();
            while (ready.hasNext()) {
                SelectionKey key = ready.next();
                // Selected keys are not removed automatically; drop each one once taken.
                ready.remove();
                log.debug("key:{}", key);
                if (!key.isAcceptable()) {
                    continue;
                }
                SocketChannel client = serverChannel.accept();
                client.configureBlocking(false);
                sendAll(client, buildPayload());
            }
        }
    }

    /** Builds the payload: PAYLOAD_LENGTH 'a' characters encoded with the default charset. */
    private static ByteBuffer buildPayload() {
        StringBuilder text = new StringBuilder();
        for (int n = 0; n < PAYLOAD_LENGTH; n++) {
            text.append("a");
        }
        return Charset.defaultCharset().encode(text.toString());
    }

    /**
     * Writes the buffer to the channel, retrying until nothing remains, and prints
     * the byte count returned by each attempt (an attempt may report 0 bytes, in
     * which case the loop simply tries again).
     */
    private static void sendAll(SocketChannel client, ByteBuffer payload) throws IOException {
        while (payload.hasRemaining()) {
            int sent = client.write(payload);
            System.out.println(sent);
        }
    }
}
|
使用
服务端输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| 14:17:05.106 [main] DEBUG c.Test - register key:sun.nio.ch.SelectionKeyImpl@497470ed 14:17:09.349 [main] DEBUG c.Test - key:sun.nio.ch.SelectionKeyImpl@497470ed 261676 1136400 0 0 1080096 0 0 0 0 0 0 0 0 0 0 521828
|
之所以输出中有这么多 0,是因为发送缓冲区已经写满了:非阻塞模式下 write 会立即返回 0,循环只能不断重试,直到缓冲区重新腾出空间。
所以,这里有一个改进的地方,就是缓冲区满了,应该去干别的,而不是一直等待。
改进
如果数据量太大、一次写不完,就把剩余数据的缓冲区作为附件挂到该通道的 key 上,并为它增加可写事件(OP_WRITE);等通道再次可写时,由 selector 触发后续写入,全部写完后再取消对可写事件的关注。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
| package com.redisc;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Iterator;
/**
 * Selector-based server that sends a 3,000,000 character payload to every
 * accepted client without busy-waiting: whatever the first {@code write} could
 * not send is attached to the client's key and finished from OP_WRITE events.
 */
@Slf4j(topic = "c.Test")
public class Run {

    /** Number of 'a' characters sent to each accepted client. */
    private static final int PAYLOAD_LENGTH = 3000000;

    /**
     * Opens a non-blocking server socket on port 8090 and dispatches accept and
     * write events forever.
     *
     * @param args unused
     * @throws IOException if the selector or the server socket fails
     */
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);

        SelectionKey sscKey = ssc.register(selector, 0, null);
        sscKey.interestOps(SelectionKey.OP_ACCEPT);
        log.debug("register key:{}", sscKey);

        ssc.bind(new InetSocketAddress(8090));
        while (true) {
            selector.select();
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                // Selected keys are not removed automatically; drop each one once taken.
                iter.remove();
                log.debug("key:{}", key);
                if (key.isAcceptable()) {
                    handleAccept(selector, ssc);
                } else if (key.isWritable()) {
                    handleWrite(key);
                }
            }
        }
    }

    /** Accepts one client, attempts the first write, and defers any remainder to OP_WRITE. */
    private static void handleAccept(Selector selector, ServerSocketChannel ssc) throws IOException {
        SocketChannel sc = ssc.accept();
        if (sc == null) {
            // A non-blocking accept returns null when no connection is actually pending.
            return;
        }
        sc.configureBlocking(false);
        SelectionKey sckey = sc.register(selector, 0, null);

        ByteBuffer buffer = buildPayload();
        try {
            int written = sc.write(buffer);
            System.out.println(written);
        } catch (IOException e) {
            // A failure on one client must not take the whole server down.
            log.debug("write failed, closing client", e);
            closeClient(sckey);
            return;
        }

        if (buffer.hasRemaining()) {
            // Attach first so the buffer is in place before write events can be selected.
            sckey.attach(buffer);
            // Interest ops are bit flags: set with OR, never with arithmetic '+'.
            sckey.interestOps(sckey.interestOps() | SelectionKey.OP_WRITE);
        }
    }

    /** Continues a pending transfer; stops listening for OP_WRITE once the buffer is drained. */
    private static void handleWrite(SelectionKey key) {
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        SocketChannel sc = (SocketChannel) key.channel();
        try {
            int write = sc.write(buffer);
            System.out.println(write);
        } catch (IOException e) {
            // Typically the peer went away mid-transfer; drop only this client.
            log.debug("write failed, closing client", e);
            closeClient(key);
            return;
        }

        if (!buffer.hasRemaining()) {
            // Release the buffer and clear the OP_WRITE bit with AND-NOT, not '-'.
            key.attach(null);
            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
        }
    }

    /** Builds the payload: PAYLOAD_LENGTH 'a' characters encoded with the default charset. */
    private static ByteBuffer buildPayload() {
        StringBuilder sb = new StringBuilder(PAYLOAD_LENGTH);
        for (int i = 0; i < PAYLOAD_LENGTH; i++) {
            sb.append("a");
        }
        return Charset.defaultCharset().encode(sb.toString());
    }

    /** Cancels the key, drops its attachment, and closes the underlying channel. */
    private static void closeClient(SelectionKey key) {
        key.attach(null);
        key.cancel();
        try {
            key.channel().close();
        } catch (IOException ignored) {
            // The connection is already broken; nothing further can be done for it.
        }
    }
}
|
输出
1 2 3 4 5 6 7 8 9
| 15:23:52.898 [main] DEBUG c.Test - register key:sun.nio.ch.SelectionKeyImpl@497470ed 15:23:57.351 [main] DEBUG c.Test - key:sun.nio.ch.SelectionKeyImpl@497470ed 261676 15:23:57.425 [main] DEBUG c.Test - key:sun.nio.ch.SelectionKeyImpl@61e4705b 1120068 15:23:57.426 [main] DEBUG c.Test - key:sun.nio.ch.SelectionKeyImpl@61e4705b 1071020 15:23:57.427 [main] DEBUG c.Test - key:sun.nio.ch.SelectionKeyImpl@61e4705b 547236
|