ChannelHandler
用来处理 Channel
上的各种事件,分为入站、出站。
所有的 ChannelHandler
串成一串就是 pipeline
。
- 入站处理器
- 通常是
ChannelInboundHandlerAdapter
子类,读取客户端数据,写回结果
- 出站处理器
- 通常是
ChannelOutboundHandlerAdapter
子类,主要对写回结果进行加工
每个 Channel
都是一个产品的加工间,pipeline
是流水线。
ChannelHandler
是流水线上的工序,ByteBuf
是加工原材料。
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| package com.redisc;
import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.extern.slf4j.Slf4j;
import java.io.IOException; import java.net.InetSocketAddress;
/**
 * Minimal Netty client used to drive the server examples: it connects to
 * {@code localhost:8000} and writes the string {@code "123"}.
 *
 * <p>The pipeline contains a single {@link StringEncoder}. The event loop group created
 * here is never shut down by this class.
 */
@Slf4j
public class Client {

    public static void main(String[] args) throws IOException, InterruptedException {
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(workerGroup);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                // Only handler on the client side: added so a String can be written directly.
                ch.pipeline().addLast(new StringEncoder());
            }
        });

        ChannelFuture connectFuture = bootstrap.connect(new InetSocketAddress("localhost", 8000));

        // Wait on the connect future before using the channel it produced.
        Channel channel = connectFuture.sync().channel();
        channel.writeAndFlush("123");
    }
}
|
入站处理器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| package com.redisc;
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
@Slf4j public class Run {
public static void main(String[] args) throws IOException {
new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { ChannelPipeline channelPipeline = nioSocketChannel.pipeline(); channelPipeline.addLast("handler1", new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("1"); super.channelRead(ctx, msg); } }); channelPipeline.addLast("handler2", new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("2"); super.channelRead(ctx, msg); } }); channelPipeline.addLast("handler3", new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("3"); super.channelRead(ctx, msg); } }); }
}).bind(8000); }
}
|
有一个细节要注意,里面的 pipeline
链为
head -> handler1 -> handler2 -> handler3 -> tail
底层是一个双向链表。
另外,如果不调用 super.channelRead(ctx, msg);(其内部就是 ctx.fireChannelRead(msg)),
消息就无法传递给下一个入站处理器。
出站处理器
添加一个新的处理器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| package com.redisc;
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
@Slf4j public class Run {
public static void main(String[] args) throws IOException {
new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { ChannelPipeline channelPipeline = nioSocketChannel.pipeline(); channelPipeline.addLast("handler1", new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("1"); super.channelRead(ctx, msg); } }); channelPipeline.addLast("handler2", new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("2"); super.channelRead(ctx, msg); } }); channelPipeline.addLast("handler3", new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("3"); channelPipeline.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes())); } }); channelPipeline.addLast("handler4", new ChannelOutboundHandlerAdapter() { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.debug("4"); super.write(ctx, msg, promise); } }); channelPipeline.addLast("handler5", new ChannelOutboundHandlerAdapter() { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.debug("5"); super.write(ctx, msg, promise); } }); }
}).bind(8000); }
}
|
注意一个细节:handler3
添加了 channelPipeline.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
只有加了这个,才能触发出站处理器 handler4、handler5
的 write
事件。
另外,输出是
1 2 3 4 5
| 11:50:05.266 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 1 11:50:05.266 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 2 11:50:05.266 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 3 11:50:05.280 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 5 11:50:05.280 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 4
|
虽然处理器是
head -> handler1 -> handler2 -> handler3 ->handler4 -> handler5 -> tail
但入站处理器是按照加入顺序执行的。
出站处理器则是按照加入顺序的逆序执行的(后加入的先执行,类似出栈)。
出站执行顺序细节
如果,写入数据是这样的
1
| channelPipeline.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
|
则从 tail 开始向前查找出站处理器,即 tail -> handler5 -> handler4
这样找。
如果写入数据是这样的
1
| ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
|
则查找顺序是
1
| handler3 -> handler2 -> handler1 -> head
|
也就是从当前处理器开始向前查找出站处理器,到 head
就结束了;排在当前处理器之后的出站处理器不会被触发。
举一个例子来说
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
| package com.redisc;
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
@Slf4j public class Run {
public static void main(String[] args) throws IOException {
new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { ChannelPipeline channelPipeline = nioSocketChannel.pipeline(); channelPipeline.addLast("handler1", new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("1"); super.channelRead(ctx, msg); } }); channelPipeline.addLast("handler2", new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("2"); super.channelRead(ctx, msg); } }); channelPipeline.addLast("handler3", new ChannelOutboundHandlerAdapter() { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.debug("3"); super.write(ctx, msg, promise); } }); channelPipeline.addLast("handler4", new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("4"); super.channelRead(ctx, msg); ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes())); } }); channelPipeline.addLast("handler5", new ChannelOutboundHandlerAdapter() { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.debug("5"); super.write(ctx, msg, promise); } }); channelPipeline.addLast("handler6", new ChannelOutboundHandlerAdapter() { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.debug("6"); super.write(ctx, msg, promise); } }); }
}).bind(8000); }
}
|
输出为
1 2 3 4
| 12:07:15.799 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 1 12:07:15.799 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 2 12:07:15.799 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 4 12:07:15.809 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 3
|
如果换上
channelPipeline.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
则输出
1 2 3 4 5 6
| 12:09:52.300 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 1 12:09:52.300 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 2 12:09:52.300 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 4 12:09:52.310 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 6 12:09:52.310 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 5 12:09:52.310 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 3
|