ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站。
所有的 ChannelHandler 串成一串就是 pipeline。
- 入站处理器
- 通常是 ChannelInboundHandlerAdapter子类,读取客户端数据,写回结果
 
- 出站处理器
- 通常是 ChannelOutboundHandlerAdapter子类,主要对写回结果进行加工
 
每个 Channel 都是一个产品的加工间,pipeline 是流水线。
ChannelHandler 是流水线上的工序,Bytebuf 是加工原材料。
客户端
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 
 | package com.redisc;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.codec.string.StringEncoder;
 import io.netty.handler.logging.LogLevel;
 import io.netty.handler.logging.LoggingHandler;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
 @Slf4j
 public class Client {
 public static void main(String[] args) throws IOException, InterruptedException {
 NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
 ChannelFuture channelFuture = new Bootstrap()
 .group(nioEventLoopGroup)
 .channel(NioSocketChannel.class)
 .handler(new ChannelInitializer<NioSocketChannel>() {
 @Override
 protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
 
 nioSocketChannel.pipeline().addLast(new StringEncoder());
 
 }
 })
 
 
 
 .connect(new InetSocketAddress("localhost", 8000));
 Channel channel = channelFuture.sync().channel();
 channel.writeAndFlush("123");
 }
 }
 
 | 
入站处理器
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 
 | package com.redisc;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 
 
 @Slf4j
 public class Run {
 
 public static void main(String[] args) throws IOException {
 
 
 new ServerBootstrap()
 .group(new NioEventLoopGroup())
 .channel(NioServerSocketChannel.class)
 .childHandler(new ChannelInitializer<NioSocketChannel>() {
 
 @Override
 protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
 
 ChannelPipeline channelPipeline = nioSocketChannel.pipeline();
 
 channelPipeline.addLast("handler1", new ChannelInboundHandlerAdapter() {
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 log.debug("1");
 super.channelRead(ctx, msg);
 }
 });
 
 channelPipeline.addLast("handler2", new ChannelInboundHandlerAdapter() {
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 log.debug("2");
 super.channelRead(ctx, msg);
 }
 });
 
 channelPipeline.addLast("handler3", new ChannelInboundHandlerAdapter() {
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 log.debug("3");
 super.channelRead(ctx, msg);
 }
 });
 }
 
 }).bind(8000);
 }
 
 }
 
 | 
有一个细节要注意,里面的 pipeline 链为
head -> handler1 -> handler2 -> handler3 -> tail
底层是一个双向链表。
另外,不加 super.channelRead(ctx, msg); 传递不了下一个。
出站处理器
添加一个新的处理器。
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 
 | package com.redisc;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.*;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 
 
 @Slf4j
 public class Run {
 
 public static void main(String[] args) throws IOException {
 
 
 new ServerBootstrap()
 .group(new NioEventLoopGroup())
 .channel(NioServerSocketChannel.class)
 .childHandler(new ChannelInitializer<NioSocketChannel>() {
 
 @Override
 protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
 
 ChannelPipeline channelPipeline = nioSocketChannel.pipeline();
 
 channelPipeline.addLast("handler1", new ChannelInboundHandlerAdapter() {
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 log.debug("1");
 super.channelRead(ctx, msg);
 }
 });
 
 channelPipeline.addLast("handler2", new ChannelInboundHandlerAdapter() {
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 log.debug("2");
 super.channelRead(ctx, msg);
 }
 });
 
 channelPipeline.addLast("handler3", new ChannelInboundHandlerAdapter() {
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 log.debug("3");
 channelPipeline.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
 }
 });
 
 channelPipeline.addLast("handler4", new ChannelOutboundHandlerAdapter() {
 @Override
 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
 log.debug("4");
 super.write(ctx, msg, promise);
 }
 });
 
 channelPipeline.addLast("handler5", new ChannelOutboundHandlerAdapter() {
 @Override
 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
 log.debug("5");
 super.write(ctx, msg, promise);
 }
 });
 }
 
 }).bind(8000);
 }
 
 }
 
 | 
注意,细节 handler3 添加了 channelPipeline.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
只有加了这个,才能触发 handler4 的 write 事件。
另外,输出是
| 12
 3
 4
 5
 
 | 11:50:05.266 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 111:50:05.266 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 2
 11:50:05.266 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 3
 11:50:05.280 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 5
 11:50:05.280 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 4
 
 | 
虽然处理器是
head -> handler1 -> handler2 -> handler3 ->handler4 -> handler5 -> tail
入站是按照加入顺序。
出站是按照入栈顺序。
出站执行顺序细节
如果,写入数据是这样的
| 1
 | channelPipeline.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
 | 
则从 tail -> handler5 -> handler4 这样找。
如果写入数据是这样的
| 1
 | ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
 | 
则查找顺序是
| 1
 | handler3 -> handler2 -> handler1 -> head
 | 
到 head 就结束了。
举一个例子来说
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 
 | package com.redisc;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.*;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 
 
 @Slf4j
 public class Run {
 
 public static void main(String[] args) throws IOException {
 
 
 new ServerBootstrap()
 .group(new NioEventLoopGroup())
 .channel(NioServerSocketChannel.class)
 .childHandler(new ChannelInitializer<NioSocketChannel>() {
 
 @Override
 protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
 
 ChannelPipeline channelPipeline = nioSocketChannel.pipeline();
 
 channelPipeline.addLast("handler1", new ChannelInboundHandlerAdapter() {
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 log.debug("1");
 super.channelRead(ctx, msg);
 }
 });
 
 channelPipeline.addLast("handler2", new ChannelInboundHandlerAdapter() {
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 log.debug("2");
 super.channelRead(ctx, msg);
 }
 });
 
 channelPipeline.addLast("handler3", new ChannelOutboundHandlerAdapter() {
 @Override
 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
 log.debug("3");
 super.write(ctx, msg, promise);
 }
 });
 
 channelPipeline.addLast("handler4", new ChannelInboundHandlerAdapter() {
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 log.debug("4");
 super.channelRead(ctx, msg);
 ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
 }
 });
 
 channelPipeline.addLast("handler5", new ChannelOutboundHandlerAdapter() {
 @Override
 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
 log.debug("5");
 super.write(ctx, msg, promise);
 }
 });
 
 channelPipeline.addLast("handler6", new ChannelOutboundHandlerAdapter() {
 @Override
 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
 log.debug("6");
 super.write(ctx, msg, promise);
 }
 });
 }
 
 }).bind(8000);
 }
 
 }
 
 | 
输出为
| 12
 3
 4
 
 | 12:07:15.799 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 112:07:15.799 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 2
 12:07:15.799 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 4
 12:07:15.809 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 3
 
 | 
如果换上
channelPipeline.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
则输出
| 12
 3
 4
 5
 6
 
 | 12:09:52.300 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 112:09:52.300 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 2
 12:09:52.300 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 4
 12:09:52.310 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 6
 12:09:52.310 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 5
 12:09:52.310 [nioEventLoopGroup-2-2] DEBUG com.redisc.Run - 3
 
 |