Netty的Nio写优化

  今天又看了一遍boyan的《Nip trick and trip》,又有一些新的收获。比以前看地更深刻了。其中有一处写到Nio的写优化,当Nio的channel设置为nonblocking时,写操作不再阻塞,而是直接返回0,表示当时socket的写缓冲区已满不可写。通常的处理办法是返回本次loop并继续注册op_read或者op_write。这样会带来线程切换的开销,使当前select阻塞,直到有可写发生再次唤醒。Netty是怎么解决的?以前虽然看过Netty的代码但并没有深入到细节中去,看了boyanNetty采用writeSpinCount的介绍才深有感悟。好多知识都在细节里,就看我们对这个领域掌握地深不深,如果蜻蜓点水肯定发现了这些看似简单的代码的玄妙之处。话归正题,writeSpinCount如何解决这个问题的呢?类似自旋锁,还是看看Netty的代码吧,比较形象。

for (;;) {
                MessageEvent evt = channel.currentWriteEvent;
                SendBuffer buf;
                if (evt == null) {
                    if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
                       //写事件队列为空,此时应该去除op_write
                        removeOpWrite = true;
                        channel.writeSuspended = false;
                        break;
                    }

                    channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
                } else {
                    buf = channel.currentWriteBuffer;
                }

                ChannelFuture future = evt.getFuture();
                try {
                    long localWrittenBytes = 0;
                    for (int i = writeSpinCount; i > 0; i --) {//此处为writeSpinCount的解决方式
                       //向channel写数据,为了使其fileChannel的transferTo接口一致。这样从file写向socket时避免四次copy,sendfile的功能
                        localWrittenBytes = buf.transferTo(ch);
                        //写入的数据大于0,跳出本次循环,如为零则循环writeSpinCount,如再次期间localWrittenBytes>0,说明再次可写。
                        if (localWrittenBytes != 0) {
                            writtenBytes += localWrittenBytes;
                            break;
                        }
                        if (buf.finished()) {
                            break;
                        }
                    }

                    if (buf.finished()) {
                        // Successful write - proceed to the next message.
                        //完整地写完,则继续处理下一个消息
                        buf.release();
                        channel.currentWriteEvent = null;
                        channel.currentWriteBuffer = null;
                        evt = null;
                        buf = null;
                        future.setSuccess();
                    } else {
                        // Not written fully - perhaps the kernel buffer is full.
                        //没能完整地写完当前的message,则继续设置op_write,并跳出外层循环
                        addOpWrite = true;
                        channel.writeSuspended = true;

                        if (localWrittenBytes > 0) {
                            // Notify progress listeners if necessary.
                            future.setProgress(
                                    localWrittenBytes,
                                    buf.writtenBytes(), buf.totalBytes());
                        }
                        break;
                    }
                } catch (AsynchronousCloseException e) {
                    // Doesn't need a user attention - ignore.
                } catch (Throwable t) {
                    if (buf != null) {
                        buf.release();
                    }
                    channel.currentWriteEvent = null;
                    channel.currentWriteBuffer = null;
                    buf = null;
                    evt = null;
                    future.setFailure(t);
                    if (iothread) {
                        fireExceptionCaught(channel, t);
                    } else {
                        fireExceptionCaughtLater(channel, t);
                    }
                    if (t instanceof IOException) {
                        open = false;
                        close(channel, succeededFuture(channel));
                    }
                }
            }
            channel.inWriteNowLoop = false;

            // Initially, the following block was executed after releasing
            // the writeLock, but there was a race condition, and it has to be
            // executed before releasing the writeLock:
            //
            //     https://issues.jboss.org/browse/NETTY-410
            //
            if (open) {
                if (addOpWrite) {
                    setOpWrite(channel);
                } else if (removeOpWrite) {
                    clearOpWrite(channel);
                }
            }
        }

相关推荐