客户端代码如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| package com.redisc;
import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.extern.slf4j.Slf4j;
import java.io.IOException; import java.net.InetSocketAddress; import java.util.Scanner;
/**
 * Minimal Netty client that connects to localhost:8000 and is driven from the console.
 *
 * <p>Every console line other than {@code q} triggers a write of a fixed payload;
 * {@code q} requests the channel to close and then logs "close".
 */
@Slf4j
public class Client {
    public static void main(String[] args) throws IOException, InterruptedException {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(new NioEventLoopGroup());
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                // Log pipeline events at DEBUG, then encode outgoing Strings.
                ch.pipeline()
                        .addLast(new LoggingHandler(LogLevel.DEBUG))
                        .addLast(new StringEncoder());
            }
        });

        // Block until the connection attempt finishes, then take the channel.
        ChannelFuture connectFuture = bootstrap.connect(new InetSocketAddress("localhost", 8000));
        Channel channel = connectFuture.sync().channel();

        Runnable consoleLoop = () -> {
            Scanner stdin = new Scanner(System.in);
            // Keep sending until the user types "q".
            for (String line = stdin.nextLine(); !"q".equals(line); line = stdin.nextLine()) {
                channel.writeAndFlush("213");
            }
            // close() only requests the close; the log statement below runs on this
            // thread right away and may therefore appear before the channel is closed.
            channel.close();
            log.debug("close");
        };
        new Thread(consoleLoop, "input").start();
    }
}
|
上面代码的第 42、43 行处:
1 2
| channel.close(); log.debug("close");
|
本意是希望 channel 关闭之后,再执行后面的操作(打印日志)。但是,实际运行的输出如下:
1 2 3 4 5
| q 01:05:50.042 [input] DEBUG com.redisc.Client - close 01:05:50.043 [nioEventLoopGroup-2-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x9914e527, L:/127.0.0.1:59995 - R:localhost/127.0.0.1:8000] CLOSE 01:05:50.048 [nioEventLoopGroup-2-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x9914e527, L:/127.0.0.1:59995 ! R:localhost/127.0.0.1:8000] INACTIVE 01:05:50.048 [nioEventLoopGroup-2-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x9914e527, L:/127.0.0.1:59995 ! R:localhost/127.0.0.1:8000] UNREGISTERED
|
可以看到,先输出了 log.debug("close") 的日志,之后 channel 才真正关闭。
这是因为 channel.close() 是异步操作:input 线程只是发起关闭请求,真正的关闭动作由 NioEventLoopGroup 中的线程执行。
二者分属于两个线程,执行的先后顺序无法保证。
同步关闭
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| package com.redisc;
import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.extern.slf4j.Slf4j;
import java.io.IOException; import java.net.InetSocketAddress; import java.util.Scanner;
/**
 * Netty client that waits synchronously for its channel to close.
 *
 * <p>A console thread sends a fixed payload for every input line and requests a close
 * on {@code q}; the main thread blocks on the channel's close future and only then
 * logs the follow-up message. The event loop group is not shut down here.
 */
@Slf4j
public class Client {
    public static void main(String[] args) throws IOException, InterruptedException {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(new NioEventLoopGroup());
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                // Log pipeline events at DEBUG, then encode outgoing Strings.
                ch.pipeline()
                        .addLast(new LoggingHandler(LogLevel.DEBUG))
                        .addLast(new StringEncoder());
            }
        });

        // Block until the connection attempt finishes, then take the channel.
        ChannelFuture connectFuture = bootstrap.connect(new InetSocketAddress("localhost", 8000));
        Channel channel = connectFuture.sync().channel();

        Runnable consoleLoop = () -> {
            Scanner stdin = new Scanner(System.in);
            // Keep sending until the user types "q", then request the close.
            for (String line = stdin.nextLine(); !"q".equals(line); line = stdin.nextLine()) {
                channel.writeAndFlush("213");
            }
            channel.close();
        };
        new Thread(consoleLoop, "input").start();

        // Park the main thread until the channel has been closed, so the log
        // statement below is ordered after the close.
        channel.closeFuture().sync();
        log.debug("关闭后执行的操作");
    }
}
|
异步关闭
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| package com.redisc;
import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.extern.slf4j.Slf4j;
import java.io.IOException; import java.net.InetSocketAddress; import java.util.Scanner;
/**
 * Netty client that reacts to its channel closing through a listener.
 *
 * <p>A console thread sends a fixed payload for every input line and requests a close
 * on {@code q}; a listener registered on the channel's close future logs once the
 * close has completed. The event loop group is not shut down here.
 */
@Slf4j
public class Client {
    public static void main(String[] args) throws IOException, InterruptedException {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(new NioEventLoopGroup());
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                // Log pipeline events at DEBUG, then encode outgoing Strings.
                ch.pipeline()
                        .addLast(new LoggingHandler(LogLevel.DEBUG))
                        .addLast(new StringEncoder());
            }
        });

        // Block until the connection attempt finishes, then take the channel.
        ChannelFuture connectFuture = bootstrap.connect(new InetSocketAddress("localhost", 8000));
        Channel channel = connectFuture.sync().channel();

        Runnable consoleLoop = () -> {
            Scanner stdin = new Scanner(System.in);
            // Keep sending until the user types "q", then request the close.
            for (String line = stdin.nextLine(); !"q".equals(line); line = stdin.nextLine()) {
                channel.writeAndFlush("213");
            }
            channel.close();
        };
        new Thread(consoleLoop, "input").start();

        // Callback instead of blocking: invoked once the close future completes.
        channel.closeFuture().addListener((ChannelFutureListener) closed -> log.debug("123"));
    }
}
|
但是,上面两种写法在 channel 关闭之后,程序都没有退出。这是因为 NioEventLoopGroup
中的线程仍在运行,没有被关闭。
下面以异步方式为例做一个优化:在 channel 关闭之后,优雅地关闭 NioEventLoopGroup。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| package com.redisc;
import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.extern.slf4j.Slf4j;
import java.io.IOException; import java.net.InetSocketAddress; import java.util.Scanner;
/**
 * Netty client that releases its event loop group once the channel is closed, so the
 * process can terminate.
 *
 * <p>A console thread sends a fixed payload for every input line and requests a close
 * on {@code q} (or when standard input ends). A listener on the channel's close future
 * then shuts the event loop group down gracefully.
 */
@Slf4j
public class Client {
    public static void main(String[] args) throws IOException, InterruptedException {
        NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
        Channel channel = connect(nioEventLoopGroup);

        Thread input = new Thread(() -> {
            Scanner scanner = new Scanner(System.in);
            // hasNextLine() returns false on end of input instead of letting
            // nextLine() throw, so a closed stdin is treated like "q".
            while (scanner.hasNextLine()) {
                String line = scanner.nextLine();
                if ("q".equals(line)) {
                    break;
                }
                // NOTE(review): a fixed payload is sent rather than the typed line;
                // confirm this is intended.
                channel.writeAndFlush("213");
            }
            channel.close();
        }, "input");
        // Daemon: this thread is usually blocked on stdin and must not keep the JVM
        // alive when the channel is closed by something other than the user typing "q".
        input.setDaemon(true);
        input.start();

        ChannelFuture channelFuture1 = channel.closeFuture();
        channelFuture1.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture closedFuture) throws Exception {
                log.debug("执行关闭操作");
                // Stop the event loop threads so the process can exit.
                nioEventLoopGroup.shutdownGracefully();
            }
        });
    }

    /**
     * Connects to localhost:8000 on the given group and returns the connected channel.
     *
     * <p>If the connection attempt fails or the wait is interrupted, the group is shut
     * down before the exception propagates, so its threads do not keep the JVM alive.
     *
     * @param group event loop group that drives the channel
     * @return the connected channel
     * @throws InterruptedException if interrupted while waiting for the connection
     */
    private static Channel connect(NioEventLoopGroup group) throws InterruptedException {
        boolean connected = false;
        try {
            Channel channel = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                            nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                            nioSocketChannel.pipeline().addLast(new StringEncoder());
                        }
                    })
                    .connect(new InetSocketAddress("localhost", 8000))
                    .sync()
                    .channel();
            connected = true;
            return channel;
        } finally {
            // A flag is used instead of a catch clause because sync() can rethrow the
            // failure cause, including exception types not declared on its signature.
            if (!connected) {
                group.shutdownGracefully();
            }
        }
    }
}
|