netty源码研究二(请求处理)


前言

上一篇文章中,已经分析了netty的服务器端启动过程,我们知道NioEventLoop.run()方法,是服务器端用来轮询客户端的请求。我们指定创建出的NioServerSocketChannel就是注册到了NioEventLoop中的Selector上。这篇文章,我们接着顺藤摸瓜,来分析下netty是怎么处理每个客户端请求的。

MainReactor

事件轮询

NioEventLoop
nioEventLoop.run
通过上图我们可以看到run方法中,selector一直在轮询客户端过来的请求。我们看到若接收到一个感兴趣的事件后,会通过调用this.processSelectedKeys()方法来处理被select的key,通过一系列方法调用,会到下图中的方法
nioEventLoop.run

if((ignored & 17) != 0 || ignored == 0) {
    unsafe.read();
    if(!ch.isOpen()) {
        return;
    }
}

通过“ignored&17”可以看到若事件是OP_ACCEPT活着OP_READ就会调用到unsafe.read()方法。这个方法会调用到NioMessageUnsafe.read();

事件处理

AbstractNioMessageChannel
AbstractNioMessageChannel.read
read()会不断调用doReadMessages(),将产生的readBuf逐一发送给Pipeline.fireChannelRead()去处理。这2个方法也是我们需要重点关注的。

  • 先来看看doReadMessages()方法。
    protected int doReadMessages(List buf) throws Exception {
    
    SocketChannel ch = this.javaChannel().accept();
    
    try {
        if(ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable var6) {
        logger.warn("Failed to create a new channel from an accepted socket.", var6);
    
        try {
            ch.close();
        } catch (Throwable var5) {
            logger.warn("Failed to close a socket.", var5);
        }
    }
    
    return 0;
    }
    ServerBootstrapAcceptor
    ServerBootstrapAcceptor.channelRead
    这里可以看到accept真正与客户端建立连接并创建了SocketChannel,也就是说这里只是创建了连接,但是没有真正的去读取io,这个也就mainreactor做的,只接受客户端连接,可以猜测接下来要把这个socketChannel交给subReactor来真正处理io。
  • 然后是fireChannelRead。
    最终会调用ServerBootstrap.ServerBootstrapAcceptor.channelRead()方法。
    注意this.childGroup.register(child)
    有没有一种似曾相识的感觉,不错,这个就是subReactor的注册过程,这里就不在啰嗦了
    方法调用堆栈图
    方法调用堆栈图

SubReactor

通过前面的分析能够看到,EventLoop轮询到的事件最终会交给unsafe.read()去处理。NioSocketChannel与NioServerSocketChannel的一个重要区别就是:NioSocketChannel继承AbstractNioByteChannel,而后者继承AbstractNioMessageChannel,两者的unsafe工具类实现是不同的。

注册读事件

因为Netty 4中已经完全统一了EventLoopGroup的代码,已经不区分主从Reactor的逻辑了。所以实际上,这里的注册过程我们已经分析过了。子EventLoopGroup会选择出一个EventLoop负责轮询绑定上的Channel的事件,而Channel感兴趣的事件前面也提到了,就是Channel构造方法中传入的。
// NioSocketChannel

public NioSocketChannel(Channel parent, SocketChannel socket) {
    super(parent, socket); config = new NioSocketChannelConfig(this, socket.socket());
}
AbstractNioByteChannel protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    super(parent, ch, SelectionKey.OP_READ);
}

创建Handler

使用Netty时我们通常会在ChannelInitializer中初始化Handler,但Netty是什么时候调用它的呢?答案就在Channel注册到子EventLoop之后。之前看到的fireChannelRegistered()会触发ChannelInitializer。所以说:每个客户端Channel建立成功后会创建Handler,并且后续请求处理都由这一组Handler完成。

public abstract class ChannelInitializer extends ChannelInboundHandlerAdapter {
/**

 * This method will be called once the {@link Channel} was registered.
 */
protected abstract void initChannel(C ch) throws Exception;

@Override
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    ChannelPipeline pipeline = ctx.pipeline();
    try {
        initChannel((C) ctx.channel());
        pipeline.remove(this);
        ctx.fireChannelRegistered();
    } catch (Throwable t) {
        logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
    }
}
}

处理客户端io

//AbstractNioByteChannel
AbstractNioByteChannel.subRead
通过上图,可以看到这里才是真正的读取socketChannel的内容。在AbstractNioMessageChannel中接收到的是SocketChannel,所以并没有发生真正的读操作。而AbstractNioByteChannel是真正地从SocketChannel中读,所以这也是申请缓冲区的地方。每次发生读事件时,都会分配一块ByteBuf,然后尝试从Channel中读出数据写到ByteBuf中。之后触发fireChannelRead(),由Pipeline中的Handler继续处理,最终Tail处理器负责释放掉ByteBuf。

方法调用堆栈图

subReactor调用堆栈图

总结

首先,ServerSocketChannel会由一个EventLoop负责轮询接收事件,得到的SocketChannel是交给子Reactor中的一个EventLoop负责轮询读事件。也就是说多个客户端可能会对应一个EventLoop线程。每个SocketChannel注册完毕就会创建Handler,所以说每个客户端都对应自己的Handler实例,并且一直使用到连接断开。


文章作者: 叶明
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 叶明 !
评论
 上一篇
Spring IOC源码分析 Spring IOC源码分析
IOC简要解释在应用开发中,开发人员往往需要引用和调用其它组件的服务,这种依赖关系如果固化在组件设计中,就会造成依赖关系的僵化和以后维护成本的增加。如果使用IoC容易,把资源的获取反转,具体相对java来说就是把bean的依赖关系交给IoC
2016-04-16
下一篇 
netty源码研究一(服务端启动) netty源码研究一(服务端启动)
netty服务端代码分析服务端启动配置
2016-03-12
  目录