原创

Netty源码分析-Channel和Unsafe

全网最全 Java 知识点整合总目录入口猛戳-->www.gameboys.cn

Netty 全网最全示例源码地址猛戳-->https://github.com/Sniper2016/NettyStudy

一、Channel 讲解

1.Channel 的工作原理

channel 是 Netty 抽象出来的网络 I/O 读写相关的接口,为什么不使用 JDK NIO 原生的 Channel 而要另起炉灶呢,主要原因如下。

  • JDK 的 SocketChannel 和 ServersocketChannel 没有统一的 Channel 接口供业务开发者使用,对一于用户而言,没有统一的操作视图,使用起来并不方便。
  • JDK 的 SocketChannel 和 ScrversockctChannel 的主要职责就是网络 I/O 操作,由于他们是 SPI 类接口,由具体的虚拟机厂家来提供,所以通过继承 SPI 功能直接实现 ServersocketChannel 和 SocketChannel 来扩展其工作量和重新 Channel 功类是差不多的。
  • Netty 的 ChannelPipeline Channel 需要够跟 Netty 的整体架构融合在一起,例如 I/O 模型、基的定制模型,以及基于元数据描述配置化的 TCP 参数等,这些 JDK SocketChannel 和 ServersocketChannel 都没有提供,需要重新封装。
  • 自定义的 Channel ,功实现更加灵活。

基于上述 4 原因,它的设计原理比较简单, Netty 重新设计了 Channel 接口,并且给一予了很多不同的实现。 但是功能却比较繁杂,主要的设计理念如下。

  • 在 Channel 接口层,相关联的其他操作封装起来,采用 Facade 模式进行统一封装,将网络 I/O 操作、网络 I/O 统一对外提供。
  • Channel 接口的定义尽量大而全,统一的视图,由不同子类实现不同的功能,公共功能在抽象父类中实现,最大程度上实现接口的重用。
  • 具体实现采用聚合而非包含的方式,将相关的功类聚合在 Channel 中,由 Channel 统一负责分配和调度,功能实现更加灵活。

2.Channel 的功能介绍

2.1 网络操作 API

Chanenl read():从当前 Channel 读取数据到第一个 inbound 缓冲区中,如果数据被成功读取,触发 ChannelHandler.channelRead(ChannelHandlerContext,Object)事件,读取操作调用完之后,紧接着会触发 ChannelHandler.channelReadComplete(ChannelHandlerContext)事件,这样业务的 ChannelHandler 可以决定是否需要继续读取数据。如果已经有读操作请求被挂起,则后续的读操作会被忽略。

ChannelFuture write(Object msg):请求将当前的 msg 通过 ChannelPipeline 写入到目标 Channel 中。注意,write 操作只是将消息存入到消息发送环形数组中,并没有真正被发送,只有调用 flush 操作才会被写入到 Channel 中,发送给对方。

ChannelFuture write(Object msg, ChannelPromise promise):功能与 write(Object msg)相同,但是携带了 ChannelPromise 参数负责设置写入操作的结果。

ChannelFuture writeAndFlush(Object msg,ChannelPromise promise):与方法 3 功能类似,不同之处在于它会将消息写入 Channel 中发送,等价于单独调用 write 和 flush 操作的组合。

ChannelFuture writeAndFlush(Object msg):功能等于方法 4,但是没有 ChannelPromise promise 参数。

Channel flush():将之前写入到发送环形数组中的消息全部写入到目标 Channel 中,发送给通信对方。

ChannelFuture close(ChannelPromise promise):主动关闭当前连接,通过 ChannelPromise 设置操作结果并进行结果通知,无论操作是否成功,都可以通过 ChannelPromise 获取操作结果。改操作会级联触发 ChannelPipe 中所有 ChannelHandler 的 ChannelHandler.close(ChannelHandlerContext,ChannelPromise)事件。

ChannelFuture disconnect(ChannelPromise promise):请求断开与远程通信对端的连接并使用 ChannelPromise 获取操作结果的通知信息。该方法会级联触发 ChannelHandler.disconnect(ChannelHandlerContext,ChannelPromise)事件。

ChannelFuture connect(SocketAddress remoteAddress):客户端使用指定的服务端地址发起连接请求,如果连接因为应答超时而失败,ChannelFuture 中的操作结果 ConnectTimeoutException 异常,如果连接被拒绝,操作结果为 ConnectException。该方法会级联触发 ChannelHandler.connect(ChannelHandlerContext,SocketAddress,SocketAddress,ChannelPromise)事件。

ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress):功能与方法 9 类似,唯一不同的是先绑定本地地址 localAddress,然后在连接服务端。

ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise):与方式 9 功能类似,唯一不同的是多了一个 ChannelPromise 参数用于写入操作结果。

ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise):与方法 11 功能类似,唯一不同的就是绑定了本地地址。

ChannelFuture bind(SocketAddress localAddress):绑定指定的本地 Socket 地址,该方法会级联触发 ChannelHandler.bind(ChannelHandlerContext,SocketAddress,ChannelPromise)事件。

ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise):与 13 方法功能类似,多了一个 ChannelPromise 参数用于写入操作结果。

ChannelConfig config():获取当前 Channel 的配置信息,例如 CONNECT_TIMEOUT_MILLIS。

blooean is opoen():判断当前 Channel 是否已经打开

boolean isRegistered():判断当前 Channel 是否已经注册到 EventLoop 上。

boolean isActive():判断当前 Channel 是否已经处于激活状态

ChannelMetadata metadata():获取当前 Channel 的元数据描述信息,包括 TCP 参数配等。

SocketAddress localAddress():获取当前 Channel 的本地绑定地址

SocketAddress remoteAddress():获取当前 Channel 通信的远程 Socket 地址。

2.2 其他常用的 API 功能说明

EventLoop eventLoop(),Channel 需要注册到 EventLoop 的多路复用器上,用于处理 IO 事件,通过 eventLoop 方法可以获取到 Channel 注册的 EventLoop。EventLoop 本质上就是处理网络读写事件的 Reactor 线程。在 Netty 中,它不仅仅用来处理网络事件,也可以用来执行定时任务和用户自定义 NIO Task 等任务。

ChannelMetadata metadata(),熟悉 TCP 协议的同学都可能知道,当创建 Socket 的时候需要指定 TCP 参数,例如接收和发送的 TCP 缓冲区大小,TCP 的超时事件,是否重用地址等等。在 Netty 中,每个 Channel 对应一个物理连接,每个链接都有自己的 TCP 参数配置。所以 Channel 会聚合一个 ChannelMetadata 用来对 TCP 参数提供元数据描述信息,通过 metadata 方法就可以获取当前 Channel 的 TCP 参数配置。

Channel parent(), 对于服务端 Channel 而言,它的父 Channel 为空,对于客户端 Channel,它的父 Channel 就是创建它的 ServerSocketChannel。

ChannelId id();,它返回 ChannelId 对象,ChannelId 是 Channel 的唯一标识。

3.Channel 源码分析

Channel 的实现类非常多,继承关系复杂,从学习的角度我们抽取最重要的两个 NioServerSocketChannel 和 NioSocketChannel。

服务端 NioServerSocketChannel 的继承关系类图如下:

客户端 NioSocketChannel 的继承关系类图如下:

源码就不具体分析了,可以翻阅《Netty 权威指南》16 章

4.AbstractNioChannel 类

成员变量

    private final SelectableChannel ch;
    protected final int readInterestOp;
    volatile SelectionKey selectionKey;
    boolean readPending;
  • 这里设置 SelectableChannel 为了 I/O 操作
  • readInterestOp 代表了 JDK SelectionKey 的 OP_READ
  • selectionKey 是使用 volatile 修饰的,由于 Channel 会面临多个业务线程的并发操作,为了让其他业务线程感知到 SelectionKey 修改了,这里用 volatile 保证修改的可见性。

二、Unsafe 分析

Unsafe 类实际上是 Channel 接口的辅助类,实际的 IO 操作都是由 Unsafe 接口完成的。

1.继承关系图

2.AbstractUnsafe 源码分析

1.register 方法

register 方法主要用于将当前 Unsafe 对应的 Channel 注册到 EventLoop 的多路复用器上,然后调用 DefaultChannelPipeline 的 fireChannelRegisted 方法,如果 Channel 被激活,则调用 fireChannelActive 方法。

2.bind 方法

bind 方法主要用于绑定指定端口。对于服务端,用于绑定监听端口,并设置 backlog 参数;对于客户端,用于指定客户端 Channel 的本地绑定 Socket 地址。

  1. disconnect 方法

    该方法用于客户端或服务端主动关闭连接。

4.close 方法

  1. write 方法

    write 方法实际上是将消息添加到环形发送数组上,并不真正的写 Channel(真正的写 Channel 是 flush 方法)。

6.flush 方法

前面提到,write 方法负责将消息放进发送缓冲区,并没有真正的发送,而 flush 方法就负责将发送缓冲区中待发送的消息全部写进 Channel 中并发送。

3.AbstractNioUnsafe 源码分析

  1. connect 方法

在 connect 方法中,如果连接成功,进行激活操作;如果连接暂未响应,则对其做一个监听,监听的内容是:如果连接失败,则关闭链路。

  1. finishConnect 方法
  • 该方法用于判断连接操作是否结束。
  • 首先判断当前线程是否就是 EventLoop 执行线程,不允许其他线程操作;
  • 缓存当前 active 状态,用以下面是否要执行 fireChannelActive 方法;
  • 调用 javaChannel 的 finishConnect 方法,该方法返回三种情况:

1)连接成功,返回 true

2)连接失败,返回 false

3)发生链路被关闭、链路中断异常,连接失败

  • 根据 javaChannel 的返回值,如果返回 false,直接抛出 error,进入到 catch 模块,然后就根据连接状态做不同的后续处理

三、总结

  • Unsafe 接口用户不能直接使用,Channel 的 I/O 操作都是通过 Unsafe 接口及其子类实现。
  • Netty 自定义 Channel,一方面是成本问题,另一方面是自定义 channel 更加切合框架,更加加灵活。
正文到此结束