ServerSocketChannel是如何初始化和绑定到EventLoop的?

ServerBootstrap 启动时需要初始化 ServerSocketChannel 并将其绑定到 EventLoop 上,用于处理该 channel 上产生的各种事件。那么 ServerSocketChannel 是如何完成创建和初始化的?又是如何绑定到 EventLoop 上的?

一个简单的Echo Server

要搞清楚这些问题,我们先要知道一个基本的 netty server 代码是如何编写的。

如下是一个简单的 Echo Server,通过它可以调试和跟踪 ServerBootstrap 的启动流程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoServerHandler());
}
});

ChannelFuture future = bootstrap.bind(port).sync();
future.channel().closeFuture().sync();
} finally {
eventLoopGroup.shutdownGracefully();
}

这段代码相当简单,它只进行了一些必要的设置,注册了一个 EchoServerHandler 用于处理请求,绑定到本地端口并启动。因此可以分成 3 部分来看:

首先是 ServerBootstrap 的配置

  • 配置使用的 EventLoopGroup
  • 指定 channel 类型为 NioServerSocketChannel
  • 设置 childHandler,往 SocketChannel 的 pipeline 上添加了一个 EchoServerHandler

接下来是绑定端口

1
ChannelFuture future = bootstrap.bind(port).sync();

最后是阻塞等待服务关闭

1
future.channel().closeFuture().sync();

再来看下 EchoServerHandler

1
2
3
4
5
6
7
8
9
10
11
12
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.write(msg);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

它继承了 ChannelInboundHandlerAdapter 类,用于对入站(Inbound)信息进行处理,对于 EchoServer 来说,也就是处理客户端的请求信息。

channelRead 是在读取到请求数据时调用的,它只是简单地将读取到的信息输出到 ChannelHandlerContext,最终会传递到责任链的末端并发送出去。

从ServerBootstrap的启动说起

绑定端口的同时会触发 server 的启动,因此从 bootstrap.bind(port) 看起。它的内部逻辑如下:先创建并初始化 channel,然后将其注册到 EventLoop 上,最后为 socket 绑定本地地址及端口

1
2
3
4
5
6
7
8
9
10
11
private ChannelFuture doBind(final SocketAddress localAddress) {
// 初始化和注册channel
final ChannelFuture regFuture = initAndRegister();

if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
// 绑定端口
doBind0(regFuture, channel, localAddress, promise);
return promise;
}
}

注意:为了方便阅读删除了部分代码

initAndRegister 会完成 channel 的创建和注册工作,因此需要看下它的内部是如何实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
// 暂时忽略
}

ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}

return regFuture;
}

创建channel

首先是通过 channelFactory 创建 channel。channelFactory 是在配置 ServerBootstrap 时生成的

1
2
3
4
5
6
# AbstractBootStrap
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}

ReflectiveChannelFactory 是一个工厂类,它可以通过反射生成指定类型的实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

private final Constructor<? extends T> constructor;

public ReflectiveChannelFactory(Class<? extends T> clazz) {
try {
// 获取构造函数
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
}
}

@Override
public T newChannel() {
try {
// 通过反射创建实例
return constructor.newInstance();
} catch (Throwable t) {
}
}

回到开头的 Echo Server,因为我们配置的 channel 是 NioServerSocketChannel.class

1
bootstrap.channel(NioServerSocketChannel.class)

所以 channel = channelFactory.newChannel() 创建的是一个 NioServerSocketChannel 类型的实例

初始化channel

channel 的初始化是在 AbstractBootstrap 的子类中完成的, ServerBootstrap 中的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
void init(Channel channel) {
// 设置ChannelOption,例如TCP_NODELAY、SO_BACKLOG等
setChannelOptions(channel, options0().entrySet().toArray(newOptionArray(0)), logger);
// 设置属性
setAttributes(channel, attrs0().entrySet().toArray(newAttrArray(0)));

ChannelPipeline p = channel.pipeline();

// child EventLoopGroup
final EventLoopGroup currentChildGroup = childGroup;

// child ChannelHandler
final ChannelHandler currentChildHandler = childHandler;

// child ChannelOption
final Entry<ChannelOption<?>, Object>[] currentChildOptions =
childOptions.entrySet().toArray(newOptionArray(0));

// child attribute
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));

// 添加一个ChannelHandler,在NioServerSocketChannel初始化时执行一些操作
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();

// 把ServerBootStrap上配置的handler添加到pipeline上
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}

// 通过NioServerSocketChannel绑定的EventLoop执行逻辑
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 往pipeline上添加一个Acceptor
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}

它主要是设置 ServerSocketChannel 的 option、attribute,然后往pipeline上添加一个channelhandler,用于添加 Acceptor

注册channel到EventLoop

1
ChannelFuture regFuture = config().group().register(channel);

获取到 EventLoopGroup,调用其 register 方法注册 channel

1
2
3
4
# MultithreadEventLoopGroup
public ChannelFuture register(Channel channel) {
return next().register(channel);
}

其内部是先选择一个 EventLoop,然后调用 EventLoop 的 register 方法。EventLoop的选择策略可参考 netty的EventLoop选择策略

1
2
3
4
5
6
7
8
9
10
11
12
# SingleThreadEventLoop
public ChannelFuture register(Channel channel) {
// 使用DefaultChannelPromise对channel和eventLoop进行封装,然后注册
return register(new DefaultChannelPromise(channel, this));
}

public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
// 调用channel的unsafe属性注册
promise.channel().unsafe().register(this, promise);
return promise;
}

本质上是调用 unsafe 属性进行注册

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# AbstractChannel$AbstractUnsafe
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// 将EventLoop绑定到channel上
AbstractChannel.this.eventLoop = eventLoop;

if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
// 先忽略
}
}
}

这里会将 EventLoop 绑定到 channel 上。每个channel 都会对应一个 EventLoop,用于处理其生命周期中的所有事件,而每个 EventLoop 可以对应多个 channel。

注册channel到selector

接着看 register0 的代码,它会继续完成 channel 到 selector 的注册

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void register0(ChannelPromise promise) {
try {
// 注册
doRegister();

// 调用已经添加到pipeline的handler
// 往pipeline添加ServerBootstrapAcceptor就是在这里触发的
pipeline.invokeHandlerAddedIfNeeded();

// 设置注册成功标志
safeSetSuccess(promise);

// 注册完成事件通知
pipeline.fireChannelRegistered();

if (isActive()) {
if (firstRegistration) {
// channelActive事件通知
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// 忽略
}
}

doRegister 用于完成注册,它是在子类中实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# AbstractNioChannel
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 完成注册
// javaChannel为jdk原生的ServerSocketChannel。这里是sun.nio.ch.ServerSocketChannelImpl
// unwrappedSelector为实际的selector,在mac下是KQueueSelectorImpl
// this为NioServerSocketChannel,作为attatchment绑定到selectionKey上
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
// 暂时忽略
}
}
}

unwrappedSelector 是具体的实现类,它依赖于底层操作系统,例如在 mac 下的实现类为 KQueueSelectorImpl

javaChannel 则是 jdk 原生的 ServerSocketChannel 实现,这里是 sun.nio.ch.ServerSocketChannelImpl

为了方便取出 NioServerSocketChannel,还会将其以 attatchment绑定到selectionKey

到这里就完成了 channel 的注册过程