0%

java | @sharable handler 共享

如果把 handler 提取成一个公共实例,让所有 channel 共享,是否可行?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.chat;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;


/**
 * First version of the chat server bootstrap: both handlers are created once,
 * outside the channel initializer, and the same instances are added to every
 * accepted channel's pipeline.
 *
 * This listing is the "before" example of the article. Sharing the logging
 * handler is fine, but sharing the frame decoder is the problem discussed in
 * the text that follows: a frame decoder keeps partially received data, so one
 * instance must not serve several connections.
 */
@Slf4j
public class Server {

/**
 * Starts the server on port 8000 and blocks until the server channel is closed.
 *
 * @param args command-line arguments (unused)
 * @throws IOException declared by the signature; nothing in this body throws it directly
 */
public static void main(String[] args) throws IOException {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
// Single decoder instance reused for every connection below (the flaw this example demonstrates).
LengthFieldBasedFrameDecoder lengthFieldBasedFrameDecoder = new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0);
// Single logging handler instance reused for every connection below.
LoggingHandler loggingHandler = new LoggingHandler();

try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// Called once per accepted connection; both calls add the SAME instances created above.
socketChannel.pipeline().addLast(lengthFieldBasedFrameDecoder);
socketChannel.pipeline().addLast(loggingHandler);
}
});
// Bind and wait until the bind completes.
ChannelFuture channelFuture = serverBootstrap.bind(8000).sync();
// Block the main thread until the server channel is closed.
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("error", e);
} finally {
// Always release the event loop threads, whether startup succeeded or not.
boss.shutdownGracefully();
worker.shutdownGracefully();
}

}
}

上面的代码将

1
2
LengthFieldBasedFrameDecoder lengthFieldBasedFrameDecoder = new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0);
LoggingHandler loggingHandler = new LoggingHandler();

抽了出来。但是,这样在多连接(多线程)环境下会出错,因为所有 channel 都共享同一个 lengthFieldBasedFrameDecoder 实例。

考虑这样一种情况:channel 1 的数据还没有达到长度字段指定的长度(只收到了半包),这时 channel 2 的数据又到了。由于解码器内部缓存了尚未凑齐的数据,它很有可能把两个连接的数据拼成一条消息。

而 LoggingHandler 不需要保存任何状态,所以可以共享。

所以,改成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.chat;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;


/**
 * Chat server bootstrap (corrected version).
 *
 * <p>The stateless {@link LoggingHandler} is created once and shared by all
 * channels, while a fresh {@link LengthFieldBasedFrameDecoder} is created for
 * every accepted connection, because a frame decoder keeps per-connection
 * state (partially received frames).
 */
@Slf4j
public class Server {

    /** Port the server socket is bound to. */
    private static final int PORT = 8000;

    /** Maximum accepted frame size in bytes; larger frames are rejected by the decoder. */
    private static final int MAX_FRAME_LENGTH = 1024;

    /**
     * Offset of the body-length field: 4 (magic) + 1 (version) + 1 (serializer)
     * + 1 (message type) + 4 (sequence id) + 1 (padding) = 12 bytes.
     */
    private static final int LENGTH_FIELD_OFFSET = 12;

    /** The body length is written as a 4-byte int. */
    private static final int LENGTH_FIELD_LENGTH = 4;

    /** The length field counts only the body, so no adjustment is needed. */
    private static final int LENGTH_ADJUSTMENT = 0;

    /** Keep the full header: the message codec parses it itself. */
    private static final int INITIAL_BYTES_TO_STRIP = 0;

    /**
     * Starts the server and blocks until the server channel is closed.
     *
     * @param args command-line arguments (unused)
     * @throws IOException kept for signature compatibility; not thrown directly by this body
     */
    public static void main(String[] args) throws IOException {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        // Safe to share: the logging handler holds no per-connection state.
        LoggingHandler loggingHandler = new LoggingHandler();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.group(boss, worker);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    // One decoder per connection: it buffers incomplete frames.
                    socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(
                            MAX_FRAME_LENGTH,
                            LENGTH_FIELD_OFFSET,
                            LENGTH_FIELD_LENGTH,
                            LENGTH_ADJUSTMENT,
                            INITIAL_BYTES_TO_STRIP));
                    socketChannel.pipeline().addLast(loggingHandler);
                }
            });
            ChannelFuture channelFuture = serverBootstrap.bind(PORT).sync();
            // Block the main thread until the server channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("error", e);
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
        } finally {
            // Always release the event loop threads.
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

如果看源码就会发现:LoggingHandler 上有 @Sharable 注解,而 LengthFieldBasedFrameDecoder 上没有 @Sharable 注解。

自己加 @Sharable

如果我们自己写了一个 handler,里面没有保存任何状态,能否用 @Sharable 修饰呢?比如:

1
2
3
4
5
// Illustrative snippet (body elided): a codec extending ByteToMessageCodec that is
// annotated as sharable. Constructing it fails with the IllegalStateException shown
// below, raised from the ByteToMessageCodec constructor via ensureNotSharable().
@Slf4j
@ChannelHandler.Sharable
public class MessageCode extends ByteToMessageCodec<Message> {
...
}

如果使用的话会报

1
2
3
4
5
6
Exception in thread "main" java.lang.IllegalStateException: ChannelHandler com.chat.protocol.MessageCode is not allowed to be shared
at io.netty.channel.ChannelHandlerAdapter.ensureNotSharable(ChannelHandlerAdapter.java:37)
at io.netty.handler.codec.ByteToMessageCodec.<init>(ByteToMessageCodec.java:73)
at io.netty.handler.codec.ByteToMessageCodec.<init>(ByteToMessageCodec.java:55)
at com.chat.protocol.MessageCode.<init>(MessageCode.java:18)
at com.chat.protocol.TestMessageCode.main(TestMessageCode.java:17)

错误。

查看 ByteToMessageCodec 源码发现

  • Be aware that sub-classes of {@link ByteToMessageCodec} MUST NOT be annotated with {@link @Sharable}.

即,ByteToMessageCodec 的子类不能被共享。

因为 Netty 默认认为 ByteToMessageCodec 的子类会保存状态(例如缓存尚未凑齐的半包数据)。

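想要得到一个可以共享的编解码器,可以换一个父类:改为继承 MessageToMessageCodec,并在它前面配合帧解码器(如 LengthFieldBasedFrameDecoder),保证传进来的 ByteBuf 已经是一条完整的消息。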

1
2
3
4
// Illustrative snippet (body elided): the sharable variant extends MessageToMessageCodec,
// converting between ByteBuf and Message, instead of extending ByteToMessageCodec.
@ChannelHandler.Sharable
public class MessageCodeShare extends MessageToMessageCodec<ByteBuf, Message> {
...
}

完整代码如下,请参考 java | 自定义协议 编码器和解码器 | MessageCode 的比照

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package com.chat.protocol;

import com.chat.message.Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import lombok.extern.slf4j.Slf4j;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.List;

/**
 * Sharable codec that converts between {@link Message} objects and the custom
 * binary protocol carried in a {@link ByteBuf}.
 *
 * <p>Frame layout (16-byte fixed header followed by the body):
 * <pre>
 *   4 bytes  magic number (1, 2, 3, 4)
 *   1 byte   protocol version
 *   1 byte   serialization type (jdk = 0, json = 1)
 *   1 byte   message type
 *   4 bytes  sequence id
 *   1 byte   padding (0xff)
 *   4 bytes  body length
 *   N bytes  body (JDK-serialized Message)
 * </pre>
 *
 * <p>The class keeps no per-connection state, which is what allows one instance
 * to be added to many pipelines. {@code decode} reads the whole header and body
 * in one pass, so it expects each incoming {@link ByteBuf} to contain one
 * complete frame.
 */
@Slf4j
@ChannelHandler.Sharable
public class MessageCodeShare extends MessageToMessageCodec<ByteBuf, Message> {

    /** Magic number written at the start of every frame. */
    private static final byte[] MAGIC_NUMBER = {1, 2, 3, 4};

    /** Size of the fixed header, including the 4-byte body-length field. */
    private static final int HEADER_LENGTH = 16;

    /**
     * Encodes a message into a single frame and appends it to {@code out}.
     *
     * @param ctx     handler context, used to allocate the output buffer
     * @param message message to encode
     * @param out     receives the encoded {@link ByteBuf}
     * @throws Exception if the message cannot be serialized
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Message message, List<Object> out) throws Exception {
        // Serialize BEFORE allocating the buffer: if serialization fails,
        // no buffer has been allocated yet, so nothing can leak.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(message);
        }
        byte[] bytes = bos.toByteArray();

        // Size the buffer exactly: fixed header plus body.
        ByteBuf byteBuf = ctx.alloc().buffer(HEADER_LENGTH + bytes.length);
        // Magic number, 4 bytes
        byteBuf.writeBytes(MAGIC_NUMBER);
        // Protocol version, 1 byte
        byteBuf.writeByte(1);
        // Serialization type, 1 byte: jdk = 0, json = 1
        byteBuf.writeByte(0);
        // Message type, 1 byte
        byteBuf.writeByte(message.getMessageType());
        // Sequence id, 4 bytes
        byteBuf.writeInt(message.getSequenceId());
        // Padding so that the fixed header is 16 bytes long
        byteBuf.writeByte(0xff);
        // Body length, 4 bytes
        byteBuf.writeInt(bytes.length);
        // Body
        byteBuf.writeBytes(bytes);
        out.add(byteBuf);
    }

    /**
     * Decodes one complete frame into a {@link Message} and appends it to {@code out}.
     *
     * @param ctx     handler context (unused)
     * @param byteBuf buffer positioned at the start of a frame
     * @param out     receives the decoded {@link Message}
     * @throws Exception if the body cannot be deserialized
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
        // Mirrors the field order written by encode(); values are read but not validated here.
        int magicNum = byteBuf.readInt();         // magic number, 4 bytes
        byte version = byteBuf.readByte();        // protocol version
        byte serializerType = byteBuf.readByte(); // serialization type
        byte messageType = byteBuf.readByte();    // message type
        int sequenceId = byteBuf.readInt();       // sequence id
        byteBuf.readByte();                       // padding byte
        int length = byteBuf.readInt();           // body length
        byte[] bytes = new byte[length];
        byteBuf.readBytes(bytes, 0, length);      // body

        // NOTE(security): this is Java-native deserialization of bytes received from the
        // network. With untrusted peers, prefer a data-only format (e.g. JSON) or install
        // an ObjectInputFilter before reading.
        Message message;
        try (ObjectInputStream objectInputStream = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            message = (Message) objectInputStream.readObject();
        }

        log.debug("{},{},{},{},{},{}", magicNum, version, serializerType, messageType, sequenceId, length);
        log.debug("{}", message);

        out.add(message);
    }
}
请我喝杯咖啡吧~