如果把 handler 提取成变量,让所有 Channel 共用同一个实例,是否可行?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 package com.chat;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler .codec.LengthFieldBasedFrameDecoder;import io.netty.handler .logging.LoggingHandler;import lombok.extern.slf4j.Slf4j;import java.io.IOException;@Slf 4jpublic class Server { public static void main (String[] args) throws IOException { NioEventLoopGroup boss = new NioEventLoopGroup(); NioEventLoopGroup worker = new NioEventLoopGroup(); LengthFieldBasedFrameDecoder lengthFieldBasedFrameDecoder = new LengthFieldBasedFrameDecoder(1024 , 12 , 4 , 0 , 0 ); LoggingHandler loggingHandler = new LoggingHandler(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.channel(NioServerSocketChannel.class ) ; serverBootstrap.group(boss, worker); serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel (SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(lengthFieldBasedFrameDecoder); socketChannel.pipeline().addLast(loggingHandler); } }); ChannelFuture channelFuture = serverBootstrap.bind(8000 ).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error ("error" , e); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } }
上面的代码将
1 2 LengthFieldBasedFrameDecoder lengthFieldBasedFrameDecoder = new LengthFieldBasedFrameDecoder(1024 , 12 , 4 , 0 , 0 ); LoggingHandler loggingHandler = new LoggingHandler();
抽了出来。但是,这样写是有问题的:所有连接(Channel)的 pipeline 共享的是同一个 lengthFieldBasedFrameDecoder 实例,而这个解码器内部会缓存尚未凑齐一帧的半包数据,是有状态的。
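考虑这样一种情况:连接 1 的数据还没有达到一帧的长度,被解码器暂存了起来;这个时候连接 2 的数据又来了,共享的解码器很有可能会把两个连接的数据拼成一条消息。(实际上,同一个非 @Sharable 的 handler 实例第二次被加入 pipeline 时,Netty 会直接抛出 ChannelPipelineException,正是为了避免这类错误。)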
但是,loggingHandler 本身不保存与连接相关的状态,所以可以被所有 Channel 共享。
所以,改成
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 package com.chat;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler .codec.LengthFieldBasedFrameDecoder;import io.netty.handler .logging.LoggingHandler;import lombok.extern.slf4j.Slf4j;import java.io.IOException;@Slf 4jpublic class Server { public static void main (String[] args) throws IOException { NioEventLoopGroup boss = new NioEventLoopGroup(); NioEventLoopGroup worker = new NioEventLoopGroup(); LoggingHandler loggingHandler = new LoggingHandler(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.channel(NioServerSocketChannel.class ) ; serverBootstrap.group(boss, worker); serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel (SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024 , 12 , 4 , 0 , 0 )); socketChannel.pipeline().addLast(loggingHandler); } }); ChannelFuture channelFuture = serverBootstrap.bind(8000 ).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error ("error" , e); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } }
如果看源码,LoggingHandler
就会发现 有 @Sharable
而 LengthFieldBasedFrameDecoder
没有 @Sharable
。
自己加 @Sharable
如果我们自己写了一个 handler,里面没有保存任何状态,能否使用 @Sharable 修饰?比如
1 2 3 4 5 @Slf 4j@ChannelHandler .Sharable public class MessageCode extends ByteToMessageCodec<Message> { ... }
如果使用的话会报
Exception in thread "main" java.lang.IllegalStateException: ChannelHandler com.chat.protocol.MessageCode is not allowed to be shared
    at io.netty.channel.ChannelHandlerAdapter.ensureNotSharable(ChannelHandlerAdapter.java:37)
    at io.netty.handler.codec.ByteToMessageCodec.<init>(ByteToMessageCodec.java:73)
    at io.netty.handler.codec.ByteToMessageCodec.<init>(ByteToMessageCodec.java:55)
    at com.chat.protocol.MessageCode.<init>(MessageCode.java:18)
    at com.chat.protocol.TestMessageCode.main(TestMessageCode.java:17)
错误。
查看 ByteToMessageCodec
源码发现
Be aware that sub-classes of {@link ByteToMessageCodec} MUST NOT annotated with {@link @Sharable}.
即,ByteToMessageCodec
的子类不能被共享。
因为 ByteToMessageCodec 内部的解码部分需要缓存尚未解码完的字节(半包),Netty 认为它一定是有状态的,所以在构造方法里调用 ensureNotSharable() ,直接禁止它的子类被共享。
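想要既实现编解码、又能被共享,可以改为继承 MessageToMessageCodec:它处理的是已经由前面的帧解码器(例如 LengthFieldBasedFrameDecoder)切分好的完整 ByteBuf,自身不需要保存状态。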
1 2 3 4 @ChannelHandler .Sharable public class MessageCodeShare extends MessageToMessageCodec<ByteBuf , Message> { ... }
完整代码如下,请参考 java | 自定义协议 编码器和解码器 | MessageCode 的比照
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 package com.chat.protocol; import com.chat.message.Message; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageCodec; import lombok.extern.slf4j.Slf4j; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.util.List; @Slf4j @ChannelHandler.Sharable public class MessageCodeShare extends MessageToMessageCodec<ByteBuf, Message> { @Override protected void encode(ChannelHandlerContext ctx, Message message, List<Object> out) throws Exception { ByteBuf byteBuf = ctx.alloc() .buffer() ; byteBuf.writeBytes(new byte []{1, 2, 3, 4}) ; byteBuf.writeByte(1) ; byteBuf.writeByte(0) ; byteBuf.writeByte(message .getMessageType () ); byteBuf.writeInt(message .getSequenceId () ); byteBuf.writeByte(0xff) ; ByteArrayOutputStream bos = new ByteArrayOutputStream() ; ObjectOutputStream oos = new ObjectOutputStream(bos ) ; oos.writeObject(message ) ; byte[] bytes = bos.to ByteArray() ; byteBuf.writeInt(bytes .length ) ; byteBuf.writeBytes(bytes ) ; out.add(byteBuf); } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception { int magicNum = byteBuf.readInt() ; byte version = byteBuf.readByte() ; byte serializerType = byteBuf.readByte() ; byte messageType = byteBuf.readByte() ; int sequenceId = byteBuf.readInt() ; byteBuf.readByte() ; int length = byteBuf.readInt() ; byte[] bytes = new byte[length ] ; byteBuf.readBytes(bytes , 0, length ) ; ObjectInputStream objectInputStream = new ObjectInputStream(new ByteArrayInputStream(bytes ) ); Message message = (Message) objectInputStream.readObject() ; log.debug("{},{},{},{},{},{}" , magicNum, 
version, serializerType, messageType, sequenceId, length); log.debug("{}" , message); out.add(message); } }