netty 中 ChannelFuture.sync () 的作用是什么?

如果你使用过 netty,你一定见过下面两行代码,它们可以说是创建一个 netty server 的标配代码

1
2
ChannelFuture future = bootstrap.bind(port).sync();
future.channel().closeFuture().sync();

不知道你有没想过这里面的 sync() 的作用是什么,如果去掉会有什么问题?

bootstrap.bind (port).sync () 分析

先来一步一步分析下第一行代码中的 sync 的作用

现象跟踪

为了方便调试,我把主要的代码贴到的下面,至于 EchoServerHandler 比较简单,你可以随便写个

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoServerHandler());
}
});

ChannelFuture future = bootstrap.bind(port).sync();
future.channel().closeFuture().sync();
} finally {
eventLoopGroup.shutdownGracefully();
}

回到这行代码

1
ChannelFuture future = bootstrap.bind(port).sync();

我们可以把它拆成两行

1
2
ChannelFuture future1 = bootstrap.bind(port);
ChannelFuture future2 = future1.sync();

debug 下你会发现,future1 和 future2 其实是同一个对象

那么这个 sync 有什么作用呢?从名字上看是同步,猜测执行后会等待某种事件。将断点放在 future2 执行前再次 debug

你会发现事情起了变化,future1 执行 toString () 的结果中有一个 incomplete 字样,是一个未执行完成的状态

继续执行 future1.sync() 并查看状态

此时 future1 的状态变成了 success。这说明 future1.sync() 会等待异步事件执行完成,并且返回自身,可通过以下代码进一步验证

1
2
3
ChannelFuture future1 = bootstrap.bind(port);
Thread.sleep(1000);
ChannelFuture future2 = future1.sync();

future1.sync() 之前 sleep 1 秒钟,等待异步事件执行完,再次 debug 查看 future 状态

此时 future1 已经是 success 状态

代码分析

bootstrap.bind(port) 返回了一个 future 对象,它是一个 AbstractBootstrap$PendingRegistrationPromise 类型的实例

PendingRegistrationPromise 持有了 channel 的引用,并且往 ChannelFuture 注册了一个 Listener,当 ChannelFuture 的任务处理完成后,根据执行结果是否有异常决定执行 promise.setFailurepromise.registered 方法,若执行成功,则继续执行 doBind0 方法进行绑定操作。

回到上层,接着跟踪 future.sync 的内部执行逻辑,逐步跟踪进行,会发现执行到 DefaultPromise 的 await 方法

isDone 会判断 result 的状态

1
2
3
private static boolean isDone0(Object result) {
return result != null && result != UNCANCELLABLE;
}

因为此时 result 为 null,所以返回 false,继续往下执行

最终会执行到 while 循环中,将 waiters 变量的值加一,并且进入到 Object.wait () 中,等待被 notify 或 notifyAll 唤醒

1
2
3
4
5
6
7
8
9
10
11
while (!isDone()) {
// waiters变量的值加一
incWaiters();
try {
// Object.wait(),等待被唤醒
wait();
} finally {
// waiters变量的值加一
decWaiters();
}
}

那么它是被谁唤醒的呢?debug 跟踪 promise.registered 的执行,会发现它并没有修改 promise 的执行状态,继续跟踪 doBind0 方法

1
2
3
4
5
6
7
8
9
doBind0 
-> ChannelFuture.bind
-> AbstractChannel.bind
-> pipeline.bind
-> tail.bind
-> next.invokeBind
-> ((ChannelOutboundHandler) handler()).bind
-> unsafe.bind
-> AbstractChannel$AbstractUnsafe.bind

经过层层地跟踪,最终会执行到 AbstractUnsafe.bind 方法

注意最后一行,它是用来设置 future 的执行结果状态的,可以看到在执行前的状态是 uncancellable。跟踪进去

1
2
3
4
5
protected final void safeSetSuccess(ChannelPromise promise) {
if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
}
}

关键的代码是 promise.trySuccess

1
2
3
DefaultChannelPromise.trySuccess 
-> DefaultPromise.trySuccess(null)
-> DefaultPromise.setSuccess0(null)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");

private boolean setSuccess0(V result) {
return setValue0(result == null ? SUCCESS : result);
}

private boolean setValue0(Object objResult) {
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
// 通知waiter
if (checkNotifyWaiters()) {
// 通知listener
notifyListeners();
}
return true;
}
return false;
}

最终会通过 cas 操作将 AbstractBootstrap$PendingRegistrationPromise 的 result 属性设置为 SUCCESS,然后通知 waiter 和 listener

继续跟踪 checkNotifyWaiters 方法

1
2
3
4
5
6
7
8
private synchronized boolean checkNotifyWaiters() {
// waiter的数量大于0
if (waiters > 0) {
// 唤醒所有的waiter
notifyAll();
}
return listeners != null;
}

它会检查 waiters 变量的值,若大于 0 则说明有线程执行了 Object.wait 方法,此时通过 notifyAll 唤醒所有的线程,至此也就将整个过程串联了起来。

总结

bootstrap.bind() 返回一个 AbstractBootstrap$PendingRegistrationPromise 对象,它本质上是一个 DefaultPromise 对象,实现了 Future 接口

future.sync() 最终会使 DefaultPromise 的属性 waiters 值加一,然后调用 Object.wait 方法阻塞等待

bootstrap.bind() 会提交绑定事件到 EventLoop 中执行,待 socket 绑定地址成功后会调用 DefaultPromisetrySuccess 方法更改其状态并通知所有 waiter

此处 future.sync() 目的是等待异步的 socket 绑定事件完成

future.channel ().closeFuture ().sync () 分析

有了上面的经验,再看这行代码就轻松多了

代码分析

为了便于理解,我将这行代码拆开并加上了注释

1
2
3
4
5
6
7
8
// 获取ServerSocketChannel
Channel channel = future.channel();

// 获取closeFuture
ChannelFuture closeFuture = channel.closeFuture();

// 同步等待closeFuture执行完成
closeFuture.sync();

这里面需要关注的是 closeFuture,它的定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private final CloseFuture closeFuture = new CloseFuture(this);

static final class CloseFuture extends DefaultChannelPromise {
CloseFuture(AbstractChannel ch) {
super(ch);
}

@Override
public ChannelPromise setSuccess() {
throw new IllegalStateException();
}

@Override
public ChannelPromise setFailure(Throwable cause) {
throw new IllegalStateException();
}

@Override
public boolean trySuccess() {
throw new IllegalStateException();
}

@Override
public boolean tryFailure(Throwable cause) {
throw new IllegalStateException();
}

boolean setClosed() {
return super.trySuccess();
}
}

可以看到 CloseFuture 类只提供了一个 setClosed 方法,调用后会将其自身的状态设置为 SUCCESS

如果还未调用过 setClosed 方法,执行 closeFuture.sync() 方法会阻塞在 Object.wait() 上,等待被唤醒。

那么在什么情况下会调用 setClosed 方法呢?跟踪代码引用情况,你会发现在 register 和 close 两个时机都有可能调用

它们之间的区别是,在 register 阶段,只有当出现异常的情况下会调用 closeFuture.setClosed() 方法

而在 close 阶段,因为本身就是要关闭,所以不管成功或出现异常都会调用,只是在抛异常的情况下会记录异常栈

总结

1
2
ChannelFuture future = bootstrap.bind(port).sync();
future.channel().closeFuture().sync();

再回顾下代码,这里面共有两个 Future 对象:

  • bind 方法返回的 future 用于等待底层网络组件启动完成
  • closeFuture 用于等待网络组件关闭完成