如果,ByteBuffer
的长度小于客户端发送数据长度,就会出现消息边界问题。
引入自写的方法类。
下载 JAVA 文件
netty 包
1 2 3 4 5 <dependency > <groupId > io.netty</groupId > <artifactId > netty-all</artifactId > <version > 4.1.86.Final</version > </dependency >
客户端 1 2 3 4 5 6 7 8 9 10 11 12 13 package com.redisc; import java.io.IOException;import java.net.InetSocketAddress;import java.nio.channels.SocketChannel;public class Client { public static void main (String [] args) throws IOException { SocketChannel sc = SocketChannel.open (); sc.connect (new InetSocketAddress("localhost" , 8090 )); System.out.println ("waiting..." ); } }
服务端 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 package com.redisc; import lombok.extern .slf4j.Slf4j;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.nio.charset.Charset;import java.util.Iterator;import static com.redisc.ByteBufferUtil.debugRead;@Slf4j(topic = "c.Test" ) public class Run { public static void main (String [] args) throws IOException { Selector selector = Selector.open (); ServerSocketChannel ssc = ServerSocketChannel.open (); ssc.configureBlocking(false ); SelectionKey sscKey = ssc.register (selector, 0 , null); sscKey.interestOps(SelectionKey.OP_ACCEPT); log .debug("register key:{}" , sscKey); ssc.bind(new InetSocketAddress(8090 )); while (true ) { selector.select(); Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove (); log .debug("key:{}" , key); if (key.isAcceptable()) { ServerSocketChannel channel = (ServerSocketChannel) key.channel(); SocketChannel sc = channel.accept(); sc.configureBlocking(false ); SelectionKey scKey = sc.register (selector, 0 , null); scKey.interestOps(SelectionKey.OP_READ); log .debug("{}" , sc); } else if (key.isReadable()) { try { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(4 ); int read = channel.read (buffer ); if (read > 0 ) { buffer .flip(); System.out.println (Charset.defaultCharset().decode(buffer )); debugRead(buffer ); } else { log .debug("客户端关闭" ); key.cancel(); } } catch (IOException e) { e.printStackTrace(); key.cancel(); } } } } } }
使用
服务端开启
客户端,第 11 行断点,debug 开启
sc 变量执行
sc.write(Charset.defaultCharset().encode("中国"))
服务端 国
乱码
这是因为 jvm 默认为 UTF-8 编码,中文字每个字是 3 字节,所以,4 字节下国字解析不出来。
处理消息边界
消息长度和 ByteBuffer 一样大
客户端和服务端约定同一大小,大小不够的,客户端补齐
按分隔符拆分
协议方式
数据由两部分组成,比如前 4
个字节存储数据长度,后面就是数据本身
容量超过 这里举例是根据 「处理消息边界的第二种方式:按分隔符拆分」。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 package com.redisc; import lombok.extern .slf4j.Slf4j;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.nio.charset.Charset;import java.util.Iterator;import static com.redisc.ByteBufferUtil.debugAll;import static com.redisc.ByteBufferUtil.debugRead;@Slf4j(topic = "c.Test" ) public class Run { public static void main (String [] args) throws IOException { Selector selector = Selector.open (); ServerSocketChannel ssc = ServerSocketChannel.open (); ssc.configureBlocking(false ); SelectionKey sscKey = ssc.register (selector, 0 , null); sscKey.interestOps(SelectionKey.OP_ACCEPT); log .debug("register key:{}" , sscKey); ssc.bind(new InetSocketAddress(8090 )); while (true ) { selector.select(); Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove (); log .debug("key:{}" , key); if (key.isAcceptable()) { ServerSocketChannel channel = (ServerSocketChannel) key.channel(); SocketChannel sc = channel.accept(); sc.configureBlocking(false ); SelectionKey scKey = sc.register (selector, 0 , null); scKey.interestOps(SelectionKey.OP_READ); log .debug("{}" , sc); } else if (key.isReadable()) { try { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(16 ); int read = channel.read (buffer ); if (read > 0 ) { split(buffer ); } else { log .debug("客户端关闭" ); key.cancel(); } } catch (IOException e) { e.printStackTrace(); key.cancel(); } } } } } private static void split (ByteBuffer source) { source.flip(); for (int i = 0 ; i < source.limit(); i++) { if (source.get (i) == '\n' ) { int length = i + 1 - source.position (); ByteBuffer target = ByteBuffer.allocate(length); for (int j = 0 ; j < length; j++) { target.put (source.get ()); } debugAll(target); } } source.compact(); } }
开启服务端
开启 debug
下 11
行断点的客户端
sc 变量发送
sc.write(Charset.defaultCharset().encode("hello\nworld\n"))
成功。
如过 sc
变量发送
1 sc.write(Charset . defaultCharset() .encode("123456789012345qwert\n" ))
输出的是
1 2 3 4 5 6 7 +--------+ -------------------- all ------------------------+----------------+ position: [5], limit: [5] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+ -------------------------------------------------+----------------+ |00000000| 77 65 72 74 0a |wert. | +--------+ -------------------------------------------------+----------------+
前面的 16
字符没有了。
解决方案: 附件 之前的代码
1 SelectionKey scKey = sc.register(selector, 0 , null );
代码中,那个 null 就是附件,这个附件可以是一个 bytebuff,每一个 SelectKey 对应自己的附件。
如果数据过大,那么,可以进行附件扩容,直到把所有的数据读取完毕。
修改服务器代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 package com.redisc; import lombok.extern .slf4j.Slf4j;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.nio.charset.Charset;import java.util.Iterator;import static com.redisc.ByteBufferUtil.debugAll;import static com.redisc.ByteBufferUtil.debugRead;@Slf4j(topic = "c.Test" ) public class Run { public static void main (String [] args) throws IOException { Selector selector = Selector.open (); ServerSocketChannel ssc = ServerSocketChannel.open (); ssc.configureBlocking(false ); SelectionKey sscKey = ssc.register (selector, 0 , null); sscKey.interestOps(SelectionKey.OP_ACCEPT); log .debug("register key:{}" , sscKey); ssc.bind(new InetSocketAddress(8090 )); while (true ) { selector.select(); Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove (); log .debug("key:{}" , key); if (key.isAcceptable()) { ServerSocketChannel channel = (ServerSocketChannel) key.channel(); SocketChannel sc = channel.accept(); sc.configureBlocking(false ); ByteBuffer buffer = ByteBuffer.allocate(16 ); SelectionKey scKey = sc.register (selector, 0 , buffer ); scKey.interestOps(SelectionKey.OP_READ); log .debug("{}" , sc); } else if (key.isReadable()) { try { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = (ByteBuffer) key.attachment(); int read = channel.read (buffer ); if (read > 0 ) { split(buffer ); if (buffer .position () == buffer .limit()) { ByteBuffer newBuffer = ByteBuffer.allocate(buffer .capacity() * 2 ); buffer .flip(); newBuffer.put (buffer ); key.attach (newBuffer); } } else { log .debug("客户端关闭" ); key.cancel(); } } catch (IOException e) { e.printStackTrace(); key.cancel(); } } } } } private static void split (ByteBuffer source) { source.flip(); for (int i = 0 ; i < source.limit(); i++) { if (source.get (i) == '\n' ) { int length = i + 1 - source.position (); ByteBuffer target = ByteBuffer.allocate(length); for (int j = 0 ; j < length; j++) { target.put (source.get ()); } debugAll(target); } } source.compact(); } }
客户端再次执行相同的动作。
输出
1 2 3 4 5 6 7 8 +--------+-------------------- all ------------------------+----------------+ position: [21 ], limit: [21 ] +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000 | 31 32 33 34 35 36 37 38 39 30 31 32 33 34 35 71 |123456789012345 q| |00000010 | 77 65 72 74 0 a |wert. | +--------+-------------------------------------------------+----------------+
bytebuffer 大小分配
每个 channel
都需要记录可能被切分的消息,因为 Bytebuff
是线程不安全的,所以,不能被多个 channel
使用,因为需要为每个 channel
维护一个 ByteBuffer
「附件」
Bytebuffer
不能太大
一种思路是,Bytebuffer
默认为 4K
,不够尽兴扩容,但是,拷贝数据耗费时间
一个思路是,多个数组组成 buffer
,一个数组不够,把多出的内容写到数组中,不需要拷贝,但是,数据不连续,解析复杂