原创

Netty源码分析-ChannelPipeline和ChannelHandler

全网最全 Java 知识点整合总目录入口猛戳-->www.gameboys.cn

Netty 全网最全示例源码地址猛戳-->https://github.com/Sniper2016/NettyStudy

本章学习流程为ChannelHandler-->ChannelHandlerContext-->ChannelPipeline

ChannelHandler详解

Handler在Netty中的坐标

经典的Reactor模式,更多在于演示和说明,仅仅是有一种浓缩和抽象。

由于Netty更多用于生产,在实际开发中的业务处理这块,主要通过Handler来实现,所以Netty中在Handler的组织设计这块,远远比经典的Reactor模式实现,要纷繁复杂得多

Hander的根本使命,就是处理Channel的就绪事件,根据就绪事件,完成NIO处理和业务操作。比方Channel读就绪时,Hander就开始读;Channel写就绪时,Hander就开始写。

Netty中Handler的类型

从应用程序开发人员的角度来看,Netty的主要组件是ChannelHandler,所以,对ChannelHandler的分类,也是从应用开发的角度来的。

从应用程序开发人员的角度来看,数据有入站和出站两种类型。

这里的出站和入站,不是网络通信方面的入站和出站。而是相对于Netty Channel与Java NIO Channel而言的。

「数据入站,指的是数据从底层的Java NIO channel到Netty的Channel。数据出站,指的是通过Netty的Channel来操作底层的 Java NIO chanel。」

从入站和出战的角度出发,Netty中的ChannelHandler主要由两种类型,ChannelInboundHandler和ChannelOutboundHandler。

「inbound 事件(通常由I/O 线程触发,例如 TCP 链路建立事件、链路关闭事件、读事件、异常通知时间等,对应上图的左半部分)」

  • ChannelHandlerContext fireChannelRegistered(); Channel 注册事件
  • ChannelHandlerContext fireChannelActive(); Tcp 链路建立成功,Channel 激活事件
  • ChannelHandlerContext fireChannelRead(Object msg); 读事件
  • ChannelHandlerContext fireChannelReadComplete(); 读操作完成通知事件
  • ChannelHandlerContext fireExceptionCaught(Throwable cause); 异常通知事件
  • ChannelHandlerContext fireUserEventTriggered(Object event);用户自定义事件
  • ChannelHandlerContext fireChannelWritabilityChanged(); Channel 的可写状态变化通知事件
  • ChannelHandlerContext fireChannelInactive();Tcp 连接关闭,链路不可用通知事件

pipeline 中以 fireXXX 命名的方法都是从 IO 线程流向用户业务 handler 的 iobound 事件,他们的实现因功能而异。

「outbound 事件(通常由用户主动发起的网络 I/O 操作,例如用户发起的连接操作、绑定操作、消息发送等操作)」

  • ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise); 绑定本地地址事件
  • ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);连接服务端事件
  • ChannelFuture write(Object msg, ChannelPromise promise);发送事件
  • ChannelHandlerContext flush();刷新事件
  • ChannelHandlerContext read();读事件
  • ChannelFuture disconnect(ChannelPromise promise);断开连接事件
  • ChannelFuture close(ChannelPromise promise); 关闭当前 Channel 事件

由用户线程或者代码发起的 IO 操作被称为 outbound 事件,事实上 inbound 和 outbound 是 netty 自身根据时间在 pipeline 中的流向抽象出来的术语,再其他 nio 框架中并没有这个概念。

ChannelHandler继承关系

ChannelHandler继承关系
ChannelHandler继承关系

ChannelHandler 类似于 servlet 的 Filter 过滤器,负责对 IO 事件或者 IO 操作进行拦截和处理,他可以选择性的拦截和处理自己感兴趣的事件,也可以透传和终止事件的传递,基于 ChannelHandler 接口,用户可以方便的进行业务逻辑定制,例如打印日志,统一封装异常信息,性能统计和消息编解码等。

ChannelHandler 支持的注解:

  • Sharable:多个 ChannelPipeline 共有同一个 ChannelHandler
  • Skip:被 skip 注解的方法不会被调用,直接被忽略

ChannelHandlerAdapter来由

对于大多数 ChannelHandler 会选择性的拦截和处理某个或者某些事件,其他事件会忽略,由下一个 Handler 进行拦截和处理,这就导致用户的 ChannelHandler 必须实现所有接口,这样就形成了代码冗余,可维护性差。

为了解决这个问题,Netty 提供了 ChannelHandlerAdapter 基类,他的所有接口实现都是事件透传,如果用户 ChannelHandler 关心某个事件,只需要覆盖 ChannelHandlerAdapter 对应的方法即可,这样类的代码就会非常简洁和清晰。

ChannelHandlerContext详解

入上图handler结构,每个Channel对应一个ChannelPipeline,而Handler仅仅为处理自己的逻辑而生,他是无状态的,一个ChannelPipeline可以有很多个Handler,为了将Handler和Pipeline联系起来,需要一个中间角色,它就是——ChannelHandlerContext。

所以,ChannelPipeline 中维护的,是一个由 ChannelHandlerContext 组成的双向链表。这个链表的头是 HeadContext, 链表的尾是 TailContext。而无状态的Handler,作为Context的成员,关联在ChannelHandlerContext 中。在对应关系上,每个 ChannelHandlerContext 中仅仅关联着一个 ChannelHandler。

主要代码

DefaultChannelPipeline中的addLast方法,即为Handler注入到ChannelPipeline中

@Override
    public final ChannelPipeline addLast(String name, ChannelHandler handler) {
        return addLast(null, name, handler);
    }

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);

            newCtx = newContext(group, filterName(name, handler), handler);

            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventLoop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }

从上面代码可以看出,handler经过一层包装,成为AbstractChannelHandlerContext对象然后添加到pipeline中。

DefaultChannelPipeline中有两个成员变量,他们即为

    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;

他们的存在,是netty定义内部使用,为了处理handler最末尾需要处理的逻辑,Head上下文包裹器的主要作用: 主要是作为入站处理的起点。而TailContext是一个入站处理器。

ChannelPipeline详解

一般使用方式

 public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                  ch.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
                     ch.pipeline().addLast("decoder", new StringDecoder());
                     ch.pipeline().addLast("encoder", new StringEncoder());
                     ch.pipeline().addLast(new EchoServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

            // 绑定端口,开始接收进来的连接
            ChannelFuture f = b.bind(port).sync(); // (7)
            
      System.out.println("Server start listen at " + port );
            // 等待服务器  socket 关闭 。
            // 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

从上面代码中,我们可以看到,我们向代码添加了4个handler,分别负责解码,解码、业务、编码。这些handler包裹在Context里面注册到pipeline中。

ChannelPipeline 是 ChannelHandler 的容器,他负责 ChannelHandler 的管理和事件拦截与调度

总结:

  • 1.ChannelPipeline 是 ChannelHandler 的容器,他负责 ChannelHandler 的管理和事件拦截与调度
  • 2.Netty 中事件分为 inbound 事件和 outbound 事件,inbound一般由IO线程触发,outbound一般有用户线程触发
  • 3.一个 Channel,拥有一个 ChannelPipeline
  • 4.ChannelHandlerAdapter主要主要为了解决用户直接实现ChannelHandler需要实现很多无关的方法,照成代码冗余。
  • 5.ChannelHandlerContext的存在为了将Handler和Pipeline联系起来,因为ChannelHandler只负责逻辑,没有保存上下文关系。
正文到此结束