Java NIO源码分析

1.前言

JDK1.4之前的传统阻塞IO(BIO),服务端需要为每一个客户端连接创建单独的线程为其服务,从JDK1.4开始NIO非阻塞式IO出现,它只需要单独的一个线程就能接收多个客户端请求,而真正处理各个请求的细节可以使用多线程的方式高效率的完成,这些处理线程与具体的业务逻辑分离,做到了IO的复用。

2.源码分析

首先以一段典型的NIO使用代码开始:

Selector selector = Selector.open();  
ServerSocketChannel ssc = ServerSocketChannel.open();  
ssc.configureBlocking(false);  
ssc.socket().bind(new InetSocketAddress(9527)); 
ssc.register(selector, SelectionKey.OP_ACCEPT); 
while(true){
	int n = selector.select();
	if (n <= 0) continue;
	Iterator it = selector.selectedKeys().iterator();
	while(it.hasNext()){
		SelectionKey key = (SelectionKey)it.next();
		if (key.isAcceptable()){
		       SocketChannel sc= ((ServerSocketChannel) key.channel()).accept();
		       sc.configureBlocking(false);
		       sc.register(key.selector(), SelectionKey.OP_READ|SelectionKey.OP_WRITE);
		 }
		 if (key.isReadable()){
		        SocketChannel channel = ((SocketChannel) key.channel());
		        ByteBuffer bf = ByteBuffer.allocate(10);
		        int read = channel.read(bf);
		        System.out.println("read "+read+" : "+new String(bf.array()).trim());
		  }
		  if (key.isWritable()){
		       SocketChannel channel = ((SocketChannel) key.channel());
		       channel.write(ByteBuffer.wrap(new String("hello client").getBytes()));
		  }
		 it.remove();
	}
}

2.1 Selector.open() 获取选择器。

public static Selector open() throws IOException {  
    return SelectorProvider.provider().openSelector();  
}
public static SelectorProvider provider() {
        synchronized (lock) {
            if (provider != null)
                return provider;
            return AccessController.doPrivileged(
                new PrivilegedAction<SelectorProvider>() {
                    public SelectorProvider run() {
                            if (loadProviderFromProperty())
                                return provider;
                            if (loadProviderAsService())
                                return provider;
                            provider = sun.nio.ch.DefaultSelectorProvider.create();
                            return provider;
                        }
                    });
        }
}

从Selector源码中可以看到,open方法是交给selectorProvider处理的。 其中provider = sun.nio.ch.DefaultSelectorProvider.create();会根据操作系统来返回不同的实现类,windows平台就返回WindowsSelectorProvider;Linux平台会根据不同的内核版本选择是使用select/poll模式还是epoll模式。

public static SelectorProvider create() {
PrivilegedAction pa = new GetPropertyAction("os.name");
String osname = (String) AccessController.doPrivileged(pa);
    if ("SunOS".equals(osname)) {
        return new sun.nio.ch.DevPollSelectorProvider();
    }
 
    // use EPollSelectorProvider for Linux kernels >= 2.6
    if ("Linux".equals(osname)) {
        pa = new GetPropertyAction("os.version");
        String osversion = (String) AccessController.doPrivileged(pa);
        String[] vers = osversion.split("\\.", 0);
        if (vers.length >= 2) {
            try {
                int major = Integer.parseInt(vers[0]);
                int minor = Integer.parseInt(vers[1]);
                if (major > 2 || (major == 2 && minor >= 6)) {
                    return new sun.nio.ch.EPollSelectorProvider();
                }
            } catch (NumberFormatException x) {
                // format not recognized
            }
        }
    }
    return new sun.nio.ch.PollSelectorProvider();
}

sun.nio.ch.EPollSelectorProvider 
public AbstractSelector openSelector() throws IOException {
    return new EPollSelectorImpl(this);
} 
sun.nio.ch.PollSelectorProvider 
public AbstractSelector openSelector() throws IOException {
    return new PollSelectorImpl(this);
}

 可以看到,如果Linux内核版本>=2.6则,具体的SelectorProvider为EPollSelectorProvider,否则为默认的PollSelectorProvider,实际上这是在JDK5U9之后才有这样的更新。

public static SelectorProvider create() {
        return new sun.nio.ch.WindowsSelectorProvider();
}

sun.nio.ch.WindowsSelectorProvider
public AbstractSelector openSelector() throws IOException {
        return new WindowsSelectorImpl(this);
}

WindowsSelectorImpl(SelectorProvider sp) throws IOException {
        super(sp);
        pollWrapper = new PollArrayWrapper(INIT_CAP);
        wakeupPipe = Pipe.open();
        wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();

        // Disable the Nagle algorithm so that the wakeup is more immediate
        SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
        (sink.sc).socket().setTcpNoDelay(true);
        wakeupSinkFd = ((SelChImpl)sink).getFDVal();

        pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
void addWakeupSocket(int fdVal, int index) {  
    putDescriptor(index, fdVal);  
    putEventOps(index, POLLIN);  
}

接下来,以Windows的实现为准进行分析。在openSelector方法里面实例化WindowsSelectorImpl的过程中,

1).实例化了PollWrapper,pollWrapper用Unsafe类申请一块物理内存,用于存放注册时的socket句柄fdVal和event的数据结构pollfd.

2)Pipe.open()打开一个管道(打开管道的实现后面再看);拿到wakeupSourceFd和wakeupSinkFd两个文件描述符;把唤醒端的文件描述符(wakeupSourceFd)放到pollWrapper里.addWakeupSocket方法将source的POLLIN事件(有数据可读)标识为感兴趣的,当sink端有数据写入时,source对应的文件描述描wakeupSourceFd就会处于就绪状态.

public static Pipe open() throws IOException {  
          return SelectorProvider.provider().openPipe();  
} 

public Pipe openPipe() throws IOException {  
    return new PipeImpl(this);  
}  

PipeImpl(final SelectorProvider sp) throws IOException {  
    try {  
        AccessController.doPrivileged(new Initializer(sp));  
    } catch (PrivilegedActionException x) {  
        throw (IOException)x.getCause();  
    }  
} 
private Initializer(SelectorProvider sp) {
            this.sp = sp;
}
public Void run() throws IOException {
            LoopbackConnector connector = new LoopbackConnector();
            connector.run();
            ....//省略
}
private class LoopbackConnector implements Runnable {

            @Override
            public void run() {
                ServerSocketChannel ssc = null;
                SocketChannel sc1 = null;
                SocketChannel sc2 = null;

                try {
                    // Loopback address
                    InetAddress lb = InetAddress.getByName("127.0.0.1");
                    assert(lb.isLoopbackAddress());
                    InetSocketAddress sa = null;
                    for(;;) {
                        // Bind ServerSocketChannel to a port on the loopback
                        // address
                        if (ssc == null || !ssc.isOpen()) {
                            ssc = ServerSocketChannel.open();
                            ssc.socket().bind(new InetSocketAddress(lb, 0));
                            sa = new InetSocketAddress(lb, ssc.socket().getLocalPort());
                        }

                        // Establish connection (assume connections are eagerly
                        // accepted)
                        sc1 = SocketChannel.open(sa);
                        ByteBuffer bb = ByteBuffer.allocate(8);
                        long secret = rnd.nextLong();
                        bb.putLong(secret).flip();
                        sc1.write(bb);

                        // Get a connection and verify it is legitimate
                        sc2 = ssc.accept();
                        bb.clear();
                        sc2.read(bb);
                        bb.rewind();
                        if (bb.getLong() == secret)
                            break;
                        sc2.close();
                        sc1.close();
                    }

                    // Create source and sink channels
                    source = new SourceChannelImpl(sp, sc1);
                    sink = new SinkChannelImpl(sp, sc2);
                } catch (IOException e) {
                    try {
                        if (sc1 != null)
                            sc1.close();
                        if (sc2 != null)
                            sc2.close();
                    } catch (IOException e2) {}
                    ioe = e;
                } finally {
                    try {
                        if (ssc != null)
                            ssc.close();
                    } catch (IOException e2) {}
                }
            }
        }
    }

通过创建管道的代码分析:创建管道的具体实现方式也是与具体的操作系统紧密相关的,这里以Windows为例,创建了一个PipeImpl对象, AccessController.doPrivileged调用后紧接着会执行initializer的run方法,在run方法里面,windows下的实现是创建两个本地的socketChannel,然后连接(链接的过程通过写一个随机long做两个socket的链接校验),两个socketChannel分别实现了管道的source与sink端。通过查阅资料,而在Linux下则是直接使用操作系统提供的管道。

到这里,Selector.open()就完成了,总结一下,主要完成以下几件事:

1.实例化pollWrapper对象,用于将来存放注册时的socket句柄fdVal和event的数据结构pollfd。

2.根据不同操作系统实现了用于自我唤醒的管道,Windows通过创建一对自己连着自己的socket通道,Linux直接使用系统提供的管道。同时,根据linux的不同内核版本还会选择底层进行事件通知的不同机制select/poll或者epoll。

2.2 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);通道注册

public final SelectionKey register(Selector sel, int ops, Object att)
        throws ClosedChannelException{
        synchronized (regLock) {
            SelectionKey k = findKey(sel);
            if (k != null) {
                k.interestOps(ops);
                k.attach(att);
            }
            if (k == null) {
                // New registration
                synchronized (keyLock) {
                    if (!isOpen())
                        throw new ClosedChannelException();
                    k = ((AbstractSelector)sel).register(this, ops, att);
                    addKey(k);
                }
            }
            return k;
        }
    }
 如果该channel和selector已经注册过,则直接添加事件和附件。否则通过selector实现注册过程。
protected final SelectionKey register(AbstractSelectableChannel ch,
      int ops,  Object attachment) {
    if (!(ch instanceof SelChImpl))
        throw new IllegalSelectorException();
    SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
    k.attach(attachment);
    synchronized (publicKeys) {
        implRegister(k);
    }
    k.interestOps(ops);
    return k;
}

protected void implRegister(SelectionKeyImpl ski) {
    synchronized (closeLock) {
        if (pollWrapper == null)
            throw new ClosedSelectorException();
        growIfNeeded();
        channelArray[totalChannels] = ski;
        ski.setIndex(totalChannels);
        fdMap.put(ski);
        keys.add(ski);
        pollWrapper.addEntry(totalChannels, ski);
        totalChannels++;
    }
}

private void growIfNeeded() {
        if (channelArray.length == totalChannels) {
            int newSize = totalChannels * 2; // Make a larger array
            SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
            System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1);
            channelArray = temp;
            pollWrapper.grow(newSize);
        }
        if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed
            pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels);
            totalChannels++;
            threadsCount++;
        }
}
void addEntry(int index, SelectionKeyImpl ski) {
        putDescriptor(index, ski.channel.getFDVal());
}
 通过selector注册的过程主要完成以下几件事:
  • 以当前channel和selector为参数,初始化 SelectionKeyImpl 对象,并添加附件attachment。
  • 如果当前channel的数量totalChannels等于SelectionKeyImpl数组大小,对SelectionKeyImpl数组和pollWrapper进行扩容操作。
  • 如果totalChannels % MAX_SELECTABLE_FDS == 0,则多开一个线程处理selector。windows上select系统调用有最大文件描述符限制,一次只能轮询1024个文件描述符,如果多于1024个,需要多线程进行轮询。
  • ski.setIndex(totalChannels)选择键记录下在数组中的索引位置。
  • keys.add(ski);将选择键加入到已注册键的集合中。
  • fdMap.put(ski);保存选择键对应的文件描述符与选择键的映射关系。
  • pollWrapper.addEntry将把selectionKeyImpl中的socket句柄添加到对应的pollfd。
  • k.interestOps(ops)方法最终也会把event添加到对应的pollfd。

2.3 selector.select();

public int select() throws IOException {
        return select(0);
}
public int select(long timeout) throws IOException  
{  
    if (timeout < 0)  
        throw new IllegalArgumentException("Negative timeout");  
    return lockAndDoSelect((timeout == 0) ? -1 : timeout);  
}  
private int lockAndDoSelect(long timeout) throws IOException {  
    synchronized (this) {  
        if (!isOpen())  
            throw new ClosedSelectorException();  
        synchronized (publicKeys) {  
            synchronized (publicSelectedKeys) {  
                return doSelect(timeout);  
            }  
        }  
    }  
}
当调用selector.select()以及select(0)时,JDK对参数进行修正,其实传给doSelect的timeout为-1。当调用的是selectNow()的时候,timeout则为0,直接以负数作为参数则会抛出异常,其中的doSelector又回到我们的Windows实现: