Java NIO源码分析
1.前言
JDK1.4之前的传统阻塞IO(BIO),服务端需要为每一个客户端连接创建单独的线程为其服务,从JDK1.4开始NIO非阻塞式IO出现,它只需要单独的一个线程就能接收多个客户端请求,而真正处理各个请求的细节可以使用多线程的方式高效率的完成,这些处理线程与具体的业务逻辑分离,做到了IO的复用。
2.源码分析
首先以一段典型的NIO使用代码开始:
Selector selector = Selector.open(); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); ssc.socket().bind(new InetSocketAddress(9527)); ssc.register(selector, SelectionKey.OP_ACCEPT); while(true){ int n = selector.select(); if (n <= 0) continue; Iterator it = selector.selectedKeys().iterator(); while(it.hasNext()){ SelectionKey key = (SelectionKey)it.next(); if (key.isAcceptable()){ SocketChannel sc= ((ServerSocketChannel) key.channel()).accept(); sc.configureBlocking(false); sc.register(key.selector(), SelectionKey.OP_READ|SelectionKey.OP_WRITE); } if (key.isReadable()){ SocketChannel channel = ((SocketChannel) key.channel()); ByteBuffer bf = ByteBuffer.allocate(10); int read = channel.read(bf); System.out.println("read "+read+" : "+new String(bf.array()).trim()); } if (key.isWritable()){ SocketChannel channel = ((SocketChannel) key.channel()); channel.write(ByteBuffer.wrap(new String("hello client").getBytes())); } it.remove(); } }
2.1 Selector.open() 获取选择器。
public static Selector open() throws IOException { return SelectorProvider.provider().openSelector(); } public static SelectorProvider provider() { synchronized (lock) { if (provider != null) return provider; return AccessController.doPrivileged( new PrivilegedAction<SelectorProvider>() { public SelectorProvider run() { if (loadProviderFromProperty()) return provider; if (loadProviderAsService()) return provider; provider = sun.nio.ch.DefaultSelectorProvider.create(); return provider; } }); } }
从Selector源码中可以看到,open方法是交给selectorProvider处理的。 其中provider = sun.nio.ch.DefaultSelectorProvider.create();会根据操作系统来返回不同的实现类,windows平台就返回WindowsSelectorProvider;Linux平台会根据不同的内核版本选择是使用select/poll模式还是epoll模式。
public static SelectorProvider create() { PrivilegedAction pa = new GetPropertyAction("os.name"); String osname = (String) AccessController.doPrivileged(pa); if ("SunOS".equals(osname)) { return new sun.nio.ch.DevPollSelectorProvider(); } // use EPollSelectorProvider for Linux kernels >= 2.6 if ("Linux".equals(osname)) { pa = new GetPropertyAction("os.version"); String osversion = (String) AccessController.doPrivileged(pa); String[] vers = osversion.split("\\.", 0); if (vers.length >= 2) { try { int major = Integer.parseInt(vers[0]); int minor = Integer.parseInt(vers[1]); if (major > 2 || (major == 2 && minor >= 6)) { return new sun.nio.ch.EPollSelectorProvider(); } } catch (NumberFormatException x) { // format not recognized } } } return new sun.nio.ch.PollSelectorProvider(); } sun.nio.ch.EPollSelectorProvider public AbstractSelector openSelector() throws IOException { return new EPollSelectorImpl(this); } sun.nio.ch.PollSelectorProvider public AbstractSelector openSelector() throws IOException { return new PollSelectorImpl(this); }
可以看到,如果Linux内核版本>=2.6则,具体的SelectorProvider为EPollSelectorProvider,否则为默认的PollSelectorProvider,实际上这是在JDK5U9之后才有这样的更新。
public static SelectorProvider create() { return new sun.nio.ch.WindowsSelectorProvider(); } sun.nio.ch.WindowsSelectorProvider public AbstractSelector openSelector() throws IOException { return new WindowsSelectorImpl(this); } WindowsSelectorImpl(SelectorProvider sp) throws IOException { super(sp); pollWrapper = new PollArrayWrapper(INIT_CAP); wakeupPipe = Pipe.open(); wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal(); // Disable the Nagle algorithm so that the wakeup is more immediate SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink(); (sink.sc).socket().setTcpNoDelay(true); wakeupSinkFd = ((SelChImpl)sink).getFDVal(); pollWrapper.addWakeupSocket(wakeupSourceFd, 0); } void addWakeupSocket(int fdVal, int index) { putDescriptor(index, fdVal); putEventOps(index, POLLIN); }
接下来,以Windows的实现为准进行分析。在openSelector方法里面实例化WindowsSelectorImpl的过程中,
1).实例化了PollWrapper,pollWrapper用Unsafe类申请一块物理内存,用于存放注册时的socket句柄fdVal和event的数据结构pollfd.
2)Pipe.open()打开一个管道(打开管道的实现后面再看);拿到wakeupSourceFd和wakeupSinkFd两个文件描述符;把唤醒端的文件描述符(wakeupSourceFd)放到pollWrapper里.addWakeupSocket方法将source的POLLIN事件(有数据可读)标识为感兴趣的,当sink端有数据写入时,source对应的文件描述描wakeupSourceFd就会处于就绪状态.
public static Pipe open() throws IOException { return SelectorProvider.provider().openPipe(); } public Pipe openPipe() throws IOException { return new PipeImpl(this); } PipeImpl(final SelectorProvider sp) throws IOException { try { AccessController.doPrivileged(new Initializer(sp)); } catch (PrivilegedActionException x) { throw (IOException)x.getCause(); } } private Initializer(SelectorProvider sp) { this.sp = sp; } public Void run() throws IOException { LoopbackConnector connector = new LoopbackConnector(); connector.run(); ....//省略 } private class LoopbackConnector implements Runnable { @Override public void run() { ServerSocketChannel ssc = null; SocketChannel sc1 = null; SocketChannel sc2 = null; try { // Loopback address InetAddress lb = InetAddress.getByName("127.0.0.1"); assert(lb.isLoopbackAddress()); InetSocketAddress sa = null; for(;;) { // Bind ServerSocketChannel to a port on the loopback // address if (ssc == null || !ssc.isOpen()) { ssc = ServerSocketChannel.open(); ssc.socket().bind(new InetSocketAddress(lb, 0)); sa = new InetSocketAddress(lb, ssc.socket().getLocalPort()); } // Establish connection (assume connections are eagerly // accepted) sc1 = SocketChannel.open(sa); ByteBuffer bb = ByteBuffer.allocate(8); long secret = rnd.nextLong(); bb.putLong(secret).flip(); sc1.write(bb); // Get a connection and verify it is legitimate sc2 = ssc.accept(); bb.clear(); sc2.read(bb); bb.rewind(); if (bb.getLong() == secret) break; sc2.close(); sc1.close(); } // Create source and sink channels source = new SourceChannelImpl(sp, sc1); sink = new SinkChannelImpl(sp, sc2); } catch (IOException e) { try { if (sc1 != null) sc1.close(); if (sc2 != null) sc2.close(); } catch (IOException e2) {} ioe = e; } finally { try { if (ssc != null) ssc.close(); } catch (IOException e2) {} } } } }
通过创建管道的代码分析:创建管道的具体实现方式也是与具体的操作系统紧密相关的,这里以Windows为例,创建了一个PipeImpl对象, AccessController.doPrivileged调用后紧接着会执行initializer的run方法,在run方法里面,windows下的实现是创建两个本地的socketChannel,然后连接(链接的过程通过写一个随机long做两个socket的链接校验),两个socketChannel分别实现了管道的source与sink端。通过查阅资料,而在Linux下则是直接使用操作系统提供的管道。
到这里,Selector.open()就完成了,总结一下,主要完成以下几件事:
1.实例化pollWrapper对象,用于将来存放注册时的socket句柄fdVal和event的数据结构pollfd。
2.根据不同操作系统实现了用于自我唤醒的管道,Windows通过创建一对自己连着自己的socket通道,Linux直接使用系统提供的管道。同时,根据linux的不同内核版本还会选择底层进行事件通知的不同机制select/poll或者epoll。
2.2 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);通道注册
public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException{ synchronized (regLock) { SelectionKey k = findKey(sel); if (k != null) { k.interestOps(ops); k.attach(att); } if (k == null) { // New registration synchronized (keyLock) { if (!isOpen()) throw new ClosedChannelException(); k = ((AbstractSelector)sel).register(this, ops, att); addKey(k); } } return k; } }如果该channel和selector已经注册过,则直接添加事件和附件。否则通过selector实现注册过程。
protected final SelectionKey register(AbstractSelectableChannel ch, int ops, Object attachment) { if (!(ch instanceof SelChImpl)) throw new IllegalSelectorException(); SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this); k.attach(attachment); synchronized (publicKeys) { implRegister(k); } k.interestOps(ops); return k; } protected void implRegister(SelectionKeyImpl ski) { synchronized (closeLock) { if (pollWrapper == null) throw new ClosedSelectorException(); growIfNeeded(); channelArray[totalChannels] = ski; ski.setIndex(totalChannels); fdMap.put(ski); keys.add(ski); pollWrapper.addEntry(totalChannels, ski); totalChannels++; } } private void growIfNeeded() { if (channelArray.length == totalChannels) { int newSize = totalChannels * 2; // Make a larger array SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize]; System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1); channelArray = temp; pollWrapper.grow(newSize); } if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels); totalChannels++; threadsCount++; } } void addEntry(int index, SelectionKeyImpl ski) { putDescriptor(index, ski.channel.getFDVal()); }通过selector注册的过程主要完成以下几件事:
- 以当前channel和selector为参数,初始化 SelectionKeyImpl 对象,并添加附件attachment。
- 如果当前channel的数量totalChannels等于SelectionKeyImpl数组大小,对SelectionKeyImpl数组和pollWrapper进行扩容操作。
- 如果totalChannels % MAX_SELECTABLE_FDS == 0,则多开一个线程处理selector。windows上select系统调用有最大文件描述符限制,一次只能轮询1024个文件描述符,如果多于1024个,需要多线程进行轮询。
- ski.setIndex(totalChannels)选择键记录下在数组中的索引位置。
- keys.add(ski);将选择键加入到已注册键的集合中。
- fdMap.put(ski);保存选择键对应的文件描述符与选择键的映射关系。
- pollWrapper.addEntry将把selectionKeyImpl中的socket句柄添加到对应的pollfd。
- k.interestOps(ops)方法最终也会把event添加到对应的pollfd。
2.3 selector.select();
public int select() throws IOException { return select(0); } public int select(long timeout) throws IOException { if (timeout < 0) throw new IllegalArgumentException("Negative timeout"); return lockAndDoSelect((timeout == 0) ? -1 : timeout); } private int lockAndDoSelect(long timeout) throws IOException { synchronized (this) { if (!isOpen()) throw new ClosedSelectorException(); synchronized (publicKeys) { synchronized (publicSelectedKeys) { return doSelect(timeout); } } } }当调用selector.select()以及select(0)时,JDK对参数进行修正,其实传给doSelect的timeout为-1。当调用的是selectNow()的时候,timeout则为0,直接以负数作为参数则会抛出异常,其中的doSelector又回到我们的Windows实现: