简介
Netty是由JBOSS提供的一个java开源框架, 是一个高性能、 高可扩展性的异步事件驱动的网络应用程序框架, 它极大地简化了TCP和UDP客户端和服务器开发等网络编程。
Netty线程模型
为了让NIO处理,更好的利用多线程特性, Netty实现了Reactor线程模型。Reactor模型中有四个核心概念:
- Resources 资源(请求/任务)
- Synchronous Event Demultiplexer 同步事件复用器
- Dispatcher 分配器
- Request Handler 请求处理器
EventLoopGrop
用于统一管理EventLoop
EventLoop
由线程驱动,处理Channel的所有I/O事件。
EventLoop自身实现了Executor接口,当调用executor方法提交任务时,则判断是否启动,未启动则调用内置的executor创建新线程来触发run方法执行。
Channel
netty中的Channel是一个抽象的概念,可以理解为对JDK NIO Channel的增强和拓展。增加了很多属性和方法。
几个常见的属性和方法:
属性 | 描述 |
---|---|
DefaultChannelPipeline pipeline | 通道内事件处理链路 |
EventLoop eventLoop | 绑定的EventLoop,用于执行操作 |
Unsafe unsafe | 提供I/O相关操作的封装 |
方法 | 描述 |
---|---|
ChannelConfig config() | 返回通道配置信息 |
Channel read() | 开始读数据,触发读取链路调用 |
ChannelFuture write(Object msg) | 写数据,触发链路调用 |
ChannelFuture bind(SocketAddress SocketAddress) | 绑定 |
ChannelPipeline
事件处理流水线
ChannelPipeline责任链
Pipeline管道保存了通道所有处理器信息。创建新channel时自动创建一个专有的pipeline。入站事件和出站操作会调用pipeline上的处理器
维护Pipeline中的handler
ChannelPipeline是线程安全的, ChannelHandler可以在任何时候添加或删除。例如,你可以在即将交换敏感信息时插入加密处理程序,并在交换后删除它。一般操作,初始化的时候增加进去,较少删除。下面是Pipeline中管理handler的API。
方法名称 | 描述 |
---|---|
addFirst | 最前面插入 |
addLast | 最后面插入 |
addBefore | 插入到指定处理器前面 |
addAfter | 插入到指定处理器后面 |
remove | 移除指定处理器 |
removeFirst | 移除第一个处理器 |
removeLast | 移除最后一个处理器 |
replace | 替换指定的处理器 |
handler的执行分析
以上图为例
当入站事件时,执行顺序是1、 2、 3、 4、 5。当出站事件时,执行顺序是5、 4、 3、 2、 1
在这一原则之上,ChannelPipeline在执行时会进行选择。3和4为出站处理器,因此入站事件的实际执行是:1、 2、 5。1和2为入站处理器,因此出站事件的实际执行是:5、 4、 3
不同的入站事件会触发handler不同的方法执行:上下文对象中 fire** 开头的方法,代表入站事件传播和处理其余的方法代表出站事件的传播和处理。
ChannelHandlerContext
实际存储在Pipeline中的对象并非ChannelHandler,而是上下文对象。将handler,包裹在上下文对象中,通过上下文对象与它所属的ChannelPipeline交互,向上或向下传递事件或者修改pipeline都是通过上下文对象。
ChannelHandler
用于处理I/O事件或拦截I/O操作,并转发到ChannelPipeline中的下一个处理器。这个顶级接口定义功能很弱,实际使用时会去实现下面两大子接口:ChannelInboundHandler处理入站I/O事件、ChannelOutboundHandler处理出站I/O事件,ChannelDuplexHandler来支持同时处理入站和出站事件
inbound入站事件
通常指I/O线程生成了入站数据。(通俗理解:从socket底层自己往上冒上来的事件都是入站)比如EventLoop收到selector的OP_READ事件,入站处理器调用socketChannel.read(ByteBuffer)接收到数据后,这将导致通道的ChannelPipeline中包含的下一个中的channelRead方法被调用。
事件 | 描述 |
---|---|
fireChannelRegistered | channel注册事件 |
fireChannelUnregistered | channel解除注册事件 |
fireChannelActive | channel活跃事件 |
fireChannelInactive | channel非活跃事件 |
fireExceptionCaught | 异常事件 |
fireUserEventTriggered | 用户自定义事件 |
fireChannelRead | channel读事件 |
fireChannelReadComplete | channel读完成事件 |
fireChannelWritabilityChanged | channel写状态变化事件 |
outbound出站事件
通常是指I/O线程执行实际的输出操作。(通俗理解:想主动往socket底层操作的事件的都是出站)比如bind方法用意是请求server socket绑定到给定的SocketAddress,这将导致通道的ChannelPipeline中包含的下一个出站处理器中的bind方法被调用。
事件 | 描述 |
---|---|
bind | 端口绑定事件 |
connect | 连接事件 |
disconnect | 断开连接事件 |
close | 关闭事件 |
deregister | 解除注册事件 |
flush | 刷新数据到网络事件 |
read | 读事件,用于注册OP_READ到selector。 |
write | 写事件 |
writeAndFlush | 写出数据事件 |
ByteBuf
ByteBuf是增强的ByteBuffer缓冲区,在使用中,通过ByteBufAllocator分配器进行申请,同时分配器具备有内存管理的功能
JDK ByteBuffer缺点
- 无法动态扩容:长度是固定,不能动态扩展和收缩,当数据大于 ByteBuffer 容量时,会发生索引越界异常。
- API 使用复杂:读写的时候需要手工调用flip()和rewind()等方法,使用时需要非常谨慎的使用这些api,否则很容出现错误
ByteBuf三个重要属性: capacity 容量、 readerIndex 读取位置、 writerIndex 写入位置。提供了两个指针变量来支持顺序读和写操作, 分别是readerIndex和写操作writerIndex
动态扩容
capacity默认值: 256字节 、 最大值: Integer.MAX_VALUE( 2GB)
write*方法调用时,通过AbstractByteBuf.ensureWritable0进行检查。容量计算方法: AbstractByteBufAllocator. calculateNewCapacity (新capacity的最小要求, capacity最大值)
根据新capacity的最小值要求,对应有两套计算方法:
没超过4兆:从64字节开始,每次增加一倍,直至计算出来的newCapacity满足新容量最小要求。
示例: 当前大小256,已写250,继续写10字节数据,需要的容量最小要求是261,则新容量是6422*2=512超过4兆:新容量 = 新容量最小要求/4兆 * 4兆 + 4兆
示例:当前大小3兆,已写3兆,继续写2兆数据,需要的容量最小要求是5兆,则新容量是8兆(不能超过最大值)。
4兆的来源: 一个固定的阈值AbstractByteBufAllocator. CALCULATE_
ByteBuf类型
Unsafe的实现
unsafe意味着不安全的操作。但是更底层的操作会带来性能提升和特殊功能, Netty中会尽力使用unsafe。Java语言很重要的特性是“一次编写到处运行”,所以它针对底层的内存或者其他操作,做了很多封装。而unsafe提供了一系列我们操作底层的方法,可能会导致不兼容或者不可知的异常。
Info.
仅返回一些低级的内存信息
addressSize
pageSizeMemory.
直接访问内存方法
allocateMemory
copyMemory
freeMemory
getAddress
getInt
putIntObjects.
提供用于操作对象及其字段的方法
allocateInstance
objectFieldOffsetClasses.
提供用于操作类及其静态字段的方法
staticFieldOffset
defineClass
defineAnonymousClass
ensureClassInitializedArrays.
操作数组
arrayBaseOffset
arrayIndexScaleSynchronization.
低级的同步原语
monitorEnter
tryMonitorEnter
monitorExit
compareAndSwapInt
putOrderedInt
内存复用
PoolThreadCache : PooledByteBufAllocator 实例维护的一个线程变量。
多种分类的MemoryRegionCache数组用作内存缓存, MemoryRegionCache内部是链表,队列里面存Chunk。
PoolChunk里面维护了内存引用,内存复用的做法就是把buf的memory指向chunk的memory。
零拷贝
Netty的零拷贝机制,是一种应用层的实现。和底层JVM、操作系统内存机制并无过多关联。通过包裹数组,通过数组引用直接使用数组而不拷贝出一个新数组。
- CompositeByteBuf, 将多个ByteBuf合并为一个逻辑上的ByteBuf,避免了各个ByteBuf之间的拷贝。
1 | CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer(); |
- wrapedBuffer()方法,将byte[]数组包装成ByteBuf对象。
1 | ByteBuf newBuffer = Unpooled.wrappedBuffer(new byte[]{1,2,3,4,5}); |
- slice()方法。将一个ByteBuf对象切分成多个ByteBuf对象。
1 | ByteBuf buffer1 = Unpooled.wrappedBuffer("hello".getBytes()); |
Bootstrap
BootStrap是Netty中负责引导服务器和客户端启动,它将ChannelPipeline、ChannelHandler和EventLoop组织起来,成为一个可实际运行的引用程序。简单来说,引导一个应用程序是指对它进行配置,并运行起来的过程。
Bootstrap
引导客户端运行
- api
名称 | 描述 |
---|---|
group | 设置用于处理Channel所有事件的EventLoopgroup |
channel | 设置将要被实例化的Channel类 |
channelFactory | 如果不能通过默认的构造函数创建Channel,那么可以提供一个ChannelFactory |
localAddress | 指定Channel应该绑定到本地地址。如果没有指定,则将有操作系统使用一个随机地址。或者,可以通过bing方法来指定该localAddress |
option | 指定要应用到创建的Channel的配置项,只能在调用bind方法前设置。具体支持的配置项取决于所使用的Channel类型。参见ChannelConfig的API文档 |
attr | 指定Channel上的属性,只能在调用bind方法前设置 |
handler | 设置Channel的事件处理器 |
clone | 克隆一个设置和原始Bootstrap相同的Bootstrap |
remoteAddress | 设置 远程地址。或者, 也可以通过connect方法来指定它 |
connect | 连接 到 远程 节点并返回一个ChannelFuture |
bind | 绑定Channel并返回一个ChannelFuture |
ServerBootstrap
引导服务端运行
- api
名称 | 描述 |
---|---|
group | 设置ServerBootstrap要用的EventLoopGroup。这个EventLoopGroup将用于ServerChannel和被接受的子Channel的I/O处理 |
channel | 设置将要被实例化的ServerChannel类 |
channelFactory | 如果不能通过默认的构造函数创建Channel,那么可以提供一个ChannelFactory |
localAddress | 指定ServerChannel应该绑定到本地地址。如果没有指定,则将有操作系统使用一个随机地址。或者,可以通过bing方法来指定该localAddress |
option | 指定要应用到创建的ServerChannel的配置项,只能在调用bind方法前设置。具体支持的配置项取决于所使用的Channel类型。参见ChannelConfig的API文档 |
childOption | 指定客户端的Channel被接受时,应用到客户端Channel的配置项 |
attr | 指定ServerChannel上的属性,只能在调用bind方法前设置 |
childAttr | 指定接收的客户端Channel上的属性 |
handler | 设置ServerChannel的事件处理器 |
childHandler | 设置客户端Channel上的事件处理器 |
clone | 克隆一个设置和原始ServerBootstrap相同的ServerBootstrap |
bind | 绑定ServerChannel并返回一个ChannelFuture |
编解码
Netty中主要提供了抽象基类ByteToMessageDecoder和MessageToMessageDecoder。实现了ChannelInboundHandler接口。
与ByteToMessageDecoder和MessageToMessageDecoder相对应, Netty提供了对应的编码器实现MessageToByteEncoder和MessageToMessageEncoder,二者都实现ChannelOutboundHandler接口。
ByteToMessageDecoder
用于将接收到的二进制数据(byte)解码,得到完整的请求报文(Message)。
类 | 描述 |
---|---|
FixedLengthFrameDecoder | 定长协议解码器,可以指定固定的字节数算一个完整的报文 |
LineBasedFrameDecoder | 行分隔符解码器,遇到\n或者\r\n,则认为是一个完整的报文 |
DelimiterBasedFrameDecoder: | 分隔符解码器,与LineBasedFrameDecoder类似,只不过分隔符可以自己指定 |
LengthFieldBasedFrameDecoder: | 长度编码解码器,将报文划分为报文头/报文体,根据报文头中的Length字段确定报文体的长度,因此报文提的长度是可变的 |
JsonObjectDecoder | json格式解码器,当检测到匹配数量的”{“ 、” }” 或” [””]” 时,则认为是一个完整的json对象或者json数组。 |
MessageToMessageDecoder
将一个本身就包含完整报文信息的对象转换成另一个Java对象。
类 | 描述 |
---|---|
StringDecoder | 用于将包含完整的报文信息的ByteBuf转换成字符串。 |
Base64Decoder | 用于Base64编码。 |
MessageToByteEncoder
是一个泛型类,泛型参数I表示将需要编码的对象的类型,编码的结果是将信息转换成二进制流放入ByteBuf中。子类通过覆写其抽象方法encode,来实现编码。
MessageToMessageEncoder
同样是一个泛型类,泛型参数I表示将需要编码的对象的类型,编码的结果是将信息放到一个List中。子类通过覆写其抽象方法encode,来实现编码
类 | 描述 |
---|---|
LineEncoder | 按行编码,给定一个CharSequence(如String),在其之后添加换行符\n或者\r\n,并封装到ByteBuf进行输出,与LineBasedFrameDecoder相对应。 |
Base64Encoder | 给定一个ByteBuf,得到对其包含的二进制数据进行Base64编码后的新的ByteBuf进行输出,与Base64Decoder相对应。 |
LengthFieldPrepender | 给定一个ByteBuf,为其添加报文头Length字段,得到一个新的ByteBuf进行输出。 Length字段表示报文长度,与LengthFieldBasedFrameDecoder相对应。 |
StringEncoder | 给定一个CharSequence(如: StringBuilder、 StringBuffer、 String等),将其转换成ByteBuf进行输出,与StringDecoder对应。 |