0%

java | 自定义协议 编码器和解码器

  • 魔数
    • 用来第一时间判别是否为无效包
  • 版本号
    • 支持协议升级
  • 序列化算法
    • 消息正文采用哪种序列化和反序列化
  • 指令类型
    • 登录、注册、单聊、群聊。。。
  • 请求序号
    • 双工通信,提供异步能力
  • 正文长度
  • 消息正文

ByteToMessageCodec 的编码和解码进行重写。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package com.chat.protocol;

import com.chat.message.Message;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.ByteToMessageCodec;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LoggingHandler;
import lombok.Data;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;

import java.io.*;
import java.util.List;

@Data
abstract class MessageTest implements Serializable {

private int sequenceId;

private int messageType;

public abstract int getMessageType();

}

@Data
@ToString(callSuper = true)
class LoginRequestMessageTest extends MessageTest {
private String username;
private String password;

public LoginRequestMessageTest() {
}

public LoginRequestMessageTest(String username, String password) {
this.username = username;
this.password = password;
}

@Override
public int getMessageType() {
return 1;
}
}


@Slf4j
class MessageCode extends ByteToMessageCodec<MessageTest> {
// protobuf 编码器
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, MessageTest message, ByteBuf byteBuf) throws Exception {
// 魔数 4 字节
byteBuf.writeBytes(new byte[]{1, 2, 3, 4});
// 数据版本 1 字节
byteBuf.writeByte(1);
// 序列化方式 1 字节 jdk = 0,json = 1
byteBuf.writeByte(0);
// 指令类型 1 字节
byteBuf.writeByte(message.getMessageType());
// 4 个字节 请求序号
byteBuf.writeInt(message.getSequenceId());
// 填充,让固定字节达到 16 个
byteBuf.writeByte(0xff);
// 正文长度
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(message);
byte[] bytes = bos.toByteArray();
byteBuf.writeInt(bytes.length);
// 写入内容
byteBuf.writeBytes(bytes);

}

// protobuf 解码器
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
// 解码对应上面的编码

int magicNum = byteBuf.readInt(); // 魔数,对应 4 个字节
byte version = byteBuf.readByte(); // 版本
byte serializerType = byteBuf.readByte(); // 序列化方式
byte messageType = byteBuf.readByte(); // 指令类型
int sequenceId = byteBuf.readInt(); // 请求序号
byteBuf.readByte(); // 填充字节
int length = byteBuf.readInt(); // 数据长度
byte[] bytes = new byte[length];
byteBuf.readBytes(bytes, 0, length);// 正文

ObjectInputStream objectInputStream = new ObjectInputStream(new ByteArrayInputStream(bytes));
MessageTest message = (MessageTest) objectInputStream.readObject();

log.debug("{},{},{},{},{},{}", magicNum, version, serializerType, messageType, sequenceId, length);
log.debug("{}", message);

list.add(message);
}
}

public class TestMessageCode {

public static void main(String[] args) throws Exception {
EmbeddedChannel embeddedChannel = new EmbeddedChannel(
new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),
new LoggingHandler(),
new MessageCode()
);

// 测试 encode
LoginRequestMessageTest loginRequestMessage = new LoginRequestMessageTest("a", "b");
embeddedChannel.writeOutbound(loginRequestMessage);

// 测试 decode
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
new MessageCode().encode(null, loginRequestMessage, buf);
// 入站
embeddedChannel.writeInbound(buf);

}


}

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
15:03:50.776 [main] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@754ba872
15:03:50.801 [main] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] WRITE: 213B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 01 00 01 00 00 00 00 ff 00 00 00 c5 |................|
|00000010| ac ed 00 05 73 72 00 29 63 6f 6d 2e 63 68 61 74 |....sr.)com.chat|
|00000020| 2e 70 72 6f 74 6f 63 6f 6c 2e 4c 6f 67 69 6e 52 |.protocol.LoginR|
|00000030| 65 71 75 65 73 74 4d 65 73 73 61 67 65 54 65 73 |equestMessageTes|
|00000040| 74 c3 ea 08 94 45 23 11 fc 02 00 02 4c 00 08 70 |t....E#.....L..p|
|00000050| 61 73 73 77 6f 72 64 74 00 12 4c 6a 61 76 61 2f |asswordt..Ljava/|
|00000060| 6c 61 6e 67 2f 53 74 72 69 6e 67 3b 4c 00 08 75 |lang/String;L..u|
|00000070| 73 65 72 6e 61 6d 65 71 00 7e 00 01 78 72 00 1d |sernameq.~..xr..|
|00000080| 63 6f 6d 2e 63 68 61 74 2e 70 72 6f 74 6f 63 6f |com.chat.protoco|
|00000090| 6c 2e 4d 65 73 73 61 67 65 54 65 73 74 19 67 7d |l.MessageTest.g}|
|000000a0| 9e ee 8c d2 6b 02 00 02 49 00 0b 6d 65 73 73 61 |....k...I..messa|
|000000b0| 67 65 54 79 70 65 49 00 0a 73 65 71 75 65 6e 63 |geTypeI..sequenc|
|000000c0| 65 49 64 78 70 00 00 00 00 00 00 00 00 74 00 01 |eIdxp........t..|
|000000d0| 62 74 00 01 61 |bt..a |
+--------+-------------------------------------------------+----------------+
15:03:50.802 [main] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] FLUSH
15:03:50.810 [main] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: 213B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 01 00 01 00 00 00 00 ff 00 00 00 c5 |................|
|00000010| ac ed 00 05 73 72 00 29 63 6f 6d 2e 63 68 61 74 |....sr.)com.chat|
|00000020| 2e 70 72 6f 74 6f 63 6f 6c 2e 4c 6f 67 69 6e 52 |.protocol.LoginR|
|00000030| 65 71 75 65 73 74 4d 65 73 73 61 67 65 54 65 73 |equestMessageTes|
|00000040| 74 c3 ea 08 94 45 23 11 fc 02 00 02 4c 00 08 70 |t....E#.....L..p|
|00000050| 61 73 73 77 6f 72 64 74 00 12 4c 6a 61 76 61 2f |asswordt..Ljava/|
|00000060| 6c 61 6e 67 2f 53 74 72 69 6e 67 3b 4c 00 08 75 |lang/String;L..u|
|00000070| 73 65 72 6e 61 6d 65 71 00 7e 00 01 78 72 00 1d |sernameq.~..xr..|
|00000080| 63 6f 6d 2e 63 68 61 74 2e 70 72 6f 74 6f 63 6f |com.chat.protoco|
|00000090| 6c 2e 4d 65 73 73 61 67 65 54 65 73 74 19 67 7d |l.MessageTest.g}|
|000000a0| 9e ee 8c d2 6b 02 00 02 49 00 0b 6d 65 73 73 61 |....k...I..messa|
|000000b0| 67 65 54 79 70 65 49 00 0a 73 65 71 75 65 6e 63 |geTypeI..sequenc|
|000000c0| 65 49 64 78 70 00 00 00 00 00 00 00 00 74 00 01 |eIdxp........t..|
|000000d0| 62 74 00 01 61 |bt..a |
+--------+-------------------------------------------------+----------------+
15:03:50.850 [main] DEBUG com.chat.protocol.MessageCode - 16909060,1,0,1,0,197
15:03:50.850 [main] DEBUG com.chat.protocol.MessageCode - LoginRequestMessageTest(super=MessageTest(sequenceId=0, messageType=1), username=a, password=b)
15:03:50.851 [main] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ COMPLETE
请我喝杯咖啡吧~