netty的启动流程分析

netty 是一个基于异步事件驱动实现的网络编程框架,它的内部使用了大量的异步编程方法,这是它性能高效的一个原因,但同时也使得代码阅读起来更加困难,本文就尝试分析下它的启动过程

按照惯例先上源码,这一段是 netty server 的启动代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoServerHandler());
}
});

ChannelFuture future = bootstrap.bind(port).sync();
future.channel().closeFuture().sync();
} finally {
eventLoopGroup.shutdownGracefully();
}

对应的 Handler 代码如下

1
2
3
4
5
6
7
8
9
10
11
12
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.write(msg);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

我们可以通过调试一步步跟踪代码的执行流程,由于以下原因,调试的流程可能不那么流畅

  • 注册 FutureListener,在任务执行完后通过回调执行相关逻辑
  • 提交任务到 EventLoop,将任务放到 EventLoop 线程中执行

为方便跟踪代码,特将整个启动流程的执行顺序梳理出来,进而整理成下图所示的流程图

首先看下 initAndRegister 的执行流程,它负责创建和初始化 channel,并将其注册到 EventLoop 上,同时将原生的 java channel 注册到 selector 上。

newChannel 和 initChannel

newChannel

netty 首先使用 channelFactory.newChannel 创建一个 channel 的实例,它的类型是在配置 ServerBootstrap 时指定的 .channel(NioServerSocketChannel.class)

很显然要通过反射创建的实例

1
2
3
4
5
6
7
public T newChannel() {
try {
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}

initChannel

在创建好 channel 实例之后,先要设置它的属性,这些都可以通过 ServerBootstrap 进行配置

1
2
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));

然后获取 channel 的 pipeline,注意每个 channel 会对应一个 pipeline。拿到 pipeline 之后会调用其 addLast 方法注册一个 ChannelInitializer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}

ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});

跟踪其调用流程,最终会执行如下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// DefaultChannelPipeline#callHandlerCallbackLater
// ctx为包裹ChannelInitizlizer的ChannelHandlerContext
// added = true
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
assert !registered;

PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
PendingHandlerCallback pending = pendingHandlerCallbackHead;
if (pending == null) {
// 将head设置为PendingHandlerAddedTask
pendingHandlerCallbackHead = task;
} else {
// Find the tail of the linked-list.
while (pending.next != null) {
pending = pending.next;
}
pending.next = task;
}
}

此时 pendingHandlerCallbackHead 链表为空,并且调用该方法时入参 add 为 true,因此会将 head 设置为 PendingHandlerAddedTask

我们来看下 PendingHandlerAddedTask 的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private final class PendingHandlerAddedTask extends PendingHandlerCallback {

PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
super(ctx);
}

@Override
public void run() {
callHandlerAdded0(ctx);
}

@Override
void execute() {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
callHandlerAdded0(ctx);
} else {
try {
executor.execute(this);
} catch (RejectedExecutionException e) {
if (logger.isWarnEnabled()) {
logger.warn(
"Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
executor, ctx.name(), e);
}
atomicRemoveFromHandlerList(ctx);
ctx.setRemoved();
}
}
}
}

它是 PendingHandlerCallback 的子类,它的职责是调用 callHandlerAdded0 方法触发 handler 的添加事件。

因为此时并不会触发其执行,所以暂且不去看内部逻辑,待后面调用到时再做分析

register

Channel 初始化好之后,接下来要将其注册到 EventLoop 上

1
config().group().register(channel)

config().group() 用于获取 ServerBootstrap 配置的 EventLoopGroup,拿到 EventLoopGroup 之后再调用其 register 方法

MultithreadEventLoopGroup.register 方法实现如下

1
return next().register(channel);

它先获取一个 EventLoop,然后调用其 register 方法。继续跟踪到 SingleThreadEventLoop 的 register 方法实现

1
promise.channel().unsafe().register(this, promise);

它是先获取 channel 的内部属性 unsafe,然后调用其 register 方法,如下是其核心代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// 这里
eventLoop.execute(new Runnable() {
@Override
public void run() {
// 任务一
register0(promise);
}
});
} catch (Throwable t) {
// 忽略
}
}

因为此时是在主线程中执行,所以会执行到第 6 行,往 EventLoop 中提交一个任务,用于完成 register0 操作,可以将其记做任务一

register0 需要等到 EventLoop 调度后执行。

那么 EventLoop 是在什么时候启动的呢?这需要跟进到 eventLoop.execute 内部

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private void execute(Runnable task, boolean immediate) {
// 此时在主线程中执行,因此为false
boolean inEventLoop = inEventLoop();
// 添加到任务队列
addTask(task);
if (!inEventLoop) {
// 启动线程
startThread();
// 忽略
}

if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}

因为此时是在主线程中执行,而不是 EventLoop 线程,所以除了会将任务添加到执行队列外,还会启动线程,之后 EventLoop 就进入到循环中,开始处理任务

regFuture.addListener

回到 doBind,在 initAndRegister 执行完之后,因为是异步操作 regFuture 还未完成,所以会调用 regFuture.addListener 注册一个 FutureListener ,以便等到注册完成之后执行 doBind0 操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();

doBind0(regFuture, channel, localAddress, promise);
}
}
});

register0

再回到 register,这次是因为 EventLoop 已经开始执行,取到了 任务一 并执行,也就是说开始执行 register0 操作了

doRegister

将内部的 java channel 注册到 selector 上

1
2
3
4
5
6
7
8
9
10
11
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
// 忽略
}
}
}

注意第五行的 this 是 netty 的 channel,对于本文的场景实际是 NioServerSocketChannel。这样做的目的是将其作为 attachment 存储在 selectionKey 上,方便后面使用

pipeline.invokeHandlerAddedIfNeeded

它的调用链路比较长,简单来讲就是执行 pipeline 上所有的 PendingHandlerCallback,对于本文的场景就是执行 initChannel 时添加的 PendingHandlerAddedTask

DefaultChannelPipeline.callHandlerAdded0 -> AbstractChannelHandlerContext.callHandlerAdded -> ChannelInitializer.handlerAdded -> ChannelInitializer.initChannel

可以看到最终会执行 ChannelInitializer.initChannel 方法,那么这个 ChannelInitializer 是什么呢?

它实际上是在 initChannel 时添加的,再回顾下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}

ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});

它的作用是将用户配置的 handler 添加到 pipeline,并且往 EventLoop 提交一个任务,这个任务是往 pipeline 添加 ServerBootstrapAcceptor,我们把这个任务记做任务二

safeSetSuccess

更新 future 的状态为成功,同时通知所有的 listener。这里只有一个 listener,它是在 regFuture.addListener 执行时添加的,再次回顾下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();

doBind0(regFuture, channel, localAddress, promise);
}
}
});

很明显会触发 doBind0 的执行

doBind0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {

// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}

它是往 EventLoop 中提交了一个任务,通过 EventLoop 执行 channel.bind 操作。我们把这个任务记做任务三

执行任务二

EventLoop 取出任务二执行,将 ServerBootstrapAcceptor 添加到 pipeline 上

1
pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));

执行任务三

EventLoop 取出任务三并执行

1
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

bind 操作通过 pipeline 进行传递,从 tail 传递到 head,最终通过 head 调用 unsafe.bind 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}

if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}

safeSetSuccess(promise);

doBind 依赖于具体的 channel 实现,这里是 NioServerSocketChannel ,直接调用 java channel 的 bind 方法

1
2
3
4
5
6
7
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}

safeSetSuccess 修改 future 状态为成功,通知所有的 listener bind 事件已执行完

0%