netty中ChannelFuture.sync()的作用是什么?

如果你使用过 netty,你一定见过下面两行代码,它们可以说是创建一个 netty server 的标配代码

1
2
ChannelFuture future = bootstrap.bind(port).sync();
future.channel().closeFuture().sync();

不知道你有没想过这里面的 sync() 的作用是什么,如果去掉会有什么问题?

bootstrap.bind(port).sync() 分析

先来一步一步分析下第一行代码中的 sync 的作用

现象跟踪

为了方便调试,我把主要的代码贴到的下面,至于 EchoServerHandler 比较简单,你可以随便写个

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoServerHandler());
}
});

ChannelFuture future = bootstrap.bind(port).sync();
future.channel().closeFuture().sync();
} finally {
eventLoopGroup.shutdownGracefully();
}

回到这行代码

1
ChannelFuture future = bootstrap.bind(port).sync();

我们可以把它拆成两行

1
2
ChannelFuture future1 = bootstrap.bind(port);
ChannelFuture future2 = future1.sync();

debug 下你会发现,future1 和 future2 其实是同一个对象

image-20210128004713080 image-20210128004734859

那么这个 sync 有什么作用呢?从名字上看是同步,猜测执行后会等待某种事件。将断点放在 future2 执行前再次 debug

image-20210128005025015

你会发现事情起了变化,future1 执行 toString() 的结果中有一个 incomplete 字样,是一个未执行完成的状态

继续执行 future1.sync() 并查看状态

image-20210128005401882

此时 future1 的状态变成了 success。这说明 future1.sync() 会等待异步事件执行完成,并且返回自身,可通过以下代码进一步验证

1
2
3
ChannelFuture future1 = bootstrap.bind(port);
Thread.sleep(1000);
ChannelFuture future2 = future1.sync();

future1.sync() 之前 sleep 1 秒钟,等待异步事件执行完,再次 debug 查看 future 状态

image-20210128005834185

此时 future1 已经是 success 状态

代码分析

bootstrap.bind(port) 返回了一个 future 对象,它是一个 AbstractBootstrap$PendingRegistrationPromise 类型的实例

image-20210128010446182

PendingRegistrationPromise 持有了 channel 的引用,并且往 ChannelFuture 注册了一个 Listener,当 ChannelFuture 的任务处理完成后,根据执行结果是否有异常决定执行 promise.setFailurepromise.registered 方法,若执行成功,则继续执行 doBind0 方法进行绑定操作。

回到上层,接着跟踪 future.sync 的内部执行逻辑,逐步跟踪进行,会发现执行到 DefaultPromise 的 await 方法

image-20210128011324372

isDone 会判断 result 的状态

1
2
3
private static boolean isDone0(Object result) {
return result != null && result != UNCANCELLABLE;
}

因为此时 result 为 null,所以返回 false,继续往下执行

最终会执行到 while 循环中,将 waiters 变量的值加一,并且进入到 Object.wait() 中,等待被 notify 或 notifyAll 唤醒

1
2
3
4
5
6
7
8
9
10
11
while (!isDone()) {
// waiters变量的值加一
incWaiters();
try {
// Object.wait(),等待被唤醒
wait();
} finally {
// waiters变量的值加一
decWaiters();
}
}

那么它是被谁唤醒的呢?debug 跟踪 promise.registered 的执行,会发现它并没有修改 promise 的执行状态,继续跟踪 doBind0 方法

1
2
3
4
5
6
7
8
9
doBind0 
-> ChannelFuture.bind
-> AbstractChannel.bind
-> pipeline.bind
-> tail.bind
-> next.invokeBind
-> ((ChannelOutboundHandler) handler()).bind
-> unsafe.bind
-> AbstractChannel$AbstractUnsafe.bind

经过层层地跟踪,最终会执行到 AbstractUnsafe.bind 方法

image-20210128014247167

注意最后一行,它是用来设置 future 的执行结果状态的,可以看到在执行前的状态是 uncancellable。跟踪进去

1
2
3
4
5
protected final void safeSetSuccess(ChannelPromise promise) {
if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
}
}

关键的代码是 promise.trySuccess

1
2
3
DefaultChannelPromise.trySuccess 
-> DefaultPromise.trySuccess(null)
-> DefaultPromise.setSuccess0(null)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");

private boolean setSuccess0(V result) {
return setValue0(result == null ? SUCCESS : result);
}

private boolean setValue0(Object objResult) {
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
// 通知waiter
if (checkNotifyWaiters()) {
// 通知listener
notifyListeners();
}
return true;
}
return false;
}

最终会通过 cas 操作将 AbstractBootstrap$PendingRegistrationPromise 的 result 属性设置为 SUCCESS,然后通知 waiter 和 listener

继续跟踪 checkNotifyWaiters 方法

1
2
3
4
5
6
7
8
private synchronized boolean checkNotifyWaiters() {
// waiter的数量大于0
if (waiters > 0) {
// 唤醒所有的waiter
notifyAll();
}
return listeners != null;
}

它会检查 waiters 变量的值,若大于 0 则说明有线程执行了 Object.wait 方法,此时通过 notifyAll 唤醒所有的线程,至此也就将整个过程串联了起来。

总结

bootstrap.bind() 返回一个 AbstractBootstrap$PendingRegistrationPromise 对象,它本质上是一个 DefaultPromise 对象,实现了 Future 接口

future.sync() 最终会使 DefaultPromise 的属性 waiters 值加一,然后调用 Object.wait 方法阻塞等待

bootstrap.bind() 会提交绑定事件到 EventLoop 中执行,待 socket 绑定地址成功后会调用 DefaultPromisetrySuccess 方法更改其状态并通知所有 waiter

此处 future.sync() 目的是等待异步的 socket 绑定事件完成

image-20210128020412505

future.channel().closeFuture().sync() 分析

有了上面的经验,再看这行代码就轻松多了

代码分析

为了便于理解,我将这行代码拆开并加上了注释

1
2
3
4
5
6
7
8
// 获取ServerSocketChannel
Channel channel = future.channel();

// 获取closeFuture
ChannelFuture closeFuture = channel.closeFuture();

// 同步等待closeFuture执行完成
closeFuture.sync();

这里面需要关注的是 closeFuture,它的定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private final CloseFuture closeFuture = new CloseFuture(this);

static final class CloseFuture extends DefaultChannelPromise {
CloseFuture(AbstractChannel ch) {
super(ch);
}

@Override
public ChannelPromise setSuccess() {
throw new IllegalStateException();
}

@Override
public ChannelPromise setFailure(Throwable cause) {
throw new IllegalStateException();
}

@Override
public boolean trySuccess() {
throw new IllegalStateException();
}

@Override
public boolean tryFailure(Throwable cause) {
throw new IllegalStateException();
}

boolean setClosed() {
return super.trySuccess();
}
}

可以看到 CloseFuture 类只提供了一个 setClosed 方法,调用后会将其自身的状态设置为 SUCCESS

如果还未调用过 setClosed 方法,执行 closeFuture.sync() 方法会阻塞在 Object.wait() 上,等待被唤醒。

那么在什么情况下会调用 setClosed 方法呢?跟踪代码引用情况,你会发现在 register 和 close 两个时机都有可能调用

image-20210129004510287

它们之间的区别是,在 register 阶段,只有当出现异常的情况下会调用 closeFuture.setClosed() 方法

image-20210129004814557

而在 close 阶段,因为本身就是要关闭,所以不管成功或出现异常都会调用,只是在抛异常的情况下会记录异常栈

image-20210129004935112

总结

1
2
ChannelFuture future = bootstrap.bind(port).sync();
future.channel().closeFuture().sync();

再回顾下代码,这里面共有两个 Future 对象:

  • bind 方法返回的 future 用于等待底层网络组件启动完成
  • closeFuture 用于等待网络组件关闭完成