Netty 笔记
netty核心--
包括:
Netty的技术和架构方面
Channel,EventLoop和ChannelFuture
ChannelHandler和ChannelPipeline
Bootstrapping
Channel—Sockets
EventLoop—控制流,多线程,并发
ChannelFuture—异步通知
channel:
基本的I/O操作(bind(),connect(),read()和write())依赖于底层网络传输层提供的基本类型(primitives)。在基于Java的网络编程中,基本的构造单元就是Socket类。Netty的Channel接口提供了一个API,极大地降低了直接使用Socket的复杂性
channel的实现:
EmbeddedChannel
LocalServerChannel
NioDatagramChannel
NioSctpChannel
NioSocketChannel
channel对应的方法;;Channel是线程安全的,,所以可以在多线程情况下,给指定的客户端写数据,
*eventLoop返回分配的EventLoop给Channel
Pipeline返回分配的ChannelPipeline给Channel
isActive如果Channel是活跃的返回true。活跃的含义可能依赖底层的传输方式。比如,一个Socket传输在连接到远端时变为活跃状态,而一个Datagram传输在开始时变为活跃状态。
localAddress返回本地的SocketAddress
remoteAddress返回远端的SocketAddress
write写数据到远端。数据被送到ChannelPipeline然后排在队列中直到被刷新
flush刷新之前已写的数据到底下传输层,比如一个Socket中
writeAndFlush先后调用write()和flush()的便捷方法
EventLoop:
EventLoop定义了Netty的核心抽象,用来处理发生在一个连接生命周期中的event。我们会在第七章讲Netty的线程模型时详细讨论EventLoop
一个EventLoopGroup包含一个或者多个EventLoop
所有的EventLoop在它的整个生命周期里都绑定在一个线程上
所有被一个EventLoop处理的event都是由EventLoop绑定的线程处理的
一个Channel在它的生命周期内只注册到一个EventLoop上
一个EventLoop可以被分配给一个或者多个Channel。
实现细节:
Netty线程模型的出色性能取决于判断当前执行线程是谁;也就是说,它是否是绑定到当前Channel和EventLoop的线程。(回忆下,EventLoop负责处理一个Channel在整个生命周期中的所有event)
如果当前调用线程(callingthread)就是分配给该Channel的EventLoop,代码被执行。否则,EventLoop将task放入一个内部的队列延后执行。当EventLoop处理它的event时,它会执行队列中的这些task。这就解释了为何任何线程都可以直接和Channel交互,而不需要为ChannelHandler加上同步。
http://ifeve.com/netty-in-action-7/
ChannelFuture:
etty中所有的I/O操作都是异步的。因为一个操作可能不会立刻返回,我们需要一个方法在稍后的时间来判定它的结果。因此,Netty提供了ChannelFuture,它的addListener()方法注册一个ChannelFutureListener,当操作完成时可以收到通知(无论成功与否)
对应java中的future,用于追踪异步操作,可以认为是异步操作的占位符,,且结合回调函数使用
ChannelHandler:
生命周期:
channel的生命周期:
ChannelUnregisteredChannel已创建,还未注册到一个EventLoop上
ChannelRegisteredChannel已经注册到一个EventLoop上
ChannelActiveChannel是活跃状态(连接到某个远端),可以收发数据
ChannelInactiveChannel未连接到远端
对应的handler的方法,,
handlerAdded当ChannelHandler被添加到一个ChannelPipeline时被调用
handlerRemoved当ChannelHandler从一个ChannelPipeline中移除时被调用
exceptionCaught处理过程中ChannelPipeline中发生错误时被调用
Netty定义的两个ChannelHandler子接口,,方法较多,可以使用适配器,,你可以用ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter这两个类。这两个适配器类分别提供了ChannelInboundHandler和ChannelOutboundHandler的基本实现。它们继承了共同的父接口ChannelHandler的方法,扩展了抽象类ChannelHandlerAdapter。
ChannelInboundHandler——处理输入数据和所有类型的状态变化;API:
channelRegistered当一个Channel注册到EventLoop上,可以处理I/O时被调用
channelUnregistered当一个Channel从它的EventLoop上解除注册,不再处理I/O时被调用
channelActive当Channel变成活跃状态时被调用;Channel是连接/绑定、就绪的
channelInactive当Channel离开活跃状态,不再连接到某个远端时被调用
channelReadComplete当Channel上的某个读操作完成时被调用
channelRead当从Channel中读数据时被调用
channelWritabilityChanged当Channel的可写状态改变时被调用。通过这个方法,用户可以确保写操作不会进行地太快(避免OutOfMemoryError)或者当Channel又变成可写时继续写操作。Channel类的isWritable()方法可以用来检查Channel的可写状态。可写性的阈值可以通过Channel.config().setWriteHighWaterMark()和Channel.config().setWriteLowWaterMark()来设定。
userEventTriggered因某个POJO穿过ChannelPipeline引发ChannelnboundHandler.fireUserEventTriggered()时被调用
ChannelOutboundHandler——处理输出数据,可以拦截所有操作;API
bind(ChannelHandlerContext,SocketAddress,ChannelPromise)请求绑定Channel到一个本地地址
connect(ChannelHandlerContext,SocketAddress,SocketAddress,ChannelPromise)请求连接Channel到远端
disconnect(ChannelHandlerContext,ChannelPromise)请求从远端断开Channel
close(ChannelHandlerContext,ChannelPromise)请求关闭Channel
deregister(ChannelHandlerContext,ChannelPromise)请求Channel从它的EventLoop上解除注册
read(ChannelHandlerContext)请求从Channel中读更多的数据
flush(ChannelHandlerContext)请求通过Channel刷队列数据到远端
write(ChannelHandlerContext,Object,ChannelPromise)请求通过Channel写数据到远端
从应用开发者的角度来看,Netty的主要组件是ChannelHandler,它作为一个应用逻辑的容器,处理输入输出数据。这成为可能,是因为ChannelHandler的方法是被网络event(这里的event是广义的)触发的
ChannelHandlerContex:
handler的上下文,环境,ChannelHandlerContext代表了一个ChannelHandler和一个ChannelPipeline之间的关系,它在ChannelHandler被添加到ChannelPipeline时被创建。ChannelHandlerContext的主要功能是管理它对应的ChannelHandler和属于同一个ChannelPipeline的其他ChannelHandler之间的交互。ChannelHandlerContext有很多方法,其中一些方法Channel和ChannelPipeline也有,但是有些区别。如果你在Channel或者ChannelPipeline实例上调用这些方法,它们的调用会穿过整个pipeline。而在ChannelHandlerContext上调用的同样的方法,仅仅从当前ChannelHandler开始,走到pipeline中下一个可以处理这个event的ChannelHandler。
ChannelHandlerContextAPI:
bind绑定到给定的SocketAddress,返回一个ChannelFuture
channel返回绑定的Channel
close关闭Channel,返回一个ChannelFuture
connect连接到给定的SocketAddress,返回一个ChannelFuture
deregister从先前分配的EventExecutor上解除注册,返回一个ChannelFuture
disconnect从远端断开,返回一个ChannelFuture
executor返回分发event的EventExecutor
fireChannelActive触发调用下一个ChannelInboundHandler的channelActive()(已连接)
fireChannelInactive触发调用下一个ChannelInboundHandler的channelInactive()(断开连接)
fireChannelRead触发调用下一个ChannelInboundHandler的channelRead()(收到消息)
fireChannelReadComplete触发channelWritabilityChangedevent到下一个ChannelInboundHandler
handler返回绑定的ChannelHandler
isRemoved如果绑定的ChannelHandler已从ChannelPipeline中删除,返回true
name返回本ChannelHandlerContext实例唯一的名字
Pipeline返回绑定的ChannelPipeline
read从Channel读数据到第一个输入buffer;如果成功,触发一条channelReadevent,通知handlerchannelReadComplete
write通过本ChannelHandlerContext写消息穿过pipeline
ChannelPipeline:
一个ChannelPipeline为一串ChannelHandler提供了一个容器,同时定义了在这一串ChannelHandler中传送输入输出event的API。当一个Channel被创建,它会自动被分配一个ChannelPipeline。
ChannelHandler的执行和阻塞
通常ChannelPipeline中的每个ChannelHandler通过它的EventLoop(I/O线程)来处理传给它的event。不阻塞这个线程是非常重要的,因为阻塞有可能会对整个I/O处理造成负面影响。
有时候,我们会需要跟调用了阻塞API的现存代码交互。在这种情况下,ChannelPipeline的一些add()方法支持EventExcutorGroup。如果event被送往一个定制的EventExcutorGroup,它会被这个EventExcutorGroup中的一个EventExecutor处理,由此脱离了Channel的EventLoop。针对这种场景,Netty提供了一个具体实现,称为DefaultEventExecutorGroup。
ChannelPipeline的ChannelHandler相关方法:
addFirstaddBeforeaddAfteraddLast添加一个ChannelHandler到ChannelPipeline
remove从ChannelPipeline删除一个ChannelHandler
replace用一个ChannelHandler替换ChannelPipeline中的另一个ChannelHandler
get通过类型或者名字返回一个ChannelHandler
context返回ChannelHandler绑定的ChannelHandlerContext
names返回ChannelPipeline中的所有ChannelHandler的名字
ChannelPipeline输入方法:
fireChannelRegistered调用ChannelPipeline中下一个ChannelInboundHandler的channelRegistered(ChannelHandlerContext)
fireChannelUnregistered调用ChannelPipeline中下一个ChannelInboundHandler的channelUnRegistered(ChannelHandlerContext)
fireChannelActive调用ChannelPipeline中下一个ChannelInboundHandler的channelActive(ChannelHandlerContext)
fireChannelInactive调用ChannelPipeline中下一个ChannelInboundHandler的channelInactive(ChannelHandlerContext)
fireExceptionCaught调用ChannelPipeline中下一个ChanneHandler的exceptionCaught(ChannelHandlerContext,Throwable)
fireUserEventTriggered调用ChannelPipeline中下一个ChannelInboundHandler的userEventTriggered(ChannelHandlerContext,Object)
fireChannelRead调用ChannelPipeline中下一个ChannelInboundHandler的channelRead(ChannelHandlerContext,Objectmsg)
fireChannelReadComplete调用ChannelPipeline中下一个ChannelStateHandler的channelReadComplete(ChannelHandlerContext)
ChannelPipeline输出操作
bind绑定Channel到一个本地地址。这会调用ChannelPipeline中下一个ChannelOutboundHandler的bind(ChannelHandlerContext,SocketAddress,ChannelPromise)
connect连接Channel到一个远端地址。这会调用ChannelPipeline中下一个ChannelOutboundHandler的connect(ChannelHandlerContext,SocketAddress,ChannelPromise)
disconnect断开Channel。这会调用ChannelPipeline中下一个ChannelOutboundHandler的disconnect(ChannelHandlerContext,ChannelPromise)
close关闭Channel。这会调用ChannelPipeline中下一个ChannelOutboundHandler的close(ChannelHandlerContext,ChannelPromise)
deregisterChannel从它之前分配的EventLoop上解除注册。这会调用ChannelPipeline中下一个ChannelOutboundHandler的deregister(ChannelHandlerContext,ChannelPromise)
flush刷所有Channel待写的数据。这会调用ChannelPipeline中下一个ChannelOutboundHandler的flush(ChannelHandlerContext)
write往Channel写一条消息。这会调用ChannelPipeline中下一个ChannelOutboundHandler的write(ChannelHandlerContext,Objectmsg,ChannelPromise)注意:不会写消息到底层的Socket,只是排队等候。如果要写到Socket中,调用flush()或者writeAndFlush()
writeAndFlush这是先后调用write()和flush()的便捷方法。
read请求从Channel中读更多的数据。这会调用ChannelPipeline中下一个ChannelOutboundHandler的read(ChannelHandlerContext)
Bootstrapping:
Netty的bootstrap类为一个应用的网络层配置提供了容器,包括绑定一个进程到给定的端口,或者将一个进程连接到跑在另一个特定主机和端口上的进程。
类型BootstrapServerBootstrap
网络功能连接到一个远程主机和端口绑定到一个本地端口
EventLoopGroups数量12
netty传输模式
传输支持的多种模式:分别对应的privatefinalEventLoopGroupbossGroup=newNioEventLoopGroup(BIZGROUPSIZE);//非阻塞模式,OioEventLoopGroup为阻塞和b.channel(NioServerSocketChannel.class);//非阻塞,OIO(Old-Blocking-IO.)阻塞;NioDatagramChannelUDP相关
NIOio.netty.channel.socket.nio采用java.nio.channels包,一个基于Selector的方式。
Epollio.netty.channel.epoll采用epoll()的JNI和非阻塞IO。这个模式支持只有Linux平台才有的特性,比如SO_REUSEPORT,比NIO方式快,而且是完全非阻塞的。
OIOio.netty.channel.socket.oio采用java.net包,使用阻塞流(blockingstreams)。
Localio.netty.channel.local本地传输可以用于VM内部管道通信。
Embededio.netty.channel.embedded嵌入式传输,可以用不需要真正网络传输的ChannelHandler。对于测试你的ChannelHandler非常有用。
OP_ACCEPT当一个新的连接被接受,一个Channel创建时请求收到通知
OP_CONNECT当一个连接建立时请求收到通知
OP_READ当数据已准备好从Channel中被读取时请求收到通知
OP_WRITE当可以往一个Channel中写入更多数据时请求收到通知。这个用在socket缓冲完全满的情况下,通常发生在当数据传送比远端处理快很多的时候。
TransportTCPUDPSCTP*UDT
NIOXXXX
Epoll(Linuxonly)XX——
OIOXXXX
nettyByteBuf
ByteBuf-Netty的数据容器,替代了ByteBuffer,这个强大的实现突破了JDKAPI的限制,为网络开发者提供了更好的API。
ByteBuf包含两种不同的索引:一个用于读,另一个用于写。当你从一个ByteBuf读数据的时候,它的readerIndex按读取的字节数量递增。同样地,当你往一个ByteBuf中写数据时,它的writerIndex递增。图5.1是一个空ByteBuf的布局和状态图,都是0
另外还有capacityindex,最大,当开始都是0,当read和write重合的时候也就是尾,当write和capacity重合的时候也就不能写了。
读/写操作:
get()和set()操作从一个指定的索引开始操作,不改变索引的值
read()和write()操作从一个指定的索引开始操作,随着读/写的字节数调整索引值
get方法
getBoolean(int)返回指定索引位置的boolean值
getByte(int)返回指定索引位置的byte值
getUnsignedByte(int)返回指定索引位置的无符号byte值(返回类型是short)
getMedium(int)返回指定索引位置的24位(medium)值
getUnsignedMedium(int)返回指定索引位置的无符号24位(medium)值
getInt(int)返回指定索引位置的int值
getUnsignedInt(int)返回指定索引位置的无符号int值(返回类型是long)
getLong(int)返回指定索引位置的long值
getShort(int)返回指定索引位置的short值
getUnsignedShort(int)返回指定索引位置的无符号short值(返回类型是int)
getBytes(int,…)传送buffer数据到指定索引位置开始的目标位置?
set方法
setBoolean(int,boolean)在指定索引位置设置boolean值
setByte(intindex,intvalue)在指定索引位置设置Byte值
setMedium(intindex,intvalue)在指定索引位置设置24位(medium)值
setInt(intindex,intvalue)在指定索引位置设置int值
setLong(intindex,longvalue)在指定索引位置设置long值
setShort(intindex,intvalue)在指定索引位置设置short值
read方法;
readBoolean()返回当前readerIndex位置的boolean值,然后readerIndex加1
readByte()返回当前readerIndex位置的byte值,然后readerIndex加1
readUnsignedByte()返回当前readerIndex位置的无符号byte值(返回类型是short),然后readerIndex加1
readMedium()返回当前readerIndex位置的24位(medium)值,然后readerIndex加3
readUnsignedMedium()返回当前readerIndex位置的无符号24位(medium)值,然后readerIndex加3
readInt()返回当前readerIndex位置的int值,然后readerIndex加4
readUnsignedInt()返回当前readerIndex位置的无符号int值(返回类型是long),然后readerIndex加4
readLong()返回当前readerIndex位置的long值,然后readerIndex加8
readShort()返回当前readerIndex位置的short值,然后readerIndex加2
readUnsignedShort()返回当前readerIndex位置的无符号short值(返回类型是int),然后readerIndex加2
readBytes(ByteBuf|byte[]
destination,
intdstIndex[,intlength])
从readerIndex位置(如果有指定,长度是length字节)开始,传送当前ByteBuf数据到目标ByteBuf或者byte[]。当前ByteBuf的readerIndex值根据已传送的字节数增长
write方法
writeBoolean(boolean)在当前writerIndex位置写入boolean值,然后writerIndex加1
writeByte(int)在当前writerIndex位置写入byte值,然后writerIndex加1
writeMedium(int)在当前writerIndex位置写入medium值,然后writerIndex加3
writeInt(int)在当前writerIndex位置写入int值,然后writerIndex加4
writeLong(long)在当前writerIndex位置写入long值,然后writerIndex加8
writeShort(int)在当前writerIndex位置写入short值,然后writerIndex加2
writeBytes(sourceByteBuf|
byte[][,intsrcIndex
,intlength])
从当前writerIndex位置开始,从指定的源ByteBuf或者byte[]传送数据到当前ByteBuf。如果输入参数中提供了srcIndex和length,那么从srcIndex开始读出length长的字节。当前ByteBuf的writerIndex值根据已写入的字节数增长。
更多操作:
isReadable()如果至少一字节大小的数据可读,返回true
isWritable()如果至少一字节大小的数据可写,返回true
readableBytes()返回可读的字节数
writableBytes()返回可写的字节数
capacity()返回ByteBuf可容纳的字节数。调用该方法后,ByteBuf会试着扩展容量直到到达maxCapacity()
maxCapacity()返回ByteBuf可容纳的最大字节数
hasArray()如果ByteBuf有一个字节数组,返回true
Array()如果ByteBuf有一个字节数组,返回该数组;否则,抛出异常,UnsupportedOperationException
ByteBuf的分配
ByteBufAllocator:按需分配可以从channle和channelctx中获取ByteBufAllocatorallco=channel.alloc();;PoolByteBufAllocator和UnpooledByteBufAllocator。前者将ByteBuf实例放入池中,提高了性能,将内存碎片减少到最小。这个实现采用了一种内存分配的高效策略,称为jemalloc。它已经被好几种现代操作系统所采用。后者则没有把ByteBuf放入池中,每次被调用时,返回一个新的ByteBuf实例。
api方法
buffer();buffer(intinitialCapacity);buffer(intinitialCapacity,intmaxCapacity);返回heap或者directByteBuf
heapBuffer();heapBuffer(intinitialCapacity);heapBuffer(intinitialCapacity,intmaxCapacity);返回heapByteBuf
directBuffer();directBuffer(intinitialCapacity);directBuffer(intinitialCapacity,intmaxCapacity);返回directByteBuf
compositeBuffer();compositeBuffer(intmaxNumComponents);compositeDirectBuffer();compositeDirectBuffer(intmaxNumComponents);compositeHeapBuffer();compositeHeapBuffer(intmaxNumComponents);返回CoompositeByteBuf,可以根据指定的
component数量来扩展heap或者directbuffer
ioBuffer()返回一个用于socketI/O操作的ByteBuf
工具类:Unpooledbuffers
有些时候,你不能获取到ByteBufAllocator的引用。在这种情况下,Netty有一个被称为Unpooled的工具类,提供了一些静态的辅助方法(helpermethods)来创建unpooledByteBuf实例。表5.8列出了其中最重要的几个方法。
API方法:
buffer();buffer(intinitialCapacity);buffer(intinitialCapacity,intmaxCapacity)返回heapByteBuf
directBuffer();directBuffer(intinitialCapacity);directBuffer(intinitialCapacity,intmaxCapacity)返回directByteBuf
wrappedBuffer()返回wrappedByteBuf
copiedBuffer()返回copiedByteBuf
工具类:ByteBufUtil
ByteBufUtil提供了一些操作ByteBuf的静态辅助方法。因为这个ByteBufUtilAPI是通用的,并且和内存池并无关联,所以这些辅助方法是在buffer类之外实现的