netty server启动时触发的事件与相关代码分析

Netty server 在启动过程中会触发一系列的 Inbound 事件,它的流程是怎样的呢?

netty 项目中 example 模块下的 EchoServer 为例进行分析。它的核心代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});

// Start the server.
ChannelFuture f = b.bind(PORT).sync();

// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

可以看到,第 5 行是为 ServeChannel 的 pipeline 配置了一个 LoggingHandler,这样当有 Inbound 或 Outbount 事件时会打印相应的日志。

执行 EchoServer 的 main 方法启动 server,将会看到以下日志

1
2
3
01:20:47.137 [nioEventLoopGroup-2-1] INFO  i.n.handler.logging.LoggingHandler - [id: 0x63a20168] REGISTERED
01:20:47.145 [nioEventLoopGroup-2-1] INFO i.n.handler.logging.LoggingHandler - [id: 0x63a20168] BIND: 0.0.0.0/0.0.0.0:8007
01:20:47.150 [nioEventLoopGroup-2-1] INFO i.n.handler.logging.LoggingHandler - [id: 0x63a20168, L:/0:0:0:0:0:0:0:0:8007] ACTIVE

REGISTERED 注册

第一行表明 NioServerSocketChannel 已经完成了注册,并触发了 ChannelInboundHandlerchannelRegistered 事件。

NioServerSocketChannel 的注册本质上是将底层的 java ServerSocketChannel 注册到 Selector 上。其中 java ServerSocketChannel 是在 NioServerSocketChannel 构造函数中初始化的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}

Selector 是在 NioEventLoop 的构造函数中初始化的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
// 忽略其他代码
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}

private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
// 在这里创建
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}

if (DISABLE_KEY_SET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}
// 忽略其他代码
}

注册的代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// AbstractNioChannel#AbstractNioUnsafe
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 将ServerSocketChannel注册到selector上
// this是ServerSocketChannel对象,作为attatchment关联到selectionKey上,方便后续使用
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}

完成 ServerSocketChannel 的注册后,还需要配置 pipeline,然后通知 pipeline 注册事件完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// AbstractNioChannel#AbstractNioUnsafe
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// 注册到selector上
doRegister();
neverRegistered = false;
registered = true;

// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();

safeSetSuccess(promise);

// 通知注册完成,第一行的日志就是在这个时机打印的
pipeline.fireChannelRegistered();

// 忽略其他代码
}

BIND 绑定

Bind 是一个 Outbound 事件,它在 pipeline 上的传播路径是从 tail 到 head,因此当 LoggingHandler 打印出 BIND 日志时还未完成绑定操作。bind 操作最终是由 head 节点完成的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// DefaultChannelPipeline#HeadContext
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
unsafe.bind(localAddress, promise);
}

// AbstractChannel#AbstractUnsafe.bind
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();

if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}

boolean wasActive = isActive();
try {
// 绑定操作
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}

// 通知active事件
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}

safeSetSuccess(promise);
}

绑定操作依赖于具体的底层实现,对于 NioServerSocketChannel 来说的,它的实现如下

1
2
3
4
5
6
7
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
1
2
01:21:07.827 [nioEventLoopGroup-2-1] INFO  i.n.handler.logging.LoggingHandler - [id: 0x63a20168, L:/0:0:0:0:0:0:0:0:8007] READ: [id: 0x7ac140e9, L:/127.0.0.1:8007 - R:/127.0.0.1:60796]
01:21:07.835 [nioEventLoopGroup-2-1] INFO i.n.handler.logging.LoggingHandler - [id: 0x63a20168, L:/0:0:0:0:0:0:0:0:8007] READ COMPLETE
0%