0%

java | channel 关闭

客户端代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.redisc;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Scanner;

@Slf4j
public class Client {
public static void main(String[] args) throws IOException, InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
nioSocketChannel.pipeline().addLast(new StringEncoder());

}
})
// 连接服务器
// 异步非阻塞,调用线程是 main 线程
// 真正执行连接的是另一个线程,是一个 NIO 线程
.connect(new InetSocketAddress("localhost", 8000));
Channel channel = channelFuture.sync().channel();
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
while (true) {
String line = scanner.nextLine();
if ("q".equals(line)) {
channel.close();
log.debug("close");
break;
}
channel.writeAndFlush("213");
}
}, "input").start();
}
}

上面代码,4243

1
2
channel.close();
log.debug("close");

希望 channel 关闭后,执行下面的操作,但是,真正执行下来

1
2
3
4
5
q
01:05:50.042 [input] DEBUG com.redisc.Client - close
01:05:50.043 [nioEventLoopGroup-2-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x9914e527, L:/127.0.0.1:59995 - R:localhost/127.0.0.1:8000] CLOSE
01:05:50.048 [nioEventLoopGroup-2-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x9914e527, L:/127.0.0.1:59995 ! R:localhost/127.0.0.1:8000] INACTIVE
01:05:50.048 [nioEventLoopGroup-2-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x9914e527, L:/127.0.0.1:59995 ! R:localhost/127.0.0.1:8000] UNREGISTERED

是先执行了 log.debug("close"); 然后执行了 channel.close()

这是因为,他们分属于两个线程,执行顺序完全是无法预计的。

同步关闭

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.redisc;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Scanner;

@Slf4j
public class Client {
public static void main(String[] args) throws IOException, InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
nioSocketChannel.pipeline().addLast(new StringEncoder());

}
})
// 连接服务器
// 异步非阻塞,调用线程是 main 线程
// 真正执行连接的是另一个线程,是一个 NIO 线程
.connect(new InetSocketAddress("localhost", 8000));
Channel channel = channelFuture.sync().channel();
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
while (true) {
String line = scanner.nextLine();
if ("q".equals(line)) {
channel.close();
break;
}
channel.writeAndFlush("213");
}
}, "input").start();

// 获取 closeFuture 对象
ChannelFuture channelFuture1 = channel.closeFuture();
// 同步情况
channelFuture1.sync();
log.debug("关闭后执行的操作");
}
}

异步关闭

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.redisc;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Scanner;

@Slf4j
public class Client {
public static void main(String[] args) throws IOException, InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
nioSocketChannel.pipeline().addLast(new StringEncoder());

}
})
// 连接服务器
// 异步非阻塞,调用线程是 main 线程
// 真正执行连接的是另一个线程,是一个 NIO 线程
.connect(new InetSocketAddress("localhost", 8000));
Channel channel = channelFuture.sync().channel();
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
while (true) {
String line = scanner.nextLine();
if ("q".equals(line)) {
channel.close();
break;
}
channel.writeAndFlush("213");
}
}, "input").start();

// 获取 closeFuture 对象
ChannelFuture channelFuture1 = channel.closeFuture();
// 异步情况
channelFuture1.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
log.debug("123");
}
});
}
}

但是,上面两个结束,最后程序都没有停下来,这是因为,还有一部分 NioEventLoopGroup() 这里面的线程没有关闭。

用异步为例做一个优化关闭

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package com.redisc;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Scanner;

@Slf4j
public class Client {
public static void main(String[] args) throws IOException, InterruptedException {
NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(nioEventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
nioSocketChannel.pipeline().addLast(new StringEncoder());

}
})
// 连接服务器
// 异步非阻塞,调用线程是 main 线程
// 真正执行连接的是另一个线程,是一个 NIO 线程
.connect(new InetSocketAddress("localhost", 8000));
Channel channel = channelFuture.sync().channel();
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
while (true) {
String line = scanner.nextLine();
if ("q".equals(line)) {
channel.close();
break;
}
channel.writeAndFlush("213");
}
}, "input").start();

// 获取 closeFuture 对象
ChannelFuture channelFuture1 = channel.closeFuture();
// 异步情况
channelFuture1.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
log.debug("执行关闭操作");
// 优雅的关闭
nioEventLoopGroup.shutdownGracefully();
}
});
}
}
请我喝杯咖啡吧~