Netty基础入门
Netty is an asynchronous event-driven network application framework |
Netty是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端
HelloWorld
开发一个简单的服务器端和客户端
- 客户端向服务器端发送HelloWorld
- 服务器仅接收,不返回
加入依赖
<dependency> |
服务器端
new ServerBootstrap() |
代码解读
1 处,创建 NioEventLoopGroup,可以简单理解为
线程池 + Selector
2 处,选择服务 Scoket 实现类,其中 NioServerSocketChannel 表示基于 NIO 的服务器端实现
3 处,childHandler是接下来添加的处理器都是给 SocketChannel 用的,而不是给 ServerSocketChannel。ChannelInitializer 处理器(仅执行一次),它的作用是待客户端 SocketChannel 建立连接后,执行 initChannel 以便添加更多的处理器
4 处,ServerSocketChannel 绑定的监听端口
5 处,SocketChannel 的处理器,解码 ByteBuf => String
6 处,SocketChannel 的业务处理器,使用上一个处理器的处理结果
客户端
new Bootstrap() |
代码解读
1 处,创建NioEventLoopGroup,同Server
2 处,选择客户Socket实现类,NioSocketChannel表示基于NIO的客户端实现
3 处,添加SocketChannel的处理器,ChannelInitializer处理器(仅执行一次),它的作用是待客户端SocketChannel建立连接后,执行initChannel以便添加更多的处理器
4 处,指定要连接的服务器和端口
5 处,Netty中很多方法都是异步的,如connect,这时需要使用sync方法等待connect建立连接完毕
6 处,获取channel对象,它即为通道抽象,可以进行数据读写操作
7 处,写入消息并清空缓冲区
8 处,消息会经过通道handler处理,这里是将String => ByteBuf发出
数据经过网络传输,到达服务器端,服务器端5和6处的handler先后被触发,走完一个流程
流程梳理
- 把 channel 理解为数据的通道
- 把 msg 理解为流动的数据,最开始输入是 ByteBuf,但经过 pipeline 的加工,会变成其它类型对象,最后输出又变成 ByteBuf
- 把 handler 理解为数据的处理工序
- 工序有多道,合在一起就是 pipeline,pipeline 负责发布事件(读、读取完成…)传播给每个 handler, handler 对自己感兴趣的事件进行处理(重写了相应事件处理方法)
- handler 分 Inbound 和 Outbound 两类
- 把 eventLoop 理解为处理数据的工人
- 工人可以管理多个 channel 的 io 操作,并且一旦工人负责了某个 channel,就要负责到底(绑定)
- 工人既可以执行 io 操作,也可以进行任务处理,每位工人有任务队列,队列里可以堆放多个 channel 的待处理任务,任务分为普通任务、定时任务
- 工人按照 pipeline 顺序,依次按照 handler 的规划(代码)处理数据,可以为每道工序指定不同的工人
组件
EventLoop & EventLoopGroup
EventLoop是一个单线程执行器(内部维护了一个 Selector),里面有run方法处理Channel上源源不断的io事件
继承关系:
- 一条线是继承自j.u.c.ScheduledExecutorService因此包含了线程池中所有的方法
- 另一条线是继承自 netty 自己的 OrderedEventExecutor,
- 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此EventLoop
- 提供了parent方法来看看自己属于哪个EventLoopGroup
EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)
- 继承自 netty 自己的 EventExecutorGroup
- 实现了 Iterable 接口提供遍历 EventLoop 的能力
- 另有 next 方法获取集合中下一个 EventLoop
关闭:
优雅关闭 shutdownGracefully
方法。该方法会首先切换 EventLoopGroup
到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的
handler内部的切换逻辑
关键代码 io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead()
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { |
- 如果两个handler绑定的是同一个线程,那么就直接调用
- 否则,把要调用的代码封装为一个任务对象,由下一个handler的线程来调用
NioEventLoop 处理普通任务
NioEventLoop 除了可以处理 io 事件,同样可以向它提交普通任务
NioEventLoopGroup nioWorkers = new NioEventLoopGroup(2); |
可以用来执行耗时较长的任务
NioEventLoop 处理定时任务
NioEventLoopGroup nioWorkers = new NioEventLoopGroup(2); |
可以用来执行定时任务
Channel
channel的常用api
- close()用来关闭 channel
- closeFuture()用来处理 channel 的关闭
- sync方法作用是同步等待 channel 关闭
- 而addListener方法是异步等待 channel 关闭
- pipeline()方法添加处理器
- write()方法将数据写入
- writeAndFlush()方法将数据写入并刷出
ChannelFuture
刚才的客户端代码:
new Bootstrap() |
拆开来看:
ChannelFuture channelFuture = new Bootstrap() |
- 1 处返回的是 ChannelFuture 对象,它的作用是利用 channel() 方法来获取 Channel 对象
注意connect方法是异步的,意味着不等连接建立,方法执行就返回了。因此channelFuture对象中不能【立刻】获得到正确的 Channel 对象,需要调用sync方法同步等待连接建立完成
除了用 sync 方法可以让异步操作同步以外,还可以使用回调的方式:
ChannelFuture channelFuture = new Bootstrap() |
- ChannelFutureListener 会在连接建立时被调用(其中 operationComplete 方法)
CloseFuture
|
异步的提升
- 资源利用率
- 同步 I/O 模型中,每有一个连接就需要一个线程去阻塞等待读写结果,线程数量一多,就会导致上下文切换频繁、内存占用和调度开销增大
- 异步模型中,少量的 I/O 线程(EventLoop)就能同时驱动很多连接,极大地减少了线程数量、上下文切换和系统资源消耗
- 吞吐量与并发能力
- 同步阻塞模式下,线程在等待网络或磁盘 I/O 时无法做其它事情,导致CPU利用率不高
- 异步非阻塞模式下,线程在 I/O 未就绪时立即返回去处理其它事件,可充分利用CPU,提升整体吞吐量和并发连接数
- 响应时延
- 同步调用常因线程排队和切换导致响应延迟抖动,尤其在高负载下更明显
- 异步调用通过回调或Future通知一旦I/O就绪即可处理,减少了排队等待,响应更加及时和稳定
- 可伸缩性
- 同步模型在并发量增长时,需要线性增加线程池大小,最终受限于操作系统线程数和CPU核心数
- 异步模型中,固定数目的 EventLoop 线程即可水平扩展,连接层面几乎不受线程数量瓶颈限制,应用具有更好的可伸缩性
- xxxxxxxxxx // 用来判断是不是 读空闲时间过长,或 写空闲时间过长// 3s 内如果没有向服务器写数据,会触发一个 IdleState#WRITER_IDLE 事件ch.pipeline().addLast(new IdleStateHandler(0, 3, 0));// ChannelDuplexHandler 可以同时作为入站和出站处理器ch.pipeline().addLast(new ChannelDuplexHandler() { // 用来触发特殊事件 @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{ IdleStateEvent event = (IdleStateEvent) evt; // 触发了写空闲事件 if (event.state() == IdleState.WRITER_IDLE) { ctx.writeAndFlush(new PingMessage()); } }});java
- 异步模型能更好地隔离单个连接或单次 I/O 的阻塞、慢启动等问题,一条慢连接不会拖垮整个线程池
- 通过超时、回调隔离等机制,更容易在网络波动时进行快速恢复和故障隔离
Future & Promise
在异步处理时,经常用到这两个接口
netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口,netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展
- jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果
- netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
- netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
功能/名称 | jdk Future | netty Future | Promise |
---|---|---|---|
cancel | 取消任务 | - | - |
isCanceled | 任务是否取消 | - | - |
isDone | 任务是否完成,不能区分成功失败 | - | - |
get | 获取任务结果,阻塞等待 | - | - |
getNow | - | 获取任务结果,非阻塞,还未产生结果时返回 null | - |
await | - | 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 | - |
sync | - | 等待任务结束,如果任务失败,抛出异常 | - |
isSuccess | - | 判断任务是否成功 | - |
cause | - | 获取失败信息,非阻塞,如果没有失败,返回null | - |
addLinstener | - | 添加回调,异步接收结果 | - |
setSuccess | - | - | 设置成功结果 |
setFailure | - | - | 设置失败结果 |
Handler & Pipeline
ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline
- 入站处理器通常是 ChannelInboundHandlerAdapter 的子类,主要用来读取客户端数据,写回结果
- 出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要对写回结果进行加工
打个比喻,每个 Channel 是一个产品的加工车间,Pipeline 是车间中的流水线,ChannelHandler 就是流水线上的各道工序,而后面要讲的 ByteBuf 是原材料,经过很多工序的加工:先经过一道道入站工序,再经过一道道出站工序最终变成产品
示例
服务端
new ServerBootstrap() |
客户端
new Bootstrap() |
服务器端打印:
1 |
可以看到,ChannelInboundHandlerAdapter 是按照 addLast 的顺序执行的,而 ChannelOutboundHandlerAdapter 是按照 addLast 的逆序执行的。ChannelPipeline 的实现是一个 ChannelHandlerContext(包装了 ChannelHandler) 组成的双向链表
- 入站处理器中,ctx.fireChannelRead(msg) 是 调用下一个入站处理器
- 如果注释掉 1 处代码,则仅会打印 1
- 如果注释掉 2 处代码,则仅会打印 1 2
- 3 处的 ctx.channel().write(msg) 会 从尾部开始触发 后续出站处理器的执行
- 如果注释掉 3 处代码,则仅会打印 1 2 3
- 类似的,出站处理器中,ctx.write(msg, promise) 的调用也会 触发上一个出站处理器
- 如果注释掉 6 处代码,则仅会打印 1 2 3 6
- ctx.channel().write(msg) vs ctx.write(msg)
- 都是触发出站处理器的执行
- ctx.channel().write(msg) 从尾部开始查找出站处理器
- ctx.write(msg) 是从当前节点找上一个出站处理器
- 3 处的 ctx.channel().write(msg) 如果改为 ctx.write(msg) 仅会打印 1 2 3,因为节点3 之前没有其它出站处理器了
- 6 处的 ctx.write(msg, promise) 如果改为 ctx.channel().write(msg) 会打印 1 2 3 6 6 6… 因为 ctx.channel().write() 是从尾部开始查找,结果又是节点6自己
服务端 pipeline 触发的原始流程,图中数字代表了处理步骤的先后次序
ByteBuf
对字节数据Bytebuffer的封装
创建
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10); |
log方法
private static void log(ByteBuf buffer) { |
直接内存 vs 堆内存
可以使用下面的代码来创建池化基于堆的 ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10); |
也可以使用下面的代码来创建池化基于直接内存的 ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10); |
- 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用
- 直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放
池化 vs 非池化
池化的最大意义在于可以重用 ByteBuf,优点有
- 没有池化,则每次都得创建新的 ByteBuf 实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加 GC 压力
- 有了池化,则可以重用池中 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率
- 高并发时,池化功能更节约内存,减少内存溢出的可能
池化功能是否开启,可以通过下面的系统环境变量来设置
-Dio.netty.allocator.type={unpooled|pooled} |
- Netty 4.1 以后,非 Android 平台默认启用池化实现,Android 平台启用非池化实现
组成
ByteBuf由四部分组成
最开始读写指针都在0位置
写入
方法签名 | 含义 | 备注 |
---|---|---|
writeBoolean(boolean value) | 写入 boolean 值 | 用一字节 01|00 代表 true|false |
writeByte(int value) | 写入 byte 值 | |
writeShort(int value) | 写入 short 值 | |
writeInt(int value) | 写入 int 值 | Big Endian,即 0x250,写入后 00 00 02 50 |
writeIntLE(int value) | 写入 int 值 | Little Endian,即 0x250,写入后 50 02 00 00 |
writeLong(long value) | 写入 long 值 | |
writeChar(int value) | 写入 char 值 | |
writeFloat(float value) | 写入 float 值 | |
writeDouble(double value) | 写入 double 值 | |
writeBytes(ByteBuf src) | 写入 netty 的 ByteBuf | |
writeBytes(byte[] src) | 写入 byte[] | |
writeBytes(ByteBuffer src) | 写入 nio 的 ByteBuffer | |
int writeCharSequence(CharSequence sequence, Charset charset) | 写入字符串 |
- 这些方法的未指明返回值的,其返回值都是 ByteBuf,意味着可以链式调用
- 网络传输,默认习惯是 Big Endian
先写入 4 个字节
buffer.writeBytes(new byte[]{1, 2, 3, 4}); |
结果是
read index:0 write index:4 capacity:10 |
再写入一个 int 整数,也是 4 个字节
buffer.writeInt(5); |
结果是
read index:0 write index:8 capacity:10 |
还有一类方法是 set 开头的一系列方法,也可以写入数据,但不会改变写指针位置
扩容
默认初始容量为256字节,默认最大容量为Integer.MAX_VALUE
扩容规则是
- 如何写入后数据大小未超过4MB,则每次扩容容量翻倍
- 如果写入后数据大小超过4MB,则每次增加4MB
- 扩容不能超过max capacity会报错
读取
例如读了 4 次,每次一个字节
System.out.println(buffer.readByte()); |
读过的内容,就属于废弃部分了,再读只能读那些尚未读取的部分
1 |
如果需要重复读取 int 整数 5,怎么办?
可以在 read 前先做个标记 mark
buffer.markReaderIndex(); |
结果
5 |
这时要重复读取的话,重置到标记位置 reset
buffer.resetReaderIndex(); |
这时
read index:4 write index:12 capacity:16 |
还有种办法是采用 get 开头的一系列方法,这些方法不会改变 read index
retain & release
由于 Netty 中有堆外内存的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等GC垃圾回收。
- UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可
- UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存
- PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存
Netty采用了引用计数法来控制回收内存,每个ByteBuf都实现了ReferenceCounted接口
- 每个 ByteBuf 对象的初始计数为 1
- 调用 release 方法计数减 1,如果计数为 0,ByteBuf 内存被回收
- 调用 retain 方法计数加 1,表示调用者没用完之前,其它 handler 即使调用了 release 也不会造成回收
- 当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用
谁来负责 release 呢?
因为 pipeline 的存在,一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果在 finally 中 release 了,就失去了传递性(当然,如果在这个 ChannelHandler 内这个 ByteBuf 已完成了它的使命,那么便无须再传递)
基本规则是,谁是最后使用者,谁负责 release
- 起点,对于NIO实现来讲,在 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read 方法中首次创建 ByteBuf 放入 pipeline
- 入站 ByteBuf 处理原则
- 对原始 ByteBuf 不做处理,调用 ctx.fireChannelRead(msg) 向后传递,这时无须 release
- 将原始 ByteBuf 转换为其它类型的 Java 对象,这时 ByteBuf 就没用了,必须 release
- 如果不调用 ctx.fireChannelRead(msg) 向后传递,那么也必须 release
- 注意各种异常,如果 ByteBuf 没有成功传递到下一个 ChannelHandler,必须 release
- 假设消息一直向后传,那么 TailContext 会负责释放未处理消息(原始的 ByteBuf)
- 出站 ByteBuf 处理原则
- 出站消息最终都会转为 ByteBuf 输出,一直向前传,由 HeadContext flush 后 release
- 异常处理原则
- 有时候不清楚 ByteBuf 被引用了多少次,但又必须彻底释放,可以循环调用 release 直到返回 true
slice
零拷贝的体现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存,切片后的 ByteBuf 维护独立的 read,write 指针
例如,原始 ByteBuf 进行一些初始操作
ByteBuf origin = ByteBufAllocator.DEFAULT.buffer(10); |
输出
+-------------------------------------------------+ |
这时调用 slice 进行切片,无参 slice 是从原始 ByteBuf 的 read index 到 write index 之间的内容进行切片,切片后的 max capacity 被固定为这个区间的大小,因此不能追加 write
ByteBuf slice = origin.slice(); |
输出
+-------------------------------------------------+ |
如果原始 ByteBuf 再次读操作(又读了一个字节)
origin.readByte(); |
输出
+-------------------------------------------------+ |
这时的 slice 不受影响,因为它有独立的读写指针
System.out.println(ByteBufUtil.prettyHexDump(slice)); |
输出
+-------------------------------------------------+ |
如果 slice 的内容发生了更改
slice.setByte(2, 5); |
输出
+-------------------------------------------------+ |
这时,原始 ByteBuf 也会受影响,因为底层都是同一块内存
System.out.println(ByteBufUtil.prettyHexDump(origin)); |
输出
+-------------------------------------------------+ |
duplicate
零拷贝的体现之一,就好比截取了原始 ByteBuf 所有内容,并且没有 max capacity 的限制,也是与原始 ByteBuf 使用同一块底层内存,只是读写指针是独立的
copy
会将底层内存数据进行深拷贝,因此无论读写,都与原始 ByteBuf 无关
CompositeByteBuf
的体现之一,可以将多个 ByteBuf 合并为一个逻辑上的 ByteBuf,避免拷贝
有两个 ByteBuf 如下
ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5); |
输出
+-------------------------------------------------+ |
现在需要一个新的 ByteBuf,内容来自于刚才的 buf1 和 buf2,如何实现?
方法1:
ByteBuf buf3 = ByteBufAllocator.DEFAULT |
结果
+-------------------------------------------------+ |
这种方法进行了数据的内存复制操作
方法2:
CompositeByteBuf buf3 = ByteBufAllocator.DEFAULT.compositeBuffer(); |
结果是一样的
+-------------------------------------------------+ |
CompositeByteBuf 是一个组合的 ByteBuf,它内部维护了一个 Component 数组,每个 Component 管理一个 ByteBuf,记录了这个 ByteBuf 相对于整体偏移量等信息,代表着整体中某一段的数据。
- 优点,对外是一个虚拟视图,组合这些 ByteBuf 不会产生内存复制
- 缺点,复杂了很多,多次操作会带来性能的损耗
Unpooled
Unpooled 是一个工具类,类如其名,提供了非池化的 ByteBuf 创建、组合、复制等操作
这里仅介绍其跟【零拷贝】相关的 wrappedBuffer 方法,可以用来包装 ByteBuf
ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5); |
输出
+-------------------------------------------------+ |
也可以用来包装普通字节数组,底层也不会有拷贝操作
ByteBuf buf4 = Unpooled.wrappedBuffer(new byte[]{1, 2, 3}, new byte[]{4, 5, 6}); |
输出
class io.netty.buffer.CompositeByteBuf |
ByteBuf 优势
- 池化 - 可以重用池中 ByteBuf 实例,更节约内存,减少内存溢出的可能
- 读写指针分离,不需要像 ByteBuffer 一样切换读写模式
- 可以自动扩容
- 支持链式调用,使用更流畅
- 很多地方体现零拷贝,例如 slice、duplicate、CompositeByteBuf