0%

java | handler & pipeline

ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站。

所有的 ChannelHandler 串成一串就是 pipeline

  • 入站处理器
    • 通常是 ChannelInboundHandlerAdapter 子类,读取客户端数据,写回结果
  • 出站处理器
    • 通常是 ChannelOutboundHandlerAdapter 子类,主要对写回结果进行加工

每个 Channel 都是一个产品的加工间,pipeline 是流水线。

ChannelHandler 是流水线上的工序,ByteBuf 是加工原材料。

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.redisc;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;

/**
 * Minimal Netty client used by the pipeline demos: it connects to
 * {@code localhost:8000}, sends the string {@code "123"} and then keeps the
 * connection open until the channel is closed.
 */
@Slf4j
public class Client {

    /**
     * Connects to the demo server, writes one message and waits for the
     * channel to close before releasing the event loop threads.
     *
     * @param args unused
     * @throws IOException          declared for compatibility; not thrown by the visible code
     * @throws InterruptedException if the calling thread is interrupted while waiting
     *                              for the connect or close future
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
        try {
            ChannelFuture channelFuture = new Bootstrap()
                    .group(nioEventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                            // Uncomment to trace every event on this channel:
                            // nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                            // Encodes the outgoing String into bytes before it reaches the socket.
                            nioSocketChannel.pipeline().addLast(new StringEncoder());
                        }
                    })
                    // Connect to the server.
                    // connect() is asynchronous and non-blocking: it is called from the main
                    // thread, but the connection itself is established by an NIO thread.
                    .connect(new InetSocketAddress("localhost", 8000));

            // Block until the connection is established, then send the message.
            Channel channel = channelFuture.sync().channel();
            channel.writeAndFlush("123");

            // Keep the connection open (as before) until the channel is closed.
            channel.closeFuture().sync();
        } finally {
            // Without this the event loop's non-daemon threads keep the JVM alive
            // forever, even after the channel is closed or the connect attempt failed.
            nioEventLoopGroup.shutdownGracefully();
        }
    }
}

入站处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.redisc;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;


/**
 * Inbound-only pipeline demo: a server on port 8000 whose child channels get
 * three inbound handlers ("handler1" .. "handler3"), each logging its number
 * and passing the message on.
 */
@Slf4j
public class Run {

    /**
     * Configures the server bootstrap and binds it to port 8000.
     *
     * @param args unused
     * @throws IOException declared for compatibility; not thrown by the visible code
     */
    public static void main(String[] args) throws IOException {
        ServerBootstrap server = new ServerBootstrap();
        server.group(new NioEventLoopGroup());
        server.channel(NioServerSocketChannel.class);
        server.childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                // Register the handlers in order; inbound events visit them in this order.
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast("handler1", loggingInbound("1"));
                pipeline.addLast("handler2", loggingInbound("2"));
                pipeline.addLast("handler3", loggingInbound("3"));
            }
        });
        server.bind(8000);
    }

    /**
     * Builds an inbound handler that logs {@code label} at debug level for every
     * message read and then forwards the message to the next handler.
     *
     * @param label text written to the log when a message arrives
     * @return a new handler instance
     */
    private static ChannelInboundHandlerAdapter loggingInbound(final String label) {
        return new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                log.debug(label);
                // Without this call the message is not handed to the next handler.
                super.channelRead(ctx, msg);
            }
        };
    }
}

有一个细节要注意,里面的 pipeline 链为

head -> handler1 -> handler2 -> handler3 -> tail

底层是一个双向链表。

另外,如果不调用 super.channelRead(ctx, msg);(其内部就是 ctx.fireChannelRead(msg)),消息就不会传递给下一个入站处理器。

出站处理器

添加一个新的处理器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package com.redisc;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;


@Slf4j
public class Run {

public static void main(String[] args) throws IOException {


new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {

@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 获取 channelPipeline
ChannelPipeline channelPipeline = nioSocketChannel.pipeline();
// 添加处理器
channelPipeline.addLast("handler1", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("1");
super.channelRead(ctx, msg);
}
});
// 添加处理器
channelPipeline.addLast("handler2", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("2");
super.channelRead(ctx, msg);
}
});
// 添加处理器
channelPipeline.addLast("handler3", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("3");
channelPipeline.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
}
});
// 添加处理器
channelPipeline.addLast("handler4", new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("4");
super.write(ctx, msg, promise);
}
});
// 添加处理器
channelPipeline.addLast("handler5", new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("5");
super.write(ctx, msg, promise);
}
});
}

}).bind(8000);
}

}

注意,细节 handler3 添加了 channelPipeline.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));

只有加了这个(即向 channel 写入数据),才能触发出站处理器 handler5、handler4 的 write 事件。

另外,输出是

1
2
3
4
5
11:50:05.266 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 1
11:50:05.266 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 2
11:50:05.266 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 3
11:50:05.280 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 5
11:50:05.280 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 4

虽然处理器是

head -> handler1 -> handler2 -> handler3 ->handler4 -> handler5 -> tail

入站是按照加入顺序。

出站是按照加入顺序的逆序执行(类似栈,后加入的先执行,即从 tail 向 head 方向)。

出站执行顺序细节

如果,写入数据是这样的

1
channelPipeline.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));

则从 tail 开始向前查找出站处理器,即 tail -> handler5 -> handler4。

如果写入数据是这样的

1
ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));

则从当前处理器(这里是 handler3)开始向前查找出站处理器,查找顺序是

1
handler3 -> handler2 -> handler1 -> head

到 head 就结束了,因此位于当前处理器之后的出站处理器不会被执行。

举一个例子来说

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package com.redisc;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;


/**
 * Demo of where an outbound write starts: the pipeline is
 * handler1 (in), handler2 (in), handler3 (out), handler4 (in), handler5 (out),
 * handler6 (out). handler4 replies with {@code "server..."} via
 * {@code ctx.writeAndFlush(...)}.
 */
@Slf4j
public class Run {

    /**
     * Configures the server bootstrap and binds it to port 8000.
     *
     * @param args unused
     * @throws IOException declared for compatibility; not thrown by the visible code
     */
    public static void main(String[] args) throws IOException {
        ChannelInitializer<NioSocketChannel> childInitializer = new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                ChannelPipeline channelPipeline = ch.pipeline();

                channelPipeline.addLast("handler1", inbound("1"));
                channelPipeline.addLast("handler2", inbound("2"));
                channelPipeline.addLast("handler3", outbound("3"));

                // handler4 is the only handler that produces a response.
                channelPipeline.addLast("handler4", new ChannelInboundHandlerAdapter() {
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        log.debug("4");
                        super.channelRead(ctx, msg);
                        // Written through ctx, not through the pipeline.
                        ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
                    }
                });

                channelPipeline.addLast("handler5", outbound("5"));
                channelPipeline.addLast("handler6", outbound("6"));
            }
        };

        ServerBootstrap server = new ServerBootstrap();
        server.group(new NioEventLoopGroup());
        server.channel(NioServerSocketChannel.class);
        server.childHandler(childInitializer);
        server.bind(8000);
    }

    /**
     * Creates an inbound handler that logs {@code label} at debug level and
     * forwards the message to the next handler.
     *
     * @param label text written to the log when a message is read
     * @return a new handler instance
     */
    private static ChannelInboundHandlerAdapter inbound(final String label) {
        return new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                log.debug(label);
                super.channelRead(ctx, msg);
            }
        };
    }

    /**
     * Creates an outbound handler that logs {@code label} at debug level and
     * passes the write on.
     *
     * @param label text written to the log when a write passes through
     * @return a new handler instance
     */
    private static ChannelOutboundHandlerAdapter outbound(final String label) {
        return new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                log.debug(label);
                super.write(ctx, msg, promise);
            }
        };
    }
}

输出为

1
2
3
4
12:07:15.799 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 1
12:07:15.799 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 2
12:07:15.799 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 4
12:07:15.809 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 3

如果换上

channelPipeline.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));

则输出

1
2
3
4
5
6
12:09:52.300 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 1
12:09:52.300 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 2
12:09:52.300 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 4
12:09:52.310 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 6
12:09:52.310 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 5
12:09:52.310 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 3
请我喝杯咖啡吧~