Netty 笔记

netty核心--

包括:

Netty的技术和架构方面

Channel,EventLoop和ChannelFuture

ChannelHandler和ChannelPipeline

Bootstrapping

Channel—Sockets

EventLoop—控制流,多线程,并发

ChannelFuture—异步通知

channel:

基本的I/O操作(bind(),connect(),read()和write())依赖于底层网络传输层提供的基本类型(primitives)。在基于Java的网络编程中,基本的构造单元就是Socket类。Netty的Channel接口提供了一个API,极大地降低了直接使用Socket的复杂性

channel的实现:

EmbeddedChannel

LocalServerChannel

NioDatagramChannel

NioSctpChannel

NioSocketChannel

channel对应的方法;;Channel是线程安全的,,所以可以在多线程情况下,给指定的客户端写数据,

*eventLoop返回分配的EventLoop给Channel

Pipeline返回分配的ChannelPipeline给Channel

isActive如果Channel是活跃的返回true。活跃的含义可能依赖底层的传输方式。比如,一个Socket传输在连接到远端时变为活跃状态,而一个Datagram传输在开始时变为活跃状态。

localAddress返回本地的SocketAddress

remoteAddress返回远端的SocketAddress

write写数据到远端。数据被送到ChannelPipeline然后排在队列中直到被刷新

flush刷新之前已写的数据到底下传输层,比如一个Socket中

writeAndFlush先后调用write()和flush()的便捷方法

EventLoop:

EventLoop定义了Netty的核心抽象,用来处理发生在一个连接生命周期中的event。我们会在第七章讲Netty的线程模型时详细讨论EventLoop

一个EventLoopGroup包含一个或者多个EventLoop

所有的EventLoop在它的整个生命周期里都绑定在一个线程上

所有被一个EventLoop处理的event都是由EventLoop绑定的线程处理的

一个Channel在它的生命周期内只注册到一个EventLoop上

一个EventLoop可以被分配给一个或者多个Channel。

实现细节:

Netty线程模型的出色性能取决于判断当前执行线程是谁;也就是说,它是否是绑定到当前Channel和EventLoop的线程。(回忆下,EventLoop负责处理一个Channel在整个生命周期中的所有event)

如果当前调用线程(callingthread)就是分配给该Channel的EventLoop,代码被执行。否则,EventLoop将task放入一个内部的队列延后执行。当EventLoop处理它的event时,它会执行队列中的这些task。这就解释了为何任何线程都可以直接和Channel交互,而不需要为ChannelHandler加上同步。

http://ifeve.com/netty-in-action-7/

ChannelFuture:

etty中所有的I/O操作都是异步的。因为一个操作可能不会立刻返回,我们需要一个方法在稍后的时间来判定它的结果。因此,Netty提供了ChannelFuture,它的addListener()方法注册一个ChannelFutureListener,当操作完成时可以收到通知(无论成功与否)

对应java中的future,用于追踪异步操作,可以认为是异步操作的占位符,,且结合回调函数使用

ChannelHandler:

生命周期:

channel的生命周期:

ChannelUnregisteredChannel已创建,还未注册到一个EventLoop上

ChannelRegisteredChannel已经注册到一个EventLoop上

ChannelActiveChannel是活跃状态(连接到某个远端),可以收发数据

ChannelInactiveChannel未连接到远端

对应的handler的方法,,

handlerAdded当ChannelHandler被添加到一个ChannelPipeline时被调用

handlerRemoved当ChannelHandler从一个ChannelPipeline中移除时被调用

exceptionCaught处理过程中ChannelPipeline中发生错误时被调用

Netty定义的两个ChannelHandler子接口,,方法较多,可以使用适配器,,你可以用ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter这两个类。这两个适配器类分别提供了ChannelInboundHandler和ChannelOutboundHandler的基本实现。它们继承了共同的父接口ChannelHandler的方法,扩展了抽象类ChannelHandlerAdapter。

ChannelInboundHandler——处理输入数据和所有类型的状态变化;API:

channelRegistered当一个Channel注册到EventLoop上,可以处理I/O时被调用

channelUnregistered当一个Channel从它的EventLoop上解除注册,不再处理I/O时被调用

channelActive当Channel变成活跃状态时被调用;Channel是连接/绑定、就绪的

channelInactive当Channel离开活跃状态,不再连接到某个远端时被调用

channelReadComplete当Channel上的某个读操作完成时被调用

channelRead当从Channel中读数据时被调用

channelWritabilityChanged当Channel的可写状态改变时被调用。通过这个方法,用户可以确保写操作不会进行地太快(避免OutOfMemoryError)或者当Channel又变成可写时继续写操作。Channel类的isWritable()方法可以用来检查Channel的可写状态。可写性的阈值可以通过Channel.config().setWriteHighWaterMark()和Channel.config().setWriteLowWaterMark()来设定。

userEventTriggered因某个POJO穿过ChannelPipeline引发ChannelnboundHandler.fireUserEventTriggered()时被调用

ChannelOutboundHandler——处理输出数据,可以拦截所有操作;API

bind(ChannelHandlerContext,SocketAddress,ChannelPromise)请求绑定Channel到一个本地地址

connect(ChannelHandlerContext,SocketAddress,SocketAddress,ChannelPromise)请求连接Channel到远端

disconnect(ChannelHandlerContext,ChannelPromise)请求从远端断开Channel

close(ChannelHandlerContext,ChannelPromise)请求关闭Channel

deregister(ChannelHandlerContext,ChannelPromise)请求Channel从它的EventLoop上解除注册

read(ChannelHandlerContext)请求从Channel中读更多的数据

flush(ChannelHandlerContext)请求通过Channel刷队列数据到远端

write(ChannelHandlerContext,Object,ChannelPromise)请求通过Channel写数据到远端

从应用开发者的角度来看,Netty的主要组件是ChannelHandler,它作为一个应用逻辑的容器,处理输入输出数据。这成为可能,是因为ChannelHandler的方法是被网络event(这里的event是广义的)触发的

ChannelHandlerContex:

handler的上下文,环境,ChannelHandlerContext代表了一个ChannelHandler和一个ChannelPipeline之间的关系,它在ChannelHandler被添加到ChannelPipeline时被创建。ChannelHandlerContext的主要功能是管理它对应的ChannelHandler和属于同一个ChannelPipeline的其他ChannelHandler之间的交互。ChannelHandlerContext有很多方法,其中一些方法Channel和ChannelPipeline也有,但是有些区别。如果你在Channel或者ChannelPipeline实例上调用这些方法,它们的调用会穿过整个pipeline。而在ChannelHandlerContext上调用的同样的方法,仅仅从当前ChannelHandler开始,走到pipeline中下一个可以处理这个event的ChannelHandler。

ChannelHandlerContextAPI:

bind绑定到给定的SocketAddress,返回一个ChannelFuture

channel返回绑定的Channel

close关闭Channel,返回一个ChannelFuture

connect连接到给定的SocketAddress,返回一个ChannelFuture

deregister从先前分配的EventExecutor上解除注册,返回一个ChannelFuture

disconnect从远端断开,返回一个ChannelFuture

executor返回分发event的EventExecutor

fireChannelActive触发调用下一个ChannelInboundHandler的channelActive()(已连接)

fireChannelInactive触发调用下一个ChannelInboundHandler的channelInactive()(断开连接)

fireChannelRead触发调用下一个ChannelInboundHandler的channelRead()(收到消息)

fireChannelReadComplete触发channelWritabilityChangedevent到下一个ChannelInboundHandler

handler返回绑定的ChannelHandler

isRemoved如果绑定的ChannelHandler已从ChannelPipeline中删除,返回true

name返回本ChannelHandlerContext实例唯一的名字

Pipeline返回绑定的ChannelPipeline

read从Channel读数据到第一个输入buffer;如果成功,触发一条channelReadevent,通知handlerchannelReadComplete

write通过本ChannelHandlerContext写消息穿过pipeline

ChannelPipeline:

一个ChannelPipeline为一串ChannelHandler提供了一个容器,同时定义了在这一串ChannelHandler中传送输入输出event的API。当一个Channel被创建,它会自动被分配一个ChannelPipeline。

ChannelHandler的执行和阻塞

通常ChannelPipeline中的每个ChannelHandler通过它的EventLoop(I/O线程)来处理传给它的event。不阻塞这个线程是非常重要的,因为阻塞有可能会对整个I/O处理造成负面影响。

有时候,我们会需要跟调用了阻塞API的现存代码交互。在这种情况下,ChannelPipeline的一些add()方法支持EventExcutorGroup。如果event被送往一个定制的EventExcutorGroup,它会被这个EventExcutorGroup中的一个EventExecutor处理,由此脱离了Channel的EventLoop。针对这种场景,Netty提供了一个具体实现,称为DefaultEventExecutorGroup。

ChannelPipeline的ChannelHandler相关方法:

addFirstaddBeforeaddAfteraddLast添加一个ChannelHandler到ChannelPipeline

remove从ChannelPipeline删除一个ChannelHandler

replace用一个ChannelHandler替换ChannelPipeline中的另一个ChannelHandler

get通过类型或者名字返回一个ChannelHandler

context返回ChannelHandler绑定的ChannelHandlerContext

names返回ChannelPipeline中的所有ChannelHandler的名字

ChannelPipeline输入方法:

fireChannelRegistered调用ChannelPipeline中下一个ChannelInboundHandler的channelRegistered(ChannelHandlerContext)

fireChannelUnregistered调用ChannelPipeline中下一个ChannelInboundHandler的channelUnRegistered(ChannelHandlerContext)

fireChannelActive调用ChannelPipeline中下一个ChannelInboundHandler的channelActive(ChannelHandlerContext)

fireChannelInactive调用ChannelPipeline中下一个ChannelInboundHandler的channelInactive(ChannelHandlerContext)

fireExceptionCaught调用ChannelPipeline中下一个ChanneHandler的exceptionCaught(ChannelHandlerContext,Throwable)

fireUserEventTriggered调用ChannelPipeline中下一个ChannelInboundHandler的userEventTriggered(ChannelHandlerContext,Object)

fireChannelRead调用ChannelPipeline中下一个ChannelInboundHandler的channelRead(ChannelHandlerContext,Objectmsg)

fireChannelReadComplete调用ChannelPipeline中下一个ChannelStateHandler的channelReadComplete(ChannelHandlerContext)

ChannelPipeline输出操作

bind绑定Channel到一个本地地址。这会调用ChannelPipeline中下一个ChannelOutboundHandler的bind(ChannelHandlerContext,SocketAddress,ChannelPromise)

connect连接Channel到一个远端地址。这会调用ChannelPipeline中下一个ChannelOutboundHandler的connect(ChannelHandlerContext,SocketAddress,ChannelPromise)

disconnect断开Channel。这会调用ChannelPipeline中下一个ChannelOutboundHandler的disconnect(ChannelHandlerContext,ChannelPromise)

close关闭Channel。这会调用ChannelPipeline中下一个ChannelOutboundHandler的close(ChannelHandlerContext,ChannelPromise)

deregisterChannel从它之前分配的EventLoop上解除注册。这会调用ChannelPipeline中下一个ChannelOutboundHandler的deregister(ChannelHandlerContext,ChannelPromise)

flush刷所有Channel待写的数据。这会调用ChannelPipeline中下一个ChannelOutboundHandler的flush(ChannelHandlerContext)

write往Channel写一条消息。这会调用ChannelPipeline中下一个ChannelOutboundHandler的write(ChannelHandlerContext,Objectmsg,ChannelPromise)注意:不会写消息到底层的Socket,只是排队等候。如果要写到Socket中,调用flush()或者writeAndFlush()

writeAndFlush这是先后调用write()和flush()的便捷方法。

read请求从Channel中读更多的数据。这会调用ChannelPipeline中下一个ChannelOutboundHandler的read(ChannelHandlerContext)

Bootstrapping:

Netty的bootstrap类为一个应用的网络层配置提供了容器,包括绑定一个进程到给定的端口,或者将一个进程连接到跑在另一个特定主机和端口上的进程。

类型BootstrapServerBootstrap

网络功能连接到一个远程主机和端口绑定到一个本地端口

EventLoopGroups数量12

netty传输模式

传输支持的多种模式:分别对应的privatefinalEventLoopGroupbossGroup=newNioEventLoopGroup(BIZGROUPSIZE);//非阻塞模式,OioEventLoopGroup为阻塞和b.channel(NioServerSocketChannel.class);//非阻塞,OIO(Old-Blocking-IO.)阻塞;NioDatagramChannelUDP相关

NIOio.netty.channel.socket.nio采用java.nio.channels包,一个基于Selector的方式。

Epollio.netty.channel.epoll采用epoll()的JNI和非阻塞IO。这个模式支持只有Linux平台才有的特性,比如SO_REUSEPORT,比NIO方式快,而且是完全非阻塞的。

OIOio.netty.channel.socket.oio采用java.net包,使用阻塞流(blockingstreams)。

Localio.netty.channel.local本地传输可以用于VM内部管道通信。

Embededio.netty.channel.embedded嵌入式传输,可以用不需要真正网络传输的ChannelHandler。对于测试你的ChannelHandler非常有用。

OP_ACCEPT当一个新的连接被接受,一个Channel创建时请求收到通知

OP_CONNECT当一个连接建立时请求收到通知

OP_READ当数据已准备好从Channel中被读取时请求收到通知

OP_WRITE当可以往一个Channel中写入更多数据时请求收到通知。这个用在socket缓冲完全满的情况下,通常发生在当数据传送比远端处理快很多的时候。

TransportTCPUDPSCTP*UDT

NIOXXXX

Epoll(Linuxonly)XX——

OIOXXXX

nettyByteBuf

ByteBuf-Netty的数据容器,替代了ByteBuffer,这个强大的实现突破了JDKAPI的限制,为网络开发者提供了更好的API。

ByteBuf包含两种不同的索引:一个用于读,另一个用于写。当你从一个ByteBuf读数据的时候,它的readerIndex按读取的字节数量递增。同样地,当你往一个ByteBuf中写数据时,它的writerIndex递增。图5.1是一个空ByteBuf的布局和状态图,都是0

另外还有capacityindex,最大,当开始都是0,当read和write重合的时候也就是尾,当write和capacity重合的时候也就不能写了。

读/写操作:

get()和set()操作从一个指定的索引开始操作,不改变索引的值

read()和write()操作从一个指定的索引开始操作,随着读/写的字节数调整索引值

get方法

getBoolean(int)返回指定索引位置的boolean值

getByte(int)返回指定索引位置的byte值

getUnsignedByte(int)返回指定索引位置的无符号byte值(返回类型是short)

getMedium(int)返回指定索引位置的24位(medium)值

getUnsignedMedium(int)返回指定索引位置的无符号24位(medium)值

getInt(int)返回指定索引位置的int值

getUnsignedInt(int)返回指定索引位置的无符号int值(返回类型是long)

getLong(int)返回指定索引位置的long值

getShort(int)返回指定索引位置的short值

getUnsignedShort(int)返回指定索引位置的无符号short值(返回类型是int)

getBytes(int,…)传送buffer数据到指定索引位置开始的目标位置?

set方法

setBoolean(int,boolean)在指定索引位置设置boolean值

setByte(intindex,intvalue)在指定索引位置设置Byte值

setMedium(intindex,intvalue)在指定索引位置设置24位(medium)值

setInt(intindex,intvalue)在指定索引位置设置int值

setLong(intindex,longvalue)在指定索引位置设置long值

setShort(intindex,intvalue)在指定索引位置设置short值

read方法;

readBoolean()返回当前readerIndex位置的boolean值,然后readerIndex加1

readByte()返回当前readerIndex位置的byte值,然后readerIndex加1

readUnsignedByte()返回当前readerIndex位置的无符号byte值(返回类型是short),然后readerIndex加1

readMedium()返回当前readerIndex位置的24位(medium)值,然后readerIndex加3

readUnsignedMedium()返回当前readerIndex位置的无符号24位(medium)值,然后readerIndex加3

readInt()返回当前readerIndex位置的int值,然后readerIndex加4

readUnsignedInt()返回当前readerIndex位置的无符号int值(返回类型是long),然后readerIndex加4

readLong()返回当前readerIndex位置的long值,然后readerIndex加8

readShort()返回当前readerIndex位置的short值,然后readerIndex加2

readUnsignedShort()返回当前readerIndex位置的无符号short值(返回类型是int),然后readerIndex加2

readBytes(ByteBuf|byte[]

destination,

intdstIndex[,intlength])

从readerIndex位置(如果有指定,长度是length字节)开始,传送当前ByteBuf数据到目标ByteBuf或者byte[]。当前ByteBuf的readerIndex值根据已传送的字节数增长

write方法

writeBoolean(boolean)在当前writerIndex位置写入boolean值,然后writerIndex加1

writeByte(int)在当前writerIndex位置写入byte值,然后writerIndex加1

writeMedium(int)在当前writerIndex位置写入medium值,然后writerIndex加3

writeInt(int)在当前writerIndex位置写入int值,然后writerIndex加4

writeLong(long)在当前writerIndex位置写入long值,然后writerIndex加8

writeShort(int)在当前writerIndex位置写入short值,然后writerIndex加2

writeBytes(sourceByteBuf|

byte[][,intsrcIndex

,intlength])

从当前writerIndex位置开始,从指定的源ByteBuf或者byte[]传送数据到当前ByteBuf。如果输入参数中提供了srcIndex和length,那么从srcIndex开始读出length长的字节。当前ByteBuf的writerIndex值根据已写入的字节数增长。

更多操作:

isReadable()如果至少一字节大小的数据可读,返回true

isWritable()如果至少一字节大小的数据可写,返回true

readableBytes()返回可读的字节数

writableBytes()返回可写的字节数

capacity()返回ByteBuf可容纳的字节数。调用该方法后,ByteBuf会试着扩展容量直到到达maxCapacity()

maxCapacity()返回ByteBuf可容纳的最大字节数

hasArray()如果ByteBuf有一个字节数组,返回true

Array()如果ByteBuf有一个字节数组,返回该数组;否则,抛出异常,UnsupportedOperationException

ByteBuf的分配

ByteBufAllocator:按需分配可以从channle和channelctx中获取ByteBufAllocatorallco=channel.alloc();;PoolByteBufAllocator和UnpooledByteBufAllocator。前者将ByteBuf实例放入池中,提高了性能,将内存碎片减少到最小。这个实现采用了一种内存分配的高效策略,称为jemalloc。它已经被好几种现代操作系统所采用。后者则没有把ByteBuf放入池中,每次被调用时,返回一个新的ByteBuf实例。

api方法

buffer();buffer(intinitialCapacity);buffer(intinitialCapacity,intmaxCapacity);返回heap或者directByteBuf

heapBuffer();heapBuffer(intinitialCapacity);heapBuffer(intinitialCapacity,intmaxCapacity);返回heapByteBuf

directBuffer();directBuffer(intinitialCapacity);directBuffer(intinitialCapacity,intmaxCapacity);返回directByteBuf

compositeBuffer();compositeBuffer(intmaxNumComponents);compositeDirectBuffer();compositeDirectBuffer(intmaxNumComponents);compositeHeapBuffer();compositeHeapBuffer(intmaxNumComponents);返回CoompositeByteBuf,可以根据指定的

component数量来扩展heap或者directbuffer

ioBuffer()返回一个用于socketI/O操作的ByteBuf

工具类:Unpooledbuffers

有些时候,你不能获取到ByteBufAllocator的引用。在这种情况下,Netty有一个被称为Unpooled的工具类,提供了一些静态的辅助方法(helpermethods)来创建unpooledByteBuf实例。表5.8列出了其中最重要的几个方法。

API方法:

buffer();buffer(intinitialCapacity);buffer(intinitialCapacity,intmaxCapacity)返回heapByteBuf

directBuffer();directBuffer(intinitialCapacity);directBuffer(intinitialCapacity,intmaxCapacity)返回directByteBuf

wrappedBuffer()返回wrappedByteBuf

copiedBuffer()返回copiedByteBuf

工具类:ByteBufUtil

ByteBufUtil提供了一些操作ByteBuf的静态辅助方法。因为这个ByteBufUtilAPI是通用的,并且和内存池并无关联,所以这些辅助方法是在buffer类之外实现的

相关推荐