通过Tomcat的Http11NioProtocol源码学习Java NIO设计

Tomcat的Http11NioProtocol协议使用Java NIO技术实现高性能Web服务器。本文通过分析Http11NioProtocol源码来学习Java NIO的使用。从中可以了解到阻塞IO和非阻塞IO的配合,NIO的读写操作以及Selector.wakeup的使用。

1. 初始化阶段
Java NIO服务器端实现的第一步是开启一个新的ServerSocketChannel对象。Http11NioProtocol的实现也不例外, 在NioEndPoint类的init方法可以看到这段代码。

代码1:NioEndPoint.init()方法

public void init()
    throws Exception {

    if (initialized )
        return;
    //开启一个新的ServerSocketChannel
    serverSock = ServerSocketChannel.open();
    //设置socket的性能偏好
    serverSock.socket().setPerformancePreferences(socketProperties .getPerformanceConnectionTime(),
                                                  socketProperties.getPerformanceLatency(),
                                                  socketProperties.getPerformanceBandwidth());
    InetSocketAddress addr = ( address!=null ?new InetSocketAddress(address ,port ):new InetSocketAddress(port));
    //绑定端口号,并设置backlog
    serverSock.socket().bind(addr,backlog );
    //将serverSock设置成阻塞IO
    serverSock.configureBlocking(true); //mimic APR behavior

    //初始化acceptor线程数
    if (acceptorThreadCount == 0) {
        // FIXME: Doesn't seem to work that well with multiple accept threads
        acceptorThreadCount = 1;
    }
    //初始化poller线程数
    if (pollerThreadCount <= 0) {
        //minimum one poller thread
        pollerThreadCount = 1;
    }

    // 根据需要,初始化SSL
    // 因为主要关注Java NIO, 所以这一块代码就省略掉了
    if (isSSLEnabled()) {
      ......
    }
    //OutOfMemoryError策略
    if (oomParachute >0) reclaimParachute(true);

    //开启NioSelectorPool
    selectorPool.open();
    initialized = true ;
}

 

在NioEndPoint.init方法中,可以看到ServerSocketChannel被设置成阻塞IO,并且没有注册任何就绪事件。这样可以和阻塞ServerSocket一样方便地使用阻塞accept方法来接收客户端新来的连接。但不同的是当NioEndPoint.Accept线程通过accept方法获得一个新的SocketChannel后会构建一个OP_REGISTER类型的PollerEvent事件并放到Poller.events队列中。而我们使用ServerSocket实现服务器的时候,在接收到新连接后,一般是从线程池中取出一个线程来处理这个连接。

在NioEndPoint.Accept的setSocketOptions方法中可以看到获得SocketChannel后的处理过程。步骤如下:

1)将SocketChannel设置成非阻塞;

2)构建OP_REGISTER类型的PollerEvent对象,并放入到Poller.events队列中。

代码2:NioEndPoint.Accept类的setSocketOptions方法

protected boolean setSocketOptions(SocketChannel socket) {
    try {
      //将客户端Socket设置为非阻塞, APR风格
        socket.configureBlocking( false);
        Socket sock = socket.socket();
        socketProperties.setProperties(sock);
        //从缓存中取NioChannel对象,如果取不到直接构建一个
        NioChannel channel = nioChannels.poll();
        if ( channel == null ) {
            // 如果sslContext不等于null, 需要启动ssl
            if (sslContext != null) {
                ....
            }
            //正常tcp启动
            else {
                //构建NioBufferHandler对象
                NioBufferHandler bufhandler = new NioBufferHandler(socketProperties .getAppReadBufSize(),
                                                                  socketProperties.getAppWriteBufSize(),
                                                                  socketProperties.getDirectBuffer());
                //构建NioChannel对象
                channel = new NioChannel(socket, bufhandler);
            }
        } else {
            //从缓存中取的NioChannel对象,将客户端socket设置进去
            channel.setIOChannel(socket);
            if ( channel instanceof SecureNioChannel ) {
                SSLEngine engine = createSSLEngine();
                ((SecureNioChannel)channel).reset(engine);
            } else {
                channel.reset();
            }
        }
        //注册NioChannel对象
        getPoller0().register(channel);
    } catch (Throwable t) {
        try {
            log.error("" ,t);
        } catch ( Throwable tt){}
        // Tell to close the socket
        return false ;
    }
    return true ;
}

Poller线程会从Poller.events队列中取出PollerEvent对象,并运行PollerEvent.run()方法。在PollerEvent.run()方法中发现是OP_REGISTER事件,则会在Poller.selector上注册SocketChannel对象的OP_READ就绪事件。

代码3:PollerEvent.run()方法代码片段

public void run() {
  if ( interestOps == OP_REGISTER ) {
      try {
          //在Poller.selector上注册OP_READ就绪事件
          socket.getIOChannel().register(socket .getPoller().getSelector(), SelectionKey.OP_READ , key );
      } catch (Exception x) {
          log.error("" , x);
      }
  }
  ......
}


至此,一个客户端连接准备工作就已经完成了。我们获得了一个客户端的SocketChannel, 并注册OP_READ就绪事件到Poller.selector上(如图1)。下面就可以进行数据读写了。

图1:ServerSocketChannel和SocketChannel的初始化状态

通过Tomcat的Http11NioProtocol源码学习Java NIO设计

2. Poller.selector的wakeup方法
Poller线程会做如下工作:

1) 通过selection操作获取已经选中的SelectionKey数量;

2) 执行Poller.events队列中的PollerEvent;

3) 处理已经选中的SelectionKey。

当有新PollerEvent对象加入Poller.events队列中,需要尽快执行第二步,而不应该阻塞的selection操作中。所以就需要配合Selector.wakeup()方法来实现这个需求。Tomcat使用信号量wakeupCounter来控制Selector.wakeup()方法,阻塞Selector.select()方法和非阻塞Selector.selectNow()方法的使用。

当有新PollerEvent对象加入Poller.events队列中,会按照如下条件执行Selector.wakeup()方法。

当wakeupCounter加1后等于0,说明Poller.selector阻塞在selection操作,这时才需要调用Selector.wakeup()方法。
当wakeupCounter加1后不等于0,说明Poller.selector没有阻塞在selection操作,则不需要调用Selector.wakeup()方法。并且为了尽快执行第二步,Poller线程在下一次直接调用非阻塞方法Selector.selectNow()。
代码4:Poller.addEvent()方法,实现将PollerEvent对象加入Poller.events队列中。

public void addEvent(Runnable event) {
  events.offer(event);
  //如果wakeupCount加1后等于0,则调用wakeup方法
  if ( wakeupCounter .incrementAndGet() == 0 ) selector.wakeup();
}


代码5: Poller线程的selection操作代码
if (wakeupCounter .get()>0) {
  keyCount = selector.selectNow();
 else {
  wakeupCounter.set(-1);
  keyCount = selector.select(selectorTimeout );
}
wakeupCounter.set(0);


这样的设计因为Java NIO的wakeup有如下的特性:
在Selector对象上调用wakeup()方法将会导致第一个没有返回的selection操作立即返回。如果当前没有进行的selection操作,那么下一次的select()方法的调用将立即返回。而这个将wakeup行为延迟到下一个select()方法经常不是我们想要的(当然也不是Tomcat想要的)。我们一般只是想从sleeping的线程wakeup,但允许接下来的selection操作正常处理。
所以,Tomcat通过wakeupCounter信号量的变化来控制只有阻塞在selection操作的时候才调用Selector.wakeup()方法。当有新PollerEvent对象加入Poller.events队列中,并且没有处于阻塞在selection操作中,则直接调用非阻塞方法Selector.selectNow()。

相关推荐